ubii.framework.client module¶
This module implements the framework to implement a Ubi Interact Node. A Ubi Interact Node is expected to perform some communication with the master node and conceptualizes the API for all master node interactions, e.g. subscribing to topics, registering devices and so on. What exactly is expected from a client node to be functional, and what the specific client node actually implements on top of the required behavior depends on the state of the master node implementation.
The currently used master node for all Ubi Interact scenarios is the Node JS node, which expects the client node to:
provide an API for all advertised service calls
register itself
establish a data connection to receive
TopicDatamessages for subscribed topics
Optionally, some client nodes (e.g. a Python Node using the ubii.node.protocol.DefaultProtocol or the
Node JS node know how to communicate with the master node to e.g.
start and stop Processing Modules
These two requirements are separated in the python framework:
A
UbiiClientdefines what kind of behavior and features it is able to implementA
AbstractClientProtocolprovides a flexible framework to implement the necessary communication with the master node to implement those features
A UbiiClient only knows how to communicate with the master node through its
protocol i.e. an existing UbiiClient should be able to handle
different master node versions by simply using a corresponding protocol if necessary.
A client node typically provides lots of different features, some could be methods (like subscribing
and unsubscribing from topics) or simply other objects that encapsulate different parts of a feature.
Instead of fixing the design, the framework uses dataclasses to describe an arbitrary number of
user defined attributes, grouped by feature:
each
dataclassdescribes one featurethe
UbiiClientis initialized with lists of dataclasses for required and optional behaviorsthe attributes of the
dataclassare accessible through theUbiiClient(for every class passed during initialization)since
dataclassesenforce the use of typehints, aUbiiClientprovides a typed API even for its dynamically added attributes (the tradeoff being increased verbosity when accessing the attributes)each attribute will be “assigned” at some point during the execution of the clients
UbiiClient.protocol
Note
A client is considered usable if all attributes defined by required behaviors have been assigned!
- By default, the
UbiiClienthas the required behaviors: - and optional behaviors
The ubii.node.DefaultProtocol will register the client and implement
the behaviors. The client is considered usable as soon as the required behaviors are implemented,
i.e. when it is able to make service calls, (un-)subscribe to/from topics and publish its own
ubii.proto.TopicData.
Subscribing and unsubscribing are technically partly also service calls, but in addition to communicating the
intent to subscribe or unsubscribe to the master node, they return special Topic
objects, that can be used to handle to published TopicData. This is explained in
greater detail in the Topic documentation.
- class ubii.framework.client.subscribe_call(*args, **kwargs)¶
Bases:
Protocol- with_callback(callback: Consumer) Awaitable[Tuple[Tuple[Topic, ...], Tuple]]¶
set optional callback that should be registered for the subscribed topics
- Parameters:
*callback – topic data consumer
- Returns:
topic and tokens for callback de-registration
- __call__(*pattern: str) subscribe_call¶
subscribe_call objects need to have this call signature needs to set
patternsattribute.- Parameters:
*pattern – unix wildcard patterns or absolute topic names
- class ubii.framework.client.publish_call(*args, **kwargs)¶
Bases:
Protocol- __call__(*records: ubii.proto.TopicDataRecord | Dict) Awaitable[None]¶
publish_call objects need to have this call signature
- Parameters:
*records –
TopicDataRecordmessages or compatible dictionaries- Returns:
some awaitable performing the master node communication
- class ubii.framework.client.Services(service_map: services.DefaultServiceMap | None = None)¶
Bases:
objectBehavior to make service calls (accessed via the service map)
Example
>>> from ubii.node import * >>> client = await connect_client() >>> assert client.implements(Services) >>> await client[Services].service_map.server_config() server { id: "c2741cca-c75a-41d4-820a-80ac6407d791" name: "master-node" [...] }
- service_map: services.DefaultServiceMap | None = None¶
The
DefaultServiceMapcan be accessed with “shortcuts” for service topicsSee also
services.DefaultServiceMap.defaults– how attribute access for the service map works
- class ubii.framework.client.Subscriptions(subscribe_regex: subscribe_call | None = None, subscribe_topic: subscribe_call | None = None, unsubscribe_regex: unsubscribe_call | None = None, unsubscribe_topic: unsubscribe_call | None = None)¶
Bases:
objectBehavior to subscribe and unsubscribe from topics
Example
>>> from ubii.node import * >>> client = await connect_client() >>> start_pm, = await client[Subscriptions].subscribe_topic('/info/processing_module/start') >>> start_pm.subscriber_count 1
- subscribe_regex: subscribe_call | None = None¶
await to subscribe with regex
- subscribe_topic: subscribe_call | None = None¶
await to subscribe with simple topic
- unsubscribe_regex: unsubscribe_call | None = None¶
await to unsubscribe with regex
- unsubscribe_topic: unsubscribe_call | None = None¶
await to unsubscribe with simple topic
- class ubii.framework.client.Publish(publish: publish_call | None = None)¶
Bases:
objectBehavior to publish
ubii.proto.TopicDataRecordmessages. If multiple records are passed they should be converted to aubii.proto.TopicDataListand published as such, otherwise they should be wrapped in aubii.proto.TopicDatamessage.- publish: publish_call | None = None¶
await to publish topic data
- class ubii.framework.client.Register(register: Callable[[], Awaitable[UbiiClient]] | None = None, deregister: Callable[[], Awaitable[bool | None]] | None = None)¶
Bases:
objectBehavior to optionally unregister and re-register the client node (registering once is probably required to establish a data connection for
Publishbehavior but unregistering and re-registering is typically optional – consult the documentation of the usedprotocolfor details)- register: Callable[[], Awaitable[UbiiClient]] | None = None¶
await to register client node
- class ubii.framework.client.Devices(register_device: Callable[[ubii.proto.Device], Awaitable[ubii.proto.Device]] | None = None, deregister_device: Callable[[ubii.proto.Device], Awaitable[None]] | None = None)¶
Bases:
objectBehavior to register and deregister Devices (optional)
- register_device: Callable[[ubii.proto.Device], Awaitable[ubii.proto.Device]] | None = None¶
await to register a device
- class ubii.framework.client.Sessions(sessions: Dict[str, ubii.proto.Session] | None = None, start_session: start_session | None = None, stop_session: stop_session | None = None, get_sessions: Callable[[], Awaitable[ubii.proto.SessionList]] | None = None)¶
Bases:
objectBehavior to start and stop Sessions
- sessions: Dict[str, ubii.proto.Session] | None = None¶
the sessions started by this node
- start_session: start_session | None = None¶
await to start a session
- stop_session: stop_session | None = None¶
await to stop a session
- get_sessions: Callable[[], Awaitable[ubii.proto.SessionList]] | None = None¶
await to get running sessions from broker
- class ubii.framework.client.wait_for_module(*args, **kwargs)¶
Bases:
Protocol- __call__(name: str, *possible_status: Status) Awaitable[ProcessingRoutine]¶
Wait until the specified module has the specified status
- Parameters:
name – module name
possible_status – callable should use INITIALIZED by default if no other status is given, any statuses given will be accepted
- Returns:
an awaitable returning the module once it has (one of) the specified status(es)
- class ubii.framework.client.RunProcessingModules(get_module_instance: wait_for_module | None = None)¶
Bases:
objectAccess all running processing module instances
- get_module_instance: wait_for_module | None = None¶
Wait until the specified module is e.g. initialized, then return the module
- ubii.framework.client.ProcessingModuleFactory¶
Convenience Type
alias of
Callable[[…],ProcessingRoutine]
- class ubii.framework.client.InitProcessingModules(module_factories: Mapping[str, ProcessingModuleFactory] | None = None)¶
Bases:
objectBehavior to initialize ProcessingModules with custom callables
- module_factories: Mapping[str, ProcessingModuleFactory] | None = None¶
Mapping \(name \rightarrow factory\) for module names to callables which return a
processing.ProcessingRoutineinstance. If the client implements it, you can put custom callables inside, so they will get used during module instantiation
- class ubii.framework.client.DiscoverProcessingModules(discover_processing_modules: Callable[[], Dict[str, ProcessingModuleFactory]] | None = None)¶
Bases:
objectBehaviour to automatically load ProcessingModules
- class ubii.framework.client.UbiiClient(mapping=None, *, protocol: T_Protocol, required_behaviors: Tuple[Type, ...] = (Services, Subscriptions, Publish), optional_behaviors: Tuple[Type, ...] = (Register, Devices, RunProcessingModules, InitProcessingModules, DiscoverProcessingModules, Sessions), **kwargs)¶
Bases:
Client,Awaitable[UbiiClient],Generic[T_Protocol]A
UbiiClientinherits its proto message wrapping capabilities fromubii.proto.Client.The protocol of the client typically implements the following additional behaviors:
making
ServiceCallsvia theServicesbehavior – this involves accessing the right Service for your task by topic, and calling it with the right kind of data (see https://github.com/SandroWeber/ubi-interact/wiki/Requests for more documentation on default topics for services and expected data)subscribe to topics (or topic patterns) at the master node – this process involves making the right service call and then creating a internal representation of the topic to add callbacks and forward received data. Because of this complexity you should not subscribe to topics via a simple ServiceCall, and instead use the
Subscriptionsbehavior. Make sure to use the_regexversion of a method when you subscribe to a wildcard pattern.publish data on topics – this requires a
TopicDataRecordmessage or a compatible dictionary (see documentation of the message formats) and thePublishbehaviorrun Processing Modules – processing modules need to be registered at the master node. Add the modules to the
processing_modulesfield of the client for PMs which can be initialized when the client node is created, or to thelate_init_processing_modulesfield of theInitProcessingModulesbehavior for modules that need to be initialized at a later point of the protocol (e.g. a processing module might need to know the master node’s definition of datatype messages, so it can only be initialized after some initial communication between client and master node.
The
UbiiClientwill start it’sClient Protocolwhen it is awaited directly or indirectly (see examples below). The protocol will implement the behaviors.It’s required to link a client and its protocol explicitly:
from ubii.node.protocol import DefaultProtocol from ubii.framework.client import UbiiClient, Services import asyncio async def main(): protocol = DefaultProtocol() client = UbiiClient(protocol=protocol) protocol.client = client ... asyncio.run(main())
Awaiting a
UbiiClientobject:from ubii.node.protocol import DefaultProtocol from ubii.framework.client import UbiiClient, Services import asyncio async def main(): protocol = DefaultProtocol() client = UbiiClient(protocol=protocol, name="Foo") # name is a message field protocol.client = client assert client.name == 'Foo' # you could set some attributes before you 'start' the client client.is_dedicated_processing_node = True # now wait for the client to be usable client = await client assert client.id # will be set because the client is registered now
Using the
UbiiClientobject as an async context manager:from ubii.node.protocol import DefaultProtocol from ubii.framework.client import UbiiClient, Services import asyncio async def main(): protocol = DefaultProtocol() client = UbiiClient(protocol=protocol, name="Foo") # name is a message field protocol.client = client async with client as running: assert running.id # client is already registered assert not client.id # client gets unregistered when context exits
When the client is awaited (either directly or as an async context manager) the protocol is started internally unless it is already running. Refer to
AbstractClientProtocol.start()for details.- registry¶
Mapping \(id \rightarrow Client\) containing all live
UbiiClientswith id. Refer to the documentation ofutil.ProtoRegistryfor details.from ubii.framework.client import UbiiClient from ubii.node.protocol import DefaultProtocol async def main(): # you could instead use ubii.node.connect_client protocol = DefaultProtocol() client = UbiiClient(protocol=protocol) protocol.client = client # empty dictionary, since client does not have an id assert not client.id assert not UbiiClient.registry # starts client protocol and returns control when client has id await client assert client.id assert UbiiClient.registry[client.id] == client
- Type:
Dict[str, UbiiClient]
- devices¶
RepeatedFieldof typeDevice– inherited fromClient
- tags¶
RepeatedFieldof typeSTRING– inherited fromClient
- processing_modules¶
RepeatedFieldof typeProcessingModule– inherited fromClient
- IMPLEMENT_TIMEOUT = None¶
Set this value if you want to debug code that hangs in waiting for implementations using
implements
- class ClientInitTaskWrapper(client: UbiiClient)¶
Bases:
Awaitable[UbiiClient]This is a wrapper around a task that waits until the client implements the required behaviors, and then returns the client. The wrapper can be reset, with
resetso that a new task is created to be used inside the wrapper.
- __init__(mapping=None, *, protocol: T_Protocol, required_behaviors: Tuple[Type, ...] = (Services, Subscriptions, Publish), optional_behaviors: Tuple[Type, ...] = (Register, Devices, RunProcessingModules, InitProcessingModules, DiscoverProcessingModules, Sessions), **kwargs)¶
Creates a
UbiiClientobject. TheUbiiClientis awaitable. When it is used in an Await expression, the coroutine will wait until all attributes for the clients required_behaviors are assigned. These assignments typically happen as part of the clientsprotocolrunning, sometime the types passed as required_behaviors or optional_behaviors are referred to as behaviors, and assigning something to their attributes is referred to as implementing the behavior.- Parameters:
mapping (Union[dict, ~.Message]) – A dictionary or message to be used to determine the values for the message fields.
protocol (AbstractClientProtocol) – A concrete protocol instance to be used py the client node
required_behaviors (typing.Tuple[typing.Type, …]) – tuple of
dataclasstypes that need to be implemented by the protocol to consider the UbiiClient as usableoptional_behaviors (typing.Tuple[typing.Type, …]) – tuple of
dataclasstypes that can optionally be implemented by the protocol whose attributes can be accessed through the UbiiClient node.**kwargs – passed to
ubii.proto.Client(e.g. field assignments)
- property initial_specs: dict¶
Since clients can be reset, the clients current representation needs to be separated from the initial protobuf specifications. When the client is
reset(), it’s specifications will be set to it’s currentinitial_specs.The initial specs can be adapted during the client’s lifetime by explicitly assigning to values of this dictionary, otherwise it contains the specifications that were used when the object was initialized.
- notify() None¶
Creates a task to notify all coroutines waiting for
change_specs(allows easy notification from outside a coroutine i.e. a non-async callback, where it’s impossible to acquire thechange_specslock asynchronously)
- property task_nursery: TaskNursery¶
the
TaskNurseryused by theprotocol
- property change_specs: Condition¶
Allows waiting for behavior attribute assignments. See also:
implementsfrom ubii.node import connect_client # we use connect_client to create a UbiiClient as well as a protocol and connect them # see documentation of connect_client for details async def main(): async with connect_client() as client: await client.change_specs.wait() print("A behavior was implemented!")
- implements(*behaviors, timeout: float | None = IMPLEMENT_TIMEOUT) util.awaitable_predicate¶
Returns an object that can be used to check if the client implements a certain behavior or wait until it is implemented.
from ubii.node.protocol import DefaultProtocol from ubii.framework.client import UbiiClient, Services import asyncio async def main(): protocol = DefaultProtocol() client = UbiiClient(protocol=protocol) protocol.client = client async def wait_for_required_behaviors_implicitly(): return await client async def wait_for_behavior_explicitly(): await client.implements(Services) # used in await expression assert client.implements(Services) # used in boolean expression await asyncio.gather( wait_for_required_behaviors_implicitly(), wait_for_behavior_explicitly() ) asyncio.run(main())
- Parameters:
*behaviors – tuple of
dataclasstypes passed to thisUbiiClientas required_behaviors or optional_behaviors during initialization.timeout – if not None, the returned awaitable will raise a
asyncio.TimeoutErrorafter specified time
- Returns:
an
awaitable_predicatethat converts to True if all fields of the passed behaviors are initialized in thisUbiiClientand / or can be used in an Await expression to wait until that is the case
- property behaviors: BehaviorDict¶
Return mapping \((optional / required) \rightarrow dataclass\) showing which behaviors are defined as optional or required by this client. You can check their implementation status using
implements().
- wants(*behaviors) bool¶
Checks if the passed behaviours are part of the clients required or optional behaviours and are not implemented Basically just a shorthand for
all( (behavior in self.behaviors['optional_behaviors'] or behavior in self.behaviors['required_behavior'] for behavior in behaviors )
- Parameters:
*behaviours – behavior types to check
- Returns:
True if all behaviors are contained in required or optional behaviors
- async reset()¶
Use this method to reset the client behaviors and allow explicitly restarting the client protocol if it is finished. Also resets the protobuf values to the contents of
initial_valuesWarning
This behavior is experimental, it is better to simply create a new client instance
- property protocol: T_Protocol¶
Reference to protocol used by the client
- class ubii.framework.client.AbstractClientProtocol(config: constants.UbiiConfig = constants.GLOBAL_CONFIG, log: logging.Logger | None = None)¶
Bases:
AbstractProtocol[T_EnumFlag],Registry,ABCABCto implement client protocols, i.e. define the communication between client node and master node during the lifetime of the client.- state_changes¶
inherited from
AbstractProtocol
- hook_function: util.registry[str, util.hook] = <ubii.framework.util.functools.registry object>¶
This callable wraps the
util.hookdecorator but registers every decorated function, so that decorators can be easily applied to all registered hooks simultaneously
- config: constants.UbiiConfig¶
Config used – contains e.g. default topic for initial server configuration service call
- abstract async create_service_map(context)¶
Create a
ServiceMapin the context ascontext.service_mapwhich has to be able to make a single service callcontext.service_map.server_config
- abstract async update_config(context)¶
Update the server configuration in the context. After completion of this coroutine
- abstract async update_services(context)¶
Update the service map in the context.
context.service_mapis able to perform all service calls advertised by the master node after this coroutine completes.
- abstract async create_client(context)¶
Create a client in the context.
context.clienttypically is aubii.proto.Clientwrapper, e.g. aUbiiClientwhich at this moment is not expected to be fully functional.
- abstract register_client(context) AsyncContextManager[None]¶
Return a context manager to register the
context.clientclient, and unregister it when the protocol stops. After successful registration the context manager typically needs to also set the protocol state to whatever the concrete implementation expects.context.clientis expected to be up-to-date and usable after registration
- abstract async create_topic_connection(context)¶
Should create a
ubii.framework.topics.DataConnection.context.topic_connectionis expected to be a fully functional topic connection after this coroutine is completed.
- abstract async implement_client(context)¶
Make sure the
context.clienthas fully implemented behavior. The context at this point should contain a context.service_map and a context.topic_connection.context.clientcan be awaited after this coroutine is finished, to return a fully functional client.
- on_start(context: Any) Awaitable[None]¶
- Awaits (in order):
The
contextis passed for each call, and updated according to the concrete implementation.Note
For a concrete implementation of a client protocol, assign this callback to a state change in
state_changes- Parameters:
context – A namespace or dataclass or similar object as container for manipulated values
This callable had the
hook_functiondecorator applied. Original signature:async def on_start(self, context: 'typing.Any') -> 'None'
- on_create(context) Awaitable[None]¶
Enters the async context manager created by
register_client()in thetask_nurseryi.e. registers the client and prepares to unregister it if the protocol should be stoppedThe
contextis passed toregister_client()Note
For a concrete implementation of a client protocol, assign this callback to a state change in
state_changes- Parameters:
context – A namespace or dataclass or similar object as container for manipulated values
This callable had the
hook_functiondecorator applied. Original signature:async def on_create(self, context) -> 'None'
- on_registration(context) Awaitable[None]¶
- Awaits (in order):
Then the
context.clientis awaited to make sure that all behaviors are implemented. Thecontextis passed for each call, and updated according to the concrete implementation.Note
For a concrete implementation of a client protocol, assign this callback to a state change in
state_changes- Parameters:
context – A namespace or dataclass or similar object as container for manipulated values
- Raises:
RuntimeError – if awaiting the
context.clientraises aasyncio.TimeoutErrorafter a timeout of \(5s\)
This callable had the
hook_functiondecorator applied. Original signature:async def on_registration(self, context) -> 'None'
- on_connect(context) Awaitable[None]¶
Starts a
ubii.framework.topics.StreamSplitRoutinein thetask_nurseryto splitùbii.proto.TopicDatamessages from thecontext.topic_connectionto the topics of thecontext.topic_storeNote
For a concrete implementation of a client protocol, assign this callback to a state change in
state_changes- Parameters:
context – A namespace or dataclass or similar object as container for manipulated values
This callable had the
hook_functiondecorator applied. Original signature:async def on_connect(self, context) -> 'None'
- on_stop(context) Awaitable[None]¶
Sets the
stateof theclienttoUNAVAILABLENote
For a concrete implementation of a client protocol, assign this callback to a state change in
state_changes- Parameters:
context – A namespace or dataclass or similar object as container for manipulated values
This callable had the
hook_functiondecorator applied. Original signature:async def on_stop(self, context) -> 'None'
- classmethod __init_subclass__()¶
Register decorators for hook functions