qemu-block
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-block] [PATCH] thread-pool: Use an EventNotifier to coordinate wit


From: Sergio Lopez
Subject: [Qemu-block] [PATCH] thread-pool: Use an EventNotifier to coordinate with AioContext
Date: Mon, 25 Mar 2019 14:34:36 +0100

Our current ThreadPool implementation lacks support for AioContext's
event notifications. This not only means that it can't take advantage
from the IOThread's adaptive polling, but also that the latter works
against it, as it delays the execution of the bottom-halves
completions.

Here we implement handlers for both io_poll and io_read, and an
EventNotifier which is signaled from the worker threads after
returning from the asynchronous function.

The original issue and the improvement obtained from this patch can be
illustrated by the following fio test:

 * Host: Intel(R) Xeon(R) CPU E5-2640 v3 @ 2.60GHz
 * pseudo-storage: null_blk gb=10 nr_devices=2 irqmode=2
   completion_nsec=30000 (latency ~= NVMe)
 * fio cmdline: fio --time_based --runtime=30 --rw=randread
   --name=randread  --filename=/dev/vdb --direct=1 --ioengine=libaio
   --iodepth=1

                               ================
 ==============================| latency (us) |
 | master (poll-max-ns=50000)  |        69.87 |
 | master (poll-max-ns=0       |        56.11 |
 | patched (poll-max-ns=50000) |        49.45 |
 ==============================================

Signed-off-by: Sergio Lopez <address@hidden>
---
 util/thread-pool.c | 48 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 46 insertions(+), 2 deletions(-)

diff --git a/util/thread-pool.c b/util/thread-pool.c
index 610646d131..058ed7f0ae 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -61,6 +61,7 @@ struct ThreadPool {
     QemuSemaphore sem;
     int max_threads;
     QEMUBH *new_thread_bh;
+    EventNotifier e;
 
     /* The following variables are only accessed from one AioContext. */
     QLIST_HEAD(, ThreadPoolElement) head;
@@ -72,6 +73,7 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     bool stopping;
+    bool event_notifier_enabled;
 };
 
 static void *worker_thread(void *opaque)
@@ -108,6 +110,9 @@ static void *worker_thread(void *opaque)
         /* Write ret before state.  */
         smp_wmb();
         req->state = THREAD_DONE;
+        if (pool->event_notifier_enabled) {
+            event_notifier_set(&pool->e);
+        }
 
         qemu_mutex_lock(&pool->lock);
 
@@ -160,10 +165,10 @@ static void spawn_thread(ThreadPool *pool)
     }
 }
 
-static void thread_pool_completion_bh(void *opaque)
+static bool thread_pool_process_completions(ThreadPool *pool)
 {
-    ThreadPool *pool = opaque;
     ThreadPoolElement *elem, *next;
+    bool progress = false;
 
     aio_context_acquire(pool->ctx);
 restart:
@@ -172,6 +177,8 @@ restart:
             continue;
         }
 
+        progress = true;
+
         trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                    elem->ret);
         QLIST_REMOVE(elem, all);
@@ -202,6 +209,32 @@ restart:
         }
     }
     aio_context_release(pool->ctx);
+
+    return progress;
+}
+
+static void thread_pool_completion_bh(void *opaque)
+{
+    ThreadPool *pool = opaque;
+
+    thread_pool_process_completions(pool);
+}
+
+static void thread_pool_completion_cb(EventNotifier *e)
+{
+    ThreadPool *pool = container_of(e, ThreadPool, e);
+
+    if (event_notifier_test_and_clear(&pool->e)) {
+        thread_pool_completion_bh(pool);
+    }
+}
+
+static bool thread_pool_poll_cb(void *opaque)
+{
+    EventNotifier *e = opaque;
+    ThreadPool *pool = container_of(e, ThreadPool, e);
+
+    return thread_pool_process_completions(pool);
 }
 
 static void thread_pool_cancel(BlockAIOCB *acb)
@@ -311,6 +344,13 @@ static void thread_pool_init_one(ThreadPool *pool, 
AioContext *ctx)
     pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
+    if (event_notifier_init(&pool->e, false) >= 0) {
+        aio_set_event_notifier(ctx, &pool->e, false,
+                               thread_pool_completion_cb,
+                               thread_pool_poll_cb);
+        pool->event_notifier_enabled = true;
+    }
+
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
 }
@@ -346,6 +386,10 @@ void thread_pool_free(ThreadPool *pool)
 
     qemu_mutex_unlock(&pool->lock);
 
+    if (pool->event_notifier_enabled) {
+        aio_set_event_notifier(pool->ctx, &pool->e, false, NULL, NULL);
+        event_notifier_cleanup(&pool->e);
+    }
     qemu_bh_delete(pool->completion_bh);
     qemu_sem_destroy(&pool->sem);
     qemu_cond_destroy(&pool->worker_stopped);
-- 
2.20.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]