qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU block backend


From: Stefan Hajnoczi
Subject: Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU block backend
Date: Fri, 20 Jun 2014 22:33:46 +0800
User-agent: Mutt/1.5.23 (2014-03-12)

On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
> +typedef struct BDRVArchipelagoState {
> +    int fds[2];
> +    int qemu_aio_count;

This field is never used.  It's incremented and decremented but nothing
ever checks the value.  It can be dropped.

> +    int event_reader_pos;
> +    ArchipelagoAIOCB *event_acb;
> +    const char *volname;
> +    uint64_t size;
> +    /* Archipelago specific */
> +    struct xseg *xseg;
> +    struct xseg_port *port;
> +    xport srcport;
> +    xport sport;
> +    xport mportno;
> +    xport vportno;
> +    QemuMutex archip_mutex;
> +    QemuCond archip_cond;
> +    bool is_signaled;
> +    /* Request handler specific */
> +    QemuThread request_th;
> +    QemuCond request_cond;
> +    QemuMutex request_mutex;
> +    bool th_is_signaled;
> +    bool stopping;
> +} BDRVArchipelagoState;
> +
> +typedef struct ArchipelagoSegmentedRequest {
> +    size_t count;
> +    size_t total;
> +    int ref;
> +    int failed;
> +} ArchipelagoSegmentedRequest;
> +
> +typedef struct AIORequestData {
> +    const char *volname;
> +    off_t offset;
> +    size_t size;
> +    uint64_t bufidx;
> +    int ret;
> +    int op;
> +    ArchipelagoAIOCB *aio_cb;
> +    ArchipelagoSegmentedRequest *segreq;
> +} AIORequestData;
> +
> +
> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
> +
> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
> +{
> +    if (xseg && (sport != srcport)) {
> +        xseg_init_local_signal(xseg, srcport);
> +        sport = srcport;
> +    }
> +}

QEMU should clean up by calling xseg_quit_local_signal().

