Serial Autodetect
One common issue with serial-based Connectors is port instability. When we initialize such a Connector, we often provide it with the serial port that the device is connected to. In the long term, however, the port is not a stable identifier: each time the device is disconnected and reconnected, it may be assigned a different port (unless we have some fancy udev rules set up).
How do we ensure that our connector is not only connected to the correct device but that it holds that association? We can enable autodetect!
Prerequisites:
- A working Protocol configured to communicate with a device.
- A data endpoint which provides a unique identifier for the device.
How it works
Autodetect relies on the fact that most devices have a unique identifier which we can programmatically request from the device. Let's take the following as an example:
import asyncio

from unitelabs.bus import Protocol, create_serial_connection, SerialCommand


class MyProtocol(Protocol):
    def __init__(self):
        port = "/dev/"
        super().__init__(create_serial_connection, port=port)

    async def get_serial_number(self) -> str:
        res = await self.execute(SerialCommand("sn"))
        return res
Autodetect iterates through all connected devices and attempts to connect to each one, finalizing the connection only when the identity method returns True.
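To make the detection loop concrete, here is a minimal, self-contained simulation of the idea. It does not use the unitelabs API and is not the library's actual implementation: `FakeConnection`, `FAKE_DEVICES`, `autodetect`, and the port names are all hypothetical stand-ins chosen for illustration.

```python
import asyncio
from typing import Optional

# Hypothetical stand-in for real hardware: maps a port name to the serial
# number that the device on that port would report.
FAKE_DEVICES = {
    "/dev/ttyUSB0": "SN-0001",
    "/dev/ttyUSB1": "SN-0042",
    "/dev/ttyUSB2": "SN-0099",
}


class FakeConnection:
    """Simulated connection that can answer a serial-number query."""

    def __init__(self, port: str) -> None:
        self.port = port
        self.is_open = False

    async def open(self) -> None:
        if self.port not in FAKE_DEVICES:
            raise ConnectionError(f"nothing attached to {self.port}")
        self.is_open = True

    async def query_serial_number(self) -> str:
        return FAKE_DEVICES[self.port]

    async def close(self) -> None:
        self.is_open = False


async def autodetect(
    candidate_ports: list[str], serial_number: str
) -> Optional[FakeConnection]:
    """Return an open connection to the first port whose device matches."""
    for port in candidate_ports:
        connection = FakeConnection(port)
        try:
            await connection.open()
        except ConnectionError:
            continue  # port could not be opened; try the next one
        if await connection.query_serial_number() == serial_number:
            return connection  # identity check passed: keep this connection
        await connection.close()  # wrong device: release the port
    return None


found = asyncio.run(autodetect(["/dev/ttyUSB9", *FAKE_DEVICES], "SN-0042"))
print(found.port if found else "no match")
```

The key design point is that a port is only kept once the device behind it has confirmed its identity; every other port is closed again so that it stays available to other processes.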
Configuring Autodetect
1. Override the identity method
Choose a function which extracts the identifier from the device and integrate it into identity:
class MyProtocol(Protocol):
    ...

    async def identity(self, **config_kwargs) -> bool:
        device_serial_number = await self.get_serial_number()
        return device_serial_number == config_kwargs["serial_number"]
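A strict `==` comparison can fail if the device pads its response with whitespace or line endings, or if the configured value differs in case. Whether your device does this is an assumption to verify against its manual. As a sketch, a hypothetical helper `matches_identifier` (not part of the library) could normalize both sides and treat a missing `serial_number` as "no match" instead of raising a `KeyError`:

```python
from typing import Any


def matches_identifier(reported: str, **config_kwargs: Any) -> bool:
    """Compare a device-reported identifier with the configured one.

    Hypothetical helper: strips surrounding whitespace and ignores case,
    which may or may not be appropriate for a given device.
    """
    expected = config_kwargs.get("serial_number")
    if expected is None:
        return False  # nothing configured, so no device can match
    return reported.strip().casefold() == str(expected).strip().casefold()


print(matches_identifier("sn-0042\r\n", serial_number="SN-0042"))
```

Inside `identity`, the result of `get_serial_number()` could be passed to such a helper together with `**config_kwargs`. If serial numbers on your devices are case-sensitive, drop the `casefold()` calls.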
2. Update the Protocol init
The Protocol init also needs to be adjusted to set autodetect=True.
class MyProtocol(Protocol):
    def __init__(self):
        super().__init__(create_serial_connection, autodetect=True)
We no longer need to provide port directly.
3. Update the Connector
Finally we need to modify how we call open in our Connector to pass in the serial number from our .env config. In the top-level init:
from unitelabs.cdk import Connector, Config

from .io.protocol import MyProtocol

...


class MyConfig(Config):
    # extend Config to expect serial_number; will raise error if no value provided in .env
    serial_number: str


async def create_app():
    config = MyConfig()
    app = Connector(
        {
            "sila_server": {
                "name": "Autodetected Device",
                "type": "Example",
                "description": "A device with autodetect enabled.",
                "version": "0.1.0",
                "vendor_url": "https://unitelabs.io/",
            }
        }
    )

    protocol = MyProtocol()
    await protocol.open(serial_number=config.serial_number)

    ...

    yield app

    protocol.close()