[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH RFC 04/12] migration/rdma: Create multiRDMA migration threads
From: |
Zhimin Feng |
Subject: |
[PATCH RFC 04/12] migration/rdma: Create multiRDMA migration threads |
Date: |
Thu, 9 Jan 2020 12:59:14 +0800 |
From: fengzhimin <address@hidden>
Creation of the RDMA threads, nothing inside yet.
Signed-off-by: fengzhimin <address@hidden>
---
migration/migration.c | 1 +
migration/migration.h | 2 +
migration/rdma.c | 283 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 286 insertions(+)
diff --git a/migration/migration.c b/migration/migration.c
index 5756a4806e..f8d4eb657e 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1546,6 +1546,7 @@ static void migrate_fd_cleanup(MigrationState *s)
qemu_mutex_lock_iothread();
multifd_save_cleanup();
+ multiRDMA_save_cleanup();
qemu_mutex_lock(&s->qemu_file_lock);
tmp = s->to_dst_file;
s->to_dst_file = NULL;
diff --git a/migration/migration.h b/migration/migration.h
index 4192c22d8c..d69a3fe4e9 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -272,6 +272,8 @@ void migration_incoming_process(void);
bool migration_has_all_channels(void);
int migrate_multiRDMA_channels(void);
+int multiRDMA_save_cleanup(void);
+int multiRDMA_load_cleanup(void);
uint64_t migrate_max_downtime(void);
diff --git a/migration/rdma.c b/migration/rdma.c
index e241dcb992..992e5abfed 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -395,6 +395,58 @@ typedef struct RDMAContext {
bool is_return_path;
} RDMAContext;
+typedef struct {
+ /* this fields are not changed once the thread is created */
+ /* channel number */
+ uint8_t id;
+ /* channel thread name */
+ char *name;
+ /* channel thread id */
+ QemuThread thread;
+ /* sem where to wait for more work */
+ QemuSemaphore sem;
+ /* this mutex protects the following parameters */
+ QemuMutex mutex;
+ /* is this channel thread running */
+ bool running;
+ /* should this thread finish */
+ bool quit;
+} MultiRDMASendParams;
+
+struct {
+ MultiRDMASendParams *params;
+ /* number of created threads */
+ int count;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
+} *multiRDMA_send_state;
+
+typedef struct {
+ /* this fields are not changed once the thread is created */
+ /* channel number */
+ uint8_t id;
+ /* channel thread name */
+ char *name;
+ /* channel thread id */
+ QemuThread thread;
+ /* sem where to wait for more work */
+ QemuSemaphore sem;
+ /* this mutex protects the following parameters */
+ QemuMutex mutex;
+ /* is this channel thread running */
+ bool running;
+ /* should this thread finish */
+ bool quit;
+} MultiRDMARecvParams;
+
+struct {
+ MultiRDMARecvParams *params;
+ /* number of created threads */
+ int count;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
+} *multiRDMA_recv_state;
+
#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
#define QIO_CHANNEL_RDMA(obj) \
OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
@@ -3018,6 +3070,7 @@ static void qio_channel_rdma_close_rcu(struct
rdma_close_rcu *rcu)
if (rcu->rdmaout) {
qemu_rdma_cleanup(rcu->rdmaout);
}
+ multiRDMA_load_cleanup();
g_free(rcu->rdmain);
g_free(rcu->rdmaout);
@@ -3919,6 +3972,7 @@ static void qio_channel_rdma_finalize(Object *obj)
g_free(rioc->rdmaout);
rioc->rdmaout = NULL;
}
+ multiRDMA_load_cleanup();
}
static void qio_channel_rdma_class_init(ObjectClass *klass,
@@ -4007,6 +4061,59 @@ static void rdma_accept_incoming_migration(void *opaque)
migration_fd_process_incoming(f);
}
+static void *multiRDMA_recv_thread(void *opaque)
+{
+ MultiRDMARecvParams *p = opaque;
+
+ while (true) {
+ qemu_mutex_lock(&p->mutex);
+ if (p->quit) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
+ }
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_wait(&p->sem);
+ }
+
+ qemu_mutex_lock(&p->mutex);
+ p->running = false;
+ qemu_mutex_unlock(&p->mutex);
+
+ return NULL;
+}
+
+static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
+ Error **errp)
+{
+ uint8_t i;
+ int thread_count;
+
+ thread_count = migrate_multiRDMA_channels();
+ if (multiRDMA_recv_state == NULL) {
+ multiRDMA_recv_state = g_malloc0(sizeof(*multiRDMA_recv_state));
+ multiRDMA_recv_state->params = g_new0(MultiRDMARecvParams,
+ thread_count);
+ atomic_set(&multiRDMA_recv_state->count, 0);
+ qemu_sem_init(&multiRDMA_recv_state->sem_sync, 0);
+
+ for (i = 0; i < thread_count; i++) {
+ MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+
+ qemu_mutex_init(&p->mutex);
+ qemu_sem_init(&p->sem, 0);
+ p->quit = false;
+ p->id = i;
+ p->running = true;
+ p->name = g_strdup_printf("multiRDMARecv_%d", i);
+ qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread,
+ p, QEMU_THREAD_JOINABLE);
+ atomic_inc(&multiRDMA_recv_state->count);
+ }
+ }
+
+ return 0;
+}
+
void rdma_start_incoming_migration(const char *host_port, Error **errp)
{
int ret;
@@ -4048,6 +4155,13 @@ void rdma_start_incoming_migration(const char
*host_port, Error **errp)
qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
}
+ if (migrate_use_multiRDMA()) {
+ if (multiRDMA_load_setup(host_port, rdma, &local_err) != 0) {
+ ERROR(errp, "init multiRDMA failure!");
+ goto err;
+ }
+ }
+
qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
NULL, (void *)(intptr_t)rdma);
return;
@@ -4055,6 +4169,167 @@ err:
error_propagate(errp, local_err);
g_free(rdma);
g_free(rdma_return_path);
+ multiRDMA_load_cleanup();
+}
+
+static void *multiRDMA_send_thread(void *opaque)
+{
+ MultiRDMASendParams *p = opaque;
+
+ while (true) {
+ qemu_mutex_lock(&p->mutex);
+ if (p->quit) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
+ }
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_wait(&p->sem);
+ }
+
+ qemu_mutex_lock(&p->mutex);
+ p->running = false;
+ qemu_mutex_unlock(&p->mutex);
+
+ return NULL;
+}
+
+static int multiRDMA_save_setup(const char *host_port, Error **errp)
+{
+ int thread_count;
+ uint8_t i;
+
+ thread_count = migrate_multiRDMA_channels();
+ multiRDMA_send_state = g_malloc0(sizeof(*multiRDMA_send_state));
+ multiRDMA_send_state->params = g_new0(MultiRDMASendParams,
+ thread_count);
+ atomic_set(&multiRDMA_send_state->count, 0);
+ qemu_sem_init(&multiRDMA_send_state->sem_sync, 0);
+
+ for (i = 0; i < thread_count; i++) {
+ MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+ qemu_mutex_init(&p->mutex);
+ qemu_sem_init(&p->sem, 0);
+ p->quit = false;
+ p->id = i;
+ p->running = true;
+ p->name = g_strdup_printf("multiRDMASend_%d", i);
+
+ qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread, p,
+ QEMU_THREAD_JOINABLE);
+ atomic_inc(&multiRDMA_send_state->count);
+ }
+
+ return 0;
+}
+
+static void multiRDMA_send_terminate_threads(void)
+{
+ int i;
+ int thread_count = migrate_multiRDMA_channels();
+
+ for (i = 0; i < thread_count; i++) {
+ MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+
+ qemu_mutex_lock(&p->mutex);
+ p->quit = true;
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_post(&p->sem);
+ }
+}
+
+int multiRDMA_save_cleanup(void)
+{
+ int i;
+ int ret = 0;
+ int thread_count = migrate_multiRDMA_channels();
+
+ if (!migrate_use_multiRDMA()) {
+ return 0;
+ }
+
+ /* prevent double free */
+ if (multiRDMA_send_state == NULL) {
+ return 0;
+ }
+
+ /* notify multi RDMA threads to exit */
+ multiRDMA_send_terminate_threads();
+
+ /* wait for multi RDMA send threads to be exit */
+ for (i = 0; i < thread_count; i++) {
+ MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+
+ qemu_thread_join(&p->thread);
+ }
+
+ for (i = 0; i < thread_count; i++) {
+ MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+ qemu_mutex_destroy(&p->mutex);
+ qemu_sem_destroy(&p->sem);
+ g_free(p->name);
+ p->name = NULL;
+ }
+
+ qemu_sem_destroy(&multiRDMA_send_state->sem_sync);
+ g_free(multiRDMA_send_state);
+ multiRDMA_send_state = NULL;
+
+ return ret;
+}
+
+static void multiRDMA_recv_terminate_threads(void)
+{
+ int i;
+ int thread_count = migrate_multiRDMA_channels();
+
+ for (i = 0; i < thread_count; i++) {
+ MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+
+ qemu_mutex_lock(&p->mutex);
+ p->quit = true;
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_post(&p->sem);
+ }
+}
+
+int multiRDMA_load_cleanup(void)
+{
+ int i;
+ int ret = 0;
+ int thread_count = migrate_multiRDMA_channels();
+
+ if (!migrate_use_multiRDMA()) {
+ return 0;
+ }
+
+ /* prevent double free */
+ if (multiRDMA_recv_state == NULL) {
+ return 0;
+ }
+
+ /* notify multi RDMA recv threads to exit */
+ multiRDMA_recv_terminate_threads();
+
+ /* wait for multi RDMA threads to be exit */
+ for (i = 0; i < thread_count; i++) {
+ MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+
+ qemu_thread_join(&p->thread);
+ }
+
+ for (i = 0; i < thread_count; i++) {
+ MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+ qemu_mutex_destroy(&p->mutex);
+ qemu_sem_destroy(&p->sem);
+ g_free(p->name);
+ p->name = NULL;
+ }
+
+ qemu_sem_destroy(&multiRDMA_recv_state->sem_sync);
+ g_free(multiRDMA_recv_state);
+ multiRDMA_recv_state = NULL;
+
+ return ret;
}
void rdma_start_outgoing_migration(void *opaque,
@@ -4111,10 +4386,18 @@ void rdma_start_outgoing_migration(void *opaque,
trace_rdma_start_outgoing_migration_after_rdma_connect();
+ if (migrate_use_multiRDMA()) {
+ if (multiRDMA_save_setup(host_port, errp) != 0) {
+ ERROR(errp, "init multiRDMA channels failure!");
+ goto err;
+ }
+ }
+
s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
migrate_fd_connect(s, NULL);
return;
err:
g_free(rdma);
g_free(rdma_return_path);
+ multiRDMA_save_cleanup();
}
--
2.19.1
- [PATCH RFC 00/12] *** mulitple RDMA channels for migration ***, Zhimin Feng, 2020/01/09
- [PATCH RFC 08/12] migration/rdma: register memory for multiRDMA channels, Zhimin Feng, 2020/01/09
- [PATCH RFC 03/12] migration: Create the multi-rdma-channels parameter, Zhimin Feng, 2020/01/09
- [PATCH RFC 09/12] migration/rdma: Wait for all multiRDMA to complete registration, Zhimin Feng, 2020/01/09
- [PATCH RFC 04/12] migration/rdma: Create multiRDMA migration threads,
Zhimin Feng <=
- [PATCH RFC 06/12] migration/rdma: Transmit initial package, Zhimin Feng, 2020/01/09
- [PATCH RFC 11/12] migration/rdma: use multiRDMA to send RAM block for NOT rdma-pin-all mode, Zhimin Feng, 2020/01/09
- [PATCH RFC 01/12] migration: Add multiRDMA capability support, Zhimin Feng, 2020/01/09