[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [RFC 1/1] linux-aio: consume events in userspace instead of calling io_getevents
From: Paolo Bonzini
Subject: Re: [Qemu-devel] [RFC 1/1] linux-aio: consume events in userspace instead of calling io_getevents
Date: Thu, 14 Jul 2016 11:14:40 +0200
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:45.0) Gecko/20100101 Thunderbird/45.1.1
On 12/07/2016 17:25, Roman Pen wrote:
> AIO context in userspace is represented as a simple ring buffer, which
> can be consumed directly without entering the kernel, which obviously
> can bring some performance gain. QEMU does not use timeout value for
> waiting for events completions, so we can consume all events from
> userspace.
>
> My VM configuration is the following:
>
> VCPU=8, MQ=8, 1 iothread
>
> FIO results:
>
> io_getevents in kernel:
>
> READ: io=56479MB, aggrb=1882.5MB/s, minb=1882.5MB/s, maxb=1882.5MB/s,
> mint=30003msec, maxt=30003msec
> WRITE: io=56371MB, aggrb=1878.9MB/s, minb=1878.9MB/s, maxb=1878.9MB/s,
> mint=30003msec, maxt=30003msec
>
> io_getevents in userspace:
>
> READ: io=57576MB, aggrb=1919.2MB/s, minb=1919.2MB/s, maxb=1919.2MB/s,
> mint=30003msec, maxt=30003msec
> WRITE: io=57492MB, aggrb=1916.3MB/s, minb=1916.3MB/s, maxb=1916.3MB/s,
> mint=30003msec, maxt=30003msec
>
> The gain is only ~2%, but the price is almost nothing.
>
> Signed-off-by: Roman Pen <address@hidden>
> Cc: Stefan Hajnoczi <address@hidden>
> Cc: Paolo Bonzini <address@hidden>
> Cc: address@hidden
> ---
> block/linux-aio.c | 86
> ++++++++++++++++++++++++++++++++++++++++++++-----------
> 1 file changed, 70 insertions(+), 16 deletions(-)
The patch does not pass checkpatch.pl, but apart from this it's a nice
trick.
Paolo
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index fe7cece..4ab09c0 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -58,7 +58,7 @@ struct LinuxAioState {
>
> /* I/O completion processing */
> QEMUBH *completion_bh;
> - struct io_event events[MAX_EVENTS];
> + struct io_event *events;
> int event_idx;
> int event_max;
> };
> @@ -101,6 +101,60 @@ static void qemu_laio_process_completion(struct
> qemu_laiocb *laiocb)
> }
> }
>
> +/**
> + * aio_ring buffer which is shared between userspace and kernel.
> + */
> +struct aio_ring {
> + unsigned id; /* kernel internal index number */
> + unsigned nr; /* number of io_events */
> + unsigned head; /* Written to by userland or under ring_lock
> + * mutex by aio_read_events_ring(). */
> + unsigned tail;
> +
> + unsigned magic;
> + unsigned compat_features;
> + unsigned incompat_features;
> + unsigned header_length; /* size of aio_ring */
> +
> + struct io_event io_events[0];
> +};
> +
> +/**
> + * io_getevents_peek() - returns the number of completed events
> + * and sets a pointer on events array.
> + *
> + * This function does not update the internal ring buffer, only
> + * reads head and tail. When @events has been processed
> + * io_getevents_commit() must be called.
> + */
> +static int io_getevents_peek(io_context_t aio_ctx, unsigned int max,
> + struct io_event **events)
> +{
> + struct aio_ring *ring = (struct aio_ring*)aio_ctx;
> + unsigned head = ring->head, tail = ring->tail;
> + unsigned nr;
> +
> + nr = tail >= head ? tail - head : ring->nr - head;
> + *events = ring->io_events + head;
> + /* To avoid speculative loads of s->events[i] before observing tail.
> + Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
> + smp_rmb();
> +
> + return nr;
> +}
> +
> +/**
> + * io_getevents_commit() - advances head of a ring buffer and returns
> + * 1 if there are some events left in a ring.
> + */
> +static int io_getevents_commit(io_context_t aio_ctx, unsigned int nr)
> +{
> + struct aio_ring *ring = (struct aio_ring*)aio_ctx;
> +
> + ring->head = (ring->head + nr) % ring->nr;
> + return (ring->head != ring->tail);
> +}
> +
> /* The completion BH fetches completed I/O requests and invokes their
> * callbacks.
> *
> @@ -118,32 +172,32 @@ static void qemu_laio_completion_bh(void *opaque)
>
> /* Fetch more completion events when empty */
> if (s->event_idx == s->event_max) {
> - do {
> - struct timespec ts = { 0 };
> - s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
> - s->events, &ts);
> - } while (s->event_max == -EINTR);
> -
> +more:
> + s->event_max = io_getevents_peek(s->ctx, MAX_EVENTS, &s->events);
> s->event_idx = 0;
> - if (s->event_max <= 0) {
> - s->event_max = 0;
> + if (s->event_max == 0)
> return; /* no more events */
> - }
> }
>
> /* Reschedule so nested event loops see currently pending completions */
> qemu_bh_schedule(s->completion_bh);
>
> /* Process completion events */
> - while (s->event_idx < s->event_max) {
> - struct iocb *iocb = s->events[s->event_idx].obj;
> - struct qemu_laiocb *laiocb =
> + if (s->event_idx < s->event_max) {
> + unsigned nr = s->event_max - s->event_idx;
> +
> + while (s->event_idx < s->event_max) {
> + struct iocb *iocb = s->events[s->event_idx].obj;
> + struct qemu_laiocb *laiocb =
> container_of(iocb, struct qemu_laiocb, iocb);
>
> - laiocb->ret = io_event_ret(&s->events[s->event_idx]);
> - s->event_idx++;
> + laiocb->ret = io_event_ret(&s->events[s->event_idx]);
> + s->event_idx++;
>
> - qemu_laio_process_completion(laiocb);
> + qemu_laio_process_completion(laiocb);
> + }
> + if (io_getevents_commit(s->ctx, nr))
> + goto more;
> }
>
> if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
>