unitelabs.bus
unitelabs/bus/__init__.py
Packages
Attributes
- __all__= [ "ByteCommand", "Command", "CommandBuilder", "CommandExecutionError", "CommandValidationError", "HIDTransport", "HTTPCommand", "InputValidationError", "NoSuchDeviceFound", "Protocol", "Request", "Response", "SerialCommand", "SerialDeviceManager", "SerialTransport", "TransportFactory", "USBTransport", "create_hid_connection", "create_serial_connection", "create_tcp_connection", "create_udp_connection", "create_usb_connection", "testing" ]
Functions
create_hid_connection(- protocol_factory : typing.Callable[..., P],
- vendor : int,
- product : int,
- **kwargs
Create a HID connection with a device based on specified `vendor` and `product` IDs.
Parameters
- protocol_factory : typing.Callable[..., P]
  A callable that returns an instance of the protocol to be used.
- vendor : int
  The vendor ID of the HID device.
- product : int
  The product ID of the HID device.
- **kwargs = {}
  Additional keyword arguments to be passed to the `HIDTransport` constructor.
Returns
- tuple[HIDTransport, P]
  A tuple containing the `HIDTransport` instance and the protocol instance.
create_serial_connection(- protocol_factory : typing.Callable[..., P],
- port : str,
- baudrate : int,
- bytesize : ByteSize,
- parity : Parity,
- stopbits : StopBits,
- **kwargs
Create a serial connection with the specified port.
Parameters
- protocol_factory : typing.Callable[..., P]
  A callable that returns an instance of the `Protocol` to be used.
- port : str
  The port of the serial device.
- baudrate : int = 9600
  The baud rate.
- **kwargs = {}
  Additional keyword arguments to be passed to the `SerialTransport` constructor.
Returns
- tuple[SerialTransport, P]
  A tuple containing the `SerialTransport` instance and the `Protocol` instance.
create_usb_connection(- protocol_factory : typing.Callable[..., P],
- vendor : int,
- product : int,
- **kwargs
Create a USB connection with a device based on specified `vendor` and `product` IDs.
Parameters
- protocol_factory : typing.Callable[..., P]
  A callable that returns an instance of the protocol to be used.
- vendor : int
  The vendor ID of the USB device.
- product : int
  The product ID of the USB device.
- **kwargs = {}
  Additional keyword arguments to be passed to the `USBTransport` constructor.
Returns
- tuple[USBTransport, P]
  A tuple containing the `USBTransport` instance and the protocol instance.
create_tcp_connection(- protocol_factory : typing.Callable[..., P],
- host : str,
- port : int,
- verify : typing.Union[None, bool, ssl.SSLContext],
- **kwargs
Create and open a streaming TCP connection to a given address specified by host and port.
Parameters
- protocol_factory : typing.Callable[..., P]
  A callable that returns an instance of the `Protocol` to be used.
- host : str = 'localhost'
  The host of the remote server.
- port : int = 80
  The port number of the remote server.
- verify : typing.Union[None, bool, ssl.SSLContext] = None
  Either `True` to use an SSL context with the default CA bundle, `False` to disable verification, or an instance of `ssl.SSLContext` to use a custom context.
- **kwargs = {}
  Additional keyword arguments passed to `loop.create_connection`.
Returns
- tuple[asyncio.Transport, P]
  A tuple containing the transport and protocol.
create_udp_connection(- protocol_factory : typing.Callable[..., P],
- server_port : int,
- server_host : str,
- client_port : int,
- client_host : str,
- **kwargs
Create a UDP connection with the specified server (sender) and client (receiver) addresses.
Parameters
- protocol_factory : typing.Callable[..., P]
  A callable that returns an instance of the `Protocol` to be used.
- server_port : int = 0
  The port of the UDP server.
- server_host : str = ''
  The IP address of the UDP server.
- client_port : int = 0
  The desired port of the client.
- client_host : str = ''
  The desired IP address of the client.
- **kwargs = {}
  Additional keyword arguments to be passed to the `UDPTransport` constructor.
Returns
- tuple[asyncio.Transport, P]
  A tuple containing the `UDPTransport` instance and the `Protocol` instance.
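The connection helpers above all take a `protocol_factory` and return a `(transport, protocol)` tuple. The sketch below shows that pattern using only the standard library's `loop.create_connection`, which the `create_tcp_connection` entry names as the recipient of its extra keyword arguments. It does not call `unitelabs.bus`: `LineProtocol`, `open_connection` and the use of `socket.socketpair()` in place of a real server are assumptions made for illustration.

```python
import asyncio
import socket


class LineProtocol(asyncio.Protocol):
    """Hypothetical protocol that collects bytes until a newline arrives."""

    def __init__(self) -> None:
        self.received = bytearray()
        # The factory is called inside the running loop, so this is safe.
        self.done = asyncio.get_running_loop().create_future()

    def data_received(self, data: bytes) -> None:
        self.received.extend(data)
        if self.received.endswith(b"\n") and not self.done.done():
            self.done.set_result(bytes(self.received))


async def open_connection(protocol_factory, sock):
    """Return a (transport, protocol) tuple, like the helpers documented above."""
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_connection(protocol_factory, sock=sock)
    return transport, protocol


async def main() -> bytes:
    # A connected socket pair stands in for a remote device.
    ours, peer = socket.socketpair()
    peer.sendall(b"pong\n")
    transport, protocol = await open_connection(LineProtocol, ours)
    reply = await asyncio.wait_for(protocol.done, timeout=1.0)
    transport.close()
    peer.close()
    return reply


reply = asyncio.run(main())
print(reply)
```

The factory is passed as a callable rather than an instance so that the event loop can construct the protocol at the moment the transport is ready.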
Classes
ByteCommand
class
The most basic form of a `Command`, which ingests and returns bytes.
- MRO
- Bases
- Command[bytes, bytes]
Methods
__init__(- self,
- message : bytes,
- timeout : typing.Optional[float],
- is_void : typing.Optional[bool],
- **kwargs
Parameters
- message : bytes
- timeout : typing.Optional[float] = None
- is_void : typing.Optional[bool] = False
- **kwargs = {}
_serialize(self, message : typing.Optional[bytes]) -> bytes
Parameters
- message : typing.Optional[bytes] = None
Returns
- bytes
_deserialize(self, response : bytes) -> bytes
Parameters
- response : bytes
Returns
- bytes
__repr__(self) -> str
Returns
- str
serialize(self, message : typing.Optional[InType]) -> bytes
Serialize the message into bytes. Uses `self.message` if `message` is None. NB: Calling with `message` DOES NOT SET the `self.message` attribute; the `Request` sent to the device will use the `self.message` attribute.
Parameters
- message : typing.Optional[InType] = None
  A command input, or None to use `self.message`.
Returns
- bytes
  The serialized message to be sent to the device.
deserialize(self, response : typing.Optional[bytes]) -> OutType
Deserialize the `response` bytes. Calls `_deserialize` with the `self.response` payload if `response` is None.
Parameters
- response : typing.Optional[bytes] = None
  Bytes to be deserialized, or None to use the `Response` payload.
Returns
- None if `self.is_void`, else the deserialized `response`.
validate_request(self, message : bytes) -> bool
Validate a serialized message. Called within `Command.request` before generating a `Request` object.
Parameters
- message : bytes
  The serialized message to set as the `Request.payload`, if valid.
Returns
- bool
  Whether or not the `message` is valid.
_set_response(self) -> None
Set the result of `self.response.payload` to `self._response_buffer` and clear `self._response_buffer`.
validate_response(self, data : bytes) -> None
Set `Response.payload` if it belongs to this Command. This method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
Parameters
- data : bytes
  The bytes from the `Transport` to add to the response_buffer and evaluate for completeness.
_validate_response(self, data : bytes) -> bool
User-configured response validation method. Validate the data received from the `Transport` and determine if the data is a complete response. Subclasses override this method to specify behavior for validating data before setting it as the response.
Parameters
- data : bytes
  The bytes to evaluate for completeness.
Returns
- bool
  True if `data` is a complete, valid message from the device, otherwise False.
match_response(self, data : bytes) -> bool
Check if `data` belongs to this command and then validate the response. For devices that allow parallel command processing.
Parameters
- data : bytes
  The bytes to check for match during parallel processing, usually an identifier shared by request and response.
Returns
- bool
  True if the `data` matches this command, otherwise False.
Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consecutively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- receiver : typing.Optional[Protocol] = None
- _parsers : typing.Optional[list[Parser]] = None
- _response : typing.Optional[Response] = None
- _request : typing.Optional[Request] = None
- message = message
- timeout = timeout
- _response_buffer = b''
- is_void = is_void
- request : Request = None
  The `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- response : Response = None
  The `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within its `Protocol.data_received` method and only set the result if valid.
- parsers : list[Parser] = None
  Parsers are functions that are applied to the deserialized data when calling `result()`.
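The interplay between `validate_response`, `_validate_response` and `_response_buffer` described above can be sketched as follows. This is a minimal stand-in, not the library's class: `BufferedCommand`, its `payload` attribute and the terminator-based completeness check are assumptions chosen to make the accumulation visible.

```python
from typing import Optional


class BufferedCommand:
    """Hypothetical stand-in that mimics the buffering described above."""

    def __init__(self, terminator: bytes = b"\r\n") -> None:
        self.terminator = terminator  # assumed completeness marker
        self._response_buffer = b""
        self.payload: Optional[bytes] = None

    def _validate_response(self, data: bytes) -> bool:
        # User-configurable check: is the accumulated data a complete message?
        return data.endswith(self.terminator)

    def validate_response(self, data: bytes) -> None:
        # Accumulate every chunk, then publish and clear once complete.
        self._response_buffer += data
        if self._validate_response(self._response_buffer):
            self.payload = self._response_buffer
            self._response_buffer = b""


command = BufferedCommand()
for chunk in (b"OK", b" 42", b"\r\n"):
    command.validate_response(chunk)
print(command.payload)
```

Splitting the work this way lets a subclass change only the completeness rule while the buffering stays in one place.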
Command
class
Generic Command that can be used with `Protocol.execute`. The first type parameter of the `Command` determines the type that the `Command` accepts on init and serialization, the second type parameter determines the type returned by deserialization, and the third type parameter determines the type returned by the `result` method. For example, `Command[str, list[str], list[bool]]` would accept strings from the user, deserialize responses into a list of strings for the parsers, and return a list of booleans when used with `Protocol.execute`. Subclasses which change the constructor signature should be careful to preserve the `timeout` and `is_void` parameters. Removal of these arguments may result in compatibility issues with the `CommandBuilder`.
Methods
__init__(- self,
- message : InType,
- timeout : typing.Optional[float],
- is_void : bool
Parameters
- message
  The contents of the message to be sent to the device, pre-serialization.
- timeout : typing.Optional[float] = None
  How long in seconds to wait for a response.
- is_void : bool = False
  If true, does not return a response. Void commands ignore all response validations.
__repr__(self) -> str
Returns
- str
serialize(self, message : typing.Optional[InType]) -> bytes
Serialize the message into bytes. Uses `self.message` if `message` is None. NB: Calling with `message` DOES NOT SET the `self.message` attribute; the `Request` sent to the device will use the `self.message` attribute.
Parameters
- message : typing.Optional[InType] = None
  A command input, or None to use `self.message`.
Returns
- bytes
  The serialized message to be sent to the device.
deserialize(self, response : typing.Optional[bytes]) -> OutType
Deserialize the `response` bytes. Calls `_deserialize` with the `self.response` payload if `response` is None.
Parameters
- response : typing.Optional[bytes] = None
  Bytes to be deserialized, or None to use the `Response` payload.
Returns
- None if `self.is_void`, else the deserialized `response`.
validate_request(self, message : bytes) -> bool
Validate a serialized message. Called within `Command.request` before generating a `Request` object.
Parameters
- message : bytes
  The serialized message to set as the `Request.payload`, if valid.
Returns
- bool
  Whether or not the `message` is valid.
_set_response(self) -> None
Set the result of `self.response.payload` to `self._response_buffer` and clear `self._response_buffer`.
validate_response(self, data : bytes) -> None
Set `Response.payload` if it belongs to this Command. This method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
Parameters
- data : bytes
  The bytes from the `Transport` to add to the response_buffer and evaluate for completeness.
_validate_response(self, data : bytes) -> bool
User-configured response validation method. Validate the data received from the `Transport` and determine if the data is a complete response. Subclasses override this method to specify behavior for validating data before setting it as the response.
Parameters
- data : bytes
  The bytes to evaluate for completeness.
Returns
- bool
  True if `data` is a complete, valid message from the device, otherwise False.
match_response(self, data : bytes) -> bool
Check if `data` belongs to this command and then validate the response. For devices that allow parallel command processing.
Parameters
- data : bytes
  The bytes to check for match during parallel processing, usually an identifier shared by request and response.
Returns
- bool
  True if the `data` matches this command, otherwise False.
Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consecutively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- receiver : typing.Optional[Protocol] = None
- _parsers : typing.Optional[list[Parser]] = None
- _response : typing.Optional[Response] = None
- _request : typing.Optional[Request] = None
- message = message
- timeout = timeout
- _response_buffer = b''
- is_void = is_void
- request : Request = None
  The `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- response : Response = None
  The `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within its `Protocol.data_received` method and only set the result if valid.
- parsers : list[Parser] = None
  Parsers are functions that are applied to the deserialized data when calling `result()`.
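Parser chaining, as described for `result()`, amounts to folding a list of functions over the deserialized data. The sketch below illustrates that idea with plain functions; `apply_parsers` and the three example parsers are hypothetical names, and the library's own implementation may differ.

```python
from functools import reduce
from typing import Any, Callable

Parser = Callable[[Any], Any]


def apply_parsers(data: Any, parsers: list[Parser]) -> Any:
    """Apply each parser to the return value of the previous one, in order."""
    return reduce(lambda value, parser: parser(value), parsers, data)


parsers: list[Parser] = [
    lambda text: text.split(","),                    # str -> list[str]
    lambda fields: [f for f in fields if f],         # drop empty fields
    lambda fields: [f == "1" for f in fields],       # list[str] -> list[bool]
]

result = apply_parsers("1,0,,1", parsers)
print(result)
```

Because the second parser drops the empty field, the third parser never sees it, which is the filtering effect the description warns about.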
CommandBuilder
class
Class that coordinates the chainable building of a `Command` instance.
Methods
__copy__(self) -> CommandBuilder[InType, OutType, R]
__deepcopy__(self, memo : typing.Optional[dict]) -> CommandBuilder[InType, OutType, R]
Parameters
- memo : typing.Optional[dict] = None
with_serializer(- self,
- serializer : typing.Union[Serializer[S_InType], SelfSerializer[S_InType]],
- **kwargs
Set a serializer to apply to the `Command.message` before sending it to the device. The serializer will be called with the message to send to the device as the first and only argument. Additional arguments can be preloaded into the serializer with `**kwargs`.
Parameters
- serializer : typing.Union[Serializer[S_InType], SelfSerializer[S_InType]]
  The serializer function to apply.
- **kwargs = {}
  Keyword arguments to pass to the serializer.
with_deserializer(- self,
- deserializer : typing.Union[Deserializer[D_OutType], SelfDeserializer[D_OutType]],
- **kwargs
Set a deserializer function to apply to the `Command.response` received from the device. The deserializer will be called with the response bytes as the first and only argument. Additional arguments can be preloaded into the deserializer with `**kwargs`. This function will wrap the deserializer in a check for null values to ensure that calls without `response` use the `Response.payload.result()`.
Parameters
- deserializer : typing.Union[Deserializer[D_OutType], SelfDeserializer[D_OutType]]
  The deserializer function to apply.
- **kwargs = {}
  Keyword arguments to pass to the deserializer.
Add a partial parser function to the `Command`. Parsers are chained together consecutively from `Command._parsers` when calling `Command.result()`. The first parser receives the deserialized data and must pass data through to the next parser.
with_timeout(self, timeout : float) -> CommandBuilder[InType, OutType, R]
Set the timeout to apply to the command. Alternative to setting `timeout` in `build`; it is overridden by `timeout` in `build`, if provided.
Parameters
- timeout : float
  The timeout to apply to the command.
without_response(self) -> CommandBuilder[InType, None, None]
Set the command to be a void command, i.e. return None as a response. Alternative to setting `is_void` in `build`; it is overridden by `is_void` in `build`, if provided.
Returns
- CommandBuilder[InType, None, None]
with_multiline(- self,
- timeout : float,
- validate_response : typing.Optional[typing.Union[typing.Callable[[], bool], typing.Callable[[], bool]]]
Handle multi-line responses.
Parameters
- timeout : float
  The time in seconds to wait for more data before calling the `_validate_response` method.
- validate_response : typing.Optional[typing.Union[typing.Callable[[], bool], typing.Callable[[], bool]]] = None
  The function to replace the `Command._validate_response` method.
build(- self,
- message : InType,
- *args,
- timeout : typing.Optional[float],
- is_void : typing.Optional[bool],
- **kwargs
Build the `Command` instance with the previously set serializer, deserializer and parsers.
Parameters
- message
  The message to send to the device.
- *args = ()
  Additional positional arguments to pass to the command.
- timeout : typing.Optional[float] = None
  The timeout to apply to the command.
- is_void : typing.Optional[bool] = None
  Whether the command is a void command.
- **kwargs = {}
  Additional keyword arguments to pass to the command.
Attributes
- _command = command
- _cmd_cls_name = cls_name
- _serializer = None
- _deserializer = None
- _response_validator = None
- _response_validator_timeout = None
- _timeout = None
- _is_void = False
- _parsers : list[Parser] = []
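The precedence rule stated for `with_timeout`, `without_response` and `build` (values passed to `build` win over values set earlier on the builder) can be sketched with a toy builder. `SketchCommand` and `SketchBuilder` are hypothetical names and not part of `unitelabs.bus`; whether the real builder mutates itself or returns a copy is not stated above, so this sketch simply returns `self`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchCommand:
    """Hypothetical command holding only the fields relevant here."""

    message: str
    timeout: Optional[float] = None
    is_void: bool = False


class SketchBuilder:
    """Hypothetical chainable builder illustrating the override rule."""

    def __init__(self) -> None:
        self._timeout: Optional[float] = None
        self._is_void = False

    def with_timeout(self, timeout: float) -> "SketchBuilder":
        self._timeout = timeout
        return self  # returning self is what makes the calls chainable

    def without_response(self) -> "SketchBuilder":
        self._is_void = True
        return self

    def build(self, message: str, *, timeout: Optional[float] = None,
              is_void: Optional[bool] = None) -> SketchCommand:
        # Explicit build() arguments override values set on the builder.
        return SketchCommand(
            message,
            timeout=self._timeout if timeout is None else timeout,
            is_void=self._is_void if is_void is None else is_void,
        )


builder = SketchBuilder().with_timeout(2.0)
default_cmd = builder.build("PING")
override_cmd = builder.build("PING", timeout=5.0)
void_cmd = SketchBuilder().without_response().build("RESET")
print(default_cmd, override_cmd, void_cmd)
```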
HTTPCommand
class
An HTTP Request.
- MRO
- Bases
- Command[bytes, HTTPResponse]
Methods
__init__(- self,
- message : typing.Optional[bytes],
- host : typing.Optional[str],
- path : str,
- method : _Method,
- headers : typing.Optional[dict[str, str]],
- protocol_version : int,
- timeout : typing.Optional[float],
- is_void : bool
Parameters
- message : typing.Optional[bytes] = None
  The `body` of the request.
- host : typing.Optional[str] = None
  The host of the remote server.
- path : str = '/'
  The path of the HTTP request.
- headers : typing.Optional[dict[str, str]] = None
  The headers of the HTTP request.
- protocol_version : int = 11
  The protocol version to use. Either 10 or 11.
- timeout : typing.Optional[float] = None
  How long in seconds to wait for a response.
- is_void : bool = False
  Whether or not we should wait for a response at all.
_serialize(self, message : bytes) -> bytes
Parameters
- message : bytes
Returns
- bytes
_validate_response(self, data : bytes) -> bool
Parameters
- data : bytes
Returns
- bool
_parse_http_response(self, data : bytes) -> HTTPResponse
Read and parse the given binary data into an `HTTPResponse`.
Parameters
- data : bytes
  The binary data that contains an HTTP response message.
Returns
- An `HTTPResponse` instance with its status code, headers and
__repr__(self) -> str
Returns
- str
serialize(self, message : typing.Optional[InType]) -> bytes
Serialize the message into bytes. Uses `self.message` if `message` is None. NB: Calling with `message` DOES NOT SET the `self.message` attribute; the `Request` sent to the device will use the `self.message` attribute.
Parameters
- message : typing.Optional[InType] = None
  A command input, or None to use `self.message`.
Returns
- bytes
  The serialized message to be sent to the device.
deserialize(self, response : typing.Optional[bytes]) -> OutType
Deserialize the `response` bytes. Calls `_deserialize` with the `self.response` payload if `response` is None.
Parameters
- response : typing.Optional[bytes] = None
  Bytes to be deserialized, or None to use the `Response` payload.
Returns
- None if `self.is_void`, else the deserialized `response`.
validate_request(self, message : bytes) -> bool
Validate a serialized message. Called within `Command.request` before generating a `Request` object.
Parameters
- message : bytes
  The serialized message to set as the `Request.payload`, if valid.
Returns
- bool
  Whether or not the `message` is valid.
_set_response(self) -> None
Set the result of `self.response.payload` to `self._response_buffer` and clear `self._response_buffer`.
validate_response(self, data : bytes) -> None
Set `Response.payload` if it belongs to this Command. This method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
Parameters
- data : bytes
  The bytes from the `Transport` to add to the response_buffer and evaluate for completeness.
match_response(self, data : bytes) -> bool
Check if `data` belongs to this command and then validate the response. For devices that allow parallel command processing.
Parameters
- data : bytes
  The bytes to check for match during parallel processing, usually an identifier shared by request and response.
Returns
- bool
  True if the `data` matches this command, otherwise False.
Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consecutively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- Method : type[_Method] = _Method
- host = host
- path = path
- method = method
- headers = headers or {}
- protocol_version = protocol_version
- http_response : typing.Optional[HTTPResponse] = None
- receiver : typing.Optional[Protocol] = None
- _parsers : typing.Optional[list[Parser]] = None
- _response : typing.Optional[Response] = None
- _request : typing.Optional[Request] = None
- message = message
- timeout = timeout
- _response_buffer = b''
- is_void = is_void
- request : Request = None
  The `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- response : Response = None
  The `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within its `Protocol.data_received` method and only set the result if valid.
- parsers : list[Parser] = None
  Parsers are functions that are applied to the deserialized data when calling `result()`.
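To make the roles of `path`, `method`, `headers` and `protocol_version` concrete, here is a rough sketch of how an HTTP request could be serialized to bytes and a response parsed back. It is not the library's `_serialize` or `_parse_http_response`: the function names, the automatic `Host` and `Content-Length` headers, and the mapping of `10`/`11` to `HTTP/1.0`/`HTTP/1.1` are all assumptions.

```python
from typing import Optional


def serialize_request(body: bytes, host: str, path: str = "/", method: str = "GET",
                      headers: Optional[dict[str, str]] = None,
                      protocol_version: int = 11) -> bytes:
    """Build request line, headers and body (assumed wire format)."""
    version = {10: "HTTP/1.0", 11: "HTTP/1.1"}[protocol_version]
    all_headers = {"Host": host, "Content-Length": str(len(body))}
    all_headers.update(headers or {})
    lines = [f"{method} {path} {version}"]
    lines += [f"{name}: {value}" for name, value in all_headers.items()]
    return "\r\n".join(lines).encode("ascii") + b"\r\n\r\n" + body


def parse_response(data: bytes) -> tuple[int, dict[str, str], bytes]:
    """Split raw bytes into status code, headers and body."""
    head, _, body = data.partition(b"\r\n\r\n")
    status_line, *header_lines = head.decode("iso-8859-1").split("\r\n")
    status = int(status_line.split(" ", 2)[1])
    headers = {}
    for line in header_lines:
        name, _, value = line.partition(":")
        headers[name.strip()] = value.strip()
    return status, headers, body


request = serialize_request(b"", host="example.com", path="/status")
status, response_headers, response_body = parse_response(
    b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 2\r\n\r\nok"
)
print(request, status, response_headers, response_body)
```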
Request
class
Data that is sent to a `Transport`.
- Decorators
- dataclasses.dataclass
Methods
__init__(self, payload : bytes, timeout : typing.Optional[float]) -> None
Parameters
- payload : bytes
  The payload of the request.
- timeout : typing.Optional[float] = None
  The duration in seconds to wait for a response before raising a timeout exception. Set to 0.0 to wait indefinitely.
__post_init__(self) -> None
Attributes
- payload : bytes = None
- timeout : typing.Optional[float] = None
Response
class
Protocols use `Response`s to specify data that is received from a transport.
- Decorators
- dataclasses.dataclass
Methods
__post_init__(self) -> None
__handle_done(self, _payload : asyncio.Future[bytes]) -> None
Set a callback to be run when the payload `Future` becomes done.
Parameters
- _payload : asyncio.Future[bytes]
  The `Future` object.
Attributes
- request : Request = None
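The payload of a `Response` is described as an `asyncio.Future[bytes]` with a done-callback attached. The standard-library mechanism behind that is `Future.add_done_callback`, sketched below; `handle_done` and the `seen` list are illustrative names, and what the library's own callback does is not documented here.

```python
import asyncio


async def main() -> tuple[bytes, list[bytes]]:
    loop = asyncio.get_running_loop()
    payload: asyncio.Future[bytes] = loop.create_future()
    seen: list[bytes] = []

    def handle_done(future: "asyncio.Future[bytes]") -> None:
        # Runs once the future is resolved, cancelled or failed.
        if not future.cancelled() and future.exception() is None:
            seen.append(future.result())

    payload.add_done_callback(handle_done)
    payload.set_result(b"\x01\x02")  # e.g. called when a response is complete
    await asyncio.sleep(0)           # yield so the scheduled callback can run
    return await payload, seen


result, seen = asyncio.run(main())
print(result, seen)
```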
SerialCommand
class
Command for use with a serial communication device.
- MRO
- Bases
- Command[str, str]
Methods
__init__(- self,
- message : str,
- read_terminator : bytes,
- write_terminator : bytes,
- encoding : str,
- **kwargs
Parameters
- message : str
  The string message to send to the device.
- read_terminator : bytes = b'\r\n'
  The byte-string expected at the end of messages coming from the device.
- write_terminator : bytes = b'\r\n'
  The byte-string to append to our string `message`, which indicates to the device that a complete message has been received.
- encoding : str = 'ascii'
  The encoding used to convert between strings and bytes.
- **kwargs = {}
  Additional `Command` kwargs.
_serialize(self, message : typing.Optional[str]) -> bytes
Parameters
- message : typing.Optional[str] = None
Returns
- bytes
_deserialize(self, response : typing.Optional[bytes]) -> str
Parameters
- response : typing.Optional[bytes]
Returns
- str
_validate_response(self, data : bytes) -> bool
Parameters
- data : bytes
Returns
- bool
__repr__(self) -> str
Returns
- str
serialize(self, message : typing.Optional[InType]) -> bytes
Serialize the message into bytes. Uses `self.message` if `message` is None. NB: Calling with `message` DOES NOT SET the `self.message` attribute; the `Request` sent to the device will use the `self.message` attribute.
Parameters
- message : typing.Optional[InType] = None
  A command input, or None to use `self.message`.
Returns
- bytes
  The serialized message to be sent to the device.
deserialize(self, response : typing.Optional[bytes]) -> OutType
Deserialize the `response` bytes. Calls `_deserialize` with the `self.response` payload if `response` is None.
Parameters
- response : typing.Optional[bytes] = None
  Bytes to be deserialized, or None to use the `Response` payload.
Returns
- None if `self.is_void`, else the deserialized `response`.
validate_request(self, message : bytes) -> bool
Validate a serialized message. Called within `Command.request` before generating a `Request` object.
Parameters
- message : bytes
  The serialized message to set as the `Request.payload`, if valid.
Returns
- bool
  Whether or not the `message` is valid.
_set_response(self) -> None
Set the result of `self.response.payload` to `self._response_buffer` and clear `self._response_buffer`.
validate_response(self, data : bytes) -> None
Set `Response.payload` if it belongs to this Command. This method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
Parameters
- data : bytes
  The bytes from the `Transport` to add to the response_buffer and evaluate for completeness.
match_response(self, data : bytes) -> bool
Check if `data` belongs to this command and then validate the response. For devices that allow parallel command processing.
Parameters
- data : bytes
  The bytes to check for match during parallel processing, usually an identifier shared by request and response.
Returns
- bool
  True if the `data` matches this command, otherwise False.
Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consecutively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- _read_terminator = read_terminator
- _write_terminator = write_terminator
- _encoding = encoding
- receiver : typing.Optional[Protocol] = None
- _parsers : typing.Optional[list[Parser]] = None
- _response : typing.Optional[Response] = None
- _request : typing.Optional[Request] = None
- message = message
- timeout = timeout
- _response_buffer = b''
- is_void = is_void
- request : Request = None
  The `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- response : Response = None
  The `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within its `Protocol.data_received` method and only set the result if valid.
- parsers : list[Parser] = None
  Parsers are functions that are applied to the deserialized data when calling `result()`.
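The `read_terminator`, `write_terminator` and `encoding` parameters of `SerialCommand` suggest a simple round trip between strings and terminated byte strings. The functions below are a hedged sketch of that round trip using the documented defaults; the function names are hypothetical, and stripping the terminator during deserialization is an assumption rather than documented behavior.

```python
def serialize(message: str, write_terminator: bytes = b"\r\n",
              encoding: str = "ascii") -> bytes:
    """Encode the message and append the terminator the device expects."""
    return message.encode(encoding) + write_terminator


def is_complete(data: bytes, read_terminator: bytes = b"\r\n") -> bool:
    """A response counts as complete once it ends with the read terminator."""
    return data.endswith(read_terminator)


def deserialize(response: bytes, read_terminator: bytes = b"\r\n",
                encoding: str = "ascii") -> str:
    """Decode the response, dropping a trailing terminator if present."""
    if read_terminator and response.endswith(read_terminator):
        response = response[: -len(read_terminator)]
    return response.decode(encoding)


sent = serialize("*IDN?")
partial_ok = is_complete(b"ACME,Mod")
full_ok = is_complete(b"ACME,Model1\r\n")
text = deserialize(b"ACME,Model1\r\n")
print(sent, partial_ok, full_ok, text)
```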
CommandExecutionError
class
Command cannot be executed in current state.
- Bases
- Exception
CommandValidationError
class
Command input is invalid.
InputValidationError
class
The inputs given are invalid for the interface that received them.
- Bases
- ValueError
NoSuchDeviceFound
class
No matching device found.
- Bases
- Exception
TransportFactory
class
Interface representing a factory for creating transports.
Protocol
class
Base communication Protocol.
- Bases
- asyncio.Protocol
Methods
__init__(- self,
- transport_factory : TransportFactory,
- reconnect : bool,
- reconnect_delay : float,
- max_reconnect_attempts : int,
- autodetect : bool,
- max_parallel_commands : int,
- **kwargs
Parameters
- transport_factory
  A callable used to create a connection to a transport.
- reconnect : bool = True
  Whether or not to attempt to reconnect to a device when the connection is lost.
- reconnect_delay : float = DEFAULT_RECONNECT_DELAY
  How long in seconds to wait between reconnection attempts.
- max_reconnect_attempts
  How many times to attempt to reconnect to a device before the connection is considered lost.
- autodetect : bool = False
  Whether or not to use autodetection for device connectivity.
- max_parallel_commands
  The maximum number of commands to process in parallel. This should be 1 for serial transports, but can be configured to allow more depending on the processing capacity of alternative transports.
- **kwargs = {}
  Additional kwargs, including kwargs for use with `TransportFactory`.
connect(- self,
- transport_factory : typing.Optional[TransportFactory],
- **transport_kwargs
Open a connection to a new `Transport`. Closes the old connection, if it exists, before opening the new connection.
Parameters
- transport_factory : typing.Optional[TransportFactory] = None
  A callable used to create a connection to a transport; defaults to `self._transport_factory` set from the constructor.
- **transport_kwargs = {}
  Kwargs to pass into the `transport_factory`.
Returns
- tuple[TransportFactory, dict[str, typing.Any]]
  A tuple containing the `TransportFactory` and kwargs for the previously connected `Transport`.
_connect_transport(self, **kwargs) -> None
Create a new transport instance.
Parameters
- **kwargs = {}
identity(self, **config_kwargs) -> bool
Validate the identity of the connected device. This method will call another user-defined method on `Protocol` and compare the device's response (i.e. the method's return value) to values provided in `config_kwargs`.
Parameters
- **config_kwargs = {}
  Kwargs sent from `validate`.
Returns
- bool
  True if the result of the inner call matches the expectation from `config_kwargs`, else False.
validate(- self,
- timeout : float,
- **validation_kwargs
Validate the opened connection. This method is called by the user after `__init__` via `open`; it calls `identity` to determine if the connected device is the one that was expected. If `autodetect=True`, this is called internally by `unitelabs.bus.utils.AutoDetector` to cycle through possible devices until the correct device or no device is found. `validation_kwargs` and `__init__` kwargs are stored on the `Protocol` so that they need only be provided once. If any values overlap, `__init__` values are overwritten by values in `validation_kwargs`. `validation_kwargs` may contain stable information about the device. Check `unitelabs.bus.utils.device_manager` for more information about valid device filter kwargs.
Parameters
- timeout : float
  How long in seconds to wait for a response from the device.
- **validation_kwargs = {}
  Kwargs to use to run validation of the device. When used with `autodetect=True`, these kwargs will be stored on the first call for all future validations.
Returns
- bool
  Propagated return value from `identity`; True if `identity` returns True, else False.
open(- self,
- validation_timeout : float,
- **validation_kwargs
Open underlying `Transport`, establish a connection to a device and validate the device's identity.
Parameters
- validation_timeout : float = 1.0
  How long in seconds to wait for a response to `Protocol.validate`.
- **validation_kwargs = {}
  Kwargs to be passed to `Protocol.validate` to test device identity against.
close(self) -> None
Close underlying `Transport`. Explicitly calling `close` will NOT attempt to reconnect to the `Transport`.
connection_made(self, transport : asyncio.Transport) -> None
Respond to a connection made event invoked by the transport.
Parameters
- transport : asyncio.Transport
connection_lost(self, exc : typing.Optional[Exception]) -> None
Respond to a lost connection event invoked by the transport. Attempts to reconnect after `reconnect_delay` seconds. Here `exc` can be None as a result of:
- manual abort through direct call of transport's `abort` method
- connection closing after `_safe_write` successfully wrote all data in write-buffer
Parameters
- exc : typing.Optional[Exception] = None
pause_writing(self) -> None
Set state to disallow writing.
resume_writing(self) -> None
Set state to allow writing.
data_received(self, data : bytes) -> None
Receive data from the `Transport`. Logs the data and sets the response if not already set. Further invocations with the same `Response` will only be logged.
Parameters
- data : bytes
  The data received.
datagram_received(self, data : bytes, addr : tuple[str, int]) -> None
Receive a datagram from the `Transport`.
Parameters
- data : bytes
  The datagram data received.
- addr : tuple[str, int]
  The address of the sender.
error_received(self, exc : typing.Union[Exception, type[Exception]]) -> None
Receive an error from the `Transport`. Logs the error and sets the response if not already set. Further invocations with the same `Response` will only be logged.
Parameters
- exc : typing.Union[Exception, type[Exception]]
  The error received.
Execute a `Command` by sending the `Request` within the `Command` to the `Transport`.
Returns
- The deserialized response, created by `command.result()`, or None if `Command.is_void` is True.
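The `max_parallel_commands` setting pairs with the `lock` attribute, an `asyncio.BoundedSemaphore`, to cap how many commands are in flight at once. The sketch below shows that capping effect in isolation; `run_commands`, the nested `execute` coroutine and the sleep that stands in for device I/O are assumptions, not the library's `execute` method.

```python
import asyncio


async def run_commands(max_parallel_commands: int, count: int) -> tuple[list[int], int]:
    """Run `count` fake commands and report the peak number in flight."""
    lock = asyncio.BoundedSemaphore(value=max_parallel_commands)
    in_flight = 0
    peak = 0

    async def execute(index: int) -> int:
        nonlocal in_flight, peak
        async with lock:  # waits here when the limit is reached
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for waiting on the device
            in_flight -= 1
            return index

    results = await asyncio.gather(*(execute(i) for i in range(count)))
    return list(results), peak


serial_results, serial_peak = asyncio.run(run_commands(1, 4))
parallel_results, parallel_peak = asyncio.run(run_commands(3, 6))
print(serial_peak, parallel_peak)
```

With a limit of 1 the commands run strictly one after another, which matches the recommendation above for serial transports.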
Attributes
- transporttyping.Optional[asyncio.Transport] = None
- _transport_allows_writing= True
- _transport_factory= transport_factory
- _transport_kwargs= kwargs
- _commandslist[Command] = []
- lock= asyncio.BoundedSemaphore(value=max_parallel_commands)
- is_open= asyncio.Event()
- autodetect= autodetect
- _autodetector= None
- _validation_kwargs= None
- is_validated= asyncio.Event()
- reconnect= reconnect
- reconnect_delay= reconnect_delay
- remaining_reconnect_attempts= max_reconnect_attempts
- max_reconnect_attempts= max_reconnect_attempts
- loggerlogging.Logger = NoneA standard python logger.
- autodetectorAutoDetector = NoneAn `AutoDetector` for connecting/re-connecting to certain devices.
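The `lock` attribute above is an `asyncio.BoundedSemaphore` sized by `max_parallel_commands`. As a rough sketch of how such a semaphore caps the number of commands in flight (the `run_limited` and `execute` helpers here are hypothetical stand-ins, not part of the documented API):

```python
import asyncio


async def run_limited(max_parallel_commands: int, n_commands: int) -> int:
    """Run n_commands dummy commands and return the peak number in flight."""
    lock = asyncio.BoundedSemaphore(value=max_parallel_commands)
    in_flight = 0
    peak = 0

    async def execute(_: int) -> None:
        nonlocal in_flight, peak
        async with lock:  # blocks once max_parallel_commands are running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0)  # stand-in for the real device round trip
            in_flight -= 1

    await asyncio.gather(*(execute(i) for i in range(n_commands)))
    return peak
```

Running ten dummy commands with `max_parallel_commands=2` never has more than two inside the `async with` block at once.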
HIDTransport
classTransport for devices connected to an HID USB interface.
- MRO
- └── Transport
- └─── HIDTransport
Methods
__init__(- self,
- vendor : int,
- product : int,
- interface_index : int,
- **kwargs
vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.interface_indexint = DEFAULT_USB_INTERFACEThe index of the USB Interface to use. Defaults to 0.**kwargs= {}Additional configuration options: - serial_number: Specific device serial number to connect to - usage_page: HID usage page filter - usage: HID usage filter - write_report_id: HID report ID to use - read_report_id: HID read report ID to use_open(self) -> None_close(self) -> None_ensure_reader(self) -> None_remove_reader(self) -> None__read(self) -> None_read(self) -> typing_extensions.Optional[bytes]typing_extensions.Optional[bytes]_ensure_writer(self) -> None_remove_writer(self) -> None_write(self, data : bytes) -> intdatabytesintopen(self) -> NoneOpen the transport and set state to allow future read operations.
close(self) -> NoneClose the transport and set state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneClose the transport immediately and update state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly call `_read` and aggregate the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClear out all available read data without notifying the protocol. Call `read_all` and discard the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]A tuple (low, high), where low and high are positive numbers of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the low-water limit; if neither is specified, the limits are (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on to the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
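The write flow control described by `set_write_buffer_limits`, `_maybe_pause_writing` and `_maybe_resume_protocol` can be illustrated with a small sketch. It assumes the high-water limit defaults to four times the low-water limit, which is consistent with the stated defaults of (16384, 65536). Deriving `low` as `high // 4` when only `high` is given, and resuming at "less than or equal to" the low mark, are assumptions borrowed from asyncio's own flow-control mixin. `resolve_limits` and `WriteBufferSketch` are hypothetical names.

```python
from typing import Optional


def resolve_limits(
    low: Optional[int] = None, high: Optional[int] = None
) -> tuple[int, int]:
    """Return (low, high) watermarks, filling in missing values."""
    if high is None:
        high = 65536 if low is None else 4 * low
    if low is None:
        low = high // 4  # assumption: mirrors asyncio's default derivation
    if not 0 <= low <= high:
        raise ValueError(f"need 0 <= low ({low}) <= high ({high})")
    return low, high


class WriteBufferSketch:
    """Tracks only the buffer size and the paused state."""

    def __init__(self, low: Optional[int] = None, high: Optional[int] = None) -> None:
        self.low, self.high = resolve_limits(low, high)
        self.size = 0
        self.paused = False

    def grow(self, n: int) -> None:
        # Called when data is buffered: pause once the high mark is exceeded.
        self.size += n
        if not self.paused and self.size > self.high:
            self.paused = True  # a real transport would call pause_writing()

    def shrink(self, n: int) -> None:
        # Called when data is flushed: resume once at or below the low mark.
        self.size = max(0, self.size - n)
        if self.paused and self.size <= self.low:
            self.paused = False  # a real transport would call resume_writing()
```

The gap between the two marks provides hysteresis: after pausing, the protocol is not resumed until the buffer has drained well below the level that triggered the pause.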
Attributes
- read_sizeint = 64
- vendor= vendor
- product= product
- interface_number= interface_index
- serial_number= kwargs.get('serial_number')
- usage_page= kwargs.get('usage_page')
- usage= kwargs.get('usage')
- write_report_idbytes = kwargs.get('write_report_id', b'')
- read_report_idbytes = kwargs.get('read_report_id', b'')
- _devicetyping_extensions.Optional[hid.device] = None
- _was_opened= False
- _reader_event= threading.Event()
- loggerlogging.Logger = NoneA standard python logger.
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _protocoltyping.Optional[P] = None
- _is_closing= True
SerialTransport
classTransport for serial devices.
- MRO
- └── Transport
- └─── SerialTransport
Methods
_open(self) -> NoneOpen underlying serial port, if not already open.
_close(self) -> NoneClose underlying serial port, if open.
_ensure_reader(self) -> None_poll_read(self) -> None_remove_reader(self) -> None_read(self) -> typing.Optional[bytes]typing.Optional[bytes]_ensure_writer(self) -> NoneAdd a writer to the loop if not already added.
_poll_write(self) -> None_remove_writer(self) -> NoneRemove a writer from the loop.
_write(self, data : bytes) -> intdatabytesintopen(self) -> NoneOpen the transport and set state to allow future read operations.
close(self) -> NoneClose the transport and set state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneClose the transport immediately and update state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly call `_read` and aggregate the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClear out all available read data without notifying the protocol. Call `read_all` and discard the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]A tuple (low, high), where low and high are positive numbers of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the low-water limit; if neither is specified, the limits are (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on to the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
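Two of the methods above are described purely in terms of simpler calls: `read_all` repeatedly calls `_read` and aggregates the results, and `writelines` concatenates its arguments and calls `write()`. A minimal sketch of both, written as free functions that take the underlying call as a parameter (the `read_once` and `write` callables are hypothetical stand-ins for the transport's own methods):

```python
from typing import Callable, Iterable, Optional, Union

BytesLike = Union[bytes, bytearray, memoryview]


def read_all(read_once: Callable[[], Optional[bytes]]) -> bytes:
    """Call read_once until it yields nothing, then join the chunks."""
    chunks: list[bytes] = []
    while True:
        chunk = read_once()
        if not chunk:  # None or b"" both mean "no further data"
            break
        chunks.append(chunk)
    return b"".join(chunks)


def writelines(
    write: Callable[[bytes], None], list_of_data: Iterable[BytesLike]
) -> None:
    """Concatenate all pieces and hand them over in a single write call."""
    write(b"".join(list_of_data))
```

Discarding the return value of `read_all` gives the behaviour documented for `clear_read_buffer`.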
Attributes
- _serial= serial.serial_for_url(port, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=0, write_timeout=0, do_not_open=True)
- _max_read_size= 1024
- _read_buffer= []
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _protocoltyping.Optional[P] = None
- _is_closing= True
USBTransport
classTransport for devices connected via USB. By default, uses the `libusb` backend and Interface 0 of Configuration 1 on the device.
- MRO
- └── Transport
- └─── USBTransport
Methods
__init__(- self,
- vendor : int,
- product : int,
- interface_index : int,
- **kwargs
vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.interface_indexint = DEFAULT_USB_INTERFACEThe index of the USB Interface to use. Defaults to 0.**kwargs= {}Additional backend-specific configuration options._open(self) -> NoneOpen the USB device using the selected backend.
_close(self) -> None_ensure_reader(self) -> None_remove_reader(self) -> None__read(self) -> None_read(self) -> typing.Optional[bytes]Read data from the transport.
typing.Optional[bytes]Data read from the device, if any._ensure_writer(self) -> None_remove_writer(self) -> None_write(self, data : bytes) -> intWrite data to USB device. Write errors are caught by `WriteTransport._safe_write`, which wraps this method.
databytesBytes to write.intNumber of bytes written or 0 if the device connection is not currently available.open(self) -> NoneOpen the transport and set state to allow future read operations.
close(self) -> NoneClose the transport and set state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneClose the transport immediately and update state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly call `_read` and aggregate the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClear out all available read data without notifying the protocol. Call `read_all` and discard the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]A tuple (low, high), where low and high are positive numbers of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the low-water limit; if neither is specified, the limits are (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on to the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
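The relationship between `_write` and `_safe_write` (the wrapper catches write errors, keeps unwritten data buffered, and calls `connection_lost()` with None if the buffer empties while the transport is closing) might look roughly like the sketch below. All names and the choice of `OSError` are assumptions for illustration; the actual implementation is not shown in this reference.

```python
from typing import Callable, Optional


def safe_write(
    write_buffer: list[bytes],
    write: Callable[[bytes], int],  # stand-in for the transport's _write
    is_closing: bool,
    connection_lost: Callable[[Optional[Exception]], None],
    on_error: Callable[[Exception], None],  # stand-in for _exception
) -> None:
    """Try to flush the buffer once, keeping whatever could not be written."""
    data = b"".join(write_buffer)
    write_buffer.clear()
    try:
        written = write(data)
    except OSError as exc:
        # Assumption: write failures surface as OSError and are fatal.
        on_error(exc)
        return
    if written < len(data):
        # Partial write: keep the remainder for the next callback.
        write_buffer.append(data[written:])
    elif is_closing:
        # Buffer drained while closing: report a clean shutdown.
        connection_lost(None)
```

Because `_write` is documented to return 0 when the device connection is unavailable, a wrapper of this shape would simply leave all data in the buffer in that case.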
Attributes
- read_sizeint = None
- vendor= vendor
- product= product
- interface_number= interface_index
- _backend= None
- _devicetyping.Optional[usb.core.Device] = None
- read_endpointtyping.Optional[usb.core.Endpoint] = None
- write_endpointtyping.Optional[usb.core.Endpoint] = None
- _was_opened= False
- _reader_event= threading.Event()
- loggerlogging.Logger = NoneA standard python logger.
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _protocoltyping.Optional[P] = None
- _is_closing= True
SerialDeviceManager
classDetect, filter, and get info for connected serial devices.
Methods
@typing.override
@classmethod
filter_kwargs(cls, kwargs : dict[str, str]) -> dict[str, str]kwargsdict[str, str]dict[str, str]
@typing.override
@classmethod
get_all(cls) -> list[DeviceInfo]list[DeviceInfo]
@typing.override
@classmethod
check_device_match(- cls,
- device_info : DeviceInfo,
- **kwargs
device_info**kwargs= {}bool
Attributes
- SERIAL_SEARCH_KEYStyping_extensions.ClassVar = ['device', 'name', 'description', 'hwid', 'vid', 'pid', 'serial_number', 'location', 'manufacturer', 'product', 'interface']
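The reference gives no descriptions for `filter_kwargs` or `check_device_match`. One plausible reading, given `SERIAL_SEARCH_KEYS`, is that the first keeps only recognised search keys and the second compares those keys against a device's info. The sketch below illustrates that reading only: it models `DeviceInfo` as a plain dict and uses free functions, both of which are assumptions, and the device values are made up.

```python
SERIAL_SEARCH_KEYS = [
    "device", "name", "description", "hwid", "vid", "pid",
    "serial_number", "location", "manufacturer", "product", "interface",
]


def filter_kwargs(kwargs: dict[str, str]) -> dict[str, str]:
    """Keep only the entries whose key is a known serial search key."""
    return {k: v for k, v in kwargs.items() if k in SERIAL_SEARCH_KEYS}


def check_device_match(device_info: dict[str, str], **kwargs: str) -> bool:
    """Return True if every requested key has the same value on the device."""
    return all(device_info.get(key) == value for key, value in kwargs.items())


# Hypothetical device record and search arguments, for illustration only.
device = {"device": "/dev/ttyUSB0", "vid": "0403", "pid": "6001"}
criteria = filter_kwargs({"vid": "0403", "baudrate": "9600"})
matches = check_device_match(device, **criteria)
```

Here `baudrate` is dropped because it is not a search key, so only `vid` is compared against the device.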