ubii.framework.client module

This module provides the framework for implementing a Ubi Interact Node. A Ubi Interact Node is expected to perform some communication with the master node and defines the API for all master node interactions, e.g. subscribing to topics, registering devices and so on. What exactly a client node needs to do to be functional, and what a specific client node implements on top of the required behavior, depends on the state of the master node implementation.

The currently used master node for all Ubi Interact scenarios is the Node JS node, which expects the client node to:

  • provide an API for all advertised service calls

  • register itself

  • establish a data connection to receive TopicData messages for subscribed topics

Optionally, some client nodes (e.g. a Python Node using the ubii.node.protocol.DefaultProtocol or the Node JS node) know how to communicate with the master node to e.g.

  • start and stop Processing Modules

These two aspects are separated in the Python framework:

  1. A UbiiClient defines what kind of behavior and features it is able to implement

  2. An AbstractClientProtocol provides a flexible framework to implement the necessary communication with the master node to implement those features

A UbiiClient only knows how to communicate with the master node through its protocol, i.e. an existing UbiiClient should be able to handle different master node versions by simply using a corresponding protocol if necessary.

A client node typically provides lots of different features, some could be methods (like subscribing and unsubscribing from topics) or simply other objects that encapsulate different parts of a feature. Instead of fixing the design, the framework uses dataclasses to describe an arbitrary number of user defined attributes, grouped by feature:

  • each dataclass describes one feature

  • the UbiiClient is initialized with lists of dataclasses for required and optional behaviors

  • the attributes of the dataclass are accessible through the UbiiClient (for every class passed during initialization)

  • since dataclasses enforce the use of type hints, a UbiiClient provides a typed API even for its dynamically added attributes (the tradeoff being increased verbosity when accessing the attributes)

  • each attribute will be “assigned” at some point during the execution of the client's UbiiClient.protocol

Note

A client is considered usable if all attributes defined by required behaviors have been assigned!
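The idea of behaviors as dataclasses can be illustrated with a minimal, standard-library-only sketch. The names Ping, Echo, TinyClient and is_implemented are hypothetical and only mimic the pattern described above; they are not part of ubii.framework.client.

```python
from dataclasses import dataclass, fields
from typing import Callable, Optional


# Hypothetical behaviors for illustration only.
@dataclass
class Ping:
    ping: Optional[Callable[[], str]] = None


@dataclass
class Echo:
    echo: Optional[Callable[[str], str]] = None


def is_implemented(behavior: object) -> bool:
    """Assumption: a behavior counts as implemented once no field is None."""
    return all(getattr(behavior, f.name) is not None for f in fields(behavior))


class TinyClient:
    """Stand-in for a client that stores one instance per behavior type."""

    def __init__(self, required=(), optional=()):
        self.required = tuple(required)
        self.optional = tuple(optional)
        self._behaviors = {kind: kind() for kind in (*required, *optional)}

    def __getitem__(self, kind):
        # Typed access: client[Ping] returns the Ping instance.
        return self._behaviors[kind]

    def usable(self) -> bool:
        # Only required behaviors decide whether the client is usable.
        return all(is_implemented(self[kind]) for kind in self.required)


client = TinyClient(required=(Ping,), optional=(Echo,))
before = client.usable()             # nothing has been assigned yet
client[Ping].ping = lambda: "pong"   # a protocol would perform this assignment
after = client.usable()              # the required behavior is now implemented
```

In this sketch the optional Echo behavior stays unassigned, yet the client already counts as usable.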

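By default, the UbiiClient has the required behaviors:

  • Services

  • Subscriptions

  • Publish

and optional behaviors:

  • Register

  • Devices

  • RunProcessingModules

  • InitProcessingModules

  • DiscoverProcessingModules

  • Sessions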

The ubii.node.DefaultProtocol will register the client and implement the behaviors. The client is considered usable as soon as the required behaviors are implemented, i.e. when it is able to make service calls, (un-)subscribe to/from topics and publish its own ubii.proto.TopicData. Subscribing and unsubscribing are technically also service calls, but in addition to communicating the intent to subscribe or unsubscribe to the master node, they return special Topic objects that can be used to handle the published TopicData. This is explained in greater detail in the Topic documentation.

class ubii.framework.client.BehaviorDict

Bases: dict

required_behaviors: Tuple[Type, ...]
optional_behaviors: Tuple[Type, ...]
class ubii.framework.client.subscribe_call(*args, **kwargs)

Bases: Protocol

patterns: Tuple[str, ...]
with_callback(callback: Consumer) Awaitable[Tuple[Tuple[Topic, ...], Tuple]]

set optional callback that should be registered for the subscribed topics

Parameters:

callback – topic data consumer

Returns:

topic and tokens for callback de-registration

__call__(*pattern: str) subscribe_call

subscribe_call objects need to have this call signature. The call needs to set the patterns attribute.

Parameters:

*pattern – unix wildcard patterns or absolute topic names

__await__() Generator[Any, None, Tuple[Topic, ...]]
Returns:

a tuple of processed topics (one for each pattern, in the same order as patterns)
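A minimal sketch of an object with the shape described for subscribe_call, using only the standard library. FakeSubscribe is a hypothetical name, topics are represented by plain strings, and the returned tokens are placeholders. It shows how calling the object records the patterns and how the same object can then be awaited or combined with with_callback.

```python
import asyncio
from typing import Any, Callable, Generator, Tuple


class FakeSubscribe:
    """Hypothetical object shaped like subscribe_call (topics are strings here)."""

    def __init__(self) -> None:
        self.patterns: Tuple[str, ...] = ()
        self.callbacks: list = []

    def __call__(self, *pattern: str) -> "FakeSubscribe":
        # Calling only records the patterns and returns the awaitable object.
        self.patterns = pattern
        return self

    async def _subscribe(self) -> Tuple[str, ...]:
        await asyncio.sleep(0)  # placeholder for the master node round trip
        return tuple(f"topic:{p}" for p in self.patterns)

    def __await__(self) -> Generator[Any, None, Tuple[str, ...]]:
        return self._subscribe().__await__()

    async def with_callback(self, callback: Callable[[Any], None]):
        topics = await self
        self.callbacks.append(callback)
        tokens = tuple(range(len(topics)))  # stand-ins for de-registration tokens
        return topics, tokens


async def demo():
    subscribe = FakeSubscribe()
    plain = await subscribe("/a", "/b")
    topics, tokens = await subscribe("/c").with_callback(print)
    return plain, topics, tokens
```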

class ubii.framework.client.unsubscribe_call(*args, **kwargs)

Bases: Protocol

__call__(*pattern: str) Awaitable[Tuple[Topic, ...]]

unsubscribe_call objects need to have this call signature

Parameters:

*pattern – unix wildcard patterns or absolute topic names

Returns:

awaitable returning a tuple of processed topics (one for each pattern, same order)

class ubii.framework.client.publish_call(*args, **kwargs)

Bases: Protocol

__call__(*records: ubii.proto.TopicDataRecord | Dict) Awaitable[None]

publish_call objects need to have this call signature

Parameters:

*records – TopicDataRecord messages or compatible dictionaries

Returns:

some awaitable performing the master node communication

class ubii.framework.client.start_session(*args, **kwargs)

Bases: Protocol

async __call__(session: Session) Session

Await to start a session

Parameters:

session – session request to start

Returns:

the started session specifications

class ubii.framework.client.stop_session(*args, **kwargs)

Bases: Protocol

async __call__(session: Session) bool

Await to stop a session

Parameters:

session – session request to stop

Returns:

whether stopping was successful

class ubii.framework.client.Services(service_map: services.DefaultServiceMap | None = None)

Bases: object

Behavior to make service calls (accessed via the service map)

Example

>>> from ubii.node import *
>>> client = await connect_client()
>>> assert client.implements(Services)
>>> await client[Services].service_map.server_config()
server {
  id: "c2741cca-c75a-41d4-820a-80ac6407d791"
  name: "master-node"
  [...]
}
service_map: services.DefaultServiceMap | None = None

The DefaultServiceMap can be accessed with “shortcuts” for service topics

See also

services.DefaultServiceMap.defaults – how attribute access for the service map works

class ubii.framework.client.Subscriptions(subscribe_regex: subscribe_call | None = None, subscribe_topic: subscribe_call | None = None, unsubscribe_regex: unsubscribe_call | None = None, unsubscribe_topic: unsubscribe_call | None = None)

Bases: object

Behavior to subscribe and unsubscribe from topics

Example

>>> from ubii.node import *
>>> client = await connect_client()
>>> start_pm, = await client[Subscriptions].subscribe_topic('/info/processing_module/start')
>>> start_pm.subscriber_count
1
subscribe_regex: subscribe_call | None = None

await to subscribe with regex

subscribe_topic: subscribe_call | None = None

await to subscribe with simple topic

unsubscribe_regex: unsubscribe_call | None = None

await to unsubscribe with regex

unsubscribe_topic: unsubscribe_call | None = None

await to unsubscribe with simple topic

class ubii.framework.client.Publish(publish: publish_call | None = None)

Bases: object

Behavior to publish ubii.proto.TopicDataRecord messages. If multiple records are passed they should be converted to a ubii.proto.TopicDataList and published as such, otherwise they should be wrapped in a ubii.proto.TopicData message.

publish: publish_call | None = None

await to publish topic data
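The wrapping rule described for this behavior (one record versus several records) could look roughly like the following sketch. It works on plain dictionaries, and the keys "record" and "record_list" are illustrative placeholders, not the actual field names of the ubii.proto messages.

```python
from typing import Any, Dict


def wrap_records(*records: Dict[str, Any]) -> Dict[str, Any]:
    """Build one outgoing message from one or more record dictionaries."""
    if not records:
        raise ValueError("at least one record is required")
    if len(records) == 1:
        # A single record is wrapped directly.
        return {"record": records[0]}
    # Several records are bundled into a list first.
    return {"record_list": list(records)}


single = wrap_records({"topic": "/demo/a", "double": 1.0})
many = wrap_records(
    {"topic": "/demo/a", "double": 1.0},
    {"topic": "/demo/b", "bool": True},
)
```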

class ubii.framework.client.Register(register: Callable[[], Awaitable[UbiiClient]] | None = None, deregister: Callable[[], Awaitable[bool | None]] | None = None)

Bases: object

Behavior to optionally unregister and re-register the client node (registering once is probably required to establish a data connection for Publish behavior but unregistering and re-registering is typically optional – consult the documentation of the used protocol for details)

register: Callable[[], Awaitable[UbiiClient]] | None = None

await to register client node

deregister: Callable[[], Awaitable[bool | None]] | None = None

await to unregister client node

class ubii.framework.client.Devices(register_device: Callable[[ubii.proto.Device], Awaitable[ubii.proto.Device]] | None = None, deregister_device: Callable[[ubii.proto.Device], Awaitable[None]] | None = None)

Bases: object

Behavior to register and deregister Devices (optional)

register_device: Callable[[ubii.proto.Device], Awaitable[ubii.proto.Device]] | None = None

await to register a device

deregister_device: Callable[[ubii.proto.Device], Awaitable[None]] | None = None

await to deregister a device

class ubii.framework.client.Sessions(sessions: Dict[str, ubii.proto.Session] | None = None, start_session: start_session | None = None, stop_session: stop_session | None = None, get_sessions: Callable[[], Awaitable[ubii.proto.SessionList]] | None = None)

Bases: object

Behavior to start and stop Sessions

sessions: Dict[str, ubii.proto.Session] | None = None

the sessions started by this node

start_session: start_session | None = None

await to start a session

stop_session: stop_session | None = None

await to stop a session

get_sessions: Callable[[], Awaitable[ubii.proto.SessionList]] | None = None

await to get running sessions from broker

class ubii.framework.client.wait_for_module(*args, **kwargs)

Bases: Protocol

__call__(name: str, *possible_status: Status) Awaitable[ProcessingRoutine]

Wait until the specified module has the specified status

Parameters:
  • name – module name

  • possible_status – callable should use INITIALIZED by default if no other status is given, any statuses given will be accepted

Returns:

an awaitable returning the module once it has (one of) the specified status(es)
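One way to satisfy this call signature is to wait on an asyncio.Condition until the module reaches an accepted status. The sketch below is an assumption about how such a callable could be built. Status, Module and ModuleTable are hypothetical stand-ins, and only the default of INITIALIZED is taken from the parameter description above.

```python
import asyncio
from enum import Enum


class Status(Enum):  # hypothetical stand-in for the module status enum
    CREATED = 0
    INITIALIZED = 1
    PROCESSING = 2


class Module:
    def __init__(self, name):
        self.name = name
        self.status = Status.CREATED


class ModuleTable:
    def __init__(self):
        self.modules = {}
        self.changed = asyncio.Condition()

    async def set_status(self, name, status):
        async with self.changed:
            self.modules.setdefault(name, Module(name)).status = status
            self.changed.notify_all()  # wake every waiter to re-check

    async def wait_for_module(self, name, *possible_status):
        accepted = possible_status or (Status.INITIALIZED,)

        def ready():
            module = self.modules.get(name)
            return module is not None and module.status in accepted

        async with self.changed:
            await self.changed.wait_for(ready)
            return self.modules[name]


async def demo():
    table = ModuleTable()
    waiter = asyncio.ensure_future(table.wait_for_module("example"))
    await table.set_status("example", Status.CREATED)
    await asyncio.sleep(0)               # give the waiter a chance to run
    still_waiting = not waiter.done()    # CREATED is not an accepted status
    await table.set_status("example", Status.INITIALIZED)
    module = await waiter
    return still_waiting, module.status
```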

class ubii.framework.client.RunProcessingModules(get_module_instance: wait_for_module | None = None)

Bases: object

Access all running processing module instances

get_module_instance: wait_for_module | None = None

Wait until the specified module is e.g. initialized, then return the module

ubii.framework.client.ProcessingModuleFactory

Convenience Type

alias of Callable[[…], ProcessingRoutine]

class ubii.framework.client.InitProcessingModules(module_factories: Mapping[str, ProcessingModuleFactory] | None = None)

Bases: object

Behavior to initialize ProcessingModules with custom callables

module_factories: Mapping[str, ProcessingModuleFactory] | None = None

Mapping \(name \rightarrow factory\) for module names to callables which return a processing.ProcessingRoutine instance. If the client implements it, you can put custom callables inside, so they will get used during module instantiation
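A small sketch of such a name-to-factory mapping, using only the standard library. EchoRoutine and instantiate are hypothetical names used for illustration; how the framework actually calls the factories is not shown in this section.

```python
from typing import Callable, Dict


class EchoRoutine:
    """Hypothetical stand-in for a processing.ProcessingRoutine."""

    def __init__(self, name: str, **options):
        self.name = name
        self.options = options


# name -> factory, shaped like the module_factories mapping
module_factories: Dict[str, Callable[..., EchoRoutine]] = {
    "echo": lambda **options: EchoRoutine("echo", **options),
}


def instantiate(name: str, **options) -> EchoRoutine:
    """Look up the factory registered for the module name and call it."""
    return module_factories[name](**options)


routine = instantiate("echo", rate=10)
```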

class ubii.framework.client.DiscoverProcessingModules(discover_processing_modules: Callable[[], Dict[str, ProcessingModuleFactory]] | None = None)

Bases: object

Behavior to automatically load ProcessingModules

discover_processing_modules: Callable[[], Dict[str, ProcessingModuleFactory]] | None = None

Callable returning a mapping \(name \rightarrow factory\) for module names to callables which return a processing.ProcessingRoutine instance.

class ubii.framework.client.UbiiClient(mapping=None, *, protocol: T_Protocol, required_behaviors: Tuple[Type, ...] = (Services, Subscriptions, Publish), optional_behaviors: Tuple[Type, ...] = (Register, Devices, RunProcessingModules, InitProcessingModules, DiscoverProcessingModules, Sessions), **kwargs)

Bases: Client, Awaitable[UbiiClient], Generic[T_Protocol]

A UbiiClient inherits its proto message wrapping capabilities from ubii.proto.Client.

The protocol of the client typically implements the following additional behaviors:

  • making ServiceCalls via the Services behavior – this involves accessing the right Service for your task by topic, and calling it with the right kind of data (see https://github.com/SandroWeber/ubi-interact/wiki/Requests for more documentation on default topics for services and expected data)

  • subscribe to topics (or topic patterns) at the master node – this process involves making the right service call and then creating an internal representation of the topic to add callbacks and forward received data. Because of this complexity you should not subscribe to topics via a simple ServiceCall, and instead use the Subscriptions behavior. Make sure to use the _regex version of a method when you subscribe to a wildcard pattern.

  • publish data on topics – this requires a TopicDataRecord message or a compatible dictionary (see documentation of the message formats) and the Publish behavior

  • run Processing Modules – processing modules need to be registered at the master node. Add the modules to the processing_modules field of the client for PMs which can be initialized when the client node is created, or to the late_init_processing_modules field of the InitProcessingModules behavior for modules that need to be initialized at a later point of the protocol (e.g. a processing module might need to know the master node’s definition of datatype messages, so it can only be initialized after some initial communication between client and master node).

The UbiiClient will start its Client Protocol when it is awaited directly or indirectly (see examples below). The protocol will implement the behaviors.

It’s required to link a client and its protocol explicitly:

from ubii.node.protocol import DefaultProtocol
from ubii.framework.client import UbiiClient, Services
import asyncio

async def main():
    protocol = DefaultProtocol()
    client = UbiiClient(protocol=protocol)
    protocol.client = client

    ...

asyncio.run(main())

Awaiting a UbiiClient object:

from ubii.node.protocol import DefaultProtocol
from ubii.framework.client import UbiiClient, Services
import asyncio

async def main():
    protocol = DefaultProtocol()
    client = UbiiClient(protocol=protocol, name="Foo")  # name is a message field
    protocol.client = client
    assert client.name == 'Foo'

    # you could set some attributes before you 'start' the client
    client.is_dedicated_processing_node = True

    # now wait for the client to be usable
    client = await client
    assert client.id  # will be set because the client is registered now

Using the UbiiClient object as an async context manager:

from ubii.node.protocol import DefaultProtocol
from ubii.framework.client import UbiiClient, Services
import asyncio

async def main():
    protocol = DefaultProtocol()
    client = UbiiClient(protocol=protocol, name="Foo")  # name is a message field
    protocol.client = client

    async with client as running:
        assert running.id  # client is already registered

    assert not client.id  # client gets unregistered when context exits

When the client is awaited (either directly or as an async context manager) the protocol is started internally unless it is already running. Refer to AbstractClientProtocol.start() for details.
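The combination of being awaitable and being an async context manager can be sketched without the framework. MiniClient is a hypothetical class, and its "protocol" is a single coroutine that assigns an id. The sketch only illustrates the pattern of starting the work once and reusing it, not the actual start logic of AbstractClientProtocol.

```python
import asyncio


class MiniClient:
    """Hypothetical client: awaiting or entering it starts a one-shot 'protocol'."""

    def __init__(self):
        self.id = ""
        self._task = None

    async def _run_protocol(self):
        await asyncio.sleep(0)    # placeholder for registration traffic
        self.id = "client-1"
        return self

    def _start(self):
        if self._task is None:    # start only once, reuse the task afterwards
            self._task = asyncio.ensure_future(self._run_protocol())
        return self._task

    def __await__(self):
        return self._start().__await__()

    async def __aenter__(self):
        return await self

    async def __aexit__(self, *exc_info):
        self.id = ""              # placeholder for unregistering
        self._task = None


async def demo():
    client = MiniClient()
    same = (await client) is client
    async with MiniClient() as running:
        inside = running.id
    return same, inside, running.id
```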

registry

Mapping \(id \rightarrow Client\) containing all live UbiiClients with id. Refer to the documentation of util.ProtoRegistry for details.

from ubii.framework.client import UbiiClient
from ubii.node.protocol import DefaultProtocol

async def main():
    # you could instead use ubii.node.connect_client
    protocol = DefaultProtocol()
    client = UbiiClient(protocol=protocol)
    protocol.client = client

    # empty dictionary, since client does not have an id
    assert not client.id
    assert not UbiiClient.registry

    # starts client protocol and returns control when client has id
    await client

    assert client.id
    assert UbiiClient.registry[client.id] == client
Type:

Dict[str, UbiiClient]

id

Field of type STRING – inherited from Client

Type:

proto.fields.Field

name

Field of type STRING – inherited from Client

Type:

proto.fields.Field

devices

RepeatedField of type Device – inherited from Client

Type:

proto.fields.RepeatedField

tags

RepeatedField of type STRING – inherited from Client

Type:

proto.fields.RepeatedField

description

Field of type STRING – inherited from Client

Type:

proto.fields.Field

processing_modules

RepeatedField of type ProcessingModule – inherited from Client

Type:

proto.fields.RepeatedField

is_dedicated_processing_node

Field of type BOOL – inherited from Client

Type:

proto.fields.Field

host_ip

Field of type STRING – inherited from Client

Type:

proto.fields.Field

metadata_json

Field of type STRING – inherited from Client

Type:

proto.fields.Field

state

Field of type State – inherited from Client

Type:

proto.fields.Field

latency

Field of type FLOAT – inherited from Client

Type:

proto.fields.Field

IMPLEMENT_TIMEOUT = None

Set this value if you want to debug code that hangs while waiting for behaviors to be implemented via implements()

class ClientInitTaskWrapper(client: UbiiClient)

Bases: Awaitable[UbiiClient]

This is a wrapper around a task that waits until the client implements the required behaviors, and then returns the client. The wrapper can be reset with reset(), so that a new task is created and used inside the wrapper.

reset() None

Use this method to reset the client behaviors and create a new wrapped task inside the wrapper.

Returns:

Reference to self, with new wrapped task
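A resettable awaitable wrapper around a task can be sketched as follows. ResettableInit is a hypothetical name, and an asyncio.Event stands in for "the required behaviors are implemented". This is an illustration of the pattern under those assumptions, not the actual implementation of ClientInitTaskWrapper.

```python
import asyncio


class ResettableInit:
    """Hypothetical wrapper: awaiting it waits for the wrapped task."""

    def __init__(self, ready: asyncio.Event, value):
        self._ready = ready
        self._value = value
        self._task = None
        self.reset()

    async def _wait(self):
        await self._ready.wait()
        return self._value

    def reset(self):
        # Drop an unfinished task and wrap a fresh one.
        if self._task is not None and not self._task.done():
            self._task.cancel()
        self._task = asyncio.ensure_future(self._wait())
        return self

    def __await__(self):
        return self._task.__await__()


async def demo():
    ready = asyncio.Event()
    wrapper = ResettableInit(ready, "client")
    first_task = wrapper._task
    ready.set()
    first = await wrapper
    wrapper.reset()
    second = await wrapper
    return first, second, first_task is not wrapper._task
```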

__init__(mapping=None, *, protocol: T_Protocol, required_behaviors: Tuple[Type, ...] = (Services, Subscriptions, Publish), optional_behaviors: Tuple[Type, ...] = (Register, Devices, RunProcessingModules, InitProcessingModules, DiscoverProcessingModules, Sessions), **kwargs)

Creates a UbiiClient object. The UbiiClient is awaitable. When it is used in an await expression, the coroutine will wait until all attributes for the client's required_behaviors are assigned. These assignments typically happen while the client's protocol is running. The types passed as required_behaviors or optional_behaviors are sometimes referred to as behaviors, and assigning something to their attributes is referred to as implementing the behavior.

Parameters:
  • mapping (Union[dict, ~.Message]) – A dictionary or message to be used to determine the values for the message fields.

  • protocol (AbstractClientProtocol) – A concrete protocol instance to be used by the client node

  • required_behaviors (typing.Tuple[typing.Type, …]) – tuple of dataclass types that need to be implemented by the protocol to consider the UbiiClient as usable

  • optional_behaviors (typing.Tuple[typing.Type, …]) – tuple of dataclass types that can optionally be implemented by the protocol whose attributes can be accessed through the UbiiClient node.

  • **kwargs – passed to ubii.proto.Client (e.g. field assignments)

property initial_specs: dict

Since clients can be reset, the client's current representation needs to be separated from the initial protobuf specifications. When the client is reset(), its specifications will be set to its current initial_specs.

The initial specs can be adapted during the client’s lifetime by explicitly assigning to values of this dictionary, otherwise it contains the specifications that were used when the object was initialized.

notify() None

Creates a task to notify all coroutines waiting for change_specs (allows easy notification from outside a coroutine i.e. a non-async callback, where it’s impossible to acquire the change_specs lock asynchronously)
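The technique of notifying a condition from synchronous code by scheduling a small task can be sketched like this. Specs is a hypothetical class whose change_specs attribute is a plain asyncio.Condition. The real method may differ in detail.

```python
import asyncio


class Specs:
    def __init__(self):
        self.change_specs = asyncio.Condition()
        self.value = None

    def notify(self):
        # A sync caller cannot 'async with' the condition, so a task does it.
        async def _notify():
            async with self.change_specs:
                self.change_specs.notify_all()

        return asyncio.ensure_future(_notify())


async def demo():
    specs = Specs()

    async def waiter():
        async with specs.change_specs:
            await specs.change_specs.wait()
            return specs.value

    task = asyncio.ensure_future(waiter())
    await asyncio.sleep(0)          # let the waiter reach wait()

    def sync_callback():            # e.g. invoked by code that is not async
        specs.value = "updated"
        specs.notify()

    asyncio.get_running_loop().call_soon(sync_callback)
    return await task
```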

property task_nursery: TaskNursery

the TaskNursery used by the protocol

property change_specs: Condition

Allows waiting for behavior attribute assignments. See also: implements

from ubii.node import connect_client

# we use connect_client to create a UbiiClient as well as a protocol and connect them
# see documentation of connect_client for details

async def main():
    async with connect_client() as client:
        await client.change_specs.wait()
        print("A behavior was implemented!")
implements(*behaviors, timeout: float | None = IMPLEMENT_TIMEOUT) util.awaitable_predicate

Returns an object that can be used to check if the client implements a certain behavior or wait until it is implemented.

from ubii.node.protocol import DefaultProtocol
from ubii.framework.client import UbiiClient, Services
import asyncio

async def main():
    protocol = DefaultProtocol()
    client = UbiiClient(protocol=protocol)
    protocol.client = client

    async def wait_for_required_behaviors_implicitly():
        return await client

    async def wait_for_behavior_explicitly():
        await client.implements(Services)  # used in await expression
        assert client.implements(Services)  # used in boolean expression

    await asyncio.gather(
        wait_for_required_behaviors_implicitly(),
        wait_for_behavior_explicitly()
    )


asyncio.run(main())
Parameters:
  • *behaviors – tuple of dataclass types passed to this UbiiClient as required_behaviors or optional_behaviors during initialization.

  • timeout – if not None, the returned awaitable will raise an asyncio.TimeoutError after the specified time

Returns:

an awaitable_predicate that converts to True if all fields of the passed behaviors are initialized in this UbiiClient and / or can be used in an Await expression to wait until that is the case
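An object that works both in a boolean expression and in an await expression can be sketched with __bool__ and __await__. AwaitablePredicate below is a hypothetical, simplified stand-in for util.awaitable_predicate. It assumes that changes are announced through an asyncio.Condition.

```python
import asyncio


class AwaitablePredicate:
    """Hypothetical sketch: truthy when the predicate holds, awaitable until it does."""

    def __init__(self, predicate, condition, timeout=None):
        self.predicate = predicate
        self.condition = condition
        self.timeout = timeout

    def __bool__(self):
        return bool(self.predicate())

    async def _wait(self):
        async with self.condition:
            await self.condition.wait_for(self.predicate)
        return True

    def __await__(self):
        # wait_for raises asyncio.TimeoutError once the timeout has passed.
        return asyncio.wait_for(self._wait(), self.timeout).__await__()


async def demo():
    state = {"services": None}
    condition = asyncio.Condition()
    implemented = AwaitablePredicate(
        lambda: state["services"] is not None, condition, timeout=1.0
    )
    before = bool(implemented)

    async def implement():
        async with condition:
            state["services"] = object()
            condition.notify_all()

    asyncio.ensure_future(implement())
    awaited = await implemented
    return before, awaited, bool(implemented)


async def demo_timeout():
    never = AwaitablePredicate(lambda: False, asyncio.Condition(), timeout=0.01)
    try:
        await never
    except asyncio.TimeoutError:
        return "timed out"
```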

property behaviors: BehaviorDict

Return mapping \((optional / required) \rightarrow dataclass\) showing which behaviors are defined as optional or required by this client. You can check their implementation status using implements().

wants(*behaviors) bool

Checks if the passed behaviors are part of the client's required or optional behaviors and are not implemented. Basically just a shorthand for

all(
    behavior in self.behaviors['optional_behaviors'] or behavior in self.behaviors['required_behaviors']
    for behavior in behaviors
)
Parameters:

*behaviors – behavior types to check

Returns:

True if all behaviors are contained in required or optional behaviors

async reset()

Use this method to reset the client behaviors and allow explicitly restarting the client protocol if it is finished. Also resets the protobuf values to the contents of initial_specs.

Warning

This behavior is experimental, it is better to simply create a new client instance

property protocol: T_Protocol

Reference to protocol used by the client

class ubii.framework.client.AbstractClientProtocol(config: constants.UbiiConfig = constants.GLOBAL_CONFIG, log: logging.Logger | None = None)

Bases: AbstractProtocol[T_EnumFlag], Registry, ABC

ABC to implement client protocols, i.e. define the communication between client node and master node during the lifetime of the client.

state_changes

inherited from AbstractProtocol

hook_function: util.registry[str, util.hook] = <ubii.framework.util.functools.registry object>

This callable wraps the util.hook decorator but registers every decorated function, so that decorators can be easily applied to all registered hooks simultaneously

config: constants.UbiiConfig

Config used – contains e.g. default topic for initial server configuration service call

abstract async create_service_map(context)

Create a ServiceMap in the context as context.service_map which has to be able to make a single service call context.service_map.server_config

abstract async update_config(context)

Update the server configuration in the context. After completion of this coroutine:

  • context.server is a Server message with the configuration of the master node

  • context.constants is a Constants message of the default constants of the master node

abstract async update_services(context)

Update the service map in the context.

  • context.service_map is able to perform all service calls advertised by the master node after this coroutine completes.

abstract async create_client(context)

Create a client in the context.

  • context.client typically is a ubii.proto.Client wrapper, e.g. a UbiiClient which at this moment is not expected to be fully functional.

abstract register_client(context) AsyncContextManager[None]

Return a context manager to register the context.client client, and unregister it when the protocol stops. After successful registration the context manager typically needs to also set the protocol state to whatever the concrete implementation expects.

  • context.client is expected to be up-to-date and usable after registration

abstract async create_topic_connection(context)

Should create a ubii.framework.topics.DataConnection.

  • context.topic_connection is expected to be a fully functional topic connection after this coroutine is completed.

abstract async implement_client(context)

Make sure the context.client has fully implemented behavior. The context at this point should contain a context.service_map and a context.topic_connection.

  • context.client can be awaited after this coroutine is finished, to return a fully functional client.

on_start(context: Any) Awaitable[None]
Awaits (in order):

  • create_service_map()

  • update_config()

  • update_services()

  • create_client()

The context is passed for each call, and updated according to the concrete implementation.

Note

For a concrete implementation of a client protocol, assign this callback to a state change in state_changes

Parameters:

context – A namespace or dataclass or similar object as container for manipulated values

This callable had the hook_function decorator applied. Original signature: async def on_start(self, context: 'typing.Any') -> 'None'
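The pattern of passing one context object through a sequence of coroutines can be sketched as follows. The step bodies are placeholders that work on plain dictionaries, and the order used here is an assumption based on the order in which the abstract coroutines are documented.

```python
import asyncio
from types import SimpleNamespace


async def create_service_map(context):
    # Placeholder: a service map that knows a single call.
    context.service_map = {"server_config": lambda: {"name": "master-node"}}


async def update_config(context):
    context.server = context.service_map["server_config"]()


async def update_services(context):
    # Placeholder: extend the map with a further (hypothetical) service.
    context.service_map["client_registration"] = lambda client: dict(client, id="1")


async def create_client(context):
    context.client = {"name": "demo"}


async def on_start(context):
    # Each step receives the same context and updates it in place.
    for step in (create_service_map, update_config, update_services, create_client):
        await step(context)
    return context
```

A types.SimpleNamespace is used as the context here because it accepts arbitrary attribute assignments.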

on_create(context) Awaitable[None]

Enters the async context manager created by register_client() in the task_nursery i.e. registers the client and prepares to unregister it if the protocol should be stopped

The context is passed to register_client()

Note

For a concrete implementation of a client protocol, assign this callback to a state change in state_changes

Parameters:

context – A namespace or dataclass or similar object as container for manipulated values

This callable had the hook_function decorator applied. Original signature: async def on_create(self, context) -> 'None'
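Registering on entry and unregistering on exit maps naturally onto an async context manager. In the sketch below, contextlib.AsyncExitStack stands in for the task nursery, and the registration itself is reduced to setting an id on a dictionary. Both are assumptions for illustration.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from types import SimpleNamespace


@asynccontextmanager
async def register_client(context):
    context.client["id"] = "generated-id"   # placeholder for the registration call
    context.log.append("registered")
    try:
        yield
    finally:
        context.client["id"] = ""           # placeholder for the deregistration call
        context.log.append("unregistered")


async def demo():
    context = SimpleNamespace(client={"id": ""}, log=[])
    async with AsyncExitStack() as stack:    # stand-in for the task nursery
        await stack.enter_async_context(register_client(context))
        registered_id = context.client["id"]
    # Leaving the stack runs the exit part of register_client.
    return registered_id, context.client["id"], context.log
```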

on_registration(context) Awaitable[None]
Awaits (in order):

  • create_topic_connection()

  • implement_client()

Then the context.client is awaited to make sure that all behaviors are implemented. The context is passed for each call, and updated according to the concrete implementation.

Note

For a concrete implementation of a client protocol, assign this callback to a state change in state_changes

Parameters:

context – A namespace or dataclass or similar object as container for manipulated values

Raises:

RuntimeError – if awaiting the context.client raises an asyncio.TimeoutError after a timeout of \(5s\)

This callable had the hook_function decorator applied. Original signature: async def on_registration(self, context) -> 'None'
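Converting a timeout into a RuntimeError can be sketched with asyncio.wait_for. The function name await_client and the error message are hypothetical. Only the 5 second default and the exception types are taken from the description above.

```python
import asyncio


async def await_client(client_ready, timeout=5.0):
    """Wait for an awaitable, turning a timeout into a RuntimeError."""
    try:
        return await asyncio.wait_for(client_ready, timeout=timeout)
    except asyncio.TimeoutError as error:
        raise RuntimeError(f"client not implemented after {timeout}s") from error


async def demo():
    async def never():
        await asyncio.Event().wait()   # never completes

    try:
        await await_client(never(), timeout=0.01)
    except RuntimeError as error:
        # The original timeout is kept as the cause of the RuntimeError.
        return isinstance(error.__cause__, asyncio.TimeoutError)
```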

on_connect(context) Awaitable[None]

Starts a ubii.framework.topics.StreamSplitRoutine in the task_nursery to split ubii.proto.TopicData messages from the context.topic_connection to the topics of the context.topic_store

Note

For a concrete implementation of a client protocol, assign this callback to a state change in state_changes

Parameters:

context – A namespace or dataclass or similar object as container for manipulated values

This callable had the hook_function decorator applied. Original signature: async def on_connect(self, context) -> 'None'
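Splitting an incoming stream onto topics can be illustrated with an async generator and unix-style wildcard matching from the standard library. The function split_stream, the dictionary-based records and the list-based topic buffers are hypothetical simplifications and do not reflect the actual StreamSplitRoutine.

```python
import asyncio
from fnmatch import fnmatchcase


async def split_stream(records, topic_store):
    """Forward each record to every buffer whose pattern matches its topic."""
    async for record in records:
        for pattern, buffer in topic_store.items():
            if fnmatchcase(record["topic"], pattern):
                buffer.append(record)


async def fake_connection():
    # Stand-in for a data connection that yields incoming records.
    for topic in ("/sensor/a", "/sensor/b", "/info/status"):
        yield {"topic": topic}
        await asyncio.sleep(0)


async def demo():
    topic_store = {"/sensor/*": [], "/info/status": []}
    await split_stream(fake_connection(), topic_store)
    return {
        pattern: [record["topic"] for record in buffer]
        for pattern, buffer in topic_store.items()
    }
```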

on_stop(context) Awaitable[None]

Sets the state of the client to UNAVAILABLE

Note

For a concrete implementation of a client protocol, assign this callback to a state change in state_changes

Parameters:

context – A namespace or dataclass or similar object as container for manipulated values

This callable had the hook_function decorator applied. Original signature: async def on_stop(self, context) -> 'None'

classmethod __init_subclass__()

Register decorators for hook functions