[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header
From: |
Fam Zheng |
Subject: |
Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads |
Date: |
Mon, 16 Jan 2017 20:52:08 +0800 |
User-agent: |
Mutt/1.7.1 (2016-10-04) |
On Fri, 01/13 14:17, Paolo Bonzini wrote:
> Read the replies from a coroutine, switching the read side between the
> "read header" coroutine and the I/O coroutine that reads the body of
> the reply.
>
> qio_channel_yield is used so that the right coroutine is restarted
> automatically, eliminating the need for send_coroutine in
> NBDClientSession.
>
> Signed-off-by: Paolo Bonzini <address@hidden>
> ---
> block/nbd-client.c | 108
> +++++++++++++++++++++--------------------------------
> block/nbd-client.h | 2 +-
> nbd/client.c | 2 +-
> nbd/common.c | 9 +----
> 4 files changed, 45 insertions(+), 76 deletions(-)
>
> @@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs)
> client->ioc = NULL;
> }
>
> -static void nbd_reply_ready(void *opaque)
> +static void nbd_read_reply_entry(void *opaque)
> {
> - BlockDriverState *bs = opaque;
> - NBDClientSession *s = nbd_get_client_session(bs);
> + NBDClientSession *s = opaque;
> uint64_t i;
> int ret;
>
> - if (!s->ioc) { /* Already closed */
> - return;
> - }
> -
> - if (s->reply.handle == 0) {
> - /* No reply already in flight. Fetch a header. It is possible
> - * that another thread has done the same thing in parallel, so
> - * the socket is not readable anymore.
> - */
> + for (;;) {
> + assert(s->reply.handle == 0);
> ret = nbd_receive_reply(s->ioc, &s->reply);
> - if (ret == -EAGAIN) {
> - return;
> - }
> if (ret < 0) {
> - s->reply.handle = 0;
> - goto fail;
> + break;
> }
> - }
> -
> - /* There's no need for a mutex on the receive side, because the
> - * handler acts as a synchronization point and ensures that only
> - * one coroutine is called until the reply finishes. */
> - i = HANDLE_TO_INDEX(s, s->reply.handle);
> - if (i >= MAX_NBD_REQUESTS) {
> - goto fail;
> - }
>
> - if (s->recv_coroutine[i]) {
> - qemu_coroutine_enter(s->recv_coroutine[i]);
> - return;
> - }
> -
> -fail:
> - nbd_teardown_connection(bs);
> -}
> + /* There's no need for a mutex on the receive side, because the
> + * handler acts as a synchronization point and ensures that only
> + * one coroutine is called until the reply finishes.
> + */
> + i = HANDLE_TO_INDEX(s, s->reply.handle);
> + if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
> + break;
> + }
>
> -static void nbd_restart_write(void *opaque)
> -{
> - BlockDriverState *bs = opaque;
> + aio_co_wake(s->recv_coroutine[i]);
>
> - qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
> + /* We're woken up by the recv_coroutine itself. */
"Wait until we're woken by ..." ?
> + qemu_coroutine_yield();
> + }
> + s->read_reply_co = NULL;
> }
>
> static int nbd_co_send_request(BlockDriverState *bs,
> @@ -120,7 +102,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
> QEMUIOVector *qiov)
> {
> NBDClientSession *s = nbd_get_client_session(bs);
> - AioContext *aio_context;
> int rc, ret, i;
>
> qemu_co_mutex_lock(&s->send_mutex);
> @@ -141,11 +122,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
> return -EPIPE;
> }
>
> - s->send_coroutine = qemu_coroutine_self();
> - aio_context = bdrv_get_aio_context(bs);
> -
> - aio_set_fd_handler(aio_context, s->sioc->fd, false,
> - nbd_reply_ready, nbd_restart_write, NULL, bs);
> if (qiov) {
> qio_channel_set_cork(s->ioc, true);
> rc = nbd_send_request(s->ioc, request);
> @@ -160,9 +136,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
> } else {
> rc = nbd_send_request(s->ioc, request);
> }
> - aio_set_fd_handler(aio_context, s->sioc->fd, false,
> - nbd_reply_ready, NULL, NULL, bs);
> - s->send_coroutine = NULL;
> qemu_co_mutex_unlock(&s->send_mutex);
> return rc;
> }
> @@ -174,8 +147,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
> {
> int ret;
>
> - /* Wait until we're woken up by the read handler. TODO: perhaps
> - * peek at the next reply and avoid yielding if it's ours? */
> + /* Wait until we're woken up by nbd_read_reply_entry. */
> qemu_coroutine_yield();
> *reply = s->reply;
> if (reply->handle != request->handle ||
> @@ -209,14 +181,18 @@ static void nbd_coroutine_start(NBDClientSession *s,
> /* s->recv_coroutine[i] is set as soon as we get the send_lock. */
> }
>
> -static void nbd_coroutine_end(NBDClientSession *s,
> +static void nbd_coroutine_end(BlockDriverState *bs,
> NBDRequest *request)
> {
> + NBDClientSession *s = nbd_get_client_session(bs);
> int i = HANDLE_TO_INDEX(s, request->handle);
> +
> s->recv_coroutine[i] = NULL;
> - if (s->in_flight-- == MAX_NBD_REQUESTS) {
> - qemu_co_queue_next(&s->free_sema);
> - }
> + s->in_flight--;
> + qemu_co_queue_next(&s->free_sema);
> +
> + /* Kick the read_reply_co to get the next reply. */
> + aio_co_wake(s->read_reply_co);
Can't s->read_reply_co be NULL? nbd_read_reply_entry unsets it. (Surprisingly
this file is rather unfamiliar to me, it's possible I'm missing something.)
> }
>
> int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
> void nbd_client_attach_aio_context(BlockDriverState *bs,
> AioContext *new_context)
> {
> - aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
> - false, nbd_reply_ready, NULL, NULL, bs);
> + NBDClientSession *client = nbd_get_client_session(bs);
> + qio_channel_set_aio_context(QIO_CHANNEL(client->sioc), new_context);
> + aio_co_schedule(new_context, client->read_reply_co);
Like above, is client->read_reply_co possibly NULL?
> }
>
> void nbd_client_close(BlockDriverState *bs)
> @@ -434,7 +410,7 @@ int nbd_client_init(BlockDriverState *bs,
> /* Now that we're connected, set the socket to be non-blocking and
> * kick the reply mechanism. */
> qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
> -
> + client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry,
> client);
> nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
>
> logout("Established connection with NBD server\n");
> diff --git a/block/nbd-client.h b/block/nbd-client.h
> index f8d6006..8cdfc92 100644
> --- a/block/nbd-client.h
> +++ b/block/nbd-client.h
> @@ -25,7 +25,7 @@ typedef struct NBDClientSession {
>
> CoMutex send_mutex;
> CoQueue free_sema;
> - Coroutine *send_coroutine;
> + Coroutine *read_reply_co;
> int in_flight;
>
> Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
> diff --git a/nbd/client.c b/nbd/client.c
> index ffb0743..5c9dee3 100644
> --- a/nbd/client.c
> +++ b/nbd/client.c
> @@ -778,7 +778,7 @@ ssize_t nbd_receive_reply(QIOChannel *ioc, NBDReply
> *reply)
> ssize_t ret;
>
> ret = read_sync(ioc, buf, sizeof(buf));
> - if (ret < 0) {
> + if (ret <= 0) {
Not sure why this belongs to this patch, but it also looks harmless.
> return ret;
> }
>
Fam
- Re: [Qemu-devel] [PATCH 04/16] io: add methods to set I/O handlers on AioContext, (continued)
- [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads, Paolo Bonzini, 2017/01/13
- [Qemu-devel] [PATCH 08/16] qed: introduce qed_aio_start_io and qed_aio_next_io_cb, Paolo Bonzini, 2017/01/13
- [Qemu-devel] [PATCH 09/16] aio: push aio_context_acquire/release down to dispatching, Paolo Bonzini, 2017/01/13
- [Qemu-devel] [PATCH 15/16] async: remove unnecessary inc/dec pairs, Paolo Bonzini, 2017/01/13
- [Qemu-devel] [PATCH 13/16] block: explicitly acquire aiocontext in aio callbacks that need it, Paolo Bonzini, 2017/01/13