[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] Re: [PATCH 3/3] block/nbd: Make the NBD block device use th
From: Kevin Wolf
Subject: [Qemu-devel] Re: [PATCH 3/3] block/nbd: Make the NBD block device use the AIO interface
Date: Mon, 21 Feb 2011 13:59:30 +0100
User-agent: Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.1.15) Gecko/20101027 Fedora/3.0.10-1.fc12 Thunderbird/3.0.10
Am 18.02.2011 13:55, schrieb Nick Thomas:
> This preserves the previous behaviour where the NBD server is
> unavailable or goes away during guest execution, but switches the
> NBD backend to present the AIO interface instead of the sync IO
> interface.
>
> We also split write requests into 1 MiB blocks (minus request size).
> This is a hard limit in the NBD servers (including qemu-nbd), but
> never seemed to come up with the previous backend code.
> ---
> block/nbd.c | 555
> ++++++++++++++++++++++++++++++++++++++++++++++++++---------
> 1 files changed, 470 insertions(+), 85 deletions(-)
>
> diff --git a/block/nbd.c b/block/nbd.c
> index c8dc763..e7b1f7e 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -1,11 +1,12 @@
> /*
> - * QEMU Block driver for NBD
> + * QEMU Block driver for NBD - asynchronous IO
> *
> * Copyright (C) 2008 Bull S.A.S.
> * Author: Laurent Vivier <address@hidden>
> *
> * Some parts:
> * Copyright (C) 2007 Anthony Liguori <address@hidden>
> + * Copyright (C) 2011 Nick Thomas <address@hidden>
> *
> * Permission is hereby granted, free of charge, to any person obtaining a
> copy
> * of this software and associated documentation files (the "Software"), to
> deal
> @@ -27,66 +28,135 @@
> */
>
> #include "qemu-common.h"
> +#include "qemu_socket.h"
> #include "nbd.h"
> #include "module.h"
>
> #include <sys/types.h>
> #include <unistd.h>
>
> -#define EN_OPTSTR ":exportname="
> +#define EN_OPTSTR ":exportname="
> +#define SECTOR_SIZE 512
> +
> +/* 1MiB minus request header size */
> +#define MAX_NBD_WRITE ((1024*1024) - (4 + 4 + 8 + 8 + 4))
-EMAGIC :-)
I think we can add a uint32_t magic field to struct nbd_request, make it
packed, and then replace this magic "4 + 4 + 8 + 8 + 4" expression with
"sizeof(struct nbd_request)" everywhere.
> +
> +/* #define DEBUG_NBD */
> +
> +#if defined(DEBUG_NBD)
> +#define logout(fmt, ...) \
> + fprintf(stderr, "nbd\t%-24s" fmt, __func__, ##__VA_ARGS__)
> +#else
> +#define logout(fmt, ...) ((void)0)
> +#endif
> +
> +
> +typedef struct NBDAIOCB NBDAIOCB;
> +
> +typedef struct AIOReq {
> + NBDAIOCB *aiocb;
> + off_t iov_offset; /* Where on the iov does this req start? */
> + off_t offset; /* Starting point of the read */
> +
> + size_t data_len;
> + uint8_t flags;
> + uint64_t handle;
> +
> + QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
> + QLIST_ENTRY(AIOReq) aioreq_siblings;
> +} AIOReq;
> +
>
> typedef struct BDRVNBDState {
> int sock;
> off_t size;
> size_t blocksize;
> +
> + /* Filled in by nbd_config. Store host_spec because DNS may change */
> + bool tcp_conn; /* True, we use TCP. False, UNIX domain sockets */
Should it be an enum then?
> + char *export_name; /* An NBD server may export several devices */
> + char *host_spec; /* Path to socket (UNIX) or hostname/IP (TCP) */
> + uint16_t tcp_port;
Is this part really related to AIO? If not, it should be a patch of its own.
> +
> + /* We use these for asynchronous I/O */
> + uint64_t aioreq_seq_num;
> + QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
> } BDRVNBDState;
>
> -static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
> +enum AIOCBState {
> + AIOCB_WRITE_UDATA,
> + AIOCB_READ_UDATA,
> +};
> +
> +struct NBDAIOCB {
> + BlockDriverAIOCB common;
> +
> + QEMUIOVector *qiov;
> +
> + int64_t sector_num;
> + int nb_sectors;
> +
> + int ret;
> + enum AIOCBState aiocb_type;
> +
> + QEMUBH *bh;
> + void (*aio_done_func)(NBDAIOCB *);
> +
> + int canceled;
bool?
> +
> + QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
> +};
> +
> +static inline int free_aio_req(BDRVNBDState *s, AIOReq *aio_req)
> {
> - BDRVNBDState *s = bs->opaque;
> - uint32_t nbdflags;
> + NBDAIOCB *acb = aio_req->aiocb;
> + QLIST_REMOVE(aio_req, outstanding_aio_siblings);
> + QLIST_REMOVE(aio_req, aioreq_siblings);
> + qemu_free(aio_req);
>
> + return !QLIST_EMPTY(&acb->aioreq_head);
> +}
> +
> +static int nbd_config(BDRVNBDState *s, const char* filename, int flags)
> +{
> char *file;
> - char *name;
> - const char *host;
> + char *export_name;
> + const char *host_spec;
> const char *unixpath;
> - int sock;
> - off_t size;
> - size_t blocksize;
> - int ret;
> int err = -EINVAL;
>
> file = qemu_strdup(filename);
>
> - name = strstr(file, EN_OPTSTR);
> - if (name) {
> - if (name[strlen(EN_OPTSTR)] == 0) {
> + export_name = strstr(file, EN_OPTSTR);
> + if (export_name) {
> + if (export_name[strlen(EN_OPTSTR)] == 0) {
> goto out;
> }
> - name[0] = 0;
> - name += strlen(EN_OPTSTR);
> + export_name[0] = 0; /* truncate 'file' */
> + export_name += strlen(EN_OPTSTR);
> + s->export_name = qemu_strdup(export_name);
> }
>
> - if (!strstart(file, "nbd:", &host)) {
> + /* extract the host_spec - fail if it's not nbd:* */
> + if (!strstart(file, "nbd:", &host_spec)) {
> goto out;
> }
>
> - if (strstart(host, "unix:", &unixpath)) {
> -
> - if (unixpath[0] != '/') {
> + /* are we a UNIX or TCP socket? */
> + if (strstart(host_spec, "unix:", &unixpath)) {
> + if (unixpath[0] != '/') { /* We demand an absolute path*/
> goto out;
> }
> -
> - sock = unix_socket_outgoing(unixpath);
> -
> + s->tcp_conn = false;
> + s->host_spec = qemu_strdup(unixpath);
> } else {
> + /* We should have an <IPv4 address>:<port> string to split up */
> uint16_t port = NBD_DEFAULT_PORT;
> char *p, *r;
> char hostname[128];
>
> - pstrcpy(hostname, 128, host);
> -
> - p = strchr(hostname, ':');
> + pstrcpy(hostname, 128, host_spec);
> + p = strchr(hostname, ':'); /* FIXME: IPv6 */
> if (p != NULL) {
> *p = '\0';
> p++;
> @@ -96,121 +166,436 @@ static int nbd_open(BlockDriverState *bs, const char*
> filename, int flags)
> goto out;
> }
> }
> + s->tcp_conn = true;
> + s->host_spec = qemu_strdup(hostname);
> + s->tcp_port = port;
> + }
> +
> + err = 0;
> +
> +out:
> + qemu_free(file);
> + if (err != 0) {
> + if (s->export_name != NULL) {
> + qemu_free(s->export_name);
> + }
> + if (s->host_spec != NULL) {
> + qemu_free(s->host_spec);
> + }
free (and qemu_free) works just fine with NULL. It's defined to do
nothing in this case.
> + }
> + return err;
> +}
> +
> +static void aio_read_response(void *opaque)
> +{
> + BDRVNBDState *s = opaque;
> + struct nbd_reply reply;
> +
> + AIOReq *aio_req = NULL;
> + NBDAIOCB *acb;
> + int rest;
> +
> + if (QLIST_EMPTY(&s->outstanding_aio_head)) {
> + return;
> + }
> +
> + /* read the header */
> + if (nbd_receive_reply(s->sock, &reply) == -1) {
> + logout("Failed to read response from socket\n");
> + /* Having failed to read the reply header, we can't know which
> + * aio_req this corresponds to - so we can't signal a failure.
> + */
> + return;
> + }
> +
> + /* find the right aio_req from the outstanding_aio list */
> + QLIST_FOREACH(aio_req, &s->outstanding_aio_head,
> outstanding_aio_siblings) {
> + if (aio_req->handle == reply.handle) {
> + break;
> + }
> + }
> +
> + if (!aio_req) {
> + logout("cannot find aio_req for handle %lu\n", reply.handle);
> + return;
> + }
> +
> + acb = aio_req->aiocb;
> +
> + if (acb->aiocb_type == AIOCB_READ_UDATA) {
> + off_t offset = 0;
> + int ret = 0;
> + size_t total = aio_req->data_len;
> +
> + while (offset < total) {
> + ret = nbd_wr_aio(s->sock, acb->qiov->iov, total - offset, offset,
> + true);
> +
> + if (ret == -1) {
> + logout("Error reading from NBD server: %i (%s)\n",
> + errno, strerror(errno));
I think you need to call the AIO callback with ret = -errno here.
> + return;
> + }
> +
> + offset += ret;
> + }
> + }
>
> - sock = tcp_socket_outgoing(hostname, port);
> + if (reply.error != 0) {
> + acb->ret = -EIO;
> + logout("NBD request resulted in error %i\n", reply.error);
> }
>
> + rest = free_aio_req(s, aio_req);
> + if (!rest) {
> + acb->aio_done_func(acb);
> + }
> +
> + return;
> +}
> +
> +static int aio_flush_request(void *opaque)
> +{
> + BDRVNBDState *s = opaque;
> +
> + return !QLIST_EMPTY(&s->outstanding_aio_head);
> +}
> +
> +/*
> + * Connect to the NBD server specified in the state object
> + */
> +static int nbd_establish_connection(BlockDriverState *bs)
> +{
> + BDRVNBDState *s = bs->opaque;
> + int sock;
> + int ret;
> + off_t size;
> + size_t blocksize;
> + uint32_t nbdflags;
> +
> + if (s->tcp_conn == true) {
> + sock = tcp_socket_outgoing(s->host_spec, s->tcp_port);
> + } else {
> + sock = unix_socket_outgoing(s->host_spec);
> + }
> +
> + /* Failed to establish connection */
> if (sock == -1) {
> - err = -errno;
> - goto out;
> + logout("Failed to establish connection to NBD server\n");
> + return -errno;
> }
>
> - ret = nbd_receive_negotiate(sock, name, &nbdflags, &size, &blocksize);
> + /* NBD handshake */
> + ret = nbd_receive_negotiate(sock, s->export_name, &nbdflags, &size,
> + &blocksize);
> if (ret == -1) {
> - err = -errno;
> - goto out;
> + logout("Failed to negotiate with the NBD server\n");
> + closesocket(sock);
> + return -errno;
> }
>
> + /* Now that we're connected, set the socket to be non-blocking */
> + socket_set_nonblock(sock);
> +
> s->sock = sock;
> s->size = size;
> s->blocksize = blocksize;
> - err = 0;
>
> -out:
> - qemu_free(file);
> - return err;
> + /* Response handler. This is called when there is data to read */
> + qemu_aio_set_fd_handler(sock, aio_read_response, NULL, aio_flush_request,
> + NULL, s);
> + logout("Established connection with NBD server\n");
> + return 0;
> }
>
> -static int nbd_read(BlockDriverState *bs, int64_t sector_num,
> - uint8_t *buf, int nb_sectors)
> +static void nbd_teardown_connection(BlockDriverState *bs)
> {
> + /* Send the final packet to the NBD server and close the socket */
> BDRVNBDState *s = bs->opaque;
> struct nbd_request request;
> - struct nbd_reply reply;
>
> - request.type = NBD_CMD_READ;
> + request.type = NBD_CMD_DISC;
> request.handle = (uint64_t)(intptr_t)bs;
> - request.from = sector_num * 512;;
> - request.len = nb_sectors * 512;
> + request.from = 0;
> + request.len = 0;
> + nbd_send_request(s->sock, &request);
>
> - if (nbd_send_request(s->sock, &request) == -1)
> - return -errno;
> + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
> + closesocket(s->sock);
> + logout("Connection to NBD server closed\n");
> + return;
> +}
>
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> - return -errno;
> +static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
> +{
> + BDRVNBDState *s = bs->opaque;
> + int result;
>
> - if (reply.error !=0)
> - return -reply.error;
> + /* Pop the config into our state object. Exit if invalid. */
> + result = nbd_config(s, filename, flags);
>
> - if (reply.handle != request.handle)
> - return -EIO;
> + if (result != 0) {
> + return result;
> + }
>
> - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
> - return -EIO;
> + QLIST_INIT(&s->outstanding_aio_head);
> +
> + /* establish TCP connection, return error if it fails
> + * TODO: Configurable retry-until-timeout behaviour.
> + */
> + result = nbd_establish_connection(bs);
> + if (result != 0) {
> + return result;
> + }
>
> return 0;
> }
>
> -static int nbd_write(BlockDriverState *bs, int64_t sector_num,
> - const uint8_t *buf, int nb_sectors)
> +static void nbd_close(BlockDriverState *bs)
> {
> + nbd_teardown_connection(bs);
> BDRVNBDState *s = bs->opaque;
> +
> + if (s->export_name != NULL) {
> + qemu_free(s->export_name);
> + }
> + if (s->host_spec != NULL) {
> + qemu_free(s->host_spec);
> + }
> +
> + return;
> +}
> +
> +static int add_aio_request(BDRVNBDState *s, AIOReq *aio_req, QEMUIOVector
> *qiov,
> + enum AIOCBState aiocb_type)
> +{
> struct nbd_request request;
> - struct nbd_reply reply;
>
> - request.type = NBD_CMD_WRITE;
> - request.handle = (uint64_t)(intptr_t)bs;
> - request.from = sector_num * 512;;
> - request.len = nb_sectors * 512;
> + request.from = aio_req->offset;
> + request.len = aio_req->data_len;
> + request.handle = aio_req->handle;
> +
> + if (aiocb_type == AIOCB_READ_UDATA) {
> + request.type = NBD_CMD_READ;
> + } else {
> + request.type = NBD_CMD_WRITE;
> + }
>
> - if (nbd_send_request(s->sock, &request) == -1)
> + /* Write the request to the socket. Header first. */
> + if (nbd_send_request(s->sock, &request) == -1) {
> + /* TODO: retry handling. This leads to -EIO and request cancellation
> */
> + logout("writing request header to server failed\n");
> return -errno;
> + }
>
> - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
> - return -EIO;
> + /* If this is a write, send the data too */
> + if (aiocb_type == AIOCB_WRITE_UDATA) {
> + int ret = 0;
> + off_t offset = 0;
> + size_t total = aio_req->data_len;
> +
> + while (offset < total) {
> + ret = nbd_wr_aio(s->sock, qiov->iov, total - offset,
> + offset + aio_req->iov_offset, false);
> + if (ret == -1) {
> + logout("Error writing request data to NBD server: %i (%s)\n",
> + errno, strerror(errno));
> + return -EIO;
-errno
Or maybe nbd_wr_aio should already return -errno so that you can return
ret here.
> + }
>
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> - return -errno;
> + offset += ret;
> + }
> + }
>
> - if (reply.error !=0)
> - return -reply.error;
> + return 0;
> +}
> +
> +static inline AIOReq *alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb,
> + size_t data_len,
> + off_t offset,
> + off_t iov_offset)
> +{
> + AIOReq *aio_req;
>
> - if (reply.handle != request.handle)
> + aio_req = qemu_malloc(sizeof(*aio_req));
> + aio_req->aiocb = acb;
> + aio_req->iov_offset = iov_offset;
> + aio_req->offset = offset;
> + aio_req->data_len = data_len;
> + aio_req->handle = s->aioreq_seq_num++; /* FIXME: Trivially guessable */
> +
> + QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
> + outstanding_aio_siblings);
> + QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
> +
> + return aio_req;
> +}
> +
> +static void nbd_finish_aiocb(NBDAIOCB *acb)
> +{
> + if (!acb->canceled) {
> + acb->common.cb(acb->common.opaque, acb->ret);
> + }
> + qemu_aio_release(acb);
> +}
> +
> +
> +static void nbd_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> + NBDAIOCB *acb = (NBDAIOCB *)blockacb;
> +
> + /*
> + * We cannot cancel the requests which are already sent to
> + * the servers, so we just complete the request with -EIO here.
> + */
> + acb->common.cb(acb->common.opaque, -EIO);
> + acb->canceled = 1;
> +}
I think you need to check for acb->canceled before you write to the
associated buffer when receiving the reply for a read request. The
buffer might not exist any more after the request is cancelled.
> +
> +static AIOPool nbd_aio_pool = {
> + .aiocb_size = sizeof(NBDAIOCB),
> + .cancel = nbd_aio_cancel,
> +};
> +
> +static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
> + int64_t sector_num, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void
> *opaque)
> +{
> + NBDAIOCB *acb;
> +
> + acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque);
> +
> + acb->qiov = qiov;
> +
> + acb->sector_num = sector_num;
> + acb->nb_sectors = nb_sectors;
> +
> + acb->aio_done_func = NULL;
> + acb->canceled = 0;
> + acb->bh = NULL;
> + acb->ret = 0;
> + QLIST_INIT(&acb->aioreq_head);
> + return acb;
> +}
> +
> +static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb)
> +{
> + if (acb->bh) {
> + logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> return -EIO;
> + }
> +
> + acb->bh = qemu_bh_new(cb, acb);
> + if (!acb->bh) {
> + logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> + return -EIO;
> + }
> +
> + qemu_bh_schedule(acb->bh);
>
> return 0;
> }
>
> -static void nbd_close(BlockDriverState *bs)
> +/*
> + * Send I/O requests to the server.
> + *
> + * This function sends requests to the server, links the requests to
> + * the outstanding_list in BDRVNBDState, and exits without waiting for
> + * the response. The responses are received in the `aio_read_response'
> + * function which is called from the main loop as a fd handler.
> + * If this is a write request and it's >1MB, split it into multiple AIOReqs
> + */
> +static void nbd_readv_writev_bh_cb(void *p)
> {
> - BDRVNBDState *s = bs->opaque;
> - struct nbd_request request;
> + NBDAIOCB *acb = p;
> + int ret = 0;
>
> - request.type = NBD_CMD_DISC;
> - request.handle = (uint64_t)(intptr_t)bs;
> - request.from = 0;
> - request.len = 0;
> - nbd_send_request(s->sock, &request);
> + size_t len, done = 0;
> + size_t total = acb->nb_sectors * SECTOR_SIZE;
> +
> + /* Where the read/write starts from */
> + size_t offset = acb->sector_num * SECTOR_SIZE;
> + BDRVNBDState *s = acb->common.bs->opaque;
> +
> + AIOReq *aio_req;
>
> - close(s->sock);
> + qemu_bh_delete(acb->bh);
> + acb->bh = NULL;
> +
> + while (done != total) {
> + len = (total - done);
> +
> + /* Split write requests into 1MiB segments */
> + if(acb->aiocb_type == AIOCB_WRITE_UDATA && len > MAX_NBD_WRITE) {
> + len = MAX_NBD_WRITE;
> + }
> +
> + aio_req = alloc_aio_req(s, acb, len, offset + done, done);
> + ret = add_aio_request(s, aio_req, acb->qiov, acb->aiocb_type);
> +
> + if (ret < 0) {
> + free_aio_req(s, aio_req);
> + acb->ret = -EIO;
> + goto out;
> + }
> +
> + done += len;
> + }
> +out:
> + if (QLIST_EMPTY(&acb->aioreq_head)) {
> + nbd_finish_aiocb(acb);
> + }
> }
>
> +static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs,
> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + NBDAIOCB *acb;
> + int i;
> +
> + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> + acb->aiocb_type = AIOCB_READ_UDATA;
> + acb->aio_done_func = nbd_finish_aiocb;
> +
> + for (i = 0; i < qiov->niov; i++) {
> + memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
> + }
qemu_iovec_memset?
What is this even for? Aren't these zeros overwritten anyway?
> +
> + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
> + return &acb->common;
> +}
> +
> +static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs,
> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + NBDAIOCB *acb;
> +
> + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> + acb->aiocb_type = AIOCB_WRITE_UDATA;
> + acb->aio_done_func = nbd_finish_aiocb;
> +
> + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
> + return &acb->common;
> +}
> +
> +
> static int64_t nbd_getlength(BlockDriverState *bs)
> {
> BDRVNBDState *s = bs->opaque;
> -
> return s->size;
> }
>
> static BlockDriver bdrv_nbd = {
> - .format_name = "nbd",
> - .instance_size = sizeof(BDRVNBDState),
> - .bdrv_file_open = nbd_open,
> - .bdrv_read = nbd_read,
> - .bdrv_write = nbd_write,
> - .bdrv_close = nbd_close,
> - .bdrv_getlength = nbd_getlength,
> - .protocol_name = "nbd",
> + .format_name = "nbd",
> + .instance_size = sizeof(BDRVNBDState),
> + .bdrv_file_open = nbd_open,
> + .bdrv_aio_readv = nbd_aio_readv,
> + .bdrv_aio_writev = nbd_aio_writev,
> + .bdrv_close = nbd_close,
> + .bdrv_getlength = nbd_getlength,
> + .protocol_name = "nbd"
> };
>
> static void bdrv_nbd_init(void)
Kevin
- [Qemu-devel] Re: [PATCH 3/3] block/nbd: Make the NBD block device use the AIO interface,
Kevin Wolf <=