Subscriptions
Tracking certain pieces of information and keeping them continuously available and up to date is a common requirement. When such data is needed in multiple parts of a connector, it often leads to complex and repetitive propagation logic. To simplify this, the subscription module provides two built-in classes, Subject and Publisher, which offer rich functionality for managing and distributing data. This guide covers their core features and aims to make working with subscribable endpoints straightforward.
Let us consider how we might implement a logging endpoint which informs its subscribers when certain methods have been called.
import dataclasses
import datetime

from unitelabs.cdk import Subject, sila


@dataclasses.dataclass
class Log(sila.CustomDataType):
    """
    Log entry.

    .. parameter:: The method that was called.
    .. parameter:: When the log was written.
    """

    method: str
    when: datetime.datetime


class LoggedFeature(sila.Feature):
    def __init__(self):
        super().__init__()
        self.log_subject = Subject[Log](maxsize=10)

    @sila.ObservableProperty()
    async def subscribe_logs(self) -> sila.Stream[Log]:
        return self.log_subject.subscribe()

    @sila.UnobservableCommand()
    async def perform_action(self) -> None:
        # perform action
        self.log_subject.update(
            Log(
                method="perform_action",
                when=datetime.datetime.now(),
            )
        )
Listing 1: Implementing a basic Subject and using the Subscription as a sila.Stream.
First, we define a sila.CustomDataType to structure our log message data and a Subject[Log] to hold and distribute our Log values. Then we define our ObservableProperty and use Subject.subscribe to create a Subscription that listens for new Log values put into the Subject.
We then create a method, perform_action, which we would like to track. It calls Subject.update with an instance of the Log dataclass to set a new Log value on the Subject. If the Subject has any subscribers, it will notify them of the new value.
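To make the notification step concrete, here is a minimal sketch of the fan-out idea using only the standard library. MiniSubject is a hypothetical stand-in written for this illustration, not the CDK's Subject; it assumes each subscriber owns one bounded asyncio.Queue and that an update with no subscribers is simply dropped.

```python
import asyncio


class MiniSubject:
    """Hypothetical stand-in: fans each update out to subscriber queues."""

    def __init__(self, maxsize: int = 0) -> None:
        self._maxsize = maxsize
        self._subscribers: list = []

    def subscribe(self) -> asyncio.Queue:
        # Every subscriber gets its own bounded queue.
        queue: asyncio.Queue = asyncio.Queue(maxsize=self._maxsize)
        self._subscribers.append(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self._subscribers.remove(queue)

    def update(self, value) -> None:
        # Notify every current subscriber; with no subscribers this is a no-op.
        # put_nowait raises asyncio.QueueFull once a queue holds maxsize items.
        for queue in self._subscribers:
            queue.put_nowait(value)


async def demo() -> list:
    subject = MiniSubject(maxsize=10)
    subject.update("ignored")  # nobody is subscribed yet, so nothing is stored
    first = subject.subscribe()
    second = subject.subscribe()
    subject.update("perform_action")
    return [await first.get(), await second.get(), first.qsize()]


result = asyncio.run(demo())
print(result)
```

Both subscribers receive the same value, and the update sent before anyone subscribed never reaches them. How the real Subject handles a full queue is not covered by this sketch.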
In Subject[Log], the bracketed value [Log] indicates the type of values that can be passed into Subject.update.
Here log_subject has an implicit type of Subject[Log, Log], meaning that Log is also the type that it provides to subscribers. We will dig more into this later when we talk about pipes and filters.
Subscription Values
We've seen that we can use a Subscription directly as a sila.Stream, but what if we want to perform additional operations on the values received from the Subject?
Here we can use the Subject's built-in context manager, which creates a new Subscription that is automatically unsubscribed from the Subject when the property or method terminates:
class LoggedFeature(sila.Feature):
    ...

    @sila.ObservableProperty()
    async def subscribe_logged_methods(self) -> sila.Stream[str]:
        """Subscribe to observe changes to the logged method."""
        with self.log_subject as subscription:
            async for log in subscription:
                yield log.method
Listing 2: Using the Subject's context manager to access Subscription values.
From a user's perspective, when the client stops listening to further updates from the subscribe_logged_methods endpoint, i.e. the user cancels their subscription, the gRPC call is cancelled and the Subscription is gracefully cancelled.
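The automatic cleanup can be pictured with a small standard-library sketch. Inbox, MiniSubject, and the subscription method are hypothetical names chosen for this illustration and are not part of the CDK; the sketch only assumes that leaving the with-block removes the subscriber, whether the block ends normally or through an exception.

```python
import contextlib


class Inbox:
    """Hypothetical stand-in for a subscription; collects received values."""

    def __init__(self) -> None:
        self.items: list = []


class MiniSubject:
    """Hypothetical stand-in for a subject, not the CDK implementation."""

    def __init__(self) -> None:
        self.subscribers: list = []

    def update(self, value) -> None:
        for inbox in self.subscribers:
            inbox.items.append(value)

    @contextlib.contextmanager
    def subscription(self):
        inbox = Inbox()
        self.subscribers.append(inbox)
        try:
            yield inbox
        finally:
            # Runs on normal exit and when an exception leaves the with-block.
            self.subscribers.remove(inbox)


subject = MiniSubject()
with subject.subscription() as inbox:
    subject.update("perform_action")
    count_inside = len(subject.subscribers)
count_after = len(subject.subscribers)
print(inbox.items, count_inside, count_after)
```

The subscriber count drops back to zero as soon as the block is left, so no explicit unsubscribe call is needed in the calling code.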
Using Subject as a context manager means that we do not have to unsubscribe our Subscription from the Subject ourselves, as this is done by the context manager as it closes.
Subscription Operations
Transforming data
A Subject can produce child-subjects which receive updates from their parent and apply a static operation to all values received from the parent subject.
A simplistic example of this:
from unitelabs.cdk import Subject, sila


class NumberProvider(sila.Feature):
    def __init__(self):
        super().__init__()
        self.single = Subject[int](maxsize=10)
        self.double = self.single.pipe(lambda x: x * 2)

    @sila.ObservableProperty()
    async def subscribe_single_value(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the set value."""
        return self.single.subscribe()

    @sila.ObservableProperty()
    async def subscribe_double_value(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the doubled value."""
        return self.double.subscribe()

    @sila.UnobservableCommand()
    async def set_value(self, value: int) -> None:
        """Update the value of any currently running subscriptions."""
        self.single.update(value)
Listing 3: Creating and subscribing to Subjects with static data transformations via Subject.pipe.
Here, we create a Subject that is updated by the user with the set_value endpoint. Calling set_value will update the value on single, which will in turn pass that same value to its child-subject, double. Any value x received by double will automatically have the transformation x * 2 applied to it before it is yielded to subscribers.
Typing clarification: When declaring a Subject, we generally only need to declare a single type. The second type that appears in the inferred signature, as in Subject[Log, Log], is the type sent to Subscriptions of the Subject, and it defaults to the input value type. When a Subject is instantiated with a pipe operation, this second type must also be declared; it is the return type of the pipe function.
from unitelabs.cdk import Subject

def stringify(value: int) -> str:
    return str(value)

subject = Subject[int, str](maxsize=10, pipe=stringify)
Listing 4: Instantiating a Subject with a pipe function to immediately apply a data transformation.
If the pipe is added to a Subject[T] after instantiation, it will accept a callable with the signature (T) -> Any, e.g. a Subject[int] will accept a callable with the signature (int) -> Any.
Finally, multiple pipes can be chained together, with the return type of the previous pipe being the required parameter type for the subsequent pipe, for example:
dict_subject: Subject[dict[str, float], dict[str, float]] = Subject[dict[str, float]](maxsize=10)
float_subject: Subject[float, int] = dict_subject.pipe(lambda x: x.get("value", -1)).pipe(lambda x: int(x))
Listing 5: Chaining pipes together to apply multiple data transformations.
Filtering data
We can also apply filters to our Subject with the Subject.filter method, so that its Subscriptions and child-Subjects are only notified conditionally.
numbers = Subject[int](maxsize=10)
big_numbers = numbers.filter(lambda x: x > 100)
big_numbers_made_small = big_numbers.pipe(lambda x: x / 100)
Listing 6: Filtering Subject data to prevent its propagation to Subscriptions with Subject.filter.
A filter is essentially a special pipe which only propagates data to a Subject's subscribers when the condition of the filter function is met. Here calling numbers.update(100) would notify a subscriber of numbers, but not a subscriber of big_numbers or its child big_numbers_made_small.
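The propagation rules for pipes and filters described above can be modelled in a few lines of plain Python. This is a simplified, synchronous sketch: MiniSubject and its received list are hypothetical stand-ins for the CDK classes and their subscription queues, and the sketch assumes that a rejected value stops at the filter and never reaches children further down.

```python
from typing import Any, Callable, Optional


class MiniSubject:
    """Hypothetical stand-in: forwards values to child subjects."""

    def __init__(
        self,
        transform: Optional[Callable[[Any], Any]] = None,
        condition: Optional[Callable[[Any], bool]] = None,
    ) -> None:
        self._transform = transform
        self._condition = condition
        self._children: list = []
        self.received: list = []  # stands in for the subscriber queues

    def pipe(self, transform: Callable[[Any], Any]) -> "MiniSubject":
        child = MiniSubject(transform=transform)
        self._children.append(child)
        return child

    def filter(self, condition: Callable[[Any], bool]) -> "MiniSubject":
        child = MiniSubject(condition=condition)
        self._children.append(child)
        return child

    def update(self, value: Any) -> None:
        if self._condition is not None and not self._condition(value):
            return  # rejected: neither subscribers nor children are notified
        if self._transform is not None:
            value = self._transform(value)
        self.received.append(value)
        for child in self._children:
            child.update(value)


numbers = MiniSubject()
big_numbers = numbers.filter(lambda x: x > 100)
big_numbers_made_small = big_numbers.pipe(lambda x: x / 100)

numbers.update(100)
numbers.update(250)
print(numbers.received, big_numbers.received, big_numbers_made_small.received)
```

Only 250 passes the filter, so the filtered subject and its piped child each see a single value while the parent sees both.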
Publisher: a Self-updating Subject
The CDK additionally provides a subclass of Subject, called a Publisher, which will update its value by calling a provided source function at a given interval. Publishers are ideal for sensor data and other data sources which can be stably represented with a callable.
import random

from unitelabs.cdk import Publisher, sila


def get_random_number() -> int:
    return random.randint(0, 42)


class RandomNumberProvider(sila.Feature):
    def __init__(self):
        super().__init__()
        self.number_publisher = Publisher[int](source=get_random_number, interval=1)

    @sila.ObservableProperty()
    async def subscribe_random_number(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the random number."""
        return self.number_publisher.subscribe()
Listing 7: Implementing a self-updating observable with Publisher.
When we subscribe to a Publisher, it creates a background task that calls the source function every interval seconds. Publishers manage resources so that only a single background task is created to notify multiple subscribers. Only when all subscribers, including subscribers on child Subjects (i.e. those created from Publisher.pipe and Publisher.filter), have unsubscribed from the Publisher will it cancel its background task to free resources.
A new Subscription of a Subject will wait for the next call to update for its first value. In comparison, a Publisher assumes that its most recently seen value is still relevant information for its subscribers and immediately adds this value to the Subscription queue.
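The shared background task and the replay of the latest value can be sketched with asyncio alone. MiniPublisher below is a hypothetical stand-in, not the CDK's Publisher: it assumes one polling task is started by the first subscriber, cancelled when the last one leaves, and that new subscribers are seeded with the most recent value. Child subjects are left out for brevity.

```python
import asyncio
import itertools
from typing import Callable, Optional


class MiniPublisher:
    """Hypothetical stand-in: polls `source` while subscribers exist."""

    def __init__(self, source: Callable[[], int], interval: float) -> None:
        self._source = source
        self._interval = interval
        self._subscribers: list = []
        self._task: Optional[asyncio.Task] = None
        self._latest: Optional[int] = None

    @property
    def polling(self) -> bool:
        return self._task is not None

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        if self._latest is not None:
            queue.put_nowait(self._latest)  # replay the most recent value
        self._subscribers.append(queue)
        if self._task is None:
            # The first subscriber starts the one shared polling task.
            self._task = asyncio.create_task(self._poll())
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self._subscribers.remove(queue)
        if not self._subscribers and self._task is not None:
            # The last subscriber left, so stop polling to free resources.
            self._task.cancel()
            self._task = None

    async def _poll(self) -> None:
        while True:
            self._latest = self._source()
            for queue in self._subscribers:
                queue.put_nowait(self._latest)
            await asyncio.sleep(self._interval)


async def demo() -> tuple:
    counter = itertools.count(1)
    publisher = MiniPublisher(source=lambda: next(counter), interval=0.01)
    first = publisher.subscribe()
    polled = await first.get()  # produced by the background task
    second = publisher.subscribe()
    replayed = await second.get()  # seeded from the latest value, no waiting
    publisher.unsubscribe(first)
    still_polling = publisher.polling
    publisher.unsubscribe(second)
    return polled, replayed, still_polling, publisher.polling


result = asyncio.run(demo())
print(result)
```

The second subscriber gets the value that was already polled, and polling stops only after both subscribers have left.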
Advanced Usage
Resource Management
Whenever pipe or filter is called, a new child-subject is created. By design, Subjects and Publishers notify all child-subjects in their hierarchy. This means that if we create many child-subjects via pipe and filter, our Subscriptions may eventually slow down due to unnecessary resource allocation.
Consider the following scenario:
from unitelabs.cdk import Publisher, sila

value = 0


def get_next_value() -> int:
    global value
    value += 1
    return value


class GlobalCounterProvider(sila.Feature):
    def __init__(self):
        super().__init__()
        self.counter = Publisher[int](source=get_next_value)

    @sila.ObservableProperty()
    async def subscribe_even_numbers(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the counter, when the new number is an even number."""
        return self.counter.filter(lambda x: x % 2 == 0).subscribe()
Listing 8: Misuse of the filter and pipe methods resulting in unnecessary resource allocation.
Now, if we were to call subscribe_even_numbers multiple times, every call would create a fresh child-subject. Even though the Subscription is properly unsubscribed, the child-subject remains in the Subject's known children. Subsequent calls to the endpoint will get slower as the counter propagates values to a growing list of child-subjects.
This could be solved in the manner previously described, by creating a stable child-subject that lives in the __init__ method of the Feature, as demonstrated in Listing 3. However, this is not always practical, especially when the child-subjects are created dynamically based on user input or other runtime conditions.
A small change in how we use pipe and filter can ensure that these child-Subjects are removed from a Subject's or Publisher's children when they are no longer being subscribed to.
...
class GlobalCounterProvider(sila.Feature):
    ...

    @sila.ObservableProperty()
    async def subscribe_even_numbers(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the counter, when the new number is an even number."""
        return self.counter.filter(lambda x: x % 2 == 0, temporary=True).subscribe()
Listing 9: Creating a temporary child-subject to ensure proper disposal after unsubscribe.
By calling pipe and filter with temporary=True, we mark these child-subjects for removal once their dependent Subscriptions have been cancelled.
Note: Trying to create a non-temporary child Subject of a temporary Subject will raise an error at runtime.
temp = subject.pipe(method1, temporary=True).pipe(method2) # raises
temp = subject.pipe(method1, temporary=True).pipe(method2, temporary=True) # ok
Listing 10: Creating a non-temporary Subject from a temporary Subject will raise a RuntimeError.
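The bookkeeping difference between permanent and temporary children can be illustrated with a toy model. Everything here is hypothetical: MiniSubject, add_child (standing in for pipe and filter), and the exact cleanup rule are assumptions made for the sketch, and the transformation itself is omitted because only the parent's list of children matters.

```python
class MiniSubject:
    """Hypothetical stand-in that tracks children and subscriber counts only."""

    def __init__(self, parent=None, temporary: bool = False) -> None:
        self.parent = parent
        self.temporary = temporary
        self.children: list = []
        self.subscriber_count = 0

    def add_child(self, temporary: bool = False) -> "MiniSubject":
        # Mirrors the documented rule: children of temporary subjects
        # must themselves be temporary.
        if self.temporary and not temporary:
            raise RuntimeError("child of a temporary subject must be temporary")
        child = MiniSubject(parent=self, temporary=temporary)
        self.children.append(child)
        return child

    def subscribe(self) -> None:
        self.subscriber_count += 1

    def unsubscribe(self) -> None:
        self.subscriber_count -= 1
        if self.temporary and self.subscriber_count == 0 and self.parent is not None:
            # Assumed cleanup rule: detach once nobody is listening.
            self.parent.children.remove(self)


def simulate_calls(temporary: bool, calls: int = 3) -> int:
    """Return how many children remain after repeated endpoint calls."""
    root = MiniSubject()
    for _ in range(calls):
        child = root.add_child(temporary=temporary)
        child.subscribe()
        child.unsubscribe()
    return len(root.children)


leaked = simulate_calls(temporary=False)
cleaned = simulate_calls(temporary=True)

try:
    MiniSubject().add_child(temporary=True).add_child()
    raised = False
except RuntimeError:
    raised = True

print(leaked, cleaned, raised)
```

With permanent children the parent keeps one entry per call, while temporary children leave the list empty again after each unsubscribe.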
Subscription Queue
We can also use the Subscription as an asyncio.Queue. You may have noticed that the constructors of Subject and Publisher take the argument maxsize. This is the maximum number of entries that can be held in any Subscription queue created with Subject.subscribe.
We can operate on the underlying asyncio.Queue using the Subscription's get method.
import datetime

from unitelabs.cdk import Subject, sila


class SteppedActionError(Exception):
    pass


class LoggedFeature(sila.Feature):
    ...

    @sila.UnobservableCommand()
    async def execute_method(self) -> None:
        # execute method
        self.log_subject.update(
            Log(
                method="execute_method",
                when=datetime.datetime.now(),
            )
        )

    @sila.UnobservableCommand(errors=[SteppedActionError])
    async def multi_step_action(self) -> None:
        subscription = self.log_subject.subscribe()
        try:
            await self.perform_action()
            # Wait for the log entry written by perform_action.
            log_item = await subscription.get(
                predicate=lambda x: x.method == "perform_action",
                timeout=1.0,
            )
            await self.execute_method()
            # Wait for the log entry written by execute_method.
            log_item = await subscription.get(
                predicate=lambda x: x.method == "execute_method",
                timeout=1.0,
            )
        except TimeoutError as e:
            raise SteppedActionError() from e
        finally:
            # Always release the subscription, even on failure.
            self.log_subject.unsubscribe(subscription)
Listing 11: Using a Subscription as an asyncio.Queue.
Here we have a method, multi_step_action, which expects certain Log entries to be added to the Subscription and checks that those Logs were observed before moving on. By calling get on the Subscription with a timeout of 1.0, we say that we expect a Log entry matching our predicate within 1.0 seconds.
Note that the Subscription is used inside a try-finally block. This pattern ensures that the subscription is always properly disposed of, i.e. unsubscribed, even if the command fails or is cancelled. Improper handling of Subscriptions can result in decreased performance for long-running processes.
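The predicate-and-timeout behaviour used in Listing 11 can be approximated on a plain asyncio.Queue. The helper get_matching is a hypothetical name for this sketch and is not a CDK function; it assumes that entries which do not match the predicate are discarded while waiting, which may differ from what the CDK does.

```python
import asyncio


async def get_matching(queue: asyncio.Queue, predicate, timeout: float):
    """Return the first queued item matching `predicate`, or raise on timeout."""

    async def scan():
        while True:
            item = await queue.get()
            if predicate(item):
                return item
            # Non-matching items are dropped in this sketch.

    # wait_for cancels the scan and raises a timeout error when time runs out.
    return await asyncio.wait_for(scan(), timeout=timeout)


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    queue.put_nowait("other_method")
    queue.put_nowait("perform_action")
    found = await get_matching(queue, lambda m: m == "perform_action", timeout=1.0)
    try:
        await get_matching(queue, lambda m: m == "execute_method", timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return found, timed_out


result = asyncio.run(demo())
print(result)
```

The first call skips the unrelated entry and returns the match; the second call finds nothing within its timeout and raises, which is the case multi_step_action converts into a SteppedActionError.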