ubii.framework.processing module¶
- ubii.framework.processing.__json_name_to_field_name__ = {'bool': 'bool', 'boolList': 'bool_list', 'clientId': 'client_id', 'color': 'color', 'double': 'double', 'doubleList': 'double_list', 'float': 'float', 'floatList': 'float_list', 'image2D': 'image2D', 'image2DList': 'image2D_list', 'int32': 'int32', 'int32List': 'int32_list', 'keyEvent': 'key_event', 'matrix3x2': 'matrix3x2', 'matrix4x4': 'matrix4x4', 'mouseEvent': 'mouse_event', 'myoEvent': 'myo_event', 'object2D': 'object2D', 'object2DList': 'object2D_list', 'object3D': 'object3D', 'object3DList': 'object3D_list', 'pose2D': 'pose2D', 'pose3D': 'pose3D', 'quaternion': 'quaternion', 'quaternionList': 'quaternion_list', 'session': 'session', 'string': 'string', 'stringList': 'string_list', 'timestamp': 'timestamp', 'topic': 'topic', 'touchEvent': 'touch_event', 'touchEventList': 'touch_event_list', 'vector2': 'vector2', 'vector2List': 'vector2_list', 'vector3': 'vector3', 'vector3List': 'vector3_list', 'vector4': 'vector4', 'vector4List': 'vector4_list'}¶
Used in ProcessingProtocol.helpers.fix_io_fmt() to compute the field name of the ubii.proto.TopicDataRecord.type one-of group from the message type given by a ubii.proto.ModuleIO.message_format field.
- ubii.framework.processing.fix_io_fmt(message_format: str) str¶
Computes the field name of the type oneof corresponding to the type in a message_format field (of the form ubii.{proto_package}.{type} as defined in the .proto file, not the python package!)
Example
The inputs of a routine contain a ubii.proto.ModuleIO message with message_format ubii.dataStructure.Matrix4x4. The name of the corresponding field in a ubii.proto.TopicDataRecord is matrix4x4. This function performs this conversion.
- Parameters:
message_format – format string for message type
- Returns:
name of corresponding field in a ubii.proto.TopicDataRecord message
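The conversion described above can be sketched without the framework. This is a minimal illustration, not the library's implementation: the helper name format_to_field_name, the mapping excerpt, and the rule of lower-casing the first letter of the type name are assumptions chosen to reproduce the documented Matrix4x4 example.

```python
# Excerpt of the JSON-name -> field-name mapping shown at the top of this page.
JSON_NAME_TO_FIELD_NAME = {
    "double": "double",
    "keyEvent": "key_event",
    "matrix4x4": "matrix4x4",
}


def format_to_field_name(message_format: str) -> str:
    """Hypothetical stand-in for fix_io_fmt(), for illustration only."""
    # 'ubii.dataStructure.Matrix4x4' -> 'Matrix4x4'; 'double' stays 'double'
    type_name = message_format.rsplit(".", 1)[-1]
    # Assumption: the JSON name is the type name with a lower-case first letter.
    json_name = type_name[:1].lower() + type_name[1:]
    return JSON_NAME_TO_FIELD_NAME[json_name]


print(format_to_field_name("ubii.dataStructure.Matrix4x4"))  # matrix4x4
```

An unknown type raises KeyError in this sketch; how the real function reports unknown formats is not stated here.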
- ubii.framework.processing.perf_calc(attr: str, scheduler: Scheduler) float¶
- Parameters:
scheduler – scheduler instance
attr – name of attribute of the scheduler which contains the data for the performance evaluation.
- Returns:
performance of the scheduler, calculated as the relative error of the data with respect to delay, i.e. for an average of past execution times \(\overline{t} = avg(\text{scheduler.exec_delta_times})\) return \(1\) if \(\overline{t} < \text{scheduler.delay}\), otherwise calculate the relative deviation \(\Delta t = \frac{\overline{t} - \text{scheduler.delay}}{\text{scheduler.delay}}\) and return \(1 - \Delta t\) (negative values are allowed if \(\Delta t > 1\))
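The formula above can be written out in a few lines. This is a sketch of the arithmetic only; the function name relative_performance and its plain-sequence argument are hypothetical and stand in for the scheduler attribute lookup that perf_calc performs.

```python
from statistics import mean
from typing import Sequence


def relative_performance(delta_times: Sequence[float], delay: float) -> float:
    """Return 1 if the average time is below `delay`, else 1 - relative deviation."""
    average = mean(delta_times)
    if average < delay:
        return 1.0
    # Relative deviation of the average from the target delay.
    deviation = (average - delay) / delay
    return 1.0 - deviation


print(relative_performance([0.25, 0.5], delay=1.0))  # 1.0 (fast enough)
print(relative_performance([1.5], delay=1.0))        # 0.5 (50 % too slow)
print(relative_performance([3.0], delay=1.0))        # -1.0 (deviation > 1)
```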
- ubii.framework.processing.check_datatype(io: ModuleIO, record: TopicDataRecord)¶
Check if ModuleIO datatype matches record ‘type’ oneof name
- Parameters:
io – an IO definition
record – a record
- Returns:
True if the right field is set, false otherwise
See also
fix_io_fmt() – helper to translate message specification to oneof field name
- class ubii.framework.processing.Scheduler(callback: Callable[[], None], inputs: Iterable[Callable[[], Awaitable[T]]], mode: ProcessingMode, *, schedule_perf_metric: perf_calc_callable = functools.partial(perf_calc, 'schedule_delta_times'), exec_perf_metric: perf_calc_callable = functools.partial(perf_calc, 'exec_delta_times'))¶
Bases: CoroutineWrapper
A Scheduler is a wrapper around the coroutine created by its get_trigger_loop() method. A scheduler – depending on the used mode – waits for a certain delay and / or until any or all of its inputs are available and then schedules its callback.
- DELTA_TIME_CACHE_SIZE = 30¶
number of cached delta times
- __init__(callback: Callable[[], None], inputs: Iterable[Callable[[], Awaitable[T]]], mode: ProcessingMode, *, schedule_perf_metric: perf_calc_callable = functools.partial(perf_calc, 'schedule_delta_times'), exec_perf_metric: perf_calc_callable = functools.partial(perf_calc, 'exec_delta_times'))¶
The mode and inputs determine when the callback is executed.
- Parameters:
callback – Schedule execution of this callable when conditions are right
inputs – depending on the ProcessingMode, inputs are part of the condition – or simply retrieved – for execution of the callback. For each callable passed in inputs, the returned Awaitable will be either in done or pending when the callback is scheduled
mode – used ProcessingMode
schedule_perf_metric – callable to evaluate scheduling overhead / performance, should take one argument, the scheduler instance
exec_perf_metric – callable to evaluate execution callback overhead / performance
See also
ubii.proto.ProcessingMode.mode – details on processing modes
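The "wait for a delay and / or for any or all inputs, then schedule the callback" behaviour can be illustrated with plain asyncio. This is a rough sketch under stated assumptions, not the framework's get_trigger_loop(): the function trigger_once and its wait_for_all flag are hypothetical, it performs a single trigger instead of a loop, and it calls the callback directly instead of using an executor.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def trigger_once(
    callback: Callable[[], None],
    inputs: Iterable[Callable[[], Awaitable]],
    delay: float,
    wait_for_all: bool = False,
):
    """Sleep for `delay`, wait for any (or all) inputs, then run `callback`."""
    # Each input is a callable that creates an awaitable, as in Scheduler.inputs.
    tasks = [asyncio.ensure_future(make()) for make in inputs]
    await asyncio.sleep(delay)
    done, pending = set(), set()
    if tasks:
        when = asyncio.ALL_COMPLETED if wait_for_all else asyncio.FIRST_COMPLETED
        done, pending = await asyncio.wait(tasks, return_when=when)
    callback()
    # Inputs that did not finish in time are cancelled in this sketch.
    for task in pending:
        task.cancel()
    return done, pending


async def fast_input() -> str:
    return "record"


async def slow_input() -> str:
    await asyncio.sleep(1)
    return "late record"


async def main() -> None:
    done, pending = await trigger_once(
        lambda: print("callback scheduled"), [fast_input, slow_input], delay=0.0
    )
    print(len(done), "done,", len(pending), "pending")


asyncio.run(main())
```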
- schedule_perf_metric: Callable[[], float]¶
Call this method to calculate the performance of the scheduling timings
- exec_perf_metric: Callable[[], float]¶
Call this method to calculate the performance of the execution
- inputs: Iterable[Callable[[], Awaitable[T]]]¶
callables to create awaitables for possibly needed inputs
- schedule_delta_times: Deque¶
keep track of times between callback schedules for performance evaluation
- done: List[Awaitable]¶
contains the awaitables created from the inputs argument that had finished when the callback needed to be scheduled
- mode: ProcessingMode¶
used mode, determines which conditions need to be matched to schedule the callback
- executor: ThreadPoolExecutor¶
callbacks are possibly non-async callables and will be scheduled in this executor using asyncio.loop.run_in_executor()
- task_clean_frequency¶
After every n callback schedules, cancel old input awaitables
- property timing_thresholds: Tuple[float, ...]¶
These are the thresholds and minimum times for the asyncio sleep call. They depend on your system and event loop implementation. If they are set, the scheduler will adjust its scheduling behaviour in frequency mode accordingly. If they are not set, no adjustments are made.
Example
On Linux with the default event loop you can set
scheduler.timing_epsilons = (0.001, 0.00015)
to adjust for the behaviour of the epoll_wait system call
- property timing_adjustment¶
This read-only property caches the appropriate adjustment for the timing code according to the set timing_thresholds and the processing mode (in all modes but frequency mode, the adjustment is 0)
- property delay: float¶
Delay in seconds to schedule callback, depending on mode
frequency – simply use hertz
trigger_on_input – use min_delay_ms
- Raises:
NotImplementedError – when the mode is lockstep
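The mode-dependent delay can be sketched as follows. This is an illustration only: delay_for_mode is a hypothetical helper, the plain dict stands in for a ProcessingMode message, and the conversion of hertz to a period of 1 / hertz seconds is an assumption, since the text above only says that hertz is used.

```python
def delay_for_mode(mode: dict) -> float:
    """Hypothetical helper: `mode` is a plain dict standing in for ProcessingMode."""
    if "frequency" in mode:
        # Assumption: a rate in hertz corresponds to a period of 1 / hertz seconds.
        return 1.0 / mode["frequency"]["hertz"]
    if "trigger_on_input" in mode:
        # min_delay_ms is given in milliseconds, the delay is in seconds.
        return mode["trigger_on_input"]["min_delay_ms"] / 1000.0
    # Mirrors the documented behaviour for lockstep mode.
    raise NotImplementedError("no delay is defined for lockstep mode")


print(delay_for_mode({"frequency": {"hertz": 4}}))                  # 0.25
print(delay_for_mode({"trigger_on_input": {"min_delay_ms": 500}}))  # 0.5
```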
- class ubii.framework.processing.ProcessingRoutine(mapping=None, eval_strings=False, **kwargs)¶
Bases: ProcessingModule
This adds a ProcessingProtocol providing processing behaviour to a ProcessingModule representation. Refer to the documentation of the ProcessingProtocol for information related to processing behaviour.
- id¶
Field of type STRING – inherited from ProcessingModule
- name¶
Field of type STRING – inherited from ProcessingModule
- authors¶
RepeatedField of type STRING – inherited from ProcessingModule
- tags¶
RepeatedField of type STRING – inherited from ProcessingModule
- description¶
Field of type STRING – inherited from ProcessingModule
- node_id¶
Field of type STRING – inherited from ProcessingModule
- session_id¶
Field of type STRING – inherited from ProcessingModule
- status¶
Field of type Status – inherited from ProcessingModule
- processing_mode¶
Field of type ProcessingMode – inherited from ProcessingModule
- inputs¶
RepeatedField of type ModuleIO – inherited from ProcessingModule
- outputs¶
RepeatedField of type ModuleIO – inherited from ProcessingModule
- language¶
Field of type Language – inherited from ProcessingModule
- on_processing_stringified¶
Field of type STRING – inherited from ProcessingModule
- on_created_stringified¶
Field of type STRING – inherited from ProcessingModule
- on_halted_stringified¶
Field of type STRING – inherited from ProcessingModule
- on_destroyed_stringified¶
Field of type STRING – inherited from ProcessingModule
- class rules¶
Bases: object
Rules to validate the protobuf message
- static validate_language(pm: ProcessingRoutine)¶
Only python modules can be run by the python client node. Sets language to PY if language is not set.
- Parameters:
pm – needs validation
- Raises:
ValueError – if language is already set, but not to PY
- static validate_id(pm: ProcessingRoutine)¶
Check if id is present
- Parameters:
pm – needs validation
- Raises:
ValueError – if id is not set
- __init__(mapping=None, eval_strings=False, **kwargs)¶
- Parameters:
mapping (Union[dict, Message]) – A dictionary or message to be used to determine the values for this message.
eval_strings (bool) – If this flag is set or the framework is used in debug() mode, fields of the protobuf message that have names ending with _stringified (e.g. on_processing_stringified) will be evaluated. Creating a ProcessingRoutine whose language is not PY is not possible, see validate()
kwargs (dict) – Keys and values corresponding to the fields of the message and other arguments passed to ubii.proto.ProcessingModule
- property protocol¶
Reference to the ProcessingProtocol managing this routine
- property local_output_topics: TopicStore¶
Container to interact with the output topics of the module (e.g. register callbacks on the topics).
- property change_specs: Condition¶
Coordinate access to the protobuf specs of this module. Any change to part of the wrapped protobuf message should be made while holding this condition.
When the status or input / output mappings change as part of running this protocol, coroutines waiting for this condition are notified automatically.
Example:
    async def change_output_mapping(pm: ProcessingRoutine, io: ubii.proto.ModuleIO):
        # `async with` is only valid inside a coroutine function
        async with pm.change_specs:
            pm.outputs = [io]
            pm.change_specs.notify_all()
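The notification pattern behind change_specs can be tried out with a plain asyncio.Condition. The sketch below is self-contained and does not use the framework: the specs dict and the coroutine names are hypothetical stand-ins for the wrapped protobuf message and for code that reacts to changes.

```python
import asyncio


async def demo() -> list:
    change_specs = asyncio.Condition()  # stand-in for pm.change_specs
    specs = {"outputs": []}             # stand-in for the protobuf message
    seen = []

    async def waiter() -> None:
        async with change_specs:
            # Releases the condition while waiting, reacquires it when notified.
            await change_specs.wait()
            seen.append(list(specs["outputs"]))

    async def mutator() -> None:
        async with change_specs:
            specs["outputs"] = ["new_output"]
            change_specs.notify_all()

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # give the waiter a chance to start waiting
    await mutator()
    await task
    return seen


print(asyncio.run(demo()))
```

The waiter only observes the change after the mutator leaves its `async with` block, because a notified waiter must reacquire the condition before it continues.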
- input_getter(io: ubii.proto.ModuleIO) AsyncGetter[ubii.proto.TopicDataRecord | ubii.proto.TopicDataRecordList] | None¶
- output_setter(io: ubii.proto.ModuleIO) AsyncSetter | None¶
- async apply_io_mapping(io_mapping: IOMapping, remote_topic_map: Mapping[str, Topic])¶
Extract relevant information from a ubii.proto.IOMapping message to initialize the get_output_topic and get_input_topic callables.
Notifies awaitables waiting on change_specs unless the mapping was empty.
- Parameters:
io_mapping – Mapping that should be applied to this module
remote_topic_map – While output topics will be managed by local_output_topics, you need to provide a mapping for input topics to look up the topic source of the ubii.proto.IOMapping.input_mappings
- on_created(context: SimpleNamespace) None¶
Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_created() is called
- Parameters:
context – Same context that is used by ProcessingProtocol.on_created()
This callable had the hook decorator applied. Original signature: def on_created(self, context: 'types.SimpleNamespace') -> 'None'
- on_init(context: SimpleNamespace) None¶
Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_init() is called
- Parameters:
context – Same context that is used by ProcessingProtocol.on_init()
This callable had the hook decorator applied. Original signature: def on_init(self, context: 'types.SimpleNamespace') -> 'None'
- on_processing(context: SimpleNamespace) None¶
Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_processing() is called
- Parameters:
context – Same context that is used by ProcessingProtocol.on_processing()
This callable had the hook decorator applied. Original signature: def on_processing(self, context: 'types.SimpleNamespace') -> 'None'
- on_halted(context: SimpleNamespace) None¶
Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_halted() is called
- Parameters:
context – Same context that is used by ProcessingProtocol.on_halted()
This callable had the hook decorator applied. Original signature: def on_halted(self, context: 'types.SimpleNamespace') -> 'None'
- on_destroyed(context: SimpleNamespace) None¶
Will be executed by the ProcessingProtocol whenever ProcessingProtocol.on_destroyed() is called
- Parameters:
context – Same context that is used by ProcessingProtocol.on_destroyed()
This callable had the hook decorator applied. Original signature: def on_destroyed(self, context: 'types.SimpleNamespace') -> 'None'
- validate()¶
Run all rules in validation_rules
- classmethod mutate_pm()¶
Change specifications of a processing routine, and notify waiters for the module's change_specs condition.
- Parameters:
module – a processing routine
specs – specifications as a ubii.proto.ProcessingModule message
This callable had the hook decorator applied. Original signature: async def mutate_pm(cls, module: 'ProcessingRoutine', specs: 'ubii.proto.ProcessingModule') -> 'None'
- classmethod start()¶
Start the internal ProcessingProtocol of the passed routine.
- Parameters:
pm – instance that needs to be started
- Returns:
routine passed as pm
This callable had the hook decorator applied. Original signature: async def start(cls, pm: 'ProcessingRoutine') -> 'ProcessingRoutine'
- classmethod stop()¶
Stop the internal ProcessingProtocol of the passed routine and remove the routine from the registry.
- Parameters:
pm – instance that needs to be stopped
- Returns:
routine passed as argument
This callable had the hook decorator applied. Original signature: async def stop(cls, pm: 'ProcessingRoutine')
- classmethod halt()¶
Halt the internal ProcessingProtocol of the passed routine. If the protocol is already in its end state, do nothing but raise a warning.
- Parameters:
pm – instance that needs to be halted
- Returns:
routine passed as argument
This callable had the hook decorator applied. Original signature: async def halt(cls, pm: 'ProcessingRoutine')
- validation_rules: List[Callable[[ProcessingRoutine], None]] = [<function ProcessingRoutine.rules.validate_language>, <function ProcessingRoutine.rules.validate_id>]¶
Callables to validate the protobuf message used in validate()
See also
ProcessingRoutine.rules – details on rules used here
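The rule-list pattern used by validate() can be sketched in isolation. Everything here is a stand-in: the RoutineSpec dataclass replaces the protobuf message, languages are plain strings instead of the Language enum, and the function bodies follow the documented behaviour of validate_language and validate_id without being taken from the library.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class RoutineSpec:
    """Hypothetical stand-in for the wrapped protobuf message."""
    id: str = ""
    language: Optional[str] = None


def validate_language(spec: RoutineSpec) -> None:
    if spec.language is None:
        spec.language = "PY"  # default to python when nothing is set
    elif spec.language != "PY":
        raise ValueError(f"unsupported language: {spec.language}")


def validate_id(spec: RoutineSpec) -> None:
    if not spec.id:
        raise ValueError("id is not set")


# Rules are plain callables, so further rules can be appended to the list.
VALIDATION_RULES: List[Callable[[RoutineSpec], None]] = [validate_language, validate_id]


def validate(spec: RoutineSpec) -> None:
    """Run every rule in order; the first failing rule raises ValueError."""
    for rule in VALIDATION_RULES:
        rule(spec)


spec = RoutineSpec(id="example-routine")
validate(spec)
print(spec.language)  # PY
```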
- class ubii.framework.processing.PM_STAT(value)¶
Bases: IntFlag
Proxy for Status but as enum.IntFlag, to allow defining combinations of states.
- INITIALIZED = 1¶
- CREATED = 2¶
- PROCESSING = 4¶
- HALTED = 8¶
- DESTROYED = 16¶
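Why an IntFlag helps becomes clear when states are combined. The sketch below redeclares the enum locally under the hypothetical name PMStat, with the values listed above, so that it runs without the framework.

```python
from enum import IntFlag


class PMStat(IntFlag):
    """Local copy of the values listed above, for illustration only."""
    INITIALIZED = 1
    CREATED = 2
    PROCESSING = 4
    HALTED = 8
    DESTROYED = 16


# Combine states with the bitwise OR operator.
running = PMStat.CREATED | PMStat.PROCESSING
any_state = (
    PMStat.INITIALIZED | PMStat.CREATED | PMStat.PROCESSING
    | PMStat.HALTED | PMStat.DESTROYED
)

print(PMStat.PROCESSING in running)  # True
print(PMStat.HALTED in running)      # False
print(int(any_state))                # 31
```

The combined value 31 matches the AnyState attribute documented for ProcessingProtocol.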
- class ubii.framework.processing.ProcessingProtocol(pm: ProcessingRoutine)¶
Bases: AbstractProtocol[PM_STAT]
This AbstractProtocol implementation defines the Protocol used to run ProcessingRoutines.
It defines valid state_changes and callbacks, as well as the starting_state and end_state.
The pm_proxy methods are used to decorate the lifecycle callbacks on_created(), on_init(), on_processing(), on_halted(), on_destroyed(), to execute the proxied methods in the ProcessingRoutine which owns the ProcessingProtocol and set the ProcessingRoutine.status
- task_nursery¶
inherited from ubii.framework.protocol.AbstractProtocol
- state¶
inherited from ubii.framework.protocol.AbstractProtocol
- end_state: T_EnumFlag = 16¶
If the protocol ends up in its end state, the coroutine running the protocol will finish
- AnyState = 31¶
This is a combination of all possible states to allow easier transitions in state_changes
- class pm_proxy¶
Bases: object
Define some decorators for callables like on_processing(), because all lifecycle callbacks need to set the status of the corresponding processing routine, and call the corresponding callbacks (e.g. if a ProcessingProtocol.on_processing() callback is called, it needs to call the ProcessingRoutine.on_processing() callback of the managed processing routine)
- classmethod set_status_in_pm(status: Status) Callable¶
Callable decorated with returned decorator sets the status of the ProcessingRoutine instance to the passed status when called, and notifies awaitables waiting for change_specs
- Parameters:
status – which status will be set
- Returns:
Decorator
- classmethod callback_in_pm(name: str) Callable¶
Callable decorated with returned decorator calls the method with specified name of the ProcessingRoutine instance. Arguments of the decorated callable are passed on.
- Parameters:
name – e.g. 'on_destroyed'
- Returns:
Decorator
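The two decorator factories can be approximated as follows. This is a simplified, synchronous sketch, not the library code: it omits the change_specs notification, the order in which status and callback are handled is an assumption, and ToyProtocol and the string status are hypothetical.

```python
import functools
from types import SimpleNamespace


def set_status_in_pm(status):
    """Decorator factory: set `self.pm.status` before running the method."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            self.pm.status = status
            return func(self, *args, **kwargs)
        return wrapper
    return decorator


def callback_in_pm(name):
    """Decorator factory: call `self.pm.<name>` with the same arguments first."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            getattr(self.pm, name)(*args, **kwargs)
            return func(self, *args, **kwargs)
        return wrapper
    return decorator


class ToyProtocol:
    def __init__(self, pm):
        self.pm = pm

    @set_status_in_pm("DESTROYED")
    @callback_in_pm("on_destroyed")
    def on_destroyed(self, context):
        return context


# A namespace with a status and one callback stands in for the routine.
routine = SimpleNamespace(status=None, on_destroyed=lambda ctx: print("routine cleanup"))
ToyProtocol(routine).on_destroyed(SimpleNamespace())
print(routine.status)  # DESTROYED
```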
- class helpers¶
Bases: object
Used for pre- / post-processing for the context data passed between lifecycle callbacks
- classmethod write_scheduler_data_to_context(scheduler: Scheduler, ctx: SimpleNamespace) None¶
Before processing, the processing context needs to be enriched with the topic data that the Scheduler retrieved as inputs
- Parameters:
scheduler – The Scheduler scheduling the callback
ctx – The context that will be passed to the callback
- classmethod publish_outputs_to_topics(pm: ProcessingRoutine, ctx: SimpleNamespace)¶
Looks up topics via get_output_topic(), starts an async task to set the buffer value of the topic to the computed value from the context outputs, which triggers publishing if the topic has been set up correctly, see e.g. ubii.node.node_protocol.implement_processing().
- Parameters:
pm – look up output topics in this routine's local_output_topics()
ctx – extract computed outputs from this context
- __init__(pm: ProcessingRoutine)¶
The created ProcessingProtocol implements the behaviour of a ProcessingModule or rather a ProcessingRoutine during its lifetime. Each ProcessingProtocol belongs to one ProcessingRoutine
- Parameters:
pm – the ubii.proto.ProcessingModule wrapper that owns this protocol
- pm: ProcessingRoutine¶
reference to the ProcessingModule wrapper that owns this protocol
- created_tasks: List[asyncio.Task]¶
All tasks created by this protocol, to be stopped / cancelled when necessary
- async on_init(context: SimpleNamespace)¶
First lifecycle method called after pm has been initialized
Calls pm.on_init() and sets the status to ub.ProcessingModule.Status.INITIALIZED.
Creates the following attributes in the context:
context.loop – the asyncio loop running the processing
context.nursery – the entity responsible for starting async tasks (either the context.loop or a reference to task_nursery)
context.trigger_processing – asyncio Event used later, see on_created()
context.scheduler – a Scheduler instance, responsible for scheduling processing calls. Scheduled callback when inputs are available will set context.trigger_processing
context.muxer – a reference to the TopicMuxer type, needed to extract information from the records
Finally, sets the status to PM_STAT.CREATED
- Parameters:
context – namespace object to hold data
- async on_created(context)¶
Calls pm.on_created() and sets the status to ub.ProcessingModule.Status.CREATED.
Prepares the context for further processing:
- creates inputs (context.inputs)
- prepares output mapping (context.outputs)
Waits for context.processing_trigger then extracts inputs from context.scheduler (using helpers.write_scheduler_data_to_context()) and changes the status to PM_STAT.PROCESSING
- async on_processing(context)¶
Calls pm.on_processing() and sets the status to ubii.proto.ProcessingModule.Status.PROCESSING.
- publishes computed outputs (see helpers.publish_outputs_to_topics())
- adjusts the context.scheduler callback (previously used to simply trigger processing) to
  - extract data from context using helpers.write_scheduler_data_to_context()
  - compute with on_processing() of owner
  - publish outputs to topics using helpers.publish_outputs_to_topics()
  such that as long as the status does not change, data will be processed.
Does not change the status of the protocol, state change has to be triggered externally in this state.
- async on_halted(context)¶
Calls pm.on_halted() and sets the status to ub.ProcessingModule.Status.HALTED.
Halts the context.scheduler
Does not change the status of the protocol, state change has to be triggered externally in this state.
- async on_destroyed(context)¶
Calls pm.on_destroyed() and sets the status to ub.ProcessingModule.Status.DESTROYED.
Awaits cancellation of all tasks that have been started by this protocol.
- state_changes: Mapping[Tuple[T_EnumFlag | None, ...], Callback] = {(None, <PM_STAT.INITIALIZED: 1>): <function ProcessingProtocol.on_init>, (<PM_STAT.INITIALIZED: 1>, <PM_STAT.CREATED: 2>): <function ProcessingProtocol.on_created>, (<PM_STAT.CREATED: 2>, <PM_STAT.PROCESSING: 4>): <function ProcessingProtocol.on_processing>, (<PM_STAT.DESTROYED|HALTED|PROCESSING|CREATED|INITIALIZED: 31>, <PM_STAT.HALTED: 8>): <function ProcessingProtocol.on_halted>, (<PM_STAT.DESTROYED|HALTED|PROCESSING|CREATED|INITIALIZED: 31>, <PM_STAT.DESTROYED: 16>): <function ProcessingProtocol.on_destroyed>}¶
Possible state changes and respective callbacks
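A transition table of this shape can be queried with flag membership. The sketch below is an assumption about how such a table might be resolved, not a description of AbstractProtocol: the local PMStat enum, the find_callback helper, and the use of callback names instead of functions are all hypothetical.

```python
from enum import IntFlag
from typing import Dict, Optional, Tuple


class PMStat(IntFlag):
    INITIALIZED = 1
    CREATED = 2
    PROCESSING = 4
    HALTED = 8
    DESTROYED = 16


ANY = (
    PMStat.INITIALIZED | PMStat.CREATED | PMStat.PROCESSING
    | PMStat.HALTED | PMStat.DESTROYED
)

# Same (source, target) keys as state_changes, with callback names as values.
TABLE: Dict[Tuple[Optional[PMStat], PMStat], str] = {
    (None, PMStat.INITIALIZED): "on_init",
    (PMStat.INITIALIZED, PMStat.CREATED): "on_created",
    (PMStat.CREATED, PMStat.PROCESSING): "on_processing",
    (ANY, PMStat.HALTED): "on_halted",
    (ANY, PMStat.DESTROYED): "on_destroyed",
}


def find_callback(old: Optional[PMStat], new: PMStat) -> Optional[str]:
    """Return the callback name for a transition, or None if it is not allowed."""
    for (source, target), callback in TABLE.items():
        if target != new:
            continue
        if source is None:
            if old is None:
                return callback
        elif old is not None and old in source:
            # A combined source such as ANY matches every single state it contains.
            return callback
    return None


print(find_callback(None, PMStat.INITIALIZED))          # on_init
print(find_callback(PMStat.PROCESSING, PMStat.HALTED))  # on_halted
print(find_callback(PMStat.INITIALIZED, PMStat.PROCESSING))  # None
```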