ubii.framework.processing module

ubii.framework.processing.__json_name_to_field_name__ = {'bool': 'bool', 'boolList': 'bool_list', 'clientId': 'client_id', 'color': 'color', 'double': 'double', 'doubleList': 'double_list', 'float': 'float', 'floatList': 'float_list', 'image2D': 'image2D', 'image2DList': 'image2D_list', 'int32': 'int32', 'int32List': 'int32_list', 'keyEvent': 'key_event', 'matrix3x2': 'matrix3x2', 'matrix4x4': 'matrix4x4', 'mouseEvent': 'mouse_event', 'myoEvent': 'myo_event', 'object2D': 'object2D', 'object2DList': 'object2D_list', 'object3D': 'object3D', 'object3DList': 'object3D_list', 'pose2D': 'pose2D', 'pose3D': 'pose3D', 'quaternion': 'quaternion', 'quaternionList': 'quaternion_list', 'session': 'session', 'string': 'string', 'stringList': 'string_list', 'timestamp': 'timestamp', 'topic': 'topic', 'touchEvent': 'touch_event', 'touchEventList': 'touch_event_list', 'vector2': 'vector2', 'vector2List': 'vector2_list', 'vector3': 'vector3', 'vector3List': 'vector3_list', 'vector4': 'vector4', 'vector4List': 'vector4_list'}

Used in ProcessingProtocol.helpers.fix_io_fmt() to compute the field name of the ubii.proto.TopicDataRecord.type one-of group from the message type given by a ubii.proto.ModuleIO.message_format field.

ubii.framework.processing.fix_io_fmt(message_format: str) str

Computes the field name of the type oneof corresponding to the type in a message_format field (of the form ubii.{proto_package}.{type} as defined in the .proto file, not the python package!)

Example

The inputs of a routine contain a ubii.proto.ModuleIO message with message_format ubii.dataStructure.Matrix4x4. The name of the corresponding field in a ubii.proto.TopicDataRecord is matrix4x4. This method performs this conversion.

Parameters:

message_format – format string for message type

Returns:

name of corresponding field in a ubii.proto.TopicDataRecord message

class ubii.framework.processing.perf_calc_callable(*args, **kwargs)

Bases: Protocol

__call__(scheduler: Scheduler) float

This should evalutate the performance of a scheduler instance somehow.

Parameters:

scheduler – scheduler instance

Returns:

a value indicating the performance

ubii.framework.processing.perf_calc(attr: str, scheduler: Scheduler) float
Parameters:
  • scheduler – scheduler instance

  • attr – name of attribute of the scheduler which contains the data for the performance evaluation.

Returns:

performace of scheduler, calculated by computing relative error of data to delay i.e. for an average of past execution times \(\overline{t} = avg(\text{scheduler.exec_delta_times})\) return \(1\) if \(\overline{t} < \text{scheduler.delay}\) otherwise calculate the relative deviation \(\Delta t = \frac{\overline{t} - \text{scheduler.delay}}{\text{scheduler.delay}}\) and return \(1 - \Delta t\) (negative values if \(\Delta t > 1\) allowed)

ubii.framework.processing.check_datatype(io: ModuleIO, record: TopicDataRecord)

Check if ModuleIO datatype matches record ‘type’ oneof name

Parameters:
  • io – a IO definition

  • record – a record

Returns:

True if the right field is set, false otherwise

See also

fix_io_fmt() – helper to translate message specification to oneof field name

class ubii.framework.processing.Scheduler(callback: Callable[[], None], inputs: Iterable[Callable[[], Awaitable[T]]], mode: ProcessingMode, *, schedule_perf_metric: perf_calc_callable = functools.partial(perf_calc, 'schedule_delta_times'), exec_perf_metric: perf_calc_callable = functools.partial(perf_calc, 'exec_delta_times'))

Bases: CoroutineWrapper

A Scheduler is a wrapper around the coroutine created by its get_trigger_loop() method. A scheduler – depending on the used mode – waits for a certain delay and / or until any or all of its inputs are available and then schedules its callback.

