qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH RFC 03/14] migration/rdma: Create multiFd migration threads


From: Zhimin Feng
Subject: [PATCH RFC 03/14] migration/rdma: Create multiFd migration threads
Date: Thu, 13 Feb 2020 17:37:44 +0800

Create the multifd send and receive threads for RDMA migration.
The threads do no work yet; they only wait until they are told to quit.

Signed-off-by: Zhimin Feng <address@hidden>
---
 migration/multifd.c   | 33 +++++++++++++---
 migration/multifd.h   |  2 +
 migration/qemu-file.c |  5 +++
 migration/qemu-file.h |  1 +
 migration/rdma.c      | 88 ++++++++++++++++++++++++++++++++++++++++++-
 migration/rdma.h      |  3 ++
 6 files changed, 125 insertions(+), 7 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index b3e8ae9bcc..63678d7fdd 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -424,7 +424,7 @@ void multifd_send_sync_main(QEMUFile *f)
 {
     int i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_use_multifd() || migrate_use_rdma()) {
         return;
     }
     if (multifd_send_state->pages->used) {
@@ -562,6 +562,20 @@ out:
     return NULL;
 }
 
+/* Start the joinable RDMA multifd send thread for @p unless @p has quit. */
+static void rdma_send_channel_create(MultiFDSendParams *p)
+{
+    if (p->quit) {
+        Error *local_err = NULL;
+        error_setg(&local_err, "multifd: send id %d already quit", p->id);
+        error_report_err(local_err); /* no errp here: report, then free */
+        return;
+    }
+    p->running = true;
+    qemu_thread_create(&p->thread, p->name, multifd_rdma_send_thread, p,
+                       QEMU_THREAD_JOINABLE);
+}
+
 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 {
     MultiFDSendParams *p = opaque;
@@ -621,7 +635,11 @@ int multifd_save_setup(Error **errp)
         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
         p->name = g_strdup_printf("multifdsend_%d", i);
-        socket_send_channel_create(multifd_new_send_channel_async, p);
+        if (!migrate_use_rdma()) {
+            socket_send_channel_create(multifd_new_send_channel_async, p);
+        } else {
+            rdma_send_channel_create(p);
+        }
     }
     return 0;
 }
@@ -720,7 +738,7 @@ void multifd_recv_sync_main(void)
 {
     int i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_use_multifd() || migrate_use_rdma()) {
         return;
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -890,8 +908,13 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
     p->num_packets = 1;
 
     p->running = true;
-    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                       QEMU_THREAD_JOINABLE);
+    if (!migrate_use_rdma()) {
+        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                           QEMU_THREAD_JOINABLE);
+    } else {
+        qemu_thread_create(&p->thread, p->name, multifd_rdma_recv_thread, p,
+                           QEMU_THREAD_JOINABLE);
+    }
     atomic_inc(&multifd_recv_state->count);
     return atomic_read(&multifd_recv_state->count) ==
            migrate_multifd_channels();
diff --git a/migration/multifd.h b/migration/multifd.h
index d8b0205977..c9c11ad140 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -13,6 +13,8 @@
 #ifndef QEMU_MIGRATION_MULTIFD_H
 #define QEMU_MIGRATION_MULTIFD_H
 
+#include "migration/rdma.h"
+
 int multifd_save_setup(Error **errp);
 void multifd_save_cleanup(void);
 int multifd_load_setup(Error **errp);
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 1c3a358a14..f0ed8f1381 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -248,6 +248,11 @@ void qemu_fflush(QEMUFile *f)
     f->iovcnt = 0;
 }
 
