ubii.node.protocol module

ubii.node.protocol.DefaultProtocol

alias of ResettableProtocol

class ubii.node.protocol.States(value)

Bases: IntFlag

States of the default Client Protocol

STARTING = 1
CREATED = 2
REGISTERED = 4
CONNECTED = 8
STOPPED = 16
HALTED = 32
ANY = 63
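
Because States is an IntFlag, ANY = 63 is the bitwise OR of the six single-bit states, so it can serve as a mask. The following is a minimal sketch of that relationship; the values are copied from the listing above, while the class body and the matches helper are stand-ins written for illustration, not the library's code.

```python
from enum import IntFlag


class States(IntFlag):
    # Values copied from the listing above; this class is only a stand-in.
    STARTING = 1
    CREATED = 2
    REGISTERED = 4
    CONNECTED = 8
    STOPPED = 16
    HALTED = 32
    ANY = 63  # bitwise OR of all six single-bit states


def matches(mask: States, state: States) -> bool:
    """Hypothetical helper: True if `state` is one of the states in `mask`."""
    return bool(mask & state)
```

With this definition, matches(States.ANY, States.HALTED) is true for every single state, while a narrower mask such as States.STARTING | States.CREATED rejects States.CONNECTED.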
class ubii.node.protocol.LatePMInitProtocol(config: constants.UbiiConfig = constants.GLOBAL_CONFIG, log: logging.Logger | None = None)

Bases: LegacyProtocol

An updated version of ubii.node.protocol.LegacyProtocol that can load installed processing modules which require a partly initialized client node.

SETUPTOOLS_PM_ENTRYPOINT_KEY = 'ubii.processing_modules'

Processing modules need to register their entry points with this key

classmethod load_pm_entry_points() Dict[str, Any]

Loads setuptools entrypoints for key SETUPTOOLS_PM_ENTRYPOINT_KEY

Returns:

dictionary of the loaded ProcessingRoutine types, keyed by string
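
Loading entry points for a given key can be sketched with the standard library alone. The helper name load_entry_points is hypothetical and the actual classmethod may be implemented differently; only the group name is taken from SETUPTOOLS_PM_ENTRYPOINT_KEY above.

```python
from importlib.metadata import entry_points
from typing import Any, Dict

# Key taken from SETUPTOOLS_PM_ENTRYPOINT_KEY above.
PM_ENTRYPOINT_GROUP = "ubii.processing_modules"


def load_entry_points(group: str = PM_ENTRYPOINT_GROUP) -> Dict[str, Any]:
    """Hypothetical helper: load every entry point registered under `group`."""
    all_eps = entry_points()
    if hasattr(all_eps, "select"):
        # Python 3.10+ returns a collection that supports select().
        selected = all_eps.select(group=group)
    else:
        # Python 3.8 / 3.9 return a plain dict of group -> entry points.
        selected = all_eps.get(group, [])
    # Map each entry point name to the object it points at.
    return {ep.name: ep.load() for ep in selected}
```

If no installed package registers anything under the group, the result is an empty dictionary.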

async create_client(context: Context)

In addition to the behaviour of LegacyProtocol.create_client(), also initializes all processing modules of the client defined by its InitProcessingModules behaviour

config: constants.UbiiConfig

Config used – contains e.g. default topic for initial server configuration service call

class ubii.node.protocol.LegacyProtocol(config: constants.UbiiConfig = constants.GLOBAL_CONFIG, log: logging.Logger | None = None)

Bases: AbstractClientProtocol[States]

The standard protocol creates one UbiiClient, registers it, implements all required behaviours and device registration as well as handling of processing modules.

starting_state: T_EnumFlag = 1
end_state: T_EnumFlag = 16
class Context(server: ubii.proto.Server | None = None, constants: ubii.proto.Constants | None = None, client: client.UbiiClient | None = None, service_connection: connections.AIOHttpRestConnection | None = None, service_map: services.ServiceMap | None = None, topic_connection: connections.AIOHttpWebsocketConnection | None = None, register_manager: AsyncContextManager['client.UbiiClient'] | None = None, subscription_manager: SubscriptionManager | None = None, session_manager: SessionManager | None = None, topic_store: topics.TopicStore[topics.BasicTopic] | None = None, exc_info: Tuple[Exception | None, Type[Exception] | None, Any] | None = None, state_change: Tuple[States, States] | None = None)

Bases: object

The fields get assigned during the lifetime of the client

server: ubii.proto.Server | None = None

The server message as sent by the master node after the initial server_configuration service call

constants: ubii.proto.Constants | None = None

The constants used by the master node (default topics, data types etc.)

client: client.UbiiClient | None = None

The client that owns the protocol

service_connection: connections.AIOHttpRestConnection | None = None

Usable service connection created in create_service_map()

service_map: services.ServiceMap | None = None

Usable service map created in create_service_map()

topic_connection: connections.AIOHttpWebsocketConnection | None = None

Usable topic connection created in create_topic_connection()

register_manager: AsyncContextManager['client.UbiiClient'] | None = None

async context manager to register and unregister the client, created in register_client()

subscription_manager: SubscriptionManager | None = None

async context manager which implements the subscriptions, created in implement_subscriptions()

session_manager: SessionManager | None = None

async context manager to start and stop sessions, created in implement_sessions()

topic_store: topics.TopicStore[topics.BasicTopic] | None = None

Used to access topics that the client is subscribed to, created in implement_subscriptions()

exc_info: Tuple[Exception | None, Type[Exception] | None, Any] | None = None

Filled when some protocol task raises an exception

state_change: Tuple[States, States] | None = None

The state change that triggered the last callback that handled the context
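
The pattern of a context whose fields start as None and are assigned step by step can be sketched as a dataclass. SketchContext and unset_fields are hypothetical names; the stand-in keeps only a few of the fields listed above and uses Any in place of the real types.

```python
from dataclasses import dataclass, fields
from typing import Any, List, Optional, Tuple


@dataclass
class SketchContext:
    """Stand-in with a few of the fields listed above; all start as None."""
    server: Optional[Any] = None
    client: Optional[Any] = None
    service_map: Optional[Any] = None
    exc_info: Optional[Tuple[Any, Any, Any]] = None
    state_change: Optional[Tuple[Any, Any]] = None


def unset_fields(context: SketchContext) -> List[str]:
    """Names of the fields that no protocol step has filled in yet."""
    return [f.name for f in fields(context) if getattr(context, f.name) is None]
```

A helper like unset_fields can make it easier to see which protocol steps have already run when debugging a callback.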

client: client.UbiiClient | None

Explicitly assign the client to this attribute after initialization, otherwise a dedicated client will be created during protocol execution

property aiohttp_session

shared session for aiohttp connections

property context: Context

Returns the context object for better typing

property state: T_EnumFlag
async create_service_map(context: Context)

Create a ServiceMap in the context as context.service_map.

Also initializes

async update_config(context: Context)

Update the server configuration in the context. After completion of this coroutine

async update_services(context: Context)

Update the service map in the context.

  • context.service_map is able to perform all service calls advertised by the master node after this coroutine completes.

async create_client(context: Context)

Create a client in the context.

The client is also available as client attribute

register_client(context: Context)

Returns a context manager that registers and unregisters the client. It also sets the context.register_manager attribute so that the same context manager can be used elsewhere
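
A minimal sketch of this pattern, assuming nothing about the real service calls: the function builds an async context manager, stores it on the context, and returns it. The log list and the string client are stand-ins used only so that the example runs on its own.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace


def register_client(context):
    """Build a register/unregister context manager and store it on the context."""

    @asynccontextmanager
    async def manager():
        context.log.append("registered")  # stand-in for the registration call
        try:
            yield context.client
        finally:
            context.log.append("unregistered")  # stand-in for deregistration

    context.register_manager = manager()
    return context.register_manager


async def demo():
    # SimpleNamespace replaces the real Context for this illustration.
    context = SimpleNamespace(client="client-stub", register_manager=None, log=[])
    async with register_client(context) as client:
        assert client == "client-stub"
    return context.log
```

Deregistration sits in a finally clause so that it also runs when the body of the async with block raises.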

async create_topic_connection(context: Context)

Create context.topic_connection as a ubii.framework.connections.AIOHttpWebsocketConnection that uses aiohttp_session

The state will be set to States.HALTED when the topic connection disconnects.

async implement_client(context: Context)

Implements:

  • subscription behaviour (implement_subscriptions())

  • publish behaviour (implement_publish())

  • client registration / deregistration (implement_register())

context.client can be awaited after this coroutine is finished, to return a fully functional client.

async implement_sessions(context: Context)
implement_subscriptions(context: Context)

Implement ubii.node.Subscriptions behaviour of context.client

implement_publish(context: Context)

Implement ubii.node.Publish behaviour of context.client

implement_register(context: Context)

Implement ubii.node.Register behaviour of context.client using the context.register_manager

implement_devices(context: Context)

Implement ubii.node.Devices behaviour of context.client

async on_create(context: Context) None
async on_registration(context: Context)

Use implement_devices() to implement device registration behaviour for the client, and implement_processing() to implement handling of processing modules.

Register all devices of the context.client

async implement_processing(context: Context)

Implement ubii.node.RunProcessingModules behaviour of context.client

async on_halted(context: Context)

When the protocol is in States.HALTED state, we apply some error handling.

Currently, this callback checks for connection problems (e.g. unavailable master node) and restarts the protocol if they are fixed.
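
Waiting for a connection problem to clear before restarting can be sketched as a bounded polling loop. This is an assumption about the general approach, not the callback's actual code; wait_until_reachable, its parameters, and the check coroutines are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_until_reachable(check: Callable[[], Awaitable[bool]],
                               delay: float = 0.01,
                               attempts: int = 5) -> bool:
    """Call `check` up to `attempts` times, sleeping `delay` seconds after a failure."""
    for _ in range(attempts):
        if await check():
            return True
        await asyncio.sleep(delay)
    return False


async def demo():
    results = iter([False, False, True])  # master node answers on the third try

    async def check() -> bool:
        return next(results)

    async def never() -> bool:
        return False  # master node stays unavailable

    recovered = await wait_until_reachable(check)
    gave_up = await wait_until_reachable(never, attempts=2)
    return recovered, gave_up
```

A protocol would restart (move back to States.STARTING) only when the helper returns True.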

async on_start(context: Any) None
state_changes: Mapping[Tuple[T_EnumFlag | None, ...], Callback] = {(None, <States.STARTING: 1>): <function LegacyProtocol.on_start>, (<States.STARTING: 1>, <States.CREATED: 2>): <function LegacyProtocol.on_create>, (<States.CREATED: 2>, <States.REGISTERED: 4>): <function LegacyProtocol.on_registration>, (<States.REGISTERED: 4>, <States.CONNECTED: 8>): <ubii.framework.util.functools.hook object>, (<States.ANY: 63>, <States.HALTED: 32>): <function LegacyProtocol.on_halted>, (<States.HALTED: 32>, <States.STARTING: 1>): <function LegacyProtocol.on_start>, (<States.ANY: 63>, <States.STOPPED: 16>): <ubii.framework.util.functools.hook object>, (<States.STOPPED: 16>, <States.STARTING: 1>): <function LegacyProtocol.on_start>, (<States.STOPPED: 16>, None): <function ResettableProtocol.on_reset>}

Callbacks for state changes
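
One way to read this mapping: each key is an (old state, new state) pattern, and States.ANY acts as a wildcard for the old state. The sketch below shows how such a table could be queried. The matching rule (None matches only None, a flag matches any state it contains) is an assumption, and callbacks_for is a hypothetical helper rather than the framework's dispatch code.

```python
from enum import IntFlag
from typing import Callable, Dict, List, Optional, Tuple


class States(IntFlag):
    STARTING = 1
    CREATED = 2
    REGISTERED = 4
    CONNECTED = 8
    STOPPED = 16
    HALTED = 32
    ANY = 63


Key = Tuple[Optional[States], Optional[States]]


def _matches(pattern: Optional[States], value: Optional[States]) -> bool:
    # Assumed rule: None only matches None; a flag matches any state it contains.
    if pattern is None or value is None:
        return pattern is value
    return bool(pattern & value)


def callbacks_for(table: Dict[Key, Callable],
                  old: Optional[States],
                  new: Optional[States]) -> List[Callable]:
    """Collect every callback whose (old, new) pattern matches the transition."""
    return [cb for (p_old, p_new), cb in table.items()
            if _matches(p_old, old) and _matches(p_new, new)]


def on_start(): return "start"
def on_create(): return "create"
def on_halted(): return "halted"


# A shortened stand-in for the state_changes mapping shown above.
table: Dict[Key, Callable] = {
    (None, States.STARTING): on_start,
    (States.STARTING, States.CREATED): on_create,
    (States.ANY, States.HALTED): on_halted,
}
```

With this table, a change from States.CREATED to States.HALTED selects on_halted through the ANY pattern, and a change that matches no key selects nothing.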

config: constants.UbiiConfig

Config used – contains e.g. default topic for initial server configuration service call

class ubii.node.protocol.SessionManager(client: UbiiClient, timeout=3, _SessionManager__remove_session_on_clean_exit: bool = False)

Bases: object

This implements the Session behaviour using a context manager, so that topic subscriptions are removed when the client stops. In addition, when the context exits because of an exception, started sessions are stopped.

sessions: Dict[str, ubii.proto.Session]

The started sessions, mapping \(id \rightarrow Session\)

async __aenter__() SessionManager

Subscribes to the info topics for broker communication, then returns reference to self

async __aexit__(*exc_info) None

Remove started sessions if the session manager is closed with an exception or its stop_sessions_on_exit attribute is True

Parameters:

*exc_info – Exception info

Returns:

always returns None so possible exceptions are not suppressed
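
The exit behaviour described above can be sketched with a small stand-in class. SketchSessionManager is hypothetical: it tracks sessions in a plain dict and records stopped ids in a list instead of talking to a broker.

```python
import asyncio


class SketchSessionManager:
    """Stand-in manager: stops started sessions when the block exits with an error."""

    def __init__(self, stop_sessions_on_exit: bool = False):
        self.stop_sessions_on_exit = stop_sessions_on_exit
        self.sessions = {}  # id -> session
        self.stopped = []   # ids of sessions stopped on exit

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        failed = exc_info[0] is not None
        if failed or self.stop_sessions_on_exit:
            for session_id in list(self.sessions):
                self.stopped.append(session_id)
                del self.sessions[session_id]
        return None  # falsy, so an exception raised in the block propagates


async def demo(fail: bool):
    manager = SketchSessionManager()
    try:
        async with manager:
            manager.sessions["s1"] = object()
            if fail:
                raise RuntimeError("boom")
    except RuntimeError:
        pass  # the exception reached the caller because __aexit__ returned None
    return manager.stopped
```

On a clean exit with stop_sessions_on_exit left at False, the started session is kept; on an exception it is stopped and the exception still propagates.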

async get_sessions() SessionList

Simple wrapper around a service request which returns the session list inside the response

Returns:

list of running sessions

async start_session(session: Session) Session

Sends session start request and waits for the broker to confirm start

Parameters:

session – Session specification

Returns:

the specifications of the started session as communicated by the broker

async stop_session(session: Session) str

Sends a session stop request, waits for the broker to confirm stopping of the session, then waits for all associated processing modules to be destroyed.

Parameters:

session – session specification as protobuf message

Returns:

the id of the stopped session
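
Both start_session() and stop_session() wait for the broker to confirm a request. A generic way to wait for such a confirmation with a time limit is sketched below. Whether the manager's timeout argument is used in this way is an assumption, and wait_for_confirmation is a hypothetical helper.

```python
import asyncio


async def wait_for_confirmation(confirmed: asyncio.Event, timeout: float = 3) -> bool:
    """Return True once `confirmed` is set, False if `timeout` seconds pass first."""
    try:
        await asyncio.wait_for(confirmed.wait(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    confirmed = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Stand-in for the broker's confirmation arriving shortly after the request.
    loop.call_later(0.01, confirmed.set)
    ok = await wait_for_confirmation(confirmed, timeout=1)
    # An event that is never set models a broker that does not answer.
    missed = await wait_for_confirmation(asyncio.Event(), timeout=0.01)
    return ok, missed
```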

class ubii.node.protocol.ProcessingModuleManager(context)

Bases: object

Handles processing module setup and teardown for a LegacyProtocol

class access_handler(container: Mapping[str, ProcessingModule])

Bases: object

container: Mapping[str, ProcessingModule]

managed container of \(name \rightarrow ProcessingModule\)

status(*statuses: Status) List[ProcessingModule]

Get modules with matching status

Parameters:

statuses – allowed protobuf status, only modules with matching status will be returned

Returns:

matching processing modules

name(name: str) ubii.proto.ProcessingModule | None

Get module with matching name. Only one module can match, so it is returned if found

Parameters:

name – a protobuf name, only the module with this name will be returned

Returns:

processing module if found or None

id(id: str) ubii.proto.ProcessingModule | None

Get module with matching id. Only one module can match, so it is returned if found

Parameters:

id – a protobuf id, only the module with this id will be returned

Returns:

processing module if found or None
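
The three lookups can be sketched over a plain mapping. Module is a stand-in for ubii.proto.ProcessingModule with only the fields used here, the status strings are placeholders, and AccessHandler is an illustration rather than the library's class.

```python
from dataclasses import dataclass
from typing import List, Mapping, Optional


@dataclass
class Module:
    """Stand-in for ubii.proto.ProcessingModule with only the fields used here."""
    id: str
    name: str
    status: str  # placeholder strings instead of the protobuf Status enum


class AccessHandler:
    def __init__(self, container: Mapping[str, Module]):
        self.container = container  # name -> module

    def status(self, *statuses: str) -> List[Module]:
        # Every module whose status is one of the allowed values.
        return [m for m in self.container.values() if m.status in statuses]

    def name(self, name: str) -> Optional[Module]:
        # The container is keyed by name, so this is a direct lookup.
        return self.container.get(name)

    def id(self, id: str) -> Optional[Module]:
        # First module with a matching id, or None.
        return next((m for m in self.container.values() if m.id == id), None)


modules = {
    "filter": Module(id="1", name="filter", status="CREATED"),
    "mixer": Module(id="2", name="mixer", status="PROCESSING"),
}
by = AccessHandler(modules)
```

Usage then reads as by.status("CREATED"), by.name("mixer") or by.id("1"), mirroring the by attribute of the manager.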

__init__(context)

Create a handler for a protocol context

Parameters:

context – the context we are working with

managed_modules: MutableMapping[str, ProcessingRoutine]

modules instantiated by the manager

by: access_handler

Use the specific methods to access the managed modules by status / name / id

async get_instance(name: str, *status: Status) ProcessingRoutine
async create_module(name: str) None

Create a module if specifications for the module are part of the client's processing modules

Parameters:

name – The name of the module

Returns:

a processing module instance

async create_modules_for_sessions(session: Session)
async notify_broker(modules: Iterable[ProcessingModule], remove=False)
async handle_io_specs(session: Session)
async on_start_session(record: TopicDataRecord)

When session is started, handle contained processing modules

async on_stop_session(record)

Stop processing modules for session.

Warning

When a client subscribes to the stop session topic, it is sent the last stop request even though it has not started any modules yet. This case needs to be handled.
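
Guarding against such a stale stop request can be sketched as follows, assuming the handler keeps track of which modules it started per session. The running dictionary and handle_stop_request are hypothetical and only illustrate the guard.

```python
from typing import Dict, List


def handle_stop_request(running: Dict[str, List[str]], session_id: str) -> List[str]:
    """Return the module names stopped for `session_id`; empty if none were started."""
    # A stale request refers to a session this client never started modules for.
    modules = running.pop(session_id, None)
    if modules is None:
        return []
    return modules
```

A stop request for an unknown session id is then ignored, while a request for a known session removes its modules from the bookkeeping.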