qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v6 1/3] linux-aio: fix submit aio as a batch


From: Ming Lei
Subject: [Qemu-devel] [PATCH v6 1/3] linux-aio: fix submit aio as a batch
Date: Tue, 25 Nov 2014 15:23:11 +0800

In the submit path, we can't complete a request directly;
otherwise "Co-routine re-entered recursively" may be triggered.
This patch fixes the issue with the following ideas:

        - for -EAGAIN or partial completion, retry the submission
        in the following completion cb, which is run in BH context
        - for partial completion, update the io queue too
        - for the case of the io queue being full, submit queued requests
        immediately and return failure to the caller
        - for other failures, abort all queued requests in BH
        context; requests won't be allowed to be submitted until
        aborting is handled

Reviewed-by: Paolo Bonzini <address@hidden>
Signed-off-by: Ming Lei <address@hidden>
---
 block/linux-aio.c |  114 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 95 insertions(+), 19 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..11ac828 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,20 @@ struct qemu_laiocb {
     QLIST_ENTRY(qemu_laiocb) node;
 };
 
+/*
+ * TODO: support to batch I/O from multiple bs in one same
+ * AIO context, one important use case is multi-lun scsi,
+ * so in future the IO queue should be per AIO context.
+ */
 typedef struct {
     struct iocb *iocbs[MAX_QUEUED_IO];
     int plugged;
     unsigned int size;
     unsigned int idx;
+
+    /* abort queued requests in BH context */
+    QEMUBH *abort_bh;
+    bool  aborting;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -59,6 +68,8 @@ struct qemu_laio_state {
     int event_max;
 };
 
+static int ioq_submit(struct qemu_laio_state *s);
+
 static inline ssize_t io_event_ret(struct io_event *ev)
 {
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -91,6 +102,13 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
     qemu_aio_unref(laiocb);
 }
 
+static void qemu_laio_start_retry(struct qemu_laio_state *s)
+{
+    if (s->io_q.idx) {
+        ioq_submit(s);
+    }
+}
+
 /* The completion BH fetches completed I/O requests and invokes their
  * callbacks.
  *
@@ -135,6 +153,8 @@ static void qemu_laio_completion_bh(void *opaque)
 
         qemu_laio_process_completion(s, laiocb);
     }
+
+    qemu_laio_start_retry(s);
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
@@ -175,47 +195,99 @@ static void ioq_init(LaioQueue *io_q)
     io_q->size = MAX_QUEUED_IO;
     io_q->idx = 0;
     io_q->plugged = 0;
+    io_q->aborting = false;
 }
 
+/* Always return >= 0 and it means how many requests are submitted */
 static int ioq_submit(struct qemu_laio_state *s)
 {
-    int ret, i = 0;
+    int ret;
     int len = s->io_q.idx;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
-
-    /* empty io queue */
-    s->io_q.idx = 0;
+    if (!len) {
+        return 0;
+    }
 
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
     if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+        /* retry in following completion cb */
+        if (ret == -EAGAIN) {
+            return 0;
+        }
+
+        /*
+         * Abort in BH context for avoiding Co-routine re-entered,
+         * and update io queue at that time
+         */
+        qemu_bh_schedule(s->io_q.abort_bh);
+        s->io_q.aborting = true;
+        ret = 0;
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+    /*
+     * update io queue, and retry will be started automatically
+     * in following completion cb for the remainder
+     */
+    if (ret > 0) {
+        if (ret < len) {
+            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
+                    (len - ret) * sizeof(struct iocb *));
+        }
+        s->io_q.idx -= ret;
+    }
 
-        laiocb->ret = (ret < 0) ? ret : -EIO;
+    return ret;
+}
+
+static void ioq_abort_bh(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    int i;
+
+    for (i = 0; i < s->io_q.idx; i++) {
+        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+                                                  struct qemu_laiocb,
+                                                  iocb);
+        laiocb->ret = -EIO;
         qemu_laio_process_completion(s, laiocb);
     }
-    return ret;
+
+    s->io_q.idx = 0;
+    s->io_q.aborting = false;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    /* Request can't be allowed to submit until aborting is handled */
+    if (unlikely(s->io_q.aborting)) {
+        return -EIO;
+    }
+
+    if (unlikely(idx == s->io_q.size)) {
+        ioq_submit(s);
+
+        if (unlikely(s->io_q.aborting)) {
+            return -EIO;
+        }
+        idx = s->io_q.idx;
+    }
+
+    /* It has to return now if queue is still full */
+    if (unlikely(idx == s->io_q.size)) {
+        return -EAGAIN;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
+    /* submit immediately if queue depth is above 2/3 */
+    if (idx > s->io_q.size * 2 / 3) {
         ioq_submit(s);
     }
+
+    return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -281,7 +353,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
             goto out_free_aiocb;
         }
     } else {
-        ioq_enqueue(s, iocbs);
+        if (ioq_enqueue(s, iocbs) < 0) {
+            goto out_free_aiocb;
+        }
     }
     return &laiocb->common;
 
@@ -296,12 +370,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
 
     aio_set_event_notifier(old_context, &s->e, NULL);
     qemu_bh_delete(s->completion_bh);
+    qemu_bh_delete(s->io_q.abort_bh);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
     s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
 }
-- 
1.7.9.5




reply via email to

[Prev in Thread] Current Thread [Next in Thread]