DELTA_TIME_CACHE_SIZE = 30

number of cached delta times

__init__(callback: Callable[[], None], inputs: Iterable[Callable[[], Awaitable[T]]], mode: ProcessingMode, *, schedule_perf_metric: perf_calc_callable = functools.partial(perf_calc, 'schedule_delta_times'), exec_perf_metric: perf_calc_callable = functools.partial(perf_calc, 'exec_delta_times'))

The mode and inputs determine when the callback is executed.

Parameters:
  • callback – Schedule execution of this callable when conditions are right

  • inputs – depending on the ProcessingMode inputs are part of the condition – or simply retrieved – for execution of the callback. For each callable passed in inputs the returned Awaitable will either be in done or pending when the callback is scheduled

  • mode – used ProcessingMode

  • schedule_perf_metric – callable to evaluate scheduling overhead / performance, schould take one argument, the scheduler instance

  • exec_perf_metric – callable to evaluate execution callback overhead / performance

See also

ubii.proto.ProcessingMode.mode – details on processing modes

schedule_perf_metric: Callable[[], float]

Call this method to calculate the performance of the scheduling timings

exec_perf_metric: Callable[[], float]

Call this method to calculate the performance of the execution

inputs: Iterable[Callable[[], Awaitable[T]]]

callables to create awaitables for possibly needed inputs

schedule_delta_times: Deque

keep track of times between callback schedules for performance evaluation

exec_delta_times: Deque

keep track of execution times for callback for performance evaluation

done: List[Awaitable]

contains awaitables created from inputs argument that finished when callback needs to be scheduled

mode: ProcessingMode

used mode, determines which conditions need to be matched to schedule the callback

callback: Callable[[], None]

callback to be scheduled

executor: ThreadPoolExecutor

callbacks are possibly non-async callables, will be scheduled in this executor using asyncio.loop.run_in_executor()

task_clean_frequency

After every n callback schedules, cancel old input awaitables

property timing_thresholds: Tuple[float, ...]

These are the thresholds and minimum times for the asyncio sleep call. They depend on your system and event loop implementation. If they are set, the scheduler will adjust it’s scheduling behaviour in frequency mode accordingly. If they are not set, no adjustments are made.

Example

On linux with the default event loop you can set

scheduler.timing_epsilons = (0.001, 0.00015)

to adjust for the behaviour of the epoll_await system call

property timing_adjustment

This read only property caches the appropriate adjustment for the timing code according to the set timing_thresholds and the processing mode (in all modes but frequency mode, the adjustment is 0)

property delay: float

Delay in seconds to schedule callback, depending on mode

Raises:

NotImplementedError – when the mode is lockstep

halt() None

Call this method to stop the internal scheduler loop after the next iteration, which will finish the wrapped coroutine

class ubii.framework.processing.ProcessingRoutine(mapping=None, eval_strings=False, **kwargs)

Bases: ProcessingModule

This adds a ProcessingProtocol providing processing behaviour to a ProcessingModule representation. Refer to the documentation of the ProcessingProtocol for information related to processing behaviour.

id

Field of type STRING – inherited from ProcessingModule

Type:

proto.fields.Field

name

Field of type STRING – inherited from ProcessingModule

Type:

proto.fields.Field

authors

RepeatedField of type STRING – inherited from ProcessingModule

Type:

proto.fields.RepeatedField

tags

RepeatedField of type STRING – inherited from ProcessingModule

Type:

proto.fields.RepeatedField

description

Field of type STRING – inherited from ProcessingModule

Type:

proto.fields.Field

node_id

Field of type STRING – inherited from ProcessingModule

Type:

proto.fields.Field

session_id

Field of type STRING – inherited from ProcessingModule

Type:

proto.fields.Field

status

Field of type Status – inherited from ProcessingModule

Type:

proto.fields.Field

processing_mode

Field of type ProcessingMode – inherited from ProcessingModule

