ubii.node.connections module

Concrete implementation of ServiceConnection and DataConnection using aiohttp.

ubii.node.connections.local_ip

global attribute containing the local ip of the machine

class ubii.node.connections.AIOHttpConnection(url: str, host_ip: str = local_ip)

Bases: object

Base class for connections using aiohttp

__init__(url: str, host_ip: str = local_ip)

Create connection to specific url

Parameters:
  • url – url that you want to connect to, make sure that it uses a valid url scheme

  • host_ip – your local ip, needed to create the right CORS headers

url: str

field to access url

https: bool

True if url scheme of url is ‘https’ else False
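As a rough illustration of how such a flag can be derived from a url, the sketch below uses the standard library's urllib.parse. The helper name uses_https and the example urls are hypothetical and not part of ubii.node.connections.

```python
from urllib.parse import urlparse


def uses_https(url: str) -> bool:
    """Hypothetical helper: True only if the url scheme is exactly 'https'."""
    return urlparse(url).scheme == "https"


print(uses_https("https://10.0.0.1:8102/services"))  # True
print(uses_https("http://10.0.0.1:8102/services"))   # False
print(uses_https("ws://10.0.0.1:8104"))              # False
```

Under this reading, websocket schemes such as ws or wss do not count as https, which matches the wording of the field description.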

host_ip: str

Used for headers

property headers: Dict[str, str]

Dictionary with the key origin, whose value is the url of the host’s tcp port. It is included in requests for CORS authentication with the master node.

Example

>>> from ubii.node import connect_client
>>> import asyncio
>>> async def main():
...     async with connect_client() as client:
...         topic_connection = client.protocol._context.topic_connection
...         assert topic_connection is not None
...         print(topic_connection.host_ip)
...         print(topic_connection.headers)
...
>>> asyncio.run(main())
10.0.0.1
{'origin': 'http://10.0.0.1:8080'}

property session: ClientSession

Session used for requests, can have special debug handling or JSON formatter

Setting this property sets a private event that can be used to “queue” requests until a session is defined.

Deleting this property clears the event, i.e. future requests can wait for a new session to be set.
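The set / delete behaviour described above can be sketched with a plain asyncio.Event. This is a minimal illustration of the pattern only: the class SessionHolder, its request method and the string used in place of a ClientSession are hypothetical and do not reflect the actual implementation.

```python
import asyncio


class SessionHolder:
    """Hypothetical stand-in showing the set / delete pattern, not the ubii class."""

    def __init__(self) -> None:
        self._session = None
        self._session_available = asyncio.Event()

    @property
    def session(self):
        return self._session

    @session.setter
    def session(self, value) -> None:
        self._session = value
        self._session_available.set()    # wake up queued requests

    @session.deleter
    def session(self) -> None:
        self._session = None
        self._session_available.clear()  # later requests wait again

    async def request(self, payload: str) -> str:
        # Blocks until some session has been assigned.
        await self._session_available.wait()
        return f"{self._session} sent {payload}"


async def main() -> str:
    holder = SessionHolder()
    pending = asyncio.create_task(holder.request("ping"))
    await asyncio.sleep(0)           # let the request start waiting
    holder.session = "session-1"     # placeholder for a real ClientSession
    return await pending


print(asyncio.run(main()))  # session-1 sent ping
```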

class ubii.node.connections.AIOHttpWebsocketConnection(url: str, host_ip: str = local_ip, max_message_size: int | None = 0)

Bases: AIOHttpConnection, DataConnection

A simple WebSocket connection that implements the DataConnection abstract base class.

class Events(connected: ~asyncio.locks.Event = <factory>, disconnected: ~asyncio.locks.Event = <factory>)

Bases: object

public events that are set and unset during the lifetime of the connection

connected: Event
disconnected: Event
log_socket_in = <Logger ubii.node.connections.in.socket (WARNING)>

Logger for incoming traffic

log_socket_out = <Logger ubii.node.connections.out.socket (WARNING)>

Logger for outgoing traffic

__init__(url: str, host_ip: str = local_ip, max_message_size: int | None = 0)

Create websocket connection

Parameters:
  • url – Url to connect to, according to broker config, see AIOHttpConnection for more info

  • host_ip – ip of host, see AIOHttpConnection for more info

  • max_message_size – maximum allowed size for socket reads. Set to None to use the aiohttp default (4 MB); our default is 0 (no limit)

url: str

field to access url

https: bool

True if url scheme of url is ‘https’ else False

host_ip: str

Used for headers

events: AIOHttpWebsocketConnection.Events

Public events to wait for connection / disconnection in client code

connect(client_id: str)

Use this async context manager to establish a connection for a specific client, and disconnect it afterwards.

Warning

An AIOHttpWebsocketConnection can only be connected with one client id. If you try to connect a connection that is already connected, a warning is issued.

Example

Setup:
  • Master node running on localhost

  • local ip address 10.0.0.1

Note that the url of the connection that is created by default (with connect_client) contains the local ip of the machine, not localhost. Once the master node communicates its ip address, the default protocol adjusts the connections’ url to match the ip of the master node. On the one hand this makes sure that the master node sends a valid Server message; on the other hand it makes output and logs more readable, because all connections share the same ip in their url (the service_connection is also adjusted once the master node sends the Server message).

>>> from ubii.node import connect_client
>>> import asyncio
>>> async def main():
...     async with connect_client() as client:
...         topic_connection = client.protocol._context.topic_connection
...         assert topic_connection is not None
...         async with topic_connection.connect(client.id) as connected:
...             print(connected.url)
...
>>> asyncio.run(main())
[...] UserWarning: <ubii.framework.connections.AIOHttpWebsocketConnection with url=ws://10.0.0.1:8104> is already connected.
ws://10.0.0.1:8104

It’s also possible, though not advisable, to handle messages from the topic connection manually:

from ubii.node import connect_client
import asyncio

async def foo():
    async with connect_client() as client:
        topic_connection = client.protocol.context.topic_connection
        assert topic_connection is not None
        async for topic_data in topic_connection:
            # do something with each received TopicData message
            ...

asyncio.run(foo())

Parameters:

client_id – e.g. content of a ubii.proto.Client.id field.

Returns:

an async context manager

property ws: aiohttp.ClientWebSocketResponse | None

If the connection is open, use this ClientWebSocketResponse object to read and send messages.

Setting this attribute clears the disconnected event, and sets the connected event in events

Deleting this attribute clears the connected event, and sets the disconnected event in events (unless already disconnected)
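The way the two events mirror the state of ws can be sketched as follows. The Events dataclass follows the signature documented above, while SocketHolder and the assumption that a fresh connection starts out with disconnected set are hypothetical.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Events:
    connected: asyncio.Event = field(default_factory=asyncio.Event)
    disconnected: asyncio.Event = field(default_factory=asyncio.Event)


class SocketHolder:
    """Hypothetical stand-in, not the ubii implementation."""

    def __init__(self) -> None:
        self.events = Events()
        self.events.disconnected.set()   # assumption: starts out disconnected
        self._ws = None

    @property
    def ws(self):
        return self._ws

    @ws.setter
    def ws(self, value) -> None:
        self._ws = value
        self.events.disconnected.clear()
        self.events.connected.set()

    @ws.deleter
    def ws(self) -> None:
        self._ws = None
        self.events.connected.clear()
        self.events.disconnected.set()


async def main():
    holder = SocketHolder()
    holder.ws = object()  # placeholder for a ClientWebSocketResponse
    while_open = (holder.events.connected.is_set(), holder.events.disconnected.is_set())
    del holder.ws
    after_close = (holder.events.connected.is_set(), holder.events.disconnected.is_set())
    return while_open, after_close


print(asyncio.run(main()))  # ((True, False), (False, True))
```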

property client_id

If set, the connection belongs to a specific client. If not set, the connection is not usable.

Setting this attribute is only allowed if it is not already set. Otherwise, a warning will be raised. Delete the attribute to clear it and allow setting it to a different id.
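A set-once property of this kind could look roughly like the sketch below. The class ClientBound is hypothetical, and keeping the old id when the warning is issued is an assumption, since the description above only states that a warning is raised.

```python
import warnings


class ClientBound:
    """Hypothetical stand-in, not the ubii implementation."""

    def __init__(self) -> None:
        self._client_id = None

    @property
    def client_id(self):
        return self._client_id

    @client_id.setter
    def client_id(self, value: str) -> None:
        if self._client_id is not None:
            # Assumption: the existing id is kept when the warning is issued.
            warnings.warn(f"{self!r} already has a client id", UserWarning)
            return
        self._client_id = value

    @client_id.deleter
    def client_id(self) -> None:
        self._client_id = None


conn = ClientBound()
conn.client_id = "client-a"
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    conn.client_id = "client-b"     # ignored, a UserWarning is recorded
print(conn.client_id, len(caught))  # client-a 1

del conn.client_id
conn.client_id = "client-b"         # allowed after deleting
print(conn.client_id)               # client-b
```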

async send(data: TopicData, timeout=None)

Use this coroutine to send TopicData messages to the master node. It waits until events.connected is set, i.e. until the connection has a valid url with client id.

Parameters:
  • data – message to send

  • timeout – optional. Applied both to waiting for the events.connected event and to the actual sending of the message, so for the first message up to twice the timeout value (in seconds) can pass before an error is raised. If not set, the coroutine waits indefinitely

Raises:

asyncio.TimeoutError – if timeout is set and connecting or sending the message takes longer
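The two-phase timeout can be sketched with asyncio.wait_for. This is only an illustration under stated assumptions: send_with_timeout and fake_send are hypothetical helpers, and the real coroutine may be structured differently.

```python
import asyncio


async def send_with_timeout(connected: asyncio.Event, send, data, timeout=None):
    """Hypothetical helper: apply the same timeout to both waiting phases."""
    # Phase 1: wait for the connection; raises asyncio.TimeoutError if too slow.
    await asyncio.wait_for(connected.wait(), timeout=timeout)
    # Phase 2: send the message, again bounded by the same timeout.
    await asyncio.wait_for(send(data), timeout=timeout)


async def main() -> str:
    connected = asyncio.Event()   # never set, so phase 1 must time out

    async def fake_send(data):    # placeholder for the real socket write
        await asyncio.sleep(0)

    try:
        await send_with_timeout(connected, fake_send, b"payload", timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "sent"


print(asyncio.run(main()))  # timed out
```

Because each phase gets its own wait_for call, the worst case before an error is roughly twice the timeout. With timeout=None both calls wait without limit.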

class ubii.node.connections.AIOHttpRestConnection(url: str, host_ip: str = local_ip)

Bases: AIOHttpConnection, ServiceConnection

Send Service Request Messages

url: str

field to access url

https: bool

True if url scheme of url is ‘https’ else False

host_ip: str

Used for headers

async send(request: ServiceRequest, timeout=None) → ServiceReply

Send a ServiceRequest and wait for ServiceReply from master node.

Parameters:
  • request – a proto-plus message wrapper object for a request

  • timeout – if not set, wait indefinitely. Otherwise wait up to timeout seconds for the connection to become usable (it needs a session), then send the message and wait up to timeout seconds for the reply before raising an error

Returns:

a wrapper object for the reply

Raises:

asyncio.TimeoutError – if timeout is set and connection is not usable in time or reply is not received in time

See also

ubii.proto.ServiceRequest.type – the oneof group for possible request fields

ubii.proto.ServiceReply.type – the oneof group for possible reply fields

ubii.node.connections.aiohttp_session()

Creates an aiohttp session with our custom JSON encoder and, in debug mode, additional logging.

If ubii.framework.util.debug() is used to turn on debug mode of the framework, this function will return a different session, with logging for requests (see aiohttp.TraceConfig) and a timeout of 1 second compared to 300 seconds for the normal session.

Both sessions use the ubii.proto.util.ProtoEncoder to serialize json, and raise Exceptions when requests fail (see aiohttp.ClientSession.raise_for_status)

>>> from ubii.node import connections
>>> from ubii.framework import debug
>>> session = connections.aiohttp_session()
>>> debug(True)
True
>>> debug_session = connections.aiohttp_session()
>>> session.timeout
ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=None)
>>> debug_session.timeout
ClientTimeout(total=1, connect=None, sock_read=None, sock_connect=None)
>>> session.trace_configs
[]
>>> debug_session.trace_configs
[<aiohttp.tracing.TraceConfig object at 0x7f657a080c70>]
>>> session.raise_for_status
True
>>> debug_session.raise_for_status
True
>>> session.json_serialize
functools.partial(<function dumps at 0x7f657b3f4940>, cls=<class 'ubii.proto.util.ProtoEncoder'>)
>>> debug_session.json_serialize
functools.partial(<function dumps at 0x7f657b3f4940>, cls=<class 'ubii.proto.util.ProtoEncoder'>)