codestare.async_utils.helper module

codestare.async_utils.helper.make_async(func: Callable[[T], SimpleCoroutine[S] | S]) Callable[[T], SimpleCoroutine[S]]

Decorator that turns a non-async function into a coroutine by running it in the default executor pool.
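The general technique can be sketched with the standard library alone. The following is a minimal illustration, not the library's implementation: make_async_sketch and slow_square are hypothetical names, and returning an already-async function unchanged is an assumption based on the type signature above.

```python
import asyncio
import functools
import inspect
import time


def make_async_sketch(func):
    """Hypothetical stand-in for make_async; not the library's code."""
    if inspect.iscoroutinefunction(func):
        # Assumption: a function that is already async needs no wrapping.
        return func

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        # Passing None selects the loop's default executor.
        return await loop.run_in_executor(None, call)

    return wrapper


@make_async_sketch
def slow_square(x):
    time.sleep(0.01)  # stands in for blocking work
    return x * x


async def demo():
    # Both calls run in worker threads, so they can overlap.
    return await asyncio.gather(slow_square(3), slow_square(4))


result = asyncio.run(demo())
```

Because the blocking call runs in a worker thread, the event loop stays free to run other coroutines while it waits.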

class codestare.async_utils.helper.RegistryMeta(name, bases, attrs)

Bases: ABCMeta

Set __unique_key_attr__ in a created class to the name of an instance attribute whose value is unique; that attribute is then used as the key in the registry instead of __default_key__. If the __default_key__ attribute is not present on an instance, it is copied from the class on instance creation, with the number of previously created instances appended.

All types created by this metaclass will have a __registry_key__ property, so that the key for an instance (i.e. either default key or unique key) can be accessed easily.

Warning

If instances get garbage collected, they are no longer available from the registry.

Example

Here you can see that the registry only uses weak references, and that instances that get garbage collected are removed from the registry:

>>> from codestare.async_utils import RegistryMeta
>>> class T(metaclass=RegistryMeta):
...     __unique_key_attr__ = 'name'
...     def __init__(self, name):
...         self.name = name
...     def __repr__(self):
...         return f"{self.__class__.__name__}(name={self.name!r})"
...
>>> a = T('foo')
>>> T.registry
{'foo': T(name='foo')}
>>> import gc
>>> gc.collect()
0
>>> T.registry
{'foo': T(name='foo')}
>>> a = T('bar')
>>> T.registry
{'foo': T(name='foo'), 'bar': T(name='bar')}
>>> gc.collect()
0
>>> T.registry
{'bar': T(name='bar')}

property registry: Dict[Any, T]

Mapping instance.__registry_key__ → instance
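The weak-reference behaviour of such a mapping can be illustrated with weakref.WeakValueDictionary. This is a simplified sketch under stated assumptions, not the library's RegistryMeta: WeakRegistryMeta, Item, and the _instances attribute are hypothetical, and default-key handling is left out.

```python
import gc
import weakref


class WeakRegistryMeta(type):
    """Hypothetical, simplified stand-in; not the library's RegistryMeta."""

    def __init__(cls, name, bases, attrs):
        super().__init__(name, bases, attrs)
        # One registry per class; values are held weakly.
        cls._instances = weakref.WeakValueDictionary()

    def __call__(cls, *args, **kwargs):
        instance = super().__call__(*args, **kwargs)
        # Look up the key on the instance via the configured attribute name.
        key = getattr(instance, cls.__unique_key_attr__)
        cls._instances[key] = instance
        return instance

    @property
    def registry(cls):
        # Snapshot as a plain dict: key -> live instance.
        return dict(cls._instances)


class Item(metaclass=WeakRegistryMeta):
    __unique_key_attr__ = "name"

    def __init__(self, name):
        self.name = name


first = Item("foo")
keys_while_alive = sorted(Item.registry)

del first      # drop the only strong reference
gc.collect()   # make collection explicit on non-refcounting interpreters
keys_after_del = sorted(Item.registry)
```

Holding the values weakly means the registry never keeps an instance alive by itself, which is why an entry disappears once the last strong reference is gone.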

class codestare.async_utils.helper.Registry(*args, **kwargs)

Bases: object

You can inherit from this class to implicitly use the RegistryMeta metaclass

Example

Define a registry with the metaclass or by inheriting from Registry:

from codestare.async_utils import RegistryMeta, Registry

# virtually equivalent for most intents and purposes

class Foo(metaclass=RegistryMeta):
    pass

class Bar(Registry):
    pass

See also

RegistryMeta – more information about working with registry classes

codestare.async_utils.helper.async_exit_on_exc(ctx_manager: AsyncContextManager, task: Task, loop: Optional[BaseEventLoop] = None) None

Schedules the exit of ctx_manager if getting the task result raises an exception other than asyncio.CancelledError.

Parameters:
  • ctx_manager – Some context manager that needs to be closed with exception info for exceptions raised by the task

  • task – a task that may have succeeded or raised an exception

  • loop – event loop to schedule the exit, uses current running loop if not provided – optional
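One way such a helper could behave is sketched below using only asyncio. The names exit_on_exc_sketch and Recorder are hypothetical, and registering the helper as a done callback on the task is an assumption about typical usage, not something the signature above confirms.

```python
import asyncio


def exit_on_exc_sketch(ctx_manager, task, loop=None):
    """Hypothetical stand-in; not the library's async_exit_on_exc."""
    if task.cancelled():
        return  # cancellation is not treated as a failure
    exc = task.exception()
    if exc is None:
        return  # task succeeded, nothing to do
    loop = loop or asyncio.get_running_loop()
    # Schedule __aexit__ with the exception info; it is not awaited here.
    loop.create_task(ctx_manager.__aexit__(type(exc), exc, exc.__traceback__))


class Recorder:
    """Toy async context manager that remembers what it was exited with."""

    def __init__(self):
        self.exit_exc = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.exit_exc = exc
        return False


async def fail():
    raise ValueError("boom")


async def demo():
    recorder = Recorder()
    await recorder.__aenter__()
    task = asyncio.create_task(fail())
    # The callback receives the finished task as its only argument.
    task.add_done_callback(lambda t: exit_on_exc_sketch(recorder, t))
    await asyncio.wait([task])
    await asyncio.sleep(0)  # let the scheduled __aexit__ run
    return recorder.exit_exc


exit_exc = asyncio.run(demo())
```

Scheduling the exit rather than awaiting it keeps the helper usable from synchronous callbacks, which cannot await.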

class codestare.async_utils.helper.awaitable_predicate(predicate: Callable[[], bool], condition: asyncio.Condition | None = None, timeout=None)

Bases: object

Typically, to let a coroutine wait until some predicate is True, one uses an asyncio.Condition. Condition.wait_for(predicate) blocks the coroutine until the predicate returns True. The predicate is reevaluated every time the condition notifies waiting coroutines.

An awaitable_predicate object does exactly that, but it can also be evaluated as a boolean to make code more concise.
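For comparison, the plain asyncio.Condition pattern described above looks roughly like this. It is a minimal sketch using only the standard library; state, log, waiter, and countdown are illustrative names.

```python
import asyncio

state = {"value": 3}
log = []


async def waiter(condition):
    async with condition:
        # Blocks until the predicate is true; rechecked on each notify.
        await condition.wait_for(lambda: state["value"] == 0)
        log.append(state["value"])


async def countdown(condition):
    while state["value"] > 0:
        async with condition:
            state["value"] -= 1
            condition.notify_all()
        await asyncio.sleep(0)  # give the waiter a chance to run


async def main():
    condition = asyncio.Condition()  # created inside the running loop
    await asyncio.gather(waiter(condition), countdown(condition))


asyncio.run(main())
```

Note that the condition must be held both while waiting and while notifying, and the predicate itself cannot be queried without going through the condition.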

Example

>>> from codestare.async_utils import awaitable_predicate
>>> value = 0
>>> is_zero = awaitable_predicate(lambda: value == 0)
>>> bool(is_zero)
True
>>> value = 1
>>> bool(is_zero)
False

Or we can wait until the predicate is actually True

>>> [...]  # continued from above
>>> async def set_value(number):
...     global value
...     async with is_zero.condition:
...         value = number
...         is_zero.condition.notify()
...
>>> async def wait_for_zero():
...     await is_zero
...     print(f"Finally! value: {value}")
...
>>> import asyncio
>>> async def main():
...     asyncio.create_task(wait_for_zero())
...     for n in reversed(range(3)):
...         await set_value(n)
...
>>> asyncio.run(main())
Finally! value: 0