Type:

proto.fields.Field

inputs

RepeatedField of type ModuleIO – inherited from ProcessingModule

Type:

proto.fields.RepeatedField

outputs

RepeatedField of type ModuleIO – inherited from ProcessingModule

Type:

proto.fields.RepeatedField

language

Field of type Language – inherited from ProcessingModule

Type:

proto.fields.Field

on_processing_stringified

Field of type STRING – inherited from ProcessingModule

Type:

proto.fields.Field

on_created_stringified

Field of type STRING – inherited from ProcessingModule

Type:

proto.fields.Field

on_halted_stringified

Field of type STRING – inherited from ProcessingModule

Type:

proto.fields.Field

on_destroyed_stringified

Field of type STRING – inherited from ProcessingModule

Type:

proto.fields.Field

class rules

Bases: object

Rules to validate the protobuf message

static validate_language(pm: ProcessingRoutine)

Only python modules can be run by the python client node. Sets language to PY if language is not set.

Parameters:

pm – needs validation

Raises:

ValueError – if language is already set, but not to PY

static validate_id(pm: ProcessingRoutine)

Check if id is present

Parameters:

pm – needs validation

Raises:

ValueError – if id is not set

__init__(mapping=None, eval_strings=False, **kwargs)
Parameters:
  • mapping (Union[dict, Message]) – A dictionary or message to be used to determine the values for this message.

  • eval_strings (bool) – If this flag is set or the framework is used in debug() mode, fields of the protobuf message that have names ending with _stringified (e.g. on_processing_stringified) will be evaluated. Creating a ProcessingRoutine whose language is not PY is not possible, see validate()

  • kwargs (dict) – Keys and values corresponding to the fields of the message and other arguments passed to ubii.proto.ProcessingModule

property protocol

Reference to the ProcessingProtocol managing this routine

property local_output_topics: TopicStore

Container to interact with the output topics of the module (e.g. register callbacks on the topics).

property change_specs: Condition

Coordinate access to the protobuf specs of this module. Whenever part of the wrapped protobuf message changes, this should be done while acquiring this condition.

When the status or input / output mappings change as part of running this protocol, coroutines waiting for this condition are notified automatically.

Example:

def change_output_mapping(pm: ProcessingRoutine, io: ubii.proto.ModuleIO):
    async with pm.change_specs:
        pm.outputs = [io]
        pm.change_specs.notify_all()
input_getter(io: ubii.proto.ModuleIO) AsyncGetter[ubii.proto.TopicDataRecord | ubii.proto.TopicDataRecordList] | None
output_setter(io: ubii.proto.ModuleIO) AsyncSetter | None
async apply_io_mapping(io_mapping: IOMapping, remote_topic_map: Mapping[str, Topic])

Extract relevant information from a ubii.proto.IOMapping message to initialize the get_output_topic and get_input_topic callables.

Notifies awaitables waiting on change_specs unless the mapping was empty.

Parameters:
on_created(context: SimpleNamespace) None

Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_created() is called

Parameters:

context – Same context that is used by ProcessingProtocol.on_created()

This callable had the hook decorator applied. Original signature: def on_created(self, context: 'types.SimpleNamespace') -> 'None'

on_init(context: SimpleNamespace) None

Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_init() is called

Parameters:

context – Same context that is used by ProcessingProtocol.on_init()

This callable had the hook decorator applied. Original signature: def on_init(self, context: 'types.SimpleNamespace') -> 'None'

on_processing(context: SimpleNamespace) None

Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_processing() is called

Parameters:

context – Same context that is used by ProcessingProtocol.on_processing()

This callable had the hook decorator applied. Original signature: def on_processing(self, context: 'types.SimpleNamespace') -> 'None'

on_halted(context: SimpleNamespace) None

Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_halted() is called

Parameters:

context – Same context that is used by ProcessingProtocol.on_halted()

This callable had the hook decorator applied. Original signature: def on_halted(self, context: 'types.SimpleNamespace') -> 'None'

