Hardware Communication
The Omnibus package is a Python library that simplifies hardware and software communication by providing an abstraction layer over various communication protocols (RS-232/485, USB, TCP/IP, and more). With the Omnibus, developers can seamlessly interact with devices without needing to delve into the complexities of specific communication methods.
This tutorial will walk you through the process of setting up the Omnibus library, establishing a connection with your hardware device, and performing basic read/write operations. By the end of this tutorial, you'll be ready to integrate the Omnibus into your projects and start communicating with your hardware devices efficiently.
Prerequisites
- A hardware device capable of serial communication.
- The communication specification of that device.
- Python 3 installed on your system.
Step 1: Installing the Omnibus Library
First, install the Omnibus library. You can use any Python package manager of your choice.
Open your terminal or command prompt and run the following commands. If you are starting from scratch, create a new project first.
poetry new my-app --src
cd my-app
Then set up the package source and install the dependency in your current project.
poetry source add --priority=supplemental unitelabs \
https://gitlab.com/api/v4/groups/1009252/-/packages/pypi/simple
poetry add --source unitelabs "unitelabs-bus[serial]"
poetry install
This will download and install the Omnibus library, making it available for use in your Python scripts.
Other extras are available for the Omnibus library, e.g. unitelabs-bus[usb]. Installing only the extras you need keeps unitelabs-bus easier to install.
Step 2: Connecting Your Hardware
Connect your hardware device to your computer using the appropriate serial connection (e.g., USB-to-serial adapter). Note the COM port (on Windows) or the device path (on Linux/Mac) that the hardware is connected to. You’ll need this information to establish a connection in your Python code.
To read out all available ports, you can use our device manager with:
poetry run python -m unitelabs.bus.utils.device_manager
Step 3: Writing Your First Python Script
Now, let's write a Python script to open a serial connection and communicate with your hardware device.
- Import the Omnibus Library
Start by importing the necessary classes and methods from the Omnibus library:
from unitelabs.bus import Protocol, create_serial_connection, ByteCommand
- Initialize the Connection
Next, configure the serial connection. You need to specify the COM port (or device path), baud rate, and other parameters like timeout.
protocol = Protocol(
    create_serial_connection,
    port='/dev/ttyUSB0',  # Replace with your port name, e.g. '/dev/cu.usbmodem3210' on macOS, 'COM3' on Windows
    baudrate=9600,  # Set the baud rate to match your device
    timeout=1,  # Set a timeout for read operations
)
await protocol.open()
In order for the protocol to establish a connection to the device, you need to call the open() method.
- Send Data to the Device
To transmit data to your hardware device, you can use the execute() method.
protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
This line sends the binary data b"Hello, Device!\r\n" to your serial device.
- Receive Data from the Device
The execute() method is also responsible for returning the corresponding response sent from the device. To read data sent from the device, await the return of the execute() method. This reads data from the serial port until a newline character is encountered:
response = await protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
print('Received:', response)
The execute() method now waits until it receives a response or until it times out. If a line of data is read from the device, the script prints it to the console.
- Close the Connection
After communication is complete, it’s good practice to close the connection to free the resources:
protocol.close()
Step 4: Running the Script
To run asynchronous code, wrap it in an async function and call that function via asyncio.run():
import asyncio

from unitelabs.bus import Protocol, create_serial_connection, ByteCommand

async def main():
    protocol = Protocol(
        create_serial_connection,
        port='/dev/ttyUSB0',  # Replace with your port name, e.g. '/dev/cu.usbmodem3210' on macOS, 'COM3' on Windows
        baudrate=9600,  # Set the baud rate to match your device
        timeout=1,  # Set a timeout for read operations
    )
    await protocol.open()
    response = await protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
    print('Received:', response)
    protocol.close()

if __name__ == "__main__":
    asyncio.run(main())
Save your script (e.g., as src/my_app/__main__.py) and run it from your terminal:
poetry run python -m my_app
If everything is set up correctly, the script will open the serial connection, send a message to your hardware device, receive a response, and print it to the console.
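One caveat about the script above: if execute() raises an error, for instance on a timeout, the close() call is never reached. A minimal sketch of guarding against that with try/finally follows. StubProtocol is a hypothetical stand-in that only mimics the open(), execute(), and close() calls used in this tutorial so the sketch runs without hardware; it is not part of the Omnibus library, and its simulated timeout is an assumption for illustration.

```python
import asyncio


class StubProtocol:
    """Hypothetical stand-in mimicking the open/execute/close calls used above."""

    def __init__(self) -> None:
        self.is_open = False

    async def open(self) -> None:
        self.is_open = True

    async def execute(self, message: bytes) -> bytes:
        if not self.is_open:
            raise ConnectionError("connection is not open")
        if not message.endswith(b"\r\n"):
            # Simulate a device that never answers an unterminated message.
            raise TimeoutError("no response from device")
        return b"OK\r\n"

    def close(self) -> None:
        self.is_open = False


async def send_once(protocol: StubProtocol, message: bytes) -> bytes:
    """Open the connection, send one message, and always close again."""
    await protocol.open()
    try:
        return await protocol.execute(message)
    finally:
        # Runs on success and on error, so the port is always released.
        protocol.close()


if __name__ == "__main__":
    print("Received:", asyncio.run(send_once(StubProtocol(), b"Hello, Device!\r\n")))
```

The same try/finally shape should carry over to the real Protocol object, with the stub replaced by the instance created in main().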
Step 5: Troubleshooting Common Issues
Here are a few tips if you run into issues:
- Port not found: Double-check the port name and ensure your device is properly connected.
- Incorrect baud rate: Ensure that the baud rate in your script matches the baud rate configured on your hardware.
- Timeouts: If you’re not receiving data, increase the timeout parameter or check the connection.
To receive a more verbose output, you can increase the log level. Add the following at the top of your script:
import logging
logging.basicConfig(level=logging.DEBUG)
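For the timeout tip above, retrying a few times before giving up can help separate a flaky link from a wrong configuration. The sketch below uses only the standard library. make_flaky_device is a hypothetical stand-in for a device call, and with_retries is an illustrative helper rather than an Omnibus function; how the Omnibus itself reports a timeout is not assumed here.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("retry_sketch")


async def with_retries(
    operation: Callable[[], Awaitable[bytes]],
    attempts: int = 3,
    timeout: float = 1.0,
) -> bytes:
    """Run `operation`, retrying when it does not finish within `timeout` seconds."""
    for attempt in range(1, attempts + 1):
        try:
            return await asyncio.wait_for(operation(), timeout=timeout)
        except (asyncio.TimeoutError, TimeoutError):
            logger.debug("Attempt %d of %d timed out", attempt, attempts)
    raise TimeoutError(f"no response after {attempts} attempts")


def make_flaky_device(failures: int) -> Callable[[], Awaitable[bytes]]:
    """Hypothetical device call that hangs `failures` times, then answers."""
    remaining = failures

    async def call() -> bytes:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            await asyncio.sleep(10)  # far longer than the timeouts used here
        return b"OK\r\n"

    return call


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    print(asyncio.run(with_retries(make_flaky_device(2), timeout=0.05)))
```

With DEBUG logging enabled, each timed-out attempt shows up in the log, which makes it easier to tell an occasional dropout from a device that never answers.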
Step 6: Expanding Your Communication
Now that you've mastered the basics of using the Omnibus library to establish a connection and perform simple read/write operations, it's time to explore more advanced use cases. The Omnibus library is designed with flexibility in mind, allowing you to extend its functionality to suit specific communication requirements or device protocols.
1. Creating Custom Protocols
In many cases, you want to create a custom protocol to encapsulate specific communication patterns or device commands. A protocol in Omnibus bundles all the functions your hardware device offers, presenting them in a human-readable and easy-to-use way.
Example: Creating a Custom Protocol
Let’s start by creating a custom protocol class that inherits from Protocol. This class can encapsulate common commands and responses your device supports:
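# Assumption: SerialCommand is exported from unitelabs.bus alongside ByteCommand.
from unitelabs.bus import Protocol, SerialCommand, create_serial_connection

class MyCustomProtocol(Protocol):
    def __init__(self, port: str) -> None:
        super().__init__(create_serial_connection, port=port, baudrate=9600, timeout=1)

    async def hello(self) -> str:
        return await self.execute(SerialCommand("HELLO"))

    async def ping(self) -> str:
        return await self.execute(SerialCommand("PING"))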
In this example, MyCustomProtocol defines two methods, hello and ping, that communicate with the device using specific commands. This setup makes it easy to call these methods without worrying about the underlying serial communication details.
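The wrapping pattern can also be tried without hardware. The sketch below swaps Protocol for StubProtocol, a hypothetical stand-in whose execute() answers from a table of canned replies. The reply strings and the "TEMP?" command are invented for illustration and do not describe any real device or the Omnibus API.

```python
import asyncio


class StubProtocol:
    """Hypothetical stand-in for Protocol: answers from a table of canned replies."""

    REPLIES = {"HELLO": "HELLO BACK", "PING": "PONG", "TEMP?": "21.5"}

    async def execute(self, command: str) -> str:
        return self.REPLIES[command]


class MyStubDevice(StubProtocol):
    """Wraps raw command strings in named, typed methods."""

    async def hello(self) -> str:
        return await self.execute("HELLO")

    async def ping(self) -> bool:
        # Turn the raw reply into a yes/no answer for the caller.
        return (await self.execute("PING")) == "PONG"

    async def temperature(self) -> float:
        # "TEMP?" is an invented command used only for this illustration.
        return float(await self.execute("TEMP?"))


async def main() -> None:
    device = MyStubDevice()
    print(await device.hello(), await device.ping(), await device.temperature())


if __name__ == "__main__":
    asyncio.run(main())
```

Callers work with hello(), ping(), and temperature() and never see the command strings, which is the main benefit of putting device knowledge into one protocol class.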
2. Customizing Commands with CommandBuilder
The Omnibus comes with some basic Command subclasses predefined: ByteCommand and SerialCommand. We can readily modify these classes to adjust their behavior to our needs using the CommandBuilder.
Before we can get into customizing a Command, we first need to understand the basic parts of a command and how it interacts with the Protocol.
- serializer - the function to convert the input type into bytes when generating a Request, which is sent through Protocol.execute to the device.
- deserializer - the function to convert the Response bytes received from the device within the Protocol.execute call to the output type.
- parser - one or more optional functions to serially apply to the deserialized data for further data processing before the result is returned from Protocol.execute.
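The order in which these three parts are applied can be sketched in plain Python. The run_command helper and the fake_device function below are hypothetical and only model the data flow described in the list; they are not the Omnibus implementation, and the "TEMP=21.5" reply is invented.

```python
from typing import Callable, Sequence


def run_command(
    message: str,
    serializer: Callable[[str], bytes],
    device: Callable[[bytes], bytes],
    deserializer: Callable[[bytes], str],
    parsers: Sequence[Callable] = (),
):
    """Mimic the order of operations described above for a single command."""
    request = serializer(message)    # input type -> bytes sent to the device
    response = device(request)       # raw bytes coming back from the device
    result = deserializer(response)  # bytes -> output type
    for parser in parsers:           # each parser receives the previous result
        result = parser(result)
    return result


def fake_device(request: bytes) -> bytes:
    """Hypothetical device that answers any request with a fixed reading."""
    return b"TEMP=21.5\r\n"


reading = run_command(
    "READ",
    serializer=lambda text: (text + "\r\n").encode("ascii"),
    device=fake_device,
    deserializer=lambda raw: raw.decode("ascii").strip(),
    parsers=[lambda text: text.split("=")[1], float],
)
print(reading)
```

Two parsers are chained here: the first extracts the text after the equals sign and the second converts it to a float, so the caller receives a number rather than a raw string.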
The builder pattern allows us to build on a basic Command subclass to customize its functionality by making changes to the aforementioned parts as well as the initialization arguments of the Command.
from unitelabs.bus import CommandBuilder
builder = CommandBuilder()
cmd = builder.build(b"HELLO")
cmd2 = builder.with_serializer(lambda x: x + b"\r\n").build(b"HELLO")
By default, the CommandBuilder uses ByteCommand as its base. Calling build directly after initializing CommandBuilder is functionally equivalent to creating an instance of ByteCommand.
The result here is that the serialized payload of cmd is b"HELLO", and the serialized payload of cmd2 is b"HELLO\r\n".
CommandBuilder Methods
CommandBuilder offers methods which may be chained together or called individually to create intermediate builders. We group them here based on their usage.
- with_serializer(serializer_function, **function_kwargs) for changing the serialization function
- with_deserializer(deserializer_function, **function_kwargs) for changing the deserialization function
- with_parser(parser_function, **function_kwargs) for adding parser functions
This first set of methods accepts functions as arguments and will preload additional arguments into the provided function:
def complex_serializer(message: str, sub_message: str, is_complex: bool) -> bytes:
    return message.encode("ascii") if not is_complex else (message + sub_message).encode("ascii")

cmd = CommandBuilder().with_serializer(complex_serializer, sub_message=" world", is_complex=True).build("hello")
Code explanation
If we inspect the contents of the request at cmd.request.payload we will see that our serialized payload is b"hello world"!
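Preloading keyword arguments behaves much like functools.partial from the standard library. The sketch below reproduces the effect in plain Python with the same complex_serializer function. Whether CommandBuilder uses partial internally is not stated in this tutorial and is not assumed here.

```python
from functools import partial


def complex_serializer(message: str, sub_message: str, is_complex: bool) -> bytes:
    return message.encode("ascii") if not is_complex else (message + sub_message).encode("ascii")


# Fix the extra keyword arguments up front; only `message` remains open.
preloaded = partial(complex_serializer, sub_message=" world", is_complex=True)

print(preloaded("hello"))
```

After preloading, the function takes a single argument, which is the shape a serializer needs when it is called with just the command's message.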
These methods will alter the function signature to additionally bind the function to the Command instance:
def use_protocol_logger(self, data: bytes) -> None:
    if b"goodbye" in data:
        self.receiver.logger.info("Device says goodbye.")

cmd = CommandBuilder().with_parser(use_protocol_logger).build(b"hello")
Code explanation
Commands have access to the Protocol instance they are executed in through the receiver attribute, therefore we can use the instance-binding of the CommandBuilder to bind functions which interact with the Protocol.
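Instance binding of this kind can be illustrated with types.MethodType from the standard library. StubProtocol and StubCommand below are hypothetical stand-ins that carry only the logger and receiver attributes mentioned above. The sketch shows the general Python mechanism and makes no claim about how CommandBuilder performs the binding.

```python
import logging
from types import MethodType

logging.basicConfig(level=logging.INFO)


class StubProtocol:
    """Hypothetical stand-in owning a logger, like the receiver described above."""

    def __init__(self) -> None:
        self.logger = logging.getLogger("stub_protocol")


class StubCommand:
    """Hypothetical stand-in for a Command with a `receiver` attribute."""

    def __init__(self, receiver: StubProtocol) -> None:
        self.receiver = receiver


def use_protocol_logger(self, data: bytes) -> None:
    if b"goodbye" in data:
        self.receiver.logger.info("Device says goodbye.")


command = StubCommand(StubProtocol())
# Bind the plain function to this instance so `self` is filled in automatically.
command.parser = MethodType(use_protocol_logger, command)
command.parser(b"goodbye")  # only `data` is passed; `self` is the command
```

Once bound, the function is called with the data alone and can still reach the protocol's logger through self.receiver.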
CommandBuilder also allows us to set global init args for our Commands:
- with_timeout(timeout) for setting the timeout (i.e. how long the command can attempt to run through Protocol.execute before throwing an error)
- without_response() for marking a Command which does not expect a response from the device
These methods set the timeout and is_void args during Command initialization. The values can also be supplied when calling build(message, timeout, is_void). If both a with_ method and its associated build arg are used, the build arg takes precedence.
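The precedence rule can be pictured with a toy builder. TimeoutBuilderSketch below is a hypothetical class written for this illustration; it models only how a timeout might be resolved and returns a plain dict rather than a real Command.

```python
from typing import Optional


class TimeoutBuilderSketch:
    """Hypothetical mini-builder; it only models how the timeout is resolved."""

    def __init__(self, timeout: Optional[float] = None) -> None:
        self._timeout = timeout

    def with_timeout(self, timeout: float) -> "TimeoutBuilderSketch":
        # Return a new builder so intermediate builders stay reusable.
        return TimeoutBuilderSketch(timeout)

    def build(self, message: bytes, timeout: Optional[float] = None) -> dict:
        # A value passed to build() takes precedence over with_timeout().
        resolved = timeout if timeout is not None else self._timeout
        return {"message": message, "timeout": resolved}


slow = TimeoutBuilderSketch().with_timeout(5.0)
print(slow.build(b"HELLO"))               # uses the builder-level timeout
print(slow.build(b"HELLO", timeout=1.0))  # the build argument wins
```

A builder-level default with a per-command override means one intermediate builder can serve many commands while a single slow command still gets its own timeout.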
Finally, we have the use-case dependent method with_multiline, which is designed specifically for handling edge cases and bad behavior in serial communication. For more information, see the Serial Troubleshooting Guide.
Conclusion
This tutorial introduced you to the basics of using the Omnibus library for hardware communication in Python. With these foundational steps, you can start building sophisticated applications that interact with various serial-enabled devices. Whether you're working on IoT projects, embedded systems, or data acquisition, Omnibus provides a flexible and robust framework to meet your needs. Happy coding!