qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 1/4] virtiofsd: process requests in a thread pool


From: Stefan Hajnoczi
Subject: [Qemu-devel] [PATCH 1/4] virtiofsd: process requests in a thread pool
Date: Thu, 1 Aug 2019 17:54:06 +0100

Introduce a thread pool so that fv_queue_thread() just pops
VuVirtqElements and hands them to the thread pool.  For the time being
only one worker thread is allowed since passthrough_ll.c is not
thread-safe yet.  Future patches will lift this restriction so that
multiple FUSE requests can be processed in parallel.

The main new concept is struct FVRequest, which contains both
VuVirtqElement and struct fuse_chan.  We now have fv_VuDev for a device,
fv_QueueInfo for a virtqueue, and FVRequest for a request.  Some of
fv_QueueInfo's fields are moved into FVRequest because they are
per-request.  The name FVRequest conforms to QEMU coding style and I
expect the struct fv_* types will be renamed in a future refactoring.

This patch series is not optimal.  fbuf reuse is dropped, so each request
now does its own malloc(se->bufsize); there is no clean and cheap way to
keep reusing the buffer with a thread pool.  The vq_lock mutex is held for
longer than necessary, especially during the eventfd_write() syscall.
Performance can be improved in the future.

prctl(2) had to be added to the seccomp whitelist because glib invokes
it.

Signed-off-by: Stefan Hajnoczi <address@hidden>
---
 contrib/virtiofsd/fuse_virtio.c | 491 ++++++++++++++++++--------------
 contrib/virtiofsd/seccomp.c     |   1 +
 2 files changed, 273 insertions(+), 219 deletions(-)

diff --git a/contrib/virtiofsd/fuse_virtio.c b/contrib/virtiofsd/fuse_virtio.c
index d543c6d30f..0c52911144 100644
--- a/contrib/virtiofsd/fuse_virtio.c
+++ b/contrib/virtiofsd/fuse_virtio.c
@@ -29,26 +29,40 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <glib.h>
 
 #include "contrib/libvhost-user/libvhost-user.h"
 
 struct fv_VuDev;
 struct fv_QueueInfo {
         pthread_t thread;
+
+        /* This lock protects the VuVirtq preventing races between
+         * fv_queue_thread() and fv_queue_worker().
+         */
+        pthread_mutex_t vq_lock;
+
         struct fv_VuDev *virtio_dev;
 
         /* Our queue index, corresponds to array position */
         int       qidx;
         int       kick_fd;
         int       kill_fd; /* For killing the thread */
-
-        /* The element for the command currently being processed */
-        VuVirtqElement *qe;
-        /* If any of the qe vec elements (towards vmm) are unmappable */
-        unsigned int elem_bad_in;
-        bool      reply_sent;
 };
 
+/* A FUSE request */
+typedef struct {
+        VuVirtqElement elem;
+        struct fuse_chan ch;
+
+        /* Number of unmappable iovecs */
+        unsigned int bad_in_num;
+        unsigned int bad_out_num;
+
+        /* Used to complete requests that involve no reply */
+        bool reply_sent;
+} FVRequest;
+
 /* We pass the dev element into libvhost-user
  * and then use it to get back to the outer
  * container for other data.
@@ -186,8 +200,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                     struct iovec *iov, int count)
 {
-        VuVirtqElement *elem;
-        VuVirtq *q;
+        FVRequest *req = container_of(ch, FVRequest, ch);
+        struct fv_QueueInfo *qi = ch->qi;
+        VuDev *dev = &se->virtio_dev->dev;
+        VuVirtq *q = vu_get_queue(dev, qi->qidx);
+        VuVirtqElement *elem = &req->elem;
         int ret = 0;
 
         assert(count >= 1);
@@ -200,11 +217,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
 
         /* unique == 0 is notification, which we don't support */
         assert (out->unique);
