ubii.node.connections module¶
Concrete implementation of ServiceConnection and
DataConnection using aiohttp.
- ubii.node.connections.local_ip¶
global attribute containing the local ip of the machine
- class ubii.node.connections.AIOHttpConnection(url: str, host_ip: str = local_ip)¶
Bases:
objectBase class for connections using
aiohttp- __init__(url: str, host_ip: str = local_ip)¶
Create connection to specific url
- Parameters:
url – url that you want to connect to, make sure that it uses a valid url scheme
host_ip – your local ip, needed to create the right CORS headers
- property headers: Dict[str, str]¶
dictionary containing key
originwith value equal to the host’s tcp port url, to be included in the requests for CORS authentication with the master nodeExample
>>> from ubii.node import connect_client >>> import asyncio >>> async def main(): ... async with connect_client() as client: ... topic_connection = client.protocol._context.topic_connection ... assert topic_connection is not None ... print(topic_connection.host_ip) ... print(topic_connection.headers) ... >>> asyncio.run(main()) '10.0.0.1' {'origin': 'http://10.0.0.1:8080'}
- property session: ClientSession¶
Session used for requests, can have special debug handling or JSON formatter
When setting this property a private event is set, that can be used to “queue” requests until a session is defined.
When this property is deleted, the event is unset i.e. future requests can wait for a new session to be set.
- class ubii.node.connections.AIOHttpWebsocketConnection(url: str, host_ip: str = local_ip, max_message_size: int | None = 0)¶
Bases:
AIOHttpConnection,DataConnectionA simple WebSocket connection that implements the
DataConnectionabstract base class.- class Events(connected: ~asyncio.locks.Event = <factory>, disconnected: ~asyncio.locks.Event = <factory>)¶
Bases:
objectpublic events that are set and unset during the lifetime of the connection
- connected: Event¶
- disconnected: Event¶
- log_socket_in = <Logger ubii.node.connections.in.socket (WARNING)>¶
Logger for incoming traffic
- log_socket_out = <Logger ubii.node.connections.out.socket (WARNING)>¶
Logger for outgoing traffic
- __init__(url: str, host_ip: str = local_ip, max_message_size: int | None = 0)¶
Create websocket connection
- Parameters:
url – Url to connect to, according to broker config, see
AIOHttpConnectionfor more infohost_ip – ip of host, see
AIOHttpConnectionfor more infomax_message_size – maximum allowed size for socket reads, set to None to use aiohttp default (4MB), out default is 0 (no limit)
- events: AIOHttpWebsocketConnection.Events¶
Public events to wait for connection / disconnection in client code
- connect(client_id: str)¶
Use this async context manager to establish a connection for a specific client, and disconnect it afterwards.
Warning
A
AIOHttpWebsocketConnectioncan only be connected with one client id. If you try to connect a connected connection, a warning will be raised.Example
- Setup:
Master node running on localhost
local ip address
10.0.0.1
Note that the url of the connection that is created by default (with
connect_client) contains the local ip of the machine notlocalhostbecause the default protocol adjusts the connections’ url – once the master node communicates its ip address – to match the ip of the master node. On the one hand this makes sure that the master node sends a validServermessage, on the other hand this makes output / logs more readable because all connections share the same ip in their url (theservice_connectionis also adjusted once the master node sends theServermessage)>>> from ubii.node import connect_client >>> import asyncio >>> async def main(): ... async with connect_client() as client: ... topic_connection = client.protocol._context.topic_connection ... assert topic_connection is not None ... async with topic_connection.connect(client.id) as connected: ... print(connected.url) ... >>> asyncio.run(main()) [...] UserWarning: <ubii.framework.connections.AIOHttpWebsocketConnection with url=ws://10.0.0.1:8104> is already connected. ws://10.0.0.1:8104
It’s also possible to handle messages from the topic connection manually (but not advisable)
from ubii.node import connect_client import asyncio async def foo(): async with connect_client() as client: topic_connection = client.protocol.context.topic_connection assert topic_connection is not None async for topic_data in topic_connection: # do something ...
- Parameters:
client_id – e.g. content of a
ubii.proto.Client.idfield.- Returns:
an async context manager
- property ws: aiohttp.ClientWebSocketResponse | None¶
If connection is open use this
ClientWebSocketResponseobject to read and send messagesSetting this attribute clears the
disconnectedevent, and sets theconnectedevent ineventsDeleting this attribute clears the
connectedevent, and sets thedisconnectedevent inevents(unless already disconnected)
- property client_id¶
If set, the connection belongs to a specific client. If not set, the connection is not usable.
Setting this attribute is only allowed if it is not already set. Otherwise, a warning will be raised. Delete the attribute to clear it and allow setting it to a different id.
- async send(data: TopicData, timeout=None)¶
Use this coroutine to send
TopicDatamessages to the master node. Will wait untilevents.connectedis set i.e. until the connection has a valid url with client id.- Parameters:
data – message to send
timeout – used for waiting for the
events.connectedevent, as well as for actually sending of the message i.e. for the first message the actual time until an error is raised can be at most double the timeout value (in seconds). If not set, coroutine waits an unlimited time – optional
- Raises:
asyncio.TimeoutError – if
timeoutis set and connection or sending of message takes longer
- class ubii.node.connections.AIOHttpRestConnection(url: str, host_ip: str = local_ip)¶
Bases:
AIOHttpConnection,ServiceConnectionSend Service Request Messages
- async send(request: ServiceRequest, timeout=None) ServiceReply¶
Send a
ServiceRequestand wait forServiceReplyfrom master node.- Parameters:
request – as proto plus message wrapper object for a request
timeout – if not set, wait indefinitely. Else wait for
timeoutseconds for the connection to become usable (needs asession), then send the message and wait fortimeoutseconds for reply, then raise error
- Returns:
a wrapper object for the reply
- Raises:
asyncio.TimeoutError – if
timeoutis set and connection is not usable in time or reply is not received in time
See also
ubii.proto.ServiceRequest.type– the oneof group for possible request fieldsubii.proto.ServiceReply.type– the oneof group for possible reply fields
- ubii.node.connections.aiohttp_session()¶
We create a aiohttp session with our custom json encoder and some logging handlings in debug mode.
If
ubii.framework.util.debug()is used to turn on debug mode of the framework, this function will return a different session, with logging for requests (seeaiohttp.TraceConfig) and a timeout of 1 second compared to 300 seconds for the normal session.Both sessions use the
ubii.proto.util.ProtoEncoderto serialize json, and raise Exceptions when requests fail (seeaiohttp.ClientSession.raise_for_status)>>> from ubii.node import connections >>> from ubii.framework import debug >>> session = connections.aiohttp_session() >>> debug(True) True >>> debug_session = connections.aiohttp_session() >>> session.timeout ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=None) >>> debug_session.timeout ClientTimeout(total=1, connect=None, sock_read=None, sock_connect=None) >>> session.trace_configs [] >>> debug_session.trace_configs [<aiohttp.tracing.TraceConfig object at 0x7f657a080c70>] >>> session.raise_for_status True >>> debug_session.raise_for_status True >>> session.json_serialize functools.partial(<function dumps at 0x7f657b3f4940>, cls=<class 'ubii.proto.util.ProtoEncoder'>) >>> debug_session.json_serialize functools.partial(<function dumps at 0x7f657b3f4940>, cls=<class 'ubii.proto.util.ProtoEncoder'>)