qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH v2 3/7] multifd: adding multi-interface support for multifd on destination side


From: Het Gala
Subject: [PATCH v2 3/7] multifd: adding multi-interface support for multifd on destination side
Date: Thu, 21 Jul 2022 19:56:16 +0000

i) Modified the format of the QEMU monitor command 'migrate-incoming' by
   adding a list; each element in the list opens socket listeners with a
   given number of multifd channels.

ii) QEMU is started with '-incoming defer' and '-multi-fd-incoming defer' to
    allow the modified 'migrate-incoming' command to be used.

iii) Added the format for the -multi-fd-incoming flag: a comma-separated
     string in which each substring contains a listener socket address and
     the number of sockets to open.

Suggested-by: Manish Mishra <manish.mishra@nutanix.com>
Signed-off-by: Het Gala <het.gala@nutanix.com>
---
 migration/migration.c | 143 ++++++++++++++++++++++++++++++++++++------
 migration/migration.h |   2 +
 migration/socket.c    |  11 ++--
 migration/socket.h    |   3 +-
 qapi/migration.json   |  46 ++++++++++++--
 qapi/qapi-util.c      |   3 +-
 qemu-options.hx       |  18 ++++++
 softmmu/vl.c          |  30 ++++++++-
 8 files changed, 221 insertions(+), 35 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 572b909423..c58b81576c 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -486,28 +486,41 @@ void migrate_add_address(SocketAddress *address)
                       QAPI_CLONE(SocketAddress, address));
 }
 
-static void qemu_start_incoming_migration(const char *uri, Error **errp)
+static void qemu_start_incoming_migration(const char *uri,
+                                          uint8_t multifd_count, int idx,
+                                          Error **errp)
 {
     const char *p = NULL;
 
-    migrate_protocol_allow_multi_channels(false); /* reset it anyway */
-    qapi_event_send_migration(MIGRATION_STATUS_SETUP);
-    if (strstart(uri, "tcp:", &p) ||
-        strstart(uri, "unix:", NULL) ||
-        strstart(uri, "vsock:", NULL)) {
-        migrate_protocol_allow_multi_channels(true);
-        socket_start_incoming_migration(p ? p : uri, errp);
-#ifdef CONFIG_RDMA
-    } else if (strstart(uri, "rdma:", &p)) {
-        rdma_start_incoming_migration(p, errp);
-#endif
-    } else if (strstart(uri, "exec:", &p)) {
-        exec_start_incoming_migration(p, errp);
-    } else if (strstart(uri, "fd:", &p)) {
-        fd_start_incoming_migration(p, errp);
+    if (multifd_count ==  0) {
+        migrate_protocol_allow_multi_channels(false); /* reset it anyway */
+        qapi_event_send_migration(MIGRATION_STATUS_SETUP);
+        if (strstart(uri, "tcp:", &p) ||
+            strstart(uri, "unix:", NULL) ||
+            strstart(uri, "vsock:", NULL)) {
+            migrate_protocol_allow_multi_channels(true);
+    #ifdef CONFIG_RDMA
+        } else if (strstart(uri, "rdma:", &p)) {
+            rdma_start_incoming_migration(p, errp);
+    #endif
+        } else if (strstart(uri, "exec:", &p)) {
+            exec_start_incoming_migration(p, errp);
+        } else if (strstart(uri, "fd:", &p)) {
+            fd_start_incoming_migration(p, errp);
+        } else {
+            error_setg(errp, "unknown migration protocol: %s", uri);
+        }
     } else {
-        error_setg(errp, "unknown migration protocol: %s", uri);
+        /* multi-FD parameters only support tcp network protocols */
+        if (!strstart(uri, "tcp:", &p)) {
+            error_setg(errp, "multifd-destination uri supports "
+                                "tcp protocol only");
+            return;
+        }
+        store_multifd_migration_params(p ? p : uri, NULL, multifd_count,
+        idx, errp);
     }
+    socket_start_incoming_migration(p ? p : uri, multifd_count, errp);
 }
 
 static void process_incoming_migration_bh(void *opaque)
@@ -2192,7 +2205,8 @@ void migrate_del_blocker(Error *reason)
     migration_blockers = g_slist_remove(migration_blockers, reason);
 }
 