-        /* For virtio we always have ch */
-        assert(ch);
-        assert(!ch->qi->reply_sent);
-        elem = ch->qi->qe;
-        q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+        assert(!req->reply_sent);
 
         /* The 'in' part of the elem is to qemu */
         unsigned int in_num = elem->in_num;
@@ -231,9 +244,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
         }
 
         copy_iov(iov, count, in_sg, in_num, tosend_len);
-        vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-        vu_queue_notify(&se->virtio_dev->dev, q);
-        ch->qi->reply_sent = true;
+
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+        pthread_mutex_lock(&qi->vq_lock);
+        vu_queue_push(dev, q, elem, tosend_len);
+        vu_queue_notify(dev, q);
+        pthread_mutex_unlock(&qi->vq_lock);
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+
+        req->reply_sent = true;
 
 err:
 
@@ -249,9 +268,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
                          struct iovec *iov, int count,
                          struct fuse_bufvec *buf, size_t len)
 {
+        FVRequest *req = container_of(ch, FVRequest, ch);
+        struct fv_QueueInfo *qi = ch->qi;
+        VuDev *dev = &se->virtio_dev->dev;
+        VuVirtq *q = vu_get_queue(dev, qi->qidx);
+        VuVirtqElement *elem = &req->elem;
         int ret = 0;
-        VuVirtqElement *elem;
-        VuVirtq *q;
 
         assert(count >= 1);
         assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
@@ -271,15 +293,11 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
         /* unique == 0 is notification which we don't support */
         assert (out->unique);
 
-        /* For virtio we always have ch */
-        assert(ch);
-        assert(!ch->qi->reply_sent);
-        elem = ch->qi->qe;
-        q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+        assert(!req->reply_sent);
 
         /* The 'in' part of the elem is to qemu */
         unsigned int in_num = elem->in_num;
-        unsigned int bad_in_num = ch->qi->elem_bad_in;
+        unsigned int bad_in_num = req->bad_in_num;
         struct iovec *in_sg = elem->in_sg;
         size_t in_len = iov_length(in_sg, in_num);
         size_t in_len_writeable = iov_length(in_sg, in_num - bad_in_num);
@@ -423,16 +441,219 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
 
         ret = 0;
 
-        vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-        vu_queue_notify(&se->virtio_dev->dev, q);
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+        pthread_mutex_lock(&qi->vq_lock);
+        vu_queue_push(dev, q, elem, tosend_len);
+        vu_queue_notify(dev, q);
+        pthread_mutex_unlock(&qi->vq_lock);
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
 
 err:
         if (ret == 0)
-                ch->qi->reply_sent = true;
+                req->reply_sent = true;
 
         return ret;
 }
 
