
Re: [PATCH v8 2/4] generic vhost user server


From: Stefan Hajnoczi
Subject: Re: [PATCH v8 2/4] generic vhost user server
Date: Thu, 11 Jun 2020 14:14:49 +0100

On Fri, Jun 05, 2020 at 07:35:36AM +0800, Coiby Xu wrote:
> +static bool coroutine_fn
> +vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
> +{
> +    struct iovec iov = {
> +        .iov_base = (char *)vmsg,
> +        .iov_len = VHOST_USER_HDR_SIZE,
> +    };
> +    int rc, read_bytes = 0;
> +    Error *local_err = NULL;
> +    /*
> +     * Store fds/nfds returned from qio_channel_readv_full into
> +     * temporary variables.
> +     *
> +     * VhostUserMsg is a packed structure; gcc will complain about passing
> +     * a pointer to a packed structure member if we pass
> +     * &VhostUserMsg.fd_num and &VhostUserMsg.fds directly to
> +     * qio_channel_readv_full, thus two temporary variables nfds and fds
> +     * are used here.
> +     */
> +    size_t nfds = 0, nfds_t = 0;
> +    int *fds = NULL, *fds_t = NULL;
> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> +    QIOChannel *ioc = NULL;
> +
> +    if (conn_fd == server->sioc->fd) {
> +        ioc = server->ioc;
> +    } else {
> +        /* Slave communication also uses this function to read messages */
> +        ioc = slave_io_channel(server, conn_fd, &local_err);
> +    }
> +
> +    if (!ioc) {
> +        error_report_err(local_err);
> +        goto fail;
> +    }
> +
> +    assert(qemu_in_coroutine());
> +    do {
> +        /*
> +         * qio_channel_readv_full may return a short read; keep calling it
> +         * until VHOST_USER_HDR_SIZE or 0 bytes have been read in total
> +         */
> +        rc = qio_channel_readv_full(ioc, &iov, 1, &fds_t, &nfds_t,
> +                                    &local_err);
> +        if (rc < 0) {
> +            if (rc == QIO_CHANNEL_ERR_BLOCK) {
> +                qio_channel_yield(ioc, G_IO_IN);
> +                continue;
> +            } else {
> +                error_report_err(local_err);
> +                return false;
> +            }
> +        }
> +        read_bytes += rc;
> +        if (nfds_t > 0) {
> +            fds = g_renew(int, fds, nfds + nfds_t);
> +            memcpy(fds + nfds, fds_t, nfds_t * sizeof(int));
> +            nfds += nfds_t;
> +            if (nfds > VHOST_MEMORY_MAX_NREGIONS) {
> +                error_report("A maximum of %d fds are allowed, "
> +                             "however got %lu fds now",
> +                             VHOST_MEMORY_MAX_NREGIONS, nfds);
> +                goto fail;
> +            }
> +            g_free(fds_t);

I'm not sure why the temporary fds[] array is necessary. Copying the fds
directly into vmsg->fds would be simpler:

  if (nfds + nfds_t > G_N_ELEMENTS(vmsg->fds)) {
      error_report("A maximum of %d fds are allowed, "
                   "however got %lu fds now",
                   VHOST_MEMORY_MAX_NREGIONS, nfds);
      goto fail;
  }
  memcpy(vmsg->fds + nfds, fds_t, nfds_t * sizeof(vmsg->fds[0]));
  nfds += nfds_t;

Did I misunderstand how this works?

> +        }
> +        if (read_bytes == VHOST_USER_HDR_SIZE || rc == 0) {
> +            break;
> +        }
> +        iov.iov_base = (char *)vmsg + read_bytes;
> +        iov.iov_len = VHOST_USER_HDR_SIZE - read_bytes;
> +    } while (true);
> +
> +    vmsg->fd_num = nfds;
> +    if (nfds > 0) {
> +        memcpy(vmsg->fds, fds, nfds * sizeof(int));
> +    }
> +    g_free(fds);
> +    /* qio_channel_readv_full will make socket fds blocking; unblock them */
> +    vmsg_unblock_fds(vmsg);
> +    if (vmsg->size > sizeof(vmsg->payload)) {
> +        error_report("Error: too big message request: %d, "
> +                     "size: vmsg->size: %u, "
> +                     "while sizeof(vmsg->payload) = %zu",
> +                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
> +        goto fail;
> +    }
> +
> +    struct iovec iov_payload = {
> +        .iov_base = (char *)&vmsg->payload,
> +        .iov_len = vmsg->size,
> +    };
> +    if (vmsg->size) {
> +        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
> +        if (rc == -1) {
> +            error_report_err(local_err);
> +            goto fail;
> +        }
> +    }
> +
> +    return true;
> +
> +fail:
> +    vmsg_close_fds(vmsg);
> +
> +    return false;
> +}
> +
> +
> +static void vu_client_start(VuServer *server);
> +static coroutine_fn void vu_client_trip(void *opaque)
> +{
> +    VuServer *server = opaque;
> +
> +    while (!server->aio_context_changed && server->sioc) {
> +        vu_dispatch(&server->vu_dev);
> +    }
> +
> +    if (server->aio_context_changed && server->sioc) {
> +        server->aio_context_changed = false;
> +        vu_client_start(server);
> +    }
> +}
> +
> +static void vu_client_start(VuServer *server)
> +{
> +    server->co_trip = qemu_coroutine_create(vu_client_trip, server);
> +    aio_co_enter(server->ctx, server->co_trip);
> +}
> +
> +/*
> + * A wrapper for vu_kick_cb.
> + *
> + * Since aio_dispatch can only pass one user data pointer to the
> + * callback function, VuDev and pvt are packed into a struct; unpack it
> + * and pass them to vu_kick_cb.
> + */
> +static void kick_handler(void *opaque)
> +{
> +    KickInfo *kick_info = opaque;
> +    kick_info->cb(kick_info->vu_dev, 0, (void *) kick_info->index);
> +}
> +
> +
> +static void
> +set_watch(VuDev *vu_dev, int fd, int vu_evt,
> +          vu_watch_cb cb, void *pvt)
> +{
> +
> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> +    g_assert(vu_dev);
> +    g_assert(fd >= 0);
> +    long index = (intptr_t) pvt;
> +    g_assert(cb);
> +    KickInfo *kick_info = &server->kick_info[index];
> +    if (!kick_info->cb) {
> +        kick_info->fd = fd;
> +        kick_info->cb = cb;
> +        qemu_set_nonblock(fd);
> +        aio_set_fd_handler(server->ioc->ctx, fd, false, kick_handler,
> +                           NULL, NULL, kick_info);
> +        kick_info->vu_dev = vu_dev;
> +    }
> +}
> +
> +
> +static void remove_watch(VuDev *vu_dev, int fd)
> +{
> +    VuServer *server;
> +    int i;
> +    int index = -1;
> +    g_assert(vu_dev);
> +    g_assert(fd >= 0);
> +
> +    server = container_of(vu_dev, VuServer, vu_dev);
> +    for (i = 0; i < vu_dev->max_queues; i++) {
> +        if (server->kick_info[i].fd == fd) {
> +            index = i;
> +            break;
> +        }
> +    }
> +
> +    if (index == -1) {
> +        return;
> +    }
> +    server->kick_info[i].cb = NULL;
> +    aio_set_fd_handler(server->ioc->ctx, fd, false, NULL, NULL, NULL, NULL);
> +}
> +
> +
> +static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
> +                      gpointer opaque)
> +{
> +    VuServer *server = opaque;
> +
> +    if (server->sioc) {
> +        warn_report("Only one vhost-user client is allowed to "
> +                    "connect the server one time");
> +        return;
> +    }
> +
> +    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
> +                 vu_message_read, set_watch, remove_watch,
> +                 server->vu_iface)) {
> +        error_report("Failed to initialized libvhost-user");
> +        return;
> +    }
> +
> +    /*
> +     * Unset the callback function for the network listener so that another
> +     * vhost-user client has to wait until this client disconnects
> +     */
> +    qio_net_listener_set_client_func(server->listener,
> +                                     NULL,
> +                                     NULL,
> +                                     NULL);
> +    server->sioc = sioc;
> +    server->kick_info = g_new0(KickInfo, server->max_queues);

Where is kick_info freed?
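
For example (just a sketch, assuming close_client() is where per-client
state is torn down):

  g_free(server->kick_info);
  server->kick_info = NULL;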

> +    /*
> +     * Increase the object reference, so cioc will not be freed by

s/cioc/sioc/

> +     * qio_net_listener_channel_func which will call
> +     * object_unref(OBJECT(sioc))
> +     */
> +    object_ref(OBJECT(server->sioc));
> +    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
> +    server->ioc = QIO_CHANNEL(sioc);
> +    object_ref(OBJECT(server->ioc));
> +    object_ref(OBJECT(sioc));

Why are there two object_refs for sioc and where is unref called?
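
I would have expected one ref per stored pointer, each balanced by an
unref on disconnect, e.g. (a sketch; the matching unrefs would live in
close_client()):

  server->sioc = sioc;
  object_ref(OBJECT(server->sioc)); /* released in close_client() */
  server->ioc = QIO_CHANNEL(sioc);
  object_ref(OBJECT(server->ioc));  /* released in close_client() */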

> +    qio_channel_attach_aio_context(server->ioc, server->ctx);
> +    qio_channel_set_blocking(QIO_CHANNEL(server->sioc), false, NULL);
> +    vu_client_start(server);
> +}
> +
> +
> +void vhost_user_server_stop(VuServer *server)
> +{
> +    if (!server) {
> +        return;
> +    }
> +
> +    if (server->sioc) {
> +        close_client(server);
> +        object_unref(OBJECT(server->sioc));

This call is object_unref(NULL) since close_client() does server->sioc =
NULL.
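
Taking a local copy before calling close_client() would avoid that,
e.g.:

  QIOChannelSocket *sioc = server->sioc;

  close_client(server);
  object_unref(OBJECT(sioc));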

> +    }
> +
> +    if (server->listener) {
> +        qio_net_listener_disconnect(server->listener);
> +        object_unref(OBJECT(server->listener));
> +    }
> +}
> +
> +static void detach_context(VuServer *server)
> +{
> +    int i;
> +    AioContext *ctx = server->ioc->ctx;
> +    qio_channel_detach_aio_context(server->ioc);
> +    for (i = 0; i < server->vu_dev.max_queues; i++) {
> +        if (server->kick_info[i].cb) {
> +            aio_set_fd_handler(ctx, server->kick_info[i].fd, false, NULL,
> +                               NULL, NULL, NULL);
> +        }
> +    }
> +}
> +
> +static void attach_context(VuServer *server, AioContext *ctx)
> +{
> +    int i;
> +    qio_channel_attach_aio_context(server->ioc, ctx);
> +    server->aio_context_changed = true;
> +    if (server->co_trip) {
> +        aio_co_schedule(ctx, server->co_trip);
> +    }
> +    for (i = 0; i < server->vu_dev.max_queues; i++) {
> +        if (server->kick_info[i].cb) {
> +            aio_set_fd_handler(ctx, server->kick_info[i].fd, false,
> +                               kick_handler, NULL, NULL,
> +                               &server->kick_info[i]);
> +        }
> +    }
> +}
> +
> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server)
> +{
> +    server->ctx = ctx ? ctx : qemu_get_aio_context();
> +    if (!server->sioc) {
> +        return;
> +    }
> +    if (ctx) {
> +        attach_context(server, ctx);
> +    } else {
> +        detach_context(server);
> +    }
> +}
> +
> +
> +bool vhost_user_server_start(uint16_t max_queues,
> +                             SocketAddress *socket_addr,
> +                             AioContext *ctx,
> +                             VuServer *server,
> +                             void *device_panic_notifier,
> +                             const VuDevIface *vu_iface,
> +                             Error **errp)
> +{
> +    server->listener = qio_net_listener_new();
> +    if (qio_net_listener_open_sync(server->listener, socket_addr, 1,
> +                                   errp) < 0) {
> +        goto error;
> +    }
> +
> +    qio_net_listener_set_name(server->listener,
> +                              "vhost-user-backend-listener");
> +
> +    server->vu_iface = vu_iface;
> +    server->max_queues = max_queues;
> +    server->ctx = ctx;
> +    server->device_panic_notifier = device_panic_notifier;
> +    qio_net_listener_set_client_func(server->listener,
> +                                     vu_accept,
> +                                     server,
> +                                     NULL);

The qio_net_listener_set_client_func() call uses the default
GMainContext but we have an AioContext *ctx argument. This is
surprising. I would expect the socket to be handled in the AioContext.

Can you clarify how this should work?
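
If the listener is meant to stay on a specific GMainContext, note that
qio_net_listener_set_client_func_full() takes one explicitly. For
example (some_context here is just a placeholder):

  qio_net_listener_set_client_func_full(server->listener, vu_accept,
                                        server, NULL, some_context);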

> +
> +    return true;
> +error:
> +    g_free(server);

It's surprising that this function frees the server argument when an
error occurs. vhost_user_server_stop() does not free server. I suggest
letting the caller free server since they own the object.
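
In other words, the error path would simply become:

  error:
      return false;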

> +    return false;
> +}
> diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h
> new file mode 100644
> index 0000000000..4315556b66
> --- /dev/null
> +++ b/util/vhost-user-server.h
> @@ -0,0 +1,59 @@
> +/*
> + * Sharing QEMU devices via vhost-user protocol
> + *
> + * Author: Coiby Xu <coiby.xu@gmail.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef VHOST_USER_SERVER_H
> +#define VHOST_USER_SERVER_H
> +
> +#include "contrib/libvhost-user/libvhost-user.h"
> +#include "io/channel-socket.h"
> +#include "io/channel-file.h"
> +#include "io/net-listener.h"
> +#include "qemu/error-report.h"
> +#include "qapi/error.h"
> +#include "standard-headers/linux/virtio_blk.h"
> +
> +typedef struct KickInfo {
> +    VuDev *vu_dev;
> +    int fd; /*kick fd*/
> +    long index; /*queue index*/
> +    vu_watch_cb cb;
> +} KickInfo;
> +
> +typedef struct VuServer {
> +    QIONetListener *listener;
> +    AioContext *ctx;
> +    void (*device_panic_notifier)(struct VuServer *server);
> +    int max_queues;
> +    const VuDevIface *vu_iface;
> +    VuDev vu_dev;
> +    QIOChannel *ioc; /* The I/O channel with the client */
> +    QIOChannelSocket *sioc; /* The underlying data channel with the client */
> +    /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */
> +    QIOChannel *ioc_slave;
> +    QIOChannelSocket *sioc_slave;
> +    Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
> +    KickInfo *kick_info; /* an array whose length is the number of queues */
> +    /* restart coroutine co_trip if AIOContext is changed */
> +    bool aio_context_changed;
> +} VuServer;
> +
> +
> +bool vhost_user_server_start(uint16_t max_queues,
> +                             SocketAddress *unix_socket,
> +                             AioContext *ctx,
> +                             VuServer *server,
> +                             void *device_panic_notifier,

Please declare the function pointer type:

typedef void DevicePanicNotifierFn(struct VuServer *server);

Then the argument list can use DevicePanicNotifierFn
*device_panic_notifier instead of void *.
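
For example:

  bool vhost_user_server_start(uint16_t max_queues,
                               SocketAddress *unix_socket,
                               AioContext *ctx,
                               VuServer *server,
                               DevicePanicNotifierFn *device_panic_notifier,
                               const VuDevIface *vu_iface,
                               Error **errp);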

> +                             const VuDevIface *vu_iface,
> +                             Error **errp);
> +
> +void vhost_user_server_stop(VuServer *server);
> +
> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server);

If you send another revision, please make VuServer *server the first
argument of vhost_user_server_start() and
vhost_user_server_set_aio_context(). Functions usually have the object
they act on as the first argument.
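
For example:

  bool vhost_user_server_start(VuServer *server,
                               uint16_t max_queues,
                               SocketAddress *unix_socket,
                               AioContext *ctx,
                               DevicePanicNotifierFn *device_panic_notifier,
                               const VuDevIface *vu_iface,
                               Error **errp);

  void vhost_user_server_set_aio_context(VuServer *server,
                                         AioContext *ctx);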