-void qmp_migrate_incoming(const char *uri, Error **errp)
+/* migrate_incoming() handles the -incoming command-line flag */
+void migrate_incoming(const char *uri, Error **errp)
 {
     Error *local_err = NULL;
     static bool once = true;
@@ -2210,7 +2224,7 @@ void qmp_migrate_incoming(const char *uri, Error **errp)
         return;
     }
 
-    qemu_start_incoming_migration(uri, &local_err);
+    qemu_start_incoming_migration(uri, 0, 0, &local_err);
 
     if (local_err) {
         yank_unregister_instance(MIGRATION_YANK_INSTANCE);
@@ -2221,6 +2235,95 @@ void qmp_migrate_incoming(const char *uri, Error **errp)
     once = false;
 }
 
+/*
+ * multi_fd_migrate_incoming() handles the -multi-fd-incoming
+ * command-line flag
+ */
+void multi_fd_migrate_incoming(const char *uri, Error **errp)
+{
+    Error *local_err = NULL;
+    static bool once = true;
+
+    if (!once) {
+        error_setg(errp, "The incoming migration has already been started");
+        return;
+    }
+    if (!runstate_check(RUN_STATE_INMIGRATE)) {
+        error_setg(errp, "'-multi-fd-incoming' was not specified on the 
command line");
+        return;
+    }
+
+    strList *st = strList_from_string(uri, ',');
+    strList *r = st;
+    int length = QAPI_LIST_LENGTH(st);
+    init_multifd_array(length);
+
+    for (int i = 0; i < length; i++) {
+        const char *uri = NULL, *ret = NULL;
+        const char *str = r->value;
+        uint8_t multifd_channels = DEFAULT_MIGRATE_MULTIFD_CHANNELS;
+        int parse_count = qemu_string_count_delim(str, ':');
+        if (parse_count < 2 || parse_count > 3) {
+            error_setg(errp, "Invalid format of string-id %d in "
+                             "'-multi-fd-incoming' flag", i);
+            return;
+        }
+        if (parse_count == 3) {
+            ret = strrchr(str, ':');
+            uri = g_strndup(str, strlen(str) - strlen(ret));
+            multifd_channels = atoi(ret + 1);
+        }
+        qemu_start_incoming_migration(parse_count == 2 ? str : uri,
+                                      multifd_channels, i, &local_err);
+        r = r->next;
+    }
+    if (local_err) {
+        yank_unregister_instance(MIGRATION_YANK_INSTANCE);
+        error_propagate(errp, local_err);
+        return;
+    }
+
+    once = false;
+}
+
+/* qmp_migrate_incoming() handles the 'migrate-incoming' QMP command */
+void qmp_migrate_incoming(const char *uri, bool has_multi_fd_uri_list,
+                          MigrateIncomingUriList *cap, Error **errp)
+{
+    Error *local_err = NULL;
+    static bool once = true;
+
+    if (!once) {
+        error_setg(errp, "The incoming migration has already been started");
+        return;
+    }
+
+    if (!yank_register_instance(MIGRATION_YANK_INSTANCE, errp)) {
+        return;
+    }
+
+    /* For migration thread */
+    qemu_start_incoming_migration(uri, 0, 0, &local_err);
+
+    /* For Multi-FD */
+    int length = QAPI_LIST_LENGTH(cap);
+    init_multifd_array(length);
+    for (int i = 0; i < length; i++) {
+        const char *multifd_dst_uri = cap->value->destination_uri;
+        uint8_t multifd_channels = cap->value->multifd_channels;
+        qemu_start_incoming_migration(multifd_dst_uri, multifd_channels,
+                                      i, &local_err);
+        cap = cap->next;
+    }
+
+    if (local_err) {
+        yank_unregister_instance(MIGRATION_YANK_INSTANCE);
+        error_propagate(errp, local_err);
+        return;
+    }
+    once = false;
+}
+
 void qmp_migrate_recover(const char *uri, Error **errp)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
@@ -2246,7 +2349,7 @@ void qmp_migrate_recover(const char *uri, Error **errp)
      * only re-setup the migration stream and poke existing migration
      * to continue using that newly established channel.
      */
-    qemu_start_incoming_migration(uri, errp);
+    qemu_start_incoming_migration(uri, 0, 0, errp);
 }
 
 void qmp_migrate_pause(Error **errp)