+/* Process one FVRequest in a thread pool */
+static void fv_queue_worker(gpointer data, gpointer user_data)
+{
+        struct fv_QueueInfo *qi = user_data;
+        struct fuse_session *se = qi->virtio_dev->se;
+        struct VuDev *dev = &qi->virtio_dev->dev;
+        FVRequest *req = data;
+        VuVirtqElement *elem = &req->elem;
+        struct fuse_buf fbuf = {};
+        bool allocated_bufv = false;
+        struct fuse_bufvec bufv;
+        struct fuse_bufvec *pbufv;
+
+        assert(se->bufsize > sizeof(struct fuse_in_header));
+
+        /* An element contains one request and the space to send our response
+         * They're spread over multiple descriptors in a scatter/gather set
+         * and we can't trust the guest to keep them still; so copy in/out.
+         */
+        fbuf.mem = malloc(se->bufsize);
+        assert(fbuf.mem);
+
+        fuse_mutex_init(&req->ch.lock);
+        req->ch.fd = (int)0xdaff0d111;
+        req->ch.ctr = 1;
+        req->ch.qi = qi;
+
+        /* The 'out' part of the elem is from qemu */
+        unsigned int out_num = elem->out_num;
+        unsigned int out_num_readable = out_num - req->bad_out_num;
+        struct iovec *out_sg = elem->out_sg;
+        size_t out_len = iov_length(out_sg, out_num);
+        size_t out_len_readable = iov_length(out_sg, out_num_readable);
+        if (se->debug)
+                fuse_debug("%s: elem %d: with %d out desc of length %zd"
+                           " bad_in_num=%u bad_out_num=%u\n",
+                           __func__, elem->index, out_num,
+                           out_len, req->bad_in_num, req->bad_out_num);
+
+        /* The elem should contain a 'fuse_in_header' (in to fuse)
+         * plus the data based on the len in the header.
+         */
+        if (out_len_readable < sizeof(struct fuse_in_header)) {
+                fuse_err("%s: elem %d too short for in_header\n",
+                                __func__, elem->index);
+                assert(0); // TODO
+        }
+        if (out_len > se->bufsize) {
+                fuse_err("%s: elem %d too large for buffer\n",
+                                __func__, elem->index);
+                assert(0); // TODO
+        }
+        // Copy just the first element and look at it
+        copy_from_iov(&fbuf, 1, out_sg);
+
+        pbufv = NULL; /* Compiler thinks an unitialised path */
+        if (req->bad_in_num || req->bad_out_num) {
+                bool handled_unmappable = false;
+
+                if (out_num > 2 && out_num_readable >= 2 && !req->bad_in_num &&
+                                out_sg[0].iov_len == sizeof(struct 
fuse_in_header) &&
+                                ((struct fuse_in_header *)fbuf.mem)->opcode ==
+                                FUSE_WRITE &&
+                                out_sg[1].iov_len == sizeof(struct 
fuse_write_in)) {
+                        handled_unmappable = true;
+
+                        // copy the fuse_write_in header after the 
fuse_in_header
+                        fbuf.mem += out_sg->iov_len;
+                        copy_from_iov(&fbuf, 1, out_sg + 1);
+                        fbuf.mem -= out_sg->iov_len;
+                        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
+
+                        // Allocate the bufv, with space for the rest of the 
iov
+                        allocated_bufv = true;
+                        pbufv = malloc(sizeof(struct fuse_bufvec) +
+                                        sizeof(struct fuse_buf) * (out_num - 
2));
+
+                        pbufv->count = 1;
+                        pbufv->buf[0] = fbuf;
+
+                        size_t iovindex, pbufvindex;
+                        iovindex = 2; // 2 headers, separate iovs
+                        pbufvindex = 1; // 2 headers, 1 fusebuf
+
+                        for(; iovindex < out_num; iovindex++, pbufvindex++) {
+                                pbufv->count++;
+                                pbufv->buf[pbufvindex].pos = ~0; // Dummy
+                                pbufv->buf[pbufvindex].flags =
+                                        (iovindex < out_num_readable) ?
+                                        0 : FUSE_BUF_PHYS_ADDR;
+                                pbufv->buf[pbufvindex].mem = 
out_sg[iovindex].iov_base;
+                                pbufv->buf[pbufvindex].size = 
out_sg[iovindex].iov_len;
+                        }
+                }
+
+                if (out_num == 2 && out_num_readable == 2 && req->bad_in_num &&
+                                out_sg[0].iov_len == sizeof(struct 
fuse_in_header) &&
+                                ((struct fuse_in_header *)fbuf.mem)->opcode ==
+                                FUSE_READ &&
+                                out_sg[1].iov_len == sizeof(struct 
fuse_read_in)) {
+                        if (se->debug) {
+                                fuse_debug("Unmappable read case "
+                                           "in_num=%d bad_in_num=%d\n",
+                                           elem->in_num, req->bad_in_num);
+                        }
+                        handled_unmappable = true;
+                }
+
+                if (!handled_unmappable) {
+                        fuse_err("Unhandled unmappable element: out: %d(b:%d) 
in: %d(b:%d)",
+                                 out_num, req->bad_out_num,
+                                 elem->in_num, req->bad_in_num);
+                        fv_panic(dev, "Unhandled unmappable element");
+                }
+        }
+
+        if (!req->bad_out_num) {
+                if (out_num > 2 &&
+                                out_sg[0].iov_len == sizeof(struct 
fuse_in_header) &&
+                                ((struct fuse_in_header *)fbuf.mem)->opcode ==
+                                FUSE_WRITE &&
+                                out_sg[1].iov_len == sizeof(struct 
fuse_write_in)) {
+                        // For a write we don't actually need to copy the
+                        // data, we can just do it straight out of guest memory
+                        // but we must sitll copy the headers in case the guest
+                        // was nasty and changed them while we were using them.
+                        if (se->debug)
+                                fuse_debug("%s: Write special case\n", 
__func__);
+
+                        // copy the fuse_write_in header afte rthe 
fuse_in_header
+                        fbuf.mem += out_sg->iov_len;
+                        copy_from_iov(&fbuf, 1, out_sg + 1);
+                        fbuf.mem -= out_sg->iov_len;
+                        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
+
+                        // Allocate the bufv, with space for the rest of the 
iov
+                        allocated_bufv = true;
+                        pbufv = malloc(sizeof(struct fuse_bufvec) +
+                                        sizeof(struct fuse_buf) * (out_num - 
2));
+
+                        pbufv->count = 1;
+                        pbufv->buf[0] = fbuf;
+
+                        size_t iovindex, pbufvindex;
+                        iovindex = 2; // 2 headers, separate iovs
+                        pbufvindex = 1; // 2 headers, 1 fusebuf
+
+                        for(; iovindex < out_num; iovindex++, pbufvindex++) {
+                                pbufv->count++;
+                                pbufv->buf[pbufvindex].pos = ~0; // Dummy
+                                pbufv->buf[pbufvindex].flags = 0;
+                                pbufv->buf[pbufvindex].mem = 
out_sg[iovindex].iov_base;
+                                pbufv->buf[pbufvindex].size = 
out_sg[iovindex].iov_len;
+                        }
+                } else {
+                        // Normal (non fast write) path
+
+                        // Copy the rest of the buffer
+                        fbuf.mem += out_sg->iov_len;
+                        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
+                        fbuf.mem -= out_sg->iov_len;
+                        fbuf.size = out_len;
+
+                        // TODO! Endianness of header
+
+                        // TODO: Add checks for fuse_session_exited
+                        bufv.buf[0] = fbuf;
+                        bufv.count = 1;
+                        pbufv = &bufv;
+                }
+        }
+        pbufv->idx = 0;
+        pbufv->off = 0;
+        fuse_session_process_buf_int(se, pbufv, &req->ch);
+
+        if (allocated_bufv) free(pbufv);
+
+        /* If the request has no reply, still recycle the virtqueue element */
+        if (!req->reply_sent) {
+                struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
+
+                if (se->debug) {
+                        fuse_debug("%s: elem %d no reply sent\n",
+                                   __func__, elem->index);
+                }
+
+                pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+                pthread_mutex_lock(&qi->vq_lock);
+                vu_queue_push(dev, q, elem, 0);
+                vu_queue_notify(dev, q);
+                pthread_mutex_unlock(&qi->vq_lock);
+                pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+        }
+
+        pthread_mutex_destroy(&req->ch.lock);
+        free(fbuf.mem);
+        free(req);
+}
+
 /* Thread function for individual queues, created when a queue is 'started' */
 static void *fv_queue_thread(void *opaque)
 {
@@ -440,16 +661,14 @@ static void *fv_queue_thread(void *opaque)
         struct VuDev        *dev = &qi->virtio_dev->dev;
         struct VuVirtq      *q = vu_get_queue(dev, qi->qidx);
         struct fuse_session *se = qi->virtio_dev->se;
-        struct fuse_chan    ch;
-        struct fuse_buf     fbuf;
+        GThreadPool *pool;
 
-        fbuf.mem = NULL;
-        fbuf.flags = 0;
-
-        fuse_mutex_init(&ch.lock);
-        ch.fd = (int)0xdaff0d111;
-        ch.ctr = 1;
-        ch.qi = qi;
+        pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
+                        TRUE, NULL);
+        if (!pool) {
+                fuse_err("%s: g_thread_pool_new failed\n", __func__);
+                return NULL;
+        }
 
         fuse_info("%s: Start for queue %d kick_fd %d\n",
                   __func__, qi->qidx, qi->kick_fd);
