(Python) Help me understand async with haio

Member
45 posts
Joined: May 2019
I am building an HDA that performs some network communication, and I wanted to try using asyncio/haio for it. In addition to regular requests, there is also a Server-Sent Events (SSE) component, which receives server-initiated messages.

It mostly works, but sometimes I get this error that I do not understand:
Exception in callback
handle: <AsyncioHandle AsyncioTransport._loop_reading(<Future finis.../haio.py:2188>) created at /opt/hfs20.5.510/houdini/python3.11libs/haio.py:2149>
source_traceback: Object created at (most recent call last):
  File "/opt/hfs20.5.510/houdini/python3.11libs/haio.py", line 1182, in _on_read
    fut.set_result((buffer, length))
  File "/opt/hfs20.5.510/houdini/python3.11libs/haio.py", line 2149, in call_soon
    handle = self._call_soon(callback, args, context)
Traceback (most recent call last):
  File "/opt/hfs20.5.510/houdini/python3.11libs/haio.py", line 2004, in _run
    self._context.run(self._callback, *self._args)
  File "/opt/hfs20.5.510/houdini/python3.11libs/haio.py", line 1821, in _loop_reading
    self._data_received(data, length)
  File "/opt/hfs20.5.510/houdini/python3.11libs/haio.py", line 1841, in _data_received
    assert self._pending_data_length == -1
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

I am guessing the transport goes into an inconsistent state.

Apparently Houdini always maintains a single event loop; new_event_loop just returns the existing instance. I read in the code comments that Qt already maintains an event loop.
Member
45 posts
Joined: May 2019
Going further with this, I set PYTHONTRACEMALLOC=25 and got this when attempting a request:
sys:1: RuntimeWarning: coroutine 'connect_tcp.<locals>.try_connect' was never awaited
Object allocated at (most recent call last):
File "/opt/hfs20.5.510/houdini/python3.11libs/haio.py", lineno 2004
self._context.run(self._callback, *self._args)
File "/home/mmoshev@bottleship.local/code/cairos-houdini-user-client/python/cairos_houdini_user_client.py", lineno 282
response = await login_outseta_login_post.asyncio_detailed(
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/cairos_python_lowlevel/cairos_python_lowlevel/api/default/login_outseta_login_post.py", lineno 147
response = await client.get_async_httpx_client().request(**kwargs)
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpx/_client.py", lineno 1585
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpx/_client.py", lineno 1674
response = await self._send_handling_auth(
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpx/_client.py", lineno 1702
response = await self._send_handling_redirects(
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpx/_client.py", lineno 1739
response = await self._send_single_request(request)
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpx/_client.py", lineno 1776
response = await transport.handle_async_request(request)
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpx/_transports/default.py", lineno 377
resp = await self._pool.handle_async_request(req)
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", lineno 236
response = await connection.handle_async_request(
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpcore/_async/connection.py", lineno 78
stream = await self._connect(request)
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpcore/_async/connection.py", lineno 124
stream = await self._network_backend.connect_tcp(**kwargs)
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpcore/_backends/auto.py", lineno 31
return await self._backend.connect_tcp(
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/httpcore/_backends/anyio.py", lineno 115
stream: anyio.abc.ByteStream = await anyio.connect_tcp(
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/anyio/_core/_sockets.py", lineno 231
tg.start_soon(try_connect, addr, event)
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", lineno 885
self._spawn(func, args, name)
File "/usr/local/share/cairos/venvs/cairos/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", lineno 850
coro = func(*args, **kwargs)
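
For anyone who wants to reproduce this kind of report without setting the environment variable: as far as I know, tracemalloc.start(25) is the in-process counterpart of PYTHONTRACEMALLOC=25, except that it only traces allocations made after the call. Below is a minimal sketch in plain CPython, outside Houdini. The try_connect coroutine and the collect_unawaited_warnings helper are hypothetical stand-ins I made up for illustration; they are not anyio's or haio's code.

```python
import gc
import tracemalloc
import warnings


async def try_connect():
    """Hypothetical stand-in; only the name mirrors the warning above."""
    return "connected"


def collect_unawaited_warnings(nframes=25):
    """Create a coroutine, drop it unawaited, and return the RuntimeWarning texts."""
    started_here = not tracemalloc.is_tracing()
    if started_here:
        # Roughly what PYTHONTRACEMALLOC=25 does, but from this point on only.
        tracemalloc.start(nframes)
    try:
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            coro = try_connect()  # coroutine object is created here...
            del coro              # ...and discarded without ever being awaited
            gc.collect()          # make sure the object is finalized
        return [
            str(w.message)
            for w in caught
            if issubclass(w.category, RuntimeWarning)
        ]
    finally:
        if started_here:
            tracemalloc.stop()
```

Calling collect_unawaited_warnings() should return a list containing a "was never awaited" message for try_connect. When the warning is printed normally instead of captured, tracemalloc is what adds the "Object allocated at" stack shown above.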

In addition, I tried explicitly passing a haio.AsyncTransport to the httpx client, but that did not work because it is missing the handle_async_request method.
Member
45 posts
Joined: May 2019
I fixed a couple of the more obvious errors in haio.py, which the static analyzer caught too.

It was calling instance instead of isinstance; I don't think that's something exposed by the compiled library?
There was also an extra argument to self._data_received, an Error() that does not appear in the signature.
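
To check the instance question in isolation, here is a small sketch for plain CPython. It assumes nothing injects a global named instance into the module, which I cannot rule out for Houdini's compiled side. The write and probe functions are hypothetical stand-ins that only mirror the shape of the haio code.

```python
import builtins


def write(data):
    # Mirrors the typo from haio.py: `instance` is looked up only when this
    # line executes, so importing the module succeeds and the error stays
    # hidden until write() is actually called.
    if not instance(data, (bytes, bytearray, memoryview)):  # noqa: F821
        raise TypeError("data argument must be a bytes-like object, "
                        f"not {type(data).__name__}")
    return len(data)


def probe():
    """Return (is `instance` a builtin?, name of the exception write() raised)."""
    has_builtin = hasattr(builtins, "instance")
    try:
        write(b"abc")
    except NameError as exc:
        return has_builtin, type(exc).__name__
    return has_builtin, None
```

In a stock interpreter probe() reports that instance is not a builtin and that even a valid bytes argument ends in a NameError. That would explain why the typo goes unnoticed until that code path is exercised.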

Now I am still getting the initial error (the assertion error on the received data length), but SSE communication seems to continue.

I propose the following patch:
--- /opt/hfs20.5.368/houdini/python3.11libs/haio.py     2024-09-25 16:39:39.000000000 +0300
+++ /opt/hfs20.5.510/houdini/python3.11libs/haio.py     2025-05-30 15:08:53.938096321 +0300
@@ -654,7 +654,7 @@
         self.write(data)
 
     def write(self, data):
-        if not instance(data, (bytes, bytearray, memoryview)):
+        if not isinstance(data, (bytes, bytearray, memoryview)):
             raise TypeError(f'data argument must be a bytes-like object, '
                             f'not {type(data).__name__}')
         if self._eof_written:
@@ -1770,7 +1770,7 @@
         data = self._pending_data
         self._pending_data = None
         if length > -1 and data is not None:
-            self._loop.call_soon(self._data_received, Error(), data, length)
+            self._loop.call_soon(self._data_received, data, length)
 
         if self._loop.get_debug():
             self._loop._logger.debug('%r resumes reading', self)
@@ -2834,7 +2834,7 @@
         return asyncio.wrap_future(executor.submit(func, *args), loop=self)
 
     def set_default_executor(self, executor):
-        if not instance(executor, concurrent.futures.ThreadPoolExecutor):
+        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
             raise TypeError('executor must be ThreadPoolExecutor instance')
         self._default_executor = executor
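
To illustrate why the second hunk matters: call_soon passes its extra arguments positionally to the callback, so one argument too many turns into a TypeError that the loop reports as "Exception in callback" instead of raising at the call site. This is a sketch against stock asyncio, not haio. FakeTransport and schedule are hypothetical names, and object() stands in for Error(), assuming that name resolves to something at all in haio.

```python
import asyncio


class FakeTransport:
    """Hypothetical stand-in for the haio transport; only the arity matters."""

    def __init__(self):
        self.received = []

    def _data_received(self, data, length):
        self.received.append((data, length))


async def schedule(extra_arg):
    """Schedule _data_received with or without a stray leading argument."""
    loop = asyncio.get_running_loop()
    errors = []
    # Collect callback failures instead of letting the loop log them.
    loop.set_exception_handler(
        lambda _loop, context: errors.append(context.get("exception"))
    )
    transport = FakeTransport()
    if extra_arg:
        # Shape of the original line: one positional argument too many.
        loop.call_soon(transport._data_received, object(), b"abc", 3)
    else:
        # Shape of the patched line.
        loop.call_soon(transport._data_received, b"abc", 3)
    await asyncio.sleep(0)  # yield once so the scheduled callback runs
    return transport.received, errors
```

With asyncio.run(schedule(True)) nothing is received and the handler collects a TypeError; with asyncio.run(schedule(False)) the data arrives and no error is recorded.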
Edited by monomon - May 30, 2025 09:05:01