+void *getQIOChannel(QEMUFile *f)
+{
+    return f->opaque; /* untyped backend pointer of @f; caller casts it */
+}
+
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags)
 {
     int ret = 0;
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index a9b6d6ccb7..fc656a3b72 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -161,6 +161,7 @@ int qemu_file_shutdown(QEMUFile *f);
 QEMUFile *qemu_file_get_return_path(QEMUFile *f);
 void qemu_fflush(QEMUFile *f);
 void qemu_file_set_blocking(QEMUFile *f, bool block);
+void *getQIOChannel(QEMUFile *f);
 
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
diff --git a/migration/rdma.c b/migration/rdma.c
index 2379b8345b..f086ab5a82 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -34,6 +34,7 @@
 #include <arpa/inet.h>
 #include <rdma/rdma_cma.h>
 #include "trace.h"
+#include "multifd.h"
 
 /*
  * Print and error on both the Monitor and the Log file.
@@ -3975,6 +3976,34 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
     return rioc->file;
 }
 
+/* First connection is the main stream; later ones are multifd channels. */
+static void migration_rdma_process_incoming(QEMUFile *f, Error **errp)
+{
+    MigrationIncomingState *incoming = migration_incoming_get_current();
+    Error *err = NULL;
+    bool start;
+
+    if (incoming->from_src_file) {
+        /* Multiple connections */
+        QIOChannel *chan = QIO_CHANNEL(getQIOChannel(f));
+
+        assert(migrate_use_multifd());
+        start = multifd_recv_new_channel(chan, &err);
+        if (err) {
+            error_propagate(errp, err);
+            return;
+        }
+    } else {
+        incoming->from_src_file = f;
+        qemu_file_set_blocking(f, false);
+        start = migrate_use_multifd();
+    }
+
+    if (start) {
+        migration_incoming_process();
+    }
+}
+
 static void rdma_accept_incoming_migration(void *opaque)
 {
     RDMAContext *rdma = opaque;
@@ -4003,8 +4032,12 @@ static void rdma_accept_incoming_migration(void *opaque)
         return;
     }
 
-    rdma->migration_started_on_destination = 1;
-    migration_fd_process_incoming(f, errp);
+    if (migrate_use_multifd()) {
+        migration_rdma_process_incoming(f, errp);
+    } else {
+        rdma->migration_started_on_destination = 1;
+        migration_fd_process_incoming(f, errp);
+    }
 }
 
 void rdma_start_incoming_migration(const char *host_port, Error **errp)
@@ -4048,6 +4081,15 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
     }
 
+    if (multifd_load_setup(&local_err) != 0) {
+        /*
+         * We haven't been able to create multifd threads
+         * nothing better to do
+         */
+        error_report_err(local_err);
+        goto err;
+    }
+
     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                         NULL, (void *)(intptr_t)rdma);
     return;
@@ -4118,3 +4160,45 @@ err:
     g_free(rdma);
     g_free(rdma_return_path);
 }
+
+/* Placeholder RDMA multifd receive thread: idles on sem_sync until quit. */
+void *multifd_rdma_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *params = opaque;
+    bool stop;
+
+    for (;;) {
+        qemu_mutex_lock(&params->mutex);
+        stop = params->quit;
+        qemu_mutex_unlock(&params->mutex);
+        if (stop) {
+            break;
+        }
+        qemu_sem_wait(&params->sem_sync);
+    }
+    qemu_mutex_lock(&params->mutex);
+    params->running = false;
+    qemu_mutex_unlock(&params->mutex);
+    return NULL;
+}
+
+/* Placeholder RDMA multifd send thread: idles on sem until told to quit. */
+void *multifd_rdma_send_thread(void *opaque)
+{
+    MultiFDSendParams *params = opaque;
+    bool stop;
+
+    for (;;) {
+        qemu_mutex_lock(&params->mutex);
+        stop = params->quit;
+        qemu_mutex_unlock(&params->mutex);
+        if (stop) {
+            break;
+        }
+        qemu_sem_wait(&params->sem);
+    }
+    qemu_mutex_lock(&params->mutex);
+    params->running = false;
+    qemu_mutex_unlock(&params->mutex);
+    return NULL;
+}
diff --git a/migration/rdma.h b/migration/rdma.h
index de2ba09dc5..3a00573083 100644
--- a/migration/rdma.h
+++ b/migration/rdma.h
@@ -17,6 +17,9 @@
 #ifndef QEMU_MIGRATION_RDMA_H
 #define QEMU_MIGRATION_RDMA_H
 
+void *multifd_rdma_recv_thread(void *opaque);
+void *multifd_rdma_send_thread(void *opaque);
+
 void rdma_start_outgoing_migration(void *opaque, const char *host_port,
                                    Error **errp);
 
-- 
2.19.1





reply via email to

[Prev in Thread] Current Thread [Next in Thread]