@@ -507,6 +726,8 @@ static void *fv_queue_thread(void *opaque)
                ret = 
pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
                assert(ret == 0); /* there is no possible error case */
 
+               pthread_mutex_lock(&qi->vq_lock);
+
                if (se->debug) {
                        /* out is from guest, in is too guest */
                        unsigned int in_bytes, out_bytes;
@@ -518,198 +739,26 @@ static void *fv_queue_thread(void *opaque)
                }
 
                while (1) {
-                       bool allocated_bufv = false;
-                       struct fuse_bufvec bufv;
-                       struct fuse_bufvec *pbufv;
                        unsigned int bad_in_num = 0, bad_out_num = 0;
-
-                       /* An element contains one request and the space to 
send our response
-                        * They're spread over multiple descriptors in a 
scatter/gather set
-                        * and we can't trust the guest to keep them still; so 
copy in/out.
-                        */
-                       VuVirtqElement *elem = vu_queue_pop(dev, q, 
sizeof(VuVirtqElement),
-                                                           &bad_in_num, 
&bad_out_num);
-                       if (!elem) {
+                       FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest),
+                                                     &bad_in_num,
+                                                     &bad_out_num);
+                       if (!req) {
                                break;
                        }
 
-                       qi->qe = elem;
-                       qi->reply_sent = false;
-                       qi->elem_bad_in = bad_in_num;
+                       req->reply_sent = false;
+                       req->bad_in_num = bad_in_num;
+                       req->bad_out_num = bad_out_num;
 
