qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [v3 08/13] migration: Add the core code of multi-thread com


From: Liang Li
Subject: [Qemu-devel] [v3 08/13] migration: Add the core code of multi-thread compresion
Date: Fri, 12 Dec 2014 09:29:01 +0800

At this point, multiple thread compression can't co-work with xbzrle.

Signed-off-by: Liang Li <address@hidden>
Signed-off-by: Yang Zhang <address@hidden>
---
 arch_init.c | 164 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 157 insertions(+), 7 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 0a575ed..4109ad7 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -369,23 +369,43 @@ static QemuMutex *mutex;
 static QemuCond *cond;
 static QEMUFileOps *empty_ops;
 static bool quit_thread;
+static int one_byte_count;
 static decompress_param *decomp_param;
 static QemuThread *decompress_threads;
 
+static int do_compress_ram_page(compress_param *param);
+
 static void *do_data_compress(void *opaque)
 {
+    compress_param *param = opaque;
     while (!quit_thread) {
-
-    /* To be done */
-
+        qemu_mutex_lock(&param->mutex);
+        while (param->state != START) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+            if (quit_thread) {
+                break;
+            }
+            do_compress_ram_page(param);
+            qemu_mutex_lock(mutex);
+            param->state = DONE;
+            qemu_cond_signal(cond);
+            qemu_mutex_unlock(mutex);
+        }
+        qemu_mutex_unlock(&param->mutex);
     }
+
     return NULL;
 }
 
 static inline void terminate_compression_threads(void)
 {
+    int idx, thread_count;
+
+    thread_count = migrate_compress_threads();
     quit_thread = true;
-    /* To be done */
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_cond_signal(&comp_param[idx].cond);
+    }
 }
 
 void migrate_compress_threads_join(MigrationState *s)
@@ -770,13 +790,142 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, 
ram_addr_t offset,
     return bytes_sent;
 }
 
+static int do_compress_ram_page(compress_param *param)
+{
+    int bytes_sent;
+    int blen = COMPRESS_BUF_SIZE;
+    int cont;
+    uint8_t *p;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+
+    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+    p = memory_region_get_ram_ptr(block->mr) + offset;
+
+    bytes_sent = save_block_hdr(param->file, block,
+            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
+    blen = migrate_qemu_add_compression_data(param->file, p,
+            TARGET_PAGE_SIZE, migrate_compress_level());
+    bytes_sent += blen;
+    atomic_inc(&acct_info.norm_pages);
+
+    return bytes_sent;
+}
+
+static inline void start_compression(compress_param *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->state = START;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx, len, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        if (comp_param[idx].state != DONE) {
+            qemu_mutex_lock(mutex);
+            while (comp_param[idx].state != DONE) {
+                qemu_cond_wait(cond, mutex);
+            }
+            qemu_mutex_unlock(mutex);
+        }
+        len = migrate_qemu_flush(f, comp_param[idx].file);
+        bytes_transferred += len;
+    }
+    if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
+        bytes_transferred -= one_byte_count;
+        one_byte_count = 0;
+    }
+}
+
+static inline void set_compress_params(compress_param *param,
+    RAMBlock *block, ram_addr_t offset)
+{
+    param->block = block;
+    param->offset = offset;
+}
+
+
+static int compress_page_with_multi_thread(QEMUFile *f,
+        RAMBlock *block, ram_addr_t offset)
+{
+    int idx, thread_count, bytes_sent = 0;
+
+    thread_count = migrate_compress_threads();
+    qemu_mutex_lock(mutex);
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (comp_param[idx].state == DONE) {
+                bytes_sent = migrate_qemu_flush(f, comp_param[idx].file);
+                set_compress_params(&comp_param[idx],
+                        block, offset);
+                start_compression(&comp_param[idx]);
+                if (bytes_sent == 0) {
+                    /* set bytes_sent to 1 in this case to prevent migration
+                     * from terminating, this 1 byte whill be added to
+                     * bytes_transferred later, minus 1 to keep the
+                     * bytes_transferred accurate */
+                    bytes_sent = 1;
+                    if (bytes_transferred <= 0) {
+                        one_byte_count++;
+                    } else {
+                        bytes_transferred -= 1;
+                    }
+                }
+                break;
+            }
+        }
+        if (bytes_sent > 0) {
+            break;
+        } else {
+            qemu_cond_wait(cond, mutex);
+        }
+    }
+    qemu_mutex_unlock(mutex);
+    return bytes_sent;
+}
+
 static int ram_save_compressed_page(QEMUFile *f, RAMBlock* block,
         ram_addr_t offset, bool last_stage)
 {
     int bytes_sent = 0;
 
-    /* To be done*/
-
+    /* When starting the process of a new block, the first page of
+     * the block should be sent out before other pages in the same
+     * block, and all the pages in last block should have been sent
+     * out, keeping this order is important.
+     */
+    if (block != last_sent_block) {
+        flush_compressed_data(f);
+        bytes_sent = save_zero_and_xbzrle_page(f, block, offset,
+                last_stage, NULL);
+        if (bytes_sent == -1) {
+            set_compress_params(&comp_param[0], block, offset);
+            /* Use the qemu thread to compress the data to make sure the
+             * first page is sent out before other pages
+             */
+            bytes_sent = do_compress_ram_page(&comp_param[0]);
+            if (bytes_sent > 0) {
+                migrate_qemu_flush(f, comp_param[0].file);
+            }
+        }
+    } else {
+        bytes_sent = save_zero_and_xbzrle_page(f, block, offset,
+                last_stage, NULL);
+        if (bytes_sent == -1) {
+            bytes_sent = compress_page_with_multi_thread(f, block, offset);
+        }
+    }
     return bytes_sent;
 }
 
@@ -834,7 +983,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool 
last_stage)
     return bytes_sent;
 }
 
-static uint64_t bytes_transferred;
 
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
@@ -1043,6 +1191,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         i++;
     }
 
+    flush_compressed_data(f);
     qemu_mutex_unlock_ramlist();
 
     /*
@@ -1089,6 +1238,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         bytes_transferred += bytes_sent;
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
-- 
1.8.3.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]