qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH v8 2/4] generic vhost user server


From: Coiby Xu
Subject: Re: [PATCH v8 2/4] generic vhost user server
Date: Mon, 15 Jun 2020 02:43:46 +0800

On Thu, Jun 11, 2020 at 02:14:49PM +0100, Stefan Hajnoczi wrote:
On Fri, Jun 05, 2020 at 07:35:36AM +0800, Coiby Xu wrote:
+static bool coroutine_fn
+vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
+{
+    struct iovec iov = {
+        .iov_base = (char *)vmsg,
+        .iov_len = VHOST_USER_HDR_SIZE,
+    };
+    int rc, read_bytes = 0;
+    Error *local_err = NULL;
+    /*
+     * Store fds/nfds returned from qio_channel_readv_full into
+     * temporary variables.
+     *
+     * VhostUserMsg is a packed structure, gcc will complain about passing
+     * pointer to a packed structure member if we pass &VhostUserMsg.fd_num
+     * and &VhostUserMsg.fds directly when calling qio_channel_readv_full,
+     * thus two temporary variables nfds and fds are used here.
+     */
+    size_t nfds = 0, nfds_t = 0;
+    int *fds = NULL, *fds_t = NULL;
+    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
+    QIOChannel *ioc = NULL;
+
+    if (conn_fd == server->sioc->fd) {
+        ioc = server->ioc;
+    } else {
+        /* Slave communication will also use this function to read msg */
+        ioc = slave_io_channel(server, conn_fd, &local_err);
+    }
+
+    if (!ioc) {
+        error_report_err(local_err);
+        goto fail;
+    }
+
+    assert(qemu_in_coroutine());
+    do {
+        /*
+         * qio_channel_readv_full may have short reads, keeping calling it
+         * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
+         */
+        rc = qio_channel_readv_full(ioc, &iov, 1, &fds_t, &nfds_t, &local_err);
+        if (rc < 0) {
+            if (rc == QIO_CHANNEL_ERR_BLOCK) {
+                qio_channel_yield(ioc, G_IO_IN);
+                continue;
+            } else {
+                error_report_err(local_err);
+                return false;
+            }
+        }
+        read_bytes += rc;
+        if (nfds_t > 0) {
+            fds = g_renew(int, fds, nfds + nfds_t);
+            memcpy(fds + nfds, fds_t, nfds_t *sizeof(int));
+            nfds += nfds_t;
+            if (nfds > VHOST_MEMORY_MAX_NREGIONS) {
+                error_report("A maximum of %d fds are allowed, "
+                             "however got %lu fds now",
+                             VHOST_MEMORY_MAX_NREGIONS, nfds);
+                goto fail;
+            }
+            g_free(fds_t);

I'm not sure why the temporary fds[] array is necessary. Copying the fds
directly into vmsg->fds would be simpler:

 if (nfds + nfds_t > G_N_ELEMENTS(vmsg->fds)) {
     error_report("A maximum of %d fds are allowed, "
                  "however got %lu fds now",
                  VHOST_MEMORY_MAX_NREGIONS, nfds);
     goto fail;
 }
 memcpy(vmsg->fds + nfds, fds_t, nfds_t * sizeof(vds->fds[0]));
 nfds += nfds_t;

Did I misunderstand how this works?

No, the temporary fds[] array is not necessary. Thank for the
simplication!

+        }
+        if (read_bytes == VHOST_USER_HDR_SIZE || rc == 0) {
+            break;
+        }
+        iov.iov_base = (char *)vmsg + read_bytes;
+        iov.iov_len = VHOST_USER_HDR_SIZE - read_bytes;
+    } while (true);
+
+    vmsg->fd_num = nfds;
+    if (nfds > 0) {
+        memcpy(vmsg->fds, fds, nfds * sizeof(int));
+    }
+    g_free(fds);
+    /* qio_channel_readv_full will make socket fds blocking, unblock them */
+    vmsg_unblock_fds(vmsg);
+    if (vmsg->size > sizeof(vmsg->payload)) {
+        error_report("Error: too big message request: %d, "
+                     "size: vmsg->size: %u, "
+                     "while sizeof(vmsg->payload) = %zu",
+                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
+        goto fail;
+    }
+
+    struct iovec iov_payload = {
+        .iov_base = (char *)&vmsg->payload,
+        .iov_len = vmsg->size,
+    };
+    if (vmsg->size) {
+        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
+        if (rc == -1) {
+            error_report_err(local_err);
+            goto fail;
+        }
+    }
+
+    return true;
+
+fail:
+    vmsg_close_fds(vmsg);
+
+    return false;
+}
+
+
+static void vu_client_start(VuServer *server);
+static coroutine_fn void vu_client_trip(void *opaque)
+{
+    VuServer *server = opaque;
+
+    while (!server->aio_context_changed && server->sioc) {
+        vu_dispatch(&server->vu_dev);
+    }
+
+    if (server->aio_context_changed && server->sioc) {
+        server->aio_context_changed = false;
+        vu_client_start(server);
+    }
+}
+
+static void vu_client_start(VuServer *server)
+{
+    server->co_trip = qemu_coroutine_create(vu_client_trip, server);
+    aio_co_enter(server->ctx, server->co_trip);
+}
+
+/*
+ * a wrapper for vu_kick_cb
+ *
+ * since aio_dispatch can only pass one user data pointer to the
+ * callback function, pack VuDev and pvt into a struct. Then unpack it
+ * and pass them to vu_kick_cb
+ */
+static void kick_handler(void *opaque)
+{
+    KickInfo *kick_info = opaque;
+    kick_info->cb(kick_info->vu_dev, 0, (void *) kick_info->index);
+}
+
+
+static void
+set_watch(VuDev *vu_dev, int fd, int vu_evt,
+          vu_watch_cb cb, void *pvt)
+{
+
+    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
+    g_assert(vu_dev);
+    g_assert(fd >= 0);
+    long index = (intptr_t) pvt;
+    g_assert(cb);
+    KickInfo *kick_info = &server->kick_info[index];
+    if (!kick_info->cb) {
+        kick_info->fd = fd;
+        kick_info->cb = cb;
+        qemu_set_nonblock(fd);
+        aio_set_fd_handler(server->ioc->ctx, fd, false, kick_handler,
+                           NULL, NULL, kick_info);
+        kick_info->vu_dev = vu_dev;
+    }
+}
+
+
+static void remove_watch(VuDev *vu_dev, int fd)
+{
+    VuServer *server;
+    int i;
+    int index = -1;
+    g_assert(vu_dev);
+    g_assert(fd >= 0);
+
+    server = container_of(vu_dev, VuServer, vu_dev);
+    for (i = 0; i < vu_dev->max_queues; i++) {
+        if (server->kick_info[i].fd == fd) {
+            index = i;
+            break;
+        }
+    }
+
+    if (index == -1) {
+        return;
+    }
+    server->kick_info[i].cb = NULL;
+    aio_set_fd_handler(server->ioc->ctx, fd, false, NULL, NULL, NULL, NULL);
+}
+
+
+static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
+                      gpointer opaque)
+{
+    VuServer *server = opaque;
+
+    if (server->sioc) {
+        warn_report("Only one vhost-user client is allowed to "
+                    "connect the server one time");
+        return;
+    }
+
+    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
+                 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
+        error_report("Failed to initialized libvhost-user");
+        return;
+    }
+
+    /*
+     * Unset the callback function for network listener to make another
+     * vhost-user client keeping waiting until this client disconnects
+     */
+    qio_net_listener_set_client_func(server->listener,
+                                     NULL,
+                                     NULL,
+                                     NULL);
+    server->sioc = sioc;
+    server->kick_info = g_new0(KickInfo, server->max_queues);

