[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH RFC 6/7] qmp_protocol: add QMP client implementation
From: |
John Snow |
Subject: |
[PATCH RFC 6/7] qmp_protocol: add QMP client implementation |
Date: |
Tue, 13 Apr 2021 11:55:52 -0400 |
Using everything added so far, add the QMP client itself.
So far, this QMP object cannot actually pretend to be a server; it only
implements the client logic (receiving events and sending commands.)
Future work may involve implementing the ability to send events and
receive RPC commands, so that we can create a QMP test server for unit
test purposes.
(It can, however, both connect to or receive a connection from QEMU so
that it can be used to instrument iotests.)
Note: the event handling is a total hack; I need to figure out the most
delightful way to create an interface to consume these easily, as I
think it's one of the biggest shortcomings of the synchronous library so
far. Consider that part very much a work-in-progress.
Signed-off-by: John Snow <jsnow@redhat.com>
---
qmp_protocol.py | 420 ++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 420 insertions(+)
create mode 100644 qmp_protocol.py
diff --git a/qmp_protocol.py b/qmp_protocol.py
new file mode 100644
index 0000000..6e6ac25
--- /dev/null
+++ b/qmp_protocol.py
@@ -0,0 +1,420 @@
+"""
+QMP Client Implementation
+
+This module provides the QMP class, which can be used to connect and
+send commands to a QMP server such as QEMU. The QMP class can be used to
+either connect to a listening server, or used to listen and accept an
+incoming connection from the server.
+"""
+
+import asyncio
+import logging
+from typing import (
+ Awaitable,
+ Callable,
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Tuple,
+ cast,
+)
+
+from error import (
+ AQMPError,
+ DisconnectedError,
+ DeserializationError,
+ GreetingError,
+ NegotiationError,
+ StateError,
+ UnexpectedTypeError,
+)
+from message import (
+ Message,
+ ObjectTypeError,
+ ServerParseError,
+)
+from models import (
+ ErrorInfo,
+ ErrorResponse,
+ Greeting,
+ ParsingError,
+ ServerResponse,
+ SuccessResponse,
+)
+from protocol import AsyncProtocol
+from util import create_task, pretty_traceback
+
+
+class ExecuteError(AQMPError):
+ """Execution statement returned failure."""
+ def __init__(self,
+ sent: Message,
+ received: Message,
+ error: ErrorInfo):
+ super().__init__()
+ self.sent = sent
+ self.received = received
+ self.error = error
+
+ def __str__(self) -> str:
+ return self.error.desc
+
+
+_EventCallbackFn = Callable[['QMP', Message], Awaitable[None]]
+
+
+class QMP(AsyncProtocol[Message]):
+ """
+ Implements a QMP connection to/from the server.
+
+ Basic usage looks like this::
+
+ qmp = QMP('my_virtual_machine_name')
+ await qmp.connect(('127.0.0.1', 1234))
+ ...
+ res = await qmp.execute('block-query')
+ ...
+ await qmp.disconnect()
+
+ :param name: Optional nickname for the connection, used for logging.
+ """
+ #: Logger object for debugging messages
+ logger = logging.getLogger(__name__)
+
+ def __init__(self, name: Optional[str] = None) -> None:
+ super().__init__(name)
+
+ # Greeting
+ self.await_greeting = True
+ self._greeting: Optional[Greeting] = None
+ self.greeting_timeout = 5 # (In seconds)
+
+ # RFC: Do I even want to use any timeouts internally? They're
+ # not defined in the protocol itself. Theoretically, a client
+ # could simply use asyncio.wait_for(qmp.connect(...), timeout=5)
+ # and then I don't have to support this interface at all.
+ #
+ # We don't need to support any timeouts so long as we never initiate
+ # any long-term wait that wasn't in direct response to a user action.
+
+ # Command ID counter
+ self._execute_id = 0
+
+ # Event handling
+ self._event_queue: asyncio.Queue[Message] = asyncio.Queue()
+ self._event_callbacks: List[_EventCallbackFn] = []
+
+ # Incoming RPC reply messages
+ self._pending: Dict[str, Tuple[
+ asyncio.Future[object],
+ asyncio.Queue[Message]]] = {}
+
+ def on_event(self, func: _EventCallbackFn) -> _EventCallbackFn:
+ """
+ FIXME: Quick hack: decorator to register event handlers.
+
+ Use it like this::
+
+ @qmp.on_event
+ async def my_event_handler(qmp, event: Message) -> None:
+ print(f"Received event: {event['event']}")
+
+ RFC: What kind of event handler would be the most useful in
+ practical terms? In tests, we are usually waiting for an
+ event with some criteria to occur; maybe it would be useful
+ to allow "coroutine" style functions where we can block
+ until a certain event shows up?
+ """
+ if func not in self._event_callbacks:
+ self._event_callbacks.append(func)
+ return func
+
+ async def _new_session(self, coro: Awaitable[None]) -> None:
+ self._event_queue = asyncio.Queue()
+ await super()._new_session(coro)
+
+ async def _on_connect(self) -> None:
+ """
+ Wait for the QMP greeting prior to the engagement of the full loop.
+
+ :raise: GreetingError when the greeting is not understood.
+ """
+ if self.await_greeting:
+ self._greeting = await self._get_greeting()
+
+ async def _on_start(self) -> None:
+ """
+ Perform QMP negotiation right after the loop starts.
+
+ Negotiation is performed afterwards so that the implementation
+ can simply use `execute()`, which relies on the loop machinery
+ to be running.
+
+ :raise: NegotiationError if the negotiation fails in some way.
+ """
+ await self._negotiate()
+
+ async def _get_greeting(self) -> Greeting:
+ """
+ :raise: GreetingError (Many causes.)
+ """
+ self.logger.debug("Awaiting greeting ...")
+ try:
+ msg = await asyncio.wait_for(self._recv(), self.greeting_timeout)
+ return Greeting.parse_msg(msg)
+ except Exception as err:
+ if isinstance(err, (asyncio.TimeoutError, OSError, EOFError)):
+ emsg = "Failed to receive Greeting"
+ elif isinstance(err, (DeserializationError, UnexpectedTypeError)):
+ emsg = "Failed to understand Greeting"
+ elif isinstance(err, ObjectTypeError):
+ emsg = "Failed to validate Greeting"
+ else:
+ emsg = "Unknown failure acquiring Greeting"
+
+ self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+ raise GreetingError(emsg, err) from err
+
+ async def _negotiate(self) -> None:
+ """
+ :raise: NegotiationError (Many causes.)
+ """
+ self.logger.debug("Negotiating capabilities ...")
+ arguments: Dict[str, List[str]] = {'enable': []}
+ if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+ arguments['enable'].append('oob')
+ try:
+ await self.execute('qmp_capabilities', arguments=arguments)
+ except Exception as err:
+ # FIXME: what exceptions do we actually expect execute to raise?
+ emsg = "Failure negotiating capabilities"
+ self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+ raise NegotiationError(emsg, err) from err
+
+ async def _bh_disconnect(self) -> None:
+ # See AsyncProtocol._bh_disconnect().
+ await super()._bh_disconnect()
+
+ if self._pending:
+ self.logger.debug("Cancelling pending executions")
+ for key in self._pending:
+ self.logger.debug("Cancelling execution %s", key)
+ # NB: This signals cancellation, but doesn't fully quiesce;
+ # it merely requests the cancellation; it will be thrown into
+ # that tasks's context on the next event loop cycle.
+ #
+ # This task is being awaited on by `_execute()`, which will
+ # exist in the user's callstack in the upper-half. Since
+ # we're here, we know it isn't running! It won't have a
+ # chance to run again except to receive a cancellation.
+ #
+ # NB: Python 3.9 adds a msg= parameter to cancel that would
+ # be useful for debugging the 'cause' of cancellations.
+ self._pending[key][0].cancel()
+
+ self.logger.debug("QMP Disconnected.")
+
+ async def _on_message(self, msg: Message) -> None:
+ """
+ Add an incoming message to the appropriate queue/handler.
+
+ :raise: RawProtocolError (`_recv` via `Message._deserialize`)
+ :raise: ServerParseError (Message has no 'event' nor 'id' field)
+ """
+ # Incoming messages are not fully parsed/validated here;
+ # do only light peeking to know how to route the messages.
+
+ if 'event' in msg:
+ await self._event_queue.put(msg)
+ # FIXME: quick hack; event queue handling.
+ for func in self._event_callbacks:
+ await func(self, msg)
+ return
+
+ # Below, we assume everything left is an execute/exec-oob response.
+
+ if 'id' in msg:
+ exec_id = str(msg['id'])
+ if exec_id not in self._pending:
+ # qmp-spec.txt, section 2.4:
+ # 'Clients should drop all the responses
+ # that have an unknown "id" field.'
+ self.logger.warning("Unknown ID '%s', response dropped.",
+ exec_id)
+ return
+ else:
+ # This is a server parsing error;
+ # It inherently does not "belong" to any pending execution.
+ # Instead of performing clever recovery, just terminate.
+ raise ServerParseError(
+ "Server sent a message without an ID,"
+ " indicating parse failure.", msg)
+
+ _, queue = self._pending[exec_id]
+ await queue.put(msg)
+
+ async def _do_recv(self) -> Message:
+ """
+ :raise: OSError (Stream errors)
+ :raise: `EOFError` (When the stream is at EOF)
+ :raise: `RawProtocolError` (via `Message._deserialize`)
+
+ :return: A single QMP `Message`.
+ """
+ msg_bytes = await self._readline()
+ msg = Message(msg_bytes, eager=True)
+ return msg
+
+ def _do_send(self, msg: Message) -> None:
+ """
+ :raise: ValueError (JSON serialization failure)
+ :raise: TypeError (JSON serialization failure)
+ :raise: OSError (Stream errors)
+ """
+ assert self._writer is not None
+ self._writer.write(bytes(msg))
+
+ def _cleanup(self) -> None:
+ super()._cleanup()
+ self._greeting = None
+ assert self._pending == {}
+ self._event_queue = asyncio.Queue()
+
+ @classmethod
+ def make_execute_msg(cls, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> Message:
+ """
+ Create an executable message to be sent by `execute_msg` later.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If true, execute "out of band".
+
+ :return: An executable QMP message.
+ """
+ msg = Message({'exec-oob' if oob else 'execute': cmd})
+ if arguments is not None:
+ msg['arguments'] = arguments
+ return msg
+
+ async def _bh_execute(self, msg: Message,
+ queue: 'asyncio.Queue[Message]') -> object:
+ """
+ Execute a QMP Message and wait for the result.
+
+ :param msg: Message to execute.
+ :param queue: The queue we should expect to see a reply delivered to.
+
+ :return: Execution result from the server.
+ The type depends on the command sent.
+ """
+ if not self.running:
+ raise StateError("QMP is not running.")
+ assert self._outgoing
+
+ self._outgoing.put_nowait(msg)
+ reply_msg = await queue.get()
+
+ # May raise ObjectTypeError (Unlikely - only if it has missing keys.)
+ reply = ServerResponse.parse_msg(reply_msg).__root__
+ assert not isinstance(reply, ParsingError) # Handled by BH
+
+ if isinstance(reply, ErrorResponse):
+ # Server indicated execution failure.
+ raise ExecuteError(msg, reply_msg, reply.error)
+
+ assert isinstance(reply, SuccessResponse)
+ return reply.return_
+
+ async def _execute(self, msg: Message) -> object:
+ """
+ The same as `execute_msg()`, but without safety mechanisms.
+
+ Does not assign an execution ID and does not check that the form
+ of the message being sent is valid.
+
+ This method *Requires* an 'id' parameter to be set on the
+ message, it will not set one for you like `execute()` or
+ `execute_msg()`.
+
+ Do not use "__aqmp#00000" style IDs, use something else to avoid
+ potential clashes. If this ID clashes with an ID presently
+ in-use or otherwise clashes with the auto-generated IDs, the
+ response routing mechanisms in _on_message may very well fail
+ loudly enough to cause the entire loop to crash.
+
+ The ID should be a str; or at least something JSON
+ serializable. It *must* be hashable.
+ """
+ exec_id = cast(str, msg['id'])
+ self.logger.debug("Execute(%s): '%s'", exec_id,
+ msg.get('execute', msg.get('exec-oob')))
+
+ queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1)
+ task = create_task(self._bh_execute(msg, queue))
+ self._pending[exec_id] = (task, queue)
+
+ try:
+ result = await task
+ except asyncio.CancelledError as err:
+ raise DisconnectedError("Disconnected") from err
+ finally:
+ del self._pending[exec_id]
+
+ return result
+
+ async def execute_msg(self, msg: Message) -> object:
+ """
+ Execute a QMP message and return the response.
+
+ :param msg: The QMP `Message` to execute.
+ :raises: ValueError if the QMP `Message` does not have either the
+ 'execute' or 'exec-oob' fields set.
+ :raises: ExecuteError if the server returns an error response.
+ :raises: DisconnectedError if the connection was terminated early.
+
+ :return: Execution response from the server. The type of object depends
+ on the command that was issued, though most return a dict.
+ """
+ if not ('execute' in msg or 'exec-oob' in msg):
+ raise ValueError("Requires 'execute' or 'exec-oob' message")
+ if self.disconnecting:
+ raise StateError("QMP is disconnecting/disconnected."
+ " Call disconnect() to fully disconnect.")
+
+ # FIXME: Copy the message here, to avoid leaking the ID back out.
+
+ exec_id = f"__aqmp#{self._execute_id:05d}"
+ msg['id'] = exec_id
+ self._execute_id += 1
+
+ return await self._execute(msg)
+
+ async def execute(self, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> object:
+ """
+ Execute a QMP command and return the response.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If true, execute "out of band".
+
+ :raise: ExecuteError if the server returns an error response.
+ :raise: DisconnectedError if the connection was terminated early.
+
+ :return: Execution response from the server. The type of object depends
+ on the command that was issued, though most return a dict.
+ """
+ # Note: I designed arguments to be its own argument instead of
+ # kwparams so that we are able to add other modifiers that
+ # change execution parameters later on. A theoretical
+ # higher-level API that is generated against a particular QAPI
+ # Schema should generate function signatures the way we want at
+ # that point; modifying those commands to behave differently
+ # could be performed using context managers that alter the QMP
+ # loop for any commands that occur within that block.
+ msg = self.make_execute_msg(cmd, arguments, oob=oob)
+ return await self.execute_msg(msg)
--
2.30.2
[PATCH RFC 7/7] linter config, John Snow, 2021/04/13
Re: [PATCH RFC 0/7] RFC: Asynchronous QMP Draft, Stefan Hajnoczi, 2021/04/14