qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH V4 2/3] qemu: Generic task offloading framework: threadlets


From: Gautham R Shenoy
Subject: [Qemu-devel] [PATCH V4 2/3] qemu: Generic task offloading framework: threadlets
Date: Wed, 16 Jun 2010 17:26:56 +0530
User-agent: StGit/0.15-51-gc750

From: Aneesh Kumar K.V <address@hidden>

This patch creates a generic asynchronous-task-offloading infrastructure named
threadlets. The core idea has been borrowed from the threading framework that
is being used by paio.

The reason for creating this generic infrastructure is so that other subsystems,
such as virtio-9p, could make use of it for offloading tasks that could block.

The patch creates a global queue onto which subsystems can queue their tasks to
be executed asynchronously.

The patch also provides APIs that allow a subsystem to create a private queue.
APIs that allow a subsystem to wait until all the earlier queued tasks have been
executed are also provided.

[address@hidden: Facelift of the code, cancel_threadlet,
flush_threadlet_queue and other minor helpers.]

Signed-off-by: Aneesh Kumar K.V <address@hidden>
Signed-off-by: Gautham R Shenoy <address@hidden>
---
 Makefile.objs |    3 +
 async-work.c  |  186 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 async-work.h  |   69 +++++++++++++++++++++
 3 files changed, 257 insertions(+), 1 deletions(-)
 create mode 100644 async-work.c
 create mode 100644 async-work.h

diff --git a/Makefile.objs b/Makefile.objs
index 1a942e5..019646f 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-y += qemu-thread.o
+block-obj-y += async-work.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
@@ -109,7 +111,6 @@ common-obj-y += iov.o
 common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
 common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
 common-obj-$(CONFIG_COCOA) += cocoa.o
-common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
 common-obj-y += notify.o event_notifier.o
 common-obj-y += qemu-timer.o
 