on_destroyed(context: SimpleNamespace) None

Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_destroyed() is called

Parameters:

context – Same context that is used by ProcessingProtocol.on_destroyed()

This callable had the hook decorator applied. Original signature: def on_destroyed(self, context: 'types.SimpleNamespace') -> 'None'

validate()

Run all rules in validation_rules

classmethod mutate_pm()

Change specifications of a processing routine, but notify waiters for the modules change_specs condition.

Parameters:

This callable had the hook decorator applied. Original signature: async def mutate_pm(cls, module: 'ProcessingRoutine', specs: 'ubii.proto.ProcessingModule') -> 'None'

classmethod start()

Start the internal ProcessingProtocol of the passed routine.

Parameters:

pm – instance that needs to be started

Returns:

routine passed as pm

This callable had the hook decorator applied. Original signature: async def start(cls, pm: 'ProcessingRoutine') -> 'ProcessingRoutine'

classmethod stop()

Stop the internal ProcessingProtocol of the passed routine and remove the routine from the registry.

Parameters:

pm – instance that needs to be stopped

Returns:

routine passed as argument

This callable had the hook decorator applied. Original signature: async def stop(cls, pm: 'ProcessingRoutine')

classmethod halt()

Halt the internal ProcessingProtocol of the passed routine. If th protocol is already in end state, do nothing but raise a warning.

Parameters:

pm – instance that needs to be halted

Returns:

routine passed as argument

This callable had the hook decorator applied. Original signature: async def halt(cls, pm: 'ProcessingRoutine')

validation_rules: List[Callable[[ProcessingRoutine], None]] = [<function ProcessingRoutine.rules.validate_language>, <function ProcessingRoutine.rules.validate_id>]

Callables to validate the protobuf message used in validate()

See also

ProcessingRoutine.rules – details on rules used here

class ubii.framework.processing.PM_STAT(value)

Bases: IntFlag

Proxy for Status but as enum.IntFlag, to allow defining combinations of states.

INITIALIZED = 1
CREATED = 2
PROCESSING = 4
HALTED = 8
DESTROYED = 16
class ubii.framework.processing.ProcessingProtocol(pm: ProcessingRoutine)

Bases: AbstractProtocol[PM_STAT]

This AbstractProtocol implementation defines the Protocol used to run ProcessingRoutines.

It defines valid state_changes and callbacks, as well as the starting_state and end_state

The pm_proxy methods are used to decorate the lifecycle callbacks on_created(), on_init(), on_processing(), on_halted(), on_destroyed(), to execute the proxied methods in the ProcessingRoutine which owns the ProcessingProtocol and set the ProcessingRoutine.status

task_nursery

inherited from ubii.framework.protocol.AbstractProtocol

state

inherited from ubii.framework.protocol.AbstractProtocol

starting_state: T_EnumFlag = 1

Before the protocol has the starting state, it’s state is None

end_state: T_EnumFlag = 16

If the protocol ends up in its end state, the coroutine running the protocol will finish

AnyState = 31

This is a combination of all possible states to allow easier transitions in state_changes

class pm_proxy

Bases: object

Define some decorators for callables like on_processing(), because all lifecycle callbacks need to set the status of the corresponding processing routine, and call the corresponding callbacks (e.g. if a ProcessingProtocol.on_processing() callback is called, it needs to call the ProcessingRoutine.on_processing() callback of the managed processing routine)

classmethod set_status_in_pm(status: Status) Callable

Callable decorated with returned decorator sets the status of the ProcessingRoutine instance to the passed status when called, and notifies awaitables waiting for change_specs

Parameters:

status – which status will be set

Returns:

Decorator

classmethod callback_in_pm(name: str) Callable

Callable decorated with returned decorator calls the method with specified name of the ProcessingRoutine instance. arguments of decorated callable are passed on.

Parameters:

name – e.g. 'on_destroyed'

Returns:

Decorator

class helpers

Bases: object

