[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [RFC v2 2/6] aio: add aio_context_acquire() and aio_context_release()
From: |
Fam Zheng |
Subject: |
Re: [Qemu-devel] [RFC v2 2/6] aio: add aio_context_acquire() and aio_context_release() |
Date: |
Mon, 20 Jan 2014 18:29:50 +0800 |
User-agent: |
Mutt/1.5.22 (2013-10-16) |
On Fri, 01/10 09:45, Stefan Hajnoczi wrote:
> It can be useful to run an AioContext from a thread which normally does
> not "own" the AioContext. For example, request draining can be
> implemented by acquiring the AioContext and looping aio_poll() until all
> requests have been completed.
>
> The following pattern should work:
>
> /* Event loop thread */
> while (running) {
> aio_context_acquire(ctx);
> aio_poll(ctx, true);
> aio_context_release(ctx);
> }
>
> /* Another thread */
> aio_context_acquire(ctx);
> bdrv_read(bs, 0x1000, buf, 1);
> aio_context_release(ctx);
>
> This patch implements aio_context_acquire() and aio_context_release().
>
> Note that existing aio_poll() callers do not need to worry about
> acquiring and releasing - it is only needed when multiple threads will
> call aio_poll() on the same AioContext.
>
> Signed-off-by: Stefan Hajnoczi <address@hidden>
> ---
> async.c | 18 +++++++++++++++++
> include/block/aio.h | 18 +++++++++++++++++
> tests/test-aio.c | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++
> 3 files changed, 94 insertions(+)
>
> diff --git a/async.c b/async.c
> index 5fb3fa6..6930185 100644
> --- a/async.c
> +++ b/async.c
> @@ -214,6 +214,7 @@ aio_ctx_finalize(GSource *source)
> thread_pool_free(ctx->thread_pool);
> aio_set_event_notifier(ctx, &ctx->notifier, NULL);
> event_notifier_cleanup(&ctx->notifier);
> + rfifolock_destroy(&ctx->lock);
> qemu_mutex_destroy(&ctx->bh_lock);
> g_array_free(ctx->pollfds, TRUE);
> timerlistgroup_deinit(&ctx->tlg);
> @@ -250,6 +251,12 @@ static void aio_timerlist_notify(void *opaque)
> aio_notify(opaque);
> }
>
> +static void aio_rfifolock_cb(void *opaque)
> +{
> + /* Kick owner thread in case they are blocked in aio_poll() */
> + aio_notify(opaque);
> +}
> +
> AioContext *aio_context_new(void)
> {
> AioContext *ctx;
> @@ -257,6 +264,7 @@ AioContext *aio_context_new(void)
> ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
> ctx->thread_pool = NULL;
> qemu_mutex_init(&ctx->bh_lock);
> + rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
> event_notifier_init(&ctx->notifier, false);
> aio_set_event_notifier(ctx, &ctx->notifier,
> (EventNotifierHandler *)
> @@ -275,3 +283,13 @@ void aio_context_unref(AioContext *ctx)
> {
> g_source_unref(&ctx->source);
> }
> +
> +void aio_context_acquire(AioContext *ctx)
> +{
> + rfifolock_lock(&ctx->lock);
> +}
> +
> +void aio_context_release(AioContext *ctx)
> +{
> + rfifolock_unlock(&ctx->lock);
> +}
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 2efdf41..4aaa5d5 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -19,6 +19,7 @@
> #include "qemu/queue.h"
> #include "qemu/event_notifier.h"
> #include "qemu/thread.h"
> +#include "qemu/rfifolock.h"
> #include "qemu/timer.h"
>
> typedef struct BlockDriverAIOCB BlockDriverAIOCB;
> @@ -47,6 +48,9 @@ typedef void IOHandler(void *opaque);
> struct AioContext {
> GSource source;
>
> + /* Protects all fields from multi-threaded access */
> + RFifoLock lock;
> +
> /* The list of registered AIO handlers */
> QLIST_HEAD(, AioHandler) aio_handlers;
>
> @@ -104,6 +108,20 @@ void aio_context_ref(AioContext *ctx);
> */
> void aio_context_unref(AioContext *ctx);
>
> +/* Take ownership of the AioContext. If the AioContext will be shared between
> + * threads, a thread must have ownership when calling aio_poll().
> + *
> + * Note that multiple threads calling aio_poll() means timers, BHs, and
> + * callbacks may be invoked from a different thread than they were registered
> + * from. Therefore, code must use AioContext acquire/release or use
> + * fine-grained synchronization to protect shared state if other threads will
> + * be accessing it simultaneously.
> + */
> +void aio_context_acquire(AioContext *ctx);
> +
> +/* Reliquinish ownership of the AioContext. */
s/Reliquinish/Relinquish/
Fam
> +void aio_context_release(AioContext *ctx);
> +
> /**
> * aio_bh_new: Allocate a new bottom half structure.
> *
> diff --git a/tests/test-aio.c b/tests/test-aio.c
> index 592721e..d384b0b 100644
> --- a/tests/test-aio.c
> +++ b/tests/test-aio.c
> @@ -112,6 +112,63 @@ static void test_notify(void)
> g_assert(!aio_poll(ctx, false));
> }
>
> +typedef struct {
> + QemuMutex start_lock;
> + bool thread_acquired;
> +} AcquireTestData;
> +
> +static void *test_acquire_thread(void *opaque)
> +{
> + AcquireTestData *data = opaque;
> +
> + /* Wait for other thread to let us start */
> + qemu_mutex_lock(&data->start_lock);
> + qemu_mutex_unlock(&data->start_lock);
> +
> + aio_context_acquire(ctx);
> + aio_context_release(ctx);
> +
> + data->thread_acquired = true; /* success, we got here */
> +
> + return NULL;
> +}
> +
> +static void dummy_notifier_read(EventNotifier *unused)
> +{
> + g_assert(false); /* should never be invoked */
> +}
> +
> +static void test_acquire(void)
> +{
> + QemuThread thread;
> + EventNotifier notifier;
> + AcquireTestData data;
> +
> + /* Dummy event notifier ensures aio_poll() will block */
> + event_notifier_init(&notifier, false);
> + aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
> + g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
> +
> + qemu_mutex_init(&data.start_lock);
> + qemu_mutex_lock(&data.start_lock);
> + data.thread_acquired = false;
> +
> + qemu_thread_create(&thread, test_acquire_thread,
> + &data, QEMU_THREAD_JOINABLE);
> +
> + /* Block in aio_poll(), let other thread kick us and acquire context */
> + aio_context_acquire(ctx);
> + qemu_mutex_unlock(&data.start_lock); /* let the thread run */
> + g_assert(!aio_poll(ctx, true));
> + aio_context_release(ctx);
> +
> + qemu_thread_join(&thread);
> + aio_set_event_notifier(ctx, &notifier, NULL);
> + event_notifier_cleanup(&notifier);
> +
> + g_assert(data.thread_acquired);
> +}
> +
> static void test_bh_schedule(void)
> {
> BHTestData data = { .n = 0 };
> @@ -775,6 +832,7 @@ int main(int argc, char **argv)
>
> g_test_init(&argc, &argv, NULL);
> g_test_add_func("/aio/notify", test_notify);
> + g_test_add_func("/aio/acquire", test_acquire);
> g_test_add_func("/aio/bh/schedule", test_bh_schedule);
> g_test_add_func("/aio/bh/schedule10", test_bh_schedule10);
> g_test_add_func("/aio/bh/cancel", test_bh_cancel);
> --
> 1.8.4.2
>
>
- [Qemu-devel] [RFC v2 0/6] dataplane: switch to N:M devices-per-thread model, Stefan Hajnoczi, 2014/01/09
- [Qemu-devel] [RFC v2 1/6] rfifolock: add recursive FIFO lock, Stefan Hajnoczi, 2014/01/09
- [Qemu-devel] [RFC v2 2/6] aio: add aio_context_acquire() and aio_context_release(), Stefan Hajnoczi, 2014/01/09
- Re: [Qemu-devel] [RFC v2 2/6] aio: add aio_context_acquire() and aio_context_release(),
Fam Zheng <=
- [Qemu-devel] [RFC v2 3/6] iothread: add I/O thread object, Stefan Hajnoczi, 2014/01/09
- [Qemu-devel] [RFC v2 4/6] qdev: add get_pointer_and_free() for temporary strings, Stefan Hajnoczi, 2014/01/09
- [Qemu-devel] [RFC v2 5/6] iothread: add "iothread" qdev property type, Stefan Hajnoczi, 2014/01/09
- [Qemu-devel] [RFC v2 6/6] dataplane: replace internal thread with IOThread, Stefan Hajnoczi, 2014/01/09