Walkthrough
Once you have installed the framework and created your first project with the connector-factory
, you're ready to go.
Connector-factory Output
For this example, we will use the serial
communication type.
First we can check out the starter protocol generated by the connector-factory
, located in the io
directory.
└── awesome-instrument
└── src/unitelabs/awesome_instrument
└── io
└── awesome_instrument_protocol.py
Note
The file and directory names will be based on the inputs provided to connector-factory
and will not look exactly as above, but will still be in the same location.
The file contents should look as follows:
from unitelabs.bus import Protocol, create_serial_connection
class AwesomeInstrumentProtocol(Protocol):
def __init__(self, **kwargs):
kwargs["port"] = "/dev/ttyUSB0" # FIXME: set device port
super().__init__(create_serial_connection, **kwargs)
Code explanation
- Here we see that a transport factory function
create_serial_connection
has been passed into theProtocol
constructor. - A transport factory creates a
Transport
that establishes a connection to the device whenProtocol.open()
is called and requires arguments to be passed in via**kwargs
in theProtocol
constructor. For serial communication thecreate_serial_connection
transport factory requiresport
to be specified.
Protocol Configuration
As mentioned above, each transport factory requires that certain arguments be provided in the Protocol.__init__
. At this point in time if we run
poetry run connector start -v
A SerialException
is raised informing us that connection to the specified serial device could not be established.
For serial and USB communication we can use the DeviceManager
to see a listing of all the available devices by calling
DeviceManager
.
Not supported on Windows machines.poetry run python -m unitelabs.bus.utils.device_manager
This returns information about connected devices. Here the connected device allows both serial and USB communication.
[
{
"device": "/dev/cu.usbmodem142401",
"name": "cu.usbmodem142401",
"description": "Board in FS mode",
"hwid": "USB VID:PID=CAFE:0001 SER=e66180109f6f8025 LOCATION=20-2.4",
"vid": 51966,
"pid": 1,
"serial_number": "e66180109f6f8025",
"location": "20-2.4",
"manufacturer": "MicroPython",
"product": "Board in FS mode"
}
]
The device
here is the serial port value we need in order to configure our transport factory.
from unitelabs.bus import Protocol, create_serial_connection
class AwesomeInstrumentProtocol(Protocol):
def __init__(self, **kwargs):
kwargs["port"] = "/dev/cu.usbmodem142401"
super().__init__(create_serial_connection, **kwargs)
With the port value set we should be able to successfully run the connector start
command and start our SiLA server. It should be noted, however, that serial ports are likely to change (unless you have set up some udev rules on your computer), meaning that this will not be a stable connection.
We can now test our basic set up by starting the connector:
poetry run connector start -v
With our Protocol configured, it's time to start communicating with the device. You can read more about the Basics of Hardware Communication Here.
Protocol Method Definition
To get started we'll create our first Protocol method using a basic ByteCommand
.
Assuming a device that sends either a response of b"ok"
or b"not ok"
to the request b"status"
:
First, we'll create an exception, which is displayed in the SiLA client with a description based on the class's docstring.
class NotOkException(Exception):
"""The device is not ok."""
Read More about error handling here!
import typing
from unitelabs.bus import ByteCommand, Protocol, create_serial_connection
from .errors import NotOkException
class AwesomeInstrumentProtocol(Protocol):
def __init__(self, **kwargs):
kwargs["port"] = "/dev/cu.usbmodem142401"
super().__init__(create_serial_connection, **kwargs)
async def get_status(self, timeout: typing.Optional[float] = None) -> bytes:
"""
Get the device's status.
Args:
timeout: Amount of time in seconds to wait for a response, or indefinitely if None.
Raises:
NotOkException: If the device responds with b"not ok".
Returns:
b"ok" if everything is ok
"""
command = ByteCommand(b"status", timeout)
response = await self.execute(command)
self.logger.info(f"status: {response}")
if response == b"not ok":
raise NotOkException("Everything is not ok.")
return response
With this method added to our Protocol we can now call get_status()
and respond to the data received from the device.
Note
If the device does not recognize a request bytestring, no response will be sent. By default if no timeout
for a Command
is set, the Protocol
will wait indefinitely for a response.
Creating a SiLA Feature
This next step is to create a SiLA Feature with a method for accessing our status information.
import typing
from unitelabs.awesome_instrument.io.errors import NotOkException
from unitelabs.cdk import sila
if typing.TYPE_CHECKING:
from ...io.awesome_instrument_protocol import AwesomeInstrumentProtocol
class DeviceController(sila.Feature):
"""
Controls the device and provides functions to monitor its state.
"""
def __init__(self, protocol: "AwesomeInstrumentProtocol"):
super().__init__(
originator="org.silastandard",
category="examples",
version="0.1",
maturity_level="Draft",
)
self.protocol = protocol
The class docstring of DeviceController
will become the feature description. Originator, category, version, and maturity level are all required arguments that must be passed to the super().__init__(...)
call. This basic information is defined by the SiLA 2 standard and provides SiLA clients the basic feature information.
Next we will add an UnobservableProperty
to the Feature which calls the underlying AwesomeInstrumentProtocol.get_status
method from our Protocol.
@sila.UnobservableProperty(errors=[NotOkException])
async def get_status(self) -> bytes:
"""
Description of the property, viewable from the SiLA client.
"""
return await self.protocol.get_status(timeout=5)
Code explanation
Generally speaking, most of the business logic of a connector will be contained within the Protocol. When writing a SiLA Feature, the primary concern is improving the input parameterization and output structures for users, e.g. with sila.CustomDataType
s to humanize data inputs and outputs from the device.
Integrate SiLA Feature
With a new feature defined, we'll need to register this in on our SiLA server in the project __init__.py
from importlib.metadata import version
from unitelabs.cdk import Connector
from .features.device_controller.device_controller import DeviceController
from .io.awesome_instrument_protocol import AwesomeInstrumentProtocol
__version__ = version("unitelabs-awesome-instrument")
async def create_app():
"""Creates the connector application"""
app = Connector(
{
"sila_server": {
"name": "AwesomeInstrument",
"type": "Example",
"description": "A connector for the Awesome Instrument built with the UniteLabs CDK.",
"version": str(__version__),
"vendor_url": "https://unitelabs.io/",
}
}
)
protocol = AwesomeInstrumentProtocol()
await protocol.open()
app.register(DeviceController(protocol))
yield app
protocol.close()
Calling app.register
with an instance of our new SiLA Feature, we should now be able to see our feature and interact with it on the SiLA Browser.
poetry run connector start -v
Simulation Mode
Best practices include the development of a Simulation Mode that mocks device communication and allows the testing of the UI without access to a physical device.
SimulationControllerBase
.