From: Ming Lei
Subject: [Qemu-devel] [PATCH v5 1/3] linux-aio: fix submit aio as a batch
Date: Mon, 24 Nov 2014 22:29:37 +0800
In the submit path we can't complete requests directly, otherwise
a "Co-routine re-entered recursively" error may be triggered, so
this patch fixes the issue as follows:
- for -EAGAIN or partial submission, retry the submission in the
  following completion callback, which runs in BH context
- for partial submission, update the io queue as well
- if the io queue is full, submit the queued requests immediately
  and return failure to the caller
- for other failures, abort all queued requests in BH context;
  requests aren't allowed to be submitted until the abort has
  been handled
Signed-off-by: Ming Lei <address@hidden>
---
block/linux-aio.c | 116 ++++++++++++++++++++++++++++++++++++++++++++---------
1 file changed, 96 insertions(+), 20 deletions(-)
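[Note, not part of the patch: below is a condensed, self-contained sketch
of the partial-submission handling described in the commit message above.
The queue type and names here (sketch_queue, sketch_submit) are
illustrative assumptions, not QEMU's structures.]

#include <string.h>
#include <libaio.h>

struct sketch_queue {
    struct iocb *iocbs[128];
    unsigned int idx;           /* number of queued iocbs */
};

/* Returns how many iocbs the kernel accepted (always >= 0). */
static int sketch_submit(io_context_t ctx, struct sketch_queue *q)
{
    int ret;

    if (!q->idx) {
        return 0;
    }

    ret = io_submit(ctx, q->idx, q->iocbs);
    if (ret < 0) {
        /* Nothing was submitted.  For -EAGAIN the caller simply retries
         * later; other errors are also left to the caller in this sketch
         * (the patch schedules an abort BH instead). */
        return 0;
    }

    if ((unsigned int)ret < q->idx) {
        /* Partial submission: shift the unsubmitted iocbs to the front
         * so a later retry resubmits only the remainder. */
        memmove(&q->iocbs[0], &q->iocbs[ret],
                (q->idx - ret) * sizeof(struct iocb *));
    }
    q->idx -= ret;
    return ret;
}

With this scheme the submit path never completes a request directly:
anything not accepted by io_submit() stays queued and is retried from the
completion BH, or failed from the abort BH, which is what the hunks below
implement.]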
diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..70312a4 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,20 @@ struct qemu_laiocb {
QLIST_ENTRY(qemu_laiocb) node;
};
+/*
+ * TODO: support batching I/O from multiple bs in the same
+ * AIO context; one important use case is multi-LUN SCSI,
+ * so in the future the IO queue should be per AIO context.
+ */
typedef struct {
struct iocb *iocbs[MAX_QUEUED_IO];
int plugged;
unsigned int size;
unsigned int idx;
+
+ /* abort queued requests in BH context */
+ QEMUBH *abort_bh;
+ bool aborting;
} LaioQueue;
struct qemu_laio_state {
@@ -59,6 +68,8 @@ struct qemu_laio_state {
int event_max;
};
+static int ioq_submit(struct qemu_laio_state *s);
+
static inline ssize_t io_event_ret(struct io_event *ev)
{
return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -91,6 +102,13 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
qemu_aio_unref(laiocb);
}
+static void qemu_laio_start_retry(struct qemu_laio_state *s)
+{
+ if (s->io_q.idx) {
+ ioq_submit(s);
+ }
+}
+
/* The completion BH fetches completed I/O requests and invokes their
* callbacks.
*
@@ -135,6 +153,8 @@ static void qemu_laio_completion_bh(void *opaque)
qemu_laio_process_completion(s, laiocb);
}
+
+ qemu_laio_start_retry(s);
}
static void qemu_laio_completion_cb(EventNotifier *e)
@@ -175,47 +195,99 @@ static void ioq_init(LaioQueue *io_q)
io_q->size = MAX_QUEUED_IO;
io_q->idx = 0;
io_q->plugged = 0;
+ io_q->aborting = false;
}
+/* Always returns >= 0; the value is the number of requests submitted */
static int ioq_submit(struct qemu_laio_state *s)
{
- int ret, i = 0;
+ int ret;
int len = s->io_q.idx;
- do {
- ret = io_submit(s->ctx, len, s->io_q.iocbs);
- } while (i++ < 3 && ret == -EAGAIN);
-
- /* empty io queue */
- s->io_q.idx = 0;
+ if (!len) {
+ return 0;
+ }
+ ret = io_submit(s->ctx, len, s->io_q.iocbs);
if (ret < 0) {
- i = 0;
- } else {
- i = ret;
+ /* retry in the following completion cb */
+ if (ret == -EAGAIN) {
+ return 0;
+ }
+
+ /*
+ * Abort in BH context to avoid co-routine re-entrance,
+ * and update the io queue at that time
+ */
+ qemu_bh_schedule(s->io_q.abort_bh);
+ s->io_q.aborting = true;
+ ret = 0;
}
- for (; i < len; i++) {
- struct qemu_laiocb *laiocb =
- container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+ /*
+ * update the io queue; a retry for the remainder will be started
+ * automatically in the following completion cb
+ */
+ if (ret > 0) {
+ if (ret < len) {
+ memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
+ (len - ret) * sizeof(struct iocb *));
+ }
+ s->io_q.idx -= ret;
+ }
- laiocb->ret = (ret < 0) ? ret : -EIO;
+ return ret;
+}
+
+static void ioq_abort_bh(void *opaque)
+{
+ struct qemu_laio_state *s = opaque;
+ int i;
+
+ for (i = 0; i < s->io_q.idx; i++) {
+ struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+ struct qemu_laiocb,
+ iocb);
+ laiocb->ret = -EIO;
qemu_laio_process_completion(s, laiocb);
}
- return ret;
+
+ s->io_q.idx = 0;
+ s->io_q.aborting = false;
}
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
{
unsigned int idx = s->io_q.idx;
+ /* Requests aren't allowed to be submitted until aborting is handled */
+ if (unlikely(s->io_q.aborting)) {
+ return -EIO;
+ }
+
+ if (unlikely(idx == s->io_q.size)) {
+ ioq_submit(s);
+
+ if (unlikely(s->io_q.aborting)) {
+ return -EIO;
+ }
+ idx = s->io_q.idx;
+ }
+
+ /* Return now if the queue is still full */
+ if (unlikely(idx == s->io_q.size)) {
+ return -EAGAIN;
+ }
+
s->io_q.iocbs[idx++] = iocb;
s->io_q.idx = idx;
- /* submit immediately if queue is full */
- if (idx == s->io_q.size) {
- ioq_submit(s);
+ /* submit immediately if queue depth is above 2/3 */
+ if (idx > s->io_q.size * 2 / 3) {
+ return ioq_submit(s);
}
+
+ return 0;
}
void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -281,7 +353,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
goto out_free_aiocb;
}
} else {
- ioq_enqueue(s, iocbs);
+ if (ioq_enqueue(s, iocbs) < 0) {
+ goto out_free_aiocb;
+ }
}
return &laiocb->common;
@@ -296,12 +370,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
aio_set_event_notifier(old_context, &s->e, NULL);
qemu_bh_delete(s->completion_bh);
+ qemu_bh_delete(s->io_q.abort_bh);
}
void laio_attach_aio_context(void *s_, AioContext *new_context)
{
struct qemu_laio_state *s = s_;
+ s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
}
--
1.7.9.5