Re: [PATCH RFC 3/7] protocol: generic async message-based protocol loop
From: Stefan Hajnoczi
Subject: Re: [PATCH RFC 3/7] protocol: generic async message-based protocol loop
Date: Tue, 13 Apr 2021 21:00:53 +0100
On Tue, Apr 13, 2021 at 11:55:49AM -0400, John Snow wrote:
> This module provides the protocol-agnostic framework upon which QMP will
> be built. I also have (not included in this series) a qtest
> implementation that uses this same framework, which is why it is split
> into two portions like this.
>
> The design uses two independent tasks in the "bottom half", a writer and
> a reader. These tasks run for the duration of the connection and
> independently send and receive messages, respectively.
>
> A third task, disconnect, is scheduled whenever an error occurs and
> facilitates coalescing of the other two tasks. MultiException is used in
> the case that *both* tasks have exceptions that need to be
> reported, though at the time of writing, I think this circumstance might
> only be a theoretical concern.
>
> The generic model here does not provide execute(), but the model for QMP
> is informative for how this class is laid out. Below, QMP's execute()
> function deposits a message into the outbound queue. The writer task
> wakes up to process the queue and deposits information in the write
> buffer, where the message is finally dispatched. Meanwhile, the
> execute() call is expected to block on an RPC mailbox waiting for a
> reply from the server.
>
> On the return trip, the reader wakes up when data arrives in the
> buffer. The message is deserialized and handed off to the protocol layer
> to route accordingly. QMP will route this message into either the Event
> queue or one of the pending RPC mailboxes.
>
> Upon this message being routed to the correct RPC mailbox, execute()
> will be woken up and allowed to process the reply and deliver it back to
> the caller.
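The mailbox flow described above can be sketched with a per-request Future standing in for the RPC mailbox. This is only an illustration of the pattern; the names (execute, mailboxes, writer_task, reader_task) are mine, not the patch's actual API:

```python
import asyncio


async def demo() -> str:
    outgoing: asyncio.Queue = asyncio.Queue()
    mailboxes: dict = {}  # request id -> Future awaiting the reply

    async def execute(msg: dict) -> str:
        # Deposit the message, then block on a mailbox for the reply.
        fut = asyncio.get_running_loop().create_future()
        mailboxes[msg["id"]] = fut
        await outgoing.put(msg)
        return await fut

    async def reader_task(msg: dict) -> None:
        # Route the "reply" to the pending RPC mailbox, waking execute().
        mailboxes.pop(msg["id"]).set_result(f"reply-to-{msg['id']}")

    async def writer_task() -> None:
        msg = await outgoing.get()
        # (A real writer would serialize msg onto a StreamWriter here;
        # for the demo, loop the message straight back to the reader.)
        await reader_task(msg)

    writer = asyncio.create_task(writer_task())
    result = await execute({"id": 1})
    await writer
    return result


print(asyncio.run(demo()))  # reply-to-1
```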
>
> The reason for separating the inbound and outbound tasks to such an
> extreme degree is to allow for designs and extensions where this
> asynchronous loop may be launched in a separate thread. In this model,
> it is possible to use a synchronous, thread-safe function to deposit new
> messages into the outbound queue; this was seen as a viable way to offer
> solid synchronous bindings while still allowing events to be processed
> truly asynchronously.
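For the threaded design mentioned here, the standard tool for a synchronous, thread-safe deposit is asyncio.run_coroutine_threadsafe. A minimal sketch (my own example, not code from this series) with the loop running in a background thread:

```python
import asyncio
import threading


def demo() -> str:
    loop = asyncio.new_event_loop()
    queue = None  # created inside the loop's own thread (see setup below)

    async def setup() -> None:
        nonlocal queue
        queue = asyncio.Queue()

    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    # Block until the queue exists on the loop side.
    asyncio.run_coroutine_threadsafe(setup(), loop).result()

    # Synchronous, thread-safe deposit into the outbound queue:
    asyncio.run_coroutine_threadsafe(queue.put("greeting"), loop).result()

    # The async side (here, just a get) sees the message normally:
    msg = asyncio.run_coroutine_threadsafe(queue.get(), loop).result()

    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return msg


print(demo())  # greeting
```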
>
> Separating it this way also allows us to fairly easily support
> out-of-band executions with little additional effort; essentially all
> commands are treated as out-of-band.
>
> The execute graph:
>
> +---------+
> | caller |
> +---------+
> |
> v
> +---------+
> +---------------- |execute()| <----------+
> | +---------+ |
> | |
> -----------------------------------------------------------
> v |
> +----+----+ +-----------+ +------+-------+
> |Mailboxes| |Event Queue| |Outbound Queue|
> +----+----+ +------+----+ +------+-------+
> | | ^
> v v |
> +--+----------------+---+ +-----------+-----------+
> | Reader Task/Coroutine | | Writer Task/Coroutine |
> +-----------+-----------+ +-----------+-----------+
> | ^
> v |
> +-----+------+ +-----+------+
> |StreamReader| |StreamWriter|
> +------------+ +------------+
The arrow directions confuse me. I don't understand what they convey.
>
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
> protocol.py | 704 ++++++++++++++++++++++++++++++++++++++++++++++++++++
Yikes, this is complex. I'm not sure the abstractions are worth the
cost. Hopefully everything will be tied up with a simple high-level API
later in the series.
> 1 file changed, 704 insertions(+)
> create mode 100644 protocol.py
>
> diff --git a/protocol.py b/protocol.py
> new file mode 100644
> index 0000000..27d1558
> --- /dev/null
> +++ b/protocol.py
> @@ -0,0 +1,704 @@
> +"""
> +Async message-based protocol support.
> +
> +This module provides a generic framework for sending and receiving
> +messages over an asyncio stream.
> +
> +`AsyncProtocol` is an abstract class that implements the core mechanisms
> +of a simple send/receive protocol, and is designed to be extended.
> +
> +`AsyncTasks` provides a container class that aggregates tasks that make
> +up the loop used by `AsyncProtocol`.
> +"""
> +
> +import asyncio
> +from asyncio import StreamReader, StreamWriter
> +import logging
> +from ssl import SSLContext
> +from typing import (
> + Any,
> + Awaitable,
> + Callable,
> + Coroutine,
> + Iterator,
> + List,
> + Generic,
> + Optional,
> + Tuple,
> + TypeVar,
> + Union,
> +)
> +
> +from error import (
> + ConnectError,
> + MultiException,
> + StateError,
> +)
> +from util import create_task, pretty_traceback, wait_closed
> +
> +
> +T = TypeVar('T')
> +_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None``
> +_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]'])
> +_GatherRet = List[Optional[BaseException]]
> +
> +
> +class AsyncTasks:
> + """
> + AsyncTasks is a collection of bottom half tasks designed to run forever.
> +
> + This is a convenience wrapper to make calls from `AsyncProtocol` simpler to
> + follow by behaving as a simple aggregate of two or more tasks, such that
> + a higher-level connection manager can simply refer to "the bottom half"
> + as one coherent entity instead of several.
> +
> + The general flow is:
> +
> + 1. ``tasks = AsyncTasks(logger_for_my_client)``
> + 2. ``tasks.start(my_reader, my_writer)``
> + 3. ``...``
> + 4. ``await tasks.cancel()``
> + 5. ``tasks.result()``
> +
> + :param logger: A logger to use for debugging messages. Useful to
> + associate messages with a particular server context.
> + """
> +
> + logger = logging.getLogger(__name__)
> +
> + def __init__(self, logger: Optional[logging.Logger] = None):
> + if logger is not None:
> + self.logger = logger
> +
> + # Named tasks
> + self.reader: Optional['asyncio.Future[None]'] = None
> + self.writer: Optional['asyncio.Future[None]'] = None
> +
> + # Internal aggregate of all of the above tasks.
> + self._all: Optional['asyncio.Future[_GatherRet]'] = None
> +
> + def _all_tasks(self) -> Iterator[Optional['asyncio.Future[None]']]:
> + """Yields all tasks, defined or not, in ideal cancellation order."""
> + yield self.writer
> + yield self.reader
> +
> + def __iter__(self) -> Iterator['asyncio.Future[None]']:
> + """Yields all defined tasks, in ideal cancellation order."""
> + for task in self._all_tasks():
> + if task is not None:
> + yield task
> +
> + @property
> + def _all_tasks_defined(self) -> bool:
> + """Returns True if all tasks are defined."""
> + return all(map(lambda task: task is not None, self._all_tasks()))
> +
> + @property
> + def _some_tasks_done(self) -> bool:
> + """Returns True if any defined tasks are done executing."""
> + return any(map(lambda task: task.done(), iter(self)))
> +
> + def __bool__(self) -> bool:
> + """Returns True when any tasks are defined at all."""
> + return bool(tuple(iter(self)))
> +
> + @property
> + def running(self) -> bool:
> + """Returns True if all tasks are defined and still running."""
> + return self._all_tasks_defined and not self._some_tasks_done
> +
> + def start(self,
> + reader_coro: Coroutine[Any, Any, None],
> + writer_coro: Coroutine[Any, Any, None]) -> None:
> + """
> + Starts executing tasks in the current async context.
> +
> + :param reader_coro: Coroutine, message reader task.
> + :param writer_coro: Coroutine, message writer task.
> + """
> + self.reader = create_task(reader_coro)
> + self.writer = create_task(writer_coro)
> +
> + # Uses extensible self-iterator.
> + self._all = asyncio.gather(*iter(self), return_exceptions=True)
> +
> + async def cancel(self) -> None:
> + """
> + Cancels all tasks and awaits full cancellation.
> +
> + Exceptions, if any, can be obtained by calling `result()`.
> + """
> + for task in self:
> + if task and not task.done():
> + self.logger.debug("cancelling task %s", str(task))
> + task.cancel()
> +
> + if self._all:
> + self.logger.debug("Awaiting all tasks to finish ...")
> + await self._all
> +
> + def _cleanup(self) -> None:
> + """
> + Erase all task handles; asserts that no tasks are running.
> + """
> + def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
> + assert (task is None) or task.done()
> + return None if (task and task.done()) else task
> +
> + self.reader = _paranoid_task_erase(self.reader)
> + self.writer = _paranoid_task_erase(self.writer)
> + self._all = _paranoid_task_erase(self._all)
> +
> + def result(self) -> None:
> + """
> + Raises exception(s) from the finished tasks, if any.
> +
> + Called to fully quiesce this task group. asyncio.CancelledError is
> + never raised; in the event of an intentional cancellation this
> + function will not raise any errors.
> +
> + If an exception in one bottom half caused an unscheduled disconnect,
> + that exception will be raised.
> +
> + :raise: `Exception` Arbitrary exceptions re-raised on behalf of
> + the bottom half.
> + :raise: `MultiException` Iterable Exception used to multiplex multiple
> + exceptions when multiple threads failed.
> + """
> + exceptions: List[BaseException] = []
> + results = self._all.result() if self._all else ()
> + self._cleanup()
> +
> + for result in results:
> + if result is None:
> + continue
> + if not isinstance(result, asyncio.CancelledError):
> + exceptions.append(result)
> +
> + if len(exceptions) == 1:
> + raise exceptions.pop()
> + if len(exceptions) > 1:
> + raise MultiException(exceptions)
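For reference, the gather(return_exceptions=True) behavior result() relies on: outcomes come back positionally, with exceptions (including CancelledError) returned as values rather than raised, so they can be filtered the way result() does. A standalone snippet of mine, not part of the patch:

```python
import asyncio


async def demo() -> list:
    async def ok() -> int:
        return 42

    async def boom() -> None:
        raise ValueError("writer failed")

    async def hang() -> None:
        await asyncio.sleep(3600)

    t1 = asyncio.create_task(ok())
    t2 = asyncio.create_task(boom())
    t3 = asyncio.create_task(hang())
    await asyncio.sleep(0)  # let the tasks start
    t3.cancel()

    # Exceptions are returned in-place instead of being raised here.
    return await asyncio.gather(t1, t2, t3, return_exceptions=True)


results = asyncio.run(demo())
# Filtering as result() does: keep real failures, drop cancellations.
failures = [r for r in results
            if isinstance(r, BaseException)
            and not isinstance(r, asyncio.CancelledError)]
print([type(r).__name__ for r in results])   # ['int', 'ValueError', 'CancelledError']
print([type(r).__name__ for r in failures])  # ['ValueError']
```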
> +
> +
> +class AsyncProtocol(Generic[T]):
> + """AsyncProtocol implements a generic async message-based protocol.
> +
> + This protocol assumes the basic unit of information transfer between
> + client and server is a "message", the details of which are left up
> + to the implementation. It assumes the sending and receiving of these
> + messages is full-duplex and not necessarily correlated; i.e. it
> + supports asynchronous inbound messages.
> +
> + It is designed to be extended by a specific protocol which provides
> + the implementations for how to read and send messages. These must be
> + defined in `_do_recv()` and `_do_send()`, respectively.
> +
> + Other callbacks that have a default implementation, but may be
> + extended or overridden:
> + - _on_connect: Actions performed prior to loop start.
> + - _on_start: Actions performed immediately after loop start.
> + - _on_message: Actions performed when a message is received.
> + The default implementation does nothing at all.
> + - _cb_outbound: Log/Filter outgoing messages.
> + - _cb_inbound: Log/Filter incoming messages.
This reminds me of asyncio.protocols and twisted.internet.protocol.
> +
> + :param name: Name used for logging messages, if any.
> + """
> + #: Logger object for debugging messages
> + logger = logging.getLogger(__name__)
> +
> + # -------------------------
> + # Section: Public interface
> + # -------------------------
> +
> + def __init__(self, name: Optional[str] = None) -> None:
> + self.name = name
> + if self.name is not None:
> + self.logger = self.logger.getChild(self.name)
> +
> + # stream I/O
> + self._reader: Optional[StreamReader] = None
> + self._writer: Optional[StreamWriter] = None
> +
> + # I/O queues
> + self._outgoing: asyncio.Queue[T] = asyncio.Queue()
> +
> + # I/O tasks (message reader, message writer)
> + self._tasks = AsyncTasks(self.logger)
> +
> + # Disconnect task; separate from the core loop.
> + self._dc_task: Optional[asyncio.Future[None]] = None
> +
> + @property
> + def running(self) -> bool:
> + """
> + Return True when the loop is currently connected and running.
> + """
> + if self.disconnecting:
> + return False
> + return self._tasks.running
> +
> + @property
> + def disconnecting(self) -> bool:
> + """
> + Return True when the loop is disconnecting, or disconnected.
> + """
> + return bool(self._dc_task)
> +
> + @property
> + def unconnected(self) -> bool:
> + """
> + Return True when the loop is fully idle and quiesced.
> +
> + Returns True specifically when the loop is neither `running`
> + nor `disconnecting`. A call to `disconnect()` is required
> + to transition from `disconnecting` to `unconnected`.
> + """
> + return not (self.running or self.disconnecting)
> +
> + async def accept(self, address: Union[str, Tuple[str, int]],
> + ssl: Optional[SSLContext] = None) -> None:
> + """
> + Accept a connection and begin processing message queues.
> +
> + :param address: Address to connect to;
> + UNIX socket path or TCP address/port.
> + :param ssl: SSL context to use, if any.
> +
> + :raise: `StateError` (loop is running or disconnecting.)
> + :raise: `ConnectError` (Connection was not successful.)
> + """
> + if self.disconnecting:
> + raise StateError("Client is disconnecting/disconnected."
> + " Call disconnect() to fully disconnect.")
> + if self.running:
> + raise StateError("Client is already connected and running.")
> + assert self.unconnected
> +
> + try:
> + await self._new_session(self._do_accept(address, ssl))
> + except Exception as err:
> + emsg = "Failed to accept incoming connection"
> + self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
> + raise ConnectError(f"{emsg}: {err!s}") from err
Wrapping the exception in ConnectError() obfuscates what's going on IMO.
> +
> + async def connect(self, address: Union[str, Tuple[str, int]],
> + ssl: Optional[SSLContext] = None) -> None:
> + """
> + Connect to the server and begin processing message queues.
> +
> + :param address: Address to connect to;
> + UNIX socket path or TCP address/port.
> + :param ssl: SSL context to use, if any.
> +
> + :raise: `StateError` (loop is running or disconnecting.)
> + :raise: `ConnectError` (Connection was not successful.)
> + """
> + if self.disconnecting:
> + raise StateError("Client is disconnecting/disconnected."
> + " Call disconnect() to fully disconnect.")
> + if self.running:
> + raise StateError("Client is already connected and running.")
> + assert self.unconnected
> +
> + try:
> + await self._new_session(self._do_connect(address, ssl))
> + except Exception as err:
> + emsg = "Failed to connect to server"
> + self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
> + raise ConnectError(f"{emsg}: {err!s}") from err
> +
> + async def disconnect(self) -> None:
> + """
> + Disconnect and wait for all tasks to fully stop.
> +
> + If there were exceptions that caused the bottom half to terminate
> + prematurely, they will be raised here.
> +
> + :raise: `Exception` Arbitrary exceptions re-raised on behalf of
> + the bottom half.
> + :raise: `MultiException` Iterable Exception used to multiplex multiple
> + exceptions when multiple tasks failed.
> + """
> + self._schedule_disconnect()
> + await self._wait_disconnect()
> +
> + # -----------------------------
> + # Section: Connection machinery
> + # -----------------------------
> +
> + async def _register_streams(self,
> + reader: asyncio.StreamReader,
> + writer: asyncio.StreamWriter) -> None:
> + """Register the Reader/Writer streams."""
> + self._reader = reader
> + self._writer = writer
> +
> + async def _new_session(self, coro: Awaitable[None]) -> None:
> + """
> + Create a new session.
> +
> + This is called for both `accept()` and `connect()` pathways.
> +
> + :param coro: An awaitable that will perform either connect or accept.
> + """
> + assert self._reader is None
> + assert self._writer is None
> +
> + # NB: If a previous session had stale messages, they are dropped here.
> + self._outgoing = asyncio.Queue()
> +
> + # Connect / Await Connection
> + await coro
> + assert self._reader is not None
> + assert self._writer is not None
> +
> + await self._on_connect()
> +
> + reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
> + writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
> + self._tasks.start(reader_coro, writer_coro)
> +
> + await self._on_start()
> +
> + async def _do_accept(self, address: Union[str, Tuple[str, int]],
> + ssl: Optional[SSLContext] = None) -> None:
> + """
> + Acting as the protocol server, accept a single connection.
> +
> + Used as the awaitable callback to `_new_session()`.
> + """
> + self.logger.debug("Awaiting connection ...")
> + connected = asyncio.Event()
> + server: Optional[asyncio.AbstractServer] = None
> +
> + async def _client_connected_cb(reader: asyncio.StreamReader,
> + writer: asyncio.StreamWriter) -> None:
> + """Used to accept a single incoming connection, see below."""
> + nonlocal server
> + nonlocal connected
> +
> + # A connection has been accepted; stop listening for new ones.
> + assert server is not None
> + server.close()
> + await server.wait_closed()
> + server = None
> +
> + # Register this client as being connected
> + await self._register_streams(reader, writer)
> +
> + # Signal back: We've accepted a client!
> + connected.set()
> +
> + if isinstance(address, tuple):
> + coro = asyncio.start_server(
> + _client_connected_cb,
> + host=address[0],
> + port=address[1],
> + ssl=ssl,
> + backlog=1,
> + )
> + else:
> + coro = asyncio.start_unix_server(
> + _client_connected_cb,
> + path=address,
> + ssl=ssl,
> + backlog=1,
> + )
> +
> + server = await coro # Starts listening
> + await connected.wait() # Waits for the callback to fire (and finish)
> + assert server is None
Async callbacks defeat the readability advantages of coroutines :(.
asyncio.start_server() is designed for spawning client connections in
separate tasks but we just want to accept a client connection in the
current coroutine. It might be possible to eliminate the callback
business by not using the server and instead doing:
conn, addr = await loop.sock_accept(listen_sock)
Whether that ends up being simpler, I'm not sure because you may need to
unwrap/wrap the listen_sock and conn sockets into asyncio classes to
interface with the rest of the code.
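A rough sketch of what I mean: accept in the current coroutine with loop.sock_accept(), then wrap the connected socket back into streams via the sock= argument that asyncio.open_connection() accepts. Self-contained toy (the client task only exists to drive the demo):

```python
import asyncio
import socket


async def demo() -> str:
    # Plain listening socket; no asyncio Server object or callback involved.
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    lsock.bind(("127.0.0.1", 0))
    lsock.listen(1)
    lsock.setblocking(False)
    port = lsock.getsockname()[1]

    loop = asyncio.get_running_loop()

    async def client() -> None:
        _, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"hello\n")
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    client_task = asyncio.create_task(client())

    # Accept a single connection in the current coroutine ...
    conn, _addr = await loop.sock_accept(lsock)
    lsock.close()

    # ... then wrap the connected socket into StreamReader/StreamWriter.
    reader, writer = await asyncio.open_connection(sock=conn)
    line = await reader.readline()

    await client_task
    writer.close()
    await writer.wait_closed()
    return line.decode().strip()


print(asyncio.run(demo()))  # hello
```

Whether this is a net simplification for the patch I'm still not sure, but it does avoid the connected-callback indirection.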
> +
> + self.logger.debug("Connection accepted")
> +
> + async def _do_connect(self, address: Union[str, Tuple[str, int]],
> + ssl: Optional[SSLContext] = None) -> None:
> + self.logger.debug("Connecting ...")
> +
> + if isinstance(address, tuple):
> + connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
> + else:
> + connect = asyncio.open_unix_connection(path=address, ssl=ssl)
> + reader, writer = await connect
> + await self._register_streams(reader, writer)
> +
> + self.logger.debug("Connected")
> +
> + async def _on_connect(self) -> None:
> + """
> + Async callback invoked after connection, but prior to loop start.
> +
> + This callback is invoked after the stream is opened, but prior to
> + starting the reader/writer tasks. Use this callback to handle
> + handshakes, greetings, &c to avoid having special edge cases in the
> + generic message handler.
> + """
> + # Nothing to do in the general case.
> +
> + async def _on_start(self) -> None:
> + """
> + Async callback invoked after connection and loop start.
> +
> + This callback is invoked after the stream is opened AND after
> + the reader/writer tasks have been started. Use this callback to
> + auto-perform certain tasks during the connect() call.
> + """
> + # Nothing to do in the general case.
> +
> + def _schedule_disconnect(self) -> None:
> + """
> + Initiate a disconnect; idempotent.
> +
> + This is called by the reader/writer tasks upon exceptions,
> + or directly by a user call to `disconnect()`.
> + """
> + if not self._dc_task:
> + self._dc_task = create_task(self._bh_disconnect())
> +
> + async def _wait_disconnect(self) -> None:
> + """
> + _wait_disconnect waits for a scheduled disconnect to finish.
> +
> + This function will gather any bottom half exceptions and re-raise them;
> + so it is intended to be used in the upper half call chain.
> +
> + If a single exception is encountered, it will be re-raised faithfully.
> + If multiple are found, they will be multiplexed into a MultiException.
> +
> + :raise: `Exception` Many kinds; anything the bottom half raises.
> + :raise: `MultiException` When the Reader/Writer both have exceptions.
> + """
> + assert self._dc_task
> + await self._dc_task
> + self._dc_task = None
> +
> + try:
> + self._tasks.result()
> + finally:
> + self._cleanup()
> +
> + def _cleanup(self) -> None:
> + """
> + Fully reset this object to a clean state.
> + """
> + assert not self.running
> + assert self._dc_task is None
> + # _tasks.result() called in _wait_disconnect does _tasks cleanup, so:
> + assert not self._tasks
> +
> + self._reader = None
> + self._writer = None
> +
> + # ------------------------------
> + # Section: Bottom Half functions
> + # ------------------------------
> +
> + async def _bh_disconnect(self) -> None:
> + """
> + Disconnect and cancel all outstanding tasks.
> +
> + It is designed to be called from its task context, self._dc_task.
> + """
> + # RFC: Maybe I shot myself in the foot by trying too hard to
> + # group the tasks together as one unit. I suspect the ideal
> + # cancellation order here is actually: MessageWriter,
> + # StreamWriter, MessageReader
> +
> + # What I have here instead is MessageWriter, MessageReader,
> + # StreamWriter
> +
> + # Cancel the message reader/writer.
> + await self._tasks.cancel()
> +
> + # Handle the stream writer itself, now.
> + if self._writer:
> + if not self._writer.is_closing():
> + self.logger.debug("Writer is open; draining")
> + await self._writer.drain()
> + self.logger.debug("Closing writer")
> + self._writer.close()
> + self.logger.debug("Awaiting writer to fully close")
> + await wait_closed(self._writer)
> + self.logger.debug("Fully closed.")
> +
> + # TODO: Add a hook for higher-level protocol cancellations here?
> + # (Otherwise, the disconnected logging event happens too soon.)
> +
> + self.logger.debug("Protocol Disconnected.")
> +
> + async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
> + """
> + Run one of the bottom-half functions in a loop forever.
> +
> + If the bottom half ever raises any exception, schedule a disconnect.
> + """
> + try:
> + while True:
> + await async_fn()
> + except asyncio.CancelledError as err:
> + # We are cancelled (by _bh_disconnect), so no need to call it.
> + self.logger.debug("Task.%s: cancelled: %s.",
> + name, type(err).__name__)
> + raise
> + except:
> + self.logger.error("Task.%s: failure:\n%s\n", name,
> + pretty_traceback())
> + self.logger.debug("Task.%s: scheduling disconnect.", name)
> + self._schedule_disconnect()
> + raise
> + finally:
> + self.logger.debug("Task.%s: exiting.", name)
> +
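One subtlety worth noting for reviewers: re-raising CancelledError after logging (as _bh_loop_forever does) is what marks the task as properly cancelled; swallowing it would break the cancel()/gather coordination. A toy version (names mine) showing the observable ordering:

```python
import asyncio


async def demo() -> list:
    log = []

    async def loop_forever(name: str) -> None:
        try:
            while True:
                await asyncio.sleep(3600)
        except asyncio.CancelledError:
            # Swallowing this would leave the task looking "finished
            # normally"; re-raising is what marks it as cancelled.
            log.append(f"{name}: cancelled")
            raise
        finally:
            log.append(f"{name}: exiting")

    task = asyncio.create_task(loop_forever("Reader"))
    await asyncio.sleep(0)  # let the task start and block
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    log.append(f"cancelled={task.cancelled()}")
    return log


print(asyncio.run(demo()))
```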
> + async def _bh_send_message(self) -> None:
> + """
> + Wait for an outgoing message, then send it.
> + """
> + self.logger.log(5, "Waiting for message in outgoing queue to send ...")
> + msg = await self._outgoing.get()
> + try:
> + self.logger.log(5, "Got outgoing message, sending ...")
> + await self._send(msg)
> + finally:
> + self._outgoing.task_done()
> + self.logger.log(5, "Outgoing message sent.")
> +
> + async def _bh_recv_message(self) -> None:
> + """
> + Wait for an incoming message and call `_on_message` to route it.
> +
> + Exceptions seen may be from `_recv` or from `_on_message`.
> + """
> + self.logger.log(5, "Waiting to receive incoming message ...")
> + msg = await self._recv()
> + self.logger.log(5, "Routing message ...")
> + await self._on_message(msg)
> + self.logger.log(5, "Message routed.")
> +
> + # ---------------------
> + # Section: Datagram I/O
> + # ---------------------
> +
> + def _cb_outbound(self, msg: T) -> T:
> + """
> + Callback: outbound message hook.
> +
> + This is intended for subclasses to be able to add arbitrary hooks to
> + filter or manipulate outgoing messages. The base implementation
> + does nothing but log the message without any manipulation of the
> + message. It is designed for you to invoke super() at the tail of
> + any overridden method.
> +
> + :param msg: raw outbound message
> + :return: final outbound message
> + """
> + self.logger.debug("--> %s", str(msg))
> + return msg
> +
> + def _cb_inbound(self, msg: T) -> T:
> + """
> + Callback: inbound message hook.
> +
> + This is intended for subclasses to be able to add arbitrary hooks to
> + filter or manipulate incoming messages. The base implementation
> + does nothing but log the message without any manipulation of the
> + message. It is designed for you to invoke super() at the head of
> + any overridden method.
> +
> + This method does not "handle" incoming messages; it is a filter.
> + The actual "endpoint" for incoming messages is `_on_message()`.
> +
> + :param msg: raw inbound message
> + :return: processed inbound message
> + """
> + self.logger.debug("<-- %s", str(msg))
> + return msg
> +
> + async def _readline(self) -> bytes:
> + """
> + Wait for a newline from the incoming reader.
> +
> + This method is provided as a convenience for upper-layer
> + protocols, as many will be line-based.
> +
> + This function *may* return a sequence of bytes without a
> + trailing newline if EOF occurs, but *some* bytes were
> + received. In this case, the next call will raise EOF.
> +
> + :raise OSError: Stream-related errors.
> + :raise EOFError: If the reader stream is at EOF and there
> + are no bytes to return.
> + """
> + assert self._reader is not None
> + msg_bytes = await self._reader.readline()
> + self.logger.log(5, "Read %d bytes", len(msg_bytes))
> +
> + if not msg_bytes:
> + if self._reader.at_eof():
> + self.logger.debug("EOF")
> + raise EOFError()
> +
> + return msg_bytes
> +
> + async def _do_recv(self) -> T:
> + """
> + Abstract: Read from the stream and return a message.
> +
> + Very low-level; intended to only be called by `_recv()`.
> + """
> + raise NotImplementedError
> +
> + async def _recv(self) -> T:
> + """
> + Read an arbitrary protocol message. (WARNING: Extremely low-level.)
> +
> + This function is intended primarily for _bh_recv_message to use
> + in an asynchronous task loop. Using it outside of this loop will
> + "steal" messages from the normal routing mechanism. It is safe to
> + use during `_on_connect()`, but should not be used otherwise.
> +
> + This function uses `_do_recv()` to retrieve the raw message, and
> + then transforms it using `_cb_inbound()`.
> +
> + Errors raised may be any of those from either method implementation.
> +
> + :return: A single (filtered, processed) protocol message.
> + """
> + message = await self._do_recv()
> + return self._cb_inbound(message)
> +
> + def _do_send(self, msg: T) -> None:
> + """
> + Abstract: Write a message to the stream.
> +
> + Very low-level; intended to only be called by `_send()`.
> + """
> + raise NotImplementedError
> +
> + async def _send(self, msg: T) -> None:
> + """
> + Send an arbitrary protocol message. (WARNING: Low-level.)
> +
> + Like `_recv()`, this function is intended to be called by the writer
> + task loop that processes outgoing messages. This function will
> + transform any outgoing messages according to `_cb_outbound()`.
> +
> + :raise: OSError - Various stream errors.
> + """
> + assert self._writer is not None
> + msg = self._cb_outbound(msg)
> + self._do_send(msg)
> +
> + async def _on_message(self, msg: T) -> None:
> + """
> + Called when a new message is received.
> +
> + Executed from within the reader loop BH, so be advised that waiting
> + on other asynchronous tasks may be risky, depending. Additionally,
> + any errors raised here will directly cause the loop to halt; limit
> + error checking to what is strictly necessary for message routing.
> +
> + :param msg: The incoming message, already logged/filtered.
> + """
> + # Nothing to do in the abstract case.
> --
> 2.30.2
>