unitelabs.cdk
unitelabs/cdk/__init__.py
Packages
Attributes
- __version__ = version('unitelabs-cdk')
- __all__ = [ "unitelabs.cdk.main.AppFactory", "unitelabs.cdk.config.connector_base_config.CloudServerConfig", "unitelabs.cdk.config.config.ConfigurationError", "unitelabs.cdk.connector.Connector", "unitelabs.cdk.config.connector_base_config.ConnectorBaseConfig", "unitelabs.cdk.subscriptions.publisher.Publisher", "unitelabs.cdk.config.connector_base_config.SiLAServerConfig", "unitelabs.cdk.subscriptions.subject.Subject", "unitelabs.cdk.subscriptions.subscription.Subscription", "unitelabs.cdk.config.config.UnsupportedConfigFiletype", "unitelabs.cdk.logging.create_logger", "unitelabs.cdk.main.run", "unitelabs.cdk.config.config.validate_config" ]
Classes
CloudServerConfig
class
Configuration for a gRPC Cloud Server.
- MRO
- └ sila.server.cloud_server.CloudServerConfig
- Decorators
- dataclasses.dataclass
Methods
__init__(
- self,
- port : typing_extensions.Annotated[int, []] = 50000,
- root_certificates : str | pathlib.Path | bytes | None = None,
- certificate_chain : str | pathlib.Path | bytes | None = None,
- private_key : str | pathlib.Path | bytes | None = None,
- options : dict
)
@pydantic.field_validator('hostname')
@classmethod
ensure_valid_hostname(cls, value : str) -> str
  Ensure that the hostname is valid.
__post_init__(self) -> None
Attributes
- port : typing_extensions.Annotated[int, []] = 50000
- root_certificates : str | pathlib.Path | bytes | None = None
  A path to, or the bytestring contents of, the PEM-encoded root certificates, or `None` if no root certificates should be used. Note: TLS must be set to True to activate encryption with this certificate.
- certificate_chain : str | pathlib.Path | bytes | None = None
  A path to, or the bytestring contents of, the PEM-encoded certificate chain, or `None` if no certificate chain should be used. Note: TLS must be set to True to activate encryption with this certificate.
- private_key : str | pathlib.Path | bytes | None = None
  A path to, or the bytestring contents of, the PEM-encoded private key, or `None` if no private key should be used. Note: TLS must be set to True to activate encryption with this key.
- options : dict = dataclasses.field(default_factory=dict)
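The three certificate fields accept either a path or the PEM contents as bytes. How the library resolves them is not documented here, so the following is only a minimal sketch of one way to normalise such a value. The helper name `read_pem` is hypothetical and not part of `unitelabs.cdk`.

```python
import pathlib
import tempfile
from typing import Optional, Union


def read_pem(value: Optional[Union[str, pathlib.Path, bytes]]) -> Optional[bytes]:
    """Hypothetical helper: turn a certificate option into raw PEM bytes."""
    if value is None:
        return None  # no certificate configured
    if isinstance(value, bytes):
        return value  # already the PEM contents
    return pathlib.Path(value).read_bytes()  # str or Path is treated as a file path


# Placeholder PEM body, used only to show that all three input forms agree.
pem = b"-----BEGIN CERTIFICATE-----\nplaceholder\n-----END CERTIFICATE-----\n"
with tempfile.TemporaryDirectory() as tmp:
    pem_path = pathlib.Path(tmp) / "ca.pem"
    pem_path.write_bytes(pem)
    from_path = read_pem(pem_path)
    from_str = read_pem(str(pem_path))
from_bytes = read_pem(pem)
```

All three call forms yield the same bytes, which is the behaviour the field descriptions above suggest.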
ConfigurationError
class
Received an invalid configuration.
- Bases
- ValueError
ConnectorBaseConfig
class
Base configuration for a UniteLabs SiLA2 Connector.
- Decorators
- dataclasses.dataclass
Methods
__init__(
- self,
- sila_server : SiLAServerConfig | None,
- cloud_server_endpoint : CloudServerConfig | None,
- discovery : DiscoveryConfig | None,
- logging : dict | None = None
)
__post_init__(self) -> None
Attributes
- sila_server : SiLAServerConfig | None = dataclasses.field(default_factory=SiLAServerConfig)
- cloud_server_endpoint : CloudServerConfig | None = dataclasses.field(default_factory=CloudServerConfig)
- discovery : DiscoveryConfig | None = dataclasses.field(default_factory=DiscoveryConfig)
- logging : dict | None = dataclasses.field(default=None)
  A Python `logging.config` dictionary that is passed to `dictConfig`. See the official documentation for the logging config schema: https://docs.python.org/3/library/logging.config.html#logging-config-dictschema
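As an illustration of the `logging` attribute, here is a small dictionary that follows the standard `dictConfig` schema. The sketch applies it directly with the standard library; the formatter and handler names (`plain`, `console`) and the logger name `demo` are arbitrary choices for the example.

```python
import logging
import logging.config

# A minimal dictConfig-style mapping: one formatter, one console handler, root at INFO.
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(levelname)s %(name)s: %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "plain"},
    },
    "root": {"level": "INFO", "handlers": ["console"]},
}

logging.config.dictConfig(LOGGING)
logging.getLogger("demo").info("logging configured")
```

Judging by the `__init__` signature above, a dictionary like this would presumably be supplied as the `logging` argument of `ConnectorBaseConfig`; that usage is an assumption and is not exercised here.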
SiLAServerConfig
class
Configuration for a SiLA server.
- MRO
- └ sila.server.server.ServerConfig
- Decorators
- dataclasses.dataclass
Methods
__init__(
- self,
- root_certificates : str | pathlib.Path | bytes | None = None,
- certificate_chain : str | pathlib.Path | bytes | None = None,
- private_key : str | pathlib.Path | bytes | None = None,
- options : dict,
- uuid : UUIDString,
- name : typing_extensions.Annotated[str, []] = 'SiLA Server',
- vendor_url : URIString = 'https://sila-standard.com'
)
@classmethod
__get_pydantic_json_schema__(
- cls,
- core_schema : pydantic_core.core_schema.CoreSchema,
- handler : pydantic.annotated_handlers.GetJsonSchemaHandler
) -> pydantic.json_schema.JsonSchemaValue
__post_init__(self) -> None
Attributes
- root_certificates : str | pathlib.Path | bytes | None = None
  A path to, or the bytestring contents of, the PEM-encoded root certificates, or `None` if no root certificates should be used. Note: TLS must be set to True to activate encryption with this certificate.
- certificate_chain : str | pathlib.Path | bytes | None = None
  A path to, or the bytestring contents of, the PEM-encoded certificate chain, or `None` if no certificate chain should be used. Note: TLS must be set to True to activate encryption with this certificate.
- private_key : str | pathlib.Path | bytes | None = None
  A path to, or the bytestring contents of, the PEM-encoded private key, or `None` if no private key should be used. Note: TLS must be set to True to activate encryption with this key.
- options : dict = dataclasses.field(default_factory=dict)
- uuid : UUIDString = dataclasses.field(default_factory=(lambda: str(uuid.uuid4())))
- name : typing_extensions.Annotated[str, []] = 'SiLA Server'
- vendor_url : URIString = 'https://sila-standard.com'
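The `uuid` attribute uses a `default_factory`, so a value is generated per instance rather than once per class. The sketch below reproduces only that pattern on a stand-in dataclass; `ServerConfigSketch` is hypothetical and omits the validation that the real `UUIDString` and `URIString` types presumably perform.

```python
import dataclasses
import uuid


@dataclasses.dataclass
class ServerConfigSketch:
    """Hypothetical stand-in, not the real SiLAServerConfig."""

    # The lambda resolves `uuid` to the imported module and runs once per instance.
    uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4()))
    name: str = "SiLA Server"
    vendor_url: str = "https://sila-standard.com"


first = ServerConfigSketch()
second = ServerConfigSketch()
pinned = ServerConfigSketch(uuid="00000000-0000-4000-8000-000000000001")
```

In this sketch `first.uuid` and `second.uuid` differ, while `pinned` keeps the value it was given. If a stable server identity matters across restarts, passing an explicit `uuid` is the obvious way to get it under these assumptions.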
UnsupportedConfigFiletype
class
The filetype is unsupported for reading/writing config files.
- Bases
- Exception
Connector
class
Main app.
Methods
__init__(self, config : ConnectorBaseConfig | None = None) -> None
start(self) -> None
  Start the connector and all related services.
stop(self) -> None
  Stop the connector and all related services.
wait_for_ready(self) -> None
  Wait until the connector is ready.
wait_for_termination(self) -> None
  Wait until the connector is terminated.
Attributes
- __config = config or ConnectorBaseConfig()
- _ready = asyncio.Event()
- _shutdown = asyncio.Event()
- _shutdown_handlers : list[Handler] = []
- _sila_server = None
- _discovery = None
- _cloud_server = None
- config : ConnectorBaseConfig = None
  The configuration.
- sila_server : sila.server.server.Server | None = None
  The SiLA Server.
- logger : logging.Logger = None
  A standard Python `logging.Logger` for the app.
- debug : bool = None
  Whether debug mode is enabled.
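The `_ready` and `_shutdown` attributes are `asyncio.Event` objects, which suggests that the `wait_for_*` methods wait on those events. The sketch below shows that pattern on a stand-in class. `LifecycleSketch` is hypothetical, and whether the real `Connector` methods are coroutines or behave exactly like this is an assumption.

```python
import asyncio


class LifecycleSketch:
    """Hypothetical stand-in, not the real Connector."""

    def __init__(self) -> None:
        self._ready = asyncio.Event()
        self._shutdown = asyncio.Event()

    async def start(self) -> None:
        self._shutdown.clear()
        self._ready.set()  # wakes everything blocked in wait_for_ready()

    async def stop(self) -> None:
        self._ready.clear()
        self._shutdown.set()  # wakes everything blocked in wait_for_termination()

    async def wait_for_ready(self) -> None:
        await self._ready.wait()

    async def wait_for_termination(self) -> None:
        await self._shutdown.wait()


async def lifecycle_demo() -> list:
    app = LifecycleSketch()
    log = []
    waiter = asyncio.create_task(app.wait_for_termination())
    await app.start()
    await app.wait_for_ready()
    log.append("ready")
    await app.stop()
    await waiter  # returns once stop() has set the shutdown event
    log.append("terminated")
    return log
```

Running `asyncio.run(lifecycle_demo())` walks through one start/stop cycle of the stand-in.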
Publisher
class
An observable which updates itself by polling a data source.
Methods
__init__(
- self,
- source : typing_extensions.Callable[[], collections.abc.Awaitable[IN]] | typing_extensions.Callable[[], IN],
- interval : float = 5,
- maxsize : int = 0,
- pipe : PipeFunction[IN, OUT] | None = None
)
  - source : A function or coroutine that will be called at a fixed interval as the data source of the subscription.
  - interval : How many seconds to wait between polling calls to `source`.
  - maxsize : The maximum number of messages to track in the queue.
@typing.override
on_subscribe(self) -> None
@typing.override
_on_subscribe(self, subscription : Subscription) -> None
@typing.override
on_unsubscribe(self) -> None
_set(self) -> None
  Create a background task to poll the data `source` and update the current value. The task is destroyed when all subscriptions to the `Publisher` are removed.
_unset(self) -> None
  Stop the background task that polls the data `source`. This is called when all subscriptions to the `Publisher` are removed.
__self_update(self) -> None
Attributes
- _update_task : asyncio.Task | None = None
- _source = source
- _interval = interval
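The polling behaviour described for `Publisher` (call `source` at a fixed interval, accepting either a plain function or a coroutine function) can be sketched with the standard library alone. The names `poll`, `make_counter`, and `poll_demo` are hypothetical, and the `rounds` parameter exists only so that the example terminates; the real class is described as polling until its last subscription is removed.

```python
import asyncio
import inspect
from typing import Any, Callable


async def poll(source: Callable[[], Any], interval: float,
               on_value: Callable[[Any], None], rounds: int) -> None:
    """Hypothetical sketch: call `source` `rounds` times, `interval` seconds apart."""
    for _ in range(rounds):
        value = source()
        if inspect.isawaitable(value):  # `source` was a coroutine function
            value = await value
        on_value(value)
        await asyncio.sleep(interval)


def make_counter() -> Callable[[], int]:
    """Return a synchronous data source that counts up from 1."""
    state = {"n": 0}

    def read() -> int:
        state["n"] += 1
        return state["n"]

    return read


async def poll_demo() -> list:
    seen: list = []
    await poll(make_counter(), interval=0.01, on_value=seen.append, rounds=3)
    return seen
```

Checking `inspect.isawaitable` on the return value lets one loop serve both kinds of `source`, which mirrors the union type in the `__init__` signature.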
Subject
class
An observable that can be updated externally and subscribed to by multiple observers.
Methods
__init__(self, maxsize : int = 0, pipe : PipeFunction[IN, OUT] | None = None) -> None
  - maxsize : The maximum number of messages to track in `Subscription` queues created by `subscribe`.
__repr__(self) -> str
subscribe(self) -> Subscription[OUT]
  Add a `Subscription` that will be notified on `update`.
on_subscribe(self) -> None
  Emit an event when the first subscription is added. Override this method to start external listeners or resources when the first subscriber begins listening.
_on_subscribe(self, subscription : Subscription) -> None
  Emit an event when `subscribe` is called.
unsubscribe(self, subscriber : Subscription[typing_extensions.Any]) -> None
  Remove a `Subscription`.
on_unsubscribe(self) -> None
  Emit an event when the last subscription is removed. Override this method to perform cleanup or release resources when there are no active subscribers.
_on_unsubscribe(self) -> None
  Emit an event when `unsubscribe` is called.
notify(self) -> None
  Propagate the current value to all listening `Subscription`s.
  Create a new `Subject` with `func` added to the list of pipes that are applied to values received from `notify`.
  - func : typing_extensions.Callable[[], T]
    The callable that should be applied to all values seen by the new `Subject`.
  - temporary : bool = False
    Whether or not the pipe should be pruned from its parent on `unsubscribe`.
filter(
- self,
- predicate : typing_extensions.Callable[[], bool | typing_extensions.Any],
- temporary : bool = False
)
  Create a new `Subject` that is only notified when the item passes the `predicate`.
  - predicate : A filter predicate to apply.
  - temporary : Whether or not the filter should be pruned from its parent on `unsubscribe`.
@typing.override
__enter__(self) -> Subscription[OUT]
  Return a new `Subscription` upon entering the runtime context. Returns the newly created `Subscription`.
@typing.override
__exit__(
- self,
- exc_type : type[BaseException] | None = None,
- exc_value : BaseException | None = None,
- traceback : types.TracebackType | None = None
) -> bool
Attributes
- _maxsize = maxsize
- _total_subscribers = 0
- _subscribers : list[Subscription[OUT]] = []
- _parent : Subject | None = None
- _is_temporary = False
- _context : Subscription[OUT] | None = None
- _pipe : PipeFunction[IN, OUT] = pipe or default_pipe
- subscribers : list[Subscription[OUT]] = None
  All `Subscription`s listening to this `Subject`.
- has_subscribers : bool = None
  Whether any `Subscription` is listening to this `Subject`.
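The `on_subscribe` and `on_unsubscribe` hooks are described as firing for the first added and the last removed subscription. The sketch below illustrates that bookkeeping with a stand-in class built on `asyncio.Queue`. `MiniSubject` is hypothetical: it is not the library's implementation and leaves out pipes, filters, and the `maxsize` handling.

```python
import asyncio
from typing import Any, List


class MiniSubject:
    """Hypothetical stand-in illustrating first/last-subscriber hooks."""

    def __init__(self) -> None:
        self.queues: List[asyncio.Queue] = []
        self.events: List[str] = []

    def on_subscribe(self) -> None:
        self.events.append("first subscriber added")

    def on_unsubscribe(self) -> None:
        self.events.append("last subscriber removed")

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.queues.append(queue)
        if len(self.queues) == 1:  # hook fires only for the first subscriber
            self.on_subscribe()
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self.queues.remove(queue)
        if not self.queues:  # hook fires only when nobody is left
            self.on_unsubscribe()

    def update(self, value: Any) -> None:
        for queue in self.queues:  # fan the value out to every subscriber
            queue.put_nowait(value)


async def subject_demo() -> tuple:
    subject = MiniSubject()
    a = subject.subscribe()
    b = subject.subscribe()
    subject.update(42)
    received = [await a.get(), await b.get()]
    subject.unsubscribe(a)
    subject.unsubscribe(b)
    return received, subject.events
```

Both subscribers receive the value, and each hook fires exactly once even though there are two subscribe and two unsubscribe calls. This is the property that makes the hooks a natural place to start and stop an external listener.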
Subscription
class
An AsyncIterable you can asynchronously add items to.
Methods
__repr__(self) -> str
cancel(self) -> None
  Cancel the subscription.
terminate(self) -> None
  Unsubscribe the subscription from its parent.
get(
- self,
- predicate : typing_extensions.Callable[[], bool],
- timeout : float | None = None
)
  Request an upcoming value that satisfies the `predicate`. If used without `timeout`, this will block indefinitely until a value satisfies the `predicate`.
  - predicate : A filter predicate to apply.
  - timeout : How many seconds to wait for a new value before timing out.
Attributes
- _parent : Subject = weakref.proxy(parent)
- _closed = asyncio.Event()
- size : int = None
  The number of items in the queue.
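The `get` semantics described above (wait for the next value that satisfies a predicate, optionally bounded by a timeout) can be approximated with `asyncio.wait_for` over a queue. This is a sketch under assumptions: `get_matching` is a hypothetical function, and the exception the real `Subscription.get` raises on timeout is not stated in this reference.

```python
import asyncio
from typing import Any, Callable, Optional


async def get_matching(queue: asyncio.Queue, predicate: Callable[[Any], bool],
                       timeout: Optional[float] = None) -> Any:
    """Hypothetical sketch: return the next queued item for which `predicate` is true."""

    async def next_match() -> Any:
        while True:
            item = await queue.get()
            if predicate(item):
                return item  # non-matching items are simply discarded

    # With timeout=None this waits indefinitely, as the description of `get` states.
    return await asyncio.wait_for(next_match(), timeout)


async def get_demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    for item in (1, 2, 3):
        queue.put_nowait(item)
    found = await get_matching(queue, lambda x: x > 2, timeout=1.0)
    try:
        # The queue is now empty, so this call can only time out.
        await get_matching(queue, lambda x: x > 2, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return found, timed_out
```

In the sketch the first call skips `1` and `2` and returns `3`; the second call finds nothing and raises `asyncio.TimeoutError` after the timeout elapses.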