[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH v4 17/27] python/aqmp: add QMP protocol support
From: |
John Snow |
Subject: |
[PATCH v4 17/27] python/aqmp: add QMP protocol support |
Date: |
Wed, 15 Sep 2021 12:29:45 -0400 |
The star of our show!
Add most of the QMP protocol, sans support for actually executing
commands. No problem, that happens in the next several commits.
Signed-off-by: John Snow <jsnow@redhat.com>
---
python/qemu/aqmp/__init__.py | 2 +
python/qemu/aqmp/qmp_client.py | 264 +++++++++++++++++++++++++++++++++
2 files changed, 266 insertions(+)
create mode 100644 python/qemu/aqmp/qmp_client.py
diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index 829166a2e2..d975c752ea 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -25,11 +25,13 @@
from .events import EventListener
from .message import Message
from .protocol import ConnectError, Runstate, StateError
+from .qmp_client import QMPClient
# The order of these fields impact the Sphinx documentation order.
__all__ = (
# Classes, most to least important
+ 'QMPClient',
'Message',
'EventListener',
'Runstate',
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
new file mode 100644
index 0000000000..000ff59c7a
--- /dev/null
+++ b/python/qemu/aqmp/qmp_client.py
@@ -0,0 +1,264 @@
+"""
+QMP Protocol Implementation
+
+This module provides the `QMPClient` class, which can be used to connect
+and send commands to a QMP server such as QEMU. The QMP class can be
+used to either connect to a listening server, or used to listen and
+accept an incoming connection from that server.
+"""
+
+import logging
+from typing import (
+ Dict,
+ List,
+ Mapping,
+ Optional,
+)
+
+from .error import ProtocolError
+from .events import Events
+from .message import Message
+from .models import Greeting
+from .protocol import AsyncProtocol
+from .util import (
+ bottom_half,
+ exception_summary,
+ pretty_traceback,
+ upper_half,
+)
+
+
+class _WrappedProtocolError(ProtocolError):
+ """
+ Abstract exception class for Protocol errors that wrap an Exception.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+ def __init__(self, error_message: str, exc: Exception):
+ super().__init__(error_message)
+ self.exc = exc
+
+ def __str__(self) -> str:
+ return f"{self.error_message}: {self.exc!s}"
+
+
+class GreetingError(_WrappedProtocolError):
+ """
+ An exception occurred during the Greeting phase.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class NegotiationError(_WrappedProtocolError):
+ """
+ An exception occurred during the Negotiation phase.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class QMPClient(AsyncProtocol[Message], Events):
+ """
+ Implements a QMP client connection.
+
+ QMP can be used to establish a connection as either the transport
+ client or server, though this class always acts as the QMP client.
+
+ :param name: Optional nickname for the connection, used for logging.
+
+ Basic script-style usage looks like this::
+
+ qmp = QMPClient('my_virtual_machine_name')
+ await qmp.connect(('127.0.0.1', 1234))
+ ...
+ res = await qmp.execute('block-query')
+ ...
+ await qmp.disconnect()
+
+ Basic async client-style usage looks like this::
+
+ class Client:
+ def __init__(self, name: str):
+ self.qmp = QMPClient(name)
+
+ async def watch_events(self):
+ try:
+ async for event in self.qmp.events:
+ print(f"Event: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ async def run(self, address='/tmp/qemu.socket'):
+ await self.qmp.connect(address)
+ asyncio.create_task(self.watch_events())
+ await self.qmp.runstate_changed.wait()
+ await self.disconnect()
+
+ See `aqmp.events` for more detail on event handling patterns.
+ """
+ #: Logger object used for debugging messages.
+ logger = logging.getLogger(__name__)
+
+ # Read buffer limit; large enough to accept query-qmp-schema
+ _limit = (256 * 1024)
+
+ def __init__(self, name: Optional[str] = None) -> None:
+ super().__init__(name)
+ Events.__init__(self)
+
+ #: Whether or not to await a greeting after establishing a connection.
+ self.await_greeting: bool = True
+
+ #: Whether or not to perform capabilities negotiation upon connection.
+ #: Implies `await_greeting`.
+ self.negotiate: bool = True
+
+ # Cached Greeting, if one was awaited.
+ self._greeting: Optional[Greeting] = None
+
+ @upper_half
+ async def _establish_session(self) -> None:
+ """
+ Initiate the QMP session.
+
+ Wait for the QMP greeting and perform capabilities negotiation.
+
+ :raise GreetingError: When the greeting is not understood.
+ :raise NegotiationError: If the negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ if self.await_greeting or self.negotiate:
+ self._greeting = await self._get_greeting()
+
+ if self.negotiate:
+ await self._negotiate()
+
+ # This will start the reader/writers:
+ await super()._establish_session()
+
+ @upper_half
+ async def _get_greeting(self) -> Greeting:
+ """
+ :raise GreetingError: When the greeting is not understood.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+
+ :return: the Greeting object given by the server.
+ """
+ self.logger.debug("Awaiting greeting ...")
+
+ try:
+ msg = await self._recv()
+ return Greeting(msg)
+ except (ProtocolError, KeyError, TypeError) as err:
+ emsg = "Did not understand Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise GreetingError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Failed to receive Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @upper_half
+ async def _negotiate(self) -> None:
+ """
+ Perform QMP capabilities negotiation.
+
+ :raise NegotiationError: When negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ self.logger.debug("Negotiating capabilities ...")
+
+ arguments: Dict[str, List[str]] = {'enable': []}
+ if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+ arguments['enable'].append('oob')
+ msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
+
+ # It's not safe to use execute() here, because the reader/writers
+ # aren't running. AsyncProtocol *requires* that a new session
+ # does not fail after the reader/writers are running!
+ try:
+ await self._send(msg)
+ reply = await self._recv()
+ assert 'return' in reply
+ assert 'error' not in reply
+ except (ProtocolError, AssertionError) as err:
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise NegotiationError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @bottom_half
+ async def _on_message(self, msg: Message) -> None:
+ """
+ Add an incoming message to the appropriate queue/handler.
+ """
+ # Incoming messages are not fully parsed/validated here;
+ # do only light peeking to know how to route the messages.
+
+ if 'event' in msg:
+ await self._event_dispatch(msg)
+ return
+
+ # Below, we assume everything left is an execute/exec-oob response.
+ # ... Which we'll implement in the next commit!
+
+ @upper_half
+ @bottom_half
+ async def _do_recv(self) -> Message:
+ """
+ :raise OSError: When a stream error is encountered.
+ :raise EOFError: When the stream is at EOF.
+ :raise ProtocolError:
+ When the Message is not understood.
+ See also `Message._deserialize`.
+
+ :return: A single QMP `Message`.
+ """
+ msg_bytes = await self._readline()
+ msg = Message(msg_bytes, eager=True)
+ return msg
+
+ @upper_half
+ @bottom_half
+ def _do_send(self, msg: Message) -> None:
+ """
+ :raise ValueError: JSON serialization failure
+ :raise TypeError: JSON serialization failure
+ :raise OSError: When a stream error is encountered.
+ """
+ assert self._writer is not None
+ self._writer.write(bytes(msg))
+
+ @classmethod
+ def make_execute_msg(cls, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> Message:
+ """
+ Create an executable message to be sent later.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If `True`, execute "out of band".
+
+ :return: An executable QMP `Message`.
+ """
+ msg = Message({'exec-oob' if oob else 'execute': cmd})
+ if arguments is not None:
+ msg['arguments'] = arguments
+ return msg
--
2.31.1
- [PATCH v4 05/27] python/aqmp: add generic async message-based protocol support, (continued)
- [PATCH v4 05/27] python/aqmp: add generic async message-based protocol support, John Snow, 2021/09/15
- [PATCH v4 07/27] python/aqmp: Add logging utility helpers, John Snow, 2021/09/15
- [PATCH v4 08/27] python/aqmp: add logging to AsyncProtocol, John Snow, 2021/09/15
- [PATCH v4 09/27] python/aqmp: add AsyncProtocol.accept() method, John Snow, 2021/09/15
- [PATCH v4 10/27] python/aqmp: add configurable read buffer limit, John Snow, 2021/09/15
- [PATCH v4 11/27] python/aqmp: add _cb_inbound and _cb_outbound logging hooks, John Snow, 2021/09/15
- [PATCH v4 12/27] python/aqmp: add AsyncProtocol._readline() method, John Snow, 2021/09/15
- [PATCH v4 13/27] python/aqmp: add QMP Message format, John Snow, 2021/09/15
- [PATCH v4 14/27] python/aqmp: add well-known QMP object models, John Snow, 2021/09/15
- [PATCH v4 16/27] python/pylint: disable too-many-function-args, John Snow, 2021/09/15
- [PATCH v4 17/27] python/aqmp: add QMP protocol support,
John Snow <=
- [PATCH v4 19/27] python/aqmp: Add message routing to QMP protocol, John Snow, 2021/09/15
- [PATCH v4 20/27] python/aqmp: add execute() interfaces, John Snow, 2021/09/15
- [PATCH v4 15/27] python/aqmp: add QMP event support, John Snow, 2021/09/15
- [PATCH v4 18/27] python/pylint: disable no-member check, John Snow, 2021/09/15
- [PATCH v4 21/27] python/aqmp: add _raw() execution interface, John Snow, 2021/09/15
- [PATCH v4 23/27] python/aqmp: add scary message, John Snow, 2021/09/15
- [PATCH v4 22/27] python/aqmp: add asyncio_run compatibility wrapper, John Snow, 2021/09/15
- [PATCH v4 26/27] python/aqmp: add LineProtocol tests, John Snow, 2021/09/15
- [PATCH v4 27/27] python/aqmp: Add Coverage.py support, John Snow, 2021/09/15
- [PATCH v4 24/27] python: bump avocado to v90.0, John Snow, 2021/09/15