qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v2 1/4] linux-aio: fix submit aio as a batch


From: Benoît Canet
Subject: Re: [Qemu-devel] [PATCH v2 1/4] linux-aio: fix submit aio as a batch
Date: Tue, 9 Sep 2014 16:53:57 +0200
User-agent: Mutt/1.5.23 (2014-03-12)

On Friday 05 Sep 2014 at 00:27:07 (+0800), Ming Lei wrote:
> In the enqueue path, we can't complete request, otherwise
> "Co-routine re-entered recursively" may be caused, so this
> patch fixes the issue with the following ideas:
> 
>       - for -EAGAIN or partial completion, retry the submission by
>       scheduling a BH in following completion cb
>       - for part of completion, also update the io queue
>       - for other failure, return the failure if in enqueue path,
>       otherwise, abort all queued I/O
> 
> Signed-off-by: Ming Lei <address@hidden>
> ---
>  block/linux-aio.c |  106 
> ++++++++++++++++++++++++++++++++++++++++-------------
>  1 file changed, 81 insertions(+), 25 deletions(-)
> 
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index 9aca758..a06576d 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -38,11 +38,19 @@ struct qemu_laiocb {
>      QLIST_ENTRY(qemu_laiocb) node;
>  };
>  
> -typedef struct {
> +/*
> + * TODO: support to batch I/O from multiple bs in one same
> + * AIO context, one important use case is multi-lun scsi,
> + * so in future the IO queue should be per AIO context.
> + */
> +typedef struct LaioQueue {
>      struct iocb *iocbs[MAX_QUEUED_IO];
>      int plugged;
> -    unsigned int size;
> -    unsigned int idx;
> +    uint32 size;
> +    uint32 idx;

Sorry Ming, I said crap about the struct, size and idx.
I initially misread the patch and thought you were adding these.
You were right from the start.

> +
> +    /* handle -EAGAIN and partial completion */
> +    QEMUBH *retry;
>  } LaioQueue;
>  
>  struct qemu_laio_state {
> @@ -138,6 +146,13 @@ static void qemu_laio_completion_bh(void *opaque)
>      }
>  }
>  
> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
> +{
> +    if (s->io_q.idx) {
> +        qemu_bh_schedule(s->io_q.retry);
> +    }
> +}
> +
>  static void qemu_laio_completion_cb(EventNotifier *e)
>  {
>      struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
> @@ -145,6 +160,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
>      if (event_notifier_test_and_clear(&s->e)) {
>          qemu_bh_schedule(s->completion_bh);
>      }
> +    qemu_laio_start_retry(s);
>  }
>  
>  static void laio_cancel(BlockDriverAIOCB *blockacb)
> @@ -164,6 +180,7 @@ static void laio_cancel(BlockDriverAIOCB *blockacb)
>      ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
>      if (ret == 0) {
>          laiocb->ret = -ECANCELED;
> +        qemu_laio_start_retry(laiocb->ctx);
>          return;
>      }
>  
> @@ -191,45 +208,80 @@ static void ioq_init(LaioQueue *io_q)
>      io_q->plugged = 0;
>  }
>  
> -static int ioq_submit(struct qemu_laio_state *s)
> +static void abort_queue(struct qemu_laio_state *s)
> +{
> +    int i;
> +    for (i = 0; i < s->io_q.idx; i++) {
> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
> +                                                  struct qemu_laiocb,
> +                                                  iocb);
> +        laiocb->ret = -EIO;
> +        qemu_laio_process_completion(s, laiocb);
> +    }
> +}
> +
> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
>  {
>      int ret, i = 0;
>      int len = s->io_q.idx;
> +    int j = 0;
>  
> -    do {
> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
> -    } while (i++ < 3 && ret == -EAGAIN);
> +    if (!len) {
> +        return 0;
> +    }
>  
> -    /* empty io queue */
> -    s->io_q.idx = 0;
> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
> +    if (ret == -EAGAIN) { /* retry in following completion cb */
> +        return 0;
> +    } else if (ret < 0) {
> +        if (enqueue) {
> +            return ret;
> +        }
>  
> -    if (ret < 0) {
> -        i = 0;
> -    } else {
> -        i = ret;
> +        /* in non-queue path, all IOs have to be completed */
> +        abort_queue(s);
> +        ret = len;
> +    } else if (ret == 0) {
> +        goto out;
>      }
>  
> -    for (; i < len; i++) {
> -        struct qemu_laiocb *laiocb =
> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
> -
> -        laiocb->ret = (ret < 0) ? ret : -EIO;
> -        qemu_laio_process_completion(s, laiocb);
> +    for (i = ret; i < len; i++) {
> +        s->io_q.iocbs[j++] = s->io_q.iocbs[i];
>      }
> +
> + out:
> +    /*
> +     * update io queue, for partial completion, retry will be
> +     * started automatically in following completion cb.
> +     */
> +    s->io_q.idx -= ret;
> +
>      return ret;
>  }
>  
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> +static void ioq_submit_retry(void *opaque)
> +{
> +    struct qemu_laio_state *s = opaque;
> +    ioq_submit(s, false);
> +}
> +
> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>  {
>      unsigned int idx = s->io_q.idx;
>  
> +    if (unlikely(idx == s->io_q.size)) {
> +        return -1;
> +    }
> +
>      s->io_q.iocbs[idx++] = iocb;
>      s->io_q.idx = idx;
>  
> -    /* submit immediately if queue is full */
> -    if (idx == s->io_q.size) {
> -        ioq_submit(s);
> +    /* submit immediately if queue depth is above 2/3 */
> +    if (idx > s->io_q.size * 2 / 3) {
> +        return ioq_submit(s, true);
>      }
> +
> +    return 0;
>  }
>  
>  void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
> @@ -251,7 +303,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, 
> bool unplug)
>      }
>  
>      if (s->io_q.idx > 0) {
> -        ret = ioq_submit(s);
> +        ret = ioq_submit(s, false);
>      }
>  
>      return ret;
> @@ -295,7 +347,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void 
> *aio_ctx, int fd,
>              goto out_free_aiocb;
>          }
>      } else {
> -        ioq_enqueue(s, iocbs);
> +        if (ioq_enqueue(s, iocbs) < 0) {
> +            goto out_free_aiocb;
> +        }
>      }
>      return &laiocb->common;
>  
> @@ -310,12 +364,14 @@ void laio_detach_aio_context(void *s_, AioContext 
> *old_context)
>  
>      aio_set_event_notifier(old_context, &s->e, NULL);
>      qemu_bh_delete(s->completion_bh);
> +    qemu_bh_delete(s->io_q.retry);
>  }
>  
>  void laio_attach_aio_context(void *s_, AioContext *new_context)
>  {
>      struct qemu_laio_state *s = s_;
>  
> +    s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s);
>      s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
>      aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
>  }
> -- 
> 1.7.9.5
> 
> 



reply via email to

[Prev in Thread] Current Thread [Next in Thread]