codestare.async_utils package

class codestare.async_utils.accessor(*, funcs: None = ..., condition: asyncio.Condition | None = None, name: str = None)
class codestare.async_utils.accessor(*, condition: asyncio.Condition | None = None, name: str = None)
class codestare.async_utils.accessor(*, funcs: Tuple[fget_type[T], fset_type[T]] = ..., condition: asyncio.Condition | None = None, name: str = None)

Bases: Generic[T]

An accessor provides easy shared access to a resource and

Example

In the simplest case, an accessor synchronizes reads and writes out of the box

>>> import asyncio
>>> from codestare.async_utils import accessor
>>> foo = accessor()
>>> async def wait_for_write(accessor_):
...     print(await accessor_.get())
...
>>> background = asyncio.create_task(wait_for_write(foo))
>>> await foo.set("Bar")
Bar

It’s possible to use custom getters / setter e.g. create an accessor to the value managed by a normal property if one needs shared access as well

>>> class Thing:
...     def __init__(self):
...         self._value = None
...     @property
...     def value(self):
...         return self._value
...     @value._setter
...     def value(self, val):
...         if not val:
...             raise ValueError(f"Illegal value {val}")
...         self._value = val
...
>>> thing = Thing()
>>> thing.value = 3
>>> thing.value
3
>>> thing.value = 0
ValueError: Illegal value 0
>>> class BetterThing(Thing):
...     def __init__(self):
...         super().__init__()
...         self.value_accessor = accessor(funcs=(
...             type(self).value.fget.__get__(self),
...             type(self).value.fset.__get__(self)
...         ))
...
>>> better_thing = BetterThing()
>>> background = asyncio.create_task(wait_for_write(better_thing.value_accessor))
>>> await better_thing.value_accessor.set(3)
3
>>> better_thing.value
3
>>> await better_thing.value_accessor.set(0)
ValueError: Illegal value 0

See also

condition_property – decorator to create accessor properties more easily

__init__(*, funcs: None = None, condition: asyncio.Condition | None = None, name: str = None)
__init__(*, condition: asyncio.Condition | None = None, name: str = None)
__init__(*, funcs: Tuple[fget_type[T], fset_type[T]] = None, condition: asyncio.Condition | None = None, name: str = None)
Parameters:
  • funcs – getter and setter for some value – optional, if not passed get / set a private field of the object

  • condition – condition to synchronize access to the value – optional, if not passed a new condition is created

fset: fset_type[T]

Setter, either passed via funcs argument, or a setter of an internal value if no funcs where passed

fget: fget_type[T]

Getter, either passed via funcs argument, or a getter of an internal value if no funcs where passed

condition: Condition

Used to synchronized access, either passed via condition argument, or a new condition created specifically for this accessor

name

For debug purposes

has_waiting_read

Use this awaitable if you want to wait for read access

property value: T | None

Simple access to the value produced by fget without async locks i.e. not safe if you did not acquire the lock of condition

async set(value: T, wait_for_read=False) None

Sets the value (using fset) and notifies every coroutine waiting on the condition (e.g. get()

Parameters:
  • value – new value passed to fset

  • wait_for_read – if True, will set the value only after futures are waiting by using get. Use this to invert the semantics – a write waiting for a read, instead of a read waiting for a write)

async get(*, predicate: Callable[[T], bool] | None = None, wait_for_write: bool | None = None) T

Shared access to value produced by fget

