From: Chrysostomos Nanakos
Subject: [Qemu-devel] [PATCH 1/1] block: Support Archipelago as a QEMU block backend
Date: Thu, 29 May 2014 14:14:42 +0300

A VM image on an Archipelago volume is specified like this:

file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>]]

'archipelago' is the protocol.

'mport' is the port number on which mapperd is listening. It is optional;
if it is not specified, QEMU lets Archipelago use its default port.

'vport' is the port number on which vlmcd is listening. It is optional;
if it is not specified, QEMU lets Archipelago use its default port.

Examples:

file=archipelago:my_vm_volume
file=archipelago:my_vm_volume/mport=123
file=archipelago:my_vm_volume/mport=123:vport=1234
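
For illustration only, such a specification might appear on a full QEMU
command line as follows; the surrounding -drive options (if=virtio,
cache=none) are generic placeholders and not part of this patch:

qemu-system-x86_64 -drive file=archipelago:my_vm_volume/mport=123:vport=1234,if=virtio,cache=none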

Signed-off-by: Chrysostomos Nanakos <address@hidden>
---
 block/Makefile.objs |    1 +
 block/archipelago.c | 1129 +++++++++++++++++++++++++++++++++++++++++++++++++++
 configure           |   40 ++
 3 files changed, 1170 insertions(+)
 create mode 100644 block/archipelago.c

diff --git a/block/Makefile.objs b/block/Makefile.objs
index fd88c03..895c30d 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -17,6 +17,7 @@ block-obj-$(CONFIG_LIBNFS) += nfs.o
 block-obj-$(CONFIG_CURL) += curl.o
 block-obj-$(CONFIG_RBD) += rbd.o
 block-obj-$(CONFIG_GLUSTERFS) += gluster.o
+block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o
 block-obj-$(CONFIG_LIBSSH2) += ssh.o
 endif
 
diff --git a/block/archipelago.c b/block/archipelago.c
new file mode 100644
index 0000000..5318cc8
--- /dev/null
+++ b/block/archipelago.c
@@ -0,0 +1,1129 @@
+/*
+ * Copyright 2014 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+#include "block/block_int.h"
+#include "qemu/error-report.h"
+#include "qemu/thread.h"
+
+#include <inttypes.h>
+#include <xseg/xseg.h>
+#include <xseg/protocol.h>
+
+#define ARCHIP_FD_READ      0
+#define ARCHIP_FD_WRITE     1
+#define NUM_XSEG_THREADS    1
+#define MAX_REQUEST_SIZE    524288
+
+static struct xseg *xseg;
+static struct xseg_port *port;
+xport srcport = NoPort;
+xport sport = NoPort;
+xport mportno = NoPort;
+xport vportno = NoPort;
+
+#define archipelagolog(fmt, ...) \
+    fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__)
+
+typedef enum {
+    ARCHIP_OP_READ,
+    ARCHIP_OP_WRITE,
+    ARCHIP_OP_FLUSH,
+    ARCHIP_OP_VOLINFO,
+} ARCHIPCmd;
+
+typedef struct ArchipelagoConf {
+    char *volname;
+    int64_t size;
+} ArchipelagoConf;
+
+typedef struct ArchipelagoAIOCB {
+    BlockDriverAIOCB common;
+    QEMUBH *bh;
+    int64_t ret;
+    QEMUIOVector *qiov;
+    char *buffer;
+    ARCHIPCmd cmd;
+    int64_t sector_num;
+    int error;
+    struct BDRVArchipelagoState *s;
+    int cancelled;
+    int status;
+} ArchipelagoAIOCB;
+
+typedef struct ArchipelagoCB {
+    ArchipelagoAIOCB *acb;
+    struct BDRVArchipelagoState *s;
+    int done;
+    int64_t size;
+    char *buf;
+    int64_t ret;
+} ArchipelagoCB;
+
+typedef struct BDRVArchipelagoState {
+    int fds[2];
+    ArchipelagoConf *gconf;
+    int qemu_aio_count;
+    int event_reader_pos;
+    ArchipelagoCB *event_acb;
+} BDRVArchipelagoState;
+
+typedef struct ArchipelagoSegmentedRequest {
+    size_t count;
+    size_t total;
+    int ref;
+    int failed;
+} ArchipelagoSegmentedRequest;
+
+typedef struct AIORequestData {
+    char *volname;
+    off_t offset;
+    size_t size;
+    char *buf;
+    int ret;
+    int op;
+    ArchipelagoCB *aio_cb;
+    ArchipelagoSegmentedRequest *segreq;
+} AIORequestData;
+
+
+typedef struct ArchipelagoThread {
+    QemuThread request_th;
+    QemuCond request_cond;
+    QemuMutex request_mutex;
+    int is_signaled;
+    int is_running;
+} ArchipelagoThread;
+
+static void archipelago_aio_bh_cb(void *opaque);
+
+static int qemu_archipelago_signal_pipe(BDRVArchipelagoState *s,
+                                        ArchipelagoCB *aio_cb);
+
+QemuMutex archip_mutex;
+QemuCond archip_cond;
+ArchipelagoThread archipelago_th[NUM_XSEG_THREADS];
+static int is_signaled;
+
+static void init_local_signal(void)
+{
+    if (xseg && (sport != srcport)) {
+        xseg_init_local_signal(xseg, srcport);
+        sport = srcport;
+    }
+}
+
+static void archipelago_finish_aiocb(ArchipelagoCB *aio_cb,
+                                     ssize_t c,
+                                     AIORequestData *reqdata)
+{
+    int ret;
+    aio_cb->ret = c;
+    ret = qemu_archipelago_signal_pipe(aio_cb->s, aio_cb);
+    if (ret < 0) {
+        error_report("archipelago_finish_aiocb(): failed writing acb->s->fds");
+        g_free(aio_cb);
+        /* Lock disk and exit ??*/
+    }
+    g_free(reqdata);
+}
+
+static int wait_reply(struct xseg_request *expected_req)
+{
+    struct xseg_request *req;
+    xseg_prepare_wait(xseg, srcport);
+    void *psd = xseg_get_signal_desc(xseg, port);
+    while (1) {
+        req = xseg_receive(xseg, srcport, 0);
+        if (req) {
+            if (req != expected_req) {
+                archipelagolog("Unknown received request.\n");
+                xseg_put_request(xseg, req, srcport);
+            } else if (!(req->state & XS_SERVED)) {
+                archipelagolog("Failed req.\n");
+                return -1;
+            } else {
+                break;
+            }
+        }
+        xseg_wait_signal(xseg, psd, 10000000UL);
+    }
+    xseg_cancel_wait(xseg, srcport);
+    return 0;
+}
+
+static void xseg_request_handler(void *arthd)
+{
+
+    void *psd = xseg_get_signal_desc(xseg, port);
+    ArchipelagoThread *th = (ArchipelagoThread *) arthd;
+    while (th->is_running) {
+        struct xseg_request *req;
+        xseg_prepare_wait(xseg, srcport);
+        req = xseg_receive(xseg, srcport, 0);
+        if (req) {
+            AIORequestData *reqdata;
+            ArchipelagoSegmentedRequest *segreq;
+            xseg_get_req_data(xseg, req, (void **)&reqdata);
+
+            if (!(req->state & XS_SERVED)) {
+                    segreq = reqdata->segreq;
+                    __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
+            }
+
+            if (reqdata->op == ARCHIP_OP_READ) {
+                char *data = xseg_get_data(xseg, req);
+                segreq = reqdata->segreq;
+                segreq->count += req->serviced;
+
+                memcpy(reqdata->buf, data, req->serviced);
+                xseg_put_request(xseg, req, srcport);
+
+                __sync_add_and_fetch(&segreq->ref, -1);
+
+                if (!segreq->ref && !segreq->failed) {
+                    reqdata->ret = segreq->count;
+                    g_free(reqdata->segreq);
+                    archipelago_finish_aiocb(reqdata->aio_cb, reqdata->ret,
+                                             reqdata);
+                } else if (segreq->ref && segreq->failed) {
+                    g_free(reqdata);
+                } else if (!segreq->ref && segreq->failed) {
+                    g_free(reqdata->segreq);
+                    g_free(reqdata);
+                }
+            } else if (reqdata->op == ARCHIP_OP_WRITE) {
+                segreq = reqdata->segreq;
+                reqdata->ret = req->serviced;
+                segreq->count += req->serviced;
+                xseg_put_request(xseg, req, srcport);
+
+                __sync_add_and_fetch(&segreq->ref, -1);
+
+                if (!segreq->ref && !segreq->failed) {
+                    reqdata->ret = segreq->count;
+                    g_free(reqdata->segreq);
+                    archipelago_finish_aiocb(reqdata->aio_cb, reqdata->ret,
+                                             reqdata);
+                } else if (segreq->ref && segreq->failed) {
+                    g_free(reqdata);
+                } else if (!segreq->ref && segreq->failed) {
+                    g_free(reqdata->segreq);
+                    g_free(reqdata);
+                }
+            } else if (reqdata->op == ARCHIP_OP_VOLINFO) {
+                is_signaled = 1;
+                qemu_cond_signal(&archip_cond);
+            }
+        } else {
+            xseg_wait_signal(xseg, psd, 10000000UL);
+        }
+        xseg_cancel_wait(xseg, srcport);
+    }
+    th->is_signaled = 1;
+    qemu_cond_signal(&th->request_cond);
+    qemu_thread_exit(NULL);
+}
+
+static void qemu_archipelago_gconf_free(ArchipelagoConf *gconf)
+{
+    g_free(gconf->volname);
+    g_free(gconf);
+}
+
+static void xseg_find_port(char *pstr, const char *needle, xport *port)
+{
+    char *a;
+    char *dpstr = strdup(pstr);
+    a = strtok(dpstr, needle);
+    *port = (xport) atoi(a);
+    free(dpstr);
+}
+
+static int parse_volume_options(ArchipelagoConf *gconf, char *path)
+{
+    char *tokens[4];
+    int i;
+    if (!path) {
+        return -EINVAL;
+    }
+    /* Find Volume Name, mapperd and vlmcd ports */
+    char *ds = g_strndup(path, strlen(path));
+    tokens[0] = strtok(ds, ":");
+    tokens[1] = strtok(NULL, "/");
+    tokens[2] = strtok(NULL, ":");
+    tokens[3] = strtok(NULL, ":");
+    if (strcmp(tokens[0], "archipelago") != 0) {
+        /* Should not be here. Protocol is already not supported */
+        return -EINVAL;
+    }
+
+    gconf->volname = g_strndup(tokens[1], strlen(tokens[1]));
+    for (i = 0; i < 4; i++) {
+        if (tokens[i] != NULL) {
+            if (strstr(tokens[i], "mport=")) {
+                xseg_find_port(tokens[i], "mport=", &mportno);
+            }
+            if (strstr(tokens[i], "vport=")) {
+                xseg_find_port(tokens[i], "vport=", &vportno);
+            }
+        }
+    }
+
+    return 0;
+}
+
+static int archipelago_parse_uri(ArchipelagoConf *gconf, const char *filename)
+{
+    return parse_volume_options(gconf, (char *)filename);
+}
+
+static int qemu_archipelago_xseg_init(void)
+{
+    if (xseg_initialize()) {
+        archipelagolog("Cannot initialize xseg.\n");
+        goto err_exit;
+    }
+    xseg = xseg_join((char *)"posix", (char *)"archipelago",
+                     (char *)"posixfd", NULL);
+    if (!xseg) {
+        archipelagolog("Cannot join segment.\n");
+        goto err_exit;
+    }
+    port = xseg_bind_dynport(xseg);
+    srcport = port->portno;
+    init_local_signal();
+    return 0;
+err_exit:
+    return -1;
+}
+
+static int qemu_archipelago_init(ArchipelagoConf *gconf, const char *filename)
+{
+    int ret, i;
+    /* Set default values */
+    vportno = 501;
+    mportno = 1001;
+
+    ret = archipelago_parse_uri(gconf, filename);
+    if (ret < 0) {
+        const char *err_msg =
+            "<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>]]";
+        error_report("Usage: file=archipelago:%s", err_msg);
+        errno = -ret;
+        goto err_exit;
+    }
+
+    ret = qemu_archipelago_xseg_init();
+    if (ret < 0) {
+        error_report("Cannot initialize xseg. Aborting...\n");
+        errno = -ret;
+        goto err_exit;
+    }
+
+    qemu_cond_init(&archip_cond);
+    qemu_mutex_init(&archip_mutex);
+    for (i = 0; i < NUM_XSEG_THREADS; i++) {
+        qemu_cond_init(&archipelago_th[i].request_cond);
+        qemu_mutex_init(&archipelago_th[i].request_mutex);
+        archipelago_th[i].is_signaled = 0;
+        archipelago_th[i].is_running = 1;
+        qemu_thread_create(&archipelago_th[i].request_th, "xseg_io_th",
+                           (void *) xseg_request_handler,
+                           (void *) &archipelago_th[i], QEMU_THREAD_DETACHED);
+    }
+
+err_exit:
+    return ret;
+}
+
+static void qemu_archipelago_complete_aio(ArchipelagoCB *aio_cb)
+{
+    int64_t r;
+    ArchipelagoAIOCB *acb = aio_cb->acb;
+
+    r = aio_cb->ret;
+
+    if (acb->cmd != ARCHIP_OP_READ) {
+        if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (!acb->error) {
+            acb->ret = aio_cb->size;
+        }
+    } else {
+        if (r < 0) {
+            memset(aio_cb->buf, 0, aio_cb->size);
+            acb->ret = r;
+            acb->error = 1;
+        } else if (r < aio_cb->size) {
+            memset(aio_cb->buf + r, 0, aio_cb->size - r);
+            if (!acb->error) {
+                acb->ret = aio_cb->size;
+            }
+        } else if (!acb->error) {
+            acb->ret = r;
+        }
+    }
+
+    acb->bh = qemu_bh_new(archipelago_aio_bh_cb, acb);
+    qemu_bh_schedule(acb->bh);
+    g_free(aio_cb);
+}
+
+static void qemu_archipelago_aio_event_reader(void *opaque)
+{
+    BDRVArchipelagoState *s = opaque;
+    ssize_t ret;
+
+    do {
+        char *p = (char *)&s->event_acb;
+
+        ret = read(s->fds[ARCHIP_FD_READ], p + s->event_reader_pos,
+                   sizeof(s->event_acb) - s->event_reader_pos);
+        if (ret > 0) {
+            s->event_reader_pos += ret;
+            if (s->event_reader_pos == sizeof(s->event_acb)) {
+                s->event_reader_pos = 0;
+                qemu_archipelago_complete_aio(s->event_acb);
+                s->qemu_aio_count--;
+            }
+        }
+    } while (ret < 0 && errno == EINTR);
+}
+
+static QemuOptsList runtime_opts = {
+    .name = "archipelago",
+    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
+    .desc = {
+        {
+            .name = "filename",
+            .type = QEMU_OPT_STRING,
+            .help = "Specification of the volume image",
+        },
+        { /* end of list */ }
+    },
+};
+
+static int qemu_archipelago_open(BlockDriverState *bs,
+                                 QDict *options,
+                                 int bdrv_flags,
+                                 Error **errp)
+{
+    int ret = 0;
+    QemuOpts *opts;
+    Error *local_err = NULL;
+    const char *filename;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        qerror_report_err(local_err);
+        error_free(local_err);
+        ret = -EINVAL;
+        goto err_exit;
+    }
+
+    filename = qemu_opt_get(opts, "filename");
+
+    /* Initialize XSEG, join segment and set s->gconf->volname */
+    s->gconf = g_malloc0(sizeof(ArchipelagoConf));
+    ret = qemu_archipelago_init(s->gconf, filename);
+    if (ret < 0) {
+        ret = -errno;
+        goto err_exit;
+    }
+
+    s->event_reader_pos = 0;
+    ret = qemu_pipe(s->fds);
+    if (ret < 0) {
+        ret = -errno;
+        goto err_exit;
+    }
+
+    fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
+    fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
+                            qemu_archipelago_aio_event_reader, NULL,
+                            s);
+
+    qemu_opts_del(opts);
+    return 0;
+
+err_exit:
+    qemu_opts_del(opts);
+    qemu_archipelago_gconf_free(s->gconf);
+    return ret;
+}
+
+static void qemu_archipelago_close(BlockDriverState *bs)
+{
+    int i, r, targetlen;
+    char *target;
+    struct xseg_request *req;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    close(s->fds[0]);
+    close(s->fds[1]);
+    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
+    for (i = 0; i < NUM_XSEG_THREADS; i++) {
+        archipelago_th[i].is_running = 0;
+    }
+    for (i = 0; i < NUM_XSEG_THREADS; i++) {
+        qemu_mutex_lock(&archipelago_th[i].request_mutex);
+        if (!archipelago_th[i].is_signaled) {
+            qemu_cond_wait(&archipelago_th[i].request_cond,
+                           &archipelago_th[i].request_mutex);
+        }
+        qemu_mutex_unlock(&archipelago_th[i].request_mutex);
+        qemu_cond_destroy(&archipelago_th[i].request_cond);
+        qemu_mutex_destroy(&archipelago_th[i].request_mutex);
+    }
+    qemu_cond_destroy(&archip_cond);
+    qemu_mutex_destroy(&archip_mutex);
+
+    targetlen = strlen(s->gconf->volname);
+    req = xseg_get_request(xseg, srcport, vportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get xseg request.\n");
+        goto err_exit;
+    }
+    r = xseg_prep_request(xseg, req, targetlen, 0);
+    if (r < 0) {
+        xseg_put_request(xseg, req, srcport);
+        archipelagolog("Cannot prepare close request.\n");
+        goto err_exit;
+    }
+
+    target = xseg_get_target(xseg, req);
+    strncpy(target, s->gconf->volname, targetlen);
+    req->size = req->datalen;
+    req->offset = 0;
+    req->op = X_CLOSE;
+
+    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+    if (p == NoPort) {
+        xseg_put_request(xseg, req, srcport);
+        archipelagolog("Cannot submit close request.\n");
+        goto err_exit;
+    }
+
+    xseg_signal(xseg, p);
+    r = wait_reply(req);
+    if (r < 0) {
+        archipelagolog("wait_reply() error.\n");
+    }
+
+    xseg_put_request(xseg, req, srcport);
+
+err_exit:
+    xseg_leave_dynport(xseg, port);
+    xseg_leave(xseg);
+}
+
+static int qemu_archipelago_create_volume(ArchipelagoConf *gconf)
+{
+    int ret, targetlen;
+    struct xseg_request *req;
+    struct xseg_request_clone *xclone;
+    char *target;
+
+    req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get xseg request.\n");
+        return -1;
+    }
+
+    targetlen = strlen(gconf->volname);
+    ret = xseg_prep_request(xseg, req, targetlen,
+                            sizeof(struct xseg_request_clone));
+    if (ret < 0) {
+        archipelagolog("Cannot prepare xseg request.\n");
+        goto err_exit;
+    }
+
+    target = xseg_get_target(xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get xseg target.\n");
+        goto err_exit;
+    }
+    strncpy(target, gconf->volname, targetlen);
+    xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
+    memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
+    xclone->targetlen = 0;
+    xclone->size = gconf->size * BDRV_SECTOR_SIZE;
+    req->offset = 0;
+    req->size = req->datalen;
+    req->op = X_CLONE;
+
+    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Could not submit request.\n");
+        goto err_exit;
+    }
+    xseg_signal(xseg, p);
+
+    ret = wait_reply(req);
+    if (ret < 0) {
+        archipelagolog("wait_reply() error. Aborting...\n");
+        goto err_exit;
+    }
+    xseg_put_request(xseg, req, srcport);
+    return ret;
+err_exit:
+    xseg_put_request(xseg, req, srcport);
+    return -1;
+}
+
+static int qemu_archipelago_create(const char *filename,
+                                   QEMUOptionParameter *options,
+                                   Error **errp)
+{
+    int ret = 0;
+    int64_t total_size = 0;
+    ArchipelagoConf *gconf = g_malloc0(sizeof(ArchipelagoConf));
+
+    ret = qemu_archipelago_init(gconf, filename);
+    if (ret < 0) {
+        ret = -errno;
+        goto err_exit;
+    }
+
+    while (options && options->name) {
+        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+            total_size = options->value.n / BDRV_SECTOR_SIZE;
+        }
+        options++;
+    }
+    /* Create Volume in Archipelago */
+    gconf->size = total_size;
+    ret = qemu_archipelago_create_volume(gconf);
+err_exit:
+    qemu_archipelago_gconf_free(gconf);
+    return ret;
+}
+
+static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
+{
+    /* Not implemented yet */
+    return 0;
+}
+
+static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    ArchipelagoAIOCB *acb = (ArchipelagoAIOCB *) blockacb;
+    acb->cancelled = 1;
+    while (acb->status == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
+    qemu_aio_release(acb);
+}
+
+static const AIOCBInfo archipelago_aiocb_info = {
+    .aiocb_size = sizeof(ArchipelagoAIOCB),
+    .cancel = qemu_archipelago_aio_cancel,
+};
+
+static int qemu_archipelago_signal_pipe(BDRVArchipelagoState *s,
+                                        ArchipelagoCB *aio_cb)
+{
+    int ret = 0;
+    while (1) {
+        fd_set wfd;
+        int fd = s->fds[1];
+
+        ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
+        if (ret > 0) {
+            break;
+        }
+        if (errno == EINTR) {
+            continue;
+        }
+        if (errno != EAGAIN) {
+            break;
+        }
+        FD_ZERO(&wfd);
+        FD_SET(fd, &wfd);
+        do {
+            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
+        } while (ret < 0 && errno == EINTR);
+    }
+    return ret;
+}
+
+static void archipelago_aio_bh_cb(void *opaque)
+{
+    ArchipelagoAIOCB *acb = opaque;
+    if (acb->cmd == ARCHIP_OP_READ) {
+        qemu_iovec_from_buf(acb->qiov, 0, acb->buffer, acb->qiov->size);
+    }
+
+    qemu_vfree(acb->buffer);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+    acb->status = 0;
+
+    if (!acb->cancelled) {
+        qemu_aio_release(acb);
+    }
+}
+
+
+static int archipelago_aio_read(char *volname,
+                                char *buf,
+                                size_t count,
+                                off_t offset,
+                                ArchipelagoCB *aio_cb,
+                                ArchipelagoSegmentedRequest *segreq)
+{
+    int ret, targetlen;
+    char *target;
+    struct xseg_request *req;
+    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
+
+    targetlen = strlen(volname);
+    req = xseg_get_request(xseg, srcport, vportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get xseg request.\n");
+        goto err_exit2;
+    }
+    ret = xseg_prep_request(xseg, req, targetlen, count);
+    if (ret < 0) {
+        archipelagolog("Cannot prepare xseg request.\n");
+        goto err_exit;
+    }
+    target = xseg_get_target(xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get xseg target.\n");
+        goto err_exit;
+    }
+    strncpy(target, volname, targetlen);
+    req->size = count;
+    req->offset = offset;
+    req->op = X_READ;
+
+    reqdata->volname = volname;
+    reqdata->offset = offset;
+    reqdata->size = count;
+    reqdata->buf = buf;
+    reqdata->aio_cb = aio_cb;
+    reqdata->op = ARCHIP_OP_READ;
+    reqdata->segreq = segreq;
+
+    xseg_set_req_data(xseg, req, reqdata);
+    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Could not submit xseg request.\n");
+        goto err_exit;
+    }
+    xseg_signal(xseg, p);
+
+    return 0;
+err_exit:
+    g_free(reqdata);
+    xseg_put_request(xseg, req, srcport);
+    return -1;
+err_exit2:
+    g_free(reqdata);
+    return -1;
+}
+
+static int archipelago_aio_write(char  *volname,
+                                 char *buf,
+                                 size_t count,
+                                 off_t offset,
+                                 ArchipelagoCB *aio_cb,
+                                 ArchipelagoSegmentedRequest *segreq)
+{
+    char *data = NULL;
+    struct xseg_request *req;
+    int ret, targetlen;
+    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
+
+    targetlen = strlen(volname);
+    req = xseg_get_request(xseg, srcport, vportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get xseg request.\n");
+        goto err_exit2;
+    }
+    ret = xseg_prep_request(xseg, req, targetlen, count);
+    if (ret < 0) {
+        archipelagolog("Cannot prepare xseg request.\n");
+        goto err_exit;
+    }
+    char *target = xseg_get_target(xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get xseg target.\n");
+        goto err_exit;
+    }
+    strncpy(target, volname, targetlen);
+    req->size = count;
+    req->offset = offset;
+    req->op = X_WRITE;
+
+    reqdata->volname = volname;
+    reqdata->offset = offset;
+    reqdata->size = count;
+    reqdata->buf = buf;
+    reqdata->aio_cb = aio_cb;
+    reqdata->op = ARCHIP_OP_WRITE;
+    reqdata->segreq = segreq;
+
+    xseg_set_req_data(xseg, req, reqdata);
+
+    data = xseg_get_data(xseg, req);
+    if (!data) {
+        archipelagolog("Cannot get xseg data.\n");
+        goto err_exit;
+    }
+    memcpy(data, buf, count);
+
+    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Could not submit xseg request.\n");
+        goto err_exit;
+    }
+    xseg_signal(xseg, p);
+    return 0;
+
+err_exit:
+    g_free(reqdata);
+    xseg_put_request(xseg, req, srcport);
+    return -1;
+err_exit2:
+    g_free(reqdata);
+    return -1;
+}
+
+static int archipelago_aio_segmented_rw(char *volname,
+                                        char *buf,
+                                        size_t count,
+                                        off_t offset,
+                                        ArchipelagoCB *aio_cb,
+                                        int op)
+{
+    int i, ret, segments_nr, last_segment_size;
+    ArchipelagoSegmentedRequest *segreq;
+
+    segreq = g_malloc(sizeof(ArchipelagoSegmentedRequest));
+
+    if (op == ARCHIP_OP_FLUSH) {
+        segments_nr = 1;
+        segreq->ref = segments_nr;
+        segreq->total = count;
+        segreq->count = 0;
+        segreq->failed = 0;
+        ret = archipelago_aio_write(volname, buf, count, offset, aio_cb,
+                                    segreq);
+        if (ret < 0) {
+            goto err_exit;
+        }
+        return 0;
+    }
+
+    segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
+                  ((count % MAX_REQUEST_SIZE) ? 1 : 0);
+    last_segment_size = (int)(count % MAX_REQUEST_SIZE);
+
+    segreq->ref = segments_nr;
+    segreq->total = count;
+    segreq->count = 0;
+    segreq->failed = 0;
+
+    for (i = 0; i < segments_nr - 1; i++) {
+        if (op == ARCHIP_OP_READ) {
+            ret = archipelago_aio_read(volname, buf + i * MAX_REQUEST_SIZE,
+                                       MAX_REQUEST_SIZE,
+                                       offset + i * MAX_REQUEST_SIZE,
+                                       aio_cb, segreq);
+        } else if (op == ARCHIP_OP_WRITE) {
+            ret = archipelago_aio_write(volname, buf + i * MAX_REQUEST_SIZE,
+                                        MAX_REQUEST_SIZE,
+                                        offset + i * MAX_REQUEST_SIZE,
+                                        aio_cb, segreq);
+        }
+        if (ret < 0) {
+            goto err_exit;
+        }
+    }
+
+    if ((segments_nr > 1) && last_segment_size) {
+        if (op == ARCHIP_OP_READ) {
+
+            ret = archipelago_aio_read(volname, buf + i * MAX_REQUEST_SIZE,
+                                       last_segment_size,
+                                       offset + i * MAX_REQUEST_SIZE,
+                                       aio_cb, segreq);
+        } else if (op == ARCHIP_OP_WRITE) {
+            ret = archipelago_aio_write(volname, buf + i * MAX_REQUEST_SIZE,
+                                        last_segment_size,
+                                        offset + i * MAX_REQUEST_SIZE,
+                                        aio_cb, segreq);
+        }
+    } else if ((segments_nr > 1) && !last_segment_size) {
+        if (op == ARCHIP_OP_READ) {
+            ret = archipelago_aio_read(volname, buf + i * MAX_REQUEST_SIZE,
+                                       MAX_REQUEST_SIZE,
+                                       offset + i * MAX_REQUEST_SIZE,
+                                       aio_cb, segreq);
+        } else if (op == ARCHIP_OP_WRITE) {
+            ret = archipelago_aio_write(volname, buf + i * MAX_REQUEST_SIZE,
+                                        MAX_REQUEST_SIZE,
+                                        offset + i * MAX_REQUEST_SIZE,
+                                        aio_cb, segreq);
+        }
+    } else if (segments_nr == 1) {
+        if (op == ARCHIP_OP_READ) {
+            ret = archipelago_aio_read(volname, buf, count, offset, aio_cb,
+                                       segreq);
+        } else if (op == ARCHIP_OP_WRITE) {
+            ret = archipelago_aio_write(volname, buf, count, offset, aio_cb,
+                                        segreq);
+        }
+    }
+    if (ret < 0) {
+        goto err_exit;
+    }
+
+    return 0;
+
+err_exit:
+    __sync_add_and_fetch(&segreq->failed, 1);
+    if (segments_nr == 1) {
+        __sync_add_and_fetch(&segreq->ref, -1);
+    } else {
+        __sync_add_and_fetch(&segreq->ref, -segments_nr + i);
+    }
+
+    if (!segreq->ref) {
+        g_free(segreq);
+    }
+
+    return ret;
+}
+
+static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
+                                                 int64_t sector_num,
+                                                 QEMUIOVector *qiov,
+                                                 int nb_sectors,
+                                                 BlockDriverCompletionFunc *cb,
+                                                 void *opaque,
+                                                 int op)
+{
+    ArchipelagoAIOCB *acb;
+    ArchipelagoCB *aio_cb;
+    BDRVArchipelagoState *s = bs->opaque;
+    int64_t size, off;
+    char *buf;
+    int ret;
+
+    acb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
+    acb->cmd = op;
+    acb->qiov = qiov;
+
+    if (op != ARCHIP_OP_FLUSH) {
+        acb->buffer = qemu_blockalign(bs, qiov->size);
+    } else {
+        acb->buffer = NULL;
+    }
+
+    acb->ret = 0;
+    acb->error = 0;
+    acb->s = s;
+    acb->cancelled = 0;
+    acb->bh = NULL;
+    acb->status = -EINPROGRESS;
+
+    if (op == ARCHIP_OP_WRITE) {
+        qemu_iovec_to_buf(acb->qiov, 0, acb->buffer, qiov->size);
+    }
+
+    buf = acb->buffer;
+    off = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+
+    s->qemu_aio_count++;
+
+    aio_cb = g_malloc(sizeof(ArchipelagoCB));
+    aio_cb->done = 0;
+    aio_cb->acb = acb;
+    aio_cb->buf = buf;
+    aio_cb->s =  acb->s;
+    aio_cb->size = size;
+
+    ret = archipelago_aio_segmented_rw(s->gconf->volname, buf, size, off,
+                                       aio_cb, op);
+    if (ret < 0) {
+        goto err_exit;
+    }
+    return &acb->common;
+
+err_exit:
+    error_report("qemu_archipelago_aio_rw(): I/O Error.\n");
+    s->qemu_aio_count--;
+    g_free(aio_cb);
+    qemu_aio_release(acb);
+    return NULL;
+}
+
+static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
+                                   opaque, ARCHIP_OP_READ);
+}
+
+static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
+                                   opaque, ARCHIP_OP_WRITE);
+}
+
+static int64_t archipelago_volume_info(char *volname)
+{
+    int64_t size;
+    int ret, targetlen;
+    struct xseg_request *req;
+    struct xseg_reply_info *xinfo;
+    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
+
+    targetlen = strlen(volname);
+    req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get xseg request.\n");
+        goto err_exit2;
+    }
+    ret = xseg_prep_request(xseg, req, targetlen,
+                            sizeof(struct xseg_reply_info));
+    if (ret < 0) {
+        archipelagolog("Cannot prepare xseg request.\n");
+        goto err_exit;
+    }
+    char *target = xseg_get_target(xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get xseg target.\n");
+        goto err_exit;
+    }
+    strncpy(target, volname, targetlen);
+    req->size = req->datalen;
+    req->offset = 0;
+    req->op = X_INFO;
+
+    reqdata->op = ARCHIP_OP_VOLINFO;
+    reqdata->volname = volname;
+    xseg_set_req_data(xseg, req, reqdata);
+
+    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Cannot submit xseg request.\n");
+        goto err_exit;
+    }
+    xseg_signal(xseg, p);
+    qemu_mutex_lock(&archip_mutex);
+    if (!is_signaled) {
+        qemu_cond_wait(&archip_cond, &archip_mutex);
+    }
+    is_signaled = 0;
+    qemu_mutex_unlock(&archip_mutex);
+
+    xinfo = (struct xseg_reply_info *) xseg_get_data(xseg, req);
+    size = xinfo->size;
+    xseg_put_request(xseg, req, srcport);
+    g_free(reqdata);
+    return size;
+
+err_exit:
+    g_free(reqdata);
+    xseg_put_request(xseg, req, srcport);
+    return -1;
+err_exit2:
+    g_free(reqdata);
+    return -1;
+}
+
+static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
+{
+    int64_t ret;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    ret = archipelago_volume_info(s->gconf->volname);
+    if (ret < 0) {
+        return -errno;
+    } else {
+        return ret;
+    }
+}
+
+static QEMUOptionParameter qemu_archipelago_create_options[] = {
+    {
+        .name = BLOCK_OPT_SIZE,
+        .type = OPT_SIZE,
+        .help = "Virtual disk size"
+    },
+    {NULL}
+};
+
+static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
+                                   ARCHIP_OP_FLUSH);
+}
+
+static BlockDriver bdrv_archipelago = {
+    .format_name = "archipelago",
+    .protocol_name = "archipelago",
+    .instance_size = sizeof(BDRVArchipelagoState),
+    .bdrv_file_open = qemu_archipelago_open,
+    .bdrv_close = qemu_archipelago_close,
+    .bdrv_create = qemu_archipelago_create,
+    .bdrv_getlength = qemu_archipelago_getlength,
+    .bdrv_truncate = qemu_archipelago_truncate,
+    .bdrv_aio_readv = qemu_archipelago_aio_readv,
+    .bdrv_aio_writev = qemu_archipelago_aio_writev,
+    .bdrv_aio_flush = qemu_archipelago_aio_flush,
+    .bdrv_has_zero_init = bdrv_has_zero_init_1,
+    .create_options = qemu_archipelago_create_options,
+};
+
+static void bdrv_archipelago_init(void)
+{
+    bdrv_register(&bdrv_archipelago);
+}
+
+block_init(bdrv_archipelago_init);
diff --git a/configure b/configure
index 0e516f9..9c738a0 100755
--- a/configure
+++ b/configure
@@ -317,6 +317,7 @@ seccomp=""
 glusterfs=""
 glusterfs_discard="no"
 glusterfs_zerofill="no"
+archipelago=""
 virtio_blk_data_plane=""
 gtk=""
 gtkabi=""
@@ -1062,6 +1063,10 @@ for opt do
   ;;
   --enable-glusterfs) glusterfs="yes"
   ;;
+  --disable-archipelago) archipelago="no"
+  ;;
+  --enable-archipelago) archipelago="yes"
+  ;;
   --disable-virtio-blk-data-plane) virtio_blk_data_plane="no"
   ;;
   --enable-virtio-blk-data-plane) virtio_blk_data_plane="yes"
@@ -1351,6 +1356,8 @@ Advanced options (experts only):
   --enable-coroutine-pool  enable coroutine freelist (better performance)
   --enable-glusterfs       enable GlusterFS backend
   --disable-glusterfs      disable GlusterFS backend
+  --enable-archipelago     enable Archipelago backend
+  --disable-archipelago    disable Archipelago backend
   --enable-gcov            enable test coverage analysis with gcov
   --gcov=GCOV              use specified gcov [$gcov_tool]
   --enable-tpm             enable TPM support
@@ -3008,6 +3015,33 @@ EOF
   fi
 fi
 
+
+##########################################
+# archipelago probe
+if test "$archipelago" != "no" ; then
+    cat > $TMPC <<EOF
+#include <stdio.h>
+#include <xseg/xseg.h>
+#include <xseg/protocol.h>
+int main(void) {
+    xseg_initialize();
+    return 0;
+}
+EOF
+    archipelago_libs=-lxseg
+    if compile_prog "" "$archipelago_libs"; then
+        archipelago="yes"
+        libs_tools="$archipelago_libs $libs_tools"
+        libs_softmmu="$archipelago_libs $libs_softmmu"
+    else
+      if test "$archipelago" = "yes" ; then
+        feature_not_found "Archipelago backend support" "Install libxseg devel"
+      fi
+      archipelago="no"
+    fi
+fi
+
+
 ##########################################
 # glusterfs probe
 if test "$glusterfs" != "no" ; then
@@ -4197,6 +4231,7 @@ echo "seccomp support   $seccomp"
 echo "coroutine backend $coroutine"
 echo "coroutine pool    $coroutine_pool"
 echo "GlusterFS support $glusterfs"
+echo "Archipelago support $archipelago"
 echo "virtio-blk-data-plane $virtio_blk_data_plane"
 echo "gcov              $gcov_tool"
 echo "gcov enabled      $gcov"
@@ -4631,6 +4666,11 @@ if test "$glusterfs_zerofill" = "yes" ; then
   echo "CONFIG_GLUSTERFS_ZEROFILL=y" >> $config_host_mak
 fi
 
+if test "$archipelago" = "yes" ; then
+  echo "CONFIG_ARCHIPELAGO=m" >> $config_host_mak
+  echo "ARCHIPELAGO_LIBS=$archipelago_libs" >> $config_host_mak
+fi
+
 if test "$libssh2" = "yes" ; then
   echo "CONFIG_LIBSSH2=m" >> $config_host_mak
   echo "LIBSSH2_CFLAGS=$libssh2_cflags" >> $config_host_mak
-- 
1.7.10.4


