From: Nicholas Thomas
Subject: [Qemu-devel] [PATCH v4] NBD: asynchronous I/O with timeout & reconnection behaviour
Date: Mon, 15 Aug 2011 12:20:20 +0100

This patch converts the NBD block driver to the asynchronous I/O
API and adds (currently non-configurable) timeout and reconnection
behaviour.

All reads and writes are done asynchronously. We expect a request +
response to take no more than five seconds. If requests time out
or the connection to the NBD server is broken for any reason, we
attempt to reconnect once per second until successful - preserving
the list of outstanding I/O requests.
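
As a rough illustration - not part of the patch, and using made-up
helper names - the timeout bookkeeping described above amounts to
something like this:

    /* Sketch of the deadline accounting; NBD_SOCKET_TIMEOUT matches the
     * constant defined in block/nbd.c below.
     */
    #include <stdbool.h>
    #include <time.h>

    #define NBD_SOCKET_TIMEOUT 5   /* seconds allowed for requests on the wire */

    struct conn_state {
        time_t deadline;           /* 0 means no requests are in flight */
    };

    /* Called once a request has been fully written to the socket. */
    static void on_request_sent(struct conn_state *c)
    {
        c->deadline = time(NULL) + NBD_SOCKET_TIMEOUT;
    }

    /* Called whenever bytes arrive. Activity extends the deadline so a busy
     * connection with many in-flight requests isn't timed out spuriously.
     */
    static void on_bytes_received(struct conn_state *c)
    {
        c->deadline++;
    }

    /* Checked from the read handler while replies are still outstanding. */
    static bool deadline_exceeded(const struct conn_state *c)
    {
        return c->deadline != 0 && time(NULL) > c->deadline;
    }

When the deadline is exceeded we tear the connection down with ETIMEDOUT
and the once-per-second reconnection timer takes over.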

Typically, the guest sees this as I/O hanging until the reconnection
succeeds, which is the same behaviour as before this patch. The
request-cancelling code does work in this revision, however.

Read and write requests are now split up to fit into 1MiB blocks.
This is a limit that seems to be imposed by most (all?) NBD servers,
including the one shipped with QEMU.
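
For illustration only (again not part of the patch; the names here are
invented), the splitting loop in nbd_readv_writev_bh_cb below works
roughly like this:

    /* Sketch: cut one transfer into chunks of at most 1MiB minus the header.
     * The patch subtracts sizeof(NBDReply) for reads and sizeof(NBDRequest)
     * for writes; for simplicity this sketch uses the 28-byte request header
     * (4 + 4 + 8 + 8 + 4) for both.
     */
    #include <stddef.h>
    #include <stdio.h>

    #define NBD_HDR_SIZE  28
    #define NBD_MAX_CHUNK ((1024 * 1024) - NBD_HDR_SIZE)

    static void split_request(size_t total)
    {
        size_t done = 0;

        while (done < total) {
            size_t len = total - done;

            if (len > NBD_MAX_CHUNK) {
                len = NBD_MAX_CHUNK;
            }
            /* One AIOReq would be allocated here, covering [done, done + len) */
            printf("chunk at offset %zu, length %zu\n", done, len);
            done += len;
        }
    }

Each chunk becomes its own AIOReq with its own handle, so every reply can
be matched back to the right chunk even if the server answers out of order.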

Signed-off-by: Nick Thomas <address@hidden>
---
 Makefile.objs |    4 +-
 block/nbd.c   | 1064 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 nbd.c         |  209 ++++--------
 nbd.h         |   20 +-
 4 files changed, 1066 insertions(+), 231 deletions(-)

diff --git a/Makefile.objs b/Makefile.objs
index 23b17ce..d475e3b 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -14,7 +14,7 @@ oslib-obj-$(CONFIG_POSIX) += oslib-posix.o
 # block-obj-y is code used by both qemu system emulation and qemu-img
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
-block-obj-y += nbd.o block.o aio.o aes.o qemu-config.o
+block-obj-y += nbd.o block.o aio.o aes.o qemu-config.o qemu-sockets.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
@@ -86,7 +86,7 @@ common-obj-$(CONFIG_SSI_SD) += ssi-sd.o
 common-obj-$(CONFIG_SD) += sd.o
 common-obj-y += bt.o bt-host.o bt-vhci.o bt-l2cap.o bt-sdp.o bt-hci.o bt-hid.o usb-bt.o
 common-obj-y += bt-hci-csr.o
-common-obj-y += buffered_file.o migration.o migration-tcp.o qemu-sockets.o
+common-obj-y += buffered_file.o migration.o migration-tcp.o
 common-obj-y += qemu-char.o savevm.o #aio.o
 common-obj-y += msmouse.o ps2.o
 common-obj-y += qdev.o qdev-properties.o
diff --git a/block/nbd.c b/block/nbd.c
index c8dc763..7ec57d9 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -6,6 +6,7 @@
  *
  * Some parts:
  *    Copyright (C) 2007 Anthony Liguori <address@hidden>
+ *    Copyright (C) 2011 Nicholas Thomas <address@hidden>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -29,170 +30,1054 @@
 #include "qemu-common.h"
 #include "nbd.h"
 #include "module.h"
+#include "qemu_socket.h"
 
 #include <sys/types.h>
 #include <unistd.h>
+#include <signal.h>
+#include <time.h>
 
 #define EN_OPTSTR ":exportname="
+#define SECTOR_SIZE 512
+#define SIG_NBD_RECON (SIGRTMIN+5)
 
-typedef struct BDRVNBDState {
+/* Seconds we allow for requests on the wire */
+#define NBD_SOCKET_TIMEOUT 5
+
+/* 1MiB minus header size */
+#define NBD_MAX_READ      ((1024*1024) - sizeof(NBDReply))
+#define NBD_MAX_WRITE     ((1024*1024) - sizeof(NBDRequest))
+
+/* #define DEBUG_NBD */
+
+#if defined(DEBUG_NBD)
+#define logout(fmt, ...) \
+                fprintf(stderr, "nbd\t%-24s" fmt, __func__, ##__VA_ARGS__)
+#else
+#define logout(fmt, ...) ((void)0)
+#endif
+
+/*
+ * Here's how the I/O works.
+ * qemu creates a BDRVNBDState for us, which is the context for all reads
+ * and writes.
+ *
+ * nbd_open is called to connect to the NBD server and set up an on-read
+ * handler (nbd_aio_read_response).
+ *
+ * nbd_aio_readv/writev, called by qemu, create an NBDAIOCB (representing the
+ * I/O request to qemu).
+ * For read requests, each AIOReq contains just the NBD header. For write
+ * requests, 1 or more AIOReqs are created, containing the NBD header and the
+ * write data. These are pushed to reqs_to_send_head in the BDRVNBDState and
+ * the list in the NBDAIOCB. We then register a write request callback, which
+ * results in nbd_aio_write_request being called from the select() in
+ * vl.c:main_loop_wait.
+ *
+ * Each time nbd_aio_write_request is called, it gets the first AIOReq in the
+ * reqs_to_send_head and writes the data to the socket.
+ * If this results in the whole AIOReq being written to the socket, it moves
+ * the AIOReq to the reqs_for_reply_head in the BDRVNBDState. If the AIOReq
+ * isn't finished, then it's left where it is, to have more of it written
+ * next time. Before exiting, we unregister the write request handler if the
+ * reqs_to_send_head queue is empty. This avoids a tight loop around the
+ * aforementioned select (since the socket is almost always ready for writing).
+ *
+ * Each time nbd_aio_read_response is called, we check the BDRVNBDState's
+ * current_req attribute to see if we're in the middle of a read. If not, we
+ * read a header's worth of data, then try to find an AIOReq in the
+ * reqs_for_reply_head.
+ *
+ * Once we have our AIOReq, we remove it from reqs_for_reply_head and put it
+ * in the current_req attribute, then read from the socket to the buffer (if
+ * needed). If that completes the AIOReq, we clear the current_req attribute
+ * and deallocate the AIOReq.
+ *   - If the AIOReq is complete, and that's the last one for the NBDAIOCB, we
+ *     call the 'done' callback and return.
+ *   - If the AIOReq isn't complete, we just return. It'll be completed in
+ *     future callbacks, since it's now the current_req.
+ *   - If there's an unrecoverable error reading from the socket (EBADF, say),
+ *     we invalidate the AIOReq and tear down the connection.
+ *
+ * There are a number of scenarios in which I/O errors can occur. The socket
+ * can be broken at any time, or the NBD server might return invalid data,
+ * causing us to break the socket at our end. Alternatively, we might time out
+ * waiting for a response to an NBDRequest to come back. When this happens,
+ * there may be outstanding requests in any of a number of states. Here's what
+ * we do to them:
+ *
+ *   + Request has been put into reqs_to_send but no data transferred yet
+ *     - We don't need to do anything for these requests
+ *   + Request has been partially sent to NBD server
+ *     - Reset bytes_sent to 0
+ *   + Request is in reqs_for_reply
+ *     - Assume the request hasn't been done. Move it back to
+ *       reqs_to_send_head, preserving order. Reset bytes_sent to 0; set
+ *       waiting_sent to true.
+ *   + Request is in current_req
+ *     - Move back to the head of reqs_to_send (above those in reqs_for_reply).
+ *       Reset bytes_sent and bytes_got to 0; set waiting_sent to true.
+ *       Reset s->nbd_rsp_offset to 0
+ *
+ * The only one of those scenarios that can lead to data being written to the
+ * NBD server while the VM thinks it is not written is the last one. If we can
+ * reconnect, this is not an issue at all because we'll re-send to the NBD
+ * server then (in the correct order!). If we can't reconnect, the guest will
+ * eventually get a timeout error. Worst-case, it was a write request and the
+ * write has actually succeeded, while the guest got that error, but that's no
+ * worse than if you get a timeout from a hard disc while writing.
+ *
+ * Once we have the outstanding requests sorted out, we attempt to reconnect
+ * to the NBD server. We do this by enabling a timer for a signal handler that
+ * writes a byte to the recon_sock pipe. We have a read callback registered on
+ * that socket with qemu core (vl.c), so it will call the nbd_reconnect
+ * function when it's next convenient for it to do so, and we do the work then.
+ *
+ * We keep a timeout value with the socket. Every time we write to the socket,
+ * we update the timeout value with the time we expect *all* responses to have
+ * finished being read. In aio_read_response, we check this timeout if there
+ * are outstanding requests. If all requests have been completed, we clear it.
+ * If we ever exceed the timeout, then we tear down the connection (ETIMEDOUT)
+ * and try again. Whenever we receive data from the socket, we increment the
+ * timeout by one second, to prevent us from timing out just because there are
+ * lots of requests in-flight.
+ *
+ */
+
+typedef struct NBDAIOCB NBDAIOCB;
+typedef struct BDRVNBDState BDRVNBDState;
+
+static int nbd_establish_connection(BDRVNBDState *s);
+static void nbd_teardown_connection(BDRVNBDState *s, bool send_disconnect);
+static void nbd_register_write_request_handler(BDRVNBDState *s);
+static void nbd_unregister_write_request_handler(BDRVNBDState *s);
+static void nbd_arm_recon_timer(BDRVNBDState *s);
+static void nbd_disarm_recon_timer(BDRVNBDState *s);
+static void nbd_reconnect(void *opaque);
+
+typedef struct AIOReq {
+    NBDAIOCB *aiocb; /* Which QEMU operation this belongs to */
+
+    /* Where on the NBDAIOCB's iov does this request start? */
+    off_t iov_offset;
+
+    /* The NBD request header pertaining to this AIOReq.
+     * This specifies the handle of the request, the read offset and length.
+     */
+    NBDRequest nbd_req_hdr;
+
+    /* How many bytes have been written to the NBD server so far. This will
+     * vary between 0 and sizeof(nbd_req_hdr) + nbd_req_hdr.len
+     */
+    size_t bytes_sent;
+
+    /* How many bytes *of the payload* have been read from the NBD server so
+     * far. Varies between 0 and nbd_req_hdr.len; the header byte count is
+     * kept in BDRVNBDState->nbd_rsp_offset.
+     */
+    size_t bytes_got;
+
+    /* Used to record this in the state object. waiting_sent is used to work
+     * out which queue the AIOReq is in. Before it's been sent, it's in the
+     * reqs_to_send_head. After being sent, if it's not current_req, it's in
+     * reqs_for_reply_head.
+     */
+    QTAILQ_ENTRY(AIOReq) socket_siblings;
+    bool waiting_sent;
+
+    /* Used to enter this into an NBDAIOCB */
+    QLIST_ENTRY(AIOReq) aioreq_siblings;
+} AIOReq;
+
+struct BDRVNBDState {
+    /* File descriptor for the socket to the NBD server */
     int sock;
+    bool connected;
+
+    /* File descriptor used to gain regular reconnect callbacks + timer obj
+     * We keep track of the write side so we can close it.
+     */
+    int recon_sock_r;
+    int recon_sock_w;
+    timer_t recon_timer;
+
+    /* When the socket will be timed out unless all requests have been
+     * fulfilled. This isn't checked by a timer, but in the read/write
+     * functions.
+     */
+    time_t timeout;
+
+    /* Size of the file being served */
     off_t size;
+
+    /* block size */
     size_t blocksize;
-} BDRVNBDState;
 
-static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
-{
-    BDRVNBDState *s = bs->opaque;
-    uint32_t nbdflags;
+    /* If it begins with '/', this is a UNIX domain socket. Otherwise,
+     * it's a string of the form <hostname|ip4|\[ip6\]>:port
+     */
+    char *host_spec;
+
+    /* An NBD server may export several devices - this is the one we want */
+    char *export_name;
+
+    /* Used to generate unique NBD handles */
+    uint64_t aioreq_seq_num;
+
+    /* AIOReqs yet to be transmitted */
+    QTAILQ_HEAD(reqs_to_send_head, AIOReq) reqs_to_send_head;
 
+    /* AIOReqs that have been transmitted and are awaiting a reply */
+    QTAILQ_HEAD(reqs_for_reply_head, AIOReq) reqs_for_reply_head;
+
+    /* AIOReq that is currently being read from the socket */
+    AIOReq *current_req;
+
+    /* Used in nbd_aio_read_response. We may need to store received header
+     * bytes between reads - we don't have an AIOReq at that point.
+     */
+    uint8_t nbd_rsp_buf[sizeof(NBDReply)];
+    size_t nbd_rsp_offset;
+
+};
+
+enum AIOCBState {
+    AIOCB_WRITE_UDATA,
+    AIOCB_READ_UDATA,
+};
+
+struct NBDAIOCB {
+    BlockDriverAIOCB common;
+    QEMUIOVector *qiov;
+    QEMUBH *bh;
+
+    enum AIOCBState aiocb_type;
+
+    int64_t sector_num;
+    int nb_sectors;
+    int ret;
+
+    bool canceled;
+
+    void (*aio_done_func)(NBDAIOCB *);
+
+    QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
+};
+
+static int nbd_config(BDRVNBDState *s, const char *filename, int flags)
+{
     char *file;
-    char *name;
-    const char *host;
+    char *export_name;
+    const char *host_spec;
     const char *unixpath;
-    int sock;
-    off_t size;
-    size_t blocksize;
-    int ret;
     int err = -EINVAL;
 
     file = qemu_strdup(filename);
 
-    name = strstr(file, EN_OPTSTR);
-    if (name) {
-        if (name[strlen(EN_OPTSTR)] == 0) {
+    export_name = strstr(file, EN_OPTSTR);
+    if (export_name) {
+        if (export_name[strlen(EN_OPTSTR)] == 0) {
             goto out;
         }
-        name[0] = 0;
-        name += strlen(EN_OPTSTR);
+        export_name[0] = 0; /* truncate 'file' */
+        export_name += strlen(EN_OPTSTR);
+        s->export_name = qemu_strdup(export_name);
     }
 
-    if (!strstart(file, "nbd:", &host)) {
+    /* extract the host_spec - fail if it's not nbd:... */
+    if (!strstart(file, "nbd:", &host_spec)) {
         goto out;
     }
 
-    if (strstart(host, "unix:", &unixpath)) {
-
-        if (unixpath[0] != '/') {
+    /* are we a UNIX or TCP socket? */
+    if (strstart(host_spec, "unix:", &unixpath)) {
+        if (unixpath[0] != '/') { /* We demand an absolute path */
             goto out;
         }
+        s->host_spec = qemu_strdup(unixpath);
+    } else {
+        s->host_spec = qemu_strdup(host_spec);
+    }
+
+    err = 0;
+
+out:
+    qemu_free(file);
+    if (err != 0) {
+        qemu_free(s->export_name);
+        qemu_free(s->host_spec);
+    }
+    return err;
+}
+
+static inline AIOReq *nbd_alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb,
+                                        size_t data_len,
+                                        off_t offset,
+                                        off_t iov_offset)
+{
+    AIOReq *aio_req;
 
-        sock = unix_socket_outgoing(unixpath);
+    aio_req = qemu_malloc(sizeof(*aio_req));
+    aio_req->aiocb = acb;
+    aio_req->iov_offset = iov_offset;
+    aio_req->nbd_req_hdr.from = offset;
+    aio_req->nbd_req_hdr.len = data_len;
+    aio_req->nbd_req_hdr.handle = s->aioreq_seq_num++;
 
+    if (acb->aiocb_type == AIOCB_READ_UDATA) {
+        aio_req->nbd_req_hdr.type = NBD_CMD_READ;
     } else {
-        uint16_t port = NBD_DEFAULT_PORT;
-        char *p, *r;
-        char hostname[128];
+        aio_req->nbd_req_hdr.type = NBD_CMD_WRITE;
+    }
+
+    aio_req->bytes_sent = 0;
+    aio_req->bytes_got = 0;
 
-        pstrcpy(hostname, 128, host);
+    QTAILQ_INSERT_TAIL(&s->reqs_to_send_head, aio_req, socket_siblings);
+    aio_req->waiting_sent = true;
+    QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
+    return aio_req;
+}
 
-        p = strchr(hostname, ':');
-        if (p != NULL) {
-            *p = '\0';
-            p++;
+static int nbd_aio_flush_request(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    int resp;
 
-            port = strtol(p, &r, 0);
-            if (r == p) {
-                goto out;
+    resp = !(QTAILQ_EMPTY(&s->reqs_to_send_head) &&
+             QTAILQ_EMPTY(&s->reqs_for_reply_head) &&
+             (s->current_req == NULL));
+
+    return resp;
+}
+
+static inline bool free_aio_req(BDRVNBDState *s, AIOReq *aio_req)
+{
+    NBDAIOCB *acb = aio_req->aiocb;
+
+    /* The three locations are mutually exclusive - unlink from exactly one */
+    if (aio_req->waiting_sent) {
+        QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
+    } else if (s->current_req == aio_req) {
+        s->current_req = NULL;
+    } else {
+        QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
+    }
+
+    QLIST_REMOVE(aio_req, aioreq_siblings);
+    qemu_free(aio_req);
+
+    return !QLIST_EMPTY(&acb->aioreq_head);
+}
+
+static void nbd_finish_aiocb(NBDAIOCB *acb)
+{
+    acb->common.cb(acb->common.opaque, acb->ret);
+    qemu_aio_release(acb);
+}
+
+static void nbd_handle_io_err(BDRVNBDState *s, int err)
+{
+    AIOReq *aio_req;
+
+    /* These are fine - no need to do anything */
+    if (err == EAGAIN || err == EWOULDBLOCK || err == EINTR) {
+        logout("Recoverable error %i (%s) - returning\n", err, strerror(err));
+        return;
+    }
+
+    /* These errors mean the I/O failed. This happens because the socket is
+     * dead - we need to tear down the NBD session and prepare all the
+     * in-flight I/O requests to be retransmitted.
+     */
+    nbd_teardown_connection(s, false);
+
+    /* Reset bytes_sent for the first aio_req in reqs_to_send */
+    if (!QTAILQ_EMPTY(&s->reqs_to_send_head)) {
+        aio_req = QTAILQ_FIRST(&s->reqs_to_send_head);
+        logout("Resetting %lu->bytes_sent\n", aio_req->nbd_req_hdr.handle);
+        aio_req->bytes_sent = 0;
+    }
+
+    /* Move all requests in reqs_for_reply back to reqs_to_send. Both are
+     * FIFOs in ordinary operation, so we take from the end of reqs_for_reply
+     * and put to the head of reqs_to_send
+     */
+    while (!QTAILQ_EMPTY(&s->reqs_for_reply_head)) {
+        aio_req = QTAILQ_LAST(&s->reqs_for_reply_head, reqs_for_reply_head);
+
+        logout("Returning request %lu to reqs_to_send_head\n",
+               aio_req->nbd_req_hdr.handle);
+
+        QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
+
+        aio_req->bytes_sent = 0;
+        aio_req->waiting_sent = true;
+
+        QTAILQ_INSERT_HEAD(&s->reqs_to_send_head, aio_req, socket_siblings);
+    }
+
+    /* If we're partway through a request, clean that up too. It should go
+     * to the very head of reqs_to_send as though it'd never been partly
+     * received at all.
+     */
+    if (s->current_req) {
+        aio_req = s->current_req;
+        s->current_req = NULL;
+
+        logout("Returning %lu to reqs_to_send_head\n",
+               aio_req->nbd_req_hdr.handle);
+
+        aio_req->bytes_sent = 0;
+        aio_req->bytes_got = 0;
+        aio_req->waiting_sent = true;
+
+        QTAILQ_INSERT_HEAD(&s->reqs_to_send_head, aio_req, socket_siblings);
+    }
+
+    /* Reset this in case we're part-way through reading a response header */
+    logout("Resetting s->nbd_rsp_offset\n");
+    s->nbd_rsp_offset = 0;
+
+    /* Now we should be good to continue on our merry way when reconnected.
+     * Arming the recon timer makes us attempt to reconnect once every second
+     * until successful.
+     */
+    nbd_arm_recon_timer(s);
+}
+
+static void nbd_aio_write_request(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    AIOReq *aio_req = NULL;
+    NBDAIOCB *acb;
+    size_t total;
+    ssize_t ret;
+
+    if (QTAILQ_EMPTY(&s->reqs_to_send_head)) {
+        logout("Nothing to do in aio_write_request so unregistering 
handler\n");
+        nbd_unregister_write_request_handler(s);
+        return;
+    }
+
+    aio_req = QTAILQ_FIRST(&s->reqs_to_send_head);
+    acb = aio_req->aiocb;
+
+    if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
+        total = sizeof(NBDRequest) + aio_req->nbd_req_hdr.len;
+    } else {
+        total = sizeof(NBDRequest);
+    }
+
+    /* Since we've not written (all of) the header yet, get on with it.
+     * We always grab the *head* of the queue in this callback, so we
+     * won't interleave writes to the socket.
+     *
+     * Creating the header buffer on the fly isn't ideal in the case of many
+     * retries, but almost all the time, this will happen exactly once.
+     */
+    if (aio_req->bytes_sent < sizeof(NBDRequest)) {
+        uint8_t buf[sizeof(NBDRequest)];
+        QEMUIOVector hdr;
+
+        logout("Buffer not written in full, doing so\n");
+        nbd_request_to_buf(&aio_req->nbd_req_hdr, buf);
+        qemu_iovec_init(&hdr, 1);
+        qemu_iovec_add(&hdr, buf, sizeof(NBDRequest));
+        ret = writev(s->sock, hdr.iov, hdr.niov);
+        qemu_iovec_destroy(&hdr);
+
+        if (ret == -1) {
+            nbd_handle_io_err(s, socket_error());
+            return;
+        } else {
+            logout("Written %zu bytes to socket (request is %zu bytes)\n", ret,
+                   sizeof(NBDRequest));
+            aio_req->bytes_sent += ret;
+        }
+    }
+
+    /* If the header is sent & we're doing a write request, send data */
+    if (acb->aiocb_type == AIOCB_WRITE_UDATA &&
+        aio_req->bytes_sent >= sizeof(NBDRequest) &&
+        aio_req->bytes_sent < total) {
+        logout("Write request - putting data in socket\n");
+        off_t offset = (aio_req->bytes_sent - sizeof(NBDRequest)) +
+                        aio_req->iov_offset;
+
+        ret = nbd_qiov_wr(s->sock, acb->qiov, total - aio_req->bytes_sent,
+                          offset, false);
+
+        if (ret < 0) {
+            nbd_handle_io_err(s, -ret);
+            return;
+        } else {
+            logout("Written %zu bytes to socket\n", ret);
+            aio_req->bytes_sent += ret;
+        }
+    }
+
+    /* Request written. nbd_aio_read_response gets the reply */
+    if (aio_req->bytes_sent == total) {
+        logout("aio_req written to socket, moving to reqs_for_reply\n");
+        aio_req->waiting_sent = false;
+        QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
+        QTAILQ_INSERT_TAIL(&s->reqs_for_reply_head, aio_req, socket_siblings);
+
+        /* Set socket timeout. We only do this once the write is complete since
+         * an inability to write is detected much more easily than an inability
+         * to read (is this right?)
+         */
+        s->timeout = time(NULL) + NBD_SOCKET_TIMEOUT;
+    }
+
+    if (QTAILQ_EMPTY(&s->reqs_to_send_head)) {
+        logout("Write queue empty, unregistering write request handler\n");
+        nbd_unregister_write_request_handler(s);
+    }
+
+}
+
+static inline bool nbd_find_next_aioreq(BDRVNBDState *s)
+{
+    uint8_t *buf = &s->nbd_rsp_buf[s->nbd_rsp_offset];
+    size_t cnt = sizeof(NBDReply) - s->nbd_rsp_offset;
+    ssize_t ret;
+    NBDReply rsp;
+    AIOReq *aio_req;
+
+    /* Try to get enough bytes so we have a complete NBDReply */
+    ret = read(s->sock, buf, cnt);
+    logout("read %zu bytes\n", ret);
+
+    /* Socket activity - increment the timeout. If the socket sits on EOF, then
+     * the timeout isn't incremented and we will exceed it at some point.
+     */
+    if (ret > 0) {
+        s->timeout++;
+    }
+
+    /* I/O error means we've failed. */
+    if (ret == -1) {
+        nbd_handle_io_err(s, socket_error());
+        return false;
+    }
+
+    s->nbd_rsp_offset += ret;
+
+    /* We don't have enough data to make a full header */
+    if (s->nbd_rsp_offset < sizeof(NBDReply)) {
+        return false;
+    }
+
+    /* Turn data into NBDReply, find the matching aio_req */
+    nbd_buf_to_reply(&s->nbd_rsp_buf[0], &rsp);
+
+    /* Check the magic */
+    if (rsp.magic != NBD_REPLY_MAGIC) {
+        logout("Received invalid NBD response magic!\n");
+        nbd_handle_io_err(s, EIO);
+        return false;
+    }
+
+    QTAILQ_FOREACH(aio_req, &s->reqs_for_reply_head, socket_siblings) {
+        if (aio_req->nbd_req_hdr.handle == rsp.handle) {
+            s->current_req = aio_req;
+            break;
+        }
+    }
+
+    if (s->current_req) {
+        /* Mission accomplished! */
+        logout("Found next aio_req\n");
+        QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
+        return true;
+    }
+
+    /* The handle in the reply header doesn't match any AIOReq. Fail. */
+    logout("cannot find aio_req for handle %lu\n", rsp.handle);
+    nbd_handle_io_err(s, EIO);
+    return false;
+}
+
+static void nbd_aio_read_response(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    uint8_t *buf = NULL; /* Used if the aiocb has been canceled */
+    AIOReq *aio_req = NULL;
+    NBDAIOCB *acb;
+    NBDReply rsp;
+
+    size_t total = 0; /* number of payload bytes read */
+    ssize_t ret;
+    int rest;
+
+    /* We're not in the middle of a request */
+    if (s->current_req == NULL) {
+        /* No outstanding requests */
+        if (QTAILQ_EMPTY(&s->reqs_for_reply_head)) {
+            logout("No request outstanding, exiting\n");
+            s->timeout = 0; /* Don't timeout an idle socket */
+            return;
+        }
+
+        /* Couldn't grab the next aioreq */
+        if (!nbd_find_next_aioreq(s)) {
+            if (time(NULL) > s->timeout) {
+                logout("Timed out waiting to receive response from server\n");
+                nbd_handle_io_err(s, ETIMEDOUT);
+            } else {
+                logout("Don't have a new aio_req to work on yet\n");
             }
+            return;
         }
+    }
+
+    /* From here on, s->current_req and s->nbd_rsp_buf are known to be good */
+    nbd_buf_to_reply(&s->nbd_rsp_buf[0], &rsp);
+    aio_req = s->current_req;
+    acb = aio_req->aiocb;
+
+    /* NBD server returned an error for this operation */
+    if (rsp.error != 0) {
+        logout("NBD request resulted in error: %i\n", rsp.error);
+        acb->ret = -EIO;
 
-        sock = tcp_socket_outgoing(hostname, port);
+        rest = free_aio_req(s, aio_req);
+        if (!rest) {
+            logout("Signalling completion for this ACB\n");
+            acb->aio_done_func(acb);
     }
 
+        return;
+    }
+
+    if (acb->aiocb_type == AIOCB_READ_UDATA) {
+        total = aio_req->nbd_req_hdr.len;
+    }
+
+    if (acb->aiocb_type == AIOCB_READ_UDATA && aio_req->bytes_got < total) {
+
+        size_t remaining = total - aio_req->bytes_got;
+        QEMUIOVector *qiov = acb->qiov;
+        off_t qiov_offset = aio_req->bytes_got + aio_req->iov_offset;
+
+        if (acb->canceled) {
+            buf = qemu_malloc(remaining);
+            qemu_iovec_init(qiov, 1);
+            qemu_iovec_add(qiov, buf, remaining);
+            qiov_offset = 0;
+        }
+
+        ret = nbd_qiov_wr(s->sock, qiov, remaining, qiov_offset, true);
+        logout("Read %zu of %zu bytes remaining\n", ret, remaining);
+
+        /* Socket activity - increment the timeout */
+        if (ret > 0) {
+            s->timeout++;
+        }
+
+        if (acb->canceled) {
+            qemu_iovec_destroy(qiov);
+            qemu_free(buf);
+        }
+
+        if (ret < 0) {
+            nbd_handle_io_err(s, -ret);
+            return;
+        }
+
+        aio_req->bytes_got += ret;
+    }
+
+    /* Entire request has been read */
+    if (total == aio_req->bytes_got) {
+        logout("Read all bytes of the response; clearing s->current_req\n");
+        s->nbd_rsp_offset = 0;
+
+        /* Free the aio_req. If the NBDAIOCB is finished, notify QEMU */
+        rest = free_aio_req(s, aio_req);
+        if (!rest) {
+            logout("acb complete\n");
+            acb->aio_done_func(acb);
+        }
+    }
+
+    logout("Leaving function\n");
+}
+
+static int nbd_establish_connection(BDRVNBDState *s)
+{
+    int sock;
+    int ret;
+    off_t size;
+    size_t blocksize;
+    uint32_t nbdflags;
+
+    if (s->host_spec[0] == '/') {
+        sock = unix_socket_outgoing(s->host_spec);
+    } else {
+        sock = tcp_socket_outgoing_spec(s->host_spec);
+    }
+
+    /* Failed to establish connection */
     if (sock == -1) {
-        err = -errno;
-        goto out;
+        logout("Failed to establish connection to NBD server\n");
+        return -errno;
     }
 
-    ret = nbd_receive_negotiate(sock, name, &nbdflags, &size, &blocksize);
+    /* NBD handshake */
+    ret = nbd_receive_negotiate(sock, s->export_name, &nbdflags, &size,
+                                &blocksize);
     if (ret == -1) {
-        err = -errno;
-        goto out;
+        logout("Failed to negotiate with the NBD server\n");
+        closesocket(sock);
+        return -errno;
     }
 
+    /* Now that we're connected, set the socket to be non-blocking */
+    socket_set_nonblock(sock);
+
     s->sock = sock;
     s->size = size;
     s->blocksize = blocksize;
-    err = 0;
+    s->nbd_rsp_offset = 0;
+    s->timeout = 0; /* Clear any timeout value */
+    s->connected = true;
 
-out:
-    qemu_free(file);
-    return err;
+    /* It's possible that there are aio_reqs waiting in reqs_to_send_head here.
+     * If there aren't, nbd_aio_write_request will just unregister the on-write
+     * handler the first time it is called.
+     */
+    nbd_register_write_request_handler(s);
+
+    logout("Established connection with NBD server\n");
+    return 0;
 }
 
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
-                    uint8_t *buf, int nb_sectors)
+static void nbd_teardown_connection(BDRVNBDState *s, bool send_disconnect)
 {
-    BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
-    struct nbd_reply reply;
 
-    request.type = NBD_CMD_READ;
-    request.handle = (uint64_t)(intptr_t)bs;
-    request.from = sector_num * 512;;
-    request.len = nb_sectors * 512;
+    if (send_disconnect) {
+        request.type = NBD_CMD_DISC;
+        request.handle = 0;
+        request.from = 0;
+        request.len = 0;
+        nbd_send_request(s->sock, &request);
+    }
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
+    s->connected = false;
 
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
+    qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
+    closesocket(s->sock);
+    s->sock = -1;
 
-    if (reply.error !=0)
-        return -reply.error;
+    logout("Connection to NBD server closed\n");
+}
 
-    if (reply.handle != request.handle)
-        return -EIO;
+static void nbd_arm_recon_timer(BDRVNBDState *s)
+{
+    struct itimerspec its;
 
-    if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
-        return -EIO;
+    /* First, register the needed callback - the signal handler writes a byte
+     * every second, which causes nbd_reconnect to be called via this fd
+     */
+    logout("Arming reconnect timer\n");
+    qemu_aio_set_fd_handler(s->recon_sock_r, nbd_reconnect, NULL, NULL,
+                            NULL, s);
 
-    return 0;
+    its.it_interval.tv_sec = 1;
+    its.it_interval.tv_nsec = 0;
+    its.it_value.tv_sec = 1;
+    its.it_value.tv_nsec = 0;
+
+    timer_settime(s->recon_timer, 0, &its, NULL);
 }
 
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
-                     const uint8_t *buf, int nb_sectors)
+static void nbd_disarm_recon_timer(BDRVNBDState *s)
+{
+    struct itimerspec its;
+
+    logout("Disarming reconnect timer\n");
+
+    its.it_interval.tv_sec = 0;
+    its.it_interval.tv_nsec = 0;
+    its.it_value.tv_sec = 0;
+    its.it_value.tv_nsec = 0;
+
+    timer_settime(s->recon_timer, 0, &its, NULL);
+
+    /* Unregister the required callback */
+    qemu_aio_set_fd_handler(s->recon_sock_r, NULL, NULL, NULL, NULL, s);
+}
+
+/* Here we write a single byte to the write side of the recon pipe. This gets
+ * the nbd_reconnect function called */
+static void nbd_time_to_reconnect(int sig, siginfo_t *si, void *uc)
+{
+    char data = 0x00;
+    int result;
+    logout("in nbd_time_to_reconnect!\n");
+    result = write(si->si_value.sival_int, &data, 1);
+}
+
+static void nbd_reconnect(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    char recon_byte;
+
+    logout("In nbd_reconnect\n");
+
+    /* Read the signal byte from the fd to avoid filling it up */
+    if (read(s->recon_sock_r, &recon_byte, 1) == -1) {
+        logout("Error reading reconnection signal byte, but persisting\n");
+    }
+
+    if (s->connected) {
+        logout("Called when already connected; stopping timer\n");
+        nbd_disarm_recon_timer(s);
+
+    } else {
+        if (nbd_establish_connection(s) == 0) {
+            logout("Connection to NBD server established, unregistering\n");
+            nbd_disarm_recon_timer(s);
+        } else {
+            logout("NBD connection not established, staying registered\n");
+        }
+    }
+}
+
+static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
 {
     BDRVNBDState *s = bs->opaque;
-    struct nbd_request request;
-    struct nbd_reply reply;
+    int result;
+    int pipe_fds[2];
+
+    struct sigevent evp;
+    struct sigaction sa;
+    timer_t timerid;
 
-    request.type = NBD_CMD_WRITE;
-    request.handle = (uint64_t)(intptr_t)bs;
-    request.from = sector_num * 512;;
-    request.len = nb_sectors * 512;
+    /* Pop the config into our state object. Exit if invalid. */
+    result = nbd_config(s, filename, flags);
+    if (result != 0) {
+        return result;
+    }
 
-    if (nbd_send_request(s->sock, &request) == -1)
+    /* We register a read callback against the read end of the pipe, and use
+     * the write end to signal that a reconnection attempt is due. */
+    result = pipe(pipe_fds);
+    if (result == -1) {
+        logout("Failed to generate reconnect queue pipe, failing\n");
         return -errno;
+    }
+    s->recon_sock_r = pipe_fds[0];
+    s->recon_sock_w = pipe_fds[1];
 
-    if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
-        return -EIO;
+    /* We use this timer for notifying when we should retry a connection */
+    memset(&evp, 0, sizeof(evp));
+    evp.sigev_notify = SIGEV_SIGNAL;
+    evp.sigev_signo = SIG_NBD_RECON;
+    evp.sigev_value.sival_int = pipe_fds[1];
 
-    if (nbd_receive_reply(s->sock, &reply) == -1)
+    sigemptyset(&sa.sa_mask);
+    sa.sa_sigaction = nbd_time_to_reconnect;
+    sa.sa_flags = SA_SIGINFO;
+
+    sigaction(SIG_NBD_RECON, &sa, NULL);
+
+    if (timer_create(CLOCK_MONOTONIC, &evp, &timerid) != 0) {
+        logout("Failed to create reconnection timer!");
         return -errno;
+    }
+    s->recon_timer = timerid;
 
-    if (reply.error !=0)
-        return -reply.error;
+    QTAILQ_INIT(&s->reqs_to_send_head);
+    QTAILQ_INIT(&s->reqs_for_reply_head);
 
-    if (reply.handle != request.handle)
-        return -EIO;
+    s->current_req = NULL;
+    s->aioreq_seq_num = 1; /* We always use '0' for the disconnect */
+    s->nbd_rsp_offset = 0;
+    s->sock = -1;
+    s->connected = false;
 
-    return 0;
+    /* establish TCP connection, return error if it fails. */
+    result = nbd_establish_connection(s);
+
+    return result;
 }
 
 static void nbd_close(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
-    struct nbd_request request;
 
-    request.type = NBD_CMD_DISC;
-    request.handle = (uint64_t)(intptr_t)bs;
-    request.from = 0;
-    request.len = 0;
-    nbd_send_request(s->sock, &request);
+    struct sigaction sa;
+
+    /* Restore the default disposition for the reconnect signal. Passing a
+     * NULL action to sigaction() would only query the current handler.
+     */
+    memset(&sa, 0, sizeof(sa));
+    sa.sa_handler = SIG_DFL;
+    sigaction(SIG_NBD_RECON, &sa, NULL);
+    timer_delete(s->recon_timer);
+    qemu_aio_set_fd_handler(s->recon_sock_r, NULL, NULL, NULL, NULL, s);
+    nbd_teardown_connection(s, s->connected);
+
+    qemu_free(s->export_name);
+    qemu_free(s->host_spec);
+    closesocket(s->recon_sock_r);
+    closesocket(s->recon_sock_w);
+}
+
+static void nbd_register_write_request_handler(BDRVNBDState *s)
+{
+    if (!s->connected) {
+        logout("Register write request handler tried when socket closed\n");
+        return;
+    }
+
+    qemu_aio_set_fd_handler(s->sock, nbd_aio_read_response,
+                            nbd_aio_write_request,
+                            nbd_aio_flush_request, NULL, s);
+}
+
+static void nbd_unregister_write_request_handler(BDRVNBDState *s)
+{
+    if (!s->connected) {
+        logout("Unregister write request handler tried when socket closed\n");
+        return;
+    }
+
+    qemu_aio_set_fd_handler(s->sock, nbd_aio_read_response, NULL,
+                            nbd_aio_flush_request, NULL, s);
+}
+
+/* We remove all the aio_reqs currently sat in reqs_to_send_head (excepting the
+ * first, if any bytes have been transmitted), so we don't need to check for
+ * canceled in aio_write_request at all. If that finishes the acb, we call its
+ * completion function. Otherwise, we leave it alone.
+ *
+ * in nbd_aio_read_response, when we're handling a read request for an acb with
+ * canceled = true, we allocate a QEMUIOVector of the appropriate size to do
+ * the read, and throw the bytes away. Everything else goes on as normal.
+ */
+static void nbd_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    NBDAIOCB *acb = (NBDAIOCB *)blockacb;
+    BDRVNBDState *s = acb->common.bs->opaque;
+    AIOReq *a, *next;
+
+    /* free_aio_req() unlinks entries from this list, so iterate safely */
+    QLIST_FOREACH_SAFE(a, &acb->aioreq_head, aioreq_siblings, next) {
+        free_aio_req(s, a);
+    }
+
+    acb->canceled = true;
+    acb->ret = -EIO;
+
+    if (QLIST_EMPTY(&acb->aioreq_head)) {
+        nbd_finish_aiocb(acb);
+    }
+}
+
+static AIOPool nbd_aio_pool = {
+    .aiocb_size = sizeof(NBDAIOCB),
+    .cancel = nbd_aio_cancel,
+};
+
+static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
+                                   int64_t sector_num, int nb_sectors,
+                                   BlockDriverCompletionFunc *cb, void *opaque)
+{
+    NBDAIOCB *acb;
+
+    acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque);
+
+    acb->qiov = qiov;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+
+    acb->canceled = false;
+
+    acb->aio_done_func = NULL;
+    acb->bh = NULL;
+    acb->ret = 0;
+
+    QLIST_INIT(&acb->aioreq_head);
+    return acb;
+}
+
+static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb)
+{
+    if (acb->bh) {
+        logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
+        return -EIO;
+    }
+
+    acb->bh = qemu_bh_new(cb, acb);
+    if (!acb->bh) {
+        logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
+        return -EIO;
+    }
+
+    qemu_bh_schedule(acb->bh);
+
+    return 0;
+}
+
+static void nbd_readv_writev_bh_cb(void *p)
+{
+    NBDAIOCB *acb = p;
+
+    size_t len, done = 0;
+    size_t total = acb->nb_sectors * SECTOR_SIZE;
+
+    /* Where the read/write starts from */
+    off_t offset = acb->sector_num * SECTOR_SIZE;
+    BDRVNBDState *s = acb->common.bs->opaque;
+
+    AIOReq *aio_req;
+
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+
+    while (done < total) {
+        len = (total - done);
+
+        /* Split read & write requests into segments if needed */
+        if (acb->aiocb_type == AIOCB_READ_UDATA && len > NBD_MAX_READ) {
+            len = NBD_MAX_READ;
+        }
+
+        if (acb->aiocb_type == AIOCB_WRITE_UDATA && len > NBD_MAX_WRITE) {
+            len = NBD_MAX_WRITE;
+        }
+
+        logout("Allocating an aio_req of %zu bytes\n", len);
+        aio_req = nbd_alloc_aio_req(s, acb, len, offset + done, done);
+
+        done += len;
+    }
+
+    if (QLIST_EMPTY(&acb->aioreq_head)) {
+        logout("acb->aioreq_head empty, so finishing acb now\n");
+        nbd_finish_aiocb(acb);
+    } else if (s->connected) {
+        logout("Requests to make - registering write request callback\n");
+        nbd_register_write_request_handler(s);
+    } else {
+        logout("Requests to make but socket not connected, waiting\n");
+    }
 
-    close(s->sock);
+}
+
+static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    NBDAIOCB *acb;
+
+    acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb->aiocb_type = AIOCB_READ_UDATA;
+    acb->aio_done_func = nbd_finish_aiocb;
+
+    nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
+    return &acb->common;
+}
+
+static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    NBDAIOCB *acb;
+
+    acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb->aiocb_type = AIOCB_WRITE_UDATA;
+    acb->aio_done_func = nbd_finish_aiocb;
+
+    nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
+    return &acb->common;
 }
 
 static int64_t nbd_getlength(BlockDriverState *bs)
@@ -206,11 +1091,11 @@ static BlockDriver bdrv_nbd = {
     .format_name       = "nbd",
     .instance_size     = sizeof(BDRVNBDState),
     .bdrv_file_open    = nbd_open,
-    .bdrv_read         = nbd_read,
-    .bdrv_write                = nbd_write,
+    .bdrv_aio_readv  = nbd_aio_readv,
+    .bdrv_aio_writev = nbd_aio_writev,
     .bdrv_close                = nbd_close,
     .bdrv_getlength    = nbd_getlength,
-    .protocol_name     = "nbd",
+    .protocol_name   = "nbd",
 };
 
 static void bdrv_nbd_init(void)
@@ -219,3 +1104,4 @@ static void bdrv_nbd_init(void)
 }
 
 block_init(bdrv_nbd_init);
+
diff --git a/nbd.c b/nbd.c
index d8ebc42..48b80a9 100644
--- a/nbd.c
+++ b/nbd.c
@@ -49,10 +49,6 @@
 
 /* This is all part of the "official" NBD API */
 
-#define NBD_REPLY_SIZE         (4 + 4 + 8)
-#define NBD_REQUEST_MAGIC       0x25609513
-#define NBD_REPLY_MAGIC         0x67446698
-
 #define NBD_SET_SOCK            _IO(0xab, 0)
 #define NBD_SET_BLKSIZE         _IO(0xab, 1)
 #define NBD_SET_SIZE            _IO(0xab, 2)
@@ -107,155 +103,79 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
     return offset;
 }
 
-int tcp_socket_outgoing(const char *address, uint16_t port)
+ssize_t nbd_qiov_wr(int fd, QEMUIOVector *qiov, size_t len, off_t offset,
+                    bool do_read)
 {
-    int s;
-    struct in_addr in;
-    struct sockaddr_in addr;
-
-    s = socket(PF_INET, SOCK_STREAM, 0);
-    if (s == -1) {
-        return -1;
-    }
+    ssize_t ret;
+    QEMUIOVector spec;
 
-    if (inet_aton(address, &in) == 0) {
-        struct hostent *ent;
+    qemu_iovec_init(&spec, qiov->niov);
+    qemu_iovec_copy(&spec, qiov, offset, len);
 
-        ent = gethostbyname(address);
-        if (ent == NULL) {
-            goto error;
-        }
-
-        memcpy(&in, ent->h_addr, sizeof(in));
+    if (do_read) {
+        ret = readv(fd, spec.iov, spec.niov);
+    } else {
+        ret = writev(fd, spec.iov, spec.niov);
     }
 
-    addr.sin_family = AF_INET;
-    addr.sin_port = htons(port);
-    memcpy(&addr.sin_addr.s_addr, &in, sizeof(in));
+    qemu_iovec_destroy(&spec);
 
-    if (connect(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
-        goto error;
+    if (ret == -1) {
+        return -socket_error();
     }
 
-    return s;
-error:
-    closesocket(s);
-    return -1;
+    return ret;
 }
 
-int tcp_socket_incoming(const char *address, uint16_t port)
+static void combine_addr(char *buf, size_t len, const char *address,
+                         uint16_t port)
 {
-    int s;
-    struct in_addr in;
-    struct sockaddr_in addr;
-    int opt;
-
-    s = socket(PF_INET, SOCK_STREAM, 0);
-    if (s == -1) {
-        return -1;
-    }
-
-    if (inet_aton(address, &in) == 0) {
-        struct hostent *ent;
-
-        ent = gethostbyname(address);
-        if (ent == NULL) {
-            goto error;
-        }
-
-        memcpy(&in, ent->h_addr, sizeof(in));
-    }
-
-    addr.sin_family = AF_INET;
-    addr.sin_port = htons(port);
-    memcpy(&addr.sin_addr.s_addr, &in, sizeof(in));
-
-    opt = 1;
-    if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
-                   (const void *) &opt, sizeof(opt)) == -1) {
-        goto error;
-    }
-
-    if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
-        goto error;
-    }
-
-    if (listen(s, 128) == -1) {
-        goto error;
+    /* If the address part contains a colon, it's IPv6 and needs [] around it */
+    if (strchr(address, ':')) {
+        snprintf(buf, len, "[%s]:%u", address, port);
+    } else {
+        snprintf(buf, len, "%s:%u", address, port);
     }
-
-    return s;
-error:
-    closesocket(s);
-    return -1;
 }
 
-#ifndef _WIN32
-int unix_socket_incoming(const char *path)
+int tcp_socket_outgoing(const char *address, uint16_t port)
 {
-    int s;
-    struct sockaddr_un addr;
-
-    s = socket(PF_UNIX, SOCK_STREAM, 0);
-    if (s == -1) {
-        return -1;
+    char address_and_port[128];
+    combine_addr(address_and_port, 128, address, port);
+    return tcp_socket_outgoing_spec(address_and_port);
     }
 
-    memset(&addr, 0, sizeof(addr));
-    addr.sun_family = AF_UNIX;
-    pstrcpy(addr.sun_path, sizeof(addr.sun_path), path);
-
-    if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
-        goto error;
-    }
-
-    if (listen(s, 128) == -1) {
-        goto error;
-    }
-
-    return s;
-error:
-    closesocket(s);
-    return -1;
+int tcp_socket_outgoing_spec(const char *address_and_port)
+{
+    return inet_connect(address_and_port, SOCK_STREAM);
 }
 
-int unix_socket_outgoing(const char *path)
+int tcp_socket_incoming(const char *address, uint16_t port)
 {
-    int s;
-    struct sockaddr_un addr;
-
-    s = socket(PF_UNIX, SOCK_STREAM, 0);
-    if (s == -1) {
-        return -1;
+    char address_and_port[128];
+    combine_addr(address_and_port, 128, address, port);
+    return tcp_socket_incoming_spec(address_and_port);
     }
 
-    memset(&addr, 0, sizeof(addr));
-    addr.sun_family = AF_UNIX;
-    pstrcpy(addr.sun_path, sizeof(addr.sun_path), path);
-
-    if (connect(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
-        goto error;
+int tcp_socket_incoming_spec(const char *address_and_port)
+{
+    char *ostr = NULL;
+    int olen = 0;
+    return inet_listen(address_and_port, ostr, olen, SOCK_STREAM, 0);
     }
 
-    return s;
-error:
-    closesocket(s);
-    return -1;
-}
-#else
 int unix_socket_incoming(const char *path)
 {
-    errno = ENOTSUP;
-    return -1;
+    char *ostr = NULL;
+    int olen = 0;
+
+    return unix_listen(path, ostr, olen);
 }
 
 int unix_socket_outgoing(const char *path)
 {
-    errno = ENOTSUP;
-    return -1;
+    return unix_connect(path);
 }
-#endif
-
 
 /* Basic flow
 
@@ -529,17 +449,35 @@ int nbd_client(int fd)
 }
 #endif
 
-int nbd_send_request(int csock, struct nbd_request *request)
+/* Put the NBD header into a buffer, ready for wire transmission.
+ * Endianness is dealt with here. The caller needs to allocate a
+ * buffer of sizeof(NBDRequest) bytes.
+ */
+void nbd_request_to_buf(NBDRequest *request, uint8_t *buf)
 {
-       uint8_t buf[4 + 4 + 8 + 8 + 4];
-
-       cpu_to_be32w((uint32_t*)buf, NBD_REQUEST_MAGIC);
+    request->magic = NBD_REQUEST_MAGIC;
+    cpu_to_be32w((uint32_t *)(buf +  0), request->magic);
        cpu_to_be32w((uint32_t*)(buf + 4), request->type);
        cpu_to_be64w((uint64_t*)(buf + 8), request->handle);
        cpu_to_be64w((uint64_t*)(buf + 16), request->from);
        cpu_to_be32w((uint32_t*)(buf + 24), request->len);
+}
+
+void nbd_buf_to_reply(const uint8_t *buf, NBDReply *reply)
+{
+    reply->magic  = be32_to_cpup((uint32_t *)buf);
+    reply->error  = be32_to_cpup((uint32_t *)(buf + 4));
+    reply->handle = be64_to_cpup((uint64_t *)(buf + 8));
+}
 
-       TRACE("Sending request to client");
+int nbd_send_request(int csock, NBDRequest *request)
+{
+    uint8_t buf[sizeof(NBDRequest)];
+    nbd_request_to_buf(request, buf);
+
+    TRACE("Sending request to client: "
+          "{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
+          request->from, request->len, request->handle, request->type);
 
        if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
                LOG("writing to socket failed");
@@ -549,7 +487,6 @@ int nbd_send_request(int csock, struct nbd_request *request)
        return 0;
 }
 
-
 static int nbd_receive_request(int csock, struct nbd_request *request)
 {
        uint8_t buf[4 + 4 + 8 + 8 + 4];
@@ -589,8 +526,7 @@ static int nbd_receive_request(int csock, struct nbd_request *request)
 
 int nbd_receive_reply(int csock, struct nbd_reply *reply)
 {
-       uint8_t buf[NBD_REPLY_SIZE];
-       uint32_t magic;
+    uint8_t buf[sizeof(NBDReply)];
 
        memset(buf, 0xAA, sizeof(buf));
 
@@ -605,17 +541,14 @@ int nbd_receive_reply(int csock, struct nbd_reply *reply)
           [ 4 ..  7]    error   (0 == no error)
           [ 7 .. 15]    handle
         */
-
-       magic = be32_to_cpup((uint32_t*)buf);
-       reply->error  = be32_to_cpup((uint32_t*)(buf + 4));
-       reply->handle = be64_to_cpup((uint64_t*)(buf + 8));
+    nbd_buf_to_reply(buf, reply);
 
        TRACE("Got reply: "
              "{ magic = 0x%x, .error = %d, handle = %" PRIu64" }",
-             magic, reply->error, reply->handle);
+          reply->magic, reply->error, reply->handle);
 
-       if (magic != NBD_REPLY_MAGIC) {
-               LOG("invalid magic (got 0x%x)", magic);
+    if (reply->magic != NBD_REPLY_MAGIC) {
+        LOG("invalid magic (got 0x%x)", reply->magic);
                errno = EINVAL;
                return -1;
        }
@@ -657,7 +590,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset,
                return -1;
 
        if (request.len + NBD_REPLY_SIZE > data_size) {
-               LOG("len (%u) is larger than max len (%u)",
+        LOG("len (%lu) is larger than max len (%u)",
                    request.len + NBD_REPLY_SIZE, data_size);
                errno = EINVAL;
                return -1;
diff --git a/nbd.h b/nbd.h
index fc3a594..8eea104 100644
--- a/nbd.h
+++ b/nbd.h
@@ -22,19 +22,25 @@
 #include <sys/types.h>
 
 #include <qemu-common.h>
+
 #include "block_int.h"
 
 struct nbd_request {
+    uint32_t magic;
     uint32_t type;
     uint64_t handle;
     uint64_t from;
     uint32_t len;
-};
+} __attribute__ ((__packed__));
 
 struct nbd_reply {
+    uint32_t magic;
     uint32_t error;
     uint64_t handle;
-};
+} __attribute__ ((__packed__));
+
+typedef struct nbd_request NBDRequest;
+typedef struct nbd_reply NBDReply;
 
 enum {
     NBD_CMD_READ = 0,
@@ -43,13 +49,23 @@ enum {
 };
 
 #define NBD_DEFAULT_PORT       10809
+#define NBD_REPLY_SIZE       sizeof(NBDReply)
+#define NBD_REQUEST_MAGIC    0x25609513
+#define NBD_REPLY_MAGIC      0x67446698
 
 size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read);
+ssize_t nbd_qiov_wr(int fd, QEMUIOVector *qiov, size_t len, off_t offset,
+                    bool do_read);
 int tcp_socket_outgoing(const char *address, uint16_t port);
 int tcp_socket_incoming(const char *address, uint16_t port);
+int tcp_socket_outgoing_spec(const char *address_and_port);
+int tcp_socket_incoming_spec(const char *address_and_port);
 int unix_socket_outgoing(const char *path);
 int unix_socket_incoming(const char *path);
 
+void nbd_request_to_buf(NBDRequest *request, uint8_t *buf);
+void nbd_buf_to_reply(const uint8_t *buf, NBDReply *reply);
+
 int nbd_negotiate(int csock, off_t size);
 int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
                           off_t *size, size_t *blocksize);
-- 
1.7.0.4