unitelabs.bus
unitelabs/bus/__init__.py
Packages
Attributes
- __all__= [ "Protocol", "Command", "CommandBuilder", "Request", "Response", "ByteCommand", "SerialCommand", "HTTPCommand", "TransportFactory", "create_serial_connection", "create_usb_connection", "create_udp_connection", "create_tcp_connection", "SerialTransport", "USBTransport", "SerialDeviceManager", "InputValidationError", "CommandValidationError", "CommandExecutionError", "NoSuchDeviceFound", "testing" ]
Functions
create_serial_connection(- protocol_factory : typing.Callable[..., P],
- port : str,
- baudrate : int,
- bytesize : ByteSize,
- parity : Parity,
- stopbits : StopBits,
- **kwargs
Create a serial connection with the specified port.
protocol_factorytyping.Callable[..., P]A callable that returns an instance of the `Protocol` to be used.portstrThe port of the serial device.baudrateint = 9600The baud rate.**kwargs= {}Additional keyword arguments to be passed to the `SerialTransport` constructor.tuple[SerialTransport, P]A tuple containing the `SerialTransport` instance and the `Protocol` instance.create_tcp_connection(- protocol_factory : typing.Callable[..., P],
- host : str,
- port : int,
- **kwargs
Create and open a streaming TCP connection to a given address specified by host and port.
protocol_factorytyping.Callable[..., P]A callable that returns an instance if the `Protocol` to be used.hoststr = 'localhost'The host of the remote server.portint = 80The port number of the remote server.**kwargs= {}Additional keyword arguments passed to `loop.create_connection`.tuple[asyncio.Transport, P]A tuple containing the transport and protocol.create_usb_connection(- protocol_factory : typing.Callable[..., P],
- vendor : int,
- product : int,
- **kwargs
Create a USB connection with a device based on specified `vendor` and `product` IDs.
protocol_factorytyping.Callable[..., P]A callable that returns an instance of the protocol to be used.vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.**kwargs= {}Additional keyword arguments to be passed to the `UsbTransport` constructor.tuple[USBTransport, P]A tuple containing the `UsbTransport` instance and the protocol instance.create_udp_connection(- protocol_factory : typing.Callable[..., P],
- server_port : int,
- server_host : str,
- client_port : int,
- client_host : str,
- **kwargs
Create a UDP connection with the specified server (sender) and client (receiver) addresses.
protocol_factorytyping.Callable[..., P]A callable that returns an instance of the `Protocol` to be used.server_portint = 0The port of the UDP server.server_hoststr = ''The IP address of the UDP server.client_portint = 0The desired port of the client.client_hoststr = ''The desired IP address of the client.**kwargs= {}Additional keyword arguments to be passed to the `UDPTransport` constructor.tuple[asyncio.Transport, P]A tuple containing the `UDPTransport` instance and the `Protocol` instance.
Classes
ByteCommand
classThe most basic form of a `Command`, which ingests and returns bytes.
- MRO
- Bases
- Command[bytes, bytes]
Methods
__init__(- self,
- message : bytes,
- timeout : typing.Optional[float],
- is_void : typing.Optional[bool],
- **kwargs
messagebytestimeouttyping.Optional[float] = Noneis_voidtyping.Optional[bool] = False**kwargs= {}_serialize(self, message : typing.Optional[bytes]) -> bytesmessagetyping.Optional[bytes] = Nonebytes_deserialize(self, response : bytes) -> bytesresponsebytesbytesserialize(self, message : typing.Optional[InType]) -> bytesSerializes the message into bytes. Uses `self.message` if `message` is None. NB: Calling with a custom `message` DOES NOT SET the `self.message'` attribute; the `Request` sent to the device will use the `self.message` attribute.
messagetyping.Optional[InType] = NoneA command input, or None to use `self.message`.bytesThe serialized message to be sent to the device.deserialize(self, response : typing.Optional[bytes]) -> OutTypeDeserializes the `response` bytes. Calls `_deserialize` with `self.response` payload if `response` is None.
responsetyping.Optional[bytes] = Nonebytes to be deserialized, or None to use `Response` payload.None if `self.is_void`, else the deserialized `response`.validate_request(self, message : bytes) -> boolValidate a serialized message. Called within `Command.request` before generating a `Request` object.
messagebytesThe serialized message to set as the `Request.payload`, if valid.boolWhether or not the `message` is valid._set_response(self) -> NoneSet the result of `self.response.payload` to `self._response_buffer` and clears `self._response_buffer`.
validate_response(self, data : bytes) -> NoneThis method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
databytesThe bytes from the `Transport` to add to the response_buffer and evaluate for completeness._validate_response(self, data : bytes) -> boolUser-configured response validation method. Validate the data received from the `Transport` and determine if the data is a complete response. Subclasses override this method to specify behavior for validating data before setting it as the response.
databytesThe bytes to evaluate for completeness.boolTrue if `data` is a complete, valid message from the device, otherwise False.match_response(self, data : bytes) -> boolFor devices that allow parallel command processing, first check if `data` belongs to this command and then validate the response.
databytesThe bytes to check for match during parallel processing, usually an identifier shared by request and response.boolTrue if the `data` matches to this command, otherwise False.Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consecutively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- receivertyping.Optional[Protocol] = None
- _parserstyping.Optional[list[Parser]] = None
- _responsetyping.Optional[Response] = None
- _requesttyping.Optional[Request] = None
- message= message
- timeout= timeout
- _response_buffer= b''
- is_void= is_void
- requestRequest = NoneThe `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- responseResponse = NoneThe `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within it's `Protocol.data_received` method and only set the result if valid.
- parserslist[Parser] = NoneParsers are functions that are applied to the deserialized data when calling `result()`.
Command
classGeneric Command that can be used with `Protocol.execute`. The first type parameter of the `Command` determines the type that the `Command` accepts on init and serialization, the second type parameter determines the type returned by deserialization, and the third type parameter determines the type returned by the `result` method, e.g. `Command[str, typing.List[str], typing.List[bool]]` would ingest strings from the user, be converted to a list of strings for the parser and then return a list of booleans when used with `Protocol.execute`. Subclasses which change the constructor signature should be careful to preserve the `timeout` and `is_void` parameters. Removal of these arguments may result in compatability issues with the `CommandBuilder`.
Methods
__init__(- self,
- message : InType,
- timeout : typing.Optional[float],
- is_void : bool
messageThe contents of the message to be sent to the device, pre-serialization.timeouttyping.Optional[float] = NoneHow long is seconds to wait for a response.is_voidbool = FalseIf true, does not return a response. Void commands ignore all response validations.serialize(self, message : typing.Optional[InType]) -> bytesSerializes the message into bytes. Uses `self.message` if `message` is None. NB: Calling with a custom `message` DOES NOT SET the `self.message'` attribute; the `Request` sent to the device will use the `self.message` attribute.
messagetyping.Optional[InType] = NoneA command input, or None to use `self.message`.bytesThe serialized message to be sent to the device.deserialize(self, response : typing.Optional[bytes]) -> OutTypeDeserializes the `response` bytes. Calls `_deserialize` with `self.response` payload if `response` is None.
responsetyping.Optional[bytes] = Nonebytes to be deserialized, or None to use `Response` payload.None if `self.is_void`, else the deserialized `response`.validate_request(self, message : bytes) -> boolValidate a serialized message. Called within `Command.request` before generating a `Request` object.
messagebytesThe serialized message to set as the `Request.payload`, if valid.boolWhether or not the `message` is valid._set_response(self) -> NoneSet the result of `self.response.payload` to `self._response_buffer` and clears `self._response_buffer`.
validate_response(self, data : bytes) -> NoneThis method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
databytesThe bytes from the `Transport` to add to the response_buffer and evaluate for completeness._validate_response(self, data : bytes) -> boolUser-configured response validation method. Validate the data received from the `Transport` and determine if the data is a complete response. Subclasses override this method to specify behavior for validating data before setting it as the response.
databytesThe bytes to evaluate for completeness.boolTrue if `data` is a complete, valid message from the device, otherwise False.match_response(self, data : bytes) -> boolFor devices that allow parallel command processing, first check if `data` belongs to this command and then validate the response.
databytesThe bytes to check for match during parallel processing, usually an identifier shared by request and response.boolTrue if the `data` matches to this command, otherwise False.Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consecutively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- receivertyping.Optional[Protocol] = None
- _parserstyping.Optional[list[Parser]] = None
- _responsetyping.Optional[Response] = None
- _requesttyping.Optional[Request] = None
- message= message
- timeout= timeout
- _response_buffer= b''
- is_void= is_void
- requestRequest = NoneThe `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- responseResponse = NoneThe `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within it's `Protocol.data_received` method and only set the result if valid.
- parserslist[Parser] = NoneParsers are functions that are applied to the deserialized data when calling `result()`.
CommandBuilder
classMethods
Class that coordinates the chainable building of a `Command` instance. Args: command: The `Command` class to build on top of. cls_name: The name of the `Command` subclass returned by `build`. Examples: Call functions individually, replacing the builder instance >>> builder = CommandBuilder(ByteCommand) >>> builder = builder.with_serializer(lambda x: x + b" ") >>> builder = builder.with_deserializer(lambda x: x.strip(b" ")) >>> cmd = builder.build("message", timeout=0.5, is_void=True) CommandBuilder `with_` methods do not operate on themselves, but rather create and operate on a new instance of the CommandBuilder. Chain functions together >>> cmd = ( >>> CommandBuilder(ByteCommand) >>> .with_serializer(lambda x: x + b" ") >>> .with_deserializer(lambda x: x.strip(b" ")) >>> .build("message", timeout=0.5, is_void=True) >>> ) Create multiple commands from the same builder, or builder intermediate >>> crlf_builder = ( >>> CommandBuilder(ByteCommand) >>> .with_serializer(lambda x: x + b" ") >>> .with_deserializer(lambda x: x.strip(b" ")) >>> ) >>> cmd = crlf_builder.build("message", timeout=0.5) >>> cmd2 = crlf_builder.build("message2", timeout=0.5) >>> cmd3 = crlf_builder.with_serializer(lambda x: x).build(b"message3", timeout=0.5) Here `cmd` and `cmd2` both use the same serializer and deserializer methods, but `cmd3` uses a different serializer.
commandtype[Command[InType, OutType]] = ByteCommandcls_namestr = 'BuiltCommand'__copy__(self) -> CommandBuilder[InType, OutType, R]__deepcopy__(self, memo) -> CommandBuilder[InType, OutType, R]memo= Nonewith_serializer(- self,
- serializer : typing.Union[Serializer[S_InType], SelfSerializer[S_InType]],
- **kwargs
Set a serializer to apply to the `Command.message` before sending it to the device. The serializer will be called with the message to send to the device as the first and only argument. Additional arguments can be preloaded into the serializer with `**kwargs`.
serializertyping.Union[Serializer[S_InType], SelfSerializer[S_InType]]The serializer function to apply.**kwargs= {}Keyword arguments to pass to the serializer.with_deserializer(- self,
- deserializer : typing.Union[Deserializer[D_OutType], SelfDeserializer[D_OutType]],
- **kwargs
Set a deserializer function to apply to the `Command.response` recieved from the device. The deserializer will be called with the response bytes as the first and only argument. Additional arguments can be preloaded into the deserializer with `**kwargs`. This function will wrap the deserializer in a check for null values to ensure that calls without `response' use the `Response.payload.result()`.
deserializertyping.Union[Deserializer[D_OutType], SelfDeserializer[D_OutType]]The deserializer function to apply.**kwargs= {}Keyword arguments to pass to the deserializer.Add a partial parser function to the `Command`, which will be applied, in order, to the deserialized data when calling `Command.result()`. Parsers are chained together consequetively and must pass data through to the next parser.
with_timeout(self, timeout : float) -> CommandBuilder[InType, OutType, R]Set the timeout to apply to the command. Alternative to setting `timeout` in `build`, will be overriden by `timeout` in `build`, if provided.
timeoutfloatThe timeout to apply to the command.without_response(self) -> CommandBuilder[InType, None, None]Set the command to be a void command, i.e. return None as a response. Alternative to setting `is_void` in `build`, will be overriden by `is_void` in `build`, if provided.
CommandBuilder[InType, None, None]with_multiline(- self,
- timeout : float,
- validate_response : typing.Optional[typing.Union[typing.Callable[[], bool], typing.Callable[[], bool]]]
Handle multi-line responses. Examples: Use the base class's `_validate_response` method. >>> builder = CommandBuilder().with_multiline(0.01) >>> cmd = builder.build(b"message") Provide a custom validation function. >>> builder = builder.with_multiline(0.01, validate_response=lambda x: x.endswith(b" ")) >>> cmd = builder.build(b"message") Args: timeout: The time in seconds to wait for more data before calling the `_validate_response` method. validate_response: The function to replace the `Command._validate_response` method. Raises: CommandValidationError: If the timeout is lower than 0.
timeoutfloatvalidate_responsetyping.Optional[typing.Union[typing.Callable[[], bool], typing.Callable[[], bool]]] = Nonebuild(- self,
- message : InType,
- *args,
- timeout : typing.Optional[float],
- is_void : typing.Optional[bool],
- **kwargs
Build the `Command` instance with the previously set serializer, deserializer and parsers.
messageThe message to send to the device.*args= ()Additional positional arguments to pass to the command.timeouttyping.Optional[float] = NoneThe timeout to apply to the command.is_voidtyping.Optional[bool] = NoneWhether the command is a void command.**kwargs= {}Additional keyword arguments to pass to the command.
Attributes
- _command= command
- _cmd_cls_name= cls_name
- _serializer= None
- _deserializer= None
- _response_validator= None
- _response_validator_timeout= None
- _timeout= None
- _is_void= False
- _parserslist[Parser] = []
HTTPCommand
classAn HTTP Request.
- MRO
- Bases
- Command[bytes, HTTPResponse]
Methods
__init__(- self,
- message : typing.Optional[bytes],
- host : typing.Optional[str],
- path : str,
- method : _Method,
- headers : typing.Optional[dict[str, str]],
- protocol_version : int,
- timeout : typing.Optional[float],
- is_void : bool
messagetyping.Optional[bytes] = NoneThe `body` of the request.hosttyping.Optional[str] = NoneThe host of the remote server.pathstr = '/'The path of the HTTP request.headerstyping.Optional[dict[str, str]] = NoneThe headers of the HTTP request.protocol_versionint = 11The protocol version to use. Either 10 or 11.timeouttyping.Optional[float] = NoneHow long in seconds to wait for a response.is_voidbool = FalseWhether or not we should wait for a response at all._serialize(self, message : bytes) -> bytesmessagebytesbytes_validate_response(self, data : bytes) -> booldatabytesbool_parse_http_response(self, data : bytes) -> HTTPResponseRead and parse the given binary data into an `HTTPResponse`.
databytesThe binary data that contains an HTTP response message.An `HTTPResponse` instance with its status code, headers andserialize(self, message : typing.Optional[InType]) -> bytesSerializes the message into bytes. Uses `self.message` if `message` is None. NB: Calling with a custom `message` DOES NOT SET the `self.message'` attribute; the `Request` sent to the device will use the `self.message` attribute.
messagetyping.Optional[InType] = NoneA command input, or None to use `self.message`.bytesThe serialized message to be sent to the device.deserialize(self, response : typing.Optional[bytes]) -> OutTypeDeserializes the `response` bytes. Calls `_deserialize` with `self.response` payload if `response` is None.
responsetyping.Optional[bytes] = Nonebytes to be deserialized, or None to use `Response` payload.None if `self.is_void`, else the deserialized `response`.validate_request(self, message : bytes) -> boolValidate a serialized message. Called within `Command.request` before generating a `Request` object.
messagebytesThe serialized message to set as the `Request.payload`, if valid.boolWhether or not the `message` is valid._set_response(self) -> NoneSet the result of `self.response.payload` to `self._response_buffer` and clears `self._response_buffer`.
validate_response(self, data : bytes) -> NoneThis method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
databytesThe bytes from the `Transport` to add to the response_buffer and evaluate for completeness.match_response(self, data : bytes) -> boolFor devices that allow parallel command processing, first check if `data` belongs to this command and then validate the response.
databytesThe bytes to check for match during parallel processing, usually an identifier shared by request and response.boolTrue if the `data` matches to this command, otherwise False.Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consecutively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- Methodtype[_Method] = _Method
- host= host
- path= path
- method= method
- headers= headers or {}
- protocol_version= protocol_version
- http_responsetyping.Optional[HTTPResponse] = None
- receivertyping.Optional[Protocol] = None
- _parserstyping.Optional[list[Parser]] = None
- _responsetyping.Optional[Response] = None
- _requesttyping.Optional[Request] = None
- message= message
- timeout= timeout
- _response_buffer= b''
- is_void= is_void
- requestRequest = NoneThe `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- responseResponse = NoneThe `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within it's `Protocol.data_received` method and only set the result if valid.
- parserslist[Parser] = NoneParsers are functions that are applied to the deserialized data when calling `result()`.
Request
class`Protocol`s use `Request`s to specify data that is sent to a `Transport`.
- Decorators
- dataclasses.dataclass
Methods
__init__(self, payload : bytes, timeout : typing.Optional[float]) -> NonepayloadbytesThe payload of the request.timeouttyping.Optional[float] = NoneThe duration in seconds to wait for a response before raising a timeout exception. Set to 0.0 to wait indefinitely.__post_init__(self) -> None
Attributes
- payloadbytes = None
- timeouttyping.Optional[float] = None
Response
classProtocols use `Response`s to specify data that is received from a transport.
- Decorators
- dataclasses.dataclass
Methods
__post_init__(self) -> None__handle_done(self, _payload : asyncio.Future[bytes]) -> NoneThe callback to be run when the payload `Future` becomes done.
_payloadasyncio.Future[bytes]The `Future` object.
Attributes
- requestRequest = None
SerialCommand
classCommand for use with serial communication device.
- MRO
- Bases
- Command[str, str]
Methods
__init__(- self,
- message : str,
- read_terminator : bytes,
- write_terminator : bytes,
- encoding : str,
- **kwargs
messagestrthe string message to send to the device.read_terminatorbytes = b'\r\n'the byte-string expected at the end of messages coming from the device.write_terminatorbytes = b'\r\n'the byte-string to append to our string `message` which indicates to the device that a complete message has been received.encodingstr = 'ascii'The encoding used to convert between strings and bytes.**kwargs= {}Additional `Command` kwargs._serialize(self, message : typing.Optional[str]) -> bytesmessagetyping.Optional[str] = Nonebytes_deserialize(self, response : typing.Optional[bytes]) -> strresponsetyping.Optional[bytes]str_validate_response(self, data : bytes) -> booldatabytesboolserialize(self, message : typing.Optional[InType]) -> bytesSerializes the message into bytes. Uses `self.message` if `message` is None. NB: Calling with a custom `message` DOES NOT SET the `self.message'` attribute; the `Request` sent to the device will use the `self.message` attribute.
messagetyping.Optional[InType] = NoneA command input, or None to use `self.message`.bytesThe serialized message to be sent to the device.deserialize(self, response : typing.Optional[bytes]) -> OutTypeDeserializes the `response` bytes. Calls `_deserialize` with `self.response` payload if `response` is None.
responsetyping.Optional[bytes] = Nonebytes to be deserialized, or None to use `Response` payload.None if `self.is_void`, else the deserialized `response`.validate_request(self, message : bytes) -> boolValidate a serialized message. Called within `Command.request` before generating a `Request` object.
messagebytesThe serialized message to set as the `Request.payload`, if valid.boolWhether or not the `message` is valid._set_response(self) -> NoneSet the result of `self.response.payload` to `self._response_buffer` and clears `self._response_buffer`.
validate_response(self, data : bytes) -> NoneThis method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
databytesThe bytes from the `Transport` to add to the response_buffer and evaluate for completeness.match_response(self, data : bytes) -> boolFor devices that allow parallel command processing, first check if `data` belongs to this command and then validate the response.
databytesThe bytes to check for match during parallel processing, usually an identifier shared by request and response.boolTrue if the `data` matches to this command, otherwise False.Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consecutively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- _read_terminator= read_terminator
- _write_terminator= write_terminator
- _encoding= encoding
- receivertyping.Optional[Protocol] = None
- _parserstyping.Optional[list[Parser]] = None
- _responsetyping.Optional[Response] = None
- _requesttyping.Optional[Request] = None
- message= message
- timeout= timeout
- _response_buffer= b''
- is_void= is_void
- requestRequest = NoneThe `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- responseResponse = NoneThe `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within it's `Protocol.data_received` method and only set the result if valid.
- parserslist[Parser] = NoneParsers are functions that are applied to the deserialized data when calling `result()`.
CommandExecutionError
class- Bases
- Exception
CommandValidationError
classInputValidationError
class- Bases
- ValueError
NoSuchDeviceFound
class- Bases
- Exception
TransportFactory
classInterface representing a factory for creating transports.
Protocol
classBase communication Protocol.
- Bases
- asyncio.Protocol
Methods
__init__(- self,
- transport_factory : TransportFactory,
- reconnect : bool,
- reconnect_delay : float,
- max_reconnect_attempts : int,
- autodetect : bool,
- max_parallel_commands : int,
- **kwargs
transport_factoryA callable used to create a connection to a transport.reconnectbool = TrueWhether or not to a attempt to reconnect to a device when the connection is lost.reconnect_delayfloat = DEFAULT_RECONNECT_DELAYHow long in seconds to wait between reconnection attempts.max_reconnect_attemptsHow many times to attempt to reconnect to a device before connection is considered lost.autodetectbool = FalseWhether or not to use autodetection for device connectivity.max_parallel_commandsThe maximum number of commands to process in parallel. This should be 1 for serial transports, but can be configured to allow more depending on the processing capacity of alternative transports.**kwargs= {}additional kwargs, including kwargs for use with `TransportFactory`.connect(- self,
- transport_factory : typing.Optional[TransportFactory],
- **transport_kwargs
Open a connection to a new `Transport`. Closes the old connection, if it exists, before opening the new connection.
transport_factorytyping.Optional[TransportFactory] = NoneA callable used to create a connection to a transport, defaults to `self._transport_factory`, set from the constructor.**transport_kwargs= {}Kwargs to pass into the `transport_factory`.typing.Tuple[TransportFactory, dict[str, typing.Any]]A tuple containing the `TransportFactory` and kwargs for the previously connected `Transport`._connect_transport(self, **kwargs) -> NoneCreate a new transport instance.
**kwargs= {}identity(self, **config_kwargs) -> boolMethod for validating the identity of the connected device. This method will call another user-defined method on `Protocol` and compare the device's response (i.e. the method's return value) to values provided in `config_kwargs`.
**config_kwargs= {}kwargs sent from `validate`boolTrue if the result of the inner call matches the expectation from `config_kwargs` else False.validate(- self,
- timeout : float,
- **validation_kwargs
This method will be called by user after `__init__` via `open`; it calls `identity` to determine if the connected device is the one that was expected. If autodetect=True this is called internally by `unitelabs.bus.utils.AutoDetector` to cycle through possile devices until the correct device or no device is found. `validation_kwargs` and `__init__` kwargs are stored on the `Protocol` such that they must only be provided once. Should there be values which overlap, `__init__` values are overwritten by values in `validation_kwargs`. `validation_kwargs` may contain stable information about the device. Check `unitelabs.bus.utils.device_manager` for more information about valid device filter kwargs.
timeoutfloatHow long in seconds to wait for a response from the device.**validation_kwargs= {}Kwargs to use to run validation of the device, in the case of use with `autodetect=True`, these kwargs will be stored on the first call for all future validations.boolPropagated return value from `identity`; True if `identity` returns True else Falseopen(- self,
- validation_timeout : float,
- **validation_kwargs
Open underlying `Transport`, establish a connection to a device and validate the device's identity.
validation_timeoutfloat = 1.0How long in seconds to wait for a response to `Protocol.validate`.**validation_kwargs= {}kwargs to be passed to `Protocol.validate` to test device identity against.close(self) -> NoneClose underlying `Transport`. Explicitly calling `close` will NOT attempt to reconnect to the `Transport`.
connection_made(self, transport : asyncio.Transport) -> NoneInvoked by `transport` when connection is made. Logs the connection.
transportasyncio.Transportconnection_lost(self, exc : typing.Optional[Exception]) -> NoneInvoked by transport when connection is lost. Attempts to reconnect after `reconnect_delay` seconds. Here `exc` can be None as a result of : - manual abort through direct call of transport's `abort` method - connection closing after `_safe_write` successfully wrote all data in write-buffer
exctyping.Optional[Exception] = Nonepause_writing(self) -> Noneresume_writing(self) -> Nonedata_received(self, data : bytes) -> NoneInvoked by transport when data is received. Logs the data and sets the response if not already set. Further invocations with the same `Response` will only be logged.
databytesThe data received.datagram_received(self, data : bytes, addr : tuple[str, int]) -> NoneInvoked by transport when a datagram is received.
databytesThe datagram data received.addrtuple[str, int]The address of the sender.error_received(self, exc : typing.Union[Exception, type[Exception]]) -> NoneInvoked by transport when an error is received. Logs the error and sets the response if not already set. Further invocations with the same `Response` will only be logged.
exctyping.Union[Exception, type[Exception]]The error received.Executes a `Command` by sending the `Request` within the `Command` to the `Transport`.
The deserialized response, created by `command.result()` or None if `Command.is_void` is True.
Attributes
- transporttyping.Optional[asyncio.Transport] = None
- _transport_allows_writing= True
- _transport_factory= transport_factory
- _transport_kwargs= kwargs
- _commandslist[Command] = []
- lock= asyncio.BoundedSemaphore(value=max_parallel_commands)
- is_open= asyncio.Event()
- autodetect= autodetect
- _autodetector= None
- _validation_kwargs= None
- is_validated= asyncio.Event()
- reconnect= reconnect
- reconnect_delay= reconnect_delay
- remaining_reconnect_attempts= max_reconnect_attempts
- max_reconnect_attempts= max_reconnect_attempts
- loggerlogging.Logger = NoneA standard python logger.
- autodetectorAutoDetector = None
SerialTransport
classTransport for serial devices.
- MRO
- └── Transport
- └─── SerialTransport
Methods
_open(self) -> NoneOpens underlying serial port, if not already open.
_close(self) -> NoneCloses underlying serial port, if open.
_ensure_reader(self) -> None_poll_read(self) -> None_remove_reader(self) -> None_read(self) -> typing.Optional[bytes]typing.Optional[bytes]_ensure_writer(self) -> NoneAdds a writer to the loop if not already added.
_poll_write(self) -> None_remove_writer(self) -> NoneRemoves a writer from the loop.
_write(self, data : bytes) -> intdatabytesintopen(self) -> NoneOpens the transport and sets state to allow future read operations.
close(self) -> NoneCloses the transport and sets state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneCloses the transport immediately and updates state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly calls `_read` and aggregates the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClears out all available read data without notifying the protocol. Calls `read_all` and throws away the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]a tuple (low, high) where low and high are positive number of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the high-water limit and if neither is specified, (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on the the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
Attributes
- _serial= _serial
- _max_read_size= 1024
- _read_buffer= []
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _protocoltyping.Optional[P] = None
- _is_closing= True
USBTransport
classTransport for devices connected via USB. By default, this implementation uses Interface 0 of Configuration 1 on the device.
- MRO
- └── Transport
- └─── USBTransport
Methods
__init__(- self,
- vendor : int,
- product : int,
- interface_index : int
vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.interface_indexint = DEFAULT_USB_INTERFACEThe index of the USB Interface to use. Defaults to 0._open(self) -> None_close(self) -> None_ensure_reader(self) -> None_remove_reader(self) -> None__read(self) -> None_read(self) -> typing.Optional[bytes]Read data from the transport.
typing.Optional[bytes]_ensure_writer(self) -> None_remove_writer(self) -> None_write(self, data : bytes) -> Nonedatabytesopen(self) -> NoneOpens the transport and sets state to allow future read operations.
close(self) -> NoneCloses the transport and sets state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneCloses the transport immediately and updates state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly calls `_read` and aggregates the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClears out all available read data without notifying the protocol. Calls `read_all` and throws away the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]a tuple (low, high) where low and high are positive number of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the high-water limit and if neither is specified, (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on the the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
Attributes
- vendor= vendor
- product= product
- _closing= True
- _interface_index= interface_index
- _devicetyping.Optional[usb.core.Device] = None
- read_endpointtyping.Optional[usb.core.Endpoint] = None
- write_endpointtyping.Optional[usb.core.Endpoint] = None
- _reader_event= threading.Event()
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _protocoltyping.Optional[P] = None
- _is_closing= True
SerialDeviceManager
classDetect, filter, and get info for connected serial devices.
Methods
@classmethod
filter_kwargs(cls, kwargs : dict[str, str]) -> dict[str, str]Filter kwargs to those which are returned from `serial.tools.list_ports.comports`, i.e. the attributes of `DeviceInfo`. Supports the use of `port` as alternative name for `device`.
kwargsdict[str, str]A dictionary of key-value pairs to filter.dict[str, str]The filtered dictionary.@classmethod
get_all(cls) -> list[DeviceInfo]Get all connected devices.
list[DeviceInfo]A list of all devices detected.@classmethod
check_device_match(- cls,
- device_info : DeviceInfo,
- **kwargs
Determine if the `DeviceInfo` instance's attributes match the filter `**kwargs`
device_infoThe device to check for a match against.**kwargs= {}The key-value pairs, which will be filtered, and then used to evaluate the device for a match.boolTrue if the `DeviceInfo` matches (or no kwargs provided, or all kwargs have been filtered), else False.@classmethod
filter(cls, **kwargs) -> list[DeviceInfo]Search through all detectable devices. `**kwargs` are first filtered by `filter_kwargs` based on attrs of `DeviceInfo`. This allows the number of devices detected to be pared down based on known, stable information about the device being searched for.
**kwargs= {}Search criteria for finding a device.list[DeviceInfo]A list of devices which match all `**kwargs` provided.
Attributes
- SERIAL_SEARCH_KEYS= ['device', 'name', 'description', 'hwid', 'vid', 'pid', 'serial_number', 'location', 'manufacturer', 'product', 'interface']