unitelabs.bus
unitelabs/bus/__init__.py
Packages
Attributes
- __all__= [ "unitelabs.bus.commands.byte_command.ByteCommand", "unitelabs.bus.commands.command.Command", "unitelabs.bus.commands.builder.CommandBuilder", "unitelabs.bus.exceptions.CommandExecutionError", "unitelabs.bus.exceptions.CommandValidationError", "unitelabs.bus.transports.hid_transport.HIDTransport", "unitelabs.bus.commands.http_command.HTTPCommand", "unitelabs.bus.exceptions.InputValidationError", "unitelabs.bus.exceptions.NoSuchDeviceFound", "unitelabs.bus.protocols.protocol.Protocol", "unitelabs.bus.commands.request.Request", "unitelabs.bus.commands.response.Response", "unitelabs.bus.commands.serial_command.SerialCommand", "unitelabs.bus.utils.device_manager.SerialDeviceManager", "unitelabs.bus.transports.serial_transport.SerialTransport", "unitelabs.bus.factories.transport_factory.TransportFactory", "unitelabs.bus.transports.usb_transport.USBTransport", "unitelabs.bus.factories.hid_factory.create_hid_connection", "unitelabs.bus.factories.serial_factory.create_serial_connection", "unitelabs.bus.factories.tcp_factory.create_tcp_connection", "unitelabs.bus.factories.udp_factory.create_udp_connection", "unitelabs.bus.factories.usb_factory.create_usb_connection" ]
Classes
ByteCommand
classThe most basic form of a `Command`, which ingests and returns bytes.
- MRO
- Bases
- Command[bytes, bytes]
Methods
__init__(- self,
- message : bytes,
- timeout : float | None,
- is_void : bool | None,
- **kwargs
messagebytestimeoutfloat | None = Noneis_voidbool | None = False**kwargs= {}_serialize(self, message : bytes | None) -> bytesmessagebytes | None = Nonebytes_deserialize(self, response : bytes) -> bytesresponsebytesbytes
Command
classGeneric Command that can be used with `Protocol.execute`. The first type parameter of the `Command` determines the type that the `Command` accepts on init and serialization, the second type parameter determines the type returned by deserialization, and the third type parameter determines the type returned by the `result` method, e.g. `Command[str, list[str], list[bool]]` would ingest strings from the user, be converted to a list of strings for the parser and then return a list of booleans when used with `Protocol.execute`. Subclasses which change the constructor signature should be careful to preserve the `timeout` and `is_void` parameters. Removal of these arguments may result in compatability issues with the `CommandBuilder`.
Methods
__init__(- self,
- message : InType,
- timeout : float | None,
- is_void : bool
messageThe contents of the message to be sent to the device, pre-serialization.timeoutfloat | None = NoneHow long is seconds to wait for a response.is_voidbool = FalseIf true, does not return a response. Void commands ignore all response validations.__repr__(self) -> strstrserialize(self, message : InType | None) -> bytesSerialize the message into bytes. Uses `self.message` if `message` is None. NB: Calling with `message` DOES NOT SET the `self.message'` attribute; the `Request` sent to the device will use the `self.message` attribute.
messageInType | None = NoneA command input, or None to use `self.message`.bytesThe serialized message to be sent to the device.deserialize(self, response : bytes | None) -> OutTypeDeserializes the `response` bytes. Calls `_deserialize` with `self.response` payload if `response` is None.
responsebytes | None = Nonebytes to be deserialized, or None to use `Response` payload.None if `self.is_void`, else the deserialized `response`.validate_request(self, message : bytes) -> boolValidate a serialized message. Called within `Command.request` before generating a `Request` object.
messagebytesThe serialized message to set as the `Request.payload`, if valid.boolWhether or not the `message` is valid._set_response(self) -> NoneSet the result of `self.response.payload` to `self._response_buffer` and clears `self._response_buffer`.
validate_response(self, data : bytes) -> NoneSet `Response.payload` if it belongs to this Command. This method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
databytesThe bytes from the `Transport` to add to the response_buffer and evaluate for completeness._validate_response(self, data : bytes) -> boolUser-configured response validation method. Validate the data received from the `Transport` and determine if the data is a complete response. Subclasses override this method to specify behavior for validating data before setting it as the response.
databytesThe bytes to evaluate for completeness.boolTrue if `data` is a complete, valid message from the device, otherwise False.match_response(self, data : bytes) -> boolCheck if `data` belongs to this command and then validate the response. For devices that allow parallel command processing.
databytesThe bytes to check for match during parallel processing, usually an identifier shared by request and response.boolTrue if the `data` matches to this command, otherwise False.Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consecutively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- receivertyping.Optional[Protocol] = None
- _parserslist[Parser] | None = None
- _responseResponse | None = None
- _requestRequest | None = None
- message= message
- timeout= timeout
- _response_buffer= b''
- is_void= is_void
- requestRequest = NoneThe `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- responseResponse = NoneThe `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within it's `Protocol.data_received` method and only set the result if valid.
- parserslist[Parser] = NoneParsers are functions that are applied to the deserialized data when calling `result()`.
CommandBuilder
classClass that coordinates the chainable building of a `Command` instance.
Methods
__copy__(self) -> CommandBuilder[InType, OutType, R]__deepcopy__(self, memo : dict | None) -> CommandBuilder[InType, OutType, R]memodict | None = Nonewith_serializer(- self,
- serializer : Serializer[S_InType] | SelfSerializer[S_InType],
- **kwargs
Set a serializer to apply to the `Command.message` before sending it to the device. The serializer will be called with the message to send to the device as the first and only argument. Additional arguments can be preloaded into the serializer with `**kwargs`.
serializerThe serializer function to apply.**kwargs= {}Keyword arguments to pass to the serializer.with_deserializer(- self,
- deserializer : Deserializer[D_OutType] | SelfDeserializer[D_OutType],
- **kwargs
Set a deserializer function to apply to the `Command.response` recieved from the device. The deserializer will be called with the response bytes as the first and only argument. Additional arguments can be preloaded into the deserializer with `**kwargs`. This function will wrap the deserializer in a check for null values to ensure that calls without `response' use the `Response.payload.result()`.
deserializerThe deserializer function to apply.**kwargs= {}Keyword arguments to pass to the deserializer.Add a partial parser function to the `Command`. Parsers are chained together consequetively from `Command._parsers` when calling `Command.result()`. The first parser receives the deserialized data and must pass data through to the next parser.
with_timeout(self, timeout : float) -> CommandBuilder[InType, OutType, R]Set the timeout to apply to the command. Alternative to setting `timeout` in `build`, will be overriden by `timeout` in `build`, if provided.
timeoutfloatThe timeout to apply to the command.without_response(self) -> CommandBuilder[InType, None, None]Set the command to be a void command, i.e. return None as a response. Alternative to setting `is_void` in `build`, will be overriden by `is_void` in `build`, if provided.
CommandBuilder[InType, None, None]with_multiline(- self,
- timeout : float,
- validate_response : typing.Callable[[], bool] | typing.Callable[[], bool] | None
Handle multi-line responses.
timeoutfloatThe time in seconds to wait for more data before calling the `_validate_response` method.validate_responsetyping.Callable[[], bool] | typing.Callable[[], bool] | None = NoneThe function to replace the `Command._validate_response` method.build(- self,
- message : InType,
- *args,
- timeout : float | None,
- is_void : bool | None,
- **kwargs
Build the `Command` instance with the previously set serializer, deserializer and parsers.
messageThe message to send to the device.*args= ()Additional positional arguments to pass to the command.timeoutfloat | None = NoneThe timeout to apply to the command.is_voidbool | None = NoneWhether the command is a void command.**kwargs= {}Additional keyword arguments to pass to the command.
Attributes
- _command= command
- _cmd_cls_name= cls_name
- _serializer= None
- _deserializer= None
- _response_validator= None
- _response_validator_timeout= None
- _timeout= None
- _is_void= False
- _parserslist[Parser] = []
HTTPCommand
classAn HTTP Request.
- MRO
- Bases
- Command[bytes, HTTPResponse]
Methods
__init__(- self,
- message : bytes | None,
- host : str | None,
- path : str,
- method : _Method,
- headers : dict[str, str] | None,
- protocol_version : int,
- timeout : float | None,
- is_void : bool
messagebytes | None = NoneThe `body` of the request.hoststr | None = NoneThe host of the remote server.pathstr = '/'The path of the HTTP request.headersdict[str, str] | None = NoneThe headers of the HTTP request.protocol_versionint = 11The protocol version to use. Either 10 or 11.timeoutfloat | None = NoneHow long in seconds to wait for a response.is_voidbool = FalseWhether or not we should wait for a response at all._serialize(self, message : bytes) -> bytesmessagebytesbytes_validate_response(self, data : bytes) -> booldatabytesbool_parse_http_response(self, data : bytes) -> HTTPResponseRead and parse the given binary data into an `HTTPResponse`.
databytesThe binary data that contains an HTTP response message.An `HTTPResponse` instance with its status code, headers and
Attributes
- Methodtype[_Method] = _Method
- host= host
- path= path
- method= method
- headers= headers or {}
- protocol_version= protocol_version
- http_responseHTTPResponse | None = None
Request
classData that is sent to a `Transport`.
- Decorators
- dataclasses.dataclass
Methods
__init__(self, payload : bytes, timeout : float | None) -> NonepayloadbytesThe payload of the request.timeoutfloat | None = NoneThe duration in seconds to wait for a response before raising a timeout exception. Set to 0.0 to wait indefinitely.__post_init__(self) -> None
Attributes
- payloadbytes = None
- timeoutfloat | None = None
Response
classProtocols use `Response`s to specify data that is received from a transport.
- Decorators
- dataclasses.dataclass
Methods
__post_init__(self) -> None__handle_done(self, _payload : asyncio.Future[bytes]) -> NoneSet a callback to be run when the payload `Future` becomes done.
_payloadasyncio.Future[bytes]The `Future` object.
Attributes
- requestRequest = None
SerialCommand
classCommand for use with serial communication device.
- MRO
- Bases
- Command[str, str]
Methods
__init__(- self,
- message : str,
- read_terminator : bytes,
- write_terminator : bytes,
- encoding : str,
- **kwargs
messagestrthe string message to send to the device.read_terminatorbytes = b'\r\n'the byte-string expected at the end of messages coming from the device.write_terminatorbytes = b'\r\n'the byte-string to append to our string `message` which indicates to the device that a complete message has been received.encodingstr = 'ascii'The encoding used to convert between strings and bytes.**kwargs= {}Additional `Command` kwargs._serialize(self, message : str | None) -> bytesmessagestr | None = Nonebytes_deserialize(self, response : bytes | None) -> strresponsebytes | Nonestr_validate_response(self, data : bytes) -> booldatabytesbool
Attributes
- _read_terminator= read_terminator
- _write_terminator= write_terminator
- _encoding= encoding
CommandExecutionError
classCommand cannot be executed in current state.
- Bases
- Exception
CommandValidationError
classCommand input is invalid.
InputValidationError
classThe inputs given are invalid for the interface that received them.
- Bases
- ValueError
NoSuchDeviceFound
classNo matching device found.
- Bases
- Exception
TransportFactory
classInterface representing a factory for creating transports.
Protocol
classBase communication Protocol.
- Bases
- asyncio.Protocol
Methods
__init__(- self,
- transport_factory : TransportFactory,
- reconnect : bool,
- reconnect_delay : float,
- max_reconnect_attempts : int,
- autodetect : bool,
- max_parallel_commands : int,
- **kwargs
transport_factoryA callable used to create a connection to a transport.reconnectbool = TrueWhether or not to a attempt to reconnect to a device when the connection is lost.reconnect_delayfloat = DEFAULT_RECONNECT_DELAYHow long in seconds to wait between reconnection attempts.max_reconnect_attemptsHow many times to attempt to reconnect to a device before connection is considered lost.autodetectbool = FalseWhether or not to use autodetection for device connectivity.max_parallel_commandsThe maximum number of commands to process in parallel. This should be 1 for serial transports, but can be configured to allow more depending on the processing capacity of alternative transports.**kwargs= {}additional kwargs, including kwargs for use with `TransportFactory`.connect(- self,
- transport_factory : TransportFactory | None,
- **transport_kwargs
Open a connection to a new `Transport`. Closes the old connection, if it exists, before opening the new connection.
transport_factoryTransportFactory | None = NoneA callable used to create a connection to a transport, defaults to `self._transport_factory` set from the constructor.**transport_kwargs= {}Kwargs to pass into the `transport_factory`.tuple[TransportFactory, dict[str, typing.Any]]A tuple containing the `TransportFactory` and kwargs for the previously connected `Transport`._connect_transport(self, **kwargs) -> NoneCreate a new transport instance.
**kwargs= {}identity(self, **config_kwargs) -> boolValidate the identity of the connected device. This method will call another user-defined method on `Protocol` and compare the device's response (i.e. the method's return value) to values provided in `config_kwargs`.
**config_kwargs= {}kwargs sent from `validate`boolTrue if the result of the inner call matches the expectation from `config_kwargs` else False.validate(- self,
- timeout : float,
- **validation_kwargs
Validate the opened connection. This method will be called by user after `__init__` via `open`; it calls `identity` to determine if the connected device is the one that was expected. If autodetect=True this is called internally by `unitelabs.bus.utils.AutoDetector` to cycle through possile devices until the correct device or no device is found. `validation_kwargs` and `__init__` kwargs are stored on the `Protocol` such that they must only be provided once. If there be values which overlap, `__init__` values are overwritten by values in `validation_kwargs`. `validation_kwargs` may contain stable information about the device. Check `unitelabs.bus.utils.device_manager` for more information about valid device filter kwargs.
timeoutfloatHow long in seconds to wait for a response from the device.**validation_kwargs= {}Kwargs to use to run validation of the device, in the case of use with `autodetect=True`, these kwargs will be stored on the first call for all future validations.boolPropagated return value from `identity`; True if `identity` returns True else Falseopen(- self,
- validation_timeout : float,
- **validation_kwargs
Open underlying `Transport`, establish a connection to a device and validate the device's identity.
validation_timeoutfloat = 1.0How long in seconds to wait for a response to `Protocol.validate`.**validation_kwargs= {}kwargs to be passed to `Protocol.validate` to test device identity against.close(self) -> NoneClose underlying `Transport`. Explicitly calling `close` will NOT attempt to reconnect to the `Transport`.
connection_made(self, transport : asyncio.Transport) -> NoneRespond to a connection made event invoked by the transport.
transportasyncio.Transportconnection_lost(self, exc : Exception | None) -> NoneRespond to a lost connection event invoked by the transport. Attempts to reconnect after `reconnect_delay` seconds. Here `exc` can be None as a result of : - manual abort through direct call of transport's `abort` method - connection closing after `_safe_write` successfully wrote all data in write-buffer
excException | None = Nonepause_writing(self) -> NoneSet state to disallow writing.
resume_writing(self) -> NoneSet state to allow writing.
data_received(self, data : bytes) -> NoneReceive data from the `Transport`. Logs the data and sets the response if not already set. Further invocations with the same `Response` will only be logged.
databytesThe data received.datagram_received(self, data : bytes, addr : tuple[str, int]) -> NoneReceive a datagram from the `Transport`.
databytesThe datagram data received.addrtuple[str, int]The address of the sender.error_received(self, exc : Exception | type[Exception]) -> NoneReceive an error from the `Transport`. Logs the error and sets the response if not already set. Further invocations with the same `Response` will only be logged.
excException | type[Exception]The error received.Execute a `Command` by sending the `Request` within the `Command` to the `Transport`.
The deserialized response, created by `command.result()` or None if `Command.is_void` is True.
Attributes
- transportasyncio.Transport | None = None
- _transport_allows_writing= True
- _transport_factory= transport_factory
- _transport_kwargs= kwargs
- _commandslist[Command] = []
- lock= asyncio.BoundedSemaphore(value=max_parallel_commands)
- is_open= asyncio.Event()
- autodetect= autodetect
- _autodetector= None
- _validation_kwargs= None
- is_validated= asyncio.Event()
- reconnect= reconnect
- reconnect_delay= reconnect_delay
- remaining_reconnect_attempts= max_reconnect_attempts
- max_reconnect_attempts= max_reconnect_attempts
- loggerlogging.Logger = NoneA standard python logger.
- autodetectorAutoDetector = NoneAn `Autodetector` for connecting/re-connecting to certian devices.
HIDTransport
classTransport for devices connected to an HID USB interface.
- MRO
- └── Transport
- └─── HIDTransport
Methods
__init__(- self,
- vendor : int,
- product : int,
- interface_index : int,
- **kwargs
vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.interface_indexint = DEFAULT_USB_INTERFACEThe index of the USB Interface to use. Defaults to 0.**kwargs= {}Additional configuration options: - serial_number: Specific device serial number to connect to - usage_page: HID usage page filter - usage: HID usage filter - write_report_id: HID report ID to use - read_report_id: HID read report ID to use_open(self) -> None_close(self) -> None_ensure_reader(self) -> None_remove_reader(self) -> None__read(self) -> None_read(self) -> bytes | Nonebytes | None_ensure_writer(self) -> None_remove_writer(self) -> None_write(self, data : bytes) -> intdatabytesint
Attributes
- read_sizeint = 64
- vendor= vendor
- product= product
- interface_number= interface_index
- serial_number= kwargs.get('serial_number')
- usage_page= kwargs.get('usage_page')
- usage= kwargs.get('usage')
- write_report_idbytes = kwargs.get('write_report_id', b'')
- read_report_idbytes = kwargs.get('read_report_id', b'')
- _devicehid.device | None = None
- _was_opened= False
- _reader_event= threading.Event()
- loggerlogging.Logger = NoneA standard python logger.
SerialTransport
classTransport for serial devices.
- MRO
- └── Transport
- └─── SerialTransport
Methods
_open(self) -> NoneOpen underlying serial port, if not already open.
_close(self) -> NoneClose underlying serial port, if open.
_ensure_reader(self) -> None_poll_read(self) -> None_remove_reader(self) -> None_read(self) -> bytes | Nonebytes | None_ensure_writer(self) -> NoneAdd a writer to the loop if not already added.
_poll_write(self) -> None_remove_writer(self) -> NoneRemove a writer from the loop.
_write(self, data : bytes) -> intdatabytesint
Attributes
- _serial= serial.serial_for_url(port, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=0, write_timeout=0, do_not_open=True)
- _max_read_size= 1024
- _read_buffer= []
USBTransport
classTransport for devices connected via USB. By default, uses the `libusb` backend and Interface 0 of Configuration 1 on the device.
- MRO
- └── Transport
- └─── USBTransport
Methods
__init__(- self,
- vendor : int,
- product : int,
- interface_index : int,
- **kwargs
vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.interface_indexint = DEFAULT_USB_INTERFACEThe index of the USB Interface to use. Defaults to 0.**kwargs= {}Additional backend-specific configuration options._open(self) -> NoneOpen the USB device using the selected backend.
_close(self) -> None_ensure_reader(self) -> None_remove_reader(self) -> None__read(self) -> None_read(self) -> bytes | NoneRead data from the transport.
bytes | NoneData read from the device, if any._ensure_writer(self) -> None_remove_writer(self) -> None_write(self, data : bytes) -> intWrite data to USB device. Write errors are caught by `WriteTransport._safe_write`, which wraps this method.
databytesBytes to write.intNumber of bytes written or 0 if the device connection is not currently available.
Attributes
- read_sizeint = None
- vendor= vendor
- product= product
- interface_number= interface_index
- _backend= None
- _deviceusb.core.Device | None = None
- read_endpointusb.core.Endpoint | None = None
- write_endpointusb.core.Endpoint | None = None
- _was_opened= False
- _reader_event= threading.Event()
- loggerlogging.Logger = NoneA standard python logger.
SerialDeviceManager
classDetect, filter, and get info for connected serial devices.
Methods
@typing.override
@classmethod
filter_kwargs(cls, kwargs : dict[str, str]) -> dict[str, str]kwargsdict[str, str]dict[str, str]@typing.override
@classmethod
get_all(cls) -> list[DeviceInfo]list[DeviceInfo]@typing.override
@classmethod
check_device_match(- cls,
- device_info : DeviceInfo,
- **kwargs
device_info**kwargs= {}bool
Attributes
- SERIAL_SEARCH_KEYStyping_extensions.ClassVar = ['device', 'name', 'description', 'hwid', 'vid', 'pid', 'serial_number', 'location', 'manufacturer', 'product', 'interface']