UniteLabs
package

unitelabs.cdk

unitelabs/cdk/__init__.py

Packages

Attributes

  • __version__
    = version('unitelabs-cdk')
  • __all__
    = [ "unitelabs.cdk.main.AppFactory", "unitelabs.cdk.config.connector_base_config.CloudServerConfig", "unitelabs.cdk.config.config.ConfigurationError", "unitelabs.cdk.connector.Connector", "unitelabs.cdk.config.connector_base_config.ConnectorBaseConfig", "unitelabs.cdk.subscriptions.publisher.Publisher", "unitelabs.cdk.subscriptions.replay.Replay", "unitelabs.cdk.config.connector_base_config.SiLAServerConfig", "unitelabs.cdk.subscriptions.subject.Subject", "unitelabs.cdk.subscriptions.subscription.Subscription", "unitelabs.cdk.config.config.UnsupportedConfigFiletype", "unitelabs.cdk.logging.create_logger", "unitelabs.cdk.main.run", "unitelabs.cdk.config.config.validate_config" ]

Classes

  • CloudServerConfig

    class

    Configuration for a gRPC Cloud Server.

    MRO
    • sila.server.cloud_server.CloudServerConfig
    Decorators
    dataclasses.dataclass

    Methods

    • __init__(
        self,
        port : typing_extensions.Annotated[int, []],
        root_certificates : str | pathlib.Path | bytes | None,
        certificate_chain : str | pathlib.Path | bytes | None,
        private_key : str | pathlib.Path | bytes | None,
        options : dict
      ) -> None

      port
      typing_extensions.Annotated[int, []] = 50000
      root_certificates
      str | pathlib.Path | bytes | None = None
      certificate_chain
      str | pathlib.Path | bytes | None = None
      private_key
      str | pathlib.Path | bytes | None = None
      options
      dict
    • @pydantic.field_validator('hostname')

      @classmethod

      ensure_valid_hostname(cls, value : str) -> str

      Ensure that the hostname is valid.

      value
      str
      str
    • @classmethod

      __get_pydantic_json_schema__(
        cls,
        core_schema : pydantic_core.core_schema.CoreSchema,
        handler : pydantic.annotated_handlers.GetJsonSchemaHandler
      ) -> pydantic.json_schema.JsonSchemaValue

      core_schema
      pydantic_core.core_schema.CoreSchema
      handler
      pydantic.annotated_handlers.GetJsonSchemaHandler
      pydantic.json_schema.JsonSchemaValue
    • __post_init__(self) -> None

    Attributes

    • port
      typing_extensions.Annotated[int, []] = 50000
    • root_certificates
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded root certificates, or `None` if no root certificates should be used. Note: TLS must be set to True to activate encryption with this certificate.
    • certificate_chain
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded certificate chain, or `None` if no certificate chain should be used. Note: TLS must be set to True to activate encryption with this certificate.
    • private_key
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded private key, or `None` if no private key should be used. Note: TLS must be set to True to activate encryption with this key.
    • options
      dict = dataclasses.field(default_factory=dict)
  • ConfigurationError

    class

    Received an invalid configuration.

    Bases
    ValueError
  • ConnectorBaseConfig

    class

    Base configuration for a UniteLabs SiLA2 Connector.

    Decorators
    dataclasses.dataclass

    Methods

    Attributes

    • sila_server
      SiLAServerConfig | None = dataclasses.field(default_factory=SiLAServerConfig)
    • cloud_server_endpoint
      CloudServerConfig | None = dataclasses.field(default_factory=CloudServerConfig)
    • discovery
      DiscoveryConfig | None = dataclasses.field(default_factory=DiscoveryConfig)
    • logging
      dict | None = dataclasses.field(default=None)
      A python `logging.config` which is passed into `dictConfig`. Check the official documentation for more information about the logging config schema: https://docs.python.org/3/library/logging.config.html#logging-config-dictschema
  • SiLAServerConfig

    class

    Configuration for a SiLA server.

    MRO
    • sila.server.server.ServerConfig
    Decorators
    dataclasses.dataclass

    Methods

    • __init__(
        self,
        root_certificates : str | pathlib.Path | bytes | None,
        certificate_chain : str | pathlib.Path | bytes | None,
        private_key : str | pathlib.Path | bytes | None,
        options : dict,
        name : typing_extensions.Annotated[str, []],
        default_lifetime : float | None
      ) -> None

      root_certificates
      str | pathlib.Path | bytes | None = None
      certificate_chain
      str | pathlib.Path | bytes | None = None
      private_key
      str | pathlib.Path | bytes | None = None
      options
      dict
      name
      typing_extensions.Annotated[str, []] = 'SiLA Server'
      vendor_url
      URIString = 'https://sila-standard.com'
      default_lifetime
      float | None = 3600
    • @classmethod

      __get_pydantic_json_schema__(
        cls,
        core_schema : pydantic_core.core_schema.CoreSchema,
        handler : pydantic.annotated_handlers.GetJsonSchemaHandler
      ) -> pydantic.json_schema.JsonSchemaValue

      core_schema
      pydantic_core.core_schema.CoreSchema
      handler
      pydantic.annotated_handlers.GetJsonSchemaHandler
      pydantic.json_schema.JsonSchemaValue
    • __post_init__(self) -> None

    Attributes

    • root_certificates
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded root certificates, or `None` if no root certificates should be used. Note: TLS must be set to True to activate encryption with this certificate.
    • certificate_chain
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded certificate chain, or `None` if no certificate chain should be used. Note: TLS must be set to True to activate encryption with this certificate.
    • private_key
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded private key, or `None` if no private key should be used. Note: TLS must be set to True to activate encryption with this key.
    • options
      dict = dataclasses.field(default_factory=dict)
    • uuid
      UUIDString = dataclasses.field(default_factory=(lambda: str(uuid.uuid4())))
    • name
      typing_extensions.Annotated[str, []] = 'SiLA Server'
    • vendor_url
      URIString = 'https://sila-standard.com'
    • default_lifetime
      float | None = 3600
      The default lifetime of observable commands in seconds. This value is measured from the time a command is initiated and determines how long its status and responses remain available on the server before being deleted to free memory.
  • UnsupportedConfigFiletype

    class

    The filetype is unsupported for reading/writing config files.

    Bases
    Exception
  • Connector

    class

    Main app.

    Methods

    • __init__(self, config : ConnectorBaseConfig | None) -> None

      config
      ConnectorBaseConfig | None = None
    • start(self) -> None

      Start the connector and all related services.

    • stop(self) -> None

      Stop the connector and all related services.

    • wait_for_ready(self) -> None

      Wait until the connector is ready.

    • wait_for_termination(self) -> None

      Wait until the connector is terminated.

    • get_feature(self, feature : type[T]) -> T

      Get the instance of a registered feature by its type.

      feature
      type[T]
      The type of the feature to receive.
      The feature registered with this connector.
    • register(self, feature : Feature) -> None

      Register a new feature with this connector.

      feature
    • on_shutdown(self, handler : Handler) -> None

      Add a shutdown hook to be called in the terminating phase. This will be in response to an explicit call to `app.stop()` or upon receipt of system signals such as SIGINT, SIGTERM or SIGHUP.

      handler
      The method to be called on shutdown.
    • off_shutdown(self, handler : Handler) -> None

      Remove a previously added shutdown hook.

      handler
      The handler to be removed from the shutdown hooks.

    Attributes

    • __config
      = config or ConnectorBaseConfig()
    • _ready
      = asyncio.Event()
    • _shutdown
      = asyncio.Event()
    • _shutdown_handlers
      list[Handler] = []
    • _sila_server
      = None
    • _discovery
      = None
    • _cloud_server
      = None
    • config
      The configuration.
    • sila_server
      sila.server.server.Server | None = None
      The SiLA Server.
    • logger
      logging.Logger = None
      A standard Python :class:`~logging.Logger` for the app.
    • debug
      bool = None
      Whether debug mode is enabled.
  • Publisher

    class

    An observable which updates itself by polling a data source.

    Bases
    typing_extensions.Generic[IN, OUT], Subject[IN, OUT]

    Methods

    • __init__(
        self,
        source : typing_extensions.Callable[[], collections.abc.Awaitable[IN]] | typing_extensions.Callable[[], IN],
        interval : float,
        maxsize : int,
      ) -> None

      source
      typing_extensions.Callable[[], collections.abc.Awaitable[IN]] | typing_extensions.Callable[[], IN]
      A function or coroutine that will be called at a fixed interval as the data source of the subscription.
      interval
      float = 5
      How many seconds to wait between polling calls to `source`.
      maxsize
      int = 0
      The maximum number of messages to track in the queue.
      pipe
      PipeFunction[IN, OUT] | None = None
    • @typing.override

      on_subscribe(self) -> None

    • @typing.override

      _on_subscribe(self, subscription : Subscription) -> None

      subscription
    • @typing.override

      on_unsubscribe(self) -> None

    • _set(self) -> None

      Create a background task to poll the data `source` and update the current value. Task will be destroyed when all subscriptions to the `Publisher` are removed.

    • _unset(self) -> None

      Stop the background task that polls the data `source`. This is called when all subscriptions to the `Publisher` are removed.

    • __self_update(self) -> None

    Attributes

    • _update_task
      asyncio.Task | None = None
    • _source
      = source
    • _interval
      = interval
  • Replay

    class

    Defines how previously emitted values are replayed to new subscribers.

    Bases
    enum.IntEnum

    Attributes

    • NONE
      = 0
      Do not replay any previous values. The subscription will only receive values emitted after it is created.
    • ALL
      = -1
      Replay all previously emitted values to the new subscriber, in the order they were originally produced.
    • CURRENT
      = 1
      Replay only the most recent value (if any) to the new subscriber. If no value has been emitted yet, nothing is replayed.
  • Subject

    class

    An observable that can be updated externally and subscribed to by multiple observers.

    Bases
    typing_extensions.Generic[IN, OUT], contextlib.AbstractContextManager

    Methods

    • __init__(
        self,
        maxsize : int,
      ) -> None

      maxsize
      int = 0
      The maximum number of messages to track in `Subscription` queues created by `subscribe`.
      pipe
      PipeFunction[IN, OUT] | None = None
      initial_value
      Optionally set an initial value for this subject.
    • __repr__(self) -> str

      str
    • subscribe(self, replay : Replay | int, distinct : bool) -> Subscription[OUT]

      Add a `Subscription` that will be notified on `update`.

      replay
      Replay | int = NONE
      Controls how many previously emitted values are replayed to the new subscriber.
      distinct
      bool = True
      If True, only emit values when they differ from the previously emitted value.
    • on_subscribe(self) -> None

      Emit an event when the first subscription is added. Override this method to start external listeners or resources when the first subscriber begins listening.

    • _on_subscribe(self, subscription : Subscription) -> None

      Emit an event when `subscribe` is called.

      subscription
    • unsubscribe(self, subscriber : Subscription[typing_extensions.Any]) -> None

      Remove a `Subscription`.

      subscriber
      Subscription[typing_extensions.Any]
    • on_unsubscribe(self) -> None

      Emit an event when the last subscription is removed. Override this method to perform cleanup or release resources when there are no active subscribers.

    • _on_unsubscribe(self) -> None

      Emit an event when `unsubscribe` is called.

    • notify(self) -> None

      Propagate the current value to all listening `Subscription`s.

    • update(self, value : IN) -> None

      Update the current value and `notify` all listening `Subscription`s.

      value
    • pipe(self, func : typing_extensions.Callable[[], T], temporary : bool) -> Subject[OUT, T]

      Create a new `Subject` with `func` added to the list of pipes that are applied to values received from `notify`.

      func
      typing_extensions.Callable[[], T]
      The callable that should be applied to all values seen by the new `Subject`.
      temporary
      bool = False
      Whether or not the pipe should be pruned from its parent on `unsubscribe`.
      A new `Subject` with the pipe function added.
    • filter(
        self,
        predicate : typing_extensions.Callable[[], bool | typing_extensions.Any],
        temporary : bool
      ) -> Subject[OUT, OUT]

      Create a new `Subject` that is only notified when the item passes the `predicate`.

      predicate
      typing_extensions.Callable[[], bool | typing_extensions.Any]
      A filter predicate to apply.
      temporary
      bool = False
      Whether or not the filter should be pruned from its parent on `unsubscribe`.
      A new `Subject` with the filter applied.
    • @typing.override

      __enter__(self) -> Subscription[OUT]

      Return a new `Subscription` upon entering the runtime context.

      The newly created `Subscription`.
    • @typing.override

      __exit__(
        self,
        exc_type : type[BaseException] | None,
        exc_value : BaseException | None,
        traceback : types.TracebackType | None
      ) -> bool

      exc_type
      type[BaseException] | None = None
      exc_value
      BaseException | None = None
      traceback
      types.TracebackType | None = None
      bool

    Attributes

    • _maxsize
      = maxsize
    • _values
      = collections.deque[OUT](iterable=([initial_value] if not isinstance(initial_value, Default) else []), maxlen=(maxsize or None))
    • _total_subscribers
      = 0
    • _subscribers
      list[Subscription[OUT]] = []
    • _children
      list[Subject[OUT, typing_extensions.Any]] = []
    • _parent
      Subject | None = None
    • _is_temporary
      = False
    • _context
      Subscription[OUT] | None = None
    • _callbacks
      set[asyncio.Task] = set()
    • _pipe
      PipeFunction[IN, OUT] = pipe or default_pipe
    • current
      OUT | Default = None
      The current value.
    • subscribers
      list[Subscription[OUT]] = None
      All `Subscription`s listening to this `Subject`.
    • has_subscribers
      bool = None
      Whether any `Subscription` listens to this `Subject`.
  • Subscription

    class

    An AsyncIterable you can asynchronously add items to.

    Bases
    asyncio.Queue[T], collections.abc.AsyncIterator[T]

    Methods

    • __init__(
        self,
        maxsize : int,
        distinct : bool
      ) -> None

      maxsize
      int
      parent
      distinct
      bool = True
    • __repr__(self) -> str

      str
    • update(self, value : T) -> None

      Update the current value. Observers are only informed about this change if the subscription is either not set to distinct mode or the value differs from the current one.

      value
      The updated value.
    • cancel(self) -> None

      Cancel the subscription.

    • terminate(self) -> None

      Unsubscribe the subscription from its parent.

    • __aiter__(self) -> collections.abc.AsyncIterator[T]

      collections.abc.AsyncIterator[T]
    • __anext__(self) -> T

    • get(
        self,
        predicate : typing_extensions.Callable[[], bool],
        timeout : float | None
      ) -> T

      Request an upcoming value that satisfies the `predicate`. If used without `timeout` this will block indefinitely until a value satisfies the `predicate`.

      predicate
      typing_extensions.Callable[[], bool]
      A filter predicate to apply.
      timeout
      float | None = None
      How many seconds to wait for a new value before timing out.

    Attributes

    • _distinct
      = distinct
    • _parent
      Subject = weakref.proxy(parent)
    • _value
      T | Default = typing.cast(T, _DEFAULT_VALUE)
    • _closed
      = asyncio.Event()
    • size
      int = None
      The number of items in the queue.