Subscriptions

Creating and managing continuously updating data streams, pipelines and filters.

Tracking certain pieces of information and keeping them continuously available and up to date is a common requirement. When such data is needed in multiple parts of a connector, it often leads to complex and repetitive propagation logic. To simplify this, the subscription module provides two built-in classes, Subject and Publisher, which offer rich functionality for managing and distributing data. This guide covers their core features and aims to make working with subscribable endpoints straightforward.

Let us consider how we might implement a logging endpoint which informs its subscribers when certain methods have been called.

logged_feature.py
import dataclasses
import datetime

from unitelabs.cdk import Subject, sila


@dataclasses.dataclass
class Log(sila.CustomDataType):
    """
    Log entry.

    .. parameter:: The method that was called.
    .. parameter:: When the log was written.
    """

    method: str
    when: datetime.datetime

class LoggedFeature(sila.Feature):
    def __init__(self):
        super().__init__()
        self.log_subject = Subject[Log](maxsize=10)

    @sila.ObservableProperty()
    async def subscribe_logs(self) -> sila.Stream[Log]:
        return self.log_subject.subscribe()

    @sila.UnobservableCommand()
    async def perform_action(self) -> None:
        # perform action
        self.log_subject.update(
            Log(
                method="perform_action",
                when=datetime.datetime.now(),
            )
        )

Listing 1: Implementing a basic Subject and using the Subscription as a sila.Stream.

First, we define a sila.CustomDataType to structure our log message data and a Subject[Log] to hold and distribute our Log values. Then we define our ObservableProperty and use Subject.subscribe to create a Subscription that listens for new Log values put into the Subject.

We then create a method perform_action which we would like to track. It calls Subject.update with an instance of the Log dataclass to set a new Log value on the Subject. If the Subject has any subscribers, it will notify them of the new value.

Technical Hint: In Subject[Log], the bracketed value [Log] indicates the type of values that can be passed into Subject.update. Here log_subject has an implicit type of Subject[Log, Log], meaning that Log is also the type that it provides to subscribers. We will look at this more closely later when we talk about pipes and filters.
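To make the update and subscribe relationship concrete, here is a minimal sketch of the fan-out idea, written without the CDK. ToySubject and demo are hypothetical names used only for illustration; this is not how the CDK implements Subject, and the behaviour of a real Subject when a queue is full is not covered by this guide.

```python
import asyncio


class ToySubject:
    """Hypothetical stand-in for a Subject; illustration only."""

    def __init__(self, maxsize: int = 0) -> None:
        self.maxsize = maxsize
        self.queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        # Each subscriber gets its own bounded queue.
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.maxsize)
        self.queues.append(queue)
        return queue

    def update(self, value) -> None:
        # Fan the new value out to every current subscriber.
        # Assumption: a full queue simply raises asyncio.QueueFull here.
        for queue in self.queues:
            queue.put_nowait(value)


async def demo() -> list[str]:
    subject = ToySubject(maxsize=10)
    first, second = subject.subscribe(), subject.subscribe()
    subject.update("perform_action")
    # Both subscribers observe the same value.
    return [await first.get(), await second.get()]
```

Running asyncio.run(demo()) shows that one call to update reaches both subscribers, which is the behaviour perform_action relies on above.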

Subscription Values

We've seen that we can use a Subscription directly as a sila.Stream, but what if we want to perform additional operations on the values received from the Subject?

Here we can use the Subject's built-in context manager, which creates a new Subscription that is automatically unsubscribed from the Subject when the property or method terminates:

logged_feature.py
class LoggedFeature(sila.Feature):
    ...
    @sila.ObservableProperty()
    async def subscribe_logged_methods(self) -> sila.Stream[str]:
        """Subscribe to observe changes to the logged method."""
        with self.log_subject as subscription:
            async for log in subscription:
                yield log.method

Listing 2: Using the Subject's context manager to access Subscription values.

From a user's perspective, when the client stops listening for further updates from the subscribe_logged_methods endpoint, i.e. the user cancels their subscription, the gRPC call is cancelled and the Subscription is closed gracefully.

Technical Hint: Here the use of the Subject as a context-manager means that we do not have to unsubscribe our Subscription from the Subject as this is done by the context-manager as it closes.
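The clean-up guarantee can be sketched with a plain context manager. This is a toy model under stated assumptions: ToySubject and its subscribed method are hypothetical names, and the CDK's actual context-manager implementation may differ. The point is only that the unsubscribe step runs in a finally clause, so it also runs when the call is cancelled.

```python
import asyncio
import contextlib
from typing import Iterator


class ToySubject:
    """Hypothetical stand-in for a Subject; not the CDK implementation."""

    def __init__(self) -> None:
        self.queues: list[asyncio.Queue] = []

    @contextlib.contextmanager
    def subscribed(self) -> Iterator[asyncio.Queue]:
        queue: asyncio.Queue = asyncio.Queue()
        self.queues.append(queue)  # subscribe on entry
        try:
            yield queue
        finally:
            self.queues.remove(queue)  # unsubscribe on exit, even on errors


async def demo() -> tuple[int, int]:
    subject = ToySubject()
    try:
        with subject.subscribed():
            during = len(subject.queues)
            raise asyncio.CancelledError  # simulate the client cancelling
    except asyncio.CancelledError:
        pass
    # Number of subscribers inside the block and after leaving it.
    return during, len(subject.queues)
```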

Subscription Operations

Transforming data

A Subject can produce child-subjects which receive updates from their parent and apply a static operation to all values received from the parent subject.

A simplistic example of this:

number_provider.py
from unitelabs.cdk import Subject, sila


class NumberProvider(sila.Feature):
    def __init__(self):
        super().__init__()
        self.single = Subject[int](maxsize=10)
        self.double = self.single.pipe(lambda x: x * 2)

    @sila.ObservableProperty()
    async def subscribe_single_value(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the set value."""
        return self.single.subscribe()

    @sila.ObservableProperty()
    async def subscribe_double_value(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the doubled value."""
        return self.double.subscribe()

    @sila.UnobservableCommand()
    async def set_value(self, value: int) -> None:
        """Update the value of any currently running subscriptions."""
        self.single.update(value)

Listing 3: Creating and subscribing to Subjects with static data transformations via Subject.pipe.

Here, we create a Subject that is updated by the user with the set_value endpoint. Calling set_value will update the value on single, which will update its child-subject, double, with that same value. Any value x received by double will automatically have the transformation x * 2 applied to it before it is yielded to subscribers.
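The propagation from parent to child can be modelled in a few lines. The sketch below is a simplified, hypothetical ToySubject, not the CDK's Subject: a child stores its transformation, applies it to every value it receives, and then passes the result on to its own subscribers and children.

```python
import asyncio
from typing import Any, Callable, Optional


class ToySubject:
    """Hypothetical stand-in showing how a piped child could work."""

    def __init__(self, transform: Optional[Callable[[Any], Any]] = None) -> None:
        self.transform = transform
        self.children: list["ToySubject"] = []
        self.queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.queues.append(queue)
        return queue

    def pipe(self, func: Callable[[Any], Any]) -> "ToySubject":
        # A pipe creates a child that applies func to everything it receives.
        child = ToySubject(transform=func)
        self.children.append(child)
        return child

    def update(self, value: Any) -> None:
        if self.transform is not None:
            value = self.transform(value)
        for queue in self.queues:
            queue.put_nowait(value)
        for child in self.children:
            child.update(value)


async def demo() -> tuple[int, int]:
    single = ToySubject()
    double = single.pipe(lambda x: x * 2)
    single_values, double_values = single.subscribe(), double.subscribe()
    single.update(21)
    return await single_values.get(), await double_values.get()
```

With this model, asyncio.run(demo()) yields 21 for the subscriber of single and 42 for the subscriber of double.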

Typing clarification: When declaring a Subject, we generally only need to declare a single type, the type of the values passed into Subject.update. The second type parameter is the type sent to Subscriptions of the Subject, and it defaults to the input value type. When a Subject is instantiated with a pipe operation, this second type must be declared as well; it is the return type of the pipe function.

def stringify(value: int) -> str:
    return str(value)

subject = Subject[int, str](maxsize=10, pipe=stringify)

Listing 4: Instantiating a Subject with a pipe function to immediately apply a data transformation.

If the pipe is added to a Subject[T] after instantiation, it will accept a callable with the signature (T) -> Any, e.g. a Subject[int] will accept a callable with the signature (int) -> Any.

Finally, multiple pipes can be chained together, with the return type of each pipe being the required parameter type of the next pipe:

dict_subject: Subject[dict[str, float], dict[str, float]] = Subject[dict[str, float]](maxsize=10)
float_subject: Subject[float, int] = dict_subject.pipe(lambda x: x.get("value", -1)).pipe(lambda x: int(x))

Listing 5: Chaining pipes together to apply multiple data transformations.
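The type flow through a chain of pipes is the same as in ordinary function composition. The following sketch uses plain functions only; chain, extract and truncate are hypothetical helpers introduced for illustration and are not part of the CDK.

```python
from functools import reduce
from typing import Any, Callable


def chain(*funcs: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Compose functions left to right, as chained pipes would apply them."""
    return lambda value: reduce(lambda acc, func: func(acc), funcs, value)


def extract(data: dict[str, float]) -> float:
    # First step: dict[str, float] -> float
    return data.get("value", -1)


def truncate(number: float) -> int:
    # Second step: float -> int; its parameter type matches extract's return type.
    return int(number)


to_int = chain(extract, truncate)
```

Here to_int({"value": 3.7}) evaluates to 3, and to_int({}) falls back to -1.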

Filtering data

We can also apply filters to our Subject such that its Subscriptions and child-Subjects are conditionally notified using the Subject.filter method.

numbers = Subject[int](maxsize=10)
big_numbers = numbers.filter(lambda x: x > 100)
big_numbers_made_small = big_numbers.pipe(lambda x: x / 100)

Listing 6: Filtering Subject data to prevent its propagation to Subscriptions with Subject.filter.

A filter is essentially a special pipe which only propagates data to the Subject's subscribers when the condition of the filter function is met. Here, calling numbers.update(100) would notify a subscriber of numbers, but not a subscriber of big_numbers or its child big_numbers_made_small, because 100 is not greater than 100.
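One way to picture "a filter is a special pipe" is a pipe whose operation may return a skip marker instead of a value. The sketch below is a hypothetical model under that assumption; ToySubject, _SKIP and drain are illustrative names, and the CDK may implement filtering differently.

```python
import asyncio
from typing import Any, Callable, Optional

_SKIP = object()  # marker meaning "do not propagate this value"


class ToySubject:
    """Hypothetical stand-in; filter is modelled as a pipe that may skip."""

    def __init__(self, operation: Optional[Callable[[Any], Any]] = None) -> None:
        self.operation = operation
        self.children: list["ToySubject"] = []
        self.queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.queues.append(queue)
        return queue

    def pipe(self, func: Callable[[Any], Any]) -> "ToySubject":
        child = ToySubject(operation=func)
        self.children.append(child)
        return child

    def filter(self, predicate: Callable[[Any], bool]) -> "ToySubject":
        return self.pipe(lambda x: x if predicate(x) else _SKIP)

    def update(self, value: Any) -> None:
        if self.operation is not None:
            value = self.operation(value)
        if value is _SKIP:
            return  # neither subscribers nor children are notified
        for queue in self.queues:
            queue.put_nowait(value)
        for child in self.children:
            child.update(value)


def drain(queue: asyncio.Queue) -> list:
    items = []
    while not queue.empty():
        items.append(queue.get_nowait())
    return items


async def demo() -> tuple[list, list, list]:
    numbers = ToySubject()
    big_numbers = numbers.filter(lambda x: x > 100)
    made_small = big_numbers.pipe(lambda x: x / 100)
    queues = [numbers.subscribe(), big_numbers.subscribe(), made_small.subscribe()]
    numbers.update(100)  # stopped by the filter
    numbers.update(250)  # passes the filter
    return tuple(drain(queue) for queue in queues)
```

In this model the subscriber of numbers sees both values, while the filtered branch only sees 250 and its scaled form 2.5.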

Publisher: a Self-updating Subject

The CDK additionally provides a subclass of Subject, called a Publisher, which will update its value by calling a provided source function at a given interval. Publishers are ideal for sensor data and other data sources which can be stably represented with a callable.

random_number_provider.py
import random

from unitelabs.cdk import Publisher, sila


def get_random_number() -> int:
    return random.randint(0, 42)


class RandomNumberProvider(sila.Feature):
    def __init__(self):
        super().__init__()
        self.number_publisher = Publisher[int](source=get_random_number, interval=1)

    @sila.ObservableProperty()
    async def subscribe_random_number(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the random number."""
        return self.number_publisher.subscribe()

Listing 7: Implementing a self-updating observable with Publisher.

When we subscribe to a Publisher it will create a background task that calls the source function every interval seconds. Publishers smartly manage resources such that only a single background task is created to notify multiple subscribers. Only when all subscribers, including subscribers on child Subjects, i.e. those created from Publisher.pipe and Publisher.filter, have unsubscribed from the Publisher will it cancel its background task to free resources.

A new Subscription of a Subject will wait for the next call to update for its first value. In comparison, a Publisher assumes that its most recently seen value is still relevant information for its subscribers and immediately adds this value to the Subscription queue.
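The resource handling described above can be sketched as a small polling loop. This is a hypothetical ToyPublisher written only with asyncio, not the CDK's Publisher: one shared background task is started on the first subscription, new subscribers immediately receive the most recent value, and the task is cancelled once the last subscriber leaves. Child subjects are left out to keep the sketch short.

```python
import asyncio
from typing import Any, Callable, Optional

_UNSET = object()  # marker for "no value seen yet"


class ToyPublisher:
    """Hypothetical polling publisher; illustration only."""

    def __init__(self, source: Callable[[], Any], interval: float = 1.0) -> None:
        self.source = source
        self.interval = interval
        self.queues: list[asyncio.Queue] = []
        self.task: Optional[asyncio.Task] = None
        self.current: Any = _UNSET

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        if self.current is not _UNSET:
            queue.put_nowait(self.current)  # hand over the latest value at once
        self.queues.append(queue)
        if self.task is None:
            # A single background task serves all subscribers.
            self.task = asyncio.get_running_loop().create_task(self._poll())
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self.queues.remove(queue)
        if not self.queues and self.task is not None:
            self.task.cancel()  # last subscriber gone: free the resources
            self.task = None

    async def _poll(self) -> None:
        while True:
            self.current = self.source()
            for queue in self.queues:
                queue.put_nowait(self.current)
            await asyncio.sleep(self.interval)


async def demo() -> tuple[int, int, bool, bool]:
    counter = iter(range(1, 100))
    publisher = ToyPublisher(source=lambda: next(counter), interval=0.05)
    first = publisher.subscribe()
    first_value = await first.get()
    second = publisher.subscribe()  # joins late, still gets the latest value
    second_value = second.get_nowait()
    publisher.unsubscribe(first)
    still_running = publisher.task is not None
    publisher.unsubscribe(second)
    return first_value, second_value, still_running, publisher.task is None
```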

Advanced Usage

Resource Management

Whenever pipe or filter is called, a new child-subject is created. By design, Subjects and Publishers notify all child-subjects in their hierarchy. This means that if we create many child-subjects via pipe and filter, we may eventually see our Subscriptions slow down due to unnecessary resource allocation.

Consider the following scenario:

global_counter_provider.py
from unitelabs.cdk import Publisher, sila


value = 0


def get_next_value() -> int:
    global value
    value += 1
    return value


class GlobalCounterProvider(sila.Feature):
    def __init__(self):
        super().__init__()
        self.counter = Publisher[int](source=get_next_value)

    @sila.ObservableProperty()
    async def subscribe_even_numbers(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the counter, when the new number is an even number."""
        return self.counter.filter(lambda x: x % 2 == 0).subscribe()

Listing 8: Misuse of the filter and pipe methods resulting in unnecessary resource allocation.

Now, if we were to call subscribe_even_numbers multiple times, every call would create a fresh child-subject. Even though the Subscription is properly unsubscribed, the child-subject remains in the Subject's known children. Subsequent calls to the endpoint will get slower as the counter propagates values to a growing list of child-subjects.

This could be solved in the manner previously described, by creating a stable child-subject that lives in the __init__ method of the Feature, as demonstrated in Listing 3. However, this is not always practical, especially when the child-subjects are created dynamically based on user input or other runtime conditions.

A small change in how we use pipe and filter can ensure that these child-subjects are removed from a Subject's or Publisher's children when they are no longer being subscribed to.

global_counter_provider.py
...
class GlobalCounterProvider(sila.Feature):
    ...
    @sila.ObservableProperty()
    async def subscribe_even_numbers(self) -> sila.Stream[int]:
        """Subscribe to observe changes to the counter, when the new number is an even number."""
        return self.counter.filter(lambda x: x % 2 == 0, temporary=True).subscribe()

Listing 9: Creating a temporary child-subject to ensure proper disposal after unsubscribe.

By calling pipe and filter with temporary=True we mark these child-subjects for removal once their dependent Subscriptions have been cancelled.

Note: Trying to create a non-temporary child Subject of a temporary Subject will raise an error at runtime.

temp = subject.pipe(method1, temporary=True).pipe(method2)  # raises RuntimeError
temp = subject.pipe(method1, temporary=True).pipe(method2, temporary=True)  # ok

Listing 10: Creating a non-temporary Subject from a temporary Subject will raise a RuntimeError.
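The effect of temporary=True on the parent's list of children can be sketched with simple bookkeeping. The model below is hypothetical and synchronous: ToySubject and leaked_children are illustrative names, queues are plain deques, and the rule that a temporary child removes itself once its last subscriber leaves is an assumption based on the description above.

```python
from collections import deque
from typing import Any, Callable, Optional


class ToySubject:
    """Hypothetical stand-in tracking children; not the CDK implementation."""

    def __init__(
        self,
        predicate: Optional[Callable[[Any], bool]] = None,
        parent: Optional["ToySubject"] = None,
        temporary: bool = False,
    ) -> None:
        self.predicate = predicate
        self.parent = parent
        self.temporary = temporary
        self.children: list["ToySubject"] = []
        self.queues: list[deque] = []

    def filter(self, predicate: Callable[[Any], bool], temporary: bool = False) -> "ToySubject":
        child = ToySubject(predicate, parent=self, temporary=temporary)
        self.children.append(child)
        return child

    def subscribe(self) -> deque:
        queue: deque = deque()
        self.queues.append(queue)
        return queue

    def unsubscribe(self, queue: deque) -> None:
        # Remove by identity, since empty deques compare equal.
        self.queues = [q for q in self.queues if q is not queue]
        # A temporary child detaches itself once nobody listens anymore.
        if self.temporary and not self.queues and self.parent is not None:
            self.parent.children.remove(self)

    def update(self, value: Any) -> None:
        if self.predicate is not None and not self.predicate(value):
            return
        for queue in self.queues:
            queue.append(value)
        for child in self.children:
            child.update(value)


def leaked_children(temporary: bool, calls: int = 3) -> int:
    counter = ToySubject()
    for _ in range(calls):
        child = counter.filter(lambda x: x % 2 == 0, temporary=temporary)
        queue = child.subscribe()
        child.unsubscribe(queue)
    # Children that every later update would still have to visit.
    return len(counter.children)
```

In this model, three subscribe and unsubscribe cycles leave three stale children without the flag and none with it.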

Subscription Queue

We can also use the Subscription as an asyncio.Queue. You may have noticed that the constructors of Subject and Publisher accept the argument maxsize. This sets the maximum number of entries that can be held in each Subscription queue created with Subject.subscribe.

We can operate on the underlying asyncio.Queue using the Subscription.get method.

logged_feature.py
import datetime

from unitelabs.cdk import Subject, sila

class SteppedActionError(Exception):
    pass

class LoggedFeature(sila.Feature):
    ...
    @sila.UnobservableCommand()
    async def execute_method(self) -> None:
        # execute method
        self.log_subject.update(
            Log(
                method="execute_method",
                when=datetime.datetime.now(),
            )
        )

    @sila.UnobservableCommand(errors=[SteppedActionError])
    async def multi_step_action(self) -> None:
        subscription = self.log_subject.subscribe()
        try:
            await self.perform_action()
            log_item = await subscription.get(
                predicate=lambda x: x.method == "perform_action",
                timeout=1.0,
            )

            await self.execute_method()
            log_item = await subscription.get(
                predicate=lambda x: x.method == "execute_method",
                timeout=1.0,
            )
        except TimeoutError as e:
            raise SteppedActionError() from e
        finally:
            self.log_subject.unsubscribe(subscription)

Listing 11: Using a Subscription as an asyncio.Queue.

Here we have a method multi_step_action which expects certain Log entries to be added to the Subscription and checks that those Logs were observed before moving on. By calling Subscription.get with a timeout of 1.0, we say that we expect a Log entry matching our predicate within 1.0 seconds; otherwise a TimeoutError is raised, which we convert into a SteppedActionError.

Technical Hint: It is important here to wrap the use of the Subscription in a try-finally block. This pattern ensures that the subscription is always properly disposed of, i.e. unsubscribed, even when an error is raised or the call is cancelled. Improper handling of Subscriptions can result in decreased performance for long-running processes.
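The predicate-and-timeout behaviour can be approximated on a plain asyncio.Queue. The helper below, get_matching, is a hypothetical function and not the CDK's get method; in particular, discarding non-matching items is an assumption of this sketch, as the guide does not state what happens to them.

```python
import asyncio
from typing import Any, Callable


async def get_matching(
    queue: asyncio.Queue,
    predicate: Callable[[Any], bool],
    timeout: float,
) -> Any:
    """Return the first queued item matching predicate, or time out."""

    async def wait_for_match() -> Any:
        while True:
            item = await queue.get()
            if predicate(item):
                return item
            # Assumption: items that do not match are dropped.

    # Raises asyncio.TimeoutError if no match arrives within the timeout.
    return await asyncio.wait_for(wait_for_match(), timeout)


async def demo() -> tuple[str, bool]:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("other")
    queue.put_nowait("perform_action")
    found = await get_matching(queue, lambda m: m == "perform_action", timeout=1.0)
    try:
        await get_matching(queue, lambda m: m == "execute_method", timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    return found, timed_out
```

The first call finds its entry right away, while the second call times out because no matching entry is ever queued.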

Copyright © 2025