[PULL 18/21] ram.c: Move core decompression code into its own file
From: Juan Quintela
Subject: [PULL 18/21] ram.c: Move core decompression code into its own file
Date: Fri, 28 Apr 2023 21:12:00 +0200

From: Lukas Straub <lukasstraub2@web.de>
No functional changes intended.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Philippe Mathieu-Daudé <philmd@linaro.org>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
migration/ram-compress.c | 203 ++++++++++++++++++++++++++++++++++++++
migration/ram-compress.h | 5 +
migration/ram.c | 204 ---------------------------------------
3 files changed, 208 insertions(+), 204 deletions(-)
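The code being moved is a small producer/consumer thread pool: each decompression thread sleeps on its own DecompressParam mutex/cond pair until decompress_data_with_multi_threads() hands it a page, and the shared decomp_done_lock/decomp_done_cond pair lets the feeder wait for an idle thread. The following minimal pthread sketch reproduces just that hand-off pattern; it is not QEMU code, and all names in it (Worker, worker_fn, feed) are invented for illustration. Build with -pthread.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct Worker {
    bool done;               /* idle, may accept new work */
    bool quit;               /* ask the thread to exit */
    int work;                /* 0 means "nothing pending" */
    pthread_mutex_t mutex;   /* protects work/quit for this thread */
    pthread_cond_t cond;     /* signalled when work or quit changes */
} Worker;

static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t done_cond = PTHREAD_COND_INITIALIZER;

static void *worker_fn(void *opaque)
{
    Worker *w = opaque;

    pthread_mutex_lock(&w->mutex);
    while (!w->quit) {
        if (w->work) {
            int job = w->work;
            w->work = 0;
            pthread_mutex_unlock(&w->mutex);

            printf("job %d done\n", job);   /* stands in for the decompress step */

            /* Mark ourselves idle and wake a feeder waiting for a free slot. */
            pthread_mutex_lock(&done_lock);
            w->done = true;
            pthread_cond_signal(&done_cond);
            pthread_mutex_unlock(&done_lock);

            pthread_mutex_lock(&w->mutex);
        } else {
            pthread_cond_wait(&w->cond, &w->mutex);
        }
    }
    pthread_mutex_unlock(&w->mutex);
    return NULL;
}

/* Hand one job to any idle worker, blocking until one becomes free. */
static void feed(Worker *w, int n, int job)
{
    pthread_mutex_lock(&done_lock);
    for (;;) {
        for (int i = 0; i < n; i++) {
            if (w[i].done) {
                w[i].done = false;
                pthread_mutex_lock(&w[i].mutex);
                w[i].work = job;
                pthread_cond_signal(&w[i].cond);
                pthread_mutex_unlock(&w[i].mutex);
                pthread_mutex_unlock(&done_lock);
                return;
            }
        }
        pthread_cond_wait(&done_cond, &done_lock);  /* everyone is busy */
    }
}

int main(void)
{
    enum { N = 2 };
    Worker w[N] = { 0 };
    pthread_t tid[N];

    for (int i = 0; i < N; i++) {
        w[i].done = true;
        pthread_mutex_init(&w[i].mutex, NULL);
        pthread_cond_init(&w[i].cond, NULL);
        pthread_create(&tid[i], NULL, worker_fn, &w[i]);
    }
    for (int job = 1; job <= 8; job++) {
        feed(w, N, job);
    }
    /* Like wait_for_decompress_done(): wait until every worker is idle
     * before asking anyone to quit, so no job is dropped. */
    pthread_mutex_lock(&done_lock);
    for (int i = 0; i < N; i++) {
        while (!w[i].done) {
            pthread_cond_wait(&done_cond, &done_lock);
        }
    }
    pthread_mutex_unlock(&done_lock);
    for (int i = 0; i < N; i++) {
        pthread_mutex_lock(&w[i].mutex);
        w[i].quit = true;
        pthread_cond_signal(&w[i].cond);
        pthread_mutex_unlock(&w[i].mutex);
        pthread_join(tid[i], NULL);
    }
    return 0;
}

In the patch itself the same roles are played by DecompressParam.mutex/cond per thread, the shared decomp_done_lock/decomp_done_cond, and the done/quit flags; compress_threads_load_setup() and compress_threads_load_cleanup() bracket the pool's lifetime.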
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index d9bc67d075..c25562f12d 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -48,6 +48,24 @@ static QemuThread *compress_threads;
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
+struct DecompressParam {
+ bool done;
+ bool quit;
+ QemuMutex mutex;
+ QemuCond cond;
+ void *des;
+ uint8_t *compbuf;
+ int len;
+ z_stream stream;
+};
+typedef struct DecompressParam DecompressParam;
+
+static QEMUFile *decomp_file;
+static DecompressParam *decomp_param;
+static QemuThread *decompress_threads;
+static QemuMutex decomp_done_lock;
+static QemuCond decomp_done_cond;
+
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
RAMBlock *block, ram_addr_t offset,
uint8_t *source_buf);
@@ -272,3 +290,188 @@ retry:
return pages;
}
+
+/* return the size after decompression, or negative value on error */
+static int
+qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+ const uint8_t *source, size_t source_len)
+{
+ int err;
+
+ err = inflateReset(stream);
+ if (err != Z_OK) {
+ return -1;
+ }
+
+ stream->avail_in = source_len;
+ stream->next_in = (uint8_t *)source;
+ stream->avail_out = dest_len;
+ stream->next_out = dest;
+
+ err = inflate(stream, Z_NO_FLUSH);
+ if (err != Z_STREAM_END) {
+ return -1;
+ }
+
+ return stream->total_out;
+}
+
+static void *do_data_decompress(void *opaque)
+{
+ DecompressParam *param = opaque;
+ unsigned long pagesize;
+ uint8_t *des;
+ int len, ret;
+
+ qemu_mutex_lock(&param->mutex);
+ while (!param->quit) {
+ if (param->des) {
+ des = param->des;
+ len = param->len;
+ param->des = 0;
+ qemu_mutex_unlock(&param->mutex);
+
+ pagesize = TARGET_PAGE_SIZE;
+
+ ret = qemu_uncompress_data(&param->stream, des, pagesize,
+ param->compbuf, len);
+ if (ret < 0 && migrate_get_current()->decompress_error_check) {
+ error_report("decompress data failed");
+ qemu_file_set_error(decomp_file, ret);
+ }
+
+ qemu_mutex_lock(&decomp_done_lock);
+ param->done = true;
+ qemu_cond_signal(&decomp_done_cond);
+ qemu_mutex_unlock(&decomp_done_lock);
+
+ qemu_mutex_lock(&param->mutex);
+ } else {
+ qemu_cond_wait(&param->cond, &param->mutex);
+ }
+ }
+ qemu_mutex_unlock(&param->mutex);
+
+ return NULL;
+}
+
+int wait_for_decompress_done(void)
+{
+ int idx, thread_count;
+
+ if (!migrate_compress()) {
+ return 0;
+ }
+
+ thread_count = migrate_decompress_threads();
+ qemu_mutex_lock(&decomp_done_lock);
+ for (idx = 0; idx < thread_count; idx++) {
+ while (!decomp_param[idx].done) {
+ qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
+ }
+ }
+ qemu_mutex_unlock(&decomp_done_lock);
+ return qemu_file_get_error(decomp_file);
+}
+
+void compress_threads_load_cleanup(void)
+{
+ int i, thread_count;
+
+ if (!migrate_compress()) {
+ return;
+ }
+ thread_count = migrate_decompress_threads();
+ for (i = 0; i < thread_count; i++) {
+ /*
+ * we use it as a indicator which shows if the thread is
+ * properly init'd or not
+ */
+ if (!decomp_param[i].compbuf) {
+ break;
+ }
+
+ qemu_mutex_lock(&decomp_param[i].mutex);
+ decomp_param[i].quit = true;
+ qemu_cond_signal(&decomp_param[i].cond);
+ qemu_mutex_unlock(&decomp_param[i].mutex);
+ }
+ for (i = 0; i < thread_count; i++) {
+ if (!decomp_param[i].compbuf) {
+ break;
+ }
+
+ qemu_thread_join(decompress_threads + i);
+ qemu_mutex_destroy(&decomp_param[i].mutex);
+ qemu_cond_destroy(&decomp_param[i].cond);
+ inflateEnd(&decomp_param[i].stream);
+ g_free(decomp_param[i].compbuf);
+ decomp_param[i].compbuf = NULL;
+ }
+ g_free(decompress_threads);
+ g_free(decomp_param);
+ decompress_threads = NULL;
+ decomp_param = NULL;
+ decomp_file = NULL;
+}
+
+int compress_threads_load_setup(QEMUFile *f)
+{
+ int i, thread_count;
+
+ if (!migrate_compress()) {
+ return 0;
+ }
+
+ thread_count = migrate_decompress_threads();
+ decompress_threads = g_new0(QemuThread, thread_count);
+ decomp_param = g_new0(DecompressParam, thread_count);
+ qemu_mutex_init(&decomp_done_lock);
+ qemu_cond_init(&decomp_done_cond);
+ decomp_file = f;
+ for (i = 0; i < thread_count; i++) {
+ if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+ goto exit;
+ }
+
+ decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+ qemu_mutex_init(&decomp_param[i].mutex);
+ qemu_cond_init(&decomp_param[i].cond);
+ decomp_param[i].done = true;
+ decomp_param[i].quit = false;
+ qemu_thread_create(decompress_threads + i, "decompress",
+ do_data_decompress, decomp_param + i,
+ QEMU_THREAD_JOINABLE);
+ }
+ return 0;
+exit:
+ compress_threads_load_cleanup();
+ return -1;
+}
+
+void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
+{
+ int idx, thread_count;
+
+ thread_count = migrate_decompress_threads();
+ QEMU_LOCK_GUARD(&decomp_done_lock);
+ while (true) {
+ for (idx = 0; idx < thread_count; idx++) {
+ if (decomp_param[idx].done) {
+ decomp_param[idx].done = false;
+ qemu_mutex_lock(&decomp_param[idx].mutex);
+ qemu_get_buffer(f, decomp_param[idx].compbuf, len);
+ decomp_param[idx].des = host;
+ decomp_param[idx].len = len;
+ qemu_cond_signal(&decomp_param[idx].cond);
+ qemu_mutex_unlock(&decomp_param[idx].mutex);
+ break;
+ }
+ }
+ if (idx < thread_count) {
+ break;
+ } else {
+ qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
+ }
+ }
+}
diff --git a/migration/ram-compress.h b/migration/ram-compress.h
index 06570a799c..6f7fe2f472 100644
--- a/migration/ram-compress.h
+++ b/migration/ram-compress.h
@@ -62,4 +62,9 @@ void flush_compressed_data(int (send_queued_data(CompressParam *)));
int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
int (send_queued_data(CompressParam *)));
+int wait_for_decompress_done(void);
+void compress_threads_load_cleanup(void);
+int compress_threads_load_setup(QEMUFile *f);
+void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len);
+
#endif
diff --git a/migration/ram.c b/migration/ram.c
index e4e50f5363..250cb01099 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -480,24 +480,6 @@ typedef struct MigrationOps MigrationOps;
MigrationOps *migration_ops;
-struct DecompressParam {
- bool done;
- bool quit;
- QemuMutex mutex;
- QemuCond cond;
- void *des;
- uint8_t *compbuf;
- int len;
- z_stream stream;
-};
-typedef struct DecompressParam DecompressParam;
-
-static QEMUFile *decomp_file;
-static DecompressParam *decomp_param;
-static QemuThread *decompress_threads;
-static QemuMutex decomp_done_lock;
-static QemuCond decomp_done_cond;
-
static int ram_save_host_page_urgent(PageSearchStatus *pss);
/* NOTE: page is the PFN not real ram_addr_t. */
@@ -3465,192 +3447,6 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
}
}
-/* return the size after decompression, or negative value on error */
-static int
-qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
- const uint8_t *source, size_t source_len)
-{
- int err;
-
- err = inflateReset(stream);
- if (err != Z_OK) {
- return -1;
- }
-
- stream->avail_in = source_len;
- stream->next_in = (uint8_t *)source;
- stream->avail_out = dest_len;
- stream->next_out = dest;
-
- err = inflate(stream, Z_NO_FLUSH);
- if (err != Z_STREAM_END) {
- return -1;
- }
-
- return stream->total_out;
-}
-
-static void *do_data_decompress(void *opaque)
-{
- DecompressParam *param = opaque;
- unsigned long pagesize;
- uint8_t *des;
- int len, ret;
-
- qemu_mutex_lock(&param->mutex);
- while (!param->quit) {
- if (param->des) {
- des = param->des;
- len = param->len;
- param->des = 0;
- qemu_mutex_unlock(&param->mutex);
-
- pagesize = TARGET_PAGE_SIZE;
-
- ret = qemu_uncompress_data(&param->stream, des, pagesize,
- param->compbuf, len);
- if (ret < 0 && migrate_get_current()->decompress_error_check) {
- error_report("decompress data failed");
- qemu_file_set_error(decomp_file, ret);
- }
-
- qemu_mutex_lock(&decomp_done_lock);
- param->done = true;
- qemu_cond_signal(&decomp_done_cond);
- qemu_mutex_unlock(&decomp_done_lock);
-
- qemu_mutex_lock(&param->mutex);
- } else {
- qemu_cond_wait(&param->cond, &param->mutex);
- }
- }
- qemu_mutex_unlock(&param->mutex);
-
- return NULL;
-}
-
-static int wait_for_decompress_done(void)
-{
- int idx, thread_count;
-
- if (!migrate_compress()) {
- return 0;
- }
-
- thread_count = migrate_decompress_threads();
- qemu_mutex_lock(&decomp_done_lock);
- for (idx = 0; idx < thread_count; idx++) {
- while (!decomp_param[idx].done) {
- qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
- }
- }
- qemu_mutex_unlock(&decomp_done_lock);
- return qemu_file_get_error(decomp_file);
-}
-
-static void compress_threads_load_cleanup(void)
-{
- int i, thread_count;
-
- if (!migrate_compress()) {
- return;
- }
- thread_count = migrate_decompress_threads();
- for (i = 0; i < thread_count; i++) {
- /*
- * we use it as a indicator which shows if the thread is
- * properly init'd or not
- */
- if (!decomp_param[i].compbuf) {
- break;
- }
-
- qemu_mutex_lock(&decomp_param[i].mutex);
- decomp_param[i].quit = true;
- qemu_cond_signal(&decomp_param[i].cond);
- qemu_mutex_unlock(&decomp_param[i].mutex);
- }
- for (i = 0; i < thread_count; i++) {
- if (!decomp_param[i].compbuf) {
- break;
- }
-
- qemu_thread_join(decompress_threads + i);
- qemu_mutex_destroy(&decomp_param[i].mutex);
- qemu_cond_destroy(&decomp_param[i].cond);
- inflateEnd(&decomp_param[i].stream);
- g_free(decomp_param[i].compbuf);
- decomp_param[i].compbuf = NULL;
- }
- g_free(decompress_threads);
- g_free(decomp_param);
- decompress_threads = NULL;
- decomp_param = NULL;
- decomp_file = NULL;
-}
-
-static int compress_threads_load_setup(QEMUFile *f)
-{
- int i, thread_count;
-
- if (!migrate_compress()) {
- return 0;
- }
-
- thread_count = migrate_decompress_threads();
- decompress_threads = g_new0(QemuThread, thread_count);
- decomp_param = g_new0(DecompressParam, thread_count);
- qemu_mutex_init(&decomp_done_lock);
- qemu_cond_init(&decomp_done_cond);
- decomp_file = f;
- for (i = 0; i < thread_count; i++) {
- if (inflateInit(&decomp_param[i].stream) != Z_OK) {
- goto exit;
- }
-
- decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
- qemu_mutex_init(&decomp_param[i].mutex);
- qemu_cond_init(&decomp_param[i].cond);
- decomp_param[i].done = true;
- decomp_param[i].quit = false;
- qemu_thread_create(decompress_threads + i, "decompress",
- do_data_decompress, decomp_param + i,
- QEMU_THREAD_JOINABLE);
- }
- return 0;
-exit:
- compress_threads_load_cleanup();
- return -1;
-}
-
-static void decompress_data_with_multi_threads(QEMUFile *f,
- void *host, int len)
-{
- int idx, thread_count;
-
- thread_count = migrate_decompress_threads();
- QEMU_LOCK_GUARD(&decomp_done_lock);
- while (true) {
- for (idx = 0; idx < thread_count; idx++) {
- if (decomp_param[idx].done) {
- decomp_param[idx].done = false;
- qemu_mutex_lock(&decomp_param[idx].mutex);
- qemu_get_buffer(f, decomp_param[idx].compbuf, len);
- decomp_param[idx].des = host;
- decomp_param[idx].len = len;
- qemu_cond_signal(&decomp_param[idx].cond);
- qemu_mutex_unlock(&decomp_param[idx].mutex);
- break;
- }
- }
- if (idx < thread_count) {
- break;
- } else {
- qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
- }
- }
-}
-
static void colo_init_ram_state(void)
{
ram_state_init(&ram_state);
--
2.40.0
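
The heart of qemu_uncompress_data() is a common zlib idiom: call inflateInit() once per stream, then for every page call inflateReset() followed by a single inflate(Z_NO_FLUSH) that is expected to return Z_STREAM_END, because the sender emits one complete deflate stream per page. The round-trip below exercises that idiom standalone. It is not QEMU code: uncompress_buf and the main() scaffolding are invented for the example, and compress2() stands in for the send side. Build with -lz.

#include <stdio.h>
#include <string.h>
#include <zlib.h>

static int uncompress_buf(z_stream *stream, unsigned char *dest, size_t dest_len,
                          const unsigned char *source, size_t source_len)
{
    /* Reuse the stream across buffers, as the migration code does per page. */
    if (inflateReset(stream) != Z_OK) {
        return -1;
    }
    stream->avail_in = source_len;
    stream->next_in = (unsigned char *)source;
    stream->avail_out = dest_len;
    stream->next_out = dest;

    /* The whole compressed buffer is handed over at once, so a single
     * inflate() call must consume it and finish with Z_STREAM_END. */
    if (inflate(stream, Z_NO_FLUSH) != Z_STREAM_END) {
        return -1;
    }
    return stream->total_out;
}

int main(void)
{
    const unsigned char page[] = "hello, hello, hello, hello, hello!";
    unsigned char comp[128], out[128];
    uLongf comp_len = sizeof(comp);
    z_stream stream = { 0 };

    /* Send side: one self-terminated deflate stream per "page". */
    if (compress2(comp, &comp_len, page, sizeof(page), 1) != Z_OK) {
        return 1;
    }

    if (inflateInit(&stream) != Z_OK) {   /* once, as in load_setup() */
        return 1;
    }
    int n = uncompress_buf(&stream, out, sizeof(out), comp, comp_len);
    inflateEnd(&stream);                  /* once, as in load_cleanup() */

    printf("decompressed %d bytes: %s\n", n, out);
    return n == (int)sizeof(page) && memcmp(out, page, n) == 0 ? 0 : 1;
}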