Getting Started with the UniteLabs Omnibus Library for Hardware Communication
Experimental API: This API is still under development and therefore subject to change without notice.

The Omnibus package is a Python library that simplifies hardware and software communication by providing an abstraction layer over various communication protocols (RS-232/485, USB, TCP/IP, and more). With the Omnibus, developers can seamlessly interact with devices without needing to delve into the complexities of specific communication methods.

This tutorial will walk you through the process of setting up the Omnibus library, establishing a connection with your hardware device, and performing basic read/write operations. By the end of this tutorial, you'll be ready to integrate the Omnibus into your projects and start communicating with your hardware devices efficiently.

Prerequisites

  • A hardware device capable of serial communication.
  • The communication specification of that device.
  • Python 3 installed on your system.

Step 1: Installing the Omnibus Library

First, you need to install the Omnibus library. You can use any Python package manager of your choice.

With Poetry: open your terminal or command prompt and run the following commands. If you are starting from scratch, create a new project first.

Terminal
poetry new my-app --src
cd my-app

Then set up the package source and install the dependency in your current project.

Terminal
poetry source add --priority=supplemental unitelabs \
  https://gitlab.com/api/v4/groups/1009252/-/packages/pypi/simple
poetry add --source unitelabs "unitelabs-bus[serial]"
poetry install

With pip and a virtual environment: open your terminal or command prompt and run the following commands. If you are starting from scratch, create a new project directory and virtual environment first.

Terminal
mkdir my-app
cd my-app
python -m venv .venv
.venv/bin/pip install -U pip

Then install the dependency into the virtual environment.

Terminal
.venv/bin/pip install "unitelabs-bus[serial]" \
  --index-url https://gitlab.com/api/v4/groups/1009252/-/packages/pypi/simple

This will download and install the Omnibus library, making it available for use in your Python scripts.

Note: If your device supports protocols other than serial, install the respective extra with the Omnibus library, e.g. unitelabs-bus[usb].
Coming Soon: Official PyPI distribution to make unitelabs-bus easier to install.

Step 2: Connecting Your Hardware

Connect your hardware device to your computer using the appropriate serial connection (e.g., USB-to-serial adapter). Note the COM port (on Windows) or the device path (on Linux/Mac) that the hardware is connected to. You’ll need this information to establish a connection in your Python code.

To read out all available ports, you can use our device manager with:

poetry run python -m unitelabs.bus.utils.device_manager
Coming Soon: We will provide a fully fledged Command Line Interface to easily identify connected devices and even interact with them.
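
If the device manager is not available in your environment, a rough stand-in on Linux and macOS is to look for typical serial device nodes. The sketch below uses only the Python standard library and is not part of the Omnibus. The glob patterns are assumptions that cover common USB-to-serial adapters, and the approach does not work on Windows, where ports are named COM1, COM2, and so on.

```python
import glob

# Assumed patterns for common serial device nodes on Linux and macOS.
DEFAULT_PATTERNS = ("/dev/ttyUSB*", "/dev/ttyACM*", "/dev/cu.*")


def candidate_serial_ports(patterns=DEFAULT_PATTERNS):
    """Return a sorted list of device paths matching the given glob patterns."""
    found = set()
    for pattern in patterns:
        found.update(glob.glob(pattern))
    return sorted(found)


if __name__ == "__main__":
    for path in candidate_serial_ports():
        print(path)
```

An empty result usually means the adapter is not plugged in or its driver is missing.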

Step 3: Writing Your First Python Script

Now, let's write a Python script to open a serial connection and communicate with your hardware device.

  1. Import the Omnibus Library
    Start by importing the necessary classes and methods from the Omnibus library:
    from unitelabs.bus import Protocol, create_serial_connection, ByteCommand
    
  2. Initialize the Connection
    Next, configure the serial connection. You need to specify the COM port (or device path), baud rate, and other parameters like timeout.
    protocol = Protocol(
        create_serial_connection,
        port='/dev/ttyUSB0',    # Replace with your port name, e.g. '/dev/cu.usbmodem3210' on macOS, 'COM3' on Windows
        baudrate=9600,          # Set the baud rate to match your device
        timeout=1,              # Set a timeout for read operations
    )
    await protocol.open()
    

    In order for the protocol to establish a connection to the device, you need to call the open() method.
  3. Send Data to the Device
    To transmit data to your hardware device, you can use the execute() method.
    await protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
    

    This line sends the binary data b"Hello, Device!\r\n" to your serial device.
  4. Receive Data from the Device
    The execute() method also returns the response sent by the device. To read it, await the call and store the result. The read continues until a newline character is encountered:
    response = await protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
    print('Received:', response)
    

    The execute() method now waits until it receives a response or until it times out. If a line of data arrives from the device, the script prints it to the console.
  5. Close the Connection
    After communication is complete, it’s good practice to close the connection to free the resources:
    protocol.close()
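
If a command raises an error, for example on a timeout, the close() call in step 5 is never reached. A common guard is a try/finally block. The sketch below demonstrates the pattern with a hypothetical stand-in class (FakeProtocol is not part of the Omnibus) so that it runs without hardware. It assumes, as in this tutorial, that open() and execute() are awaited and close() is called directly.

```python
import asyncio


class FakeProtocol:
    """Hypothetical stand-in for a protocol object; not part of the Omnibus."""

    def __init__(self):
        self.is_open = False

    async def open(self):
        self.is_open = True

    async def execute(self, payload: bytes) -> bytes:
        # Pretend the device rejects commands without a CR LF terminator.
        if not payload.endswith(b"\r\n"):
            raise ValueError("command must end with CR LF")
        return b"OK\r\n"

    def close(self):
        self.is_open = False


async def talk(protocol, payload: bytes) -> bytes:
    await protocol.open()
    try:
        return await protocol.execute(payload)
    finally:
        # Runs on success and on error, so the port is always released.
        protocol.close()


if __name__ == "__main__":
    print(asyncio.run(talk(FakeProtocol(), b"Hello, Device!\r\n")))
```

The same try/finally shape should carry over to a real Protocol instance, provided its methods behave as described in this step.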
    

Step 4: Running the Script

To run asynchronous code, wrap it in an async function and call that function via asyncio.run():

src/my_app/__main__.py
import asyncio

from unitelabs.bus import Protocol, create_serial_connection, ByteCommand


async def main():
    protocol = Protocol(
        create_serial_connection,
        port='/dev/ttyUSB0',    # Replace with your port name, e.g. '/dev/cu.usbmodem3210' on macOS, 'COM3' on Windows
        baudrate=9600,          # Set the baud rate to match your device
        timeout=1,              # Set a timeout for read operations
    )
    await protocol.open()

    response = await protocol.execute(ByteCommand(b"Hello, Device!\r\n"))
    print('Received:', response)

    protocol.close()


if __name__ == "__main__":
    asyncio.run(main())

Save your script (e.g., as src/my_app/__main__.py) and run it from your terminal:

poetry run python -m my_app

If everything is set up correctly, the script will open the serial connection, send a message to your hardware device, receive a response, and print it to the console.

Step 5: Troubleshooting Common Issues

Here are a few tips if you run into issues:

  • Port not found: Double-check the port name and ensure your device is properly connected.
  • Incorrect baud rate: Ensure that the baud rate in your script matches the baud rate configured on your hardware.
  • Timeouts: If you’re not receiving data, increase the timeout parameter or check the connection.

To get more verbose output, set the log level to DEBUG. Add the following at the top of your script:

import logging

logging.basicConfig(level=logging.DEBUG)

Step 6: Expanding Your Communication

Now that you've mastered the basics of using the Omnibus library to establish a connection and perform simple read/write operations, it's time to explore more advanced use cases. The Omnibus library is designed with flexibility in mind, allowing you to extend its functionality to suit specific communication requirements or device protocols.

1. Creating Custom Protocols

In many cases, you want to create a custom protocol to encapsulate specific communication patterns or device commands. A protocol in Omnibus bundles all the functions your hardware device offers, presenting them in a human-readable and easy-to-use way.

Example: Creating a Custom Protocol

Let’s start by creating a custom protocol class that inherits from Protocol. This class can encapsulate common commands and responses your device supports:

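# Assumes SerialCommand is exported from unitelabs.bus alongside ByteCommand.
from unitelabs.bus import Protocol, SerialCommand, create_serial_connection


class MyCustomProtocol(Protocol):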
    def __init__(self, port: str) -> None:
        super().__init__(create_serial_connection, port=port, baudrate=9600, timeout=1)

    async def hello(self) -> str:
        return await self.execute(SerialCommand("HELLO"))

    async def ping(self) -> str:
        return await self.execute(SerialCommand("PING"))

In this example, MyCustomProtocol defines two methods, hello and ping, that communicate with the device using specific commands. This setup makes it easy to call these methods without worrying about the underlying serial communication details.

2. Customizing Commands with CommandBuilder

The Omnibus comes with some basic Command subclasses predefined: ByteCommand and SerialCommand. We can readily modify these classes to adjust their behavior to our needs using the CommandBuilder.

Before we can get into customizing a Command, we first need to understand the basic parts of a command and how it interacts with the Protocol.

  • serializer - the function to convert the input type into bytes when generating a Request, which is sent through Protocol.execute to the device.
  • deserializer - the function to convert the Response bytes received from the device within the Protocol.execute call to the output type.
  • parser - one or more optional functions to serially apply to the deserialized data for further data processing before the result is returned from Protocol.execute.
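
The interplay of these three parts can be pictured as a small pipeline. The following is a plain-Python sketch of that idea, not the Omnibus implementation: run_command and fake_device are hypothetical names, and the device is replaced by a function that maps request bytes to response bytes.

```python
from typing import Callable, Sequence


def run_command(
    message,
    serializer: Callable,
    deserializer: Callable,
    parsers: Sequence[Callable] = (),
    transport: Callable[[bytes], bytes] = lambda request: request,
):
    """Serialize, exchange bytes with the device, deserialize, then parse."""
    request = serializer(message)    # input type -> bytes
    response = transport(request)    # bytes out, bytes back
    result = deserializer(response)  # bytes -> output type
    for parser in parsers:           # applied one after another
        result = parser(result)
    return result


def fake_device(request: bytes) -> bytes:
    """Hypothetical device that answers every request with a temperature."""
    return b"TEMP=21.5\r\n"


value = run_command(
    "TEMP?",
    serializer=lambda text: text.encode("ascii") + b"\r\n",
    deserializer=lambda raw: raw.decode("ascii").strip(),
    parsers=[lambda text: text.split("=")[1], float],
    transport=fake_device,
)
print(value)  # 21.5
```

Each parser receives the output of the previous one, which is why the order of the parsers matters.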

The builder pattern allows us to build on a basic Command subclass to customize its functionality by making changes to the aforementioned parts as well as the initialization arguments of the Command.

from unitelabs.bus import CommandBuilder

builder = CommandBuilder()
cmd = builder.build(b"HELLO")
cmd2 = builder.with_serializer(lambda x: x + b"\r\n").build(b"HELLO")

By default the CommandBuilder uses ByteCommand as its base. Calling build directly after initializing CommandBuilder is functionally equivalent to creating an instance of ByteCommand.

The result is that the serialized payload of cmd is b"HELLO", while the serialized payload of cmd2 is b"HELLO\r\n".

CommandBuilder Methods

CommandBuilder offers methods that can be chained together or called individually to create intermediate builders. They are grouped here by usage.

  • with_serializer(serializer_function, **function_kwargs) for changing the serialization function
  • with_deserializer(deserializer_function, **function_kwargs) for changing the deserialization function
  • with_parser(parser_function, **function_kwargs) for adding parser functions

This first set of methods accepts functions as arguments and preloads any additional keyword arguments into the provided function:

def complex_serializer(message: str, sub_message: str, is_complex: bool) -> bytes:
    return message.encode("ascii") if not is_complex else (message + sub_message).encode("ascii")

cmd = CommandBuilder().with_serializer(complex_serializer, sub_message=" world", is_complex=True).build("hello")

Code explanation

If we inspect the contents of the request at cmd.request.payload we will see that our serialized payload is b"hello world"!
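
Preloading keyword arguments in this way behaves much like functools.partial from the standard library. The sketch below reproduces the effect without the Omnibus. Whether CommandBuilder uses partial internally is an assumption, not something this tutorial states.

```python
from functools import partial


def complex_serializer(message: str, sub_message: str, is_complex: bool) -> bytes:
    text = message + sub_message if is_complex else message
    return text.encode("ascii")


# Fix the extra keyword arguments up front; only the message is left open.
preloaded = partial(complex_serializer, sub_message=" world", is_complex=True)

payload = preloaded("hello")
print(payload)  # b'hello world'
```

The preloaded function now takes a single argument, which matches the shape of a serializer that only receives the message.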

These methods also bind the provided function to the Command instance, so the function can take self as its first parameter:

def use_protocol_logger(self, data: bytes) -> None:
    if b"goodbye" in data:
        self.receiver.logger.info("Device says goodbye.")


cmd = CommandBuilder().with_parser(use_protocol_logger).build(b"hello")

Code explanation

Commands have access to the Protocol instance they are executed in through the receiver attribute, therefore we can use the instance-binding of the CommandBuilder to bind functions which interact with the Protocol.
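
In plain Python, binding a free function to an object so that it receives that object as self can be done with types.MethodType. The sketch below shows the general mechanism with hypothetical stand-in classes (FakeReceiver and FakeCommand are not Omnibus classes). How CommandBuilder performs the binding internally is not covered here.

```python
import types


class FakeReceiver:
    """Hypothetical stand-in for the protocol a command runs in."""

    def __init__(self):
        self.messages = []


class FakeCommand:
    """Hypothetical stand-in for a command with a receiver attribute."""

    def __init__(self, receiver):
        self.receiver = receiver


def note_goodbye(self, data: bytes) -> bytes:
    # `self` is the command instance once the function is bound.
    if b"goodbye" in data:
        self.receiver.messages.append("Device says goodbye.")
    return data


command = FakeCommand(FakeReceiver())
bound_parser = types.MethodType(note_goodbye, command)

bound_parser(b"goodbye\r\n")
print(command.receiver.messages)  # ['Device says goodbye.']
```

After binding, callers pass only the data argument, and the function reaches the receiver through self.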

CommandBuilder also allows us to set global init args for our Commands:

  • with_timeout(timeout) for setting the timeout (i.e. how long the command can attempt to run through Protocol.execute before throwing an error)
  • without_response() for marking a Command which does not expect a response from the device

These methods set the timeout and is_void args during Command initialization. The values can also be supplied when calling build(message, timeout, is_void). If both a with_ method and the corresponding build arg are used, the build arg takes precedence.
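
The precedence of build args over values set on the builder can be illustrated with a toy builder. MiniBuilder below is a hypothetical, heavily simplified model written for this tutorial, not the CommandBuilder source. It only tracks timeout and is_void and returns them as a dictionary instead of a Command.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class MiniBuilder:
    """Hypothetical toy model of a builder with default init arguments."""

    timeout: Optional[float] = None
    is_void: bool = False

    def with_timeout(self, timeout: float) -> "MiniBuilder":
        # Return a new builder so intermediate builders stay reusable.
        return replace(self, timeout=timeout)

    def without_response(self) -> "MiniBuilder":
        return replace(self, is_void=True)

    def build(self, message: bytes, timeout: Optional[float] = None,
              is_void: Optional[bool] = None) -> dict:
        return {
            "message": message,
            # An explicit build argument wins over the builder default.
            "timeout": self.timeout if timeout is None else timeout,
            "is_void": self.is_void if is_void is None else is_void,
        }


slow = MiniBuilder().with_timeout(5.0)
print(slow.build(b"HELLO")["timeout"])               # 5.0
print(slow.build(b"HELLO", timeout=0.5)["timeout"])  # 0.5
```

Because each with_ call in this model returns a new builder, slow can be reused for many commands without one build call affecting the next.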

Finally, there is the use-case-dependent method with_multiline, which is designed specifically for handling edge cases and bad behavior in serial communication. For more information, see the Serial Troubleshooting Guide.

Coming Soon: We are already working on guides about reconnect and retry behavior, and other edge cases.

Conclusion

This tutorial introduced you to the basics of using the Omnibus library for hardware communication in Python. With these foundational steps, you can start building sophisticated applications that interact with various serial-enabled devices. Whether you're working on IoT projects, embedded systems, or data acquisition, Omnibus provides a flexible and robust framework to meet your needs. Happy coding!


Copyright © 2025