Where is kick_info freed?

+    /*
+     * Increase the object reference, so cioc will not freed by

s/cioc/sioc/

+     * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc))
+     */
+    object_ref(OBJECT(server->sioc));
+    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
+    server->ioc = QIO_CHANNEL(sioc);
+    object_ref(OBJECT(server->ioc));
+    object_ref(OBJECT(sioc));

Why are there two object_refs for sioc and where is unref called?

Thank you for pointing out the errors regarding memory deallocation and
the typo.
+    qio_channel_attach_aio_context(server->ioc, server->ctx);
+    qio_channel_set_blocking(QIO_CHANNEL(server->sioc), false, NULL);
+    vu_client_start(server);
+}
+
+
+void vhost_user_server_stop(VuServer *server)
+{
+    if (!server) {
+        return;
+    }
+
+    if (server->sioc) {
+        close_client(server);
+        object_unref(OBJECT(server->sioc));

This call is object_unref(NULL) since close_client() does server->sioc =
NULL.

+    }
+
+    if (server->listener) {
+        qio_net_listener_disconnect(server->listener);
+        object_unref(OBJECT(server->listener));
+    }
+}
+
+static void detach_context(VuServer *server)
+{
+    int i;
+    AioContext *ctx = server->ioc->ctx;
+    qio_channel_detach_aio_context(server->ioc);
+    for (i = 0; i < server->vu_dev.max_queues; i++) {
+        if (server->kick_info[i].cb) {
+            aio_set_fd_handler(ctx, server->kick_info[i].fd, false, NULL,
+                               NULL, NULL, NULL);
+        }
+    }
+}
+
+static void attach_context(VuServer *server, AioContext *ctx)
+{
+    int i;
+    qio_channel_attach_aio_context(server->ioc, ctx);
+    server->aio_context_changed = true;
+    if (server->co_trip) {
+        aio_co_schedule(ctx, server->co_trip);
+    }
+    for (i = 0; i < server->vu_dev.max_queues; i++) {
+        if (server->kick_info[i].cb) {
+            aio_set_fd_handler(ctx, server->kick_info[i].fd, false,
+                               kick_handler, NULL, NULL,
+                               &server->kick_info[i]);
+        }
+    }
+}
+
+void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server)
+{
+    server->ctx = ctx ? ctx : qemu_get_aio_context();
+    if (!server->sioc) {
+        return;
+    }
+    if (ctx) {
+        attach_context(server, ctx);
+    } else {
+        detach_context(server);
+    }
+}
+
+
+bool vhost_user_server_start(uint16_t max_queues,
+                             SocketAddress *socket_addr,
+                             AioContext *ctx,
+                             VuServer *server,
+                             void *device_panic_notifier,
+                             const VuDevIface *vu_iface,
+                             Error **errp)
+{
+    server->listener = qio_net_listener_new();
+    if (qio_net_listener_open_sync(server->listener, socket_addr, 1,
+                                   errp) < 0) {
+        goto error;
+    }
+
+    qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");
+
+    server->vu_iface = vu_iface;
+    server->max_queues = max_queues;
+    server->ctx = ctx;
+    server->device_panic_notifier = device_panic_notifier;
+    qio_net_listener_set_client_func(server->listener,
+                                     vu_accept,
+                                     server,
+                                     NULL);

The qio_net_listener_set_client_func() call uses the default
GMainContext but we have an AioContext *ctx argument. This is
surprising. I would expect the socket to be handled in the AioContext.

Can you clarify how this should work?
Yes, the vhost-user server will accept new client connections in the
default GMainText. But vhost-user message and kick event will be
processed in the block drive's AIOContext and if the block drive's
AIOContext is changed, these tasks will also be move to the new AIOContext.

Btw, I intended to use chardev to help manage client connections after
limiting one vhost-user server to serve one client one time. But according
to Mark and other people[1], converting all chardev functions to AIO doesn't
seem to be worth the effort.

[1] https://lists.gnu.org/archive/html/qemu-devel/2020-04/msg01485.html


+
+    return true;
+error:
+    g_free(server);

It's surprising that this function frees the server argument when an
error occurs. vhost_user_server_stop() does not free server. I suggest
letting the caller free server since they own the object.

+    return false;
+}
diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h
new file mode 100644
index 0000000000..4315556b66
--- /dev/null
+++ b/util/vhost-user-server.h
@@ -0,0 +1,59 @@
+/*
+ * Sharing QEMU devices via vhost-user protocol
+ *
+ * Author: Coiby Xu <coiby.xu@gmail.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#ifndef VHOST_USER_SERVER_H
+#define VHOST_USER_SERVER_H
+
+#include "contrib/libvhost-user/libvhost-user.h"
+#include "io/channel-socket.h"
+#include "io/channel-file.h"
+#include "io/net-listener.h"
+#include "qemu/error-report.h"
+#include "qapi/error.h"
+#include "standard-headers/linux/virtio_blk.h"
+
+typedef struct KickInfo {
+    VuDev *vu_dev;
+    int fd; /*kick fd*/
+    long index; /*queue index*/
+    vu_watch_cb cb;
+} KickInfo;
+
+typedef struct VuServer {
+    QIONetListener *listener;
+    AioContext *ctx;
+    void (*device_panic_notifier)(struct VuServer *server) ;
+    int max_queues;
+    const VuDevIface *vu_iface;
+    VuDev vu_dev;
+    QIOChannel *ioc; /* The I/O channel with the client */
+    QIOChannelSocket *sioc; /* The underlying data channel with the client */
+    /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */
+    QIOChannel *ioc_slave;
+    QIOChannelSocket *sioc_slave;
+    Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
+    KickInfo *kick_info; /* an array with the length of the queue number */
+    /* restart coroutine co_trip if AIOContext is changed */
+    bool aio_context_changed;
+} VuServer;
+
+
+bool vhost_user_server_start(uint16_t max_queues,
+                             SocketAddress *unix_socket,
+                             AioContext *ctx,
+                             VuServer *server,
+                             void *device_panic_notifier,

Please declare the function pointer type:

typedef void DevicePanicNotifierFn(struct VuServer *server);

Then the argument list can use DevicePanicNotifierFn
*device_panic_notifier instead of void *.

+                             const VuDevIface *vu_iface,
+                             Error **errp);
+
+void vhost_user_server_stop(VuServer *server);
+
+void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server);

If you send another revision, please make VuServer *server the first
argument of vhost_user_server_start() and
vhost_user_server_set_aio_context(). Functions usually have the object
they act on as the first argument.

Thank you! These issues have been addressed in v9.


--
Best regards,
Coiby



reply via email to

[Prev in Thread] Current Thread [Next in Thread]