qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH 1/4] linux-aio: queue requests that cannot be submitted


From: Kevin Wolf
Subject: Re: [Qemu-devel] [PATCH 1/4] linux-aio: queue requests that cannot be submitted
Date: Thu, 11 Dec 2014 13:49:56 +0100
User-agent: Mutt/1.5.21 (2010-09-15)

Am 10.12.2014 um 15:51 hat Paolo Bonzini geschrieben:
> Keep a queue of requests that were not submitted; pass them to
> the kernel when a completion is reported, unless the queue is
> plugged.
> 
> The array of iocbs is rebuilt every time from scratch.  This
> avoids keeping the iocbs array and list synchronized.
> 
> Signed-off-by: Paolo Bonzini <address@hidden>
> ---
>  block/linux-aio.c | 75 ++++++++++++++++++++++++-------------------------------
>  1 file changed, 33 insertions(+), 42 deletions(-)
> 
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index d92513b..b6fbfd8 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -35,14 +35,13 @@ struct qemu_laiocb {
>      size_t nbytes;
>      QEMUIOVector *qiov;
>      bool is_read;
> -    QLIST_ENTRY(qemu_laiocb) node;
> +    QSIMPLEQ_ENTRY(qemu_laiocb) next;
>  };
>  
>  typedef struct {
> -    struct iocb *iocbs[MAX_QUEUED_IO];
>      int plugged;
> -    unsigned int size;
>      unsigned int idx;
> +    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
>  } LaioQueue;
>  
>  struct qemu_laio_state {
> @@ -59,6 +58,8 @@ struct qemu_laio_state {
>      int event_max;
>  };
>  
> +static int ioq_submit(struct qemu_laio_state *s);
> +
>  static inline ssize_t io_event_ret(struct io_event *ev)
>  {
>      return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
> @@ -135,6 +136,10 @@ static void qemu_laio_completion_bh(void *opaque)
>  
>          qemu_laio_process_completion(s, laiocb);
>      }
> +
> +    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
> +        ioq_submit(s);
> +    }
>  }
>  
>  static void qemu_laio_completion_cb(EventNotifier *e)
> @@ -172,52 +177,40 @@ static const AIOCBInfo laio_aiocb_info = {
>  
>  static void ioq_init(LaioQueue *io_q)
>  {
> -    io_q->size = MAX_QUEUED_IO;
> -    io_q->idx = 0;
> +    QSIMPLEQ_INIT(&io_q->pending);
>      io_q->plugged = 0;
> +    io_q->idx = 0;
>  }
>  
>  static int ioq_submit(struct qemu_laio_state *s)
>  {
> -    int ret, i = 0;
> -    int len = s->io_q.idx;
> -
> -    do {
> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
> -    } while (i++ < 3 && ret == -EAGAIN);
> +    int ret, i;
> +    int len = 0;
> +    struct qemu_laiocb *aiocb;
> +    struct iocb *iocbs[MAX_QUEUED_IO];
>  
> -    /* empty io queue */
> -    s->io_q.idx = 0;
> +    QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
> +        iocbs[len++] = &aiocb->iocb;
> +        if (len == MAX_QUEUED_IO) {
> +            break;
> +        }
> +    }
>  
> +    ret = io_submit(s->ctx, len, iocbs);
> +    if (ret == -EAGAIN) {
> +        ret = 0;
> +    }
>      if (ret < 0) {
> -        i = 0;
> -    } else {
> -        i = ret;
> +        abort();
>      }

abort() doesn't feel right here.

>  
> -    for (; i < len; i++) {
> -        struct qemu_laiocb *laiocb =
> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
> -
> -        laiocb->ret = (ret < 0) ? ret : -EIO;
> -        qemu_laio_process_completion(s, laiocb);
> +    for (i = 0; i < ret; i++) {
> +        s->io_q.idx--;
> +        QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
>      }
>      return ret;
>  }
>  
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> -{
> -    unsigned int idx = s->io_q.idx;
> -
> -    s->io_q.iocbs[idx++] = iocb;
> -    s->io_q.idx = idx;
> -
> -    /* submit immediately if queue is full */
> -    if (idx == s->io_q.size) {
> -        ioq_submit(s);
> -    }
> -}
> -
>  void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
>  {
>      struct qemu_laio_state *s = aio_ctx;
> @@ -236,7 +229,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
>          return 0;
>      }
>  
> -    if (s->io_q.idx > 0) {
> +    if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) {
>          ret = ioq_submit(s);
>      }
>  
> @@ -276,12 +269,10 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
>      }
>      io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
>  
> -    if (!s->io_q.plugged) {
> -        if (io_submit(s->ctx, 1, &iocbs) < 0) {
> -            goto out_free_aiocb;
> -        }
> -    } else {
> -        ioq_enqueue(s, iocbs);
> +    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
> +    s->io_q.idx++;
> +    if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) {

More naturally written and more obviously correct as (!s->io_q.plugged ||
s->io_q.idx >= MAX_QUEUED_IO). Which happens to be what the next patch
converts it to, so I won't spend much time thinking about whether this
version is actually right.

> +        ioq_submit(s);
>      }
>      return &laiocb->common;

Kevin



reply via email to

[Prev in Thread] Current Thread [Next in Thread]