Subject: Re: [PATCH v7 09/13] qmp: Move dispatcher to a coroutine
From: Kevin Wolf
Date: Fri, 25 Sep 2020 17:38:55 +0200
On 14.09.2020 at 17:30, Markus Armbruster wrote:
> Kevin Wolf <kwolf@redhat.com> writes:
>
> > This moves the QMP dispatcher to a coroutine and runs all QMP command
> > handlers that declare 'coroutine': true in coroutine context so they
> > can avoid blocking the main loop while doing I/O or waiting for other
> > events.
> >
> > For commands that are not declared safe to run in a coroutine, the
> > dispatcher drops out of coroutine context by calling the QMP command
> > handler from a bottom half.
> >
> > Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> > Reviewed-by: Markus Armbruster <armbru@redhat.com>
> > ---
> > include/qapi/qmp/dispatch.h | 1 +
> > monitor/monitor-internal.h | 6 +-
> > monitor/monitor.c | 55 +++++++++++++---
> > monitor/qmp.c | 122 +++++++++++++++++++++++++++---------
> > qapi/qmp-dispatch.c | 61 ++++++++++++++++--
> > qapi/qmp-registry.c | 3 +
> > util/aio-posix.c | 8 ++-
> > 7 files changed, 210 insertions(+), 46 deletions(-)
> >
> > diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
> > index 9fd2b720a7..af8d96c570 100644
> > --- a/include/qapi/qmp/dispatch.h
> > +++ b/include/qapi/qmp/dispatch.h
> > @@ -31,6 +31,7 @@ typedef enum QmpCommandOptions
> > typedef struct QmpCommand
> > {
> > const char *name;
> > + /* Runs in coroutine context if QCO_COROUTINE is set */
> > QmpCommandFunc *fn;
> > QmpCommandOptions options;
> > QTAILQ_ENTRY(QmpCommand) node;
> > diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h
> > index b39e03b744..b55d6df07f 100644
> > --- a/monitor/monitor-internal.h
> > +++ b/monitor/monitor-internal.h
> > @@ -155,7 +155,9 @@ static inline bool monitor_is_qmp(const Monitor *mon)
> >
> > typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList;
> > extern IOThread *mon_iothread;
> > -extern QEMUBH *qmp_dispatcher_bh;
> > +extern Coroutine *qmp_dispatcher_co;
> > +extern bool qmp_dispatcher_co_shutdown;
> > +extern bool qmp_dispatcher_co_busy;
> > extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands;
> > extern QemuMutex monitor_lock;
> > extern MonitorList mon_list;
> > @@ -173,7 +175,7 @@ void monitor_fdsets_cleanup(void);
> >
> > void qmp_send_response(MonitorQMP *mon, const QDict *rsp);
> > void monitor_data_destroy_qmp(MonitorQMP *mon);
> > -void monitor_qmp_bh_dispatcher(void *data);
> > +void coroutine_fn monitor_qmp_dispatcher_co(void *data);
> >
> > int get_monitor_def(int64_t *pval, const char *name);
> > void help_cmd(Monitor *mon, const char *name);
> > diff --git a/monitor/monitor.c b/monitor/monitor.c
> > index 629aa073ee..ac2722bf91 100644
> > --- a/monitor/monitor.c
> > +++ b/monitor/monitor.c
> > @@ -55,8 +55,32 @@ typedef struct {
> > /* Shared monitor I/O thread */
> > IOThread *mon_iothread;
> >
> > -/* Bottom half to dispatch the requests received from I/O thread */
> > -QEMUBH *qmp_dispatcher_bh;
> > +/* Coroutine to dispatch the requests received from I/O thread */
> > +Coroutine *qmp_dispatcher_co;
> > +
> > +/* Set to true when the dispatcher coroutine should terminate */
> > +bool qmp_dispatcher_co_shutdown;
> > +
> > +/*
> > + * qmp_dispatcher_co_busy is used for synchronisation between the
> > + * monitor thread and the main thread to ensure that the dispatcher
> > + * coroutine never gets scheduled a second time when it's already
> > + * scheduled (scheduling the same coroutine twice is forbidden).
> > + *
> > + * It is true if the coroutine is active and processing requests.
> > + * Additional requests may then be pushed onto mon->qmp_requests,
> > + * and @qmp_dispatcher_co_shutdown may be set without further ado.
> > + * @qmp_dispatcher_co must not be woken up in this case.
> > + *
> > + * If false, you also have to set @qmp_dispatcher_co_busy to true and
> > + * wake up @qmp_dispatcher_co after pushing the new requests.
> > + *
> > + * The coroutine will automatically change this variable back to false
> > + * before it yields. Nobody else may set the variable to false.
> > + *
> > + * Access must be atomic for thread safety.
> > + */
> > +bool qmp_dispatcher_co_busy;
> >
> > /*
> > * Protects mon_list, monitor_qapi_event_state, coroutine_mon,
> > @@ -623,9 +647,24 @@ void monitor_cleanup(void)
> > }
> > qemu_mutex_unlock(&monitor_lock);
> >
> > - /* QEMUBHs needs to be deleted before destroying the I/O thread */
> > - qemu_bh_delete(qmp_dispatcher_bh);
> > - qmp_dispatcher_bh = NULL;
> > + /*
> > + * The dispatcher needs to stop before destroying the I/O thread.
> > + *
> > + * We need to poll both qemu_aio_context and iohandler_ctx to make
> > + * sure that the dispatcher coroutine keeps making progress and
> > + * eventually terminates. qemu_aio_context is automatically
> > + * polled by calling AIO_WAIT_WHILE on it, but we must poll
> > + * iohandler_ctx manually.
> > + */
> > + qmp_dispatcher_co_shutdown = true;
> > + if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> > + aio_co_wake(qmp_dispatcher_co);
> > + }
> > +
> > + AIO_WAIT_WHILE(qemu_get_aio_context(),
> > + (aio_poll(iohandler_get_aio_context(), false),
> > + atomic_mb_read(&qmp_dispatcher_co_busy)));
> > +
> > if (mon_iothread) {
> > iothread_destroy(mon_iothread);
> > mon_iothread = NULL;
> > @@ -649,9 +688,9 @@ void monitor_init_globals_core(void)
> > * have commands assuming that context. It would be nice to get
> > * rid of those assumptions.
> > */
> > - qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
> > - monitor_qmp_bh_dispatcher,
> > - NULL);
> > + qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
> > + atomic_mb_set(&qmp_dispatcher_co_busy, true);
> > + aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
> > }
> >
> > int monitor_init(MonitorOptions *opts, bool allow_hmp, Error **errp)
> > diff --git a/monitor/qmp.c b/monitor/qmp.c
> > index 922fdb5541..69f6e93f38 100644
> > --- a/monitor/qmp.c
> > +++ b/monitor/qmp.c
> > @@ -133,6 +133,10 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
> > }
> > }
> >
> > +/*
> > + * Runs outside of coroutine context for OOB commands, but in
> > + * coroutine context for everything else.
> > + */
> > static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
> > {
> > QDict *rsp;
> > @@ -205,43 +209,99 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
> > return req_obj;
> > }
> >
> > -void monitor_qmp_bh_dispatcher(void *data)
> > +void coroutine_fn monitor_qmp_dispatcher_co(void *data)
> > {
> > - QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
> > + QMPRequest *req_obj = NULL;
> > QDict *rsp;
> > bool need_resume;
> > MonitorQMP *mon;
> >
> > - if (!req_obj) {
> > - return;
> > - }
> > + while (true) {
> > + assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
> >
> > - mon = req_obj->mon;
> > - /* qmp_oob_enabled() might change after "qmp_capabilities" */
> > - need_resume = !qmp_oob_enabled(mon) ||
> > - mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> > - qemu_mutex_unlock(&mon->qmp_queue_lock);
> > - if (req_obj->req) {
> > - QDict *qdict = qobject_to(QDict, req_obj->req);
> > - QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> > - trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> > - monitor_qmp_dispatch(mon, req_obj->req);
> > - } else {
> > - assert(req_obj->err);
> > - rsp = qmp_error_response(req_obj->err);
> > - req_obj->err = NULL;
> > - monitor_qmp_respond(mon, rsp);
> > - qobject_unref(rsp);
> > - }
> > + /*
> > + * Mark the dispatcher as not busy already here so that we
> > + * don't miss any new requests coming in the middle of our
> > + * processing.
> > + */
> > + atomic_mb_set(&qmp_dispatcher_co_busy, false);
> > +
> > + while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
> > + /*
> > + * No more requests to process. Wait to be reentered from
> > + * handle_qmp_command() when it pushes more requests, or
> > + * from monitor_cleanup() when it requests shutdown.
> > + */
> > + if (!qmp_dispatcher_co_shutdown) {
> > + qemu_coroutine_yield();
> > +
> > + /*
> > + * busy must be set to true again by whoever
> > + * rescheduled us to avoid double scheduling
> > + */
> > + assert(atomic_xchg(&qmp_dispatcher_co_busy, false) == true);
> > + }
> > +
> > + /*
> > + * qmp_dispatcher_co_shutdown may have changed if we
> > + * yielded and were reentered from monitor_cleanup()
> > + */
> > + if (qmp_dispatcher_co_shutdown) {
> > + return;
> > + }
> > + }
> >
> > - if (need_resume) {
> > - /* Pairs with the monitor_suspend() in handle_qmp_command() */
> > - monitor_resume(&mon->common);
> > - }
> > - qmp_request_free(req_obj);
> > + if (atomic_xchg(&qmp_dispatcher_co_busy, true) == true) {
> > + /*
> > + * Someone rescheduled us (probably because a new request
> > + * came in), but we didn't actually yield. Do that now,
> > + * only to be immediately reentered and removed from the
> > + * list of scheduled coroutines.
> > + */
> > + qemu_coroutine_yield();
> > + }
> >
> > - /* Reschedule instead of looping so the main loop stays responsive */
> > - qemu_bh_schedule(qmp_dispatcher_bh);
> > + /*
> > + * Move the coroutine from iohandler_ctx to qemu_aio_context for
> > + * executing the command handler so that it can make progress if it
> > + * involves an AIO_WAIT_WHILE().
> > + */
> > + aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
> > + qemu_coroutine_yield();
> > +
> > + mon = req_obj->mon;
> > + /* qmp_oob_enabled() might change after "qmp_capabilities" */
> > + need_resume = !qmp_oob_enabled(mon) ||
> > + mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> > + qemu_mutex_unlock(&mon->qmp_queue_lock);
> > + if (req_obj->req) {
> > + QDict *qdict = qobject_to(QDict, req_obj->req);
> > + QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> > + trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> > + monitor_qmp_dispatch(mon, req_obj->req);
> > + } else {
> > + assert(req_obj->err);
> > + rsp = qmp_error_response(req_obj->err);
> > + req_obj->err = NULL;
> > + monitor_qmp_respond(mon, rsp);
> > + qobject_unref(rsp);
> > + }
> > +
> > + if (need_resume) {
> > + /* Pairs with the monitor_suspend() in handle_qmp_command() */
> > + monitor_resume(&mon->common);
> > + }
> > + qmp_request_free(req_obj);
> > +
> > + /*
> > + * Yield and reschedule so the main loop stays responsive.
> > + *
> > + * Move back to iohandler_ctx so that nested event loops for
> > + * qemu_aio_context don't start new monitor commands.
> > + */
> > + aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
> > + qemu_coroutine_yield();
> > + }
> > }
> >
> > static void handle_qmp_command(void *opaque, QObject *req, Error *err)
> > @@ -302,7 +362,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
> > qemu_mutex_unlock(&mon->qmp_queue_lock);
> >
> > /* Kick the dispatcher routine */
> > - qemu_bh_schedule(qmp_dispatcher_bh);
> > + if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> > + aio_co_wake(qmp_dispatcher_co);
> > + }
> > }
> >
> > static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
> > diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
> > index 5677ba92ca..754f7b854c 100644
> > --- a/qapi/qmp-dispatch.c
> > +++ b/qapi/qmp-dispatch.c
> > @@ -12,12 +12,16 @@
> > */
> >
> > #include "qemu/osdep.h"
> > +
> > +#include "block/aio.h"
> > #include "qapi/error.h"
> > #include "qapi/qmp/dispatch.h"
> > #include "qapi/qmp/qdict.h"
> > #include "qapi/qmp/qjson.h"
> > #include "sysemu/runstate.h"
> > #include "qapi/qmp/qbool.h"
> > +#include "qemu/coroutine.h"
> > +#include "qemu/main-loop.h"
> >
> > static QDict *qmp_dispatch_check_obj(QDict *dict, bool allow_oob,
> > Error **errp)
> > @@ -88,6 +92,30 @@ bool qmp_is_oob(const QDict *dict)
> > && !qdict_haskey(dict, "execute");
> > }
> >
> > +typedef struct QmpDispatchBH {
> > + const QmpCommand *cmd;
> > + Monitor *cur_mon;
> > + QDict *args;
> > + QObject **ret;
> > + Error **errp;
> > + Coroutine *co;
> > +} QmpDispatchBH;
> > +
> > +static void do_qmp_dispatch_bh(void *opaque)
> > +{
> > + QmpDispatchBH *data = opaque;
> > +
> > + assert(monitor_cur() == NULL);
> > + monitor_set_cur(qemu_coroutine_self(), data->cur_mon);
> > + data->cmd->fn(data->args, data->ret, data->errp);
> > + monitor_set_cur(qemu_coroutine_self(), NULL);
> > + aio_co_wake(data->co);
> > +}
> > +
> > +/*
> > + * Runs outside of coroutine context for OOB commands, but in coroutine
> > + * context for everything else.
> > + */
> > QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
> > bool allow_oob, Monitor *cur_mon)
> > {
> > @@ -153,12 +181,35 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
> > qobject_ref(args);
> > }
> >
> > + assert(!(oob && qemu_in_coroutine()));
> > assert(monitor_cur() == NULL);
> > - monitor_set_cur(qemu_coroutine_self(), cur_mon);
> > -
> > - cmd->fn(args, &ret, &err);
> > -
> > - monitor_set_cur(qemu_coroutine_self(), NULL);
> > + if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) {
> > + monitor_set_cur(qemu_coroutine_self(), cur_mon);
> > + cmd->fn(args, &ret, &err);
> > + monitor_set_cur(qemu_coroutine_self(), NULL);
> > + } else {
> > + /*
> > + * Not being in coroutine context implies that we're handling
> > + * an OOB command, which must not have QCO_COROUTINE.
> > + *
> > + * This implies that we are in coroutine context, but the
> > + * command doesn't have QCO_COROUTINE. We must drop out of
> > + * coroutine context for this one.
> > + */
>
> I had to read this several times to get it. The first sentence leads me
> into coroutine context, and then the next sentence tells me the
> opposite, throwing me into confusion.
>
> Perhaps something like this:
>
> /*
> * Actual context doesn't match the one the command needs.
> * Case 1: we are in coroutine context, but command does not
> * have QCO_COROUTINE. We need to drop out of coroutine
> * context for executing it.
> * Case 2: we are outside coroutine context, but command has
> * QCO_COROUTINE. Can't actually happen, because we get here
> * outside coroutine context only when executing a command
> * out of band, and OOB commands never have QCO_COROUTINE.
> */
Works for me. Can you squash this in while applying?
Kevin