codestare.async_utils package¶
- class codestare.async_utils.accessor(*, funcs: None = ..., condition: asyncio.Condition | None = None, name: str = None)¶
- class codestare.async_utils.accessor(*, condition: asyncio.Condition | None = None, name: str = None)
- class codestare.async_utils.accessor(*, funcs: Tuple[fget_type[T], fset_type[T]] = ..., condition: asyncio.Condition | None = None, name: str = None)
-
An accessor provides easy shared access to a resource and
Example
In the simplest case, an accessor synchronizes reads and writes out of the box
>>> import asyncio >>> from codestare.async_utils import accessor >>> foo = accessor() >>> async def wait_for_write(accessor_): ... print(await accessor_.get()) ... >>> background = asyncio.create_task(wait_for_write(foo)) >>> await foo.set("Bar") Bar
It’s possible to use custom getters / setter e.g. create an
accessorto the value managed by a normal property if one needs shared access as well>>> class Thing: ... def __init__(self): ... self._value = None ... @property ... def value(self): ... return self._value ... @value._setter ... def value(self, val): ... if not val: ... raise ValueError(f"Illegal value {val}") ... self._value = val ... >>> thing = Thing() >>> thing.value = 3 >>> thing.value 3 >>> thing.value = 0 ValueError: Illegal value 0 >>> class BetterThing(Thing): ... def __init__(self): ... super().__init__() ... self.value_accessor = accessor(funcs=( ... type(self).value.fget.__get__(self), ... type(self).value.fset.__get__(self) ... )) ... >>> better_thing = BetterThing() >>> background = asyncio.create_task(wait_for_write(better_thing.value_accessor)) >>> await better_thing.value_accessor.set(3) 3 >>> better_thing.value 3 >>> await better_thing.value_accessor.set(0) ValueError: Illegal value 0
See also
condition_property– decorator to create accessor properties more easily- __init__(*, funcs: None = None, condition: asyncio.Condition | None = None, name: str = None)¶
- __init__(*, condition: asyncio.Condition | None = None, name: str = None)
- __init__(*, funcs: Tuple[fget_type[T], fset_type[T]] = None, condition: asyncio.Condition | None = None, name: str = None)
- Parameters:
funcs – getter and setter for some value – optional, if not passed get / set a private field of the object
condition – condition to synchronize access to the value – optional, if not passed a new condition is created
- fset: fset_type[T]¶
Setter, either passed via
funcsargument, or a setter of an internal value if nofuncswhere passed
- fget: fget_type[T]¶
Getter, either passed via
funcsargument, or a getter of an internal value if nofuncswhere passed
- condition: Condition¶
Used to synchronized access, either passed via
conditionargument, or a new condition created specifically for this accessor
- name¶
For debug purposes
- has_waiting_read¶
Use this awaitable if you want to wait for read access
- property value: T | None¶
Simple access to the value produced by
fgetwithout async locks i.e. not safe if you did not acquire the lock ofcondition
- async set(value: T, wait_for_read=False) None¶
Sets the value (using
fset) and notifies every coroutine waiting on thecondition(e.g.get()
- async get(*, predicate: Callable[[T], bool] | None = None, wait_for_write: bool | None = None) T¶
Shared access to value produced by
fget- Parameters:
predicate – waits for the predicate result to be truthy, then returns the result of
fget. The default predicate (used whenpredicate=None) returns[False, True, True, ...], soget()blocks once, until it is notified from aset()and then does not block again. Passingpredicate=(lambda: True)will makeget()not block at all.wait_for_write – if set to
True, and a predicate is passed, the predicate will only be applied once the default predicate (see above) also returnsTruei.e. you get the next value that matches the predicate, even if the current value also matches. You can set this value to False, to ignore the default predicate behaviour (which is the same as passingpredicate=(lambda: True)and using the default for this value. – optional
- Returns:
value produced by
fget- Raises:
ValueError – if
predicateis not a callable
See also
asyncio.Condition.wait_for()– used to wait for internal condition
- class codestare.async_utils.condition_property(fget: Callable[[Any], T] | None = None, fset: Callable[[Any, T], None] | None = None, fdel: Callable[[Any], None] | None = None, doc: str | None = None)¶
Bases:
cached_property,Generic[T]This is a decorator to create a cached
accessorto handle access to some data via aasyncio.Condition.You can use it like the normal @property decorator, but the result of the lookup (__get__ of the descriptor) will be an
accessorwith coroutine attributes to handle safely setting and getting the value (from the objects methods passed viasetterandgetter, like in normal properties) by means of a condition.See also
accessor– how to access the value- getter(fget: Callable[[Any], T]) condition_property[T]¶
This is a cached property but uses the same interface as a normal
property. See example ofpropertydocumentation on how to use.
- setter(fset: Callable[[Any, T], None]) condition_property[T]¶
This is a cached property but uses the same interface as a normal
property. See example ofpropertydocumentation on how to use.
- deleter(fdel) condition_property[T]¶
This is a cached property but uses the same interface as a normal
property. See example ofpropertydocumentation on how to use.
- codestare.async_utils.make_async(func: Callable[[T], SimpleCoroutine[S] | S]) Callable[[T], SimpleCoroutine[S]]¶
Decorator to turn a non async function into a coroutine by running it in the default executor pool.
- class codestare.async_utils.CoroutineWrapper(*, coroutine: Coroutine[T, S, R], **kwargs)¶
-
Complex Coroutines are easy to implement with native
def asynccoroutine syntax, but often require some smaller coroutines to compose. Inheriting fromCoroutineWrapper, a complex coroutine can encapsulate all it’s dependencies and auxiliary methods.Note
The example uses an asyncio REPL (
python -m asyncio)Example
Let’s consider an async generator that generates iterables
>>> async def generate_ranges(up_to: int): ... for n in range(up_to): ... yield range(n) ... >>> [list(r) async for r in generate_ranges(5)] [[], [0], [0, 1], [0, 1, 2], [0, 1, 2, 3]]
Our coroutine should apply some processing to the values, and to make it interesting it should only do so to values matching an additional filter function, i.e. values for which the specified filter function returns
Falseshould be ignored>>> async def coro(process, filter_, ranges): ... async for r in ranges: ... for n in r: ... if not filter_(n): continue ... process(n) ... >>> await coro(print, lambda n: n % 2 == 0, generate_ranges(5)) 0 0 0 2 0 2
This is pretty concise and one would probably understand what’s happening, but for the sake of the example we want to break this processing behaviour down into pieces, so that we know what’s happening. One could e.g. define a custom awaitable
>>> class coro: ... def __init__(self, process, filter_, ranges): ... self.filter = filter_ ... self.process = process ... self.ranges = ranges ... self._work = self.work() ... async def make_flat_values(self): ... async for r in self.ranges: ... for n in r: ... yield n ... async def filter_values(self): ... async for value in self.make_flat_values(): ... if self.filter(value): ... yield value ... async def work(self): ... async for value in self.filter_values(): ... self.process(value) ... def __await__(self): ... return self._work.__await__() ... >>> await coro(print, lambda n: n % 2 == 0, generate_ranges(5)) 0 0 0 2 0 2
The problem is, that
coronow isn’t a coroutine, i.e. one can’t create a task with it for example>>> import asyncio >>> await asyncio.create_task(coro(print, lambda n: n % 2 == 0, generate_ranges(5))) TypeError: a coroutine was expected, got <coro object>
This is where the
CoroutineWrappercomes in:>>> from codestare.async_utils import CoroutineWrapper >>> class coro(CoroutineWrapper): ... def __init__(self, process, filter_, ranges): ... self.filter = filter_ ... self.process = process ... self.ranges = ranges ... super().__init__(coroutine=self.work()) ... async def make_flat_values(self): ... async for r in self.ranges: ... for n in r: ... yield n ... async def filter_values(self): ... async for value in self.make_flat_values(): ... if self.filter(value): ... yield value ... async def work(self): ... async for value in self.filter_values(): ... self.process(value) >>> await asyncio.create_task(coro(print, lambda n: n % 2 == 0, generate_ranges(5))) 0 0 0 2 0 2
The
coroobjects now actually implement the asyncio coroutine interface- __await__()¶
Proxy to internal coroutine object
- send(value)¶
Proxy to internal coroutine object
- throw(typ, val=None, tb=None)¶
Proxy to internal coroutine object
- close()¶
Proxy to internal coroutine object
- class codestare.async_utils.TaskNursery(name: str | Callable[[], str] | None = None, loop: asyncio.BaseEventLoop | None = None)¶
Bases:
AsyncExitStack,RegistryUse a
TaskNurseryto create and manage tasks, instead of using theasyncio.loop.create_task()method directly.All tasks created by a nursery receive a callback to execute when they finish, which will trigger a graceful shutdown of the event loop if necessary – this saves a lot of boilerplate code
Basically, a
TaskNurseryis acontextlib.AsyncExitStackthat gets closed when it’ssentinel_taskis cancelled. This means you can enter any number of async context managers with a nursery, and every one of them will be closed when the nursery shuts down.Creating a
TaskNurseryfor an event loop also sets up signal handling and exception handling for that loop, i.e. if a loop has an activeTaskNurseryexiting the interpreter forcefully (e.g.Ctrl+Cin the shell) will gracefully shut down all nurseries first.TaskNurseryis aRegistrywith unique attributenameby default, i.e. all instances need to have a unique name and can be referenced by name using theTaskNursery.registry>>> from codestare.async_utils import TaskNursery >>> import asyncio >>> loop = asyncio.get_event_loop_policy().get_event_loop() >>> nursery = TaskNursery(name="Foo", loop=loop) >>> TaskNursery.registry {'Foo': codestare.async_utils.nursery.TaskNursery(name='Foo', loop=<...>)}
- registry¶
Inherited from
codestare.async_utils.helper.Registry
- static add_shutdown_handling(loop: AbstractEventLoop) None¶
Add shutdown handling to the loop, once. Caches loops that were passed as argument to this method, additional calls with the same loop will be ignored.
- Parameters:
loop – gets shutdown handling applied with
setup_shutdown_handling()
- stop_task(task: Task)¶
Try to stop a task managed by this nursery using
stop_task()- Parameters:
task – should be stopped
- Returns:
result of
stop_task()- Raises:
ValueError – if the task is not managed by this nursery i.e. it was not created by
create_task()
- __init__(name: str | Callable[[], str] | None = None, loop: asyncio.BaseEventLoop | None = None)¶
Creates a nursery on the loop – or the current running loop.
- Parameters:
name – if a callable is used, the name can be dynamic
loop – if no loop is passed the running loop is used
- loop: BaseEventLoop¶
Event loop instance to start tasks in, either passed as
loopor current running loop
- sentinel_task: Task¶
Dummy task that does nothing but cancelling it triggers the closing of this
TaskNurseryi.e. all entered async contexts will be closed and all created tasks will be stopped.
- property name¶
Unique name of nursery, used as
Registrykey by default, see__unique_key_attr__. If a callable is passed as name during initialization, this callable is used to compute the name dynamically.
- property tasks¶
Reference to managed tasks
- pop_all() TaskNursery¶
Preserve the context stack by transferring it to a new instance.
- create_task(coro: Generator[_TaskYieldType, None, T] | Awaitable[T], **kwargs) asyncio.Task[T]¶
Tasks created with this method have a callback which runs when the task finishes and tries to retrieve the task result. If the task raised an error the
loopsexception handler is called with the exception info and a reference to this nursery’ssentinel_task. Typically, the exception handler ishandle_exception()(it is set as exception handler when theTaskNurseryis created for a loop without specific exception handler). It will cancel the sentinel task and trigger the nursery’s shutdown.- Parameters:
coro – Coroutine which should run inside the task
**kwargs – passed to
asyncio.loop.create_task()
- Returns:
reference to created task
- class codestare.async_utils.RegistryMeta(name, bases, attrs)¶
Bases:
ABCMetaSet __unique_key_attr__ in created classes to some attribute of the class instance that is unique to use this attribute as a key in the registry instead of the __default_key__. If the __default_key__ attribute is not present in the instance, it will be copied from the class on instance creation, and appended with the number of instances created before.
All types created by this metaclass will have a
__registry_key__property, so that the key for an instance (i.e. either default key or unique key) can be accessed easily.Warning
If the instances get garbage collected, they will not be available from the registry anymore
Example
Here you can see that the registry only uses weak references, and instances that get garbage collected are removed from the registry
>>> from codestare.async_utils import RegistryMeta >>> class T(metaclass=RegistryMeta): ... __unique_key_attr__ = 'name' ... def __init__(self, name): ... self.name = name ... def __repr__(self): ... return f"{self.__class__.__name__}(name={self.name!r})" ... >>> a = T('foo') >>> T.registry {'foo': T(name='foo')} >>> import gc >>> gc.collect() 0 >>> T.registry {'foo': T(name='foo')} >>> a = T('bar') >>> T.registry {'foo': T(name='foo'), 'bar': T(name='bar')} >>> gc.collect() 0 >>> T.registry {'bar': T(name='bar')}
- class codestare.async_utils.Registry(*args, **kwargs)¶
Bases:
objectYou can inherit from this class to implicitly use the
RegistryMetametaclassExample
Define a registry with the metaclass or by inheriting
Registryfrom codestare.async_utils import RegistryMeta, Registry # virtually equivalent for most intents and purposes class Foo(metaclass=RegistryMeta): pass class Bar(Registry): pass
See also
RegistryMeta– more information about working with registry classes
- codestare.async_utils.async_exit_on_exc(ctx_manager: AsyncContextManager, task: Task, loop: Optional[BaseEventLoop] = None) None¶
Schedules exit of the
ctx_managerif the getting the task result raises an exception other than aasyncio.CancelledError- Parameters:
ctx_manager – Some context manager that needs to be closed with exception info for exceptions raised by the
tasktask – a task that maybe succeeded or raised an exception
loop – event loop to schedule the exit, uses current running loop if not provided – optional
- class codestare.async_utils.awaitable_predicate(predicate: Callable[[], bool], condition: asyncio.Condition | None = None, timeout=None)¶
Bases:
objectTypically, to let an
asynccoroutine wait until some predicate is True, one uses aasyncio.Condition.Condition.wait_for(predicate)will block the coroutine until thepredicatereturns True –predicatewill be reevaluated every time the conditionnotifieswaiting coroutines.An
awaitable_predicateobject does exactly that, but it can also be evaluated to a boolean to make code more conciseExample
>>> from codestare.async_utils import awaitable_predicate >>> value = 0 >>> is_zero = awaitable_predicate(lambda: value == 0) >>> bool(is_zero) True >>> value = 1 >>> bool(is_zero) False
Or we can wait until the predicate is actually True
>>> [...] # continued from above >>> async def set_value(number): ... global value ... async with is_zero.condition: ... value = number ... is_zero.condition.notify() ... >>> async def wait_for_zero(): ... await is_zero ... print(f"Finally! value: {value}") ... >>> import asyncio >>> async def main(): ... asyncio.create_task(wait_for_zero()) ... for n in reversed(range(3)): ... await set_value(n) ... >>> asyncio.run(main()) Finally! value: 0