diff --git a/migration/migration.h b/migration/migration.h
index cdad8aceaa..4cb81f7cf0 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -394,6 +394,8 @@ bool migration_is_setup_or_active(int state);
 bool migration_is_running(int state);
 
 void migrate_init(MigrationState *s);
+void migrate_incoming(const char *uri, Error **errp);
+void multi_fd_migrate_incoming(const char *uri_str, Error **errp);
 bool migration_is_blocked(Error **errp);
 /* True if outgoing migration has entered postcopy phase */
 bool migration_in_postcopy(void);
diff --git a/migration/socket.c b/migration/socket.c
index f199430bc2..dab872a0fe 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -227,17 +227,17 @@ socket_incoming_migration_end(void *opaque)
 
 static void
 socket_start_incoming_migration_internal(SocketAddress *saddr,
-                                         Error **errp)
+                                         uint8_t multifd_count, Error **errp)
 {
     QIONetListener *listener = qio_net_listener_new();
     MigrationIncomingState *mis = migration_incoming_get_current();
     size_t i;
-    int num = 1;
+    uint8_t num = 1;
 
     qio_net_listener_set_name(listener, "migration-socket-listener");
 
     if (migrate_use_multifd()) {
-        num = migrate_multifd_channels();
+        num = multifd_count;
     } else if (migrate_postcopy_preempt()) {
         num = RAM_CHANNEL_MAX;
     }
@@ -266,12 +266,13 @@ socket_start_incoming_migration_internal(SocketAddress 
*saddr,
     }
 }
 
-void socket_start_incoming_migration(const char *str, Error **errp)
+void socket_start_incoming_migration(const char *str,
+                                     uint8_t multifd_count, Error **errp)
 {
     Error *err = NULL;
     SocketAddress *saddr = socket_parse(str, &err);
     if (!err) {
-        socket_start_incoming_migration_internal(saddr, &err);
+        socket_start_incoming_migration_internal(saddr, multifd_count, &err);
     }
     qapi_free_SocketAddress(saddr);
     error_propagate(errp, err);
diff --git a/migration/socket.h b/migration/socket.h
index 0cbb7220ac..7c82278d33 100644
--- a/migration/socket.h
+++ b/migration/socket.h
@@ -33,7 +33,8 @@ void socket_send_channel_create(QIOTaskFunc f, void *data);
 QIOChannel *socket_send_channel_create_sync(Error **errp);
 int socket_send_channel_destroy(QIOChannel *send);
 
-void socket_start_incoming_migration(const char *str, Error **errp);
+void socket_start_incoming_migration(const char *str, uint8_t number,
+                                     Error **errp);
 
 void socket_start_outgoing_migration(MigrationState *s, const char *dst_str,
                                      Error **errp);
diff --git a/qapi/migration.json b/qapi/migration.json
index 456247af8f..3908c9096c 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -1526,14 +1526,36 @@
            '*blk': 'bool', '*inc': 'bool', '*detach': 'bool',
            '*resume': 'bool' } }
 