Parameters:
  • predicate – waits for the predicate result to be truthy, then returns the result of fget. The default predicate (used when predicate=None) returns [False, True, True, ...], so get() blocks once, until it is notified from a set() and then does not block again. Passing predicate=(lambda: True) will make get() not block at all.

  • wait_for_write – if set to True, and a predicate is passed, the predicate will only be applied once the default predicate (see above) also returns True i.e. you get the next value that matches the predicate, even if the current value also matches. You can set this value to False, to ignore the default predicate behaviour (which is the same as passing predicate=(lambda: True) and using the default for this value. – optional

Returns:

value produced by fget

Raises:

ValueError – if predicate is not a callable

See also

asyncio.Condition.wait_for() – used to wait for internal condition

class codestare.async_utils.condition_property(fget: Callable[[Any], T] | None = None, fset: Callable[[Any, T], None] | None = None, fdel: Callable[[Any], None] | None = None, doc: str | None = None)

Bases: cached_property, Generic[T]

This is a decorator to create a cached accessor to handle access to some data via a asyncio.Condition.

You can use it like the normal @property decorator, but the result of the lookup (__get__ of the descriptor) will be an accessor with coroutine attributes to handle safely setting and getting the value (from the objects methods passed via setter and getter, like in normal properties) by means of a condition.

See also

accessor – how to access the value

getter(fget: Callable[[Any], T]) condition_property[T]

This is a cached property but uses the same interface as a normal property. See example of property documentation on how to use.

setter(fset: Callable[[Any, T], None]) condition_property[T]

This is a cached property but uses the same interface as a normal property. See example of property documentation on how to use.

deleter(fdel) condition_property[T]

This is a cached property but uses the same interface as a normal property. See example of property documentation on how to use.

codestare.async_utils.make_async(func: Callable[[T], SimpleCoroutine[S] | S]) Callable[[T], SimpleCoroutine[S]]

Decorator to turn a non async function into a coroutine by running it in the default executor pool.

class codestare.async_utils.CoroutineWrapper(*, coroutine: Coroutine[T, S, R], **kwargs)

Bases: Coroutine[T, S, R]

Complex Coroutines are easy to implement with native def async coroutine syntax, but often require some smaller coroutines to compose. Inheriting from CoroutineWrapper, a complex coroutine can encapsulate all it’s dependencies and auxiliary methods.

Note

The example uses an asyncio REPL (python -m asyncio)

Example

Let’s consider an async generator that generates iterables

>>> async def generate_ranges(up_to: int):
...     for n in range(up_to):
...             yield range(n)
...
>>> [list(r) async for r in generate_ranges(5)]
[[], [0], [0, 1], [0, 1, 2], [0, 1, 2, 3]]

Our coroutine should apply some processing to the values, and to make it interesting it should only do so to values matching an additional filter function, i.e. values for which the specified filter function returns False should be ignored

>>> async def coro(process, filter_, ranges):
...     async for r in ranges:
...         for n in r:
...             if not filter_(n): continue
...             process(n)
...
>>> await coro(print, lambda n: n % 2 == 0, generate_ranges(5))
0
0
0
2
0
2

This is pretty concise and one would probably understand what’s happening, but for the sake of the example we want to break this processing behaviour down into pieces, so that we know what’s happening. One could e.g. define a custom awaitable

>>> class coro:
...     def __init__(self, process, filter_, ranges):
...         self.filter = filter_
...         self.process = process
...         self.ranges = ranges
...         self._work = self.work()
...     async def make_flat_values(self):
...         async for r in self.ranges:
...             for n in r:
...                 yield n
...     async def filter_values(self):
...         async for value in self.make_flat_values():
...             if self.filter(value):
...                 yield value
...     async def work(self):
...         async for value in self.filter_values():
...             self.process(value)
...     def __await__(self):
...         return self._work.__await__()
...
>>> await coro(print, lambda n: n % 2 == 0, generate_ranges(5))
0
0
0
2
0
2

The problem is, that coro now isn’t a coroutine, i.e. one can’t create a task with it for example

>>> import asyncio
>>> await asyncio.create_task(coro(print, lambda n: n % 2 == 0, generate_ranges(5)))
TypeError: a coroutine was expected, got <coro object>

This is where the CoroutineWrapper comes in:

>>> from codestare.async_utils import CoroutineWrapper
>>> class coro(CoroutineWrapper):
...     def __init__(self, process, filter_, ranges):
...         self.filter = filter_
...         self.process = process
...         self.ranges = ranges
...         super().__init__(coroutine=self.work())
...     async def make_flat_values(self):
...         async for r in self.ranges:
...             for n in r:
...                 yield n
...     async def filter_values(self):
...         async for value in self.make_flat_values():
...             if self.filter(value):
...                 yield value
...     async def work(self):
...         async for value in self.filter_values():
...             self.process(value)
>>> await asyncio.create_task(coro(print, lambda n: n % 2 == 0, generate_ranges(5)))
0
0
0
2
0
2

The coro objects now actually implement the asyncio coroutine interface

__await__()

Proxy to internal coroutine object

send(value)

Proxy to internal coroutine object

throw(typ, val=None, tb=None)

Proxy to internal coroutine object

close()

Proxy to internal coroutine object

class codestare.async_utils.TaskNursery(name: str | Callable[[], str] | None = None, loop: asyncio.BaseEventLoop | None = None)

Bases: AsyncExitStack, Registry

Use a TaskNursery to create and manage tasks, instead of using the asyncio.loop.create_task() method directly.

All tasks created by a nursery receive a callback to execute when they finish, which will trigger a graceful shutdown of the event loop if necessary – this saves a lot of boilerplate code

Basically, a TaskNursery is a contextlib.AsyncExitStack that gets closed when it’s sentinel_task is cancelled. This means you can enter any number of async context managers with a nursery, and every one of them will be closed when the nursery shuts down.

Creating a TaskNursery for an event loop also sets up signal handling and exception handling for that loop, i.e. if a loop has an active TaskNursery exiting the interpreter forcefully (e.g. Ctrl+C in the shell) will gracefully shut down all nurseries first.

TaskNursery is a Registry with unique attribute name by default, i.e. all instances need to have a unique name and can be referenced by name using the TaskNursery.registry

>>> from codestare.async_utils import TaskNursery
>>> import asyncio
>>> loop = asyncio.get_event_loop_policy().get_event_loop()
>>> nursery = TaskNursery(name="Foo", loop=loop)
>>> TaskNursery.registry
{'Foo': codestare.async_utils.nursery.TaskNursery(name='Foo', loop=<...>)}
registry

Inherited from codestare.async_utils.helper.Registry

static add_shutdown_handling(loop: AbstractEventLoop) None

Add shutdown handling to the loop, once. Caches loops that were passed as argument to this method, additional calls with the same loop will be ignored.

Parameters:

loop – gets shutdown handling applied with setup_shutdown_handling()

stop_task(task: Task)

Try to stop a task managed by this nursery using stop_task()

Parameters:

task – should be stopped

Returns:

result of stop_task()

Raises:

ValueError – if the task is not managed by this nursery i.e. it was not created by create_task()

__init__(name: str | Callable[[], str] | None = None, loop: asyncio.BaseEventLoop | None = None)

Creates a nursery on the loop – or the current running loop.

Parameters:
  • name – if a callable is used, the name can be dynamic

  • loop – if no loop is passed the running loop is used

loop: BaseEventLoop

Event loop instance to start tasks in, either passed as loop or current running loop

sentinel_task: Task

Dummy task that does nothing but cancelling it triggers the closing of this TaskNursery i.e. all entered async contexts will be closed and all created tasks will be stopped.

property name

Unique name of nursery, used as Registry key by default, see __unique_key_attr__. If a callable is passed as name during initialization, this callable is used to compute the name dynamically.

property tasks

Reference to managed tasks

pop_all() TaskNursery

Preserve the context stack by transferring it to a new instance.

create_task(coro: Generator[_TaskYieldType, None, T] | Awaitable[T], **kwargs) asyncio.Task[T]

Tasks created with this method have a callback which runs when the task finishes and tries to retrieve the task result. If the task raised an error the loops exception handler is called with the exception info and a reference to this nursery’s sentinel_task. Typically, the exception handler is handle_exception() (it is set as exception handler when the TaskNursery is created for a loop without specific exception handler). It will cancel the sentinel task and trigger the nursery’s shutdown.

Parameters:
Returns:

reference to created task

class codestare.async_utils.RegistryMeta(name, bases, attrs)

Bases: ABCMeta

Set __unique_key_attr__ in created classes to some attribute of the class instance that is unique to use this attribute as a key in the registry instead of the __default_key__. If the __default_key__ attribute is not present in the instance, it will be copied from the class on instance creation, and appended with the number of instances created before.

All types created by this metaclass will have a __registry_key__ property, so that the key for an instance (i.e. either default key or unique key) can be accessed easily.

Warning

If the instances get garbage collected, they will not be available from the registry anymore

Example

Here you can see that the registry only uses weak references, and instances that get garbage collected are removed from the registry

>>> from codestare.async_utils import RegistryMeta
>>> class T(metaclass=RegistryMeta):
...     __unique_key_attr__ = 'name'
...     def __init__(self, name):
...             self.name = name
...     def __repr__(self):
...             return f"{self.__class__.__name__}(name={self.name!r})"
...
>>> a = T('foo')
>>> T.registry
{'foo': T(name='foo')}
>>> import gc
>>> gc.collect()
0
>>> T.registry
{'foo': T(name='foo')}
>>> a = T('bar')
>>> T.registry
{'foo': T(name='foo'), 'bar': T(name='bar')}
>>> gc.collect()
0
>>> T.registry
{'bar': T(name='bar')}
property registry: Dict[Any, T]

Mapping \(\text{instance.__registry_key__} \rightarrow instance\)

class codestare.async_utils.Registry(*args, **kwargs)

Bases: object

You can inherit from this class to implicitly use the RegistryMeta metaclass

Example

Define a registry with the metaclass or by inheriting Registry

from codestare.async_utils import RegistryMeta, Registry

# virtually equivalent for most intents and purposes

class Foo(metaclass=RegistryMeta):
    pass

class Bar(Registry):
    pass

See also

RegistryMeta – more information about working with registry classes

codestare.async_utils.async_exit_on_exc(ctx_manager: AsyncContextManager, task: Task, loop: Optional[BaseEventLoop] = None) None

Schedules exit of the ctx_manager if the getting the task result raises an exception other than a asyncio.CancelledError

Parameters:
  • ctx_manager – Some context manager that needs to be closed with exception info for exceptions raised by the task

  • task – a task that maybe succeeded or raised an exception

  • loop – event loop to schedule the exit, uses current running loop if not provided – optional

class codestare.async_utils.awaitable_predicate(predicate: Callable[[], bool], condition: asyncio.Condition | None = None, timeout=None)

Bases: object

Typically, to let an async coroutine wait until some predicate is True, one uses a asyncio.Condition. Condition.wait_for(predicate) will block the coroutine until the predicate returns Truepredicate will be reevaluated every time the condition notifies waiting coroutines.

An awaitable_predicate object does exactly that, but it can also be evaluated to a boolean to make code more concise

Example

>>> from codestare.async_utils import awaitable_predicate
>>> value = 0
>>> is_zero = awaitable_predicate(lambda: value == 0)
>>> bool(is_zero)
True
>>> value = 1
>>> bool(is_zero)
False

Or we can wait until the predicate is actually True

>>> [...]  # continued from above
>>> async def set_value(number):
...     global value
...     async with is_zero.condition:
...             value = number
...             is_zero.condition.notify()
...
>>> async def wait_for_zero():
...     await is_zero
...     print(f"Finally! value: {value}")
...
>>> import asyncio
>>> async def main():
...     asyncio.create_task(wait_for_zero())
...     for n in reversed(range(3)):
...             await set_value(n)
...
>>> asyncio.run(main())
Finally! value: 0

Submodules