ubii.framework.topics module

This module provides classes for handling the ubii.proto.TopicData messages that the master node sends after the client subscribes to a topic via a ubii.framework.services.ServiceCall.

The client node will need a DataConnection to the master node, e.g. a websocket connection. Some example connections are implemented in ubii.framework.connections. At which point during its lifetime the client is able to establish this DataConnection is described in greater detail in the ubii.node.protocol module.

After the DataConnection is established, messages can be read using the async iterator API and sent using the DataConnection.send() method.

Note

The JS API currently defines methods to directly add a callback for a topic pattern. To do the same in the Python API, first get a reference to the specific Topic using the pattern, then add the callback with the topic's register_callback() method.

import logging
from ubii.node import connect_client
from ubii.framework.client import Subscriptions

log = logging.getLogger('MyLogger')

async def main():
    async with connect_client() as client:
        topic, = await client[Subscriptions].subscribe_topic('foo/bar') # get a topic reference
        topic.register_callback(log.info) # add callback

See also

TopicStore – details how topics are retrieved by pattern

class ubii.framework.topics.Consumer(*args, **kwargs)

Bases: Protocol[T_contra]

__call__(value: T_contra) Coroutine[Any, Any, None] | None

Consumer objects need to have this call signature

Parameters:

value – something to consume

Returns:

no return or coroutine with no return
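
As a rough illustration of the call signature above, the following sketch redefines a stand-in protocol and shows one plain and one coroutine consumer. The helper `consume` is hypothetical and not part of the module; it simply awaits the result when the consumer returns a coroutine.

```python
import asyncio
import inspect
from typing import Any, Coroutine, List, Optional, Protocol, Tuple, TypeVar

T_contra = TypeVar("T_contra", contravariant=True)


class Consumer(Protocol[T_contra]):
    """Stand-in for the documented protocol, redefined here for illustration only."""

    def __call__(self, value: T_contra) -> Optional[Coroutine[Any, Any, None]]: ...


async def consume(callback: Consumer[str], value: str) -> None:
    """Hypothetical helper: call the consumer, await the result if it is awaitable."""
    result = callback(value)
    if inspect.isawaitable(result):
        await result


received: List[Tuple[str, str]] = []


def sync_consumer(value: str) -> None:
    # "no return" variant of the signature
    received.append(("sync", value))


async def async_consumer(value: str) -> None:
    # "coroutine with no return" variant of the signature
    received.append(("async", value))


async def demo() -> None:
    await consume(sync_consumer, "a")
    await consume(async_consumer, "b")


asyncio.run(demo())
```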

class ubii.framework.topics.DataConnection

Bases: AsyncIterator[TopicData]

A DataConnection can be used to asynchronously iterate over received TopicData messages, and to send TopicData messages to the master node.

abstract async send(data: TopicData)
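
When mocking a connection, an in-memory object with the same shape can be enough. The sketch below is an assumption-laden stand-in: `LoopbackConnection` is a hypothetical name, it does not derive from the real DataConnection, and it uses plain dictionaries instead of TopicData messages.

```python
import asyncio
from typing import Any, AsyncIterator, List


class LoopbackConnection(AsyncIterator[Any]):
    """Hypothetical in-memory connection: everything sent is received again."""

    def __init__(self) -> None:
        self._queue: "asyncio.Queue[Any]" = asyncio.Queue()

    def __aiter__(self) -> "LoopbackConnection":
        return self

    async def __anext__(self) -> Any:
        # blocks until a message is available, like a real connection would
        return await self._queue.get()

    async def send(self, data: Any) -> None:
        await self._queue.put(data)


async def demo() -> List[Any]:
    connection = LoopbackConnection()
    await connection.send({"topic": "foo/bar", "int32": 1})
    await connection.send({"topic": "foo/bar", "int32": 2})
    received = []
    async for message in connection:
        received.append(message)
        if len(received) == 2:  # stop, otherwise the loop waits forever
            break
    return received


messages = asyncio.run(demo())
```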
class ubii.framework.topics.TopicCoroutine(*, buffer: AsyncIterator[T_Buffer], callback: Consumer[T_Buffer])

Bases: CoroutineWrapper[Any, Any, None], Generic[T_Buffer]

A topic coroutine waits until a value is written to the topic and then runs its callback.

__init__(*, buffer: AsyncIterator[T_Buffer], callback: Consumer[T_Buffer])
Parameters:
  • buffer – Iterator producing buffer value

  • callback – a callable consuming the buffer values
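
The wait-then-call behaviour can be pictured with a small loop. This is only a sketch under assumptions: `run_callback` is a hypothetical function, not the TopicCoroutine implementation, and the executor handling mirrors what register_callback() documents for callbacks that are not coroutine functions.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, List


async def run_callback(buffer: AsyncIterator[Any], callback: Callable[[Any], Any]) -> None:
    """Hypothetical loop: wait for each buffer value, then hand it to the callback."""
    loop = asyncio.get_running_loop()
    async for value in buffer:
        if inspect.iscoroutinefunction(callback):
            await callback(value)
        else:
            # plain callables run in the default executor so they cannot block the loop
            await loop.run_in_executor(None, callback, value)


async def values() -> AsyncIterator[int]:
    for number in range(3):
        yield number


sync_seen: List[int] = []
async_seen: List[int] = []


async def remember(value: int) -> None:
    async_seen.append(value)


async def demo() -> None:
    await run_callback(values(), sync_seen.append)
    await run_callback(values(), remember)


asyncio.run(demo())
```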

class ubii.framework.topics.TopicDataBufferManager

Bases: Generic[T_Buffer], ABC

Simple ABC to make sure the inheriting class has a buffer field

abstract property buffer: accessor[T_Buffer]

Use this attribute to synchronize access to the managed resource

class ubii.framework.topics.Topic(pattern, *, token_factory: Callable[[], T_Token], task_nursery: util.TaskNursery | None = None, **kwargs)

Bases: AsyncIterator[T_Buffer], TopicDataBufferManager[T_Buffer], Generic[T_Buffer, T_Token], ABC

A Topic can be used to asynchronously iterate over TopicDataRecords which are published to the topic. It can also register (and unregister) callbacks to handle the published values in a background task.

To “publish” a value to the topic locally use await topic.buffer.set(...).

The Client Protocol makes sure that all TopicDataRecord messages received via the client's DataConnection (created in its Context) get forwarded to matching topics in this way. You don’t need to manually “publish” data except when, for example, mocking a connection.

To publish TopicData to the master node instead use the Publish behaviour of the UbiiClient.

Warning

The master node does not allow a client to subscribe to the same topic multiple times. Application code, on the other hand, might want to “subscribe” to a topic without caring about subscriptions made in other parts of the application. Such a “subscription manager” can be implemented with the subscriber_count property and the add_subscriber(), remove_subscriber() and remove_all_subscribers() methods, all of which trigger the on_subscribers_change callback. For example, set the actual service call as the topic's on_subscribers_change callback, and count calls to ‘subscribe’ and ‘unsubscribe’ by updating the virtual subscriber_count.

See also

implement_subscriptions – example implementation of Subscriptions behaviour using this module
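
The counting idea from the warning can be sketched without the framework. Everything here is hypothetical: `CountingTopic` only models the subscriber count, and `forward_to_master` stands in for the real service call by recording a string.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

Change = Tuple[int, int]  # (old subscriber count, new subscriber count)


class CountingTopic:
    """Hypothetical stand-in for a topic that only tracks virtual subscribers."""

    def __init__(
        self,
        pattern: str,
        on_subscribers_change: Optional[Callable[["CountingTopic", Change], Awaitable[None]]] = None,
    ) -> None:
        self.pattern = pattern
        self.on_subscribers_change = on_subscribers_change
        self.subscriber_count = 0

    async def _set_count(self, new: int) -> None:
        old, self.subscriber_count = self.subscriber_count, new
        if self.on_subscribers_change is not None:
            await self.on_subscribers_change(self, (old, new))

    async def add_subscriber(self) -> None:
        await self._set_count(self.subscriber_count + 1)

    async def remove_subscriber(self) -> None:
        await self._set_count(max(0, self.subscriber_count - 1))


service_calls: List[str] = []


async def forward_to_master(topic: CountingTopic, change: Change) -> None:
    """Only the first subscriber and the last unsubscriber reach the pretend master node."""
    old, new = change
    if old == 0 and new == 1:
        service_calls.append(f"subscribe {topic.pattern}")
    elif old > 0 and new == 0:
        service_calls.append(f"unsubscribe {topic.pattern}")


async def demo() -> None:
    topic = CountingTopic("foo/bar", forward_to_master)
    await topic.add_subscriber()     # 0 -> 1: service call
    await topic.add_subscriber()     # 1 -> 2: no service call
    await topic.remove_subscriber()  # 2 -> 1: no service call
    await topic.remove_subscriber()  # 1 -> 0: service call


asyncio.run(demo())
```

Application code can then call add_subscriber() and remove_subscriber() freely, while the master node sees at most one subscription per topic.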

buffer

inherited from TopicDataBufferManager

__init__(pattern, *, token_factory: Callable[[], T_Token], task_nursery: util.TaskNursery | None = None, **kwargs) None
Parameters:
  • pattern – initializes pattern

  • token_factory – initializes token_factory

  • task_nursery – initializes task_nursery

  • **kwargs – passed to super class init methods

on_subscribers_change: OnSubscribersChange | None

Special callback to execute when subscriber_count changes – defaults to None

pattern: str

Wildcard pattern or simple string defining the topic

task_nursery: util.TaskNursery

takes care of managing tasks for callbacks

token_factory: Callable[[], T_Token]

called to create unique tokens to unregister callbacks, see register_callback()

callback_tasks: Dict[T_Token, asyncio.Task]

Mapping \(Token \rightarrow Task\) to access created callback tasks

property subscriber_count

number of virtual “subscribers” to this topic. Executes on_subscribers_change when it is set – with args

  • topic = this topic

  • change = (old subscriber count, new subscriber count)

async add_subscriber()

Increase subscriber count and wait for on_subscribers_change callback to finish

async remove_subscriber()

Decrease subscriber count and wait for on_subscribers_change callback to finish

property registered_callbacks: Container[Callable]

Use this property to check if a callback is registered in this topic

Example

>>> import itertools
>>> topic = Topic('foo', token_factory=itertools.count().__next__)
>>> print in topic.registered_callbacks
False
>>> token = topic.register_callback(print)
>>> print in topic.registered_callbacks
True
get_token(callback: Callable) Tuple[T_Token]

If the callback is registered in this topic, get the registration token[s]

Parameters:

callback – some callback that could have been registered previously

Returns:

tuple of all registration tokens for the callback, or None if the callback was not registered

async remove_all_subscribers()

Set subscriber count to 0 and wait for on_subscribers_change callback to finish

register_callback(callback: Consumer[T_Buffer]) T_Token

Register a callback function as a callback on this topic.

This starts a task which waits for the buffer to be written to, and then executes the callback function on the value provided by the buffer accessor.

If the callback is not a coroutine function, it will be run in the default executor pool (see https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools)

This method returns a unique identifier (supplied by the token_factory) which can be used to deregister the callback at a later time. Registering a callback multiple times will make the callback run multiple times for each published value. One needs to keep the token around to later deregister (as you don’t get the same token again by calling register_callback() with the same callback).

See also

task_manager – how to change who is responsible for executing callback tasks

Parameters:

callback – some callable which acts on a TopicDataRecord

Returns:

a unique token (supplied by the token_factory) to deregister the callback later
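
The token semantics described above (a fresh token per registration, even for the same callback) can be sketched as follows. `CallbackRegistry` is a hypothetical simplification: it calls callbacks synchronously in `publish`, whereas the real Topic starts a task per registration.

```python
import itertools
from typing import Any, Callable, Dict, List, Tuple


class CallbackRegistry:
    """Hypothetical registry illustrating tokens; not the Topic implementation."""

    def __init__(self, token_factory: Callable[[], int]) -> None:
        self._token_factory = token_factory
        self._callbacks: Dict[int, Callable[[Any], Any]] = {}

    def register_callback(self, callback: Callable[[Any], Any]) -> int:
        token = self._token_factory()  # a new token for every registration
        self._callbacks[token] = callback
        return token

    def unregister_callback(self, token: int) -> bool:
        return self._callbacks.pop(token, None) is not None

    def get_token(self, callback: Callable[[Any], Any]) -> Tuple[int, ...]:
        return tuple(t for t, cb in self._callbacks.items() if cb is callback)

    def publish(self, value: Any) -> None:
        for callback in list(self._callbacks.values()):
            callback(value)


seen: List[str] = []


def record(value: str) -> None:
    seen.append(value)


registry = CallbackRegistry(token_factory=itertools.count().__next__)
first = registry.register_callback(record)
second = registry.register_callback(record)  # same callback, different token
registry.publish("x")                         # record runs twice
tokens = registry.get_token(record)
removed = registry.unregister_callback(first)
registry.publish("y")                         # record runs once
```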

async unregister_callback(token: T_Token, timeout=None) bool

Unregister a callback with the unique token from the registration. Also cancel the callback task.

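Parameters:
  • token – the token returned by register_callback() for the callback to remove

  • timeout – optional time limit for the callback task to finish cancelling; None (the default) waits without limit

Returns: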

True if operation succeeded, False if no callback for the token is registered.

Raises:

TimeoutError – If timeout is set and the task does not complete (cancel) in time – default behaviour: block until task is canceled successfully

class ubii.framework.topics.BasicTopic(pattern, **kwargs)

Bases: Topic[TopicDataRecord, int]

A Topic with a token_factory simply producing ascending integers and a simple buffer for TopicDataRecord messages

class integer_token_factory

Bases: object

creates increasing integers

>>> factory = integer_token_factory()
>>> another_factory = integer_token_factory()
>>> factory()
0
>>> another_factory()
1
>>> factory()
2
...
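
The output above suggests that separate factory instances draw from one shared counter. A minimal sketch of that behaviour, assuming a class-level counter (the class name `SharedIntegerTokens` is hypothetical and the actual implementation may differ):

```python
import itertools


class SharedIntegerTokens:
    """Hypothetical factory: all instances share one class-level counter."""

    _counter = itertools.count()

    def __call__(self) -> int:
        return next(self._counter)


factory = SharedIntegerTokens()
another_factory = SharedIntegerTokens()
tokens = [factory(), another_factory(), factory()]
```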
property buffer: accessor[TopicDataRecord]

You can use the get() and set() methods to synchronize access to the internal ubii.proto.TopicDataRecord value.

See also

accessor – details about access to shared resource

on_subscribers_change: OnSubscribersChange | None

Special callback to execute when subscriber_count changes – defaults to None

pattern: str

Wildcard pattern or simple string defining the topic

task_nursery: util.TaskNursery

takes care of managing tasks for callbacks

token_factory: Callable[[], T_Token]

called to create unique tokens to unregister callbacks, see register_callback()

callback_tasks: Dict[T_Token, asyncio.Task]

Mapping \(Token \rightarrow Task\) to access created callback tasks

class ubii.framework.topics.TopicStore(base_factory: Callable[[T_Key], T_co])

Bases: MatchMappingMixin, DefaultHookMap[str, Topic_co], Generic[Topic_co]

A TopicStore acts like a defaultdict mapping of \(pattern \rightarrow Topic\), but allows for complex matching of keys using match_pattern() and match_name()

Example

>>> from ubii.framework.topics import TopicStore
>>> class Topic:
...     def __init__(self, pattern):
...         self.pattern = pattern
...
>>> store = TopicStore(default_factory=Topic)
>>> store.default_factory('topic/glob/pattern_one')
>>> store.default_factory('topic/glob/pattern_two')
>>> [topic.pattern for topic in store.match_pattern('topic*')]
['topic/glob/pattern_one', 'topic/glob/pattern_two']
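
To make the defaultdict-like behaviour concrete, here is a standalone sketch. It is not the TopicStore implementation: `PatternStore` and `SimpleTopic` are hypothetical, glob matching via `fnmatch` is an assumption, and so is the reading that match_pattern() selects stored keys by a glob while match_name() selects stored patterns that match a concrete topic name.

```python
from fnmatch import fnmatchcase
from typing import Any, Callable, List


class SimpleTopic:
    def __init__(self, pattern: str) -> None:
        self.pattern = pattern


class PatternStore(dict):
    """Hypothetical mapping pattern -> topic that creates missing entries on access."""

    def __init__(self, default_factory: Callable[[str], Any]) -> None:
        super().__init__()
        self.default_factory = default_factory

    def __missing__(self, key: str) -> Any:
        value = self[key] = self.default_factory(key)
        return value

    def match_pattern(self, pattern: str) -> List[Any]:
        # assumed semantics: stored keys that match the given glob
        return [value for key, value in self.items() if fnmatchcase(key, pattern)]

    def match_name(self, name: str) -> List[Any]:
        # assumed semantics: stored glob patterns that match the given topic name
        return [value for key, value in self.items() if fnmatchcase(name, key)]


store = PatternStore(SimpleTopic)
store["topic/glob/pattern_one"]  # created on first access
store["topic/glob/pattern_two"]
store["other/*"]

by_pattern = [topic.pattern for topic in store.match_pattern("topic*")]
by_name = [topic.pattern for topic in store.match_name("other/sensor")]
```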
class on_create_register_callback(*callbacks)

Bases: object

This decorator only needs to be applied once. To add further callbacks later, append them to the callbacks of the registered on_create_register_callback.

class ubii.framework.topics.StreamSplitRoutine(*, stream: AsyncIterator[ubii.proto.TopicData], container: TopicStore[Topic[ubii.proto.TopicDataRecord, Any]], logger: logging.Logger | None = None)

Bases: CoroutineWrapper[Any, Any, None]

A StreamSplitRoutine splits TopicDataRecords from a TopicData stream to the buffers of topics from a TopicStore container

Warning

This Coroutine only works with Topics with TopicDataRecord buffers. For topics with different buffer type, use an adjusted StreamSplitRoutine.

__init__(*, stream: AsyncIterator[ubii.proto.TopicData], container: TopicStore[Topic[ubii.proto.TopicDataRecord, Any]], logger: logging.Logger | None = None)

Set the static attributes for the coroutine, the TopicData source and TopicDataRecord container

Parameters:
  • stream – source for data

  • container – sink for data

  • logger – optional logger for processed data

async make_record() AsyncIterator[TopicDataRecord]

Creates iterator for TopicDataRecord messages from the TopicData iterator passed as stream during initialization, by unpacking possible topic_data_record_list fields

Returns:

Iterator over individual records

async split_to_topics() None

Uses make_record() to create a stream of records, then iterates over them and

  • creates the topic inside the TopicStore passed as container during initialization if no topics matching the glob pattern of received TopicDataRecords are found

  • sets the topic’s buffer to the record
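
The two steps above can be sketched with plain dataclasses. The types `Record` and `Data` are hypothetical stand-ins for TopicDataRecord and TopicData, the single-record field name `topic_data_record` is an assumption, and a list per topic replaces the real topic buffers so the result is easy to inspect.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator, Dict, List, Optional


@dataclass
class Record:  # hypothetical stand-in for TopicDataRecord
    topic: str
    value: int


@dataclass
class Data:  # hypothetical stand-in for TopicData
    topic_data_record: Optional[Record] = None
    topic_data_record_list: List[Record] = field(default_factory=list)


async def make_records(stream: AsyncIterator[Data]) -> AsyncIterator[Record]:
    """Flatten single records and record lists into one stream of records."""
    async for data in stream:
        if data.topic_data_record is not None:
            yield data.topic_data_record
        for record in data.topic_data_record_list:
            yield record


async def split_to_topics(stream: AsyncIterator[Data], buffers: Dict[str, List[Record]]) -> None:
    """Create the per-topic entry if needed, then store the record there."""
    async for record in make_records(stream):
        buffers.setdefault(record.topic, []).append(record)


async def fake_stream() -> AsyncIterator[Data]:
    yield Data(topic_data_record=Record("a", 1))
    yield Data(topic_data_record_list=[Record("a", 2), Record("b", 3)])


buffers: Dict[str, List[Record]] = {}
asyncio.run(split_to_topics(fake_stream(), buffers))
```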

class ubii.framework.topics.OnSubscribeCallback(*args, **kwargs)

Bases: Protocol

async __call__(client_id: str, topic: Topic, as_regex: bool = ..., unsubscribe: bool = ...)

Callbacks for the OnSubscribersChange objects need to have this signature

Parameters:
  • client_id – id of client node

  • topic – topic that was (un)subscribed

  • as_regex – patterns need to be handled differently from simple strings

  • unsubscribe – is this a subscription or a subscription cancellation?

class ubii.framework.topics.OnSubscribersChange(client_id: str, as_regex: bool, callback: OnSubscribeCallback)

Bases: object

This callable is a proxy for its callback (see OnSubscribeCallback).

as_regex

static argument for eventual call to callback

Type:

bool

client_id

static argument for eventual call to callback

Type:

str

event

application code can wait for this event – is set whenever this is called

Type:

asyncio.Event

callback

actual callback

Type:

OnSubscribeCallback

__call__(topic: Topic, change: Tuple[int, int]) asyncio.Task | None

Called when subscriber counts change in a topic.

Executes the callback when the subscriber count increases to 1 or decreases to 0 – with args
  • client_id = client_id

  • topic = reference to self

  • as_regex = as_regex

  • unsubscribe = True if subscriber count decreased to 0, False if it was increased to 1

Parameters:
  • topic – typically the topic which owns this callable

  • change – old and new subscriber count

class ubii.framework.topics.MetaMuxRecord(mapping: Dict | proto.Message = None, *, ignore_unknown_fields: bool = False, **kwargs)

Bases: TopicDataRecord

TopicDataRecords need to get additional meta information when they are produced by a Muxer. This meta information is not part of the proto schema. This mechanism is similar to the way muxers and demuxers are implemented in JS, to keep the processing code comparable.

topic

Field of type STRING – inherited from TopicDataRecord

Type:

proto.fields.Field

timestamp

Field of type Timestamp – inherited from TopicDataRecord

Type:

proto.fields.Field

client_id

Field of type STRING – inherited from TopicDataRecord

Type:

proto.fields.Field

double

Field of type DOUBLE (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

bool

Field of type BOOL (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

string

Field of type STRING (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

int32

Field of type INT32 (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

float

Field of type FLOAT (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

vector2

Field of type Vector2 (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

vector2_list

Field of type Vector2List (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

vector3

Field of type Vector3 (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

vector3_list

Field of type Vector3List (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

vector4

Field of type Vector4 (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

vector4_list

Field of type Vector4List (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

quaternion

Field of type Quaternion (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

quaternion_list

Field of type Quaternion (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

matrix3x2

Field of type Matrix3x2 (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

matrix4x4

Field of type Matrix4x4 (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

color

Field of type Color (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

touch_event

Field of type TouchEvent (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

touch_event_list

Field of type TouchEventList (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

key_event

Field of type KeyEvent (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

mouse_event

Field of type MouseEvent (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

myo_event

Field of type MyoEvent (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

pose2D

Field of type Pose2D (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

pose3D

Field of type Pose3D (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

object2D

Field of type Object2D (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

object3D

Field of type Object3D (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

object2D_list

Field of type Object2DList (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

object3D_list

Field of type Object3DList (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

int32_list

Field of type Int32List (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

float_list

Field of type FloatList (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

double_list

Field of type DoubleList (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

string_list

Field of type StringList (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

bool_list

Field of type BoolList (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

image2D

Field of type Image2D (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

image2D_list

Field of type Image2DList (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

session

Field of type Session (oneof type) – inherited from TopicDataRecord

Type:

proto.fields.Field

__init__(mapping: Dict | proto.Message = None, *, ignore_unknown_fields: bool = False, **kwargs)
Parameters:
  • mapping – A dictionary or message to be used to determine the values for this message. If it is a dictionary, keys that are not part of the proto schema will be added to the metadata instead

  • ignore_unknown_fields – If True, do not raise errors for unknown fields. Only applied if mapping is a mapping type or there are keyword parameters.

  • **kwargs – Keys and values corresponding to the fields of the message. Any keyword arguments that are passed on initialisation, that are not part of the TopicDataRecord schema, will be simply inserted into the metadata mapping.

metadata(**kwargs) Optional[MutableMapping]

TopicDataRecord overwrites attribute access, so we can’t write any attributes that are not part of the protobuf specification.

This method allows saving metadata for a record in a specified way, without altering the proto schema.

Parameters:

**kwargs – If you specify keyword arguments, they will be written to the metadata mapping

Returns:

if metadata has been associated with the record, returns the metadata mapping, otherwise returns None

property type: str | None

Returns attribute name of the attribute that is set in the type oneof group for convenience

class ubii.framework.topics.TopicMuxer(mapping=None, **kwargs)

Bases: TopicMux

A TopicMuxer defines a special records attribute, to handle topic data records. The muxer will identify metadata for the records, and associate it with them.

id

Field of type STRING – inherited from TopicMux

Type:

proto.fields.Field

name

Field of type STRING – inherited from TopicMux

Type:

proto.fields.Field

data_type

Field of type STRING – inherited from TopicMux

Type:

proto.fields.Field

topic_selector

Field of type STRING – inherited from TopicMux

Type:

proto.fields.Field

identity_match_pattern

Field of type STRING – inherited from TopicMux

Type:

proto.fields.Field

property records: List[MetaMuxRecord]

Use this util.condition_property to read and write records handled by the muxer (which will convert them to MetaMuxRecord objects that carry additional meta information)

Example

The example uses an async interpreter

>>> import asyncio
>>> from ubii.framework.topics import TopicMuxer
>>> import ubii.proto
>>> muxer = TopicMuxer(
...     id="fake-id",
...     data_type='int32',
...     topic_selector='/topic/*',
...     identity_match_pattern='(?:/topic/([0-9a-z-]+))'
... )
>>> records = [ubii.proto.TopicDataRecord(topic=f"/topic/{num}", int32=num) for num in range(5)]
>>> records
[topic: "/topic/0"
int32: 0
, topic: "/topic/1"
int32: 1
, topic: "/topic/2"
int32: 2
, topic: "/topic/3"
int32: 3
, topic: "/topic/4"
int32: 4
]
>>> await muxer.records.set(records)
>>> muxed = await muxer.records.get(predicate=lambda _ : True)
>>> muxed[0].metadata()
{'identity': '0'}
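
The 'identity' value in the output above corresponds to the capture group of identity_match_pattern. A standalone sketch with the standard `re` module shows the extraction; `extract_identity` is a hypothetical helper, and whether the muxer applies the pattern with `search` or another matching function is an assumption.

```python
import re
from typing import List, Optional

# the same pattern as passed to identity_match_pattern in the example above
IDENTITY_PATTERN = re.compile(r"(?:/topic/([0-9a-z-]+))")


def extract_identity(topic: str) -> Optional[str]:
    """Hypothetical helper: return the first capture group, or None without a match."""
    match = IDENTITY_PATTERN.search(topic)
    return match.group(1) if match else None


identities: List[Optional[str]] = [extract_identity(f"/topic/{num}") for num in range(3)]
unmatched = extract_identity("/other/0")
```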
id: str
name: str
data_type: str
topic_selector: str
identity_match_pattern: str
class ubii.framework.topics.TopicDemuxer(mapping=None, param_regex=DEFAULT_REGEX_OUTPUT_PARAM, **kwargs)

Bases: TopicDemux

A Demuxer converts MetaMuxRecord objects to regular records, by setting their attributes (currently only the topic) according to their associated metadata and its own attributes.

id

Field of type STRING – inherited from TopicDemux

Type:

proto.fields.Field

name

Field of type STRING – inherited from TopicDemux

Type:

proto.fields.Field

data_type

Field of type STRING – inherited from TopicDemux

Type:

proto.fields.Field

output_topic_format

Field of type STRING – inherited from TopicDemux

Type:

proto.fields.Field

id: str
name: str
data_type: str
output_topic_format: str
DEFAULT_REGEX_OUTPUT_PARAM = '{{#([0-9]+)}}'
convert_record_objects(records: List) TopicDataRecordList

The Demuxer converts the records to actual TopicDataRecords with the right topic

Parameters:

records – records with metadata, likely created as outputs of a processing module

Returns:

elements contain all records that were successfully converted

Example

The example uses an async interpreter

>>> import asyncio
>>> from ubii.framework.topics import TopicDemuxer
>>> import ubii.proto
>>> records = [{'int32': num, 'output_params': (str(num),)} for num in range(5)]
>>> demuxer = TopicDemuxer(
...     id='fake-id',
...     data_type='int32',
...     output_topic_format='/topic/{{#0}}'
... )
>>> demuxer.convert_record_objects(records)
elements {
  topic: "/topic/0"
  int32: 0
}
elements {
  topic: "/topic/1"
  int32: 1
}
elements {
  topic: "/topic/2"
  int32: 2
}
elements {
  topic: "/topic/3"
  int32: 3
}
elements {
  topic: "/topic/4"
  int32: 4
}
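
How output_topic_format and output_params combine in the example can be sketched with the documented DEFAULT_REGEX_OUTPUT_PARAM and `re.sub`. `format_topic` is a hypothetical helper; treating the number in `{{#N}}` as an index into output_params is an assumption based on the output shown above.

```python
import re
from typing import List, Sequence

# same value as the documented DEFAULT_REGEX_OUTPUT_PARAM
OUTPUT_PARAM = re.compile(r"{{#([0-9]+)}}")


def format_topic(output_topic_format: str, output_params: Sequence[str]) -> str:
    """Hypothetical helper: replace each {{#N}} with output_params[N]."""
    return OUTPUT_PARAM.sub(
        lambda match: output_params[int(match.group(1))],
        output_topic_format,
    )


topics: List[str] = [format_topic("/topic/{{#0}}", (str(num),)) for num in range(3)]
swapped = format_topic("/{{#1}}/{{#0}}", ("a", "b"))
```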