qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 03/25] nbd: switch to asynchronous operation


From: Paolo Bonzini
Subject: [Qemu-devel] [PATCH 03/25] nbd: switch to asynchronous operation
Date: Tue, 6 Dec 2011 16:27:30 +0100

Signed-off-by: Paolo Bonzini <address@hidden>
---
 block/nbd.c |  188 ++++++++++++++++++++++++++++++++++++++--------------------
 nbd.c       |    8 +++
 2 files changed, 131 insertions(+), 65 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index 882b2dc..9af939d 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -47,13 +47,17 @@
 #endif
 
 typedef struct BDRVNBDState {
-    CoMutex lock;
     int sock;
     uint32_t nbdflags;
     off_t size;
     size_t blocksize;
     char *export_name; /* An NBD server may export several devices */
 
+    CoMutex mutex;
+    Coroutine *coroutine;
+
+    struct nbd_reply reply;
+
     /* If it begins with  '/', this is a UNIX domain socket. Otherwise,
      * it's a string of the form <hostname|ip4|\[ip6\]>:port
      */
@@ -106,6 +110,95 @@ out:
     return err;
 }
 
+static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request)
+{
+    qemu_co_mutex_lock(&s->mutex);
+    s->coroutine = qemu_coroutine_self();
+    request->handle = (uint64_t)(intptr_t)s;
+}
+
+static int nbd_have_request(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+
+    return !!s->coroutine;
+}
+
+static void nbd_reply_ready(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+
+    if (s->reply.handle == 0) {
+        /* No reply already in flight.  Fetch a header.  */
+        if (nbd_receive_reply(s->sock, &s->reply) < 0) {
+            s->reply.handle = 0;
+        }
+    }
+
+    /* There's no need for a mutex on the receive side, because the
+     * handler acts as a synchronization point and ensures that only
+     * one coroutine is called until the reply finishes.  */
+    if (s->coroutine) {
+        qemu_coroutine_enter(s->coroutine, NULL);
+    }
+}
+
+static void nbd_restart_write(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    qemu_coroutine_enter(s->coroutine, NULL);
+}
+
+static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
+                               struct iovec *iov, int offset)
+{
+    int rc, ret;
+
+    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write,
+                            nbd_have_request, NULL, s);
+    rc = nbd_send_request(s->sock, request);
+    if (rc != -1 && iov) {
+        ret = qemu_co_sendv(s->sock, iov, request->len, offset);
+        if (ret != request->len) {
+            errno = -EIO;
+            rc = -1;
+        }
+    }
+    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
+                            nbd_have_request, NULL, s);
+    return rc;
+}
+
+static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request,
+                                 struct nbd_reply *reply,
+                                 struct iovec *iov, int offset)
+{
+    int ret;
+
+    /* Wait until we're woken up by the read handler.  */
+    qemu_coroutine_yield();
+    *reply = s->reply;
+    if (reply->handle != request->handle) {
+        reply->error = EIO;
+    } else {
+        if (iov && reply->error == 0) {
+            ret = qemu_co_recvv(s->sock, iov, request->len, offset);
+            if (ret != request->len) {
+                reply->error = EIO;
+            }
+        }
+
+        /* Tell the read handler to read another header.  */
+        s->reply.handle = 0;
+    }
+}
+
+static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request)
+{
+    s->coroutine = NULL;
+    qemu_co_mutex_unlock(&s->mutex);
+}
+
 static int nbd_establish_connection(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
@@ -135,8 +228,11 @@ static int nbd_establish_connection(BlockDriverState *bs)
         return -errno;
     }
 
-    /* Now that we're connected, set the socket to be non-blocking */
+    /* Now that we're connected, set the socket to be non-blocking and
+     * kick the reply mechanism.  */
     socket_set_nonblock(sock);
+    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
+                            nbd_have_request, NULL, s);
 
     s->sock = sock;
     s->size = size;
@@ -152,11 +248,11 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     struct nbd_request request;
 
     request.type = NBD_CMD_DISC;
-    request.handle = (uint64_t)(intptr_t)bs;
     request.from = 0;
     request.len = 0;
     nbd_send_request(s->sock, &request);
 
+    qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
     closesocket(s->sock);
 }
 
