unitelabs.bus
unitelabs/bus/__init__.py
Packages
Attributes
- __all__= [ "Protocol", "Command", "Request", "Response", "TransportFactory", "create_usb_connection", "create_serial_connection", "create_usb_connection", "SerialTransport", "UsbTransport", "SerialDeviceManager", "testing" ]
Functions
create_serial_connection(- protocol_factory : typing.Callable[..., P],
- port : str,
- baudrate : int,
- bytesize : ByteSize,
- parity : Parity,
- stopbits : StopBits,
- **kwargs
Create a serial connection with the specified port.
protocol_factorytyping.Callable[..., P]A callable that returns an instance of the protocol to be used.portstrThe port of the serial device.baudrateint = 9600**kwargs= {}Additional keyword arguments to be passed to the `SerialTransport` constructor.create_usb_connection(- protocol_factory : typing.Callable[..., P],
- vendor : int,
- product : int,
- **kwargs
Create a USB connection with a device based on specified `vendor` and `product` IDs.
protocol_factorytyping.Callable[..., P]A callable that returns an instance of the protocol to be used.vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.**kwargs= {}Additional keyword arguments to be passed to the `UsbTransport` constructor.
Classes
Command
classGeneric Command that can be used with `Protocol.execute`. The first type parameter of the `Command` determines the type that the `Command` accepts on init and serialization, and the second type parameter determines the type returned by deserialization, e.g. `Command[str, typing.List[bool]]` would ingest strings and returns a list of booleans.
- Bases
- typing.Generic[InType_co, OutType_co]
Methods
__init__(- self,
- message : InType_co,
- timeout : typing.Optional[float],
- is_void : typing.Optional[bool]
messageThe contents of the message to be sent to the device, pre-serialization.timeouttyping.Optional[float] = NoneHow long is seconds to wait for a response.is_voidtyping.Optional[bool] = FalseIf true, does not return a response. Void commands ignore all response validations.result(self) -> typing.Optional[OutType_co]Deserializes the `Response.payload.result()` bytes.
typing.Optional[OutType_co]The deserialized `Response.payload`@abc.abstractmethod
serialize(self, message : typing.Optional[InType_co]) -> bytesSerializes the message into bytes. Should use `self.message` if `message` is None.
messagetyping.Optional[InType_co] = NoneA command input, or None to use `self.message`.bytesThe serialized message to be sent to the device.@abc.abstractmethod
deserialize(self, response : typing.Optional[bytes]) -> typing.Optional[OutType_co]Deserializes the `response` bytes. Should call `self.result()` if `response` is None.
responsetyping.Optional[bytes] = Nonebytes to be deserialized, or None to use `self.result()`.typing.Optional[OutType_co]The deserialized `response`.validate_request(self, message : bytes) -> boolValidate a serialized message. Called within `Command.request` before generating a `Request` object.
messagebytesThe message to set as the `Request.payload`, if valid.boolWhether or not the `message` is valid._set_response(self) -> NoneSet the result of `self.response.payload` to `self._response_buffer` and clears `self._response_buffer`.
validate_response(self, data : bytes) -> NoneThis method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished or 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
databytesThe bytes from the `Transport` to add to the response_buffer and evaluate for completeness._validate_response(self, data : bytes) -> boolValidate the data received from the `Transport` and determine if the data is a complete response. Subclasses override this method to specify behavior when validating a response.
databytesThe bytes to evaluate for completeness.boolmatch_response(self, data : bytes) -> boolFor devices that allow parallel command processing, first check if `data` belongs to this command and then validate the response.
databytesThe bytes to check for match during parallel processing, usually an identifier shared by request and response.boolTrue if the `data` matches to this command, otherwise False.
Attributes
- receivertyping.Optional[Protocol] = None
- _responsetyping.Optional[Response] = None
- _requesttyping.Optional[Request] = None
- message= message
- timeout= timeout
- _response_buffer= b''
- is_void= is_void
- requestRequest = NoneThe `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- responseResponse = NoneThe `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within it's `Protocol.data_received` method and only set the result if valid.
Request
classProtocols use `Request`s to specify data that is sent to a transport.
- Decorators
- dataclasses.dataclass
Methods
__init__(self, payload : bytes, timeout : typing.Optional[float]) -> Nonepayloadbytestimeouttyping.Optional[float] = None__post_init__(self) -> None
Attributes
- payloadbytes = None
- timeouttyping.Optional[float] = None
Response
classProtocols use `Response`s to specify data that is received from a transport.
- Decorators
- dataclasses.dataclass
Methods
__post_init__(self) -> None__handle_done(self, _payload : asyncio.Future[bytes]) -> NoneThe callback to be run when the payload `Future` becomes done.
_payloadasyncio.Future[bytes]The `Future` object.
Attributes
- requestRequest = None
TransportFactory
classInterface representing a factory for creating transports.
Protocol
classBase communication Protocol.
- Bases
- asyncio.Protocol
Methods
__init__(- self,
- transport_factory : TransportFactory,
- reconnect : bool,
- reconnect_delay : float,
- max_reconnect_attempts : int,
- autodetect : bool,
- max_parallel_commands : int,
- **kwargs
transport_factoryA callable used to create a connection to a transport.reconnectbool = TrueWhether or not to a attempt to reconnect to a device when the connection is lost.reconnect_delayfloat = DEFAULT_RECONNECT_DELAYHow long in seconds to wait between reconnection attempts.max_reconnect_attemptsHow many times to attempt to reconnect to a device before connection is considered lost.autodetectbool = FalseWhether or not to use autodetection for device connectivity.max_parallel_commandsThe maximum number of commands to process in parallel. This should be 1 for serial devices, but can be configured to allow more depending on the processing capacity of the usb device.**kwargs= {}additional kwargs, including kwargs for use with `TransportFactory`._connect_transport(self, **kwargs) -> NoneCreate a new transport instance.
**kwargs= {}@abc.abstractmethod
identity(self, **config_kwargs) -> boolMethod for validating the identity of the connected device. This method will call another user-defined method on `Protocol` and compare the device's response (i.e. the method's return value) to values provided in `config_kwargs`.
**config_kwargs= {}kwargs sent from `validate`boolTrue if the result of the inner call matches the expectation from `config_kwargs` else False.validate(- self,
- timeout : float,
- **validation_kwargs
This method will be called by user after `__init__` via `open`; it calls `identity` to determine if the connected device is the one that was expected. If autodetect=True this is called internally by `unitelabs.bus.utils.AutoDetector` to cycle through possile devices until the correct device or no device is found. `validation_kwargs` and `__init__` kwargs are stored on the `Protocol` such that they must only be provided once. Should there be values which overlap, `__init__` values are overwritten by values in `validation_kwargs`. `validation_kwargs` may contain stable information about the device. Check `unitelabs.bus.utils.device_manager` for more information about valid device filter kwargs.
timeoutfloatHow long in seconds to wait for a response from the device.**validation_kwargs= {}Kwargs to use to run validation of the device, in the case of use with `autodetect=True`, these kwargs will be stored on the first call for all future validations.boolPropagated return value from `identity`; True if `identity` returns True else Falseopen(- self,
- validation_timeout : float,
- **validation_kwargs
Open underlying `Transport`, establish a connection to a device and validate the device's identity.
validation_timeoutfloat = 1.0How long in seconds to wait for a response to `Protocol.validate`.**validation_kwargs= {}kwargs to be passed to `Protocol.validate` to test device identity against.close(self) -> NoneClose underlying `Transport`. Explicitly calling `close` will NOT attempt to reconnect to the `Transport`.
connection_made(self, transport : asyncio.Transport) -> NoneInvoked by `transport` when connection is made. Logs the connection.
transportasyncio.Transportconnection_lost(self, exc : typing.Optional[Exception]) -> NoneInvoked by transport when connection is lost. Attempts to reconnect after `reconnect_delay` seconds. Here `exc` can be None as a result of : - manual abort through direct call of transport's `abort` method - connection closing after `_safe_write` successfully wrote all data in write-buffer
exctyping.Optional[Exception] = Nonepause_writing(self) -> Noneresume_writing(self) -> Nonedata_received(self, data : bytes) -> NoneInvoked by transport when data is received. Logs the data and sets the response if not already set. Further invocations with the same `Response` will only be logged.
databytesThe data received.error_received(self, exc : typing.Union[Exception, type[Exception]]) -> NoneInvoked by transport when an error is received. Logs the error and sets the response if not already set. Further invocations with the same `Response` will only be logged.
exctyping.Union[Exception, type[Exception]]The error received.execute(self, command : Command[InType_co, OutType_co]) -> typing.Optional[OutType_co]Executes a `Command` by sending the `Request` within the `Command` to the `Transport`.
commandThe `Command` to be executed.typing.Optional[OutType_co]The deserialized response, created by `command.result()` or None if `Command.is_void` is True.
Attributes
- _transport_allows_writing= True
- _transport_factory= transport_factory
- _transport_kwargs= kwargs
- _commandslist[Command] = []
- max_parallel_commands= max_parallel_commands
- is_open= asyncio.Event()
- autodetect= autodetect
- _autodetector= None
- _validation_kwargs= None
- is_validated= asyncio.Event()
- reconnect= reconnect
- reconnect_delay= reconnect_delay
- remaining_reconnect_attempts= max_reconnect_attempts
- max_reconnect_attempts= max_reconnect_attempts
- loggerlogging.Logger = NoneA standard python logger.
- autodetectorAutoDetector = None
SerialTransport
classTransport for serial devices.
- MRO
- └── Transport
- └─── SerialTransport
Methods
_open(self) -> NoneOpens underlying serial port, if not already open.
_close(self) -> NoneCloses underlying serial port, if open.
_ensure_reader(self) -> None_poll_read(self) -> None_remove_reader(self) -> None_read(self) -> typing.Optional[bytes]typing.Optional[bytes]_ensure_writer(self) -> NoneAdds a writer to the loop if not already added.
_poll_write(self) -> None_remove_writer(self) -> NoneRemoves a writer from the loop.
_write(self, data : bytes) -> intdatabytesintopen(self) -> NoneOpens the transport and sets state to allow future read operations.
close(self) -> NoneCloses the transport and sets state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneCloses the transport immediately and updates state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly calls `_read` and aggregates the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClears out all available read data without notifying the protocol. Calls `read_all` and throws away the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]a tuple (low, high) where low and high are positive number of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the high-water limit and if neither is specified, (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on the the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
Attributes
- _serial= _serial
- _max_read_size= 1024
- _read_buffer= []
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _protocoltyping.Optional[P_co] = None
- _is_closing= True
UsbTransport
classTransport for devices connected via USB. By default, this implementation uses Interface 0 of Configuration 1 on the device.
- MRO
- └── Transport
- └─── UsbTransport
Methods
__init__(- self,
- vendor : int,
- product : int,
- protocol : asyncio.BaseProtocol,
- interface_index : int
vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.protocolasyncio.BaseProtocolThe protocol instance to use for communication.interface_indexint = DEFAULT_USB_INTERFACEThe index of the USB Interface to use. Defaults to 0._open(self) -> None_close(self) -> None_ensure_reader(self) -> None_remove_reader(self) -> None__read(self) -> None_read(self) -> NoneRead data from the transport.
_ensure_writer(self) -> None_remove_writer(self) -> None_write(self, data) -> Nonedataopen(self) -> NoneOpens the transport and sets state to allow future read operations.
close(self) -> NoneCloses the transport and sets state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneCloses the transport immediately and updates state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly calls `_read` and aggregates the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClears out all available read data without notifying the protocol. Calls `read_all` and throws away the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]a tuple (low, high) where low and high are positive number of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the high-water limit and if neither is specified, (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on the the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
Attributes
- vendor= vendor
- product= product
- _closing= True
- _protocol= protocol
- _interface_index= interface_index
- _devicetyping.Optional[usb.core.Device] = None
- read_endpointtyping.Optional[usb.core.Endpoint] = None
- write_endpointtyping.Optional[usb.core.Endpoint] = None
- _reader_event= threading.Event()
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _is_closing= True
SerialDeviceManager
classDetect, filter, and get info for connected serial devices.
Methods
@classmethod
filter_kwargs(cls, kwargs : typing.Dict[str, str]) -> typing.Dict[str, str]Filter kwargs to those which are returned from `serial.tools.list_ports.comports`, i.e. the attributes of `DeviceInfo`. Supports the use of `port` as alternative name for `device`.
kwargstyping.Dict[str, str]A dictionary of key-value pairs to filter.typing.Dict[str, str]The filtered dictionary.@classmethod
get_all(cls) -> list[DeviceInfo]Get all connected devices.
list[DeviceInfo]A list of all devices detected.@classmethod
check_device_match(- cls,
- device_info : DeviceInfo,
- **kwargs
Determine if the `DeviceInfo` instance's attributes match the filter `**kwargs`
device_infoThe device to check for a match against.**kwargs= {}The key-value pairs, which will be filtered, and then used to evaluate the device for a match.boolTrue if the `DeviceInfo` matches (or no kwargs provided, or all kwargs have been filtered), else False.@classmethod
filter(cls, **kwargs) -> list[DeviceInfo]Search through all detectable devices. `**kwargs` are first filtered by `filter_kwargs` based on attrs of `DeviceInfo`. This allows the number of devices detected to be pared down based on known, stable information about the device being searched for.
**kwargs= {}Search criteria for finding a device.list[DeviceInfo]A list of devices which match all `**kwargs` provided.
Attributes
- SERIAL_SEARCH_KEYS= ['device', 'name', 'description', 'hwid', 'vid', 'pid', 'serial_number', 'location', 'manufacturer', 'product', 'interface']