+##
+# @MigrateIncomingUri:
+#
+# Information about which destination listening interface to connect to
+# and the number of multifd channels to use over that interface.
+#
+# @destination-uri: the Uniform Resource Identifier of the destination VM
+#
+# @multifd-channels: number of channels used to migrate data in parallel
+#                    for the specific destination-uri.
+#                    Default value in this case is 2 (Since 4.0)
+#
+##
+{ 'struct' : 'MigrateIncomingUri',
+  'data' : { 'destination-uri' : 'str',
+           '*multifd-channels' : 'uint8'} }
+
 ##
 # @migrate-incoming:
 #
 # Start an incoming migration, the qemu must have been started
-# with -incoming defer
+# with -incoming defer. qemu can also be started with optional
+# -multi-fd-incoming defer for opening multifd listening sockets
 #
 # @uri: The Uniform Resource Identifier identifying the source or
-#       address to listen on
+#       address to listen on.
+#
+# @multi-fd-uri-list: list of destination VM Uniform Resource Identifiers
+#                     to listen on, each with the number of multifd-channels
+#                     to open on it.
 #
 # Returns: nothing on success
 #
@@ -1545,19 +1567,31 @@
 #    compatible with -incoming and the format of the uri is already exposed
 #    above libvirt.
 #
-# 2. QEMU must be started with -incoming defer to allow migrate-incoming to
+# 2. multi-fd-uri-list holds the list of destination URIs to open as
+#    listening sockets and the number of multifd channels on each of them.
+#
+# 3. QEMU must be started with -incoming defer to allow migrate-incoming to
 #    be used.
 #
-# 3. The uri format is the same as for -incoming
+# 4. multi-fd-uri-list format is not the same as for -multi-fd-incoming flag.
+#    For -multi-fd-incoming flag, it is a comma separated list of listener
+#    sockets with multifd channels.
+#    Example: -multi-fd-incoming "tcp::6900:4,tcp:11.0.0.0:7789:5".
 #
 # Example:
 #
 # -> { "execute": "migrate-incoming",
-#      "arguments": { "uri": "tcp::4446" } }
+#      "arguments": {
+#          "uri": "tcp::6789",
+#          "multi-fd-uri-list" : [ { "destination-uri" : "tcp::6900",
+#                                    "multifd-channels": 4},
+#                                  { "destination-uri" : "tcp:11.0.0.0:7789",
+#                                    "multifd-channels": 5} ] } }
 # <- { "return": {} }
 #
 ##
-{ 'command': 'migrate-incoming', 'data': {'uri': 'str' } }
+{ 'command': 'migrate-incoming',
+  'data': {'uri': 'str', '*multi-fd-uri-list': ['MigrateIncomingUri'] } }
 
 ##
 # @xen-save-devices-state:
diff --git a/qapi/qapi-util.c b/qapi/qapi-util.c
index 9672ac6018..a256f12ad2 100644
--- a/qapi/qapi-util.c
+++ b/qapi/qapi-util.c
@@ -15,6 +15,7 @@
 #include "qapi/error.h"
 #include "qemu/ctype.h"
 #include "qapi/qmp/qerror.h"
+#include "qapi/qapi-builtin-types.h"
 
 CompatPolicy compat_policy;
 
@@ -157,7 +158,7 @@ int parse_qapi_name(const char *str, bool complete)
  * Produce a strList from a delimiter separated list.
  * A NULL or empty input string return NULL.
  */