@@ -165,6 +261,8 @@ static int nbd_open(BlockDriverState *bs, const char* 
filename, int flags)
     BDRVNBDState *s = bs->opaque;
     int result;
 
+    qemu_co_mutex_init(&s->mutex);
+
     /* Pop the config into our state object. Exit if invalid. */
     result = nbd_config(s, filename, flags);
     if (result != 0) {
@@ -176,90 +274,50 @@ static int nbd_open(BlockDriverState *bs, const char* 
filename, int flags)
      */
     result = nbd_establish_connection(bs);
 
-    qemu_co_mutex_init(&s->lock);
     return result;
 }
 
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
-                    uint8_t *buf, int nb_sectors)
+static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
+                        int nb_sectors, QEMUIOVector *qiov)
 {
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
     struct nbd_reply reply;
 
     request.type = NBD_CMD_READ;
-    request.handle = (uint64_t)(intptr_t)bs;
     request.from = sector_num * 512;;
     request.len = nb_sectors * 512;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-
-    if (reply.error !=0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
-        return -EIO;
-
-    if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
-        return -EIO;
+    nbd_coroutine_start(s, &request);
+    if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
+        reply.error = errno;
+    } else {
+        nbd_co_receive_reply(s, &request, &reply, qiov->iov, 0);
+    }
+    nbd_coroutine_end(s, &request);
+    return -reply.error;
 
-    return 0;
 }
 
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
-                     const uint8_t *buf, int nb_sectors)
+static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
+                         int nb_sectors, QEMUIOVector *qiov)
 {
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
     struct nbd_reply reply;
 
     request.type = NBD_CMD_WRITE;
-    request.handle = (uint64_t)(intptr_t)bs;
     request.from = sector_num * 512;;
     request.len = nb_sectors * 512;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
-
-    if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
-        return -EIO;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-
-    if (reply.error !=0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
-        return -EIO;
-
-    return 0;
-}
-
-static coroutine_fn int nbd_co_read(BlockDriverState *bs, int64_t sector_num,
-                                    uint8_t *buf, int nb_sectors)
-{
-    int ret;
-    BDRVNBDState *s = bs->opaque;
-    qemu_co_mutex_lock(&s->lock);
-    ret = nbd_read(bs, sector_num, buf, nb_sectors);
-    qemu_co_mutex_unlock(&s->lock);
-    return ret;
-}
-
-static coroutine_fn int nbd_co_write(BlockDriverState *bs, int64_t sector_num,
-                                     const uint8_t *buf, int nb_sectors)
-{
-    int ret;
-    BDRVNBDState *s = bs->opaque;
-    qemu_co_mutex_lock(&s->lock);
-    ret = nbd_write(bs, sector_num, buf, nb_sectors);
-    qemu_co_mutex_unlock(&s->lock);
-    return ret;
+    nbd_coroutine_start(s, &request);
+    if (nbd_co_send_request(s, &request, qiov->iov, 0) == -1) {
+        reply.error = errno;
+    } else {
+        nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+    }
+    nbd_coroutine_end(s, &request);
+    return -reply.error;
 }
 
 static void nbd_close(BlockDriverState *bs)
@@ -282,8 +340,8 @@ static BlockDriver bdrv_nbd = {
     .format_name       = "nbd",
     .instance_size     = sizeof(BDRVNBDState),
     .bdrv_file_open    = nbd_open,
-    .bdrv_read          = nbd_co_read,
-    .bdrv_write         = nbd_co_write,
+    .bdrv_co_readv     = nbd_co_readv,
+    .bdrv_co_writev    = nbd_co_writev,
     .bdrv_close                = nbd_close,
     .bdrv_getlength    = nbd_getlength,
     .protocol_name     = "nbd",
diff --git a/nbd.c b/nbd.c
index e6c931c..3f82db3 100644
--- a/nbd.c
+++ b/nbd.c
@@ -81,6 +81,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool 
do_read)
 {
     size_t offset = 0;
 
+    if (qemu_in_coroutine()) {
+        if (do_read) {
+            return qemu_co_recv(fd, buffer, size);
+        } else {
+            return qemu_co_send(fd, buffer, size);
+        }
+    }
+
     while (offset < size) {
         ssize_t len;
 
-- 
1.7.7.1





reply via email to

[Prev in Thread] Current Thread [Next in Thread]