Re: [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility
From: Paolo Bonzini
Subject: Re: [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility
Date: Wed, 31 Oct 2012 10:41:48 +0100
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:16.0) Gecko/20121016 Thunderbird/16.0.1
On 30/10/2012 20:13, Stefan Hajnoczi wrote:
> On Fri, Oct 26, 2012 at 04:05:49PM +0200, Paolo Bonzini wrote:
>> +static void event_notifier_ready(EventNotifier *notifier)
>> +{
>> + ThreadPoolElement *elem, *next;
>> +
>> + event_notifier_test_and_clear(notifier);
>> +restart:
>> + QLIST_FOREACH_SAFE(elem, &head, all, next) {
>> + if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
>> + continue;
>> + }
>> + if (elem->state == THREAD_DONE) {
>> + trace_thread_pool_complete(elem, elem->common.opaque,
>> elem->ret);
>> + }
>> + if (elem->state == THREAD_DONE && elem->common.cb) {
>> + QLIST_REMOVE(elem, all);
>> + elem->common.cb(elem->common.opaque, elem->ret);
>
> This function doesn't take the lock. First it accesses elem->state and
> then it reads elem->ret. We need to take the lock to ensure both
> elem->state and elem->ret have been set - otherwise we could read
> elem->ret before the return value was stored.
Right. posix-aio-compat didn't need this because it only had ret.
Just as important: the locking policy was not documented at all.
I'm applying some changes. Logically (and for ease of review) they are
four patches on top of this one, but they'll be squashed in the next
submission. (Hmm, the fourth should be separate).
>> +typedef struct ThreadPoolCo {
>> + Coroutine *co;
>> + int ret;
>> +} ThreadPoolCo;
>> +
>> +static void thread_pool_co_cb(void *opaque, int ret)
>> +{
>> + ThreadPoolCo *co = opaque;
>> +
>> + co->ret = ret;
>> + qemu_coroutine_enter(co->co, NULL);
>> +}
>> +
>> +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>> +{
>> + ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
>> + assert(qemu_in_coroutine());
>> + thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
>> + qemu_coroutine_yield();
>> + return tpc.ret;
>
> It's important to understand that the submit_aio, yield, return ret
> pattern works because we assume this function was called as part of the
> main loop.
>
> If thread_pool_submit_co() was called outside the event loop and global
> mutex, then there is a race between the submit_aio and yield steps where
> thread_pool_co_cb() is called before this coroutine yields!
Even before that, thread_pool_submit_aio would race on the
non-thread-safe qemu_aio_get. Also, head is protected by the BQL.
event_notifier_ready needs to run under the BQL too, because it
accesses head and also calls qemu_aio_release.
qemu_aio_get and qemu_aio_release should be moved to AioContext, so
that they can use the (upcoming) AioContext lock instead of the BQL.
The thread pool needs to be per-AioContext instead of using globals,
too. However, this can be done later.
Paolo
----------------------- >8 ---------------------------
From acf39d76ddf4109fbdbc897afe0d9a23ba8ffba1 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <address@hidden>
Date: Wed, 31 Oct 2012 10:07:04 +0100
Subject: [PATCH 1/4] fix locking in thread-pool
event_notifier_ready accesses elem->state and then elem->ret. We need
to take the lock to ensure both elem->state and elem->ret have been set -
otherwise we could read elem->ret before the return value was stored.
Signed-off-by: Paolo Bonzini <address@hidden>
---
thread-pool.c | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/thread-pool.c b/thread-pool.c
index 266f12f..e4bd4f3 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -162,8 +162,11 @@ restart:
trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
}
if (elem->state == THREAD_DONE && elem->common.cb) {
+ qemu_mutex_lock(&lock);
+ int ret = elem->ret;
+ qemu_mutex_unlock(&lock);
QLIST_REMOVE(elem, all);
- elem->common.cb(elem->common.opaque, elem->ret);
+ elem->common.cb(elem->common.opaque, ret);
qemu_aio_release(elem);
goto restart;
} else {
--
1.7.12.1
From 8671e581bb65be4d3cd82a9b99fe46735c6ea76b Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <address@hidden>
Date: Wed, 31 Oct 2012 10:09:26 +0100
Subject: [PATCH 2/4] document lock policy
Signed-off-by: Paolo Bonzini <address@hidden>
---
thread-pool.c | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/thread-pool.c b/thread-pool.c
index e4bd4f3..10bab70 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -42,7 +42,10 @@ struct ThreadPoolElement {
enum ThreadState state;
int ret;
+ /* Access to this list is protected by lock. */
QTAILQ_ENTRY(ThreadPoolElement) reqs;
+
+ /* Access to this list is protected by the global mutex. */
QLIST_ENTRY(ThreadPoolElement) all;
};
@@ -51,14 +54,18 @@ static QemuMutex lock;
static QemuCond check_cancel;
static QemuSemaphore sem;
static int max_threads = 64;
+static QEMUBH *new_thread_bh;
+
+/* The following variables are protected by the global mutex. */
+static QLIST_HEAD(, ThreadPoolElement) head;
+
+/* The following variables are protected by lock. */
+static QTAILQ_HEAD(, ThreadPoolElement) request_list;
static int cur_threads;
static int idle_threads;
static int new_threads; /* backlog of threads we need to create */
static int pending_threads; /* threads created but not running yet */
static int pending_cancellations; /* whether we need a cond_broadcast */
-static QEMUBH *new_thread_bh;
-static QLIST_HEAD(, ThreadPoolElement) head;
-static QTAILQ_HEAD(, ThreadPoolElement) request_list;
static void *worker_thread(void *unused)
{
--
1.7.12.1
From 1b611e625a1a16c1d1b110e410f082b64c7ba332 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <address@hidden>
Date: Wed, 31 Oct 2012 10:08:39 +0100
Subject: [PATCH 3/4] simplify locking
Avoid repeated lock/unlock, take lock around the while loop rather
than inside.
Signed-off-by: Paolo Bonzini <address@hidden>
---
thread-pool.c | 11 +++--------
1 file changed, 3 insertions(+), 8 deletions(-)
diff --git a/thread-pool.c b/thread-pool.c
index 10bab70..38ac5b4 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -71,25 +71,21 @@ static void *worker_thread(void *unused)
{
qemu_mutex_lock(&lock);
pending_threads--;
- qemu_mutex_unlock(&lock);
do_spawn_thread();
while (1) {
ThreadPoolElement *req;
int ret;
- qemu_mutex_lock(&lock);
- idle_threads++;
- qemu_mutex_unlock(&lock);
- ret = qemu_sem_timedwait(&sem, 10000);
- qemu_mutex_lock(&lock);
- idle_threads--;
+ do {
+ idle_threads++;
+ qemu_mutex_unlock(&lock);
+ ret = qemu_sem_timedwait(&sem, 10000);
+ qemu_mutex_lock(&lock);
+ idle_threads--;
+ } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
if (ret == -1) {
- if (QTAILQ_EMPTY(&request_list)) {
- break;
- }
- qemu_mutex_unlock(&lock);
- continue;
+ break;
}
req = QTAILQ_FIRST(&request_list);
@@ -105,14 +103,12 @@ static void *worker_thread(void *unused)
if (pending_cancellations) {
qemu_cond_broadcast(&check_cancel);
}
- qemu_mutex_unlock(&lock);
event_notifier_set(&notifier);
}
cur_threads--;
qemu_mutex_unlock(&lock);
-
return NULL;
}
@@ -120,23 +116,22 @@ static void do_spawn_thread(void)
{
QemuThread t;
- qemu_mutex_lock(&lock);
+ /* Runs with lock taken. */
if (!new_threads) {
- qemu_mutex_unlock(&lock);
return;
}
new_threads--;
pending_threads++;
- qemu_mutex_unlock(&lock);
-
qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
}
static void spawn_thread_bh_fn(void *opaque)
{
+ qemu_mutex_lock(&lock);
do_spawn_thread();
+ qemu_mutex_unlock(&lock);
}
static void spawn_thread(void)
--
1.7.12.1
From 3478c7db38368804db46924fc22d90cda77f6a48 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <address@hidden>
Date: Wed, 31 Oct 2012 10:09:11 +0100
Subject: [PATCH 4/4] threadpool: do not take lock in event_notifier_ready
The ordering is:
    worker thread                consumer thread
    ---------------------------------------------------------
    write ret                    event_notifier_test_and_clear
    wmb()                        read state
    write state                  rmb()
    event_notifier_set           read ret
Signed-off-by: Paolo Bonzini <address@hidden>
---
thread-pool.c | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/thread-pool.c b/thread-pool.c
index 38ac5b4..883948a 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -39,6 +39,11 @@ struct ThreadPoolElement {
BlockDriverAIOCB common;
ThreadPoolFunc *func;
void *arg;
+
+ /* Moving state out of THREAD_QUEUED is protected by lock. After
+ * that, only the worker thread can write to it. Reads and writes
+ * of state and ret are ordered with memory barriers.
+ */
enum ThreadState state;
int ret;
@@ -97,9 +102,12 @@ static void *worker_thread(void *unused)
ret = req->func(req->arg);
- qemu_mutex_lock(&lock);
- req->state = THREAD_DONE;
req->ret = ret;
+ /* Write ret before state. */
+ smp_wmb();
+ req->state = THREAD_DONE;
+
+ qemu_mutex_lock(&lock);
if (pending_cancellations) {
qemu_cond_broadcast(&check_cancel);
}
@@ -164,11 +172,10 @@ restart:
trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
}
if (elem->state == THREAD_DONE && elem->common.cb) {
- qemu_mutex_lock(&lock);
- int ret = elem->ret;
- qemu_mutex_unlock(&lock);
QLIST_REMOVE(elem, all);
- elem->common.cb(elem->common.opaque, ret);
+ /* Read state before ret. */
+ smp_rmb();
+ elem->common.cb(elem->common.opaque, elem->ret);
qemu_aio_release(elem);
goto restart;
} else {
--
1.7.12.1