ubii.framework.topics module¶
This module provides some classes to implement the handling of the ubii.proto.TopicData messages
which are sent by the master node when the client subscribes to a topic via a
ubii.framework.services.ServiceCall.
The client node will need a DataConnection to the master node, e.g. a websocket connection. Some example
connections are implemented in ubii.framework.connections. At which point during its lifetime the client is
able to establish this DataConnection is described in greater detail in the ubii.node.protocol module.
After the DataConnection is established, messages can be read using the async iterator API and sent
using the DataConnection.send() method.
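As a rough illustration of this read/send pattern, the sketch below uses a hypothetical in-memory stand-in (LoopbackConnection) instead of a real DataConnection. Only the async iterator reading style and the existence of a send() method are taken from the description above; that send() is a coroutine, and the use of plain strings in place of TopicData messages, are assumptions made for the example.

```python
import asyncio


class LoopbackConnection:
    """Hypothetical stand-in: everything sent is echoed back to the reader."""

    def __init__(self):
        self._queue = asyncio.Queue()

    async def send(self, message):
        # a real connection would serialize and transmit the message
        await self._queue.put(message)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # suspend until the next message is available
        return await self._queue.get()


async def read_messages(connection, count):
    """Collect `count` messages using the async iterator API."""
    received = []
    async for message in connection:
        received.append(message)
        if len(received) == count:
            break
    return received


async def demo():
    connection = LoopbackConnection()
    await connection.send("first")
    await connection.send("second")
    return await read_messages(connection, 2)


result = asyncio.run(demo())
```

The same `async for` loop should work with any object that implements the async iterator protocol, which is why mocking a connection for tests can stay this small.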
Note
The JS API currently defines methods to directly add a callback for a topic pattern – to do the same in the Python API
you first get a reference to the specific Topic using the pattern, then add the callback with the topic’s
register_callback() method.
import logging

from ubii.node import connect_client
from ubii.framework.client import Subscriptions

log = logging.getLogger('MyLogger')

async def main():
    async with connect_client() as client:
        topic, = await client[Subscriptions].subscribe_topic('foo/bar')  # get a topic reference
        topic.register_callback(log.info)  # add callback
See also
TopicStore – details how topics are retrieved by pattern
- class ubii.framework.topics.Consumer(*args, **kwargs)¶
- class ubii.framework.topics.DataConnection¶
Bases: AsyncIterator[TopicData]
A DataConnection can be used to asynchronously iterate over received TopicData messages, and to send TopicData messages to the master node.
- class ubii.framework.topics.TopicCoroutine(*, buffer: AsyncIterator[T_Buffer], callback: Consumer[T_Buffer])¶
Bases: CoroutineWrapper[Any, Any, None], Generic[T_Buffer]
A topic coroutine waits until a value is written to the topic and then runs its callback.
- __init__(*, buffer: AsyncIterator[T_Buffer], callback: Consumer[T_Buffer])¶
- Parameters:
buffer – Iterator producing buffer value
callback – a callable consuming the buffer values
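The wait-then-run behaviour can be sketched without the framework. In the example below, run_callback_on_values and numbers are hypothetical helpers, not part of this module: an async generator stands in for the buffer, and the callback is awaited only if it returns a coroutine.

```python
import asyncio


async def run_callback_on_values(buffer, callback):
    """Await each value from `buffer` and hand it to `callback`."""
    async for value in buffer:
        result = callback(value)
        if asyncio.iscoroutine(result):
            # support coroutine functions as callbacks, too
            await result


async def numbers(limit):
    # stand-in for a topic buffer that yields published values
    for number in range(limit):
        await asyncio.sleep(0)
        yield number


seen = []
asyncio.run(run_callback_on_values(numbers(3), seen.append))
```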
- class ubii.framework.topics.TopicDataBufferManager¶
Simple ABC to make sure the inheriting class has a buffer field
- class ubii.framework.topics.Topic(pattern, *, token_factory: Callable[[], T_Token], task_nursery: util.TaskNursery | None = None, **kwargs)¶
Bases: AsyncIterator[T_Buffer], TopicDataBufferManager[T_Buffer], Generic[T_Buffer, T_Token], ABC
A Topic can be used to asynchronously iterate over TopicDataRecords which are published to the topic. It can also register (and unregister) callbacks to handle the published values in a background task.
To “publish” a value to the topic locally, use await topic.buffer.set(...).
The Client Protocol makes sure that all TopicDataRecord messages received via the client’s DataConnection (created in its Context) get forwarded to matching topics in this way. There is no need to “publish” data manually, except when e.g. mocking a connection.
To publish TopicData to the master node instead, use the Publish behaviour of the UbiiClient.
Warning
The master node does not allow clients to subscribe to the same topic multiple times. Application code, on the other hand, might want to “subscribe” to a topic without caring about other subscriptions in other parts of the application. This kind of “subscription manager” can be implemented with the subscriber_count property and the add_subscriber(), remove_subscriber() and remove_all_subscribers() methods, which trigger the on_subscribers_change callback. E.g. set the actual service call as the on_subscribers_change callback of the topic and simply count calls to ‘subscribe’ and ‘unsubscribe’ by setting the virtual subscriber_count.
See also
implement_subscriptions – example implementation of Subscriptions behaviour using this module
- buffer¶
inherited from TopicDataBufferManager
- __init__(pattern, *, token_factory: Callable[[], T_Token], task_nursery: util.TaskNursery | None = None, **kwargs) None¶
- Parameters:
pattern – initializes pattern
token_factory – initializes token_factory
task_nursery – initializes task_nursery
**kwargs – passed to super class init methods
- on_subscribers_change: OnSubscribersChange | None¶
Special callback to execute when subscriber_count changes – defaults to None
- task_nursery: util.TaskNursery¶
takes care of managing tasks for callbacks
- token_factory: Callable[[], T_Token]¶
called to create unique tokens to unregister callbacks, see register_callback()
- callback_tasks: Dict[T_Token, asyncio.Task]¶
Mapping \(Token \rightarrow Task\) to access created callback tasks
- property subscriber_count¶
number of virtual “subscribers” to this topic. Executes on_subscribers_change when it is set, with args
topic = this topic
change = (old subscriber count, new subscriber count)
- async add_subscriber()¶
Increase subscriber count and wait for on_subscribers_change callback to finish
- async remove_subscriber()¶
Decrease subscriber count and wait for on_subscribers_change callback to finish
- property registered_callbacks: Container[Callable]¶
Use this property to check if a callback is registered in this topic
Example
>>> import itertools
>>> topic = Topic('foo', token_factory=itertools.count().__next__)
>>> print in topic.registered_callbacks
False
>>> token = topic.register_callback(print)
>>> print in topic.registered_callbacks
True
- get_token(callback: Callable) Tuple[T_Token]¶
If the callback is registered in this topic, get the registration token[s]
- Parameters:
callback – some callback that could have been registered previously
- Returns:
tuple of all registration tokens for callable or None if callback was not registered
- async remove_all_subscribers()¶
Set subscriber count to 0 and wait for on_subscribers_change callback to finish
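The subscriber counting idea can be sketched independently of the framework. CountedTopic below is a hypothetical stand-in that models only the bookkeeping; clamping the count at zero and skipping the callback when the count does not change are assumptions of this sketch, not documented behaviour. The callback receives the topic and an (old, new) tuple and records a service call only on the first subscription and on the last cancellation.

```python
import asyncio


class CountedTopic:
    """Hypothetical stand-in modelling only the subscriber bookkeeping."""

    def __init__(self, pattern, on_change=None):
        self.pattern = pattern
        self.on_change = on_change  # async callable(topic, change) or None
        self._count = 0

    @property
    def subscriber_count(self):
        return self._count

    async def _set_count(self, new):
        # assumption: never go below zero, stay silent if nothing changed
        old, self._count = self._count, max(new, 0)
        if self.on_change is not None and old != self._count:
            await self.on_change(self, (old, self._count))

    async def add_subscriber(self):
        await self._set_count(self._count + 1)

    async def remove_subscriber(self):
        await self._set_count(self._count - 1)

    async def remove_all_subscribers(self):
        await self._set_count(0)


service_calls = []


async def on_change(topic, change):
    old, new = change
    # only the first subscriber and the last unsubscriber reach the master node
    if old == 0 and new == 1:
        service_calls.append(("subscribe", topic.pattern))
    elif old > 0 and new == 0:
        service_calls.append(("unsubscribe", topic.pattern))


async def demo():
    topic = CountedTopic("foo/bar", on_change)
    await topic.add_subscriber()     # 0 -> 1, recorded as "subscribe"
    await topic.add_subscriber()     # 1 -> 2, no service call
    await topic.remove_subscriber()  # 2 -> 1, no service call
    await topic.remove_subscriber()  # 1 -> 0, recorded as "unsubscribe"
    return topic.subscriber_count


final_count = asyncio.run(demo())
```

With this arrangement, different parts of an application can each call add_subscriber() and remove_subscriber() without knowing about one another, while the master node sees at most one subscription per topic.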
- register_callback(callback: Consumer[T_Buffer]) T_Token¶
Register a callback function as a callback on this topic.
This starts a task which waits for the buffer to be written to, and then executes the callback function on the value provided by the buffer accessor.
If the callback is not a coroutine function, it will be run in the default executor pool (see https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools)
This method returns a unique identifier (supplied by the token_factory) which can be used to deregister the callback at a later time. Registering a callback multiple times will make the callback run multiple times for each published value. Keep the token around to deregister later, as calling register_callback() again with the same callback does not return the same token.
See also
task_manager – how to change who is responsible for executing callback tasks
- Parameters:
callback – some callable which acts on a TopicDataRecord
- Returns:
a unique token (supplied by the token_factory) to deregister the callback later
- async unregister_callback(token: T_Token, timeout=None) bool¶
Unregister a callback with the unique token from the registration. Also cancel the callback task.
- Parameters:
timeout – don’t wait indefinitely for task to cancel
token – from register_callback()
- Returns:
True if operation succeeded, False if no callback for the token is registered.
- Raises:
TimeoutError – If timeout is set and the task does not complete (cancel) in time. Default behaviour: block until the task is canceled successfully
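The token-based registration described for register_callback() and unregister_callback() can be approximated with plain asyncio. CallbackRegistry below is a hypothetical stand-in: it gives each callback task its own queue instead of a shared buffer accessor, and it only handles synchronous callbacks, so it is a sketch of the bookkeeping rather than of the actual implementation.

```python
import asyncio
import itertools


class CallbackRegistry:
    """Hypothetical stand-in for the token and task bookkeeping of a topic."""

    def __init__(self):
        self._next_token = itertools.count().__next__
        self._queues = {}
        self.callback_tasks = {}  # token -> task

    def register_callback(self, callback):
        token = self._next_token()
        queue = self._queues[token] = asyncio.Queue()
        self.callback_tasks[token] = asyncio.create_task(self._run(queue, callback))
        return token

    @staticmethod
    async def _run(queue, callback):
        while True:
            callback(await queue.get())

    async def publish(self, value):
        for queue in self._queues.values():
            await queue.put(value)

    async def unregister_callback(self, token, timeout=None):
        task = self.callback_tasks.pop(token, None)
        if task is None:
            return False  # nothing registered for this token
        del self._queues[token]
        task.cancel()
        # asyncio.wait does not raise on timeout, so check `pending` explicitly
        _, pending = await asyncio.wait({task}, timeout=timeout)
        if pending:
            raise asyncio.TimeoutError(f"task for token {token} did not stop in time")
        return True


async def demo():
    registry = CallbackRegistry()
    seen = []
    first = registry.register_callback(seen.append)
    second = registry.register_callback(seen.append)  # same callback, new token
    await registry.publish("value")
    await asyncio.sleep(0.01)  # give both callback tasks a chance to run
    removed = await registry.unregister_callback(first, timeout=1)
    missing = await registry.unregister_callback(first, timeout=1)
    await registry.unregister_callback(second, timeout=1)
    return first, second, seen, removed, missing


result = asyncio.run(demo())
```

Registering the same callback twice yields two tokens and two invocations per published value, which mirrors the behaviour described above.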
- class ubii.framework.topics.BasicTopic(pattern, **kwargs)¶
Bases: Topic[TopicDataRecord, int]
A Topic with a token_factory simply producing ascending integers and a simple buffer for TopicDataRecord messages
- class integer_token_factory¶
Bases: object
creates increasing integers
>>> factory = integer_token_factory()
>>> another_factory = integer_token_factory()
>>> factory()
0
>>> another_factory()
1
>>> factory()
2
...
- property buffer: accessor[TopicDataRecord]¶
You can use the get() and set() methods to synchronize access to the internal ubii.proto.TopicDataRecord value.
See also
accessor – details about access to shared resource
- on_subscribers_change: OnSubscribersChange | None¶
Special callback to execute when subscriber_count changes – defaults to None
- task_nursery: util.TaskNursery¶
takes care of managing tasks for callbacks
- token_factory: Callable[[], T_Token]¶
called to create unique tokens to unregister callbacks, see register_callback()
- callback_tasks: Dict[T_Token, asyncio.Task]¶
Mapping \(Token \rightarrow Task\) to access created callback tasks
- class ubii.framework.topics.TopicStore(base_factory: Callable[[T_Key], T_co])¶
Bases: MatchMappingMixin, DefaultHookMap[str, Topic_co], Generic[Topic_co]
A TopicStore acts like a defaultdict mapping of \(pattern \rightarrow Topic\), but allows for complex matching of keys using match_pattern() and match_name()
Example
>>> from ubii.framework.topics import TopicStore
>>> class Topic:
...     def __init__(self, pattern):
...         self.pattern = pattern
...
>>> store = TopicStore(default_factory=Topic)
>>> store.default_factory('topic/glob/pattern_one')
>>> store.default_factory('topic/glob/pattern_two')
>>> [topic.pattern for topic in store.match_pattern('topic*')]
['topic/glob/pattern_one', 'topic/glob/pattern_two']
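To make the defaultdict-like behaviour with pattern matching more concrete, here is a framework-free sketch. GlobStore is a hypothetical class, and the exact semantics chosen for match_pattern() (stored keys tested against a glob) and match_name() (stored keys used as globs against a concrete name) are assumptions for illustration, implemented with the standard fnmatch module.

```python
import fnmatch


class GlobStore(dict):
    """Hypothetical stand-in: a default-creating mapping with glob lookups."""

    def __init__(self, default_factory):
        super().__init__()
        self.default_factory = default_factory

    def __missing__(self, key):
        # unlike collections.defaultdict, the key is passed to the factory
        value = self[key] = self.default_factory(key)
        return value

    def match_pattern(self, pattern):
        """Values whose key matches the glob `pattern`."""
        return [value for key, value in self.items()
                if fnmatch.fnmatchcase(key, pattern)]

    def match_name(self, name):
        """Values whose key, read as a glob, matches the concrete `name`."""
        return [value for key, value in self.items()
                if fnmatch.fnmatchcase(name, key)]


store = GlobStore(default_factory=str.upper)
store["topic/glob/pattern_one"]  # created on first access
store["topic/glob/pattern_two"]
store["other/*"]

by_pattern = store.match_pattern("topic*")
by_name = store.match_name("other/thing")
```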
- class ubii.framework.topics.StreamSplitRoutine(*, stream: AsyncIterator[ubii.proto.TopicData], container: TopicStore[Topic[ubii.proto.TopicDataRecord, Any]], logger: logging.Logger | None = None)¶
Bases: CoroutineWrapper[Any, Any, None]
A StreamSplitRoutine splits TopicDataRecords from a TopicData stream to the buffers of topics from a TopicStore container
Warning
This Coroutine only works with Topics with TopicDataRecord buffers. For topics with a different buffer type, use an adjusted StreamSplitRoutine.
- __init__(*, stream: AsyncIterator[ubii.proto.TopicData], container: TopicStore[Topic[ubii.proto.TopicDataRecord, Any]], logger: logging.Logger | None = None)¶
Set the static attributes for the coroutine, the TopicData source and TopicDataRecord container
- Parameters:
stream – source for data
container – sink for data
logger – optional logger for processed data
- async make_record() AsyncIterator[TopicDataRecord]¶
Creates iterator for TopicDataRecord messages from the TopicData iterator passed as stream during initialization, by unpacking possible topic_data_record_list fields
- Returns:
Iterator over individual records
- async split_to_topics() None¶
Uses make_record() to create a stream of records, then iterates over them and
- creates the topic inside the TopicStore passed as container during initialization if no topics matching the glob pattern of received TopicDataRecords are found
- sets the topic’s buffer to the record
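The two steps above can be sketched with plain dictionaries in place of protobuf messages. Everything in this example is a hypothetical stand-in: messages are dicts with either a topic_data_record or a topic_data_record_list key, the topic container is a dict keyed by glob pattern, and a list of received records replaces the single-value buffer of a real topic.

```python
import asyncio
import fnmatch


async def make_records(stream):
    """Yield single records, unpacking list-style messages on the way."""
    async for message in stream:
        if "topic_data_record_list" in message:
            for record in message["topic_data_record_list"]:
                yield record
        elif "topic_data_record" in message:
            yield message["topic_data_record"]


async def split_to_topics(stream, topics):
    """Hand each record to every topic whose glob pattern matches it."""
    async for record in make_records(stream):
        name = record["topic"]
        matching = [p for p in topics if fnmatch.fnmatchcase(name, p)]
        if not matching:
            # no known pattern matches: create a topic for the record's own name
            topics[name] = []
            matching = [name]
        for pattern in matching:
            topics[pattern].append(record)


async def fake_stream():
    yield {"topic_data_record": {"topic": "a/1", "int32": 1}}
    yield {"topic_data_record_list": [{"topic": "a/2", "int32": 2},
                                      {"topic": "b/1", "int32": 3}]}


topics = {"a/*": []}
asyncio.run(split_to_topics(fake_stream(), topics))
```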
- class ubii.framework.topics.OnSubscribeCallback(*args, **kwargs)¶
Bases: Protocol
- async __call__(client_id: str, topic: Topic, as_regex: bool = ..., unsubscribe: bool = ...)¶
Callbacks for the OnSubscribersChange objects need to have this signature
- Parameters:
client_id – id of client node
topic – topic that was (un)subscribed
as_regex – patterns need to be handled differently from simple strings
unsubscribe – is this a subscription or a subscription cancellation?
- class ubii.framework.topics.OnSubscribersChange(client_id: str, as_regex: bool, callback: OnSubscribeCallback)¶
Bases: object
This callable is a proxy for its callback (see OnSubscribeCallback).
- event¶
application code can wait for this event – is set whenever this is called
- callback¶
actual callback
- __call__(topic: Topic, change: Tuple[int, int]) asyncio.Task | None¶
Called when subscriber counts change in a topic.
Executes the callback (see OnSubscribeCallback for its arguments) when the subscriber count increases to 1 or decreases to 0.
- Parameters:
topic – typically the topic which owns this callable
change – old and new subscriber count
- class ubii.framework.topics.MetaMuxRecord(mapping: Dict | proto.Message = None, *, ignore_unknown_fields: bool = False, **kwargs)¶
Bases: TopicDataRecord
TopicDataRecords need to get additional meta information when they are produced by a Muxer. This meta information is not part of the proto schema. This mechanism is similar to the way muxers and demuxers are implemented in JS, to keep the processing code comparable.
- topic¶
Field of type STRING – inherited from TopicDataRecord
- timestamp¶
Field of type Timestamp – inherited from TopicDataRecord
- client_id¶
Field of type STRING – inherited from TopicDataRecord
- double¶
Field of type DOUBLE – oneof type – inherited from TopicDataRecord
- bool¶
Field of type BOOL – oneof type – inherited from TopicDataRecord
- string¶
Field of type STRING – oneof type – inherited from TopicDataRecord
- int32¶
Field of type INT32 – oneof type – inherited from TopicDataRecord
- float¶
Field of type FLOAT – oneof type – inherited from TopicDataRecord
- vector2¶
Field of type Vector2 – oneof type – inherited from TopicDataRecord
- vector2_list¶
Field of type Vector2List – oneof type – inherited from TopicDataRecord
- vector3¶
Field of type Vector3 – oneof type – inherited from TopicDataRecord
- vector3_list¶
Field of type Vector3List – oneof type – inherited from TopicDataRecord
- vector4¶
Field of type Vector4 – oneof type – inherited from TopicDataRecord
- vector4_list¶
Field of type Vector4List – oneof type – inherited from TopicDataRecord
- quaternion¶
Field of type Quaternion – oneof type – inherited from TopicDataRecord
- quaternion_list¶
Field of type Quaternion – oneof type – inherited from TopicDataRecord
- matrix3x2¶
Field of type Matrix3x2 – oneof type – inherited from TopicDataRecord
- matrix4x4¶
Field of type Matrix4x4 – oneof type – inherited from TopicDataRecord
- color¶
Field of type Color – oneof type – inherited from TopicDataRecord
- touch_event¶
Field of type TouchEvent – oneof type – inherited from TopicDataRecord
- touch_event_list¶
Field of type TouchEventList – oneof type – inherited from TopicDataRecord
- key_event¶
Field of type KeyEvent – oneof type – inherited from TopicDataRecord
- mouse_event¶
Field of type MouseEvent – oneof type – inherited from TopicDataRecord
- myo_event¶
Field of type MyoEvent – oneof type – inherited from TopicDataRecord
- pose2D¶
Field of type Pose2D – oneof type – inherited from TopicDataRecord
- pose3D¶
Field of type Pose3D – oneof type – inherited from TopicDataRecord
- object2D¶
Field of type Object2D – oneof type – inherited from TopicDataRecord
- object3D¶
Field of type Object3D – oneof type – inherited from TopicDataRecord
- object2D_list¶
Field of type Object2DList – oneof type – inherited from TopicDataRecord
- object3D_list¶
Field of type Object3DList – oneof type – inherited from TopicDataRecord
- int32_list¶
Field of type Int32List – oneof type – inherited from TopicDataRecord
- float_list¶
Field of type FloatList – oneof type – inherited from TopicDataRecord
- double_list¶
Field of type DoubleList – oneof type – inherited from TopicDataRecord
- string_list¶
Field of type StringList – oneof type – inherited from TopicDataRecord
- bool_list¶
Field of type BoolList – oneof type – inherited from TopicDataRecord
- image2D¶
Field of type Image2D – oneof type – inherited from TopicDataRecord
- image2D_list¶
Field of type Image2DList – oneof type – inherited from TopicDataRecord
- session¶
Field of type Session – oneof type – inherited from TopicDataRecord
- __init__(mapping: Dict | proto.Message = None, *, ignore_unknown_fields: bool = False, **kwargs)¶
- Parameters:
mapping – A dictionary or message to be used to determine the values for this message. If it is a dictionary, keys that are not part of the proto schema will be added to the metadata instead
ignore_unknown_fields – If True, do not raise errors for unknown fields. Only applied if mapping is a mapping type or there are keyword parameters.
**kwargs – Keys and values corresponding to the fields of the message. Any keyword arguments passed on initialisation that are not part of the TopicDataRecord schema will simply be inserted into the metadata mapping.
- metadata(**kwargs) Optional[MutableMapping]¶
TopicDataRecord overwrites attribute access, so we can’t write any attributes that are not part of the protobuf specification.
This method allows saving metadata for a record in a specified way, without altering the proto schema.
- Parameters:
**kwargs – If you specify keyword arguments, they will be written to the metadata mapping
- Returns:
if metadata has been associated with the record, returns the metadata mapping, otherwise returns None
- class ubii.framework.topics.TopicMuxer(mapping=None, **kwargs)¶
Bases: TopicMux
A TopicMuxer defines a special records attribute, to handle topic data records. The muxer will identify metadata for the records, and associate it with them.
- property records: List[MetaMuxRecord]¶
Use this util.condition_property to read and write records handled by the muxer (which will convert them to MetaMuxRecord objects that carry additional meta information)
Example
The example uses an async interpreter
>>> import asyncio
>>> from ubii.framework.topics import TopicMuxer
>>> import ubii.proto
>>> muxer = TopicMuxer(
...     id="fake-id",
...     data_type='int32',
...     topic_selector='/topic/*',
...     identity_match_pattern='(?:/topic/([0-9a-z-]+))'
... )
>>> records = [ubii.proto.TopicDataRecord(topic=f"/topic/{num}", int32=num) for num in range(5)]
>>> records
[topic: "/topic/0"
int32: 0
, topic: "/topic/1"
int32: 1
, topic: "/topic/2"
int32: 2
, topic: "/topic/3"
int32: 3
, topic: "/topic/4"
int32: 4
]
>>> await muxer.records.set(records)
>>> muxed = await muxer.records.get(predicate=lambda _ : True)
>>> muxed[0].metadata()
{'identity': '0'}
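How an identity like '0' could be derived from the topic is easy to reproduce with the standard re module. The sketch below reuses the identity_match_pattern from the example; identity_for is a hypothetical helper, plain dicts stand in for records, and whether the real muxer uses search, match or fullmatch is an assumption.

```python
import re

# pattern copied from the TopicMuxer example
IDENTITY_PATTERN = re.compile(r"(?:/topic/([0-9a-z-]+))")


def identity_for(topic):
    """Return the first captured group for `topic`, or None without a match."""
    match = IDENTITY_PATTERN.search(topic)
    return match.group(1) if match else None


records = [{"topic": f"/topic/{num}", "int32": num} for num in range(3)]
# pair every record with the metadata extracted from its topic
muxed = [(record, {"identity": identity_for(record["topic"])}) for record in records]
unmatched = identity_for("/other/thing")
```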
- class ubii.framework.topics.TopicDemuxer(mapping=None, param_regex=DEFAULT_REGEX_OUTPUT_PARAM, **kwargs)¶
Bases: TopicDemux
A Demuxer converts MetaMuxRecord objects to regular records, by setting their attributes (currently only the topic) according to their associated metadata and its own attributes.
- id¶
Field of type STRING – inherited from TopicDemux
- name¶
Field of type STRING – inherited from TopicDemux
- data_type¶
Field of type STRING – inherited from TopicDemux
- output_topic_format¶
Field of type STRING – inherited from TopicDemux
- DEFAULT_REGEX_OUTPUT_PARAM = '{{#([0-9]+)}}'¶
- convert_record_objects(records: List) TopicDataRecordList¶
The Demuxer converts the records to actual TopicDataRecords with the right topic
- Parameters:
records – records with metadata, likely created as outputs of a processing module
- Returns:
elements contain all records that were successfully converted
Example
The example uses an async interpreter
>>> import asyncio
>>> from ubii.framework.topics import TopicDemuxer
>>> import ubii.proto
>>> records = [{'int32': num, 'output_params': (str(num),)} for num in range(5)]
>>> demuxer = TopicDemuxer(
...     id='fake-id',
...     data_type='int32',
...     output_topic_format='/topic/{{#0}}'
... )
>>> demuxer.convert_record_objects(records)
elements {
  topic: "/topic/0"
  int32: 0
}
elements {
  topic: "/topic/1"
  int32: 1
}
elements {
  topic: "/topic/2"
  int32: 2
}
elements {
  topic: "/topic/3"
  int32: 3
}
elements {
  topic: "/topic/4"
  int32: 4
}
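The placeholder substitution behind output_topic_format can be reproduced with the standard re module. In the sketch below the regular expression is the documented DEFAULT_REGEX_OUTPUT_PARAM, while format_topic and the dict-based records are hypothetical stand-ins; treating the number in {{#n}} as an index into output_params is an assumption that matches the example above.

```python
import re

# same expression as DEFAULT_REGEX_OUTPUT_PARAM
OUTPUT_PARAM = re.compile(r"{{#([0-9]+)}}")


def format_topic(output_topic_format, output_params):
    """Replace each {{#n}} placeholder with output_params[n]."""
    return OUTPUT_PARAM.sub(
        lambda match: output_params[int(match.group(1))],
        output_topic_format,
    )


records = [{"int32": num, "output_params": (str(num),)} for num in range(3)]
converted = [
    {"topic": format_topic("/topic/{{#0}}", r["output_params"]), "int32": r["int32"]}
    for r in records
]
```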