unitelabs.cdk
unitelabs/cdk/__init__.py
Packages
Attributes
- __version__= version('unitelabs-cdk')
- __all__= [ "unitelabs.cdk.main.AppFactory", "unitelabs.cdk.config.connector_base_config.CloudServerConfig", "unitelabs.cdk.config.config.ConfigurationError", "unitelabs.cdk.connector.Connector", "unitelabs.cdk.config.connector_base_config.ConnectorBaseConfig", "unitelabs.cdk.subscriptions.publisher.Publisher", "unitelabs.cdk.subscriptions.replay.Replay", "unitelabs.cdk.config.connector_base_config.SiLAServerConfig", "unitelabs.cdk.subscriptions.subject.Subject", "unitelabs.cdk.subscriptions.subscription.Subscription", "unitelabs.cdk.config.config.UnsupportedConfigFiletype", "unitelabs.cdk.logging.create_logger", "unitelabs.cdk.main.run", "unitelabs.cdk.config.config.validate_config" ]
Classes
CloudServerConfig
classConfiguration for a gRPC Cloud Server.
- MRO
- └ sila.server.cloud_server.CloudServerConfig
- Decorators
- dataclasses.dataclass
Methods
__init__(- self,
- port : typing_extensions.Annotated[int, []],
- root_certificates : str | pathlib.Path | bytes | None,
- certificate_chain : str | pathlib.Path | bytes | None,
- private_key : str | pathlib.Path | bytes | None,
- options : dict
porttyping_extensions.Annotated[int, []] = 50000root_certificatesstr | pathlib.Path | bytes | None = Nonecertificate_chainstr | pathlib.Path | bytes | None = Noneprivate_keystr | pathlib.Path | bytes | None = Noneoptionsdict@pydantic.field_validator('hostname')
@classmethod
ensure_valid_hostname(cls, value : str) -> strEnsure that the hostname is valid.
valuestrstr@classmethod
__get_pydantic_json_schema__(- cls,
- core_schema : pydantic_core.core_schema.CoreSchema,
- handler : pydantic.annotated_handlers.GetJsonSchemaHandler
core_schemapydantic_core.core_schema.CoreSchemahandlerpydantic.annotated_handlers.GetJsonSchemaHandlerpydantic.json_schema.JsonSchemaValue__post_init__(self) -> None
Attributes
- porttyping_extensions.Annotated[int, []] = 50000
- root_certificatesstr | pathlib.Path | bytes | None = NoneA path to, or the bytestring contents of, the PEM-encoded root certificates, or `None` if no root certificates should be used. Note: TLS must be set to True to activate encryption with this certificate.
- certificate_chainstr | pathlib.Path | bytes | None = NoneA path to, or the bytestring contents of, the PEM-encoded certificate chain, or `None` if no certificate chain should be used. Note: TLS must be set to True to activate encryption with this certificate.
- private_keystr | pathlib.Path | bytes | None = NoneA path to, or the bytestring contents of, the PEM-encoded private key, or `None` if no private key should be used. Note: TLS must be set to True to activate encryption with this key.
- optionsdict = dataclasses.field(default_factory=dict)
ConfigurationError
classReceived an invalid configuration.
- Bases
- ValueError
ConnectorBaseConfig
classBase configuration for a UniteLabs SiLA2 Connector.
- Decorators
- dataclasses.dataclass
Methods
__init__(- self,
- sila_server : SiLAServerConfig | None,
- cloud_server_endpoint : CloudServerConfig | None,
- discovery : DiscoveryConfig | None,
- logging : dict | None
sila_serverSiLAServerConfig | Nonecloud_server_endpointCloudServerConfig | NonediscoveryDiscoveryConfig | Noneloggingdict | None = None__post_init__(self) -> None
Attributes
- sila_serverSiLAServerConfig | None = dataclasses.field(default_factory=SiLAServerConfig)
- cloud_server_endpointCloudServerConfig | None = dataclasses.field(default_factory=CloudServerConfig)
- discoveryDiscoveryConfig | None = dataclasses.field(default_factory=DiscoveryConfig)
- loggingdict | None = dataclasses.field(default=None)A python `logging.config` which is passed into `dictConfig`. Check the official documentation for more information about the logging config schema: https://docs.python.org/3/library/logging.config.html#logging-config-dictschema
SiLAServerConfig
classConfiguration for a SiLA server.
- MRO
- └ sila.server.server.ServerConfig
- Decorators
- dataclasses.dataclass
Methods
__init__(- self,
- root_certificates : str | pathlib.Path | bytes | None,
- certificate_chain : str | pathlib.Path | bytes | None,
- private_key : str | pathlib.Path | bytes | None,
- options : dict,
- uuid : UUIDString,
- name : typing_extensions.Annotated[str, []],
- vendor_url : URIString,
- default_lifetime : float | None
root_certificatesstr | pathlib.Path | bytes | None = Nonecertificate_chainstr | pathlib.Path | bytes | None = Noneprivate_keystr | pathlib.Path | bytes | None = Noneoptionsdictuuidnametyping_extensions.Annotated[str, []] = 'SiLA Server'vendor_urlURIString = 'https://sila-standard.com'default_lifetimefloat | None = 3600@classmethod
__get_pydantic_json_schema__(- cls,
- core_schema : pydantic_core.core_schema.CoreSchema,
- handler : pydantic.annotated_handlers.GetJsonSchemaHandler
core_schemapydantic_core.core_schema.CoreSchemahandlerpydantic.annotated_handlers.GetJsonSchemaHandlerpydantic.json_schema.JsonSchemaValue__post_init__(self) -> None
Attributes
- root_certificatesstr | pathlib.Path | bytes | None = NoneA path to, or the bytestring contents of, the PEM-encoded root certificates, or `None` if no root certificates should be used. Note: TLS must be set to True to activate encryption with this certificate.
- certificate_chainstr | pathlib.Path | bytes | None = NoneA path to, or the bytestring contents of, the PEM-encoded certificate chain, or `None` if no certificate chain should be used. Note: TLS must be set to True to activate encryption with this certificate.
- private_keystr | pathlib.Path | bytes | None = NoneA path to, or the bytestring contents of, the PEM-encoded private key, or `None` if no private key should be used. Note: TLS must be set to True to activate encryption with this key.
- optionsdict = dataclasses.field(default_factory=dict)
- uuidUUIDString = dataclasses.field(default_factory=(lambda: str(uuid.uuid4())))
- nametyping_extensions.Annotated[str, []] = 'SiLA Server'
- vendor_urlURIString = 'https://sila-standard.com'
- default_lifetimefloat | None = 3600The default lifetime of observable commands in seconds. This value is measured from the time a command is initiated and determines how long its status and responses remain available on the server before being deleted to free memory.
UnsupportedConfigFiletype
classThe filetype is unsupported for reading/writing config files.
- Bases
- Exception
Connector
classMain app.
Methods
__init__(self, config : ConnectorBaseConfig | None) -> NoneconfigConnectorBaseConfig | None = Nonestart(self) -> NoneStart the connector and all related services.
stop(self) -> NoneStop the connector and all related services.
wait_for_ready(self) -> NoneWait until the connector is ready.
wait_for_termination(self) -> NoneWait until the connector is terminated.
Attributes
- __config= config or ConnectorBaseConfig()
- _ready= asyncio.Event()
- _shutdown= asyncio.Event()
- _shutdown_handlerslist[Handler] = []
- _sila_server= None
- _discovery= None
- _cloud_server= None
- configConnectorBaseConfig = NoneThe configuration.
- sila_serversila.server.server.Server | None = NoneThe SiLA Server.
- loggerlogging.Logger = NoneA standard Python :class:`~logging.Logger` for the app.
- debugbool = NoneWhether debug mode is enabled.
Publisher
classAn observable which updates itself by polling a data source.
Methods
__init__(- self,
- source : typing_extensions.Callable[[], collections.abc.Awaitable[IN]] | typing_extensions.Callable[[], IN],
- interval : float,
- maxsize : int,
- pipe : PipeFunction[IN, OUT] | None
sourceA function or coroutine that will be called at a fixed interval as the data source of the subscription.intervalfloat = 5How many seconds to wait between polling calls to `source`.maxsizeint = 0The maximum number of messages to track in the queue.pipePipeFunction[IN, OUT] | None = None@typing.override
on_subscribe(self) -> None@typing.override
_on_subscribe(self, subscription : Subscription) -> Nonesubscription@typing.override
on_unsubscribe(self) -> None_set(self) -> NoneCreate a background task to poll the data `source` and update the current value. Task will be destroyed when all subscriptions to the `Publisher` are removed.
_unset(self) -> NoneStop the background task that polls the data `source`. This is called when all subscriptions to the `Publisher` are removed.
__self_update(self) -> None
Attributes
- _update_taskasyncio.Task | None = None
- _source= source
- _interval= interval
Replay
classDefines how previously emitted values are replayed to new subscribers.
- Bases
- enum.IntEnum
Attributes
- NONE= 0Do not replay any previous values. The subscription will only receive values emitted after it is created.
- ALL= -1Replay all previously emitted values to the new subscriber, in the order they were originally produced.
- CURRENT= 1Replay only the most recent value (if any) to the new subscriber. If no value has been emitted yet, nothing is replayed.
Subject
classAn observable that can be updated externally and subscribed to by multiple observers.
Methods
__init__(- self,
- maxsize : int,
- pipe : PipeFunction[IN, OUT] | None,
maxsizeint = 0The maximum number of messages to track in `Subscription` queues created by `subscribe`.pipePipeFunction[IN, OUT] | None = Noneinitial_valueOUT | Default = _DEFAULT_VALUEOptionally set an initial value for this subject.__repr__(self) -> strstrsubscribe(self, replay : Replay | int, distinct : bool) -> Subscription[OUT]Add a `Subscription` that will be notified on `update`.
on_subscribe(self) -> NoneEmit an event when the first subscription is added. Override this method to start external listeners or resources when the first subscriber begins listening.
_on_subscribe(self, subscription : Subscription) -> NoneEmit an event when `subscribe` is called.
subscriptionunsubscribe(self, subscriber : Subscription[typing_extensions.Any]) -> NoneRemove a `Subscription`.
subscriberSubscription[typing_extensions.Any]on_unsubscribe(self) -> NoneEmit an event when the last subscription is removed. Override this method to perform cleanup or release resources when there are no active subscribers.
_on_unsubscribe(self) -> NoneEmit an event when `unsubscribe` is called.
notify(self) -> NonePropagate the current value to all listening `Subscription`s.
Create a new `Subject` with `func` added to the list of pipes that are applied to values recieved from `notify`.
functyping_extensions.Callable[[], T]The callable that should be applied to all values seen by the new `Subject`.temporarybool = FalseWhether or not the pipe should be pruned from its parent on `unsubscribe`.filter(- self,
- predicate : typing_extensions.Callable[[], bool | typing_extensions.Any],
- temporary : bool
Create a new `Subject` that is only notified when the item passes the `predicate`.
predicatetyping_extensions.Callable[[], bool | typing_extensions.Any]A filter predicate to apply.temporarybool = FalseWhether the not the filter should be pruned from its parent on `unsubscribe`.@typing.override
__enter__(self) -> Subscription[OUT]Return a new `Subscription` upon entering the runtime context.
The newly created `Subscription`.@typing.override
__exit__(- self,
- exc_type : type[BaseException] | None,
- exc_value : BaseException | None,
- traceback : types.TracebackType | None
exc_typetype[BaseException] | None = Noneexc_valueBaseException | None = Nonetracebacktypes.TracebackType | None = Nonebool
Attributes
- _maxsize= maxsize
- _values= collections.deque[OUT](iterable=([initial_value] if not isinstance(initial_value, Default) else []), maxlen=(maxsize or None))
- _total_subscribers= 0
- _subscriberslist[Subscription[OUT]] = []
- _parentSubject | None = None
- _is_temporary= False
- _contextSubscription[OUT] | None = None
- _callbacksset[asyncio.Task] = set()
- _pipePipeFunction[IN, OUT] = pipe or default_pipe
- subscriberslist[Subscription[OUT]] = NoneAll `Subscription`s listening to this `Subject`.
- has_subscribersbool = NoneWhether any `Subject` listens to this `Subscription`.
Subscription
classAn AsyncIterable you can asynchronously add items to.
Methods
__repr__(self) -> strstrcancel(self) -> NoneCancel the subscription.
terminate(self) -> NoneUnsubscribe the subscription from its parent.
get(- self,
- predicate : typing_extensions.Callable[[], bool],
- timeout : float | None
Request an upcoming value that satisfies the `predicate`. If used without `timeout` this will block indefinitely until a value satisfies the `predicate`.
predicatetyping_extensions.Callable[[], bool]A filter predicate to apply.timeoutfloat | None = NoneHow many seconds to wait for new value before timing out.
Attributes
- _distinct= distinct
- _parentSubject = weakref.proxy(parent)
- _closed= asyncio.Event()
- sizeint = NoneThe number of items in the queue.