> +
> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
> +{
> +    int ret;
> +    ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
> +    if (ret < 0) {
> +        error_report("archipelago_finish_aiocb(): failed writing"
> +                     " aio_cb->s->fds");
> +    }
> +    g_free(reqdata);
> +}
> +
> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port 
> *port,
> +                      struct xseg_request *expected_req)
> +{
> +    struct xseg_request *req;
> +    xseg_prepare_wait(xseg, srcport);
> +    void *psd = xseg_get_signal_desc(xseg, port);
> +    while (1) {
> +        req = xseg_receive(xseg, srcport, 0);
> +        if (req) {
> +            if (req != expected_req) {
> +                archipelagolog("Unknown received request\n");
> +                xseg_put_request(xseg, req, srcport);
> +            } else if (!(req->state & XS_SERVED)) {
> +                archipelagolog("Failed req\n");
> +                return -1;
> +            } else {
> +                break;
> +            }
> +        }
> +        xseg_wait_signal(xseg, psd, 100000UL);
> +    }
> +    xseg_cancel_wait(xseg, srcport);
> +    return 0;
> +}
> +
> +static void xseg_request_handler(void *state)
> +{

This thread is only necessary because you're not integrating xseg into
the QEMU event loop.  If you got the pipe fds from xseg and used
aio_set_fd_handler(), you could eliminate this thread.  The advantage is
that you can skip archipelago_finish_aiocb() and get slightly better
performance due to one less context switch between threads.

> +    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
> +    void *psd = xseg_get_signal_desc(s->xseg, s->port);
> +    qemu_mutex_lock(&s->request_mutex);
> +
> +    while (!s->stopping) {
> +        struct xseg_request *req;
> +        char *data;
> +        xseg_prepare_wait(s->xseg, s->srcport);
> +        req = xseg_receive(s->xseg, s->srcport, 0);
> +        if (req) {
> +            AIORequestData *reqdata;
> +            ArchipelagoSegmentedRequest *segreq;
> +            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
> +
> +            if (!(req->state & XS_SERVED)) {
> +                    segreq = reqdata->segreq;
> +                    __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
> +            }
> +
> +            switch (reqdata->op) {
> +            case ARCHIP_OP_READ:
> +                    data = xseg_get_data(s->xseg, req);
> +                    segreq = reqdata->segreq;
> +                    segreq->count += req->serviced;
> +
> +                    qemu_iovec_from_buf(reqdata->aio_cb->qiov, 
> reqdata->bufidx,
> +                            data,
> +                            req->serviced);
> +
> +                    xseg_put_request(s->xseg, req, s->srcport);
> +
> +                    __sync_add_and_fetch(&segreq->ref, -1);
> +
> +                    if (segreq->ref == 0) {

Not sure about the value of __sync_add_and_fetch() since its return
value is discarded and the if statement fetches segreq->ref again.  But
I'm not reviewing the details of the shared memory accesses.  I'm
assuming this stuff is correct, secure, etc.

> +                        if (!segreq->failed) {
> +                            reqdata->aio_cb->ret = segreq->count;
> +                            archipelago_finish_aiocb(reqdata);
> +                        }

What does segreq->failed mean?  We should always finish the I/O request
(with an error code if it failed); otherwise the upper layers will run
out of resources as we leak failed requests.

> +static void parse_filename_opts(const char *filename, Error **errp,
> +                                char **volume, xport *mport, xport *vport)
> +{
> +    const char *start;
> +    char *tokens[3], *ds;
> +    int idx;
> +    xport lmport = NoPort, lvport = NoPort;
> +
> +    strstart(filename, "archipelago:", &start);
> +
> +    ds = g_strdup(start);
> +    tokens[0] = strtok(ds, "/");
> +    tokens[1] = strtok(NULL, ":");
> +    tokens[2] = strtok(NULL, "\0");
> +
> +    if (!strlen(tokens[0])) {
> +        error_setg(errp, "volume name must be specified first");
> +        return;

ds is leaked.

> +    }
> +
> +    for (idx = 1; idx < 3; idx++) {
> +        if (tokens[idx] != NULL) {
> +            if (strstart(tokens[idx], "mport=", NULL)) {
> +                xseg_find_port(tokens[idx], "mport=", &lmport);
> +            }
> +            if (strstart(tokens[idx], "vport=", NULL)) {
> +                xseg_find_port(tokens[idx], "vport=", &lvport);
> +            }
> +        }
> +    }
> +
> +    if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
> +        error_setg(errp, "Usage: file=archipelago:"
> +                   "<volumename>[/mport=<mapperd_port>"
> +                   "[:vport=<vlmcd_port>]]");

ds is leaked.

> +        return;
> +    }
> +    *volume = g_strdup(tokens[0]);
> +    *mport = lmport;
> +    *vport = lvport;
> +    g_free(ds);
> +}
> +
> +static void archipelago_parse_filename(const char *filename, QDict *options,
> +                                       Error **errp)
> +{
> +    const char *start;
> +    char *volume = NULL;
> +    xport mport = NoPort, vport = NoPort;
> +
> +    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
> +            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
> +            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
> +        error_setg(errp, "volume/mport/vport and a file name may not be "
> +                         "specified at the same time");
> +        return;
> +    }
> +
> +    if (!strstart(filename, "archipelago:", &start)) {
> +        error_setg(errp, "File name must start with 'archipelago:'");
> +        return;
> +    }
> +
> +    if (!strlen(start) || strstart(start, "/", NULL)) {
> +        error_setg(errp, "volume name must be specified");
> +        return;
> +    }
> +
> +    parse_filename_opts(filename, errp, &volume, &mport, &vport);
> +
> +    if (volume) {
> +        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
> +        g_free(volume);
> +    }
> +    if (mport != NoPort) {
> +        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
> +    }
> +    if (vport != NoPort) {
> +        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
> +    }
> +}
> +
> +static QemuOptsList archipelago_runtime_opts = {
> +    .name = "archipelago",
> +    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
> +    .desc = {
> +        {
> +            .name = ARCHIPELAGO_OPT_VOLUME,
> +            .type = QEMU_OPT_STRING,
> +            .help = "Name of the volume image",
> +        },
> +        {
> +            .name = ARCHIPELAGO_OPT_MPORT,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "Archipelago mapperd port number"
> +        },
> +        {
> +            .name = ARCHIPELAGO_OPT_VPORT,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "Archipelago vlmcd port number"
> +
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +static int qemu_archipelago_open(BlockDriverState *bs,
> +                                 QDict *options,
> +                                 int bdrv_flags,
> +                                 Error **errp)
> +{
> +    int ret = 0;
> +    const char *volume;
> +    QemuOpts *opts;
> +    Error *local_err = NULL;
> +    BDRVArchipelagoState *s = bs->opaque;
> +
> +    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, 
> &error_abort);
> +    qemu_opts_absorb_qdict(opts, options, &local_err);
> +    if (local_err) {
> +        error_propagate(errp, local_err);
> +        qemu_opts_del(opts);
> +        return -EINVAL;
> +    }
> +
> +    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
> +    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
> +
> +    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
> +    if (volume == NULL) {
> +        error_setg(errp, "archipelago block driver requires an 'volume'"
> +                   " options");

"archipelago block driver requires the 'volume' option"

> +        error_propagate(errp, local_err);

This line is unnecessary since the error message was already put into
errp.

> +        qemu_opts_del(opts);
> +        return -EINVAL;
> +    }
> +    s->volname = g_strdup(volume);
> +
> +    /* Initialize XSEG, join shared memory segment */
> +    ret = qemu_archipelago_init(s);
> +    if (ret < 0) {
> +        error_setg(errp, "cannot initialize XSEG and join shared "
> +                   "memory segment");
> +        goto err_exit;
> +    }
> +
> +    s->event_reader_pos = 0;
> +    ret = qemu_pipe(s->fds);
> +    if (ret < 0) {
> +        error_setg(errp, "cannot create pipe");
> +        goto err_exit;

Do we need to call xseg_leave() here to avoid leaking xseg refcounts,
leaving memory mapped, and leaking memory?

> +    }
> +
> +    fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
> +    fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
> +                            qemu_archipelago_aio_event_reader, NULL,
> +                            s);
> +
> +    qemu_opts_del(opts);
> +    return 0;
> +
> +err_exit:
> +    qemu_opts_del(opts);
> +    return ret;

s->volname is leaked.

> +}
> +
> +static void qemu_archipelago_close(BlockDriverState *bs)
> +{
> +    int r, targetlen;
> +    char *target;
> +    struct xseg_request *req;
> +    BDRVArchipelagoState *s = bs->opaque;
> +
> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
> +    close(s->fds[0]);
> +    close(s->fds[1]);
> +
> +    s->stopping = true;
> +
> +    qemu_mutex_lock(&s->request_mutex);
> +    while (!s->th_is_signaled) {
> +        qemu_cond_wait(&s->request_cond,
> +                       &s->request_mutex);
> +    }
> +    qemu_mutex_unlock(&s->request_mutex);
> +    qemu_cond_destroy(&s->request_cond);
> +    qemu_mutex_destroy(&s->request_mutex);

It's not safe to call qemu_mutex_destroy() here because the other thread
may still be inside qemu_mutex_unlock(&s->request_mutex) and may still
access s->request_mutex memory.

Use qemu_thread_join() before destroying request_cond and request_mutex.
That way you can be sure there is no race condition.

(I recently did the same thing and Paolo Bonzini pointed out the bug.
After checking the glibc implementation I was convinced that it's not
safe.)

> +
> +    qemu_cond_destroy(&s->archip_cond);
> +    qemu_mutex_destroy(&s->archip_mutex);
> +
> +    targetlen = strlen(s->volname);
> +    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
> +    if (!req) {
> +        archipelagolog("Cannot get XSEG request\n");
> +        goto err_exit;
> +    }
> +    r = xseg_prep_request(s->xseg, req, targetlen, 0);
> +    if (r < 0) {
> +        xseg_put_request(s->xseg, req, s->srcport);
> +        archipelagolog("Cannot prepare XSEG close request\n");
> +        goto err_exit;
> +    }
> +
> +    target = xseg_get_target(s->xseg, req);
> +    strncpy(target, s->volname, targetlen);

Using strncpy() hints that target is a string when in fact it's not.  I
think memcpy() would be clearer here since you don't want a '\0' byte at
the end of target.

Or maybe I'm wrong and there is some guarantee that there will be a '\0'
byte after target?

> +    req->size = req->datalen;
> +    req->offset = 0;
> +    req->op = X_CLOSE;
> +
> +    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> +    if (p == NoPort) {
> +        xseg_put_request(s->xseg, req, s->srcport);
> +        archipelagolog("Cannot submit XSEG close request\n");
> +        goto err_exit;
> +    }
> +
> +    xseg_signal(s->xseg, p);
> +    r = wait_reply(s->xseg, s->srcport, s->port, req);
> +    if (r < 0) {
> +        archipelagolog("wait_reply() error\n");
> +    }
> +    if (!(req->state & XS_SERVED)) {
> +        archipelagolog("Could no close map for volume '%s'\n", s->volname);
> +    }
> +
> +    xseg_put_request(s->xseg, req, s->srcport);
> +
> +err_exit:
> +    xseg_leave_dynport(s->xseg, s->port);
> +    xseg_leave(s->xseg);

s->volname is leaked.

> +}
> +
> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
> +    aio_cb->cancelled = true;
> +    while (aio_cb->status == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +    qemu_aio_release(aio_cb);
> +}
> +
> +static const AIOCBInfo archipelago_aiocb_info = {
> +    .aiocb_size = sizeof(ArchipelagoAIOCB),
> +    .cancel = qemu_archipelago_aio_cancel,
> +};
> +
> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
> +{
> +    int ret = 0;
> +    while (1) {
> +        fd_set wfd;
> +        int fd = aio_cb->s->fds[1];
> +
> +        ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
> +        if (ret > 0) {
> +            break;
> +        }
> +        if (errno == EINTR) {
> +            continue;
> +        }
> +        if (errno != EAGAIN) {
> +            break;
> +        }
> +        FD_ZERO(&wfd);
> +        FD_SET(fd, &wfd);
> +        do {
> +            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
> +        } while (ret < 0 && errno == EINTR);
> +    }
> +    return ret;
> +}

A newer signalling approach is available and will let you drop the pipe
code.  QEMUBH is a "bottom half" or deferred function call that can be
scheduled in an event loop.  Scheduling a QEMUBH is thread-safe, so
you can do it from any thread.

See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH.

> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
> +{
> +    uint64_t size;
> +    int ret, targetlen;
> +    struct xseg_request *req;
> +    struct xseg_reply_info *xinfo;
> +    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
> +
> +    if (!reqdata) {
> +        archipelagolog("Cannot allocate reqdata\n");
> +        return -1;

g_malloc() never returns NULL, this if statement can be dropped.

Attachment: pgpQwi3tlyFDQ.pgp
Description: PGP signature


reply via email to

[Prev in Thread] Current Thread [Next in Thread]