qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH 4/4] Adding support for multi-FD connections dynamically


From: manish.mishra
Subject: Re: [PATCH 4/4] Adding support for multi-FD connections dynamically
Date: Tue, 21 Jun 2022 21:42:20 +0530
User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:91.0) Gecko/20100101 Thunderbird/91.10.0


On 17/06/22 12:17 am, Dr. David Alan Gilbert wrote:
* Het Gala (het.gala@nutanix.com) wrote:
i) Dynamically decide appropriate source and destination ip pairs for the
    corresponding multi-FD channel to be connected.

ii) Removed the support for setting the number of multi-fd channels from qmp
     commands. As now all multiFD parameters will be passed via qmp: migrate
     command or incoming flag itself.
We can't do that, because it's part of the API already; what you'll need
to do is check that the number of entries in your list corresponds to
the value set there and error if it's different.

Dave

thanks for review David. Yes, we will make sure in V2 that nothing existing 
breaks.

- Manish Mishra

Suggested-by: Manish Mishra <manish.mishra@nutanix.com>
Signed-off-by: Het Gala <het.gala@nutanix.com>
---
  migration/migration.c | 15 ---------------
  migration/migration.h |  1 -
  migration/multifd.c   | 42 +++++++++++++++++++++---------------------
  migration/socket.c    | 42 +++++++++++++++++++++++++++++++++---------
  migration/socket.h    |  4 +++-
  monitor/hmp-cmds.c    |  4 ----
  qapi/migration.json   |  6 ------
  7 files changed, 57 insertions(+), 57 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 9b0ad732e7..57dd4494b4 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1585,9 +1585,6 @@ static void 
migrate_params_test_apply(MigrateSetParameters *params,
      if (params->has_block_incremental) {
          dest->block_incremental = params->block_incremental;
      }
-    if (params->has_multifd_channels) {
-        dest->multifd_channels = params->multifd_channels;
-    }
      if (params->has_multifd_compression) {
          dest->multifd_compression = params->multifd_compression;
      }
@@ -1702,9 +1699,6 @@ static void migrate_params_apply(MigrateSetParameters 
*params, Error **errp)
      if (params->has_block_incremental) {
          s->parameters.block_incremental = params->block_incremental;
      }
-    if (params->has_multifd_channels) {
-        s->parameters.multifd_channels = params->multifd_channels;
-    }
      if (params->has_multifd_compression) {
          s->parameters.multifd_compression = params->multifd_compression;
      }
@@ -2686,15 +2680,6 @@ bool migrate_pause_before_switchover(void)
          MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER];
  }
-int migrate_multifd_channels(void)
-{
-    MigrationState *s;
-
-    s = migrate_get_current();
-
-    return s->parameters.multifd_channels;
-}
-
  MultiFDCompression migrate_multifd_compression(void)
  {
      MigrationState *s;
diff --git a/migration/migration.h b/migration/migration.h
index fa8717ec9e..9464de8ef7 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -372,7 +372,6 @@ bool migrate_validate_uuid(void);
  bool migrate_auto_converge(void);
  bool migrate_use_multifd(void);
  bool migrate_pause_before_switchover(void);
-int migrate_multifd_channels(void);
  MultiFDCompression migrate_multifd_compression(void);
  int migrate_multifd_zlib_level(void);
  int migrate_multifd_zstd_level(void);
diff --git a/migration/multifd.c b/migration/multifd.c
index 9282ab6aa4..ce017436fb 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -225,7 +225,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error 
**errp)
          return -1;
      }
- if (msg.id > migrate_multifd_channels()) {
+    if (msg.id > total_multifd_channels()) {
          error_setg(errp, "multifd: received channel version %u "
                     "expected %u", msg.version, MULTIFD_VERSION);
          return -1;
@@ -410,8 +410,8 @@ static int multifd_send_pages(QEMUFile *f)
       * using more channels, so ensure it doesn't overflow if the
       * limit is lower now.
       */
-    next_channel %= migrate_multifd_channels();
-    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+    next_channel %= total_multifd_channels();
+    for (i = next_channel;; i = (i + 1) % total_multifd_channels()) {
          p = &multifd_send_state->params[i];
qemu_mutex_lock(&p->mutex);
@@ -422,7 +422,7 @@ static int multifd_send_pages(QEMUFile *f)
          }
          if (!p->pending_job) {
              p->pending_job++;
-            next_channel = (i + 1) % migrate_multifd_channels();
+            next_channel = (i + 1) % total_multifd_channels();
              break;
          }
          qemu_mutex_unlock(&p->mutex);
@@ -500,7 +500,7 @@ static void multifd_send_terminate_threads(Error *err)
          return;
      }
- for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDSendParams *p = &multifd_send_state->params[i];
qemu_mutex_lock(&p->mutex);
@@ -521,14 +521,14 @@ void multifd_save_cleanup(void)
          return;
      }
      multifd_send_terminate_threads(NULL);
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDSendParams *p = &multifd_send_state->params[i];
if (p->running) {
              qemu_thread_join(&p->thread);
          }
      }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDSendParams *p = &multifd_send_state->params[i];
          Error *local_err = NULL;
