UniteLabs
package

unitelabs.cdk

unitelabs/cdk/__init__.py

Packages

Attributes

  • __version__
    = version('unitelabs-cdk')
  • __all__
    = [ "unitelabs.cdk.main.AppFactory", "unitelabs.cdk.config.connector_base_config.CloudServerConfig", "unitelabs.cdk.config.config.ConfigurationError", "unitelabs.cdk.connector.Connector", "unitelabs.cdk.config.connector_base_config.ConnectorBaseConfig", "unitelabs.cdk.subscriptions.publisher.Publisher", "unitelabs.cdk.config.connector_base_config.SiLAServerConfig", "unitelabs.cdk.subscriptions.subject.Subject", "unitelabs.cdk.subscriptions.subscription.Subscription", "unitelabs.cdk.config.config.UnsupportedConfigFiletype", "unitelabs.cdk.logging.create_logger", "unitelabs.cdk.main.run", "unitelabs.cdk.config.config.validate_config" ]

Classes

  • CloudServerConfig

    class

    Configuration for a gRPC Cloud Server.

    MRO
    • sila.server.cloud_server.CloudServerConfig
    Decorators
    dataclasses.dataclass

    Methods

    • __init__(
        self,
        port : typing_extensions.Annotated[int, []],
        root_certificates : str | pathlib.Path | bytes | None,
        certificate_chain : str | pathlib.Path | bytes | None,
        private_key : str | pathlib.Path | bytes | None,
        options : dict
      ) -> None

      port
      typing_extensions.Annotated[int, []] = 50000
      root_certificates
      str | pathlib.Path | bytes | None = None
      certificate_chain
      str | pathlib.Path | bytes | None = None
      private_key
      str | pathlib.Path | bytes | None = None
      options
      dict
    • @pydantic.field_validator('hostname')

      @classmethod

      ensure_valid_hostname(cls, value : str) -> str

      Ensure that the hostname is valid.

      value
      str
      str
    • __post_init__(self) -> None

    Attributes

    • port
      typing_extensions.Annotated[int, []] = 50000
    • root_certificates
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded root certificates, or `None` if no root certificates should be used. Note: TLS must be set to True to activate encryption with this certificate.
    • certificate_chain
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded certificate chain, or `None` if no certificate chain should be used. Note: TLS must be set to True to activate encryption with this certificate.
    • private_key
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded private key, or `None` if no private key should be used. Note: TLS must be set to True to activate encryption with this key.
    • options
      dict = dataclasses.field(default_factory=dict)
  • ConfigurationError

    class

    Received an invalid configuration.

    Bases
    ValueError
  • ConnectorBaseConfig

    class

    Base configuration for a UniteLabs SiLA2 Connector.

    Decorators
    dataclasses.dataclass

    Methods

    Attributes

    • sila_server
      SiLAServerConfig | None = dataclasses.field(default_factory=SiLAServerConfig)
    • cloud_server_endpoint
      CloudServerConfig | None = dataclasses.field(default_factory=CloudServerConfig)
    • discovery
      DiscoveryConfig | None = dataclasses.field(default_factory=DiscoveryConfig)
    • logging
      dict | None = dataclasses.field(default=None)
      A python `logging.config` which is passed into `dictConfig`. Check the official documentation for more information about the logging config schema: https://docs.python.org/3/library/logging.config.html#logging-config-dictschema
  • SiLAServerConfig

    class

    Configuration for a SiLA server.

    MRO
    • sila.server.server.ServerConfig
    Decorators
    dataclasses.dataclass

    Methods

    • __init__(
        self,
        root_certificates : str | pathlib.Path | bytes | None,
        certificate_chain : str | pathlib.Path | bytes | None,
        private_key : str | pathlib.Path | bytes | None,
        options : dict,
        name : typing_extensions.Annotated[str, []],
        default_lifetime : float | None
      ) -> None

      root_certificates
      str | pathlib.Path | bytes | None = None
      certificate_chain
      str | pathlib.Path | bytes | None = None
      private_key
      str | pathlib.Path | bytes | None = None
      options
      dict
      name
      typing_extensions.Annotated[str, []] = 'SiLA Server'
      vendor_url
      URIString = 'https://sila-standard.com'
      default_lifetime
      float | None = 3600
    • @classmethod

      __get_pydantic_json_schema__(
        cls,
        core_schema : pydantic_core.core_schema.CoreSchema,
        handler : pydantic.annotated_handlers.GetJsonSchemaHandler
      ) -> pydantic.json_schema.JsonSchemaValue

      core_schema
      pydantic_core.core_schema.CoreSchema
      handler
      pydantic.annotated_handlers.GetJsonSchemaHandler
      pydantic.json_schema.JsonSchemaValue
    • __post_init__(self) -> None

    Attributes

    • root_certificates
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded root certificates, or `None` if no root certificates should be used. Note: TLS must be set to True to activate encryption with this certificate.
    • certificate_chain
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded certificate chain, or `None` if no certificate chain should be used. Note: TLS must be set to True to activate encryption with this certificate.
    • private_key
      str | pathlib.Path | bytes | None = None
      A path to, or the bytestring contents of, the PEM-encoded private key, or `None` if no private key should be used. Note: TLS must be set to True to activate encryption with this key.
    • options
      dict = dataclasses.field(default_factory=dict)
    • uuid
      UUIDString = dataclasses.field(default_factory=(lambda: str(uuid.uuid4())))
    • name
      typing_extensions.Annotated[str, []] = 'SiLA Server'
    • vendor_url
      URIString = 'https://sila-standard.com'
    • default_lifetime
      float | None = 3600
      The default lifetime of observable commands in seconds. This value is measured from the time a command is initiated and determines how long its status and responses remain available on the server before being deleted to free memory.
  • UnsupportedConfigFiletype

    class

    The filetype is unsupported for reading/writing config files.

    Bases
    Exception
  • Connector

    class

    Main app.

    Methods

    • __init__(self, config : ConnectorBaseConfig | None) -> None

      config
      ConnectorBaseConfig | None = None
    • start(self) -> None

      Start the connector and all related services.

    • stop(self) -> None

      Stop the connector and all related services.

    • wait_for_ready(self) -> None

      Wait until the connector is ready.

    • wait_for_termination(self) -> None

      Wait until the connector is terminated.

    • get_feature(self, feature : type[T]) -> T

      Get the instance of a registered feature by its type.

      feature
      type[T]
      The type of the feature to receive.
      The feature registered with this connector.
    • register(self, feature : Feature) -> None

      Register a new feature with this connector.

      feature
    • on_shutdown(self, handler : Handler) -> None

      Add a shutdown hook to be called in the terminating phase. This will be in response to an explicit call to `app.stop()` or upon receipt of system signals such as SIGINT, SIGTERM or SIGHUP.

      handler
      The method to be called on shutdown.
    • off_shutdown(self, handler : Handler) -> None

      Remove a previously added shutdown hook.

      handler
      The handler to be removed from the shutdown hooks.

    Attributes

    • __config
      = config or ConnectorBaseConfig()
    • _ready
      = asyncio.Event()
    • _shutdown
      = asyncio.Event()
    • _shutdown_handlers
      list[Handler] = []
    • _sila_server
      = None
    • _discovery
      = None
    • _cloud_server
      = None
    • config
      The configuration.
    • sila_server
      sila.server.server.Server | None = None
      The SiLA Server.
    • logger
      logging.Logger = None
      A standard Python `logging.Logger` for the app.
    • debug
      bool = None
      Whether debug mode is enabled.
  • Publisher

    class

    An observable which updates itself by polling a data source.

    Bases
    typing_extensions.Generic[IN, OUT], Subject[IN, OUT]

    Methods

    • __init__(
        self,
        source : typing_extensions.Callable[[], collections.abc.Awaitable[IN]] | typing_extensions.Callable[[], IN],
        interval : float,
        maxsize : int,
      ) -> None

      source
      typing_extensions.Callable[[], collections.abc.Awaitable[IN]] | typing_extensions.Callable[[], IN]
      A function or coroutine that will be called at a fixed interval as the data source of the subscription.
      interval
      float = 5
      How many seconds to wait between polling calls to `source`.
      maxsize
      int = 0
      The maximum number of messages to track in the queue.
      pipe
      PipeFunction[IN, OUT] | None = None
    • @typing.override

      on_subscribe(self) -> None

    • @typing.override

      _on_subscribe(self, subscription : Subscription) -> None

      subscription
    • @typing.override

      on_unsubscribe(self) -> None

    • _set(self) -> None

      Create a background task to poll the data `source` and update the current value. Task will be destroyed when all subscriptions to the `Publisher` are removed.

    • _unset(self) -> None

      Stop the background task that polls the data `source`. This is called when all subscriptions to the `Publisher` are removed.

    • __self_update(self) -> None

    Attributes

    • _update_task
      asyncio.Task | None = None
    • _source
      = source
    • _interval
      = interval
  • Subject

    class

    An observable that can be updated externally and subscribed to by multiple observers.

    Bases
    typing_extensions.Generic[IN, OUT], contextlib.AbstractContextManager

    Methods

    • __init__(self, maxsize : int, pipe : PipeFunction[IN, OUT] | None) -> None

      maxsize
      int = 0
      The maximum number of messages to track in `Subscription` queues created by `subscribe`.
      pipe
      PipeFunction[IN, OUT] | None = None
    • __repr__(self) -> str

      str
    • subscribe(self) -> Subscription[OUT]

      Add a `Subscription` that will be notified on `update`.

    • on_subscribe(self) -> None

      Emit an event when the first subscription is added. Override this method to start external listeners or resources when the first subscriber begins listening.

    • _on_subscribe(self, subscription : Subscription) -> None

      Emit an event when `subscribe` is called.

      subscription
    • unsubscribe(self, subscriber : Subscription[typing_extensions.Any]) -> None

      Remove a `Subscription`.

      subscriber
      Subscription[typing_extensions.Any]
    • on_unsubscribe(self) -> None

      Emit an event when the last subscription is removed. Override this method to perform cleanup or release resources when there are no active subscribers.

    • _on_unsubscribe(self) -> None

      Emit an event when `unsubscribe` is called.

    • notify(self) -> None

      Propagate the current value to all listening `Subscription`s.

    • update(self, value : IN) -> None

      Update the current value and `notify` all listening `Subscription`s.

      value
    • pipe(self, func : typing_extensions.Callable[[], T], temporary : bool) -> Subject[OUT, T]

      Create a new `Subject` with `func` added to the list of pipes that are applied to values received from `notify`.

      func
      typing_extensions.Callable[[], T]
      The callable that should be applied to all values seen by the new `Subject`.
      temporary
      bool = False
      Whether or not the pipe should be pruned from its parent on `unsubscribe`.
      A new `Subject` with the pipe function added.
    • filter(
        self,
        predicate : typing_extensions.Callable[[], bool | typing_extensions.Any],
        temporary : bool
      ) -> Subject[OUT, OUT]

      Create a new `Subject` that is only notified when the item passes the `predicate`.

      predicate
      typing_extensions.Callable[[], bool | typing_extensions.Any]
      A filter predicate to apply.
      temporary
      bool = False
      Whether or not the filter should be pruned from its parent on `unsubscribe`.
      A new `Subject` with the filter applied.
    • @typing.override

      __enter__(self) -> Subscription[OUT]

      Return a new `Subscription` upon entering the runtime context.

      The newly created `Subscription`.
    • @typing.override

      __exit__(
        self,
        exc_type : type[BaseException] | None,
        exc_value : BaseException | None,
        traceback : types.TracebackType | None
      ) -> bool

      exc_type
      type[BaseException] | None = None
      exc_value
      BaseException | None = None
      traceback
      types.TracebackType | None = None
      bool

    Attributes

    • _maxsize
      = maxsize
    • _value
      OUT | Default = _DEFAULT_VALUE
    • _total_subscribers
      = 0
    • _subscribers
      list[Subscription[OUT]] = []
    • _children
      list[Subject[OUT, typing_extensions.Any]] = []
    • _parent
      Subject | None = None
    • _is_temporary
      = False
    • _context
      Subscription[OUT] | None = None
    • _pipe
      PipeFunction[IN, OUT] = pipe or default_pipe
    • current
      OUT | Default = None
      The current value.
    • subscribers
      list[Subscription[OUT]] = None
      All `Subscription`s listening to this `Subject`.
    • has_subscribers
      bool = None
      Whether any `Subscription` is listening to this `Subject`.
  • Subscription

    class

    An AsyncIterable you can asynchronously add items to.

    Bases
    asyncio.Queue[T], collections.abc.AsyncIterator[T]

    Methods

    • __init__(self, maxsize : int, parent : Subject) -> None

      maxsize
      int
      parent
    • __repr__(self) -> str

      str
    • update(self, value : T) -> None

      Update the current value, if `value` is not the current value.

      value
    • cancel(self) -> None

      Cancel the subscription.

    • terminate(self) -> None

      Unsubscribe the subscription from its parent.

    • __aiter__(self) -> collections.abc.AsyncIterator[T]

      collections.abc.AsyncIterator[T]
    • __anext__(self) -> T

    • get(
        self,
        predicate : typing_extensions.Callable[[], bool],
        timeout : float | None
      ) -> T

      Request an upcoming value that satisfies the `predicate`. If used without `timeout` this will block indefinitely until a value satisfies the `predicate`.

      predicate
      typing_extensions.Callable[[], bool]
      A filter predicate to apply.
      timeout
      float | None = None
      How many seconds to wait for a new value before timing out.

    Attributes

    • _parent
      Subject = weakref.proxy(parent)
    • _value
      T | Default = typing.cast(T, _DEFAULT_VALUE)
    • _closed
      = asyncio.Event()
    • size
      int = None
      The number of items in the queue.