-                       if (!fbuf.mem) {
-                               fbuf.mem = malloc(se->bufsize);
-                               assert(fbuf.mem);
-                               assert(se->bufsize > sizeof(struct 
fuse_in_header));
-                       }
-                       /* The 'out' part of the elem is from qemu */
-                       unsigned int out_num = elem->out_num;
-                       unsigned int out_num_readable = out_num - bad_out_num;
-                       struct iovec *out_sg = elem->out_sg;
-                       size_t out_len = iov_length(out_sg, out_num);
-                       size_t out_len_readable = iov_length(out_sg, 
out_num_readable);
-                       if (se->debug)
-                               fuse_debug("%s: elem %d: with %d out desc of 
length %zd"
-                                          " bad_in_num=%u bad_out_num=%u\n",
-                                         __func__, elem->index, out_num,
-                                         out_len, bad_in_num, bad_out_num);
-
-                       /* The elem should contain a 'fuse_in_header' (in to 
fuse)
-                        * plus the data based on the len in the header.
-                        */
-                       if (out_len_readable < sizeof(struct fuse_in_header)) {
-                               fuse_err("%s: elem %d too short for 
in_header\n",
-                                        __func__, elem->index);
-                               assert(0); // TODO
-                       }
-                       if (out_len > se->bufsize) {
-                               fuse_err("%s: elem %d too large for buffer\n",
-                                        __func__, elem->index);
-                               assert(0); // TODO
-                       }
-                       // Copy just the first element and look at it
-                       copy_from_iov(&fbuf, 1, out_sg);
-
-                       pbufv = NULL; /* Compiler thinks an unitialised path */
-                       if (bad_in_num || bad_out_num) {
-                           bool handled_unmappable = false;
-
-                           if (out_num > 2 && out_num_readable >= 2 && 
!bad_in_num &&
-                               out_sg[0].iov_len == sizeof(struct 
fuse_in_header) &&
-                               ((struct fuse_in_header *)fbuf.mem)->opcode ==
-                                   FUSE_WRITE &&
-                               out_sg[1].iov_len == sizeof(struct 
fuse_write_in)) {
-                               handled_unmappable = true;
-
-                               // copy the fuse_write_in header after the 
fuse_in_header
-                               fbuf.mem += out_sg->iov_len;
-                               copy_from_iov(&fbuf, 1, out_sg + 1);
-                               fbuf.mem -= out_sg->iov_len;
-                               fbuf.size = out_sg[0].iov_len + 
out_sg[1].iov_len;
-
-                               // Allocate the bufv, with space for the rest 
of the iov
-                               allocated_bufv = true;
-                               pbufv = malloc(sizeof(struct fuse_bufvec) +
-                                              sizeof(struct fuse_buf) * 
(out_num - 2));
-
-                               pbufv->count = 1;
-                               pbufv->buf[0] = fbuf;
-
-                               size_t iovindex, pbufvindex;
-                               iovindex = 2; // 2 headers, separate iovs
-                               pbufvindex = 1; // 2 headers, 1 fusebuf
-
-                               for(; iovindex < out_num; iovindex++, 
pbufvindex++) {
-                                       pbufv->count++;
-                                       pbufv->buf[pbufvindex].pos = ~0; // 
Dummy
-                                       pbufv->buf[pbufvindex].flags =
-                                               (iovindex < out_num_readable) ?
-                                               0 : FUSE_BUF_PHYS_ADDR;
-                                       pbufv->buf[pbufvindex].mem = 
out_sg[iovindex].iov_base;
-                                       pbufv->buf[pbufvindex].size = 
out_sg[iovindex].iov_len;
-                               }
-                           }
-
-                           if (out_num == 2 && out_num_readable == 2 && 
bad_in_num &&
-                               out_sg[0].iov_len == sizeof(struct 
fuse_in_header) &&
-                               ((struct fuse_in_header *)fbuf.mem)->opcode ==
-                                   FUSE_READ &&
-                               out_sg[1].iov_len == sizeof(struct 
fuse_read_in)) {
-                               if (se->debug) {
-                                   fuse_debug("Unmappable read case "
-                                              "in_num=%d bad_in_num=%d\n",
-                                              elem->in_num, bad_in_num);
-                               }
-                               handled_unmappable = true;
-                           }
-
-                           if (!handled_unmappable) {
-                               fuse_err("Unhandled unmappable element: out: 
%d(b:%d) in: %d(b:%d)",
-                                        out_num, bad_out_num,
-                                        elem->in_num, bad_in_num);
-                               fv_panic(dev, "Unhandled unmappable element");
-                           }
-                       }
-
-                       if (!bad_out_num) {
-                           if (out_num > 2 &&
-                               out_sg[0].iov_len == sizeof(struct 
fuse_in_header) &&
-                               ((struct fuse_in_header *)fbuf.mem)->opcode ==
-                                   FUSE_WRITE &&
-                               out_sg[1].iov_len == sizeof(struct 
fuse_write_in)) {
-                                   // For a write we don't actually need to 
copy the
-                                   // data, we can just do it straight out of 
guest memory
-                                   // but we must sitll copy the headers in 
case the guest
-                                   // was nasty and changed them while we were 
using them.
-                                   if (se->debug)
-                                           fuse_debug("%s: Write special 
case\n", __func__);
-
-                                   // copy the fuse_write_in header afte rthe 
fuse_in_header
-                                   fbuf.mem += out_sg->iov_len;
-                                   copy_from_iov(&fbuf, 1, out_sg + 1);
-                                   fbuf.mem -= out_sg->iov_len;
-                                   fbuf.size = out_sg[0].iov_len + 
out_sg[1].iov_len;
-
-                                   // Allocate the bufv, with space for the 
rest of the iov
-                                   allocated_bufv = true;
-                                   pbufv = malloc(sizeof(struct fuse_bufvec) +
-                                                  sizeof(struct fuse_buf) * 
(out_num - 2));
-
-                                   pbufv->count = 1;
-                                   pbufv->buf[0] = fbuf;
-
-                                   size_t iovindex, pbufvindex;
-                                   iovindex = 2; // 2 headers, separate iovs
-                                   pbufvindex = 1; // 2 headers, 1 fusebuf
-
-                                   for(; iovindex < out_num; iovindex++, 
pbufvindex++) {
-                                           pbufv->count++;
-                                           pbufv->buf[pbufvindex].pos = ~0; // 
Dummy
-                                           pbufv->buf[pbufvindex].flags = 0;
-                                           pbufv->buf[pbufvindex].mem = 
out_sg[iovindex].iov_base;
-                                           pbufv->buf[pbufvindex].size = 
out_sg[iovindex].iov_len;
-                                   }
-                           } else {
-                                   // Normal (non fast write) path
-
-                                   // Copy the rest of the buffer
-                                   fbuf.mem += out_sg->iov_len;
-                                   copy_from_iov(&fbuf, out_num - 1, out_sg + 
1);
-                                   fbuf.mem -= out_sg->iov_len;
-                                   fbuf.size = out_len;
-
-                                   // TODO! Endianness of header
-
-                                   // TODO: Add checks for fuse_session_exited
-                                   bufv.buf[0] = fbuf;
-                                   bufv.count = 1;
-                                   pbufv = &bufv;
-                           }
-                       }
-                       pbufv->idx = 0;
-                       pbufv->off = 0;
-                       fuse_session_process_buf_int(se, pbufv, &ch);
-
-                       if (allocated_bufv) free(pbufv);
-
-                       if (!qi->reply_sent) {
-                              if (se->debug) {
-                                      fuse_debug("%s: elem %d no reply sent\n",
-                                                 __func__, elem->index);
-                              }
-                               /* I think we've still got to recycle the 
element */
-                               vu_queue_push(dev, q, elem, 0);
-                               vu_queue_notify(dev, q);
-                       }
-                       qi->qe = NULL;
-                       free(elem);
-                       elem = NULL;
+                       g_thread_pool_push(pool, req, NULL);
                 }
 