Used for pre- / post-processing for the context data passed between lifecycle callbacks

classmethod write_scheduler_data_to_context(scheduler: Scheduler, ctx: SimpleNamespace) None

Before processing, the processing context needs to be enriched with the topic data that the Scheduler retrieved as inputs

Parameters:
  • scheduler – The Scheduler scheduling the callback

  • ctx – The context that will be passed to the callback

classmethod publish_outputs_to_topics(pm: ProcessingRoutine, ctx: SimpleNamespace)

Looks up topics via get_output_topic(), starts a async task to set the buffer value of the topic to the computed value from the context outputs, which triggers publishing if the topics has been set up correctly, see e.g. ubii.node.node_protocol.implement_processing().

Parameters:
  • pm – look up output topics in this routines local_output_topics()

  • ctx – extract computed outputs from this context

__init__(pm: ProcessingRoutine)

The created ProcessingProtocol implements the behaviour of a ProcessingModule or rather a ProcessingRoutine during its lifetime. Each ProcessingProtocol belongs to one ProcessingRoutine

Parameters:

pm – the ubii.proto.ProcessingModule wrapper that owns this protocol

pm: ProcessingRoutine

reference to the ProcessingModule wrapper that owns this protocol

created_tasks: List[asyncio.Task]

All tasks created by this protocol, to be stopped / cancelled when necessary

async on_init(context: SimpleNamespace)

First lifecycle method called after pm has been initialized

Calls pm.on_init() and sets the status to ub.ProcessingModule.Status.INITIALIZED.

Creates the following attributes in the context:

  • context.loop the asyncio loop running the processing

  • context.nursery the entity responsible for starting async tasks (either the context.loop or a reference to task_nursery

  • context.trigger_processing asyncio Event used later, see on_created()

  • context.scheduler a Scheduler instance, responsible for scheduling processing calls. Scheduled callback when inputs are available will set context.trigger_processing

  • context.muxer a reference to the TopicMuxer type, needed to extract information from the records

Finally, sets the status to PM_STAT.CREATED

Parameters:

context – namespace object to hold data

async on_created(context)

Calls pm.on_created() and sets the status to ub.ProcessingModule.Status.CREATED.

Prepares the context for further processing: - creates inputs (context.inputs) - Prepare output mapping (context.outputs)

Waits for context.processing_trigger then extracts inputs from context.scheduler (using helpers.write_scheduler_data_to_context()) and changes the status to PM_STAT.PROCESSING

async on_processing(context)

Calls pm.on_processing() and sets the status to ubii.proto.ProcessingModule.Status.PROCESSING.

Does not change the status of the protocol, state change has to be triggered externally in this state.

async on_halted(context)

Calls pm.on_halted() and sets the status to ub.ProcessingModule.Status.HALTED.

Halts the context.scheduler

Does not change the status of the protocol, state change has to be triggered externally in this state.

async on_destroyed(context)

Calls pm.on_destroyed() and sets the status to ub.ProcessingModule.Status.DESTROYED.

Awaits cancellation of all tasks that have been started by this protocol.

state_changes: Mapping[Tuple[T_EnumFlag | None, ...], Callback] = {(None, <PM_STAT.INITIALIZED: 1>): <function ProcessingProtocol.on_init>, (<PM_STAT.INITIALIZED: 1>, <PM_STAT.CREATED: 2>): <function ProcessingProtocol.on_created>, (<PM_STAT.CREATED: 2>, <PM_STAT.PROCESSING: 4>): <function ProcessingProtocol.on_processing>, (<PM_STAT.DESTROYED|HALTED|PROCESSING|CREATED|INITIALIZED: 31>, <PM_STAT.HALTED: 8>): <function ProcessingProtocol.on_halted>, (<PM_STAT.DESTROYED|HALTED|PROCESSING|CREATED|INITIALIZED: 31>, <PM_STAT.DESTROYED: 16>): <function ProcessingProtocol.on_destroyed>}

Possible state changes and respective callbacks