Re: [Qemu-devel] [PATCH v6 1/3] linux-aio: fix submit aio as a batch


From: Ming Lei
Subject: Re: [Qemu-devel] [PATCH v6 1/3] linux-aio: fix submit aio as a batch
Date: Fri, 28 Nov 2014 10:16:41 +0800

On Wed, Nov 26, 2014 at 7:18 PM, Kevin Wolf <address@hidden> wrote:
> Am 25.11.2014 um 08:23 hat Ming Lei geschrieben:
>> In the submit path, we can't complete request directly,
>> otherwise "Co-routine re-entered recursively" may be caused,
>> so this patch fixes the issue with below ideas:
>>
>>       - for -EAGAIN or partial completion, retry the submission
>>       in the following completion cb, which is run in BH context
>>       - for partial completion, update the io queue too
>>       - for the case of a full io queue, submit queued requests
>>       immediately and return failure to the caller
>>       - for other failures, abort all queued requests in BH
>>     context; requests won't be allowed to be submitted until
>>     aborting is handled
>>
>> Reviewed-by: Paolo Bonzini <address@hidden>
>> Signed-off-by: Ming Lei <address@hidden>
>
> This looks like a quite complex fix to this problem, and it introduces
> new error cases, too (while aborting, requests fail now, but they really
> should just be waiting).

"Co-routine re-entered recursively" is only triggered when io_submit()
returns failure(either -EAGAIN or other error) or partial completion, so
this patch actually is for handling failure of io_submit() and making
linux-aio more reliably.

After the io queue was introduced, it became a bit easier to trigger the
failure, especially in the multi-queue case, or when the VM's driver sets
a longer queue depth. So all the cases (-EAGAIN, other failures, and
partial completion) have to be considered too.

So it is not that the fix is overly complex; the actual problem is complex.
Not to mention that linux-aio previously didn't handle -EAGAIN at all.
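
The three io_submit() outcomes discussed in this thread can be summarized as a small decision table. Below is a minimal, self-contained sketch (not code from the patch; the names `submit_action` and `classify_submit` are hypothetical) of how a caller would classify the return value of io_submit() for a batch of `len` iocbs:

```c
#include <assert.h>
#include <errno.h>

/* Hypothetical helper: classify the result of io_submit() for 'len'
 * queued iocbs, following the cases discussed above:
 *   - everything submitted          -> done
 *   - -EAGAIN or partial (ret<len)  -> retry remainder in completion BH
 *   - any other negative errno     -> abort queued requests in BH context
 */
enum submit_action {
    SUBMIT_DONE,
    SUBMIT_RETRY,
    SUBMIT_ABORT
};

static enum submit_action classify_submit(int ret, int len)
{
    if (ret == len) {
        return SUBMIT_DONE;
    }
    if (ret == -EAGAIN || ret >= 0) {
        /* partial completion: iocbs[ret..len-1] stay queued */
        return SUBMIT_RETRY;
    }
    return SUBMIT_ABORT;    /* e.g. -EINVAL, -EFAULT */
}
```

The point of the table is that only the last row is a hard error; the first two must not complete requests directly from the submit path, or the co-routine re-entrance bug appears.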

>
> I'm wondering if this is the time to convert the linux-aio interface to
> coroutines finally. It wouldn't only be a performance optimisation, but
> would potentially also simplify this code.

Even with coroutines, the above io_submit() issues have to be considered
too.

>
>>  block/linux-aio.c |  114 
>> ++++++++++++++++++++++++++++++++++++++++++++---------
>>  1 file changed, 95 insertions(+), 19 deletions(-)
>>
>> diff --git a/block/linux-aio.c b/block/linux-aio.c
>> index d92513b..11ac828 100644
>> --- a/block/linux-aio.c
>> +++ b/block/linux-aio.c
>> @@ -38,11 +38,20 @@ struct qemu_laiocb {
>>      QLIST_ENTRY(qemu_laiocb) node;
>>  };
>>
>> +/*
>> + * TODO: support to batch I/O from multiple bs in one same
>> + * AIO context, one important use case is multi-lun scsi,
>> + * so in future the IO queue should be per AIO context.
>> + */
>>  typedef struct {
>>      struct iocb *iocbs[MAX_QUEUED_IO];
>>      int plugged;
>>      unsigned int size;
>>      unsigned int idx;
>> +
>> +    /* abort queued requests in BH context */
>> +    QEMUBH *abort_bh;
>> +    bool  aborting;
>
> Two spaces.

OK.

>
>>  } LaioQueue;
>>
>>  struct qemu_laio_state {
>> @@ -59,6 +68,8 @@ struct qemu_laio_state {
>>      int event_max;
>>  };
>>
>> +static int ioq_submit(struct qemu_laio_state *s);
>> +
>>  static inline ssize_t io_event_ret(struct io_event *ev)
>>  {
>>      return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
>> @@ -91,6 +102,13 @@ static void qemu_laio_process_completion(struct 
>> qemu_laio_state *s,
>>      qemu_aio_unref(laiocb);
>>  }
>>
>> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
>> +{
>> +    if (s->io_q.idx) {
>> +        ioq_submit(s);
>> +    }
>> +}
>> +
>>  /* The completion BH fetches completed I/O requests and invokes their
>>   * callbacks.
>>   *
>> @@ -135,6 +153,8 @@ static void qemu_laio_completion_bh(void *opaque)
>>
>>          qemu_laio_process_completion(s, laiocb);
>>      }
>> +
>> +    qemu_laio_start_retry(s);
>>  }
>
> Why is qemu_laio_start_retry() a separate function? This is the only
> caller.

OK.

>
>>  static void qemu_laio_completion_cb(EventNotifier *e)
>> @@ -175,47 +195,99 @@ static void ioq_init(LaioQueue *io_q)
>>      io_q->size = MAX_QUEUED_IO;
>>      io_q->idx = 0;
>>      io_q->plugged = 0;
>> +    io_q->aborting = false;
>>  }
>>
>> +/* Always return >= 0 and it means how many requests are submitted */
>>  static int ioq_submit(struct qemu_laio_state *s)
>>  {
>> -    int ret, i = 0;
>> +    int ret;
>>      int len = s->io_q.idx;
>>
>> -    do {
>> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
>> -    } while (i++ < 3 && ret == -EAGAIN);
>> -
>> -    /* empty io queue */
>> -    s->io_q.idx = 0;
>> +    if (!len) {
>> +        return 0;
>> +    }
>>
>> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
>>      if (ret < 0) {
>> -        i = 0;
>> -    } else {
>> -        i = ret;
>> +        /* retry in following completion cb */
>> +        if (ret == -EAGAIN) {
>> +            return 0;
>> +        }
>> +
>> +        /*
>> +         * Abort in BH context for avoiding Co-routine re-entered,
>> +         * and update io queue at that time
>> +         */
>> +        qemu_bh_schedule(s->io_q.abort_bh);
>> +        s->io_q.aborting = true;
>> +        ret = 0;
>>      }
>>
>> -    for (; i < len; i++) {
>> -        struct qemu_laiocb *laiocb =
>> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
>
>> +    /*
>> +     * update io queue, and retry will be started automatically
>> +     * in following completion cb for the remainder
>> +     */
>> +    if (ret > 0) {
>> +        if (ret < len) {
>> +            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
>> +                    (len - ret) * sizeof(struct iocb *));
>> +        }
>> +        s->io_q.idx -= ret;
>> +    }
>
> Support for partly handled queues is nice, but a logically separate
> change. Please move this to its own patch.

As I described above, the "Co-routine re-entered recursively" issue
is triggered by the current handling of partial completion.
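
The partial-submission bookkeeping in the hunk above can be exercised in isolation. The following is a minimal sketch under assumptions (a mock queue of opaque pointers standing in for `struct iocb *`; `queue_consume` is a hypothetical name for the logic in the patch): when io_submit() accepts only `ret` of `len` pointers, the remainder is shifted to the front with memmove() and `idx` is reduced, so the completion BH can resubmit it later.

```c
#include <assert.h>
#include <string.h>

#define MAX_QUEUED_IO 128

/* Mock of LaioQueue: 'iocbs' stands in for the real struct iocb
 * pointers, 'idx' is the number of queued entries. */
struct mock_queue {
    void *iocbs[MAX_QUEUED_IO];
    unsigned int idx;
};

/* Drop the first 'ret' submitted entries, keeping the remainder at
 * the front of the array, exactly as the patch does after a partial
 * io_submit(). */
static void queue_consume(struct mock_queue *q, int ret)
{
    int len = q->idx;

    if (ret > 0) {
        if (ret < len) {
            memmove(&q->iocbs[0], &q->iocbs[ret],
                    (len - ret) * sizeof(void *));
        }
        q->idx -= ret;
    }
}
```

Keeping the unsubmitted tail in place (rather than failing it) is what lets the completion BH retry it without re-entering the submitting coroutine.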

>
>> -        laiocb->ret = (ret < 0) ? ret : -EIO;
>> +    return ret;
>> +}
>> +
>> +static void ioq_abort_bh(void *opaque)
>> +{
>> +    struct qemu_laio_state *s = opaque;
>> +    int i;
>> +
>> +    for (i = 0; i < s->io_q.idx; i++) {
>> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
>> +                                                  struct qemu_laiocb,
>> +                                                  iocb);
>> +        laiocb->ret = -EIO;
>
> We shouldn't be throwing the real error code away.

OK, will do that.

>
>>          qemu_laio_process_completion(s, laiocb);
>>      }
>> -    return ret;
>> +
>> +    s->io_q.idx = 0;
>> +    s->io_q.aborting = false;
>>  }
>>
>> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>>  {
>>      unsigned int idx = s->io_q.idx;
>>
>> +    /* Request can't be allowed to submit until aborting is handled */
>> +    if (unlikely(s->io_q.aborting)) {
>> +        return -EIO;
>> +    }
>> +
>> +    if (unlikely(idx == s->io_q.size)) {
>> +        ioq_submit(s);
>> +
>> +        if (unlikely(s->io_q.aborting)) {
>> +            return -EIO;
>> +        }
>> +        idx = s->io_q.idx;
>> +    }
>> +
>> +    /* It has to return now if queue is still full */
>> +    if (unlikely(idx == s->io_q.size)) {
>> +        return -EAGAIN;
>> +    }
>> +
>>      s->io_q.iocbs[idx++] = iocb;
>>      s->io_q.idx = idx;
>>
>> -    /* submit immediately if queue is full */
>> -    if (idx == s->io_q.size) {
>> +    /* submit immediately if queue depth is above 2/3 */
>> +    if (idx > s->io_q.size * 2 / 3) {
>>          ioq_submit(s);
>>      }
>
> This change looks independent as well. Also isn't mentioned in the
> commit message. Why is it a good idea to use only 2/3 of the queue
> instead of making use of its full length?

When the queue is full, a new submission has to return failure, so
flushing early at 2/3 of the depth leaves headroom and is used to make
linux-aio more reliable.
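
The threshold condition from the hunk is easy to model on its own. A toy sketch (the name `should_flush` is hypothetical, not from the patch): with the queue capacity `size`, an enqueue triggers an immediate submit once the fill level passes two thirds, so even if io_submit() makes little progress there is room left for new requests.

```c
#include <assert.h>

/* Mirror of the patch's condition "idx > s->io_q.size * 2 / 3":
 * returns nonzero when this enqueue should flush the queue.
 * Integer division matches the C expression in the patch. */
static int should_flush(unsigned int idx_after_enqueue, unsigned int size)
{
    return idx_after_enqueue > size * 2 / 3;
}
```

For size 128 the cutoff is 85 (256 / 3 with integer division), so the 86th queued request forces a submit while 42 slots remain free.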


Thanks,
Ming Lei


