unitelabs.sdk.utils.asyncio
Attributes
- Name
P- Type
- Value
= typing_extensions.ParamSpec('P')
- Description
- Name
R- Type
- Value
= typing.TypeVar('R')
- Description
- Name
Iter- Type
- Value
= typing.TypeVar('Iter')
- Description
Functions
call_async(
- *args : unitelabs.sdk.utils.asyncio.P.args,
- **kwargs : unitelabs.sdk.utils.asyncio.P.kwargs
Calls an async function synchronously and blocks until it finishes.
Parameters
- Name
*args- Type
- unitelabs.sdk.utils.asyncio.P.args
- Default
- = ()
- Description
- Name
**kwargs- Type
- unitelabs.sdk.utils.asyncio.P.kwargs
- Default
- = {}
- Description
Response
- Type
- R
- Description
cancellable(aiter : collections.abc.AsyncIterable[Iter], cancel : asyncio.Event) -> collections.abc.AsyncIterable[Iter]
Wraps async iterators with a cancellable event.
Parameters
- Name
aiter- Type
- collections.abc.AsyncIterable[Iter]
- Default
- Description
- Name
cancel- Type
- asyncio.Event
- Default
- Description
Response
- Type
- collections.abc.AsyncIterable[Iter]
- Description
coroutine(function : typing.Callable) -> typing.Callable
Wraps click cli commands to run asynchronously.
Parameters
- Name
function- Type
- typing.Callable
- Default
- Description
Response
- Type
- typing.Callable
- Description