Parallel Processing
Some devices allow multiple requests to be processed in parallel. It is common for such communication protocols to include a unique identifier which is shared between the Request-Response pair. This tutorial will go over the changes necessary to update a working protocol to allow parallel processing as well as the basic pattern of creating a custom Command to encapsulate the logic of identifying a Response's source.
Prerequisites
- A
Protocolcorrectly configured to communicate with a device. - A device which permits parallel processing, i.e. not suitable for RS-232/485 communication.
Step 1: Updating Protocol
This must first be configured in the Protocol by setting the max_parallel_commands argument, which has a default value of 1.
Adjust your protocol to pass this argument to Protocol during initialization. The value will be specific to the device and may require testing.
from unitelabs.bus import Protocol
class ParallelProtocol(Protocol):
def __init__(self, *args, **kwargs):
# configured protocol
super().__init__(
transport_factory,
*args,
max_parallel_commands=3,
**kwargs
)
Step 2: Creating a Custom Command
Let's take as our test case a device which expects messages to include a 4-digit identifier at the start and responds with the id prepended to the response value, i.e.
- Request: b"1234 request"
- Response: b"1234 response"
We can encode this behavior into a custom Command:
import random
import typing
from unitelabs.bus import Command
class IdCommand(Command[str, str]):
def __init__(
self,
message: str,
encoding: str = "utf-8",
**kwargs,
) -> None:
super().__init__(message, timeout, **kwargs)
self._encoding = encoding
self._id = str(self.make_internal_tracking_id())
def make_internal_tracking_id(self) -> str:
return f"{random.randrange(0000, 9999):04}"
@property
def id(self) -> str:
return self._id
def _serialize(self, message: str = None) -> bytes:
message = message or self.message
msg = str.encode(f"{self.id} {message}", encoding=self._encoding)
return msg
def _deserialize(self, response: bytes = None) -> str:
resp = response.decode(encoding=self._encoding).lstrip(f"{self.id} ")
return resp
IdCommand will prepend a unique 4-digit identifier to the string message and convert it to bytes to send to the device. Later it removes that identifier from the bytestring received from the device, returning a string.
Now we can update our Protocol to create a method that uses IdCommand with Protocol.execute.
import asyncio
from .id_command import IdCommand
class ParallelProtocol(Protocol):
...
async def get_request(self) -> str
cmd = IdCommand("request")
res = await protocol.execute(cmd)
print(res) # "response"
return res
Step 3: Updating a Custom Command to Enable Parallel Processing
It is not enough to have a unique identifier, IdCommand needs to be slightly modified to enable parallel processing behavior and to do this we need to customize the Command.match_response method.
When the Protocol receives data from the device it will first call match_response to identify the Command that generated the request that the response belongs to. Because consecutive processing is the default behavior of the Protocol, the default Command.match_response always returns True. This is because without parallel processing there is never ambiguity as to which Command a device response belongs to.
Let's override match_response in our custom command:
from .id_command import IdCommand
class ParallelCommand(IdCommand):
def match_response(self, data: bytes) -> bool:
msg = data.decode(encoding=self._encoding)
if not msg.startswith(self.id):
return False
return super().match_response(data)
Here match_response will only return True in the case where the id on the device response matches the ParallelCommand's id. The Protocol will then associated the response data with it's originating Command and return the results.
Step 4: Making Parallel Calls to the Device
Now to integrate this into Protocol use asyncio.gather to process multiple calls to execute in parallel. Here return_exceptions=True treats exceptions the same as successful results and stores them in the responses list.
import asyncio
from .parallel_command import ParallelCommand
class ParallelProtocol(Protocol):
...
async def get_request_many(self, number: int, request: str = "request") -> list[typing.Union[Exception, str]]:
responses = await asyncio.gather(
*[self.execute(ParallelCommand(request)) for _ in range(number)],
return_exceptions=True,
)
return responses
With this method we can test the behavior of the device when sending multiple requests in parallel, which can be helpful for configuring max_parallel_commands if the max is not specified by the device.
import asyncio
from .parallel_protocol import ParallelProtocol
protocol = ParallelProtocol()
await protocol.open()
res = await protocol.get_request_many(2) # ["response", "response"]
res = await protocol.get_request_many(3) # ["response", "response", "response"]
res = await protocol.get_request_many(4) # ["response", "response", "response", CommandExecutionError("Cannot send request. Transport is currently processing maximum number of commands.")]
protocol.close()
Remember that we set max_parallel_commands=3, so when we try to call get_request_many(4) the last value in the list of returned device responses is an Exception raised by Protocol.