+                pthread_mutex_unlock(&qi->vq_lock);
                 pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
         }
-        pthread_mutex_destroy(&ch.lock);
-        free(fbuf.mem);
+
+        g_thread_pool_free(pool, FALSE, TRUE);
 
         return NULL;
 }
@@ -760,6 +809,9 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
 
                 ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
                 assert(ourqi->kill_fd != -1);
+
+                pthread_mutex_init(&ourqi->vq_lock, NULL);
+
                 if (pthread_create(&ourqi->thread, NULL,  fv_queue_thread,
                                    ourqi)) {
                         fuse_err("%s: Failed to create thread for queue %d\n",
@@ -780,6 +832,7 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
                        fuse_err("%s: Failed to join thread idx %d err %d\n",
                                 __func__, qidx, ret);
                 }
+                pthread_mutex_destroy(&ourqi->vq_lock);
                 close(ourqi->kill_fd);
                 ourqi->kick_fd = -1;
                 free(vud->qi[qidx]);
diff --git a/contrib/virtiofsd/seccomp.c b/contrib/virtiofsd/seccomp.c
index cea4cc5f60..5f1c873b82 100644
--- a/contrib/virtiofsd/seccomp.c
+++ b/contrib/virtiofsd/seccomp.c
@@ -58,6 +58,7 @@ static const int syscall_whitelist[] = {
        SCMP_SYS(open),
        SCMP_SYS(openat),
        SCMP_SYS(ppoll),
+       SCMP_SYS(prctl), /* TODO restrict to just PR_SET_NAME? */
        SCMP_SYS(preadv),
        SCMP_SYS(pwrite64),
        SCMP_SYS(read),
-- 
2.21.0




reply via email to

[Prev in Thread] Current Thread [Next in Thread]