-strList *strList_from_string(const char *in, char c)
+struct strList *strList_from_string(const char *in, char c)
 {
     strList *res = NULL;
     strList **tail = &res;
diff --git a/qemu-options.hx b/qemu-options.hx
index 79e00916a1..5555f0d2fd 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -4479,6 +4479,24 @@ SRST
     to issuing the migrate\_incoming to allow the migration to begin.
 ERST
 
+DEF("multi-fd-incoming", HAS_ARG, QEMU_OPTION_multi_fd_incoming, \
+    "-multi-fd-incoming 
tcp:[host]:port[:channel][,to=maxport][,ipv4=on|off][,ipv6=on|off]\n" \
+    "-multi-fd-incoming defer\n" \
+    "                wait for the URI to be specified via\n" \
+    "                multi_fd_migrate_incoming\n",
+    QEMU_ARCH_ALL)
+SRST
+``-multi-fd-incoming tcp:[host]:port[:channel][,to=maxport][,ipv4=on|off][,ipv6=on|off]``
+    Prepare for multi-fd incoming migration, with multi-fd listening sockets
+    on that connection. Default number of multi-fd channels is 2.
+
+``-multi-fd-incoming defer``
+    Wait for the URI to be specified via multi_fd_migrate\_incoming. The
+    monitor can be used to change settings (such as migration parameters)
+    prior to issuing the multi_fd_migrate\_incoming to allow the migration
+    to begin.
+ERST
+
 DEF("only-migratable", 0, QEMU_OPTION_only_migratable, \
     "-only-migratable     allow only migratable devices\n", QEMU_ARCH_ALL)
 SRST
diff --git a/softmmu/vl.c b/softmmu/vl.c
index aabd82e09a..07c33ded20 100644
--- a/softmmu/vl.c
+++ b/softmmu/vl.c
@@ -45,7 +45,7 @@
 #include "sysemu/seccomp.h"
 #include "sysemu/tcg.h"
 #include "sysemu/xen.h"
-
+#include "migration/migration.h"
 #include "qemu/error-report.h"
 #include "qemu/sockets.h"
 #include "qemu/accel.h"
@@ -160,6 +160,7 @@ typedef struct DeviceOption {
 static const char *cpu_option;
 static const char *mem_path;
 static const char *incoming;
+static const char *multi_fd_incoming;
 static const char *loadvm;
 static const char *accelerators;
 static bool have_custom_ram_size;
@@ -2312,6 +2313,11 @@ static void qemu_validate_options(const QDict 
*machine_opts)
         error_report("'preconfig' supports '-incoming defer' only");
         exit(EXIT_FAILURE);
     }
+    if (multi_fd_incoming && preconfig_requested &&
+        strcmp(multi_fd_incoming, "defer") != 0) {
+        error_report("'preconfig' supports '-multi-fd-incoming defer' only");
+        exit(EXIT_FAILURE);
+    }
 
 #ifdef CONFIG_CURSES
     if (is_daemonized() && dpy.type == DISPLAY_TYPE_CURSES) {
@@ -2600,7 +2606,7 @@ void qmp_x_exit_preconfig(Error **errp)
     if (incoming) {
         Error *local_err = NULL;
         if (strcmp(incoming, "defer") != 0) {
-            qmp_migrate_incoming(incoming, &local_err);
+            migrate_incoming(incoming, &local_err);
             if (local_err) {
                 error_reportf_err(local_err, "-incoming %s: ", incoming);
                 exit(1);
@@ -2609,6 +2615,20 @@ void qmp_x_exit_preconfig(Error **errp)
     } else if (autostart) {
         qmp_cont(NULL);
     }
+
+    if (multi_fd_incoming) {
+        Error *local_err = NULL;
+        if (strcmp(multi_fd_incoming, "defer") != 0) {
+            multi_fd_migrate_incoming(multi_fd_incoming, &local_err);
+            if (local_err) {
+                error_reportf_err(local_err, "-multi-fd-incoming %s: ",
+                                multi_fd_incoming);
+                exit(1);
+            }
+        }
+    } else if (autostart) {
+        qmp_cont(NULL);
+    }
 }
 
 void qemu_init(int argc, char **argv, char **envp)
@@ -3331,6 +3351,12 @@ void qemu_init(int argc, char **argv, char **envp)
                 }
                 incoming = optarg;
                 break;
+            case QEMU_OPTION_multi_fd_incoming:
+                if (!multi_fd_incoming) {
+                    runstate_set(RUN_STATE_INMIGRATE);
+                }
+                multi_fd_incoming = optarg;
+                break;
             case QEMU_OPTION_only_migratable:
                 only_migratable = 1;
                 break;
-- 
2.22.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]