@@ -594,7 +594,7 @@ int multifd_send_sync_main(QEMUFile *f) flush_zero_copy = migrate_use_zero_copy_send(); - for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDSendParams *p = &multifd_send_state->params[i];
trace_multifd_send_sync_main_signal(p->id);
@@ -627,7 +627,7 @@ int multifd_send_sync_main(QEMUFile *f)
              }
          }
      }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDSendParams *p = &multifd_send_state->params[i];
trace_multifd_send_sync_main_wait(p->id);
@@ -903,7 +903,7 @@ int multifd_save_setup(Error **errp)
      int thread_count;
      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
      uint8_t i;
-
+    int idx;
      if (!migrate_use_multifd()) {
          return 0;
      }
@@ -912,7 +912,7 @@ int multifd_save_setup(Error **errp)
          return -1;
      }
- thread_count = migrate_multifd_channels();
+    thread_count = total_multifd_channels();
      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
      multifd_send_state->pages = multifd_pages_init(page_count);
@@ -945,8 +945,8 @@ int multifd_save_setup(Error **errp)
          } else {
              p->write_flags = 0;
          }
-
-        socket_send_channel_create(multifd_new_send_channel_async, p);
+        idx = multifd_index(i);
+        socket_send_channel_create(multifd_new_send_channel_async, p, idx);
      }
for (i = 0; i < thread_count; i++) {
@@ -991,7 +991,7 @@ static void multifd_recv_terminate_threads(Error *err)
          }
      }
- for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDRecvParams *p = &multifd_recv_state->params[i];
qemu_mutex_lock(&p->mutex);
@@ -1017,7 +1017,7 @@ int multifd_load_cleanup(Error **errp)
          return 0;
      }
      multifd_recv_terminate_threads(NULL);
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDRecvParams *p = &multifd_recv_state->params[i];
if (p->running) {
@@ -1030,7 +1030,7 @@ int multifd_load_cleanup(Error **errp)
              qemu_thread_join(&p->thread);
          }
      }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDRecvParams *p = &multifd_recv_state->params[i];
migration_ioc_unregister_yank(p->c);
@@ -1065,13 +1065,13 @@ void multifd_recv_sync_main(void)
      if (!migrate_use_multifd()) {
          return;
      }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDRecvParams *p = &multifd_recv_state->params[i];
trace_multifd_recv_sync_main_wait(p->id);
          qemu_sem_wait(&multifd_recv_state->sem_sync);
      }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
          MultiFDRecvParams *p = &multifd_recv_state->params[i];
WITH_QEMU_LOCK_GUARD(&p->mutex) {
@@ -1166,7 +1166,7 @@ int multifd_load_setup(Error **errp)
          error_setg(errp, "multifd is not supported by current protocol");
          return -1;
      }
-    thread_count = migrate_multifd_channels();
+    thread_count = total_multifd_channels();
      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
      qatomic_set(&multifd_recv_state->count, 0);
@@ -1204,7 +1204,7 @@ int multifd_load_setup(Error **errp)
bool multifd_recv_all_channels_created(void)
  {
-    int thread_count = migrate_multifd_channels();
+    int thread_count = total_multifd_channels();
if (!migrate_use_multifd()) {
          return true;
@@ -1259,5 +1259,5 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error 
**errp)
                         QEMU_THREAD_JOINABLE);
      qatomic_inc(&multifd_recv_state->count);
      return qatomic_read(&multifd_recv_state->count) ==
-           migrate_multifd_channels();
+           total_multifd_channels();
  }
diff --git a/migration/socket.c b/migration/socket.c
index d0cb7cc6a6..c0ac6dbbe2 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -28,9 +28,6 @@
  #include "trace.h"
