unitelabs.bus
unitelabs/bus/__init__.py
Packages
Attributes
- __all__= [ "Protocol", "Command", "CommandBuilder", "Request", "Response", "TransportFactory", "create_serial_connection", "create_udp_connection", "create_usb_connection", "SerialTransport", "UsbTransport", "UDPTransport", "SerialDeviceManager", "InputValidationError", "CommandValidationError", "CommandExecutionError", "NoSuchDeviceFound", "testing" ]
Functions
create_serial_connection(- protocol_factory : typing.Callable[..., P],
- port : str,
- baudrate : int,
- bytesize : ByteSize,
- parity : Parity,
- stopbits : StopBits,
- **kwargs
Create a serial connection with the specified port.
protocol_factorytyping.Callable[..., P]A callable that returns an instance of the `Protocol` to be used.portstrThe port of the serial device.baudrateint = 9600The baud rate.**kwargs= {}Additional keyword arguments to be passed to the `SerialTransport` constructor.create_udp_connection(- protocol_factory : typing.Callable[..., P],
- server_port : int,
- server_host : str,
- client_port : int,
- client_host : str,
- **kwargs
Create a UDP connection with the specified server (sender) and client (receiver) addresses.
protocol_factorytyping.Callable[..., P]A callable that returns an instance of the `Protocol` to be used.server_portintThe port of the UDP server.server_hoststr = DEFAULT_UDP_CLIENT_HOSTThe IP address of the UDP server.client_portint = DEFAULT_UDP_CLIENT_PORTThe desired port of the client.client_hoststr = DEFAULT_UDP_CLIENT_HOSTThe desired IP address of the client.**kwargs= {}Additional keyword arguments to be passed to the `UDPTransport` constructor.create_usb_connection(- protocol_factory : typing.Callable[..., P],
- vendor : int,
- product : int,
- **kwargs
Create a USB connection with a device based on specified `vendor` and `product` IDs.
protocol_factorytyping.Callable[..., P]A callable that returns an instance of the protocol to be used.vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.**kwargs= {}Additional keyword arguments to be passed to the `UsbTransport` constructor.
Classes
Command
classGeneric Command that can be used with `Protocol.execute`. The first type parameter of the `Command` determines the type that the `Command` accepts on init and serialization, the second type parameter determines the type returned by deserialization, and the third type parameter determines the type returned by the `result` method, e.g. `Command[str, typing.List[str], typing.List[bool]]` would ingest strings from the user, be converted to a list of strings for the parser and then return a list of booleans when used with `Protocol.execute`. Subclasses which change the constructor signature should be careful to preserve the `timeout` and `is_void` parameters. Removal of these arguments may result in compatability issues with the `CommandBuilder`.
Methods
__init__(- self,
- message : InType,
- timeout : typing.Optional[float],
- is_void : bool
messageThe contents of the message to be sent to the device, pre-serialization.timeouttyping.Optional[float] = NoneHow long is seconds to wait for a response.is_voidbool = FalseIf true, does not return a response. Void commands ignore all response validations.serialize(self, message : typing.Optional[InType]) -> bytesSerializes the message into bytes. Uses `self.message` if `message` is None. NB: Calling with a custom `message` DOES NOT SET the `self.message'` attribute; the `Request` sent to the device will use the `self.message` attribute.
messagetyping.Optional[InType] = NoneA command input, or None to use `self.message`.bytesThe serialized message to be sent to the device.deserialize(self, response : typing.Optional[bytes]) -> OutTypeDeserializes the `response` bytes. Calls `_deserialize` with `self.response` payload if `response` is None.
responsetyping.Optional[bytes] = Nonebytes to be deserialized, or None to use `Response` payload.None if `self.is_void`, else the deserialized `response`.validate_request(self, message : bytes) -> boolValidate a serialized message. Called within `Command.request` before generating a `Request` object.
messagebytesThe serialized message to set as the `Request.payload`, if valid.boolWhether or not the `message` is valid._set_response(self) -> NoneSet the result of `self.response.payload` to `self._response_buffer` and clears `self._response_buffer`.
validate_response(self, data : bytes) -> NoneThis method is called by `Protocol.data_received` and is responsible for setting the `Response.payload`. It manages the `_response_buffer` that accumulates the response bytes and calls `_validate_response` to determine whether the accumulated message in the `_response_buffer` is finished / 'valid'. If the response is valid, it sets the `Response.payload.result` to the accumulated bytes from the `_response_buffer`.
databytesThe bytes from the `Transport` to add to the response_buffer and evaluate for completeness._validate_response(self, data : bytes) -> boolUser-configured response validation method. Validate the data received from the `Transport` and determine if the data is a complete response. Subclasses override this method to specify behavior for validating data before setting it as the response.
databytesThe bytes to evaluate for completeness.boolTrue if `data` is a complete, valid message from the device, otherwise False.match_response(self, data : bytes) -> boolFor devices that allow parallel command processing, first check if `data` belongs to this command and then validate the response.
databytesThe bytes to check for match during parallel processing, usually an identifier shared by request and response.boolTrue if the `data` matches to this command, otherwise False.Add a parser to the list of parsers applied when calling `result()`. Parsers are chained together in the order that they are added and are consequetively applied to the deserialized data based on the return value of the previous parser, meaning that any data filtered by earlier parsers will no longer be accessible to later parsers.
Attributes
- receivertyping.Optional[Protocol] = None
- _responsetyping.Optional[Response] = None
- _requesttyping.Optional[Request] = None
- message= message
- timeout= timeout
- _response_buffer= b''
- is_void= is_void
- _parserslist[Parser] = []
- requestRequest = NoneThe `Request` which will be used by the `Protocol` to send bytes to the device. Calls `validate_request` on the `command` before serializing it and creating the `Request` object.
- responseResponse = NoneThe `Response` used by `Protocol.data_received` to set the `payload` of the command. When using the `Protocol.execute` method, the `Protocol` will call `validate_response` from within it's `Protocol.data_received` method and only set the result if valid.
- parserslist[Parser] = NoneParsers are functions that are applied to the deserialized data when calling `result()`.
CommandBuilder
classMethods
Class that coordinates the chainable building of a `Command` instance. Args: command: The `Command` class to build on top of. cls_name: The name of the `Command` subclass returned by `build`. Examples: Call functions individually >>> builder = CommandBuilder(ByteCommand) >>> builder = builder.with_serializer(lambda x: x + b" ") >>> builder = builder.with_deserializer(lambda x: x.strip(b" ")) >>> cmd = builder.build("message", timeout=0.5, is_void=True) Chain functions together >>> cmd = CommandBuilder(ByteCommand) >>> .with_serializer(lambda x: x + b" ") >>> .with_deserializer(lambda x: x.strip(b" ")) >>> .build("message", timeout=0.5, is_void=True) Create multiple commands from the same builder, or builder intermediate >>> crlf_builder = CommandBuilder(ByteCommand) >>> .with_serializer(lambda x: x + b" ") >>> .with_deserializer(lambda x: x.strip(b" ")) >>> cmd = crlf_builder.build("message", timeout=0.5) >>> cmd2 = crlf_builder.build("message2", timeout=0.5) >>> cmd3 = crlf_builder.with_serializer(lambda x: x).build(b"message3", timeout=0.5) Here `cmd` and `cmd2` both use the same serializer and deserializer methods, but `cmd3` uses a different serializer.
commandtype[Command[InType, OutType]] = ByteCommandcls_namestr = 'BuiltCommand'with_serializer(- self,
- serializer : typing.Union[Serializer[S_InType], SelfSerializer[S_InType]],
- **kwargs
Set a serializer to apply to the `Command.message` before sending it to the device. The serializer will be called with the message to send to the device as the first and only argument. Additional arguments can be preloaded into the serializer with `**kwargs`.
serializertyping.Union[Serializer[S_InType], SelfSerializer[S_InType]]The serializer function to apply.**kwargs= {}Keyword arguments to pass to the serializer.with_deserializer(- self,
- deserializer : typing.Union[Deserializer[D_OutType], SelfDeserializer[D_OutType]],
- **kwargs
Set a deserializer function to apply to the `Command.response` recieved from the device. The deserializer will be called with the response bytes as the first and only argument. Additional arguments can be preloaded into the deserializer with `**kwargs`. This function will wrap the deserializer in a check for null values to ensure that calls without `response' use the `Response.payload.result()`.
deserializertyping.Union[Deserializer[D_OutType], SelfDeserializer[D_OutType]]The deserializer function to apply.**kwargs= {}Keyword arguments to pass to the deserializer.Add a partial parser function to the `Command`, which will be applied, in order, to the deserialized data when calling `Command.result()`. Parsers are chained together consequetively and must pass data through to the next parser.
with_timeout(self, timeout : float) -> typing_extensions.SelfSet the timeout to apply to the command. Alternative to setting `timeout` in `build`, will be overriden by `timeout` in `build`, if provided.
timeoutfloatThe timeout to apply to the command.typing_extensions.Selfwithout_response(self) -> CommandBuilder[InType, None, None]Set the command to be a void command, i.e. return None as a response. Alternative to setting `is_void` in `build`, will be overriden by `is_void` in `build`, if provided.
CommandBuilder[InType, None, None]with_multiline(- self,
- timeout : float,
- validate_response : typing.Optional[typing.Union[typing.Callable[[], bool], typing.Callable[[], bool]]]
Handle multi-line responses. Examples: Use the base class's `_validate_response` method. >>> builder = CommandBuilder().with_multiline(0.01) >>> cmd = builder.build(b"message") Provide a custom validation function. >>> builder = builder.with_multiline(0.01, validate_response=lambda x: x.endswith(b" ")) >>> cmd = builder.build(b"message") Args: timeout: The time in seconds to wait for more data before calling the `_validate_response` method. validate_response: The function to replace the `Command._validate_response` method. Raises: CommandValidationError: If the timeout is lower than 0.
timeoutfloatvalidate_responsetyping.Optional[typing.Union[typing.Callable[[], bool], typing.Callable[[], bool]]] = Nonetyping_extensions.Selfbuild(- self,
- message : InType,
- *args,
- timeout : typing.Optional[float],
- is_void : typing.Optional[bool],
- **kwargs
Build the `Command` instance with the previously set serializer, deserializer and parsers.
messageThe message to send to the device.*args= ()Additional positional arguments to pass to the command.timeouttyping.Optional[float] = NoneThe timeout to apply to the command.is_voidtyping.Optional[bool] = NoneWhether the command is a void command.**kwargs= {}Additional keyword arguments to pass to the command.
Attributes
- _command= command
- _cmd_cls_name= cls_name
- _serializer= None
- _deserializer= None
- _response_validator= None
- _response_validator_timeout= None
- _timeout= None
- _is_void= False
- _parserslist[Parser] = []
Request
class`Protocol`s use `Request`s to specify data that is sent to a `Transport`.
- Decorators
- dataclasses.dataclass
Methods
__init__(self, payload : bytes, timeout : typing.Optional[float]) -> NonepayloadbytesThe payload of the request.timeouttyping.Optional[float] = NoneThe duration in seconds to wait for a response before raising a timeout exception. Set to 0.0 to wait indefinitely.__post_init__(self) -> None
Attributes
- payloadbytes = None
- timeouttyping.Optional[float] = None
Response
classProtocols use `Response`s to specify data that is received from a transport.
- Decorators
- dataclasses.dataclass
Methods
__post_init__(self) -> None__handle_done(self, _payload : asyncio.Future[bytes]) -> NoneThe callback to be run when the payload `Future` becomes done.
_payloadasyncio.Future[bytes]The `Future` object.
Attributes
- requestRequest = None
CommandExecutionError
class- Bases
- Exception
CommandValidationError
classInputValidationError
class- Bases
- ValueError
NoSuchDeviceFound
class- Bases
- Exception
TransportFactory
classInterface representing a factory for creating transports.
Protocol
classBase communication Protocol.
- Bases
- asyncio.Protocol
Methods
__init__(- self,
- transport_factory : TransportFactory,
- reconnect : bool,
- reconnect_delay : float,
- max_reconnect_attempts : int,
- autodetect : bool,
- max_parallel_commands : int,
- **kwargs
transport_factoryA callable used to create a connection to a transport.reconnectbool = TrueWhether or not to a attempt to reconnect to a device when the connection is lost.reconnect_delayfloat = DEFAULT_RECONNECT_DELAYHow long in seconds to wait between reconnection attempts.max_reconnect_attemptsHow many times to attempt to reconnect to a device before connection is considered lost.autodetectbool = FalseWhether or not to use autodetection for device connectivity.max_parallel_commandsThe maximum number of commands to process in parallel. This should be 1 for serial transports, but can be configured to allow more depending on the processing capacity of alternative transports.**kwargs= {}additional kwargs, including kwargs for use with `TransportFactory`.connect(- self,
- transport_factory : typing.Optional[TransportFactory],
- **transport_kwargs
Open a connection to a new `Transport`.
transport_factorytyping.Optional[TransportFactory] = NoneA callable used to create a connection to a transport, defaults to `self._transport_factory`, set from the constructor.**transport_kwargs= {}Kwargs to pass into the `transport_factory`.typing.Tuple[TransportFactory, dict[str, typing.Any]]A tuple containing the `TransportFactory` and kwargs for the previously connected `Transport`._connect_transport(self, **kwargs) -> NoneCreate a new transport instance.
**kwargs= {}identity(self, **config_kwargs) -> boolMethod for validating the identity of the connected device. This method will call another user-defined method on `Protocol` and compare the device's response (i.e. the method's return value) to values provided in `config_kwargs`.
**config_kwargs= {}kwargs sent from `validate`boolTrue if the result of the inner call matches the expectation from `config_kwargs` else False.validate(- self,
- timeout : float,
- **validation_kwargs
This method will be called by user after `__init__` via `open`; it calls `identity` to determine if the connected device is the one that was expected. If autodetect=True this is called internally by `unitelabs.bus.utils.AutoDetector` to cycle through possile devices until the correct device or no device is found. `validation_kwargs` and `__init__` kwargs are stored on the `Protocol` such that they must only be provided once. Should there be values which overlap, `__init__` values are overwritten by values in `validation_kwargs`. `validation_kwargs` may contain stable information about the device. Check `unitelabs.bus.utils.device_manager` for more information about valid device filter kwargs.
timeoutfloatHow long in seconds to wait for a response from the device.**validation_kwargs= {}Kwargs to use to run validation of the device, in the case of use with `autodetect=True`, these kwargs will be stored on the first call for all future validations.boolPropagated return value from `identity`; True if `identity` returns True else Falseopen(- self,
- validation_timeout : float,
- **validation_kwargs
Open underlying `Transport`, establish a connection to a device and validate the device's identity.
validation_timeoutfloat = 1.0How long in seconds to wait for a response to `Protocol.validate`.**validation_kwargs= {}kwargs to be passed to `Protocol.validate` to test device identity against.close(self) -> NoneClose underlying `Transport`. Explicitly calling `close` will NOT attempt to reconnect to the `Transport`.
connection_made(self, transport : asyncio.Transport) -> NoneInvoked by `transport` when connection is made. Logs the connection.
transportasyncio.Transportconnection_lost(self, exc : typing.Optional[Exception]) -> NoneInvoked by transport when connection is lost. Attempts to reconnect after `reconnect_delay` seconds. Here `exc` can be None as a result of : - manual abort through direct call of transport's `abort` method - connection closing after `_safe_write` successfully wrote all data in write-buffer
exctyping.Optional[Exception] = Nonepause_writing(self) -> Noneresume_writing(self) -> Nonedata_received(self, data : bytes) -> NoneInvoked by transport when data is received. Logs the data and sets the response if not already set. Further invocations with the same `Response` will only be logged.
databytesThe data received.error_received(self, exc : typing.Union[Exception, type[Exception]]) -> NoneInvoked by transport when an error is received. Logs the error and sets the response if not already set. Further invocations with the same `Response` will only be logged.
exctyping.Union[Exception, type[Exception]]The error received.Executes a `Command` by sending the `Request` within the `Command` to the `Transport`.
The deserialized response, created by `command.result()` or None if `Command.is_void` is True.
Attributes
- _transport_allows_writing= True
- _transport_factory= transport_factory
- _transport_kwargs= kwargs
- _commandslist[Command] = []
- lock= asyncio.BoundedSemaphore(value=max_parallel_commands)
- is_open= asyncio.Event()
- autodetect= autodetect
- _autodetector= None
- _validation_kwargs= None
- is_validated= asyncio.Event()
- reconnect= reconnect
- reconnect_delay= reconnect_delay
- remaining_reconnect_attempts= max_reconnect_attempts
- max_reconnect_attempts= max_reconnect_attempts
- loggerlogging.Logger = NoneA standard python logger.
- autodetectorAutoDetector = None
SerialTransport
classTransport for serial devices.
- MRO
- └── Transport
- └─── SerialTransport
Methods
_open(self) -> NoneOpens underlying serial port, if not already open.
_close(self) -> NoneCloses underlying serial port, if open.
_ensure_reader(self) -> None_poll_read(self) -> None_remove_reader(self) -> None_read(self) -> typing.Optional[bytes]typing.Optional[bytes]_ensure_writer(self) -> NoneAdds a writer to the loop if not already added.
_poll_write(self) -> None_remove_writer(self) -> NoneRemoves a writer from the loop.
_write(self, data : bytes) -> intdatabytesintopen(self) -> NoneOpens the transport and sets state to allow future read operations.
close(self) -> NoneCloses the transport and sets state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneCloses the transport immediately and updates state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly calls `_read` and aggregates the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClears out all available read data without notifying the protocol. Calls `read_all` and throws away the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]a tuple (low, high) where low and high are positive number of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the high-water limit and if neither is specified, (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on the the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
Attributes
- _serial= _serial
- _max_read_size= 1024
- _read_buffer= []
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _protocoltyping.Optional[P_co] = None
- _is_closing= True
UDPTransport
classInterface for User Datagram Protocol (UDP) communication.
- MRO
- └── Transport
- └─── UDPTransport
Methods
__init__(- self,
- server_port : int,
- server_host : str,
- client_port : int,
- client_host : str
server_portintThe port of the server.server_hoststr = DEFAULT_UDP_CLIENT_HOSTThe IP address of the server (sender).client_portint = DEFAULT_UDP_CLIENT_PORTThe port of the client.client_hoststr = DEFAULT_UDP_CLIENT_HOSTThe IP address of the client (receiver)._open(self) -> None_close(self) -> None_ensure_reader(self) -> None_poll_read(self) -> None_read(self) -> typing.Optional[bytes]typing.Optional[bytes]_remove_reader(self) -> None_ensure_writer(self) -> None_poll_write(self) -> None_remove_writer(self) -> None_write(self, data : bytes) -> intdatabytesintopen(self) -> NoneOpens the transport and sets state to allow future read operations.
close(self) -> NoneCloses the transport and sets state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneCloses the transport immediately and updates state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly calls `_read` and aggregates the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClears out all available read data without notifying the protocol. Calls `read_all` and throws away the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]a tuple (low, high) where low and high are positive number of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the high-water limit and if neither is specified, (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on the the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
Attributes
- client= (client_host, client_port)
- server= (server_host, server_port)
- _socket= socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- _max_read_sizeint = DEFAULT_MAX_READ_SIZE
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _protocoltyping.Optional[P_co] = None
- _is_closing= True
UsbTransport
classTransport for devices connected via USB. By default, this implementation uses Interface 0 of Configuration 1 on the device.
- MRO
- └── Transport
- └─── UsbTransport
Methods
__init__(- self,
- vendor : int,
- product : int,
- protocol : asyncio.BaseProtocol,
- interface_index : int
vendorintThe vendor ID of the USB device.productintThe product ID of the USB device.protocolasyncio.BaseProtocolThe protocol instance to use for communication.interface_indexint = DEFAULT_USB_INTERFACEThe index of the USB Interface to use. Defaults to 0._open(self) -> None_close(self) -> None_ensure_reader(self) -> None_remove_reader(self) -> None__read(self) -> None_read(self) -> NoneRead data from the transport.
_ensure_writer(self) -> None_remove_writer(self) -> None_write(self, data) -> Nonedataopen(self) -> NoneOpens the transport and sets state to allow future read operations.
close(self) -> NoneCloses the transport and sets state to disallow further read operations.
is_reading(self) -> boolWhether or not reading operations are currently being performed.
boolTrue if the transport is receiving, otherwise False.pause_reading(self) -> NonePause the receiving end. No data will be passed to the protocol's `data_received()` method until `resume_reading()` is called.
resume_reading(self) -> NoneResume the receiving end. Data received will once again be passed to the protocol's `data_received()` method.
_abort(self, exception : typing.Optional[Exception]) -> NoneCloses the transport immediately and updates state to disable further read operations.
exceptiontyping.Optional[Exception] = NoneThe Exception to propagate to the protocol when aborting, if connected._safe_read(self) -> NoneSafely and asynchronously read data from the transport.
read_all(self) -> bytesRead all available data from the transport. Repeatedly calls `_read` and aggregates the results until no further data is available.
bytesAll available data from the transport.clear_read_buffer(self) -> NoneClears out all available read data without notifying the protocol. Calls `read_all` and throws away the result.
get_write_buffer_size(self) -> intCalculate the current size of the write buffer.
intThe number of bytes in the write buffer.get_write_buffer_limits(self) -> tuple[int, int]Get the high and low watermarks for write flow control.
tuple[int, int]a tuple (low, high) where low and high are positive number of bytes.set_write_buffer_limits(self, high : typing.Optional[int], low : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. These two values control when to call the protocol's `pause_writing()` and `resume_writing()` methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes `pause_writing()` to be called whenever the buffer becomes non-empty. Setting low to zero causes `resume_writing()` to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.
hightyping.Optional[int] = NoneThe maximum allowed number of bytes in the write buffer.lowtyping.Optional[int] = NoneThe minimum allowed number of bytes in the write buffer.write(self, data : typing.Union[bytes, bytearray, memoryview]) -> NoneWrite some data bytes to the transport. This does not block; it buffers the data and arranges for it to be sent out asynchronously.
datatyping.Union[bytes, bytearray, memoryview]The bytes to write to the Transport.can_write_eof(self) -> boolWhether or not this transport has implemented `write_eof()` method.
boolTrue if this transport supports `write_eof()`, False if not.write_eof(self) -> NoneClose the write with end-of-file after flushing buffered data. (This is like typing ^D into a UNIX program reading from stdin.) Data may still be received.
writelines(self, list_of_data : typing.Iterable[typing.Union[bytes, bytearray, memoryview]]) -> NoneWrite a list (or any iterable) of data bytes to the transport. The default implementation concatenates the arguments and calls `write()` on the result.
list_of_datatyping.Iterable[typing.Union[bytes, bytearray, memoryview]]The list of bytes to concatenate and write to the Transport.flush(self) -> NoneFlush the write buffer and disable further writing.
_safe_write(self) -> NoneAsynchronously write buffered data. This method is called back asynchronously as a writer registered with the asyncio event-loop against the underlying file descriptor for the serial port. If this method is invoked while the transport is closing, and the write-buffer is then emptied by this method, the protocol's `connection_lost()` method will be called with None as its argument.
_maybe_pause_writing(self) -> NoneTo be called whenever the write-buffer size increases. Tests the current write-buffer size against the high water mark configured for this transport. If the high water mark is exceeded, the `Protocol` is instructed to `pause_writing()`.
_maybe_resume_protocol(self) -> NoneTo be called whenever the write-buffer size decreases. Tests the current write-buffer size against the low water mark configured for this transport. If writing is currently paused and the write-buffer size is below the low water mark, the `Protocol` is instructed to `resume_writing()`.
_set_write_buffer_limits(self, low : typing.Optional[int], high : typing.Optional[int]) -> NoneSet the high- and low-water limits for write flow control. By default, the high-water limit is 4 times the high-water limit and if neither is specified, (16384, 65536).
lowtyping.Optional[int] = NoneThe low-water limit for write flow control.hightyping.Optional[int] = NoneThe high-water limit for write flow control.is_closing(self) -> boolWhether the transport is closing or closed.
boolTrue if the transport is closing or closed, False otherwise.abort(self) -> NoneClose the transport immediately.
_exception(- self,
- exception : Exception,
- message : str
Report a fatal error to the event-loop and abort the transport.
exceptionExceptionThe Exception to pass on the the loop's exception handler.messagestrHuman-readable text describing the exception's execution state, cause, etc.
Attributes
- vendor= vendor
- product= product
- _closing= True
- _protocol= protocol
- _interface_index= interface_index
- _devicetyping.Optional[usb.core.Device] = None
- read_endpointtyping.Optional[usb.core.Endpoint] = None
- write_endpointtyping.Optional[usb.core.Endpoint] = None
- _reader_event= threading.Event()
- _has_reader= False
- _has_writer= False
- _is_writing_paused= False
- _write_bufferlist[typing.Union[bytes, bytearray, memoryview]] = []
- writes_pendingbool = NoneWhether or not there is data in the write buffer waiting to be written.
- _loop= asyncio.get_event_loop_policy().get_event_loop()
- _is_closing= True
SerialDeviceManager
classDetect, filter, and get info for connected serial devices.
Methods
@classmethod
filter_kwargs(cls, kwargs : dict[str, str]) -> dict[str, str]Filter kwargs to those which are returned from `serial.tools.list_ports.comports`, i.e. the attributes of `DeviceInfo`. Supports the use of `port` as alternative name for `device`.
kwargsdict[str, str]A dictionary of key-value pairs to filter.dict[str, str]The filtered dictionary.@classmethod
get_all(cls) -> list[DeviceInfo]Get all connected devices.
list[DeviceInfo]A list of all devices detected.@classmethod
check_device_match(- cls,
- device_info : DeviceInfo,
- **kwargs
Determine if the `DeviceInfo` instance's attributes match the filter `**kwargs`
device_infoThe device to check for a match against.**kwargs= {}The key-value pairs, which will be filtered, and then used to evaluate the device for a match.boolTrue if the `DeviceInfo` matches (or no kwargs provided, or all kwargs have been filtered), else False.@classmethod
filter(cls, **kwargs) -> list[DeviceInfo]Search through all detectable devices. `**kwargs` are first filtered by `filter_kwargs` based on attrs of `DeviceInfo`. This allows the number of devices detected to be pared down based on known, stable information about the device being searched for.
**kwargs= {}Search criteria for finding a device.list[DeviceInfo]A list of devices which match all `**kwargs` provided.
Attributes
- SERIAL_SEARCH_KEYS= ['device', 'name', 'description', 'hwid', 'vid', 'pid', 'serial_number', 'location', 'manufacturer', 'product', 'interface']