qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v3 1/3] linux-aio: fix submit aio as a batch


From: Ming Lei
Subject: [Qemu-devel] [PATCH v3 1/3] linux-aio: fix submit aio as a batch
Date: Thu, 6 Nov 2014 23:09:59 +0800

In the enqueue path, we can't complete requests, otherwise
"Co-routine re-entered recursively" may be triggered, so this
patch fixes the issue with the following ideas:

        - for -EAGAIN or partial completion, retry the submission by
        scheduling a BH in the following completion cb
        - for partial completion, also update the io queue
        - for other failures, return the failure if in the enqueue path,
        otherwise abort all queued I/O

Signed-off-by: Ming Lei <address@hidden>
---
 block/linux-aio.c |  101 +++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 79 insertions(+), 22 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..f66e8ad 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,19 @@ struct qemu_laiocb {
     QLIST_ENTRY(qemu_laiocb) node;
 };
 
+/*
+ * TODO: support batching I/O from multiple bs in the same
+ * AIO context; one important use case is multi-lun scsi,
+ * so in future the IO queue should be per AIO context.
+ */
 typedef struct {
     struct iocb *iocbs[MAX_QUEUED_IO];
     int plugged;
     unsigned int size;
     unsigned int idx;
+
+    /* handle -EAGAIN and partial completion */
+    QEMUBH *retry;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -137,6 +145,12 @@ static void qemu_laio_completion_bh(void *opaque)
     }
 }
 
+static void qemu_laio_start_retry(struct qemu_laio_state *s)
+{
+    if (s->io_q.idx)
+        qemu_bh_schedule(s->io_q.retry);
+}
+
 static void qemu_laio_completion_cb(EventNotifier *e)
 {
     struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
@@ -144,6 +158,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
     if (event_notifier_test_and_clear(&s->e)) {
         qemu_bh_schedule(s->completion_bh);
     }
+    qemu_laio_start_retry(s);
 }
 
 static void laio_cancel(BlockAIOCB *blockacb)
@@ -163,6 +178,9 @@ static void laio_cancel(BlockAIOCB *blockacb)
     }
 
     laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
+
+    /* check if there are requests in io queue */
+    qemu_laio_start_retry(laiocb->ctx);
 }
 
 static const AIOCBInfo laio_aiocb_info = {
@@ -177,45 +195,80 @@ static void ioq_init(LaioQueue *io_q)
     io_q->plugged = 0;
 }
 
-static int ioq_submit(struct qemu_laio_state *s)
+static void abort_queue(struct qemu_laio_state *s)
+{
+    int i;
+    for (i = 0; i < s->io_q.idx; i++) {
+        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+                                                  struct qemu_laiocb,
+                                                  iocb);
+        laiocb->ret = -EIO;
+        qemu_laio_process_completion(s, laiocb);
+    }
+}
+
+static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
 {
     int ret, i = 0;
     int len = s->io_q.idx;
+    int j = 0;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
+    if (!len) {
+        return 0;
+    }
 
-    /* empty io queue */
-    s->io_q.idx = 0;
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
+    if (ret == -EAGAIN) { /* retry in following completion cb */
+        return 0;
+    } else if (ret < 0) {
+        if (enqueue) {
+            return ret;
+        }
 
-    if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+        /* in non-queue path, all IOs have to be completed */
+        abort_queue(s);
+        ret = len;
+    } else if (ret == 0) {
+        goto out;
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
-
-        laiocb->ret = (ret < 0) ? ret : -EIO;
-        qemu_laio_process_completion(s, laiocb);
+    for (i = ret; i < len; i++) {
+        s->io_q.iocbs[j++] = s->io_q.iocbs[i];
     }
+
+ out:
+    /*
+     * update io queue, for partial completion, retry will be
+     * started automatically in following completion cb.
+     */
+    s->io_q.idx -= ret;
+
     return ret;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static void ioq_submit_retry(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    ioq_submit(s, false);
+}
+
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    if (unlikely(idx == s->io_q.size)) {
+        return -1;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
-        ioq_submit(s);
+    /* submit immediately if queue depth is above 2/3 */
+    if (idx > s->io_q.size * 2 / 3) {
+        return ioq_submit(s, true);
     }
+
+    return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -237,7 +290,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
     }
 
     if (s->io_q.idx > 0) {
-        ret = ioq_submit(s);
+        ret = ioq_submit(s, false);
     }
 
     return ret;
@@ -281,7 +334,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
             goto out_free_aiocb;
         }
     } else {
-        ioq_enqueue(s, iocbs);
+        if (ioq_enqueue(s, iocbs) < 0) {
+            goto out_free_aiocb;
+        }
     }
     return &laiocb->common;
 
@@ -296,12 +351,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
 
     aio_set_event_notifier(old_context, &s->e, NULL);
     qemu_bh_delete(s->completion_bh);
+    qemu_bh_delete(s->io_q.retry);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s);
     s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
 }
-- 
1.7.9.5




reply via email to

[Prev in Thread] Current Thread [Next in Thread]