-struct SocketOutgoingArgs {
-    SocketAddress *saddr;
-} outgoing_args;
struct SocketArgs {
      struct SrcDestAddr data;
@@ -43,20 +40,47 @@ struct OutgoingMigrateParams {
      uint64_t total_multifd_channel;
  } outgoing_migrate_params;
-void socket_send_channel_create(QIOTaskFunc f, void *data)
+
+int total_multifd_channels(void)
+{
+    return outgoing_migrate_params.total_multifd_channel;
+}
+
+int multifd_index(int i)
+{
+    int length = outgoing_migrate_params.length;
+    int j = 0;
+    int runn_sum = 0;
+    while (j < length) {
+        runn_sum += outgoing_migrate_params.socket_args[j].multifd_channels;
+        if (i >= runn_sum) {
+            j++;
+        } else {
+            break;
+        }
+    }
+    return j;
+}
+
+void socket_send_channel_create(QIOTaskFunc f, void *data, int idx)
  {
      QIOChannelSocket *sioc = qio_channel_socket_new();
-    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
-                                     f, data, NULL, NULL, NULL);
+    qio_channel_socket_connect_async(sioc,
+                       outgoing_migrate_params.socket_args[idx].data.dst_addr,
+                       f, data, NULL, NULL,
+                       outgoing_migrate_params.socket_args[idx].data.src_addr);
  }
int socket_send_channel_destroy(QIOChannel *send)
  {
      /* Remove channel */
      object_unref(OBJECT(send));
-    if (outgoing_args.saddr) {
-        qapi_free_SocketAddress(outgoing_args.saddr);
-        outgoing_args.saddr = NULL;
+    if (outgoing_migrate_params.socket_args != NULL) {
+        g_free(outgoing_migrate_params.socket_args);
+        outgoing_migrate_params.socket_args = NULL;
+    }
+    if (outgoing_migrate_params.length) {
+        outgoing_migrate_params.length = 0;
      }
if (outgoing_migrate_params.socket_args != NULL) {
diff --git a/migration/socket.h b/migration/socket.h
index b9e3699167..c8b9252384 100644
--- a/migration/socket.h
+++ b/migration/socket.h
@@ -27,7 +27,9 @@ struct SrcDestAddr {
      SocketAddress *src_addr;
  };
-void socket_send_channel_create(QIOTaskFunc f, void *data);
+int total_multifd_channels(void);
+int multifd_index(int i);
+void socket_send_channel_create(QIOTaskFunc f, void *data, int idx);
  int socket_send_channel_destroy(QIOChannel *send);
void socket_start_incoming_migration(const char *str, uint8_t number,
diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c
index 32a6b67d5f..9a3d76d6ba 100644
--- a/monitor/hmp-cmds.c
+++ b/monitor/hmp-cmds.c
@@ -1281,10 +1281,6 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict 
*qdict)
          p->has_block_incremental = true;
          visit_type_bool(v, param, &p->block_incremental, &err);
          break;
-    case MIGRATION_PARAMETER_MULTIFD_CHANNELS:
-        p->has_multifd_channels = true;
-        visit_type_uint8(v, param, &p->multifd_channels, &err);
-        break;
      case MIGRATION_PARAMETER_MULTIFD_COMPRESSION:
          p->has_multifd_compression = true;
          visit_type_MultiFDCompression(v, param, &p->multifd_compression,
diff --git a/qapi/migration.json b/qapi/migration.json
index 62a7b22d19..1b1c6d01d3 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -877,11 +877,6 @@
  #                     migrated and the destination must already have access 
to the
  #                     same backing chain as was used on the source.  (since 
2.10)
  #
-# @multifd-channels: Number of channels used to migrate data in
-#                    parallel. This is the same number that the
-#                    number of sockets used for migration.  The
-#                    default value is 2 (since 4.0)
-#
  # @xbzrle-cache-size: cache size to be used by XBZRLE migration.  It
  #                     needs to be a multiple of the target page size
  #                     and a power of 2
@@ -965,7 +960,6 @@
              '*x-checkpoint-delay': { 'type': 'uint32',
                                       'features': [ 'unstable' ] },
              '*block-incremental': 'bool',
-            '*multifd-channels': 'uint8',
              '*xbzrle-cache-size': 'size',
              '*max-postcopy-bandwidth': 'size',
              '*max-cpu-throttle': 'uint8',
--
2.22.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]