Re: [PATCH 2/2] nbd/server: Use drained block ops to quiesce the server
From: Kevin Wolf
Subject: Re: [PATCH 2/2] nbd/server: Use drained block ops to quiesce the server
Date: Tue, 1 Jun 2021 18:08:41 +0200
On 01.06.2021 at 07:57, Sergio Lopez wrote:
> Before switching between AioContexts we need to make sure that we're
> fully quiesced ("nb_requests == 0" for every client) when entering the
> drained section.
>
> To do this, we set "quiescing = true" for every client on
> ".drained_begin" to prevent new coroutines from being created, and check
> if "nb_requests == 0" on ".drained_poll". Finally, once we're exiting the
> drained section, on ".drained_end" we set "quiescing = false" and
> call "nbd_client_receive_next_request()" to resume the processing of
> new requests.
>
> With these changes, "blk_aio_attached()" and "blk_aio_detach()" can be
> reverted to be as simple as they were before f148ae7d36.
>
> RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1960137
> Suggested-by: Kevin Wolf <kwolf@redhat.com>
> Signed-off-by: Sergio Lopez <slp@redhat.com>
> ---
>  nbd/server.c | 99 +++++++++++++++++++++++++++++++++++++++-------------
>  1 file changed, 75 insertions(+), 24 deletions(-)
>
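A note for readers following along: the "drained block ops" in the subject
line are the BlockDevOps callbacks, with the .drained_poll hook coming from
patch 1/2. The registration isn't part of the hunks quoted below; presumably
it looks roughly like the sketch here (nbd_export_attach_dev_ops is a
made-up helper name for illustration, not something the patch defines):

/*
 * Sketch only: wiring the export's drained callbacks up through
 * BlockDevOps, assuming the .drained_poll hook added in patch 1/2.
 * nbd_drained_begin()/nbd_drained_poll()/nbd_drained_end() are the
 * functions added further down in this patch.
 */
static const BlockDevOps nbd_block_ops = {
    .drained_begin = nbd_drained_begin,
    .drained_poll  = nbd_drained_poll,
    .drained_end   = nbd_drained_end,
};

/* Hypothetical helper; the actual call site isn't quoted here. */
static void nbd_export_attach_dev_ops(BlockBackend *blk, NBDExport *exp)
{
    /* exp is handed back to each callback as the opaque pointer */
    blk_set_dev_ops(blk, &nbd_block_ops, exp);
}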
> diff --git a/nbd/server.c b/nbd/server.c
> index 86a44a9b41..33e55479d7 100644
> --- a/nbd/server.c
> +++ b/nbd/server.c
> @@ -132,7 +132,7 @@ struct NBDClient {
>      CoMutex send_lock;
>      Coroutine *send_coroutine;
>
> -    bool read_yielding;
> +    GSList *yield_co_list; /* List of coroutines yielding on nbd_read_eof */
>      bool quiescing;
Hm, how do you get more than one coroutine per client yielding in
nbd_read_eof() at the same time? I thought the model is that you always
have one coroutine reading the next request (which is
client->recv_coroutine) and all the others are just processing the
request they had read earlier. Multiple coroutines reading from the
same socket sounds like a bad idea.
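To make the question concrete: if only client->recv_coroutine can ever be
parked in nbd_read_eof(), then the existing read_yielding flag would be
enough and drained_poll could kick just that one coroutine. Something along
these lines (only a sketch of the alternative, untested):

static bool nbd_drained_poll(void *opaque)
{
    NBDExport *exp = opaque;
    NBDClient *client;

    QTAILQ_FOREACH(client, &exp->clients, next) {
        if (client->nb_requests != 0) {
            /*
             * Kick the one reader parked in qio_channel_yield(); it will
             * see client->quiescing and return -EAGAIN from nbd_read_eof().
             */
            if (client->recv_coroutine && client->read_yielding) {
                qemu_aio_coroutine_enter(exp->common.ctx,
                                         client->recv_coroutine);
            }
            return true; /* still draining */
        }
    }

    return false; /* fully quiesced */
}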
>      QTAILQ_ENTRY(NBDClient) next;
> @@ -1367,6 +1367,7 @@ static inline int coroutine_fn
>  nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
>  {
>      bool partial = false;
> +    Coroutine *co;
>
>      assert(size);
>      while (size > 0) {
> @@ -1375,9 +1376,12 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
>
>          len = qio_channel_readv(client->ioc, &iov, 1, errp);
>          if (len == QIO_CHANNEL_ERR_BLOCK) {
> -            client->read_yielding = true;
> +            co = qemu_coroutine_self();
> +
> +            client->yield_co_list = g_slist_prepend(client->yield_co_list, co);
>              qio_channel_yield(client->ioc, G_IO_IN);
> -            client->read_yielding = false;
> +            client->yield_co_list = g_slist_remove(client->yield_co_list, co);
> +
>              if (client->quiescing) {
>                  return -EAGAIN;
>              }
> @@ -1513,6 +1517,11 @@ static void nbd_request_put(NBDRequestData *req)
>      g_free(req);
>
>      client->nb_requests--;
> +
> +    if (client->quiescing && client->nb_requests == 0) {
> +        aio_wait_kick();
> +    }
> +
>      nbd_client_receive_next_request(client);
>
>      nbd_client_put(client);
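(Side note for other readers: the aio_wait_kick() added above is what wakes
the drain loop up when the last in-flight request completes. Conceptually,
and only conceptually, since the real path goes through bdrv_drain_poll()
and the BlockBackend's dev_ops, the drained section is sitting in something
like:

/*
 * Conceptual sketch, NOT the real drain code: the drained section keeps
 * re-evaluating the export's .drained_poll callback until it reports
 * quiescence; aio_wait_kick() makes the loop re-check the condition as
 * soon as nb_requests drops to 0.
 */
AIO_WAIT_WHILE(exp->common.ctx, nbd_drained_poll(exp));

Without the kick, the drain could hang until some unrelated event happens to
wake the AioContext.)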
> @@ -1530,49 +1539,75 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
>      QTAILQ_FOREACH(client, &exp->clients, next) {
>          qio_channel_attach_aio_context(client->ioc, ctx);
>
> +        assert(client->nb_requests == 0);
>          assert(client->recv_coroutine == NULL);
>          assert(client->send_coroutine == NULL);
> -
> -        if (client->quiescing) {
> -            client->quiescing = false;
> -            nbd_client_receive_next_request(client);
> -        }
>      }
>  }
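For context on why the strengthened assertions hold: the AioContext switch
happens inside a drained section, so by the time the attach notifier runs,
.drained_poll has already reported full quiescence. Roughly (a simplified
timeline, not actual code from the patch):

/*
 * Simplified order of operations during an AioContext switch:
 *
 *   bdrv_drained_begin()  -> nbd_drained_begin(): quiescing = true
 *   ... drain loop polls nbd_drained_poll() until nb_requests == 0 ...
 *   blk_aio_detach()      -> detach channels, exp->common.ctx = NULL
 *   blk_aio_attached()    -> safe to assert: nothing is in flight
 *   bdrv_drained_end()    -> nbd_drained_end(): resume request processing
 */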
>
> -static void nbd_aio_detach_bh(void *opaque)
> +static void blk_aio_detach(void *opaque)
> {
> NBDExport *exp = opaque;
> NBDClient *client;
>
> + trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
> +
> QTAILQ_FOREACH(client, &exp->clients, next) {
> qio_channel_detach_aio_context(client->ioc);
> + }
> +
> + exp->common.ctx = NULL;
> +}
> +
> +static void nbd_drained_begin(void *opaque)
> +{
> + NBDExport *exp = opaque;
> + NBDClient *client;
> +
> + QTAILQ_FOREACH(client, &exp->clients, next) {
> client->quiescing = true;
> + }
> +}
>
> - if (client->recv_coroutine) {
> - if (client->read_yielding) {
> - qemu_aio_coroutine_enter(exp->common.ctx,
> - client->recv_coroutine);
> - } else {
> - AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine !=
> NULL);
> - }
> - }
> +static void nbd_drained_end(void *opaque)
> +{
> + NBDExport *exp = opaque;
> + NBDClient *client;
>
> - if (client->send_coroutine) {
> - AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
> - }
> + QTAILQ_FOREACH(client, &exp->clients, next) {
> + client->quiescing = false;
> + nbd_client_receive_next_request(client);
> }
> }
>
> -static void blk_aio_detach(void *opaque)
> +static bool nbd_drained_poll(void *opaque)
> {
> NBDExport *exp = opaque;
> + NBDClient *client;
> + Coroutine *co;
> + GSList *entry;
> + GSList *coroutine_list;
>
> - trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
> + QTAILQ_FOREACH(client, &exp->clients, next) {
> + if (client->nb_requests != 0) {
> + /*
> + * Enter coroutines waiting for new requests on nbd_read_eof(),
> so
> + * we don't depend on the client to wake us up.
> + */
> + coroutine_list = g_slist_copy(client->yield_co_list);
> + for (entry = coroutine_list;
> + entry != NULL;
> + entry = g_slist_next(entry)) {
> + co = entry->data;
> + qemu_aio_coroutine_enter(exp->common.ctx, co);
> + }
> + g_slist_free(coroutine_list);
>
> - aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
> + return 1;
This would be more accurately spelt true...
> +        }
> +    }
>
> -    exp->common.ctx = NULL;
> +    return 0;
...and this false.
>  }
>
>  static void nbd_eject_notifier(Notifier *n, void *data)
The patch looks correct to me, though I'm not sure whether yield_co_list
is an unnecessary complication (and, if it is in fact needed, whether
it's actually safe).
I would be happy enough to apply it anyway if you can explain the
yield_co_list thing, but I'll give Eric some time to have a look, too.
Kevin