From: Juan Quintela
Subject: [Qemu-devel] [PULL 08/16] migration: Synchronize multifd threads with main thread
Date: Wed, 27 Jun 2018 14:55:56 +0200

We synchronize all threads on each RAM_SAVE_FLAG_EOS.  Bitmap
synchronizations don't happen inside a ram section, so we are safe
against two channels trying to overwrite the same memory.
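
To make the handshake easier to follow, here is a minimal standalone
sketch of the send side, written against plain POSIX threads and
semaphores instead of QEMU's QemuThread/QemuSemaphore wrappers;
Channel, N_CHANNELS, FLAG_SYNC and sync_main() are illustrative names,
not the ones used by the patch.

/*
 * Sketch of the sender-side sync handshake.  Illustrative only: all
 * names are made up, and POSIX primitives stand in for QEMU's.
 */
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>

#define N_CHANNELS 2
#define FLAG_SYNC  (1 << 0)

typedef struct {
    int id;
    pthread_mutex_t mutex;
    sem_t sem;              /* main -> channel: a job is pending */
    unsigned flags;
    int pending_job;
    bool quit;
} Channel;

static sem_t sem_sync;      /* channel -> main: SYNC packet was sent */
static Channel channels[N_CHANNELS];

static void *channel_thread(void *opaque)
{
    Channel *p = opaque;

    for (;;) {
        sem_wait(&p->sem);
        pthread_mutex_lock(&p->mutex);
        if (p->pending_job) {
            unsigned flags = p->flags;
            p->flags = 0;
            pthread_mutex_unlock(&p->mutex);

            /* ... write one packet to this channel's socket here ... */

            pthread_mutex_lock(&p->mutex);
            p->pending_job--;
            pthread_mutex_unlock(&p->mutex);

            if (flags & FLAG_SYNC) {
                sem_post(&sem_sync);    /* this channel is flushed */
            }
        } else if (p->quit) {
            pthread_mutex_unlock(&p->mutex);
            break;
        } else {
            pthread_mutex_unlock(&p->mutex);
            /* spurious wakeup: just loop */
        }
    }
    return NULL;
}

/* Called by the main thread at every RAM_SAVE_FLAG_EOS. */
static void sync_main(void)
{
    /* Phase 1: queue a SYNC job on every channel. */
    for (int i = 0; i < N_CHANNELS; i++) {
        Channel *p = &channels[i];

        pthread_mutex_lock(&p->mutex);
        p->flags |= FLAG_SYNC;
        p->pending_job++;
        pthread_mutex_unlock(&p->mutex);
        sem_post(&p->sem);
    }
    /* Phase 2: one sem_sync post per channel means all are flushed. */
    for (int i = 0; i < N_CHANNELS; i++) {
        sem_wait(&sem_sync);
    }
}

int main(void)
{
    pthread_t threads[N_CHANNELS];

    sem_init(&sem_sync, 0, 0);
    for (int i = 0; i < N_CHANNELS; i++) {
        Channel *p = &channels[i];

        p->id = i;
        pthread_mutex_init(&p->mutex, NULL);
        sem_init(&p->sem, 0, 0);
        pthread_create(&threads[i], NULL, channel_thread, p);
    }

    sync_main();                /* returns once every channel has synced */
    printf("all %d channels synchronized\n", N_CHANNELS);

    for (int i = 0; i < N_CHANNELS; i++) {
        Channel *p = &channels[i];

        pthread_mutex_lock(&p->mutex);
        p->quit = true;
        pthread_mutex_unlock(&p->mutex);
        sem_post(&p->sem);
        pthread_join(threads[i], NULL);
    }
    return 0;
}

The receive side mirrors this pattern with one extra step: after
posting the shared sem_sync, each channel thread blocks on its own
per-channel sem_sync until the main thread releases it, so no channel
starts consuming the next section's packets early.  The two-phase loop
in sync_main() is the key point: the main thread only moves past
RAM_SAVE_FLAG_EOS once every channel has drained its queued packets.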

Signed-off-by: Juan Quintela <address@hidden>
Reviewed-by: Dr. David Alan Gilbert <address@hidden>

--
seq needs to be atomic now; it will also be accessed from the main thread.
Fix the if (true || ...) leftover.
We are back to non-atomics.
---
 migration/ram.c        | 147 ++++++++++++++++++++++++++++++++---------
 migration/trace-events |   6 ++
 2 files changed, 122 insertions(+), 31 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 76410f9de8..77c66a4391 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -510,6 +510,8 @@ exit:
 #define MULTIFD_MAGIC 0x11223344U
 #define MULTIFD_VERSION 1
 
+#define MULTIFD_FLAG_SYNC (1 << 0)
+
 typedef struct {
     uint32_t magic;
     uint32_t version;
@@ -577,6 +579,8 @@ typedef struct {
     uint64_t num_packets;
     /* pages sent through this channel */
     uint64_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -614,6 +618,8 @@ typedef struct {
     uint64_t num_packets;
     /* pages sent through this channel */
     uint64_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -801,6 +807,10 @@ struct {
     int count;
     /* array of pages to be sent */
     MultiFDPages_t *pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
 } *multifd_send_state;
 
 static void multifd_send_terminate_threads(Error *err)
@@ -848,6 +858,7 @@ int multifd_save_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -856,6 +867,7 @@ int multifd_save_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_send_state->sem_sync);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
     multifd_pages_clear(multifd_send_state->pages);
@@ -865,6 +877,33 @@ int multifd_save_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_send_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_signal(p->id);
+
+        qemu_mutex_lock(&p->mutex);
+        p->flags |= MULTIFD_FLAG_SYNC;
+        p->pending_job++;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_send_state->sem_sync);
+    }
+    trace_multifd_send_sync_main(multifd_send_state->packet_num);
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
@@ -901,15 +940,17 @@ static void *multifd_send_thread(void *opaque)
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
-            continue;
+
+            if (flags & MULTIFD_FLAG_SYNC) {
+                qemu_sem_post(&multifd_send_state->sem_sync);
+            }
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
+        } else {
+            qemu_mutex_unlock(&p->mutex);
+            /* sometimes there are spurious wakeups */
         }
-        qemu_mutex_unlock(&p->mutex);
-        /* this is impossible */
-        error_setg(&local_err, "multifd_send_thread: Unknown command");
-        break;
     }
 
 out:
@@ -961,12 +1002,14 @@ int multifd_save_setup(void)
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     atomic_set(&multifd_send_state->count, 0);
     multifd_send_state->pages = multifd_pages_init(page_count);
+    qemu_sem_init(&multifd_send_state->sem_sync, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->pending_job = 0;
         p->id = i;
@@ -984,6 +1027,10 @@ struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -1029,6 +1076,7 @@ int multifd_load_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -1037,6 +1085,7 @@ int multifd_load_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
     g_free(multifd_recv_state);
@@ -1045,6 +1094,42 @@ int multifd_load_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_recv_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_signal(p->id);
+        qemu_mutex_lock(&p->mutex);
+        p->pending_job = true;
+        qemu_mutex_unlock(&p->mutex);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_recv_state->sem_sync);
+        qemu_mutex_lock(&p->mutex);
+        if (multifd_recv_state->packet_num < p->packet_num) {
+            multifd_recv_state->packet_num = p->packet_num;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_signal(p->id);
+
+        qemu_sem_post(&p->sem_sync);
+    }
+    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
+}
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
@@ -1054,37 +1139,30 @@ static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
 
     while (true) {
-        qemu_sem_wait(&p->sem);
+        uint32_t used;
+        uint32_t flags;
+
+        /* ToDo: recv packet here */
+
         qemu_mutex_lock(&p->mutex);
-        if (p->pending_job) {
-            uint32_t used;
-            uint32_t flags;
-            qemu_mutex_unlock(&p->mutex);
-
-            /* ToDo: recv packet here */
-
-            qemu_mutex_lock(&p->mutex);
-            ret = multifd_recv_unfill_packet(p, &local_err);
-            if (ret) {
-                qemu_mutex_unlock(&p->mutex);
-                break;
-            }
-
-            used = p->pages->used;
-            flags = p->flags;
-            trace_multifd_recv(p->id, p->packet_num, used, flags);
-            p->pending_job = false;
-            p->num_packets++;
-            p->num_pages += used;
-            qemu_mutex_unlock(&p->mutex);
-        } else if (p->quit) {
+        ret = multifd_recv_unfill_packet(p, &local_err);
+        if (ret) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+
+        used = p->pages->used;
+        flags = p->flags;
+        trace_multifd_recv(p->id, p->packet_num, used, flags);
+        p->pending_job = false;
+        p->num_packets++;
+        p->num_pages += used;
         qemu_mutex_unlock(&p->mutex);
-        /* this is impossible */
-        error_setg(&local_err, "multifd_recv_thread: Unknown command");
-        break;
+
+        if (flags & MULTIFD_FLAG_SYNC) {
+            qemu_sem_post(&multifd_recv_state->sem_sync);
+            qemu_sem_wait(&p->sem_sync);
+        }
     }
 
     if (local_err) {
@@ -1112,12 +1190,14 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
+    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->pending_job = false;
         p->id = i;
@@ -2875,6 +2955,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -2955,6 +3036,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
      */
     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
 
+    multifd_send_sync_main();
 out:
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
     ram_counters.transferred += 8;
@@ -3008,6 +3090,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
 
     rcu_read_unlock();
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -3497,6 +3580,7 @@ static int ram_load_postcopy(QEMUFile *f)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             error_report("Unknown combination of migration flags: %#x"
@@ -3685,6 +3769,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             if (flags & RAM_SAVE_FLAG_HOOK) {
diff --git a/migration/trace-events b/migration/trace-events
index 4aad26feed..8b9edfbfef 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -77,9 +77,15 @@ migration_bitmap_sync_start(void) ""
 migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
 migration_throttle(void) ""
 multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet number %" PRIu64 " pages %d flags 0x%x"
+multifd_recv_sync_main(long packet_num) "packet num %ld"
+multifd_recv_sync_main_signal(uint8_t id) "channel %d"
+multifd_recv_sync_main_wait(uint8_t id) "channel %d"
 multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
 multifd_recv_thread_start(uint8_t id) "%d"
 multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x"
+multifd_send_sync_main(long packet_num) "packet num %ld"
+multifd_send_sync_main_signal(uint8_t id) "channel %d"
+multifd_send_sync_main_wait(uint8_t id) "channel %d"
 multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %"  PRIu64
 multifd_send_thread_start(uint8_t id) "%d"
 ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx"
-- 
2.17.1