diff --git a/async-work.c b/async-work.c
new file mode 100644
index 0000000..50e39ce
--- /dev/null
+++ b/async-work.c
@@ -0,0 +1,186 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ * Generalization based on posix-aio emulation code.
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori   <address@hidden>
+ *  Aneesh Kumar K.V <address@hidden>
+ *  Gautham R Shenoy <address@hidden>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <signal.h>
+#include "async-work.h"
+#include "osdep.h"
+
+/*
+ * Thread limits passed to threadlet_queue_init() for the global queue.
+ * Both are 64.  Workers are only spawned while cur_threads < max_threads,
+ * so the shrink test in threadlet_worker() (cur_threads > min_threads)
+ * cannot become true for this queue.
+ */
+#define MAX_GLOBAL_THREADS  64
+#define MIN_GLOBAL_THREADS  64
+/* Shared queue used by the *_common() entry points in this file. */
+ThreadletQueue globalqueue;
+/* Non-zero once threadlet_submit_common() has initialised globalqueue. */
+static int globalqueue_init;
+
+/*
+ * threadlet_worker: Body of every worker thread started by spawn_threadlet().
+ * @data: The ThreadletQueue this worker serves.
+ *
+ * Repeatedly takes the first task off queue->request_list and calls its
+ * func callback with the queue lock dropped.  The loop ends when the test
+ * at check_exit succeeds; the thread then removes itself from the queue's
+ * counters and, if the queue is being flushed, joins queue->barr.
+ *
+ * Always returns NULL.
+ */
+static void *threadlet_worker(void *data)
+{
+    ThreadletQueue *queue = data;
+
+    while (1) {
+        ThreadletWork *work;
+        int ret = 0;
+        qemu_mutex_lock(&(queue->lock));
+
+        /*
+         * Wait for work until either a task shows up or one wait times out.
+         * NOTE(review): the unit of the timeout argument is not visible
+         * here; if it is milliseconds, 10*100000 is about 1000 seconds --
+         * confirm whether 10*1000 was intended.
+         */
+        while (QTAILQ_EMPTY(&(queue->request_list)) &&
+               (ret != ETIMEDOUT)) {
+            ret = qemu_cond_timedwait(&(queue->cond),
+                                        &(queue->lock), 10*100000);
+        }
+
+        /* Timed out with nothing queued: go straight to the exit test. */
+        if (QTAILQ_EMPTY(&(queue->request_list)))
+            goto check_exit;
+
+        /* Dequeue the oldest task and mark this thread busy. */
+        work = QTAILQ_FIRST(&(queue->request_list));
+        QTAILQ_REMOVE(&(queue->request_list), work, node);
+        queue->idle_threads--;
+        qemu_mutex_unlock(&(queue->lock));
+
+        /* Run the task without holding the queue lock. */
+        work->func(work);
+
+        qemu_mutex_lock(&(queue->lock));
+        queue->idle_threads++;
+
+        /*
+         * The lock is held on both paths into check_exit.
+         * NOTE(review): queue->exit ends the loop even when request_list
+         * still holds tasks, so queued work may be left unexecuted --
+         * confirm this is intended.
+         * NOTE(review): this thread counts itself as idle on both paths, so
+         * idle_threads > 0 appears to always hold here and the second test
+         * reduces to cur_threads > min_threads -- confirm.
+         */
+check_exit:
+        if (queue->exit || ((queue->idle_threads > 0) &&
+            (queue->cur_threads > queue->min_threads))) {
+            /*
+             * Leave the loop with the lock still held: either the queue is
+             * being shut down or the pool is above its minimum size.
+             */
+            break;
+        }
+        qemu_mutex_unlock(&(queue->lock));
+    }
+
+    /* Still holding the lock: drop this thread from both counters. */
+    queue->idle_threads--;
+    queue->cur_threads--;
+    if (queue->exit) {
+        qemu_mutex_unlock(&(queue->lock));
+        /* Rendezvous with flush_threadlet_queue(), which waits on barr too. */
+        qemu_barrier_wait(&queue->barr);
+    } else
+        qemu_mutex_unlock(&queue->lock);
+
+    return NULL;
+}
+
+/**
+ * spawn_threadlet: Start one more worker thread for a queue.
+ * @queue: Queue the new worker will serve.
+ *
+ * The new thread is counted as both existing and idle before it is
+ * created.  The only caller in this file, threadlet_submit(), holds
+ * queue->lock around the call.
+ */
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+    QemuThread worker;
+
+    queue->idle_threads += 1;
+    queue->cur_threads += 1;
+    qemu_thread_create(&worker, threadlet_worker, queue);
+}
+
+/**
+ * threadlet_submit: Queue a task for asynchronous execution.
+ * @queue: Queue that will run the task.
+ * @work: The task; it is appended to the tail of the queue's request list.
+ *
+ * A new worker thread is started first if no worker is idle and the queue
+ * is still below its max_threads limit.  The queue's condition variable is
+ * signalled after the lock has been released.
+ */
+void threadlet_submit(ThreadletQueue *queue, ThreadletWork *work)
+{
+    qemu_mutex_lock(&(queue->lock));
+
+    if (queue->cur_threads < queue->max_threads) {
+        if (queue->idle_threads == 0) {
+            spawn_threadlet(queue);
+        }
+    }
+    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
+
+    qemu_mutex_unlock(&(queue->lock));
+    qemu_cond_signal(&(queue->cond));
+}
+
+/**
+ * threadlet_submit_common: Queue a task on the global queue.
+ * @work: The task to run asynchronously.
+ *
+ * The global queue is set up on the first call, with MAX_GLOBAL_THREADS
+ * and MIN_GLOBAL_THREADS as its thread limits.
+ */
+void threadlet_submit_common(ThreadletWork *work)
+{
+    ThreadletQueue *gq = &globalqueue;
+
+    if (globalqueue_init == 0) {
+        threadlet_queue_init(gq, MAX_GLOBAL_THREADS, MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    threadlet_submit(gq, work);
+}
+
+/**
+ * flush_threadlet_queue: Tell a queue's workers to exit and wait for them.
+ * @queue: Queue whose worker threads are being waited on.
+ *
+ * Sets queue->exit, sizes a barrier for every current worker plus the
+ * caller, and blocks on that barrier.  Each worker joins the barrier on
+ * its way out of threadlet_worker().
+ *
+ * NOTE(review): the original summary read "wait till completion of all the
+ * submitted tasks", but workers leave their loop on queue->exit without
+ * checking request_list -- confirm whether pending tasks can be dropped.
+ * NOTE(review): nothing here wakes workers blocked in their timed wait, and
+ * queue->exit is not cleared afterwards -- confirm both are intended.
+ */
+void flush_threadlet_queue(ThreadletQueue *queue)
+{
+    qemu_mutex_lock(&queue->lock);
+    queue->exit = 1;
+
+    /* + 1 accounts for the calling thread, which waits on the barrier below. */
+    qemu_barrier_init(&queue->barr, queue->cur_threads + 1);
+    qemu_mutex_unlock(&queue->lock);
+
+    qemu_barrier_wait(&queue->barr);
+}
+
+/**
+ * flush_common_threadlet_queue: Flush the global queue.
+ *
+ * Calls flush_threadlet_queue() on the global queue.  Returns immediately
+ * if the global queue has never been initialised, i.e. nothing has been
+ * submitted through threadlet_submit_common(): its mutex and lists are not
+ * set up until then, and there are no workers to wait for.
+ */
+void flush_common_threadlet_queue(void)
+{
+    if (!globalqueue_init) {
+        return;
+    }
+
+    flush_threadlet_queue(&globalqueue);
+}
+
+/**
+ * cancel_threadlet: Remove a task from a queue before it starts running.
+ * @queue: Queue the task was submitted to.
+ * @work: The task to remove.
+ *
+ * Only the queue's request list is searched, so a task that a worker has
+ * already taken off the list is not found.
+ *
+ * Returns: 0 if the task was found and removed.
+ *          1 otherwise.
+ */
+int cancel_threadlet(ThreadletQueue *queue, ThreadletWork *work)
+{
+    ThreadletWork *cur;
+    int status = 1;
+
+    qemu_mutex_lock(&(queue->lock));
+    QTAILQ_FOREACH(cur, &(queue->request_list), node) {
+        if (cur != work) {
+            continue;
+        }
+        QTAILQ_REMOVE(&(queue->request_list), cur, node);
+        status = 0;
+        break;
+    }
+    qemu_mutex_unlock(&(queue->lock));
+
+    return status;
+}
+
+/**
+ * cancel_threadlet_common: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * If the global queue has never been initialised, nothing can be queued on
+ * it, so the task is reported as not cancelled without touching the queue's
+ * mutex, which is not set up until the first threadlet_submit_common().
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int cancel_threadlet_common(ThreadletWork *work)
+{
+    if (!globalqueue_init) {
+        return 1;
+    }
+
+    return cancel_threadlet(&globalqueue, work);
+}
diff --git a/async-work.h b/async-work.h
new file mode 100644
index 0000000..36d19fa
--- /dev/null
+++ b/async-work.h
@@ -0,0 +1,69 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ * Generalization based on posix-aio emulation code.
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori   <address@hidden>
+ *  Aneesh Kumar K.V <address@hidden>
+ *  Gautham R Shenoy <address@hidden>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+/* A pool of worker threads together with the list of tasks they serve. */
+typedef struct ThreadletQueue
+{
+    QemuMutex lock;     /* held by async-work.c around counter/list accesses */
+    QemuCond cond;      /* workers wait on it; signalled by threadlet_submit() */
+    QemuBarrier barr;   /* rendezvous of flush_threadlet_queue() and workers */
+    int max_threads;    /* no worker is spawned once cur_threads reaches this */
+    int min_threads;    /* workers retire only while cur_threads exceeds this */
+    int cur_threads;    /* worker threads currently accounted to the queue */
+    int idle_threads;   /* workers that are not running a task */
+    int exit;           /* set by flush_threadlet_queue() to stop the workers */
+    QTAILQ_HEAD(, threadlet_work) request_list;         /* tasks not yet started */
+    QTAILQ_HEAD(, threadlet_work) threadlet_work_pool;  /* only initialised in this patch */
+} ThreadletQueue;
+
+/* One task: a list link plus the function a worker thread calls to run it. */
+typedef struct threadlet_work
+{
+    QTAILQ_ENTRY(threadlet_work) node;          /* link in a queue's request_list */
+    void (*func)(struct threadlet_work *work);  /* called with this same object */
+} ThreadletWork;
+
+/**
+ * threadlet_queue_init: Prepare a queue for use.
+ * @queue: Queue to initialise.
+ * @max_threads: Largest number of worker threads the queue may have.
+ * @min_threads: Workers exit on their own only while the pool is larger
+ *               than this.
+ *
+ * Initialises the mutex and condition variable, empties both lists, and
+ * resets the counters and the exit flag.  No threads are started here and
+ * the barrier is left untouched.
+ */
+static inline void threadlet_queue_init(ThreadletQueue *queue,
+                                        int max_threads, int min_threads)
+{
+    qemu_mutex_init(&(queue->lock));
+    qemu_cond_init(&(queue->cond));
+
+    QTAILQ_INIT(&(queue->request_list));
+    QTAILQ_INIT(&(queue->threadlet_work_pool));
+
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->exit = 0;
+}
+
+/* Append @work to @queue, starting a worker thread first if needed. */
+extern void threadlet_submit(ThreadletQueue *queue,
+                                 ThreadletWork *work);
+
+/* Same as threadlet_submit(), on the global queue. */
+extern void threadlet_submit_common(ThreadletWork *work);
+
+/* Remove a still-queued task; both return 0 on success and 1 otherwise. */
+extern int cancel_threadlet(ThreadletQueue *queue, ThreadletWork *work);
+extern int cancel_threadlet_common(ThreadletWork *work);
+
+
+/* Set the queue's exit flag and wait for its worker threads at a barrier. */
+extern void flush_threadlet_queue(ThreadletQueue *queue);
+extern void flush_common_threadlet_queue(void);
+#endif




reply via email to

[Prev in Thread] Current Thread [Next in Thread]