[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to use threadlets
From: |
Arun R Bharadwaj |
Subject: |
[Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to use threadlets |
Date: |
Tue, 19 Oct 2010 23:13:50 +0530 |
User-agent: |
StGit/0.15 |
From: Gautham R Shenoy <address@hidden>
Add helper functions to enable virtio-9p to make use of the threadlets
infrastructure for offloading blocking tasks, such as making posix calls,
onto the helper threads, and to handle the post_posix_operations() from the
context of the iothread. This frees the vcpu thread to process any other guest
operations while the processing of v9fs_io is in progress.
Signed-off-by: Gautham R Shenoy <address@hidden>
Signed-off-by: Sripathi Kodi <address@hidden>
Signed-off-by: Arun R Bharadwaj <address@hidden>
---
hw/virtio-9p.c | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++++
posix-aio-compat.c | 33 +++-------
qemu-threadlets.c | 21 +++++++
qemu-threadlets.h | 1
vl.c | 3 +
5 files changed, 200 insertions(+), 23 deletions(-)
diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
index a871685..174300d 100644
--- a/hw/virtio-9p.c
+++ b/hw/virtio-9p.c
@@ -18,6 +18,7 @@
#include "fsdev/qemu-fsdev.h"
#include "virtio-9p-debug.h"
#include "virtio-9p-xattr.h"
+#include "qemu-threadlets.h"
int debug_9p_pdu;
@@ -33,6 +34,146 @@ enum {
Oappend = 0x80,
};
+/*
+ * One deferred completion callback. Entries are allocated and queued by
+ * v9fs_async_helper_done() and are dequeued, run and freed by
+ * v9fs_process_post_ops().
+ */
+struct v9fs_post_op {
+ QTAILQ_ENTRY(v9fs_post_op) node; /* linkage in v9fs_async_struct.post_op_list */
+ void (*func)(void *arg); /* callback, invoked as func(arg) */
+ void *arg; /* opaque argument handed to func */
+};
+
+/*
+ * File-scope state for the v9fs completion notification path.
+ * rfd/wfd are the two ends of the pipe created in virtio_9p_init();
+ * lock is taken around every access to post_op_list.
+ */
+static struct {
+ int rfd; /* read end, watched via qemu_set_fd_handler() */
+ int wfd; /* write end, written by v9fs_async_signal() */
+ QemuMutex lock;
+ QTAILQ_HEAD(, v9fs_post_op) post_op_list; /* pending v9fs_post_op entries */
+} v9fs_async_struct;
+
+/*
+ * Print "<what> failed: <description of err>" to stderr and abort().
+ * @err: errno-style error code passed to strerror().
+ * @what: name of the operation that failed.
+ */
+static void die2(int err, const char *what)
+{
+    const char *reason = strerror(err);
+
+    fprintf(stderr, "%s failed: %s\n", what, reason);
+    abort();
+}
+
+/*
+ * Abort via die2(), reporting the errno value current at the time of the call.
+ * @what: name of the operation that failed.
+ */
+static void die(const char *what)
+{
+    int saved_errno = errno;
+
+    die2(saved_errno, what);
+}
+
+/* Upper bound on queued post-operations run per v9fs_process_post_ops() call. */
+#define ASYNC_MAX_PROCESS 5
+
+/**
+ * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
+ * @arg: Not used.
+ *
+ * This function is registered with the iothread as the read handler for
+ * v9fs_async_struct.rfd, so it is called whenever v9fs_async_struct.wfd is
+ * written into. It drains the notification pipe and then dequeues and runs up
+ * to ASYNC_MAX_PROCESS entries of the post_op_list. Each callback runs with
+ * the lock dropped, so it may queue more jobs on the asynchronous thread pool.
+ *
+ * If entries are still queued after the batch, one byte is written back to
+ * the pipe so that this handler is invoked again. The pipe has already been
+ * drained at that point, so without the re-arm the leftover entries would
+ * only be handled when some unrelated completion is signalled.
+ */
+static void v9fs_process_post_ops(void *arg)
+{
+    int count;
+    int pending;
+    struct v9fs_post_op *post_op;
+    ssize_t ret;
+    char bytes[16];
+
+    /*
+     * Drain all pending notification bytes. Only the return value decides
+     * whether to continue: keep reading while data arrives, retry on EINTR,
+     * and stop on EOF, EAGAIN (pipe empty) or any other error. The lock is
+     * not needed for this; it protects the list, not the pipe.
+     */
+    do {
+        ret = read(v9fs_async_struct.rfd, bytes, sizeof(bytes));
+    } while (ret > 0 || (ret < 0 && errno == EINTR));
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
+        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
+            break;
+        }
+        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
+        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
+
+        /* Run the callback unlocked so that it can queue further work. */
+        qemu_mutex_unlock(&v9fs_async_struct.lock);
+        post_op->func(post_op->arg);
+        qemu_free(post_op);
+        qemu_mutex_lock(&v9fs_async_struct.lock);
+    }
+    pending = !QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list));
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    if (pending) {
+        char byte = 0;
+
+        do {
+            ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
+        } while (ret < 0 && errno == EINTR);
+
+        /* EAGAIN: the pipe is full, so a wake-up is already pending. */
+        if (ret < 0 && errno != EAGAIN) {
+            die("write() in v9fs");
+        }
+    }
+}
+
+/**
+ * v9fs_async_signal: Inform the io-thread of completion of async job.
+ *
+ * This function is used to inform the iothread that a particular
+ * async-operation pertaining to v9fs has been completed and that the io thread
+ * can handle the v9fs_post_posix_operation. It writes one byte to
+ * v9fs_async_struct.wfd and then sends SIGUSR2 to the process.
+ *
+ * This is based on the aio_signal_handler
+ */
+static inline void v9fs_async_signal(void)
+{
+    char byte = 0;
+    ssize_t ret;
+
+    /*
+     * The list lock is deliberately not held here. The reader takes that
+     * lock before draining the pipe, so retrying on a full pipe while
+     * holding it could never make progress. Retry only when interrupted.
+     */
+    do {
+        ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
+    } while (ret < 0 && errno == EINTR);
+
+    /*
+     * EAGAIN means the non-blocking pipe is full, i.e. wake-ups are already
+     * pending for the reader, so it is not an error.
+     */
+    if (ret < 0 && errno != EAGAIN) {
+        die("write() in v9fs");
+    }
+
+    if (kill(getpid(), SIGUSR2)) {
+        die("kill failed");
+    }
+}
+
+/**
+ * v9fs_async_helper_done: Marks the completion of the v9fs_async job
+ * @func: v9fs_post_posix_func() for post-processing invoked in the context of
+ * the io-thread
+ * @arg: Argument to func.
+ *
+ * Meant to be called by one of the asynchronous threads in the thread pool
+ * once it has finished executing a v9fs_posix_operation. Its purpose is to
+ * queue @func/@arg on the post_op_list and then notify the io-thread through
+ * v9fs_async_signal().
+ */
+static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
+{
+    struct v9fs_post_op *entry = qemu_mallocz(sizeof(*entry));
+
+    entry->func = func;
+    entry->arg = arg;
+
+    /* The list is shared with v9fs_process_post_ops(); append under the lock. */
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), entry, node);
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    v9fs_async_signal();
+}
+
+/**
+ * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
+ * @work: ThreadletWork item to submit; its func member is set to @posix_fn.
+ * @posix_fn: The posix function which has to be offloaded onto async thread.
+ * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
+ * the posix_fn
+ * @post_fn: The post processing function corresponding to the posix_fn.
+ *
+ * This function is a helper to offload a posix operation onto the asynchronous
+ * thread pool. It stores @post_fn in *@post_fn_ptr and then submits @work via
+ * submit_threadletwork(). The post function is meant to be invoked from the
+ * context of the iothread once the posix_fn has been executed.
+ */
+static void v9fs_do_async_posix(ThreadletWork *work ,
+ void (*posix_fn)(ThreadletWork *work),
+ void (**post_fn_ptr)(void *arg),
+ void (*post_fn)(void *arg))
+{
+ *post_fn_ptr = post_fn;
+ work->func = posix_fn;
+ submit_threadletwork(work);
+}
+
static int omode_to_uflags(int8_t mode)
{
int ret = 0;
@@ -3639,7 +3780,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
int i, len;
struct stat stat;
FsTypeEntry *fse;
-
+ int fds[2];
s = (V9fsState *)virtio_common_init("virtio-9p",
VIRTIO_ID_9P,
@@ -3722,5 +3863,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
s->tag_len;
s->vdev.get_config = virtio_9p_get_config;
+ if (qemu_pipe(fds) == -1) {
+ fprintf(stderr, "failed to create fd's for virtio-9p\n");
+ exit(1);
+ }
+
+ v9fs_async_struct.rfd = fds[0];
+ v9fs_async_struct.wfd = fds[1];
+
+ printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
+ printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
+
+ fcntl(fds[0], F_SETFL, O_NONBLOCK);
+ fcntl(fds[1], F_SETFL, O_NONBLOCK);
+
+ qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
+ QTAILQ_INIT(&v9fs_async_struct.post_op_list);
+ qemu_mutex_init(&(v9fs_async_struct.lock));
+ /* Create async queue. */
+
+ (void)v9fs_do_async_posix;
+ (void)v9fs_async_helper_done;
+
return &s->vdev;
}
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 6977c18..dbf9321 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -261,6 +261,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
+static PosixAioState *posix_aio_state;
+
static void aio_thread(ThreadletWork *work)
{
pid_t pid;
@@ -289,6 +291,15 @@ static void aio_thread(ThreadletWork *work)
aiocb->ret = ret;
+ if (posix_aio_state) {
+ char byte = 0;
+ ssize_t ret;
+
+ ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+ if (ret < 0 && errno != EAGAIN)
+ die("write()");
+ }
+
if (kill(pid, aiocb->ev_signo)) die("kill failed");
}
@@ -401,22 +412,6 @@ static int posix_aio_flush(void *opaque)
return !!s->first_aio;
}
-static PosixAioState *posix_aio_state;
-
-static void aio_signal_handler(int signum)
-{
- if (posix_aio_state) {
- char byte = 0;
- ssize_t ret;
-
- ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
- if (ret < 0 && errno != EAGAIN)
- die("write()");
- }
-
- qemu_service_io();
-}
-
static void paio_remove(struct qemu_paiocb *acb)
{
struct qemu_paiocb **pacb;
@@ -520,7 +515,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
int paio_init(void)
{
- struct sigaction act;
PosixAioState *s;
int fds[2];
@@ -529,11 +523,6 @@ int paio_init(void)
s = qemu_malloc(sizeof(PosixAioState));
- sigfillset(&act.sa_mask);
- act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
- act.sa_handler = aio_signal_handler;
- sigaction(SIGUSR2, &act, NULL);
-
s->first_aio = NULL;
if (qemu_pipe(fds) == -1) {
fprintf(stderr, "failed to create pipe\n");
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
index fd33752..df02f4e 100644
--- a/qemu-threadlets.c
+++ b/qemu-threadlets.c
@@ -15,12 +15,28 @@
#include "qemu-threadlets.h"
#include "osdep.h"
+#include <signal.h>
#define MAX_GLOBAL_THREADS 64
#define MIN_GLOBAL_THREADS 64
static ThreadletQueue globalqueue;
static int globalqueue_init;
+/*
+ * SIGUSR2 handler installed by threadlet_register_signal_handler(). It only
+ * calls qemu_service_io(); @signum is not used.
+ */
+static void threadlet_io_completion_signal_handler(int signum)
+{
+ qemu_service_io();
+}
+
+/*
+ * Install threadlet_io_completion_signal_handler() as the SIGUSR2 handler.
+ * All signals are blocked while the handler runs (full sa_mask).
+ */
+static void threadlet_register_signal_handler(void)
+{
+    struct sigaction sa;
+
+    sa.sa_handler = threadlet_io_completion_signal_handler;
+    sa.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+    sigfillset(&sa.sa_mask);
+
+    sigaction(SIGUSR2, &sa, NULL);
+}
+
static void *threadlet_worker(void *data)
{
ThreadletQueue *queue = data;
@@ -163,3 +179,8 @@ void threadlet_queue_init(ThreadletQueue *queue,
qemu_mutex_init(&(queue->lock));
qemu_cond_init(&(queue->cond));
}
+
+/* Threadlet initialisation entry point; it only registers the signal handler. */
+void threadlet_init(void)
+{
+ threadlet_register_signal_handler();
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
index 9c8f9e5..3de589b 100644
--- a/qemu-threadlets.h
+++ b/qemu-threadlets.h
@@ -45,4 +45,5 @@ extern int cancel_threadletwork_on_queue(ThreadletQueue *queue,
extern int cancel_threadletwork(ThreadletWork *work);
extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
int min_threads);
+extern void threadlet_init(void);
#endif
diff --git a/vl.c b/vl.c
index df414ef..7b9a425 100644
--- a/vl.c
+++ b/vl.c
@@ -148,6 +148,7 @@ int main(int argc, char **argv)
#include "qemu-config.h"
#include "qemu-objects.h"
#include "qemu-options.h"
+#include "qemu-threadlets.h"
#ifdef CONFIG_VIRTFS
#include "fsdev/qemu-fsdev.h"
#endif
@@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
exit(1);
}
+ threadlet_init();
+
/* init generic devices */
if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1)
!= 0)
exit(1);