qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PULL 20/24] coroutine-lock: add limited spinning to CoMutex


From: Stefan Hajnoczi
Subject: [Qemu-devel] [PULL 20/24] coroutine-lock: add limited spinning to CoMutex
Date: Mon, 20 Feb 2017 09:33:00 +0000

From: Paolo Bonzini <address@hidden>

Running a very small critical section on pthread_mutex_t and CoMutex
shows that pthread_mutex_t is much faster because it doesn't actually
go to sleep.  What happens is that the critical section is shorter
than the latency of entering the kernel and thus FUTEX_WAIT always
fails.  With CoMutex there is no such latency but you still want to
avoid wait and wakeup.  So introduce it artificially.

This only works with one waiter; because CoMutex is fair, it will
always have more waits and wakeups than a pthread_mutex_t.

Signed-off-by: Paolo Bonzini <address@hidden>
Reviewed-by: Fam Zheng <address@hidden>
Message-id: address@hidden
Signed-off-by: Stefan Hajnoczi <address@hidden>
---
 include/qemu/coroutine.h   |  5 +++++
 util/qemu-coroutine-lock.c | 51 ++++++++++++++++++++++++++++++++++++++++------
 util/qemu-coroutine.c      |  2 +-
 3 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index fce228f..12ce8e1 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -167,6 +167,11 @@ typedef struct CoMutex {
      */
     unsigned locked;
 
+    /* Context that is holding the lock.  Useful to avoid spinning
+     * when two coroutines on the same AioContext try to get the lock. :)
+     */
+    AioContext *ctx;
+
     /* A queue of waiters.  Elements are added atomically in front of
      * from_push.  to_pop is only populated, and popped from, by whoever
      * is in charge of the next wakeup.  This can be an unlocker or,
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 25da9fa..73fe77c 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -30,6 +30,7 @@
 #include "qemu-common.h"
 #include "qemu/coroutine.h"
 #include "qemu/coroutine_int.h"
+#include "qemu/processor.h"
 #include "qemu/queue.h"
 #include "block/aio.h"
 #include "trace.h"
@@ -181,7 +182,18 @@ void qemu_co_mutex_init(CoMutex *mutex)
     memset(mutex, 0, sizeof(*mutex));
 }
 
-static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
+static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
+{
+    /* Read co before co->ctx; pairs with smp_wmb() in
+     * qemu_coroutine_enter().
+     */
+    smp_read_barrier_depends();
+    mutex->ctx = co->ctx;
+    aio_co_wake(co);
+}
+
+static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
+                                                     CoMutex *mutex)
 {
     Coroutine *self = qemu_coroutine_self();
     CoWaitRecord w;
@@ -206,10 +218,11 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
         if (co == self) {
             /* We got the lock ourselves!  */
             assert(to_wake == &w);
+            mutex->ctx = ctx;
             return;
         }
 
-        aio_co_wake(co);
+        qemu_co_mutex_wake(mutex, co);
     }
 
     qemu_coroutine_yield();
@@ -218,13 +231,39 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
 
 void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
 {
+    AioContext *ctx = qemu_get_current_aio_context();
     Coroutine *self = qemu_coroutine_self();
+    int waiters, i;
 
-    if (atomic_fetch_inc(&mutex->locked) == 0) {
+    /* Running a very small critical section on pthread_mutex_t and CoMutex
+     * shows that pthread_mutex_t is much faster because it doesn't actually
+     * go to sleep.  What happens is that the critical section is shorter
+     * than the latency of entering the kernel and thus FUTEX_WAIT always
+     * fails.  With CoMutex there is no such latency but you still want to
+     * avoid wait and wakeup.  So introduce it artificially.
+     */
+    i = 0;
+retry_fast_path:
+    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
+    if (waiters != 0) {
+        while (waiters == 1 && ++i < 1000) {
+            if (atomic_read(&mutex->ctx) == ctx) {
+                break;
+            }
+            if (atomic_read(&mutex->locked) == 0) {
+                goto retry_fast_path;
+            }
+            cpu_relax();
+        }
+        waiters = atomic_fetch_inc(&mutex->locked);
+    }
+
+    if (waiters == 0) {
         /* Uncontended.  */
         trace_qemu_co_mutex_lock_uncontended(mutex, self);
+        mutex->ctx = ctx;
     } else {
-        qemu_co_mutex_lock_slowpath(mutex);
+        qemu_co_mutex_lock_slowpath(ctx, mutex);
     }
     mutex->holder = self;
     self->locks_held++;
@@ -240,6 +279,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
     assert(mutex->holder == self);
     assert(qemu_in_coroutine());
 
+    mutex->ctx = NULL;
     mutex->holder = NULL;
     self->locks_held--;
     if (atomic_fetch_dec(&mutex->locked) == 1) {
@@ -252,8 +292,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
         unsigned our_handoff;
 
         if (to_wake) {
-            Coroutine *co = to_wake->co;
-            aio_co_wake(co);
+            qemu_co_mutex_wake(mutex, to_wake->co);
             break;
         }
 
diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c
index 415600d..72412e5 100644
--- a/util/qemu-coroutine.c
+++ b/util/qemu-coroutine.c
@@ -118,7 +118,7 @@ void qemu_coroutine_enter(Coroutine *co)
     co->ctx = qemu_get_current_aio_context();
 
     /* Store co->ctx before anything that stores co.  Matches
-     * barrier in aio_co_wake.
+     * barrier in aio_co_wake and qemu_co_mutex_wake.
      */
     smp_wmb();
 
-- 
2.9.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]