ubii.node.protocol module¶
- ubii.node.protocol.DefaultProtocol¶
alias of
ResettableProtocol
- class ubii.node.protocol.States(value)¶
Bases:
IntFlagStates of the default Client Protocol
- STARTING = 1¶
- CREATED = 2¶
- REGISTERED = 4¶
- CONNECTED = 8¶
- STOPPED = 16¶
- HALTED = 32¶
- ANY = 63¶
- class ubii.node.protocol.LatePMInitProtocol(config: constants.UbiiConfig = constants.GLOBAL_CONFIG, log: logging.Logger | None = None)¶
Bases:
LegacyProtocolThis is the updated version of the
ubii.node.protocol.LegacyProtocolable to load installed processing modules that require a partly initialized client node.- SETUPTOOLS_PM_ENTRYPOINT_KEY = 'ubii.processing_modules'¶
Processing modules need to register their entry points with this key
- classmethod load_pm_entry_points() Dict[str, Any]¶
Loads setuptools entrypoints for key
SETUPTOOLS_PM_ENTRYPOINT_KEY- Returns:
list of
ProcessingRoutinetypes
- async create_client(context: Context)¶
In addition to the behaviour of
LegacyProtocol.create_client()also initializes all processing modules of the client defined by it’sInitProcessingModulesbehaviour
- config: constants.UbiiConfig¶
Config used – contains e.g. default topic for initial server configuration service call
- class ubii.node.protocol.LegacyProtocol(config: constants.UbiiConfig = constants.GLOBAL_CONFIG, log: logging.Logger | None = None)¶
Bases:
AbstractClientProtocol[States]The standard protocol creates one UbiiClient, registers it, implements all required behaviours and device registration as well as handling of processing modules.
- starting_state: T_EnumFlag = 1¶
- end_state: T_EnumFlag = 16¶
- class Context(server: ubii.proto.Server | None = None, constants: ubii.proto.Constants | None = None, client: client.UbiiClient | None = None, service_connection: connections.AIOHttpRestConnection | None = None, service_map: services.ServiceMap | None = None, topic_connection: connections.AIOHttpWebsocketConnection | None = None, register_manager: AsyncContextManager['client.UbiiClient'] | None = None, subscription_manager: SubscriptionManager | None = None, session_manager: SessionManager | None = None, topic_store: topics.TopicStore[topics.BasicTopic] | None = None, exc_info: Tuple[Exception | None, Type[Exception] | None, Any] | None = None, state_change: Tuple[States, States] | None = None)¶
Bases:
objectThe fields get assigned during the lifetime of the client
- server: ubii.proto.Server | None = None¶
The server message as send by the master node after the initial
server_configurationservice call
- constants: ubii.proto.Constants | None = None¶
The constants used by the master node (default topics, data types etc.)
- client: client.UbiiClient | None = None¶
The client that owns the protocol
- service_connection: connections.AIOHttpRestConnection | None = None¶
Usable service connection created in
create_service_map()
- service_map: services.ServiceMap | None = None¶
Usable service map created in
create_service_map()
- topic_connection: connections.AIOHttpWebsocketConnection | None = None¶
Usable topic connection created in
create_topic_connection()
- register_manager: AsyncContextManager['client.UbiiClient'] | None = None¶
async context manager to register and unregister the client, created in
register_client()
- subscription_manager: SubscriptionManager | None = None¶
async context manager which implements the subscriptions, created in
implement_subscriptions()
- session_manager: SessionManager | None = None¶
async context manager to start and stop sessions, created in
implement_sessions()
- topic_store: topics.TopicStore[topics.BasicTopic] | None = None¶
Used to access topics that the client is subscribed to, created in
implement_subscriptions()
- client: client.UbiiClient | None¶
Explicitly assign the client to this attribute after initialization, otherwise a dedicated client will be created during protocol execution
- property aiohttp_session¶
shared session for aiohttp connections
- property state: T_EnumFlag¶
- async create_service_map(context: Context)¶
Create a
ServiceMapin the context ascontext.service_map.Also initializes
context.serverfromconfigcontext.constantsfromconfigcontext.service_connectionwith workingubii.framework.connections.AIOHttpRestConnectionusingaiohttp_session
- async update_config(context: Context)¶
Update the server configuration in the context. After completion of this coroutine
context.serveris aServermessage with the configuration of the master nodecontext.constantsis aConstantsmessage of the default constants of the master node
- async update_services(context: Context)¶
Update the service map in the context.
context.service_mapis able to perform all service calls advertised by the master node after this coroutine completes.
- async create_client(context: Context)¶
Create a client in the context.
context.clientis aUbiiClientusing this protocol asprotocol
The client is also available as
clientattribute
- register_client(context: Context)¶
Needs to return a context manager, to register and unregister the client, but also sets the
context.register_managerattribute so that this context manager might be used somewhere else
- async create_topic_connection(context: Context)¶
Create a
context.topic_connectionusing aubii.framework.connections.AIOHttpWebSocketConnectionusingaiohttp_sessionThe
statewill be set toStates.HALTEDwhen the topic connection disconnects.
- async implement_client(context: Context)¶
- Implements
subscription behaviour <implement_subscriptions>
publish behaviour <implement_publish>
client registration / deregistration <implement_register>
context.clientcan be awaited after this coroutine is finished, to return a fully functional client.
- implement_subscriptions(context: Context)¶
Implement
ubii.node.Subscriptionsbehaviour ofcontext.client
- implement_publish(context: Context)¶
Implement
ubii.node.Publishbehaviour ofcontext.client
- implement_register(context: Context)¶
Implement
ubii.node.Registerbehaviour ofcontext.clientusing thecontext.register_manager
- implement_devices(context: Context)¶
Implement
ubii.node.Devicesbehaviour ofcontext.client
- async on_registration(context: Context)¶
Use
implement_devices()to implement device registration behaviour for the client, andimplement_processing()to implement handling of processing modules.Register all devices of the
context.client
- async implement_processing(context: Context)¶
Implement
ubii.node.RunProcessingModulesbehaviour ofcontext.client
- async on_halted(context: Context)¶
When the protocol is in
States.HALTEDstate, we apply some error handling.Currently, this callback checks for connection problems (e.g. unavailable master node) and restarts the protocol if they are fixed.
- state_changes: Mapping[Tuple[T_EnumFlag | None, ...], Callback] = {(None, <States.STARTING: 1>): <function LegacyProtocol.on_start>, (<States.STARTING: 1>, <States.CREATED: 2>): <function LegacyProtocol.on_create>, (<States.CREATED: 2>, <States.REGISTERED: 4>): <function LegacyProtocol.on_registration>, (<States.REGISTERED: 4>, <States.CONNECTED: 8>): <ubii.framework.util.functools.hook object>, (<States.ANY: 63>, <States.HALTED: 32>): <function LegacyProtocol.on_halted>, (<States.HALTED: 32>, <States.STARTING: 1>): <function LegacyProtocol.on_start>, (<States.ANY: 63>, <States.STOPPED: 16>): <ubii.framework.util.functools.hook object>, (<States.STOPPED: 16>, <States.STARTING: 1>): <function LegacyProtocol.on_start>, (<States.STOPPED: 16>, None): <function ResettableProtocol.on_reset>}¶
Callbacks for state changes
- config: constants.UbiiConfig¶
Config used – contains e.g. default topic for initial server configuration service call
- class ubii.node.protocol.SessionManager(client: UbiiClient, timeout=3, _SessionManager__remove_session_on_clean_exit: bool = False)¶
Bases:
objectThis implements the Session behaviour using a context manager, so that topic subscriptions are removed when the client stops. Also when the context exit is caused by an exception, started sessions are stopped.
- sessions: Dict[str, ubii.proto.Session]¶
The started sessions, mapping \(id \rightarrow Session\)
- async __aenter__() SessionManager¶
Subscribes to the info topics for broker communication, then returns reference to self
- async __aexit__(*exc_info) None¶
Remove started sessions if the session manager is closed with an exception or its
stop_sessions_on_exitattribute is True- Parameters:
*exc_info – Exception info
- Returns:
always returns None so possible exceptions are not suppressed
- async get_sessions() SessionList¶
Simple wrapper around a service request which returns the session list inside the response
- Returns:
list of running sessions
- class ubii.node.protocol.ProcessingModuleManager(context)¶
Bases:
objectHandles processing module setup and teardown for a
LegacyProtocol- class access_handler(container: Mapping[str, ProcessingModule])¶
Bases:
object- container: Mapping[str, ProcessingModule]¶
managed container of \(name \rightarrow ProcessingModule\)
- status(*statuses: Status) List[ProcessingModule]¶
Get modules with matching status
- Parameters:
statuses – allowed protobuf status, only modules with matching status will be returned
- Returns:
matching processing modules
- name(name: str) ubii.proto.ProcessingModule | None¶
Get module with matching name. Only one module can match, so it is returned if found
- Parameters:
name – a protobuf name, only module with this name will be returned
- Returns:
processing module if found or None
- id(id: str) ubii.proto.ProcessingModule | None¶
Get module with matching id. Only one module can match, so it is returned if found
- Parameters:
id – a protobuf id, only module with this id will be returned
- Returns:
generator yielding processing modules
- __init__(context)¶
Create a handler for a protocol context :Parameters: context – the context we are working with
- managed_modules: MutableMapping[str, ProcessingRoutine]¶
modules instantiated by the manager
- by: access_handler¶
Use the specific methods to access the managed modules by status / name / id
- async get_instance(name: str, *status: Status) ProcessingRoutine¶
- async create_module(name: str) None¶
Create a module if specifications for the module are part of the clients processing modules
- Parameters:
name – The name of the module
- Returns:
a processing module instance
- async notify_broker(modules: Iterable[ProcessingModule], remove=False)¶
- async on_start_session(record: TopicDataRecord)¶
When session is started, handle contained processing modules
- async on_stop_session(record)¶
Stop processing modules for session.
Warning
When a client subscribes to the stop session topic, it will get sent the last stop request, without having started any modules first, we need to handle that.