qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression thre


From: ChenLiang
Subject: Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
Date: Fri, 21 Nov 2014 15:29:10 +0800
User-agent: Mozilla/5.0 (Windows NT 6.1; rv:11.0) Gecko/20120327 Thunderbird/11.0.1

On 2014/11/6 19:08, Li Liang wrote:

> Instead of sending the guest memory directly, this solution compress
> the ram page before sending, after receiving, the data will be
> decompressed.
> This feature can help to reduce the data transferred about
> 60%, this is very useful when the network bandwidth is limited,
> and the migration time can also be reduced about 70%. The
> feature is off by default, following the document
> docs/multiple-compression-threads.txt for information to use it.
> 
> Reviewed-by: Eric Blake <address@hidden>
> Signed-off-by: Li Liang <address@hidden>
> ---
>  arch_init.c                   | 435 
> ++++++++++++++++++++++++++++++++++++++++--
>  hmp-commands.hx               |  56 ++++++
>  hmp.c                         |  57 ++++++
>  hmp.h                         |   6 +
>  include/migration/migration.h |  12 +-
>  include/migration/qemu-file.h |   1 +
>  migration.c                   |  99 ++++++++++
>  monitor.c                     |  21 ++
>  qapi-schema.json              |  88 ++++++++-
>  qmp-commands.hx               | 131 +++++++++++++
>  10 files changed, 890 insertions(+), 16 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 88a5ba0..a27d87b 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -24,6 +24,7 @@
>  #include <stdint.h>
>  #include <stdarg.h>
>  #include <stdlib.h>
> +#include <zlib.h>
>  #ifndef _WIN32
>  #include <sys/types.h>
>  #include <sys/mman.h>
> @@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_CONTINUE 0x20
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
> +#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  
>  static struct defconfig_file {
>      const char *filename;
> @@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages;
>  static uint32_t last_version;
>  static bool ram_bulk_stage;
>  
> +#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
> +#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
> +struct MigBuf {
> +    int buf_index;
> +    uint8_t buf[MIG_BUF_SIZE];
> +};
> +
> +typedef struct MigBuf MigBuf;
> +
> +static void migrate_put_byte(MigBuf *f, int v)
> +{
> +    f->buf[f->buf_index] = v;
> +    f->buf_index++;
> +}
> +
> +static void migrate_put_be16(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be32(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 24);
> +    migrate_put_byte(f, v >> 16);
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be64(MigBuf *f, uint64_t v)
> +{
> +    migrate_put_be32(f, v >> 32);
> +    migrate_put_be32(f, v);
> +}
> +
> +static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
> +{
> +    int l;
> +
> +    while (size > 0) {
> +        l = MIG_BUF_SIZE - f->buf_index;
> +        if (l > size) {
> +            l = size;
> +        }
> +        memcpy(f->buf + f->buf_index, buf, l);
> +        f->buf_index += l;
> +        buf += l;
> +        size -= l;
> +    }
> +}
> +
> +static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
> +        ram_addr_t offset, int cont, int flag)
> +{
> +    size_t size;
> +
> +    migrate_put_be64(f, offset | cont | flag);
> +    size = 8;
> +
> +    if (!cont) {
> +        migrate_put_byte(f, strlen(block->idstr));
> +        migrate_put_buffer(f, (uint8_t *)block->idstr,
> +                        strlen(block->idstr));
> +        size += 1 + strlen(block->idstr);
> +    }
> +    return size;
> +}
> +
> +static int migrate_qemu_add_compress(MigBuf *f,  const uint8_t *p,
> +        int size, int level)
> +{
> +    uLong  blen = COMPRESS_BUF_SIZE;
> +    if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p,
> +            size, level) != Z_OK) {
> +        error_report("Compress Failed!\n");
> +        return 0;
> +    }
> +    migrate_put_be32(f, blen);
> +    f->buf_index += blen;
> +    return blen + sizeof(int);
> +}
> +
> +enum {
> +    COM_DONE = 0,
> +    COM_START,
> +};
> +
> +static int  compress_thread_count;
> +static int  decompress_thread_count;
> +
> +struct compress_param {
> +    int state;
> +    MigBuf migbuf;
> +    RAMBlock *block;
> +    ram_addr_t offset;
> +    bool last_stage;
> +    int ret;
> +    int bytes_sent;
> +    uint8_t *p;
> +    int cont;
> +    bool bulk_stage;
> +};
> +
> +typedef struct compress_param compress_param;
> +compress_param *comp_param;
> +
> +struct decompress_param {
> +    int state;
> +    void *des;
> +    uint8 compbuf[COMPRESS_BUF_SIZE];
> +    int len;
> +};
> +typedef struct decompress_param decompress_param;
> +
> +static decompress_param *decomp_param;
> +bool incomming_migration_done;
> +static bool quit_thread;
> +
> +static int save_compress_ram_page(compress_param *param);
> +
> +
> +static void *do_data_compress(void *opaque)
> +{
> +    compress_param *param = opaque;
> +    while (!quit_thread) {
> +        if (param->state == COM_START) {
> +            save_compress_ram_page(param);
> +            param->state = COM_DONE;
> +         } else {
> +             g_usleep(1);
> +         }
> +    }
> +
> +    return NULL;
> +}
> +
> +
> +void migrate_compress_threads_join(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = true;
> +    for (i = 0; i < compress_thread_count; i++) {
> +        qemu_thread_join(s->compress_thread + i);
> +    }
> +    g_free(s->compress_thread);
> +    g_free(comp_param);
> +    s->compress_thread = NULL;
> +    comp_param = NULL;
> +}
> +
> +void migrate_compress_threads_create(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = false;
> +    compress_thread_count = s->compress_thread_count;
> +    s->compress_thread = g_malloc0(sizeof(QemuThread)
> +        * s->compress_thread_count);
> +    comp_param = g_malloc0(sizeof(compress_param) * 
> s->compress_thread_count);
> +    for (i = 0; i < s->compress_thread_count; i++) {
> +        qemu_thread_create(s->compress_thread + i, "compress",
> +            do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
> +
> +    }
> +}
> +
>  /* Update the xbzrle cache to reflect a page that's been sent as all 0.
>   * The important thing is that a stale (not-yet-0'd) page be replaced
>   * by the new data.
> @@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t 
> current_addr)
>  
>  #define ENCODING_FLAG_XBZRLE 0x1
>  
> -static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
> +static int save_xbzrle_page(void *f, uint8_t **current_data,
>                              ram_addr_t current_addr, RAMBlock *block,
> -                            ram_addr_t offset, int cont, bool last_stage)
> +                            ram_addr_t offset, int cont, bool last_stage,
> +                            bool save_to_buf)
>  {
>      int encoded_len = 0, bytes_sent = -1;
>      uint8_t *prev_cached_page;
> @@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t 
> **current_data,
>      }
>  
>      /* Send XBZRLE based compressed page */
> -    bytes_sent = save_block_hdr(f, block, offset, cont, 
> RAM_SAVE_FLAG_XBZRLE);
> -    qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
> -    qemu_put_be16(f, encoded_len);
> -    qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
> +    if (save_to_buf) {
> +        bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
> +        migrate_put_be16((MigBuf *)f, encoded_len);
> +        migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
> +    } else {
> +        bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
> +        qemu_put_be16((QEMUFile *)f, encoded_len);
> +        qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
> +    }
>      bytes_sent += encoded_len + 1 + 2;
>      acct_info.xbzrle_pages++;
>      acct_info.xbzrle_bytes += bytes_sent;
> @@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, 
> ram_addr_t offset,
>          xbzrle_cache_zero_page(current_addr);
>      } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
>          bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
> -                                      offset, cont, last_stage);
> +                                      offset, cont, last_stage, false);
>          if (!last_stage) {
>              /* Can't send this cached data async, since the cache page
>               * might get updated before it gets to the wire
> @@ -635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, 
> ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int save_compress_ram_page(compress_param *param)
> +{
> +    int bytes_sent = param->bytes_sent;
> +    int blen = COMPRESS_BUF_SIZE;
> +    int cont = param->cont;
> +    uint8_t *p = param->p;
> +    int ret = param->ret;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +    bool last_stage = param->last_stage;
> +    /* In doubt sent page as normal */
> +    XBZRLE_cache_lock();
> +    ram_addr_t current_addr = block->offset + offset;
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                atomic_inc(&acct_info.norm_pages);
> +             } else if (bytes_sent == 0) {
> +                atomic_inc(&acct_info.dup_pages);
> +             }
> +        }
> +    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> +        atomic_inc(&acct_info.dup_pages);
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, 
> cont,
> +                             RAM_SAVE_FLAG_COMPRESS);
> +        migrate_put_byte(&param->migbuf, 0);
> +        bytes_sent++;
> +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> +         * page would be stale
> +         */
> +        xbzrle_cache_zero_page(current_addr);
> +    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
> +        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, 
> block,
> +                              offset, cont, last_stage, true);
> +    }
> +    XBZRLE_cache_unlock();
> +    /* XBZRLE overflow or normal page */
> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
> +    }
> +    return bytes_sent;
> +}
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +
> +    for (idx = 0; idx < compress_thread_count; idx++) {
> +        while (comp_param[idx].state != COM_DONE) {
> +            g_usleep(0);
> +        }
> +        if (comp_param[idx].migbuf.buf_index > 0) {
> +            qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> +                comp_param[idx].migbuf.buf_index);
> +            bytes_transferred += comp_param[idx].migbuf.buf_index;
> +            comp_param[idx].migbuf.buf_index = 0;
> +        }
> +    }
> +}
> +
> +static inline void set_common_compress_params(compress_param *param,
> +    int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
> +    bool last_stage, int cont, uint8_t *p, bool bulk_stage)
> +{
> +    param->ret = ret;
> +    param->bytes_sent = bytes_sent;
> +    param->block = block;
> +    param->offset = offset;
> +    param->last_stage = last_stage;
> +    param->cont = cont;
> +    param->p = p;
> +    param->bulk_stage = bulk_stage;
> +}
> +
>  /*
>   * ram_find_and_save_block: Finds a page to send and sends it to f
>   *
> @@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool 
> last_stage)
>      bool complete_round = false;
>      int bytes_sent = 0;
>      MemoryRegion *mr;
> +    int cont, idx, ret, len = -1;
> +    uint8_t *p;
>  
>      if (!block)
>          block = QTAILQ_FIRST(&ram_list.blocks);
> @@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool 
> last_stage)
>                  block = QTAILQ_FIRST(&ram_list.blocks);
>                  complete_round = true;
>                  ram_bulk_stage = false;
> +                if (migrate_use_xbzrle()) {
> +                    /* terminate the used thread at this point*/
> +                    flush_compressed_data(f);
> +                    quit_thread = true;
> +                }
>              }
>          } else {
> -            bytes_sent = ram_save_page(f, block, offset, last_stage);
> -
> -            /* if page is unmodified, continue to the next */
> -            if (bytes_sent > 0) {
> -                last_sent_block = block;
> -                break;
> +            if (!migrate_use_compress()) {
> +                bytes_sent = ram_save_page(f, block, offset, last_stage);
> +                /* if page is unmodified, continue to the next */
> +                if (bytes_sent > 0) {
> +                    last_sent_block = block;
> +                    break;
> +                }
> +            } else {
> +                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 
> 0;
> +                p = memory_region_get_ram_ptr(block->mr) + offset;
> +                ret = ram_control_save_page(f, block->offset,
> +                           offset, TARGET_PAGE_SIZE, &len);
> +                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
> +                    if (cont == 0) {
> +                        flush_compressed_data(f);
> +                    }
> +                    set_common_compress_params(&comp_param[0],
> +                        ret, len, block, offset, last_stage, cont,
> +                        p, ram_bulk_stage);
> +                    bytes_sent = save_compress_ram_page(&comp_param[0]);
> +                    if (bytes_sent > 0) {
> +                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
> +                            comp_param[0].migbuf.buf_index);
> +                        comp_param[0].migbuf.buf_index = 0;
> +                        last_sent_block = block;
> +                        break;
> +                    }
> +                } else {
> +retry:
> +                    for (idx = 0; idx < compress_thread_count; idx++) {
> +                        if (comp_param[idx].state == COM_DONE) {
> +                            bytes_sent = comp_param[idx].migbuf.buf_index;
> +                            if (bytes_sent == 0) {
> +                                set_common_compress_params(&comp_param[idx],
> +                                    ret, len, block, offset, last_stage,
> +                                    cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                bytes_sent = 1;
> +                                bytes_transferred -= 1;
> +                                break;
> +                            } else if (bytes_sent > 0) {
> +                                qemu_put_buffer(f, 
> comp_param[idx].migbuf.buf,
> +                                    comp_param[idx].migbuf.buf_index);
> +                                comp_param[idx].migbuf.buf_index = 0;
> +                                set_common_compress_params(&comp_param[idx],
> +                                   ret, len, block, offset, last_stage,
> +                                   cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                break;
> +                            }
> +                        }
> +                    }
> +                    if (idx < compress_thread_count) {
> +                        last_sent_block = block;
> +                        break;
> +                    } else {
> +                        g_usleep(0);
> +                        goto retry;
> +                    }
> +                }
>              }
>          }
>      }
> @@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool 
> last_stage)
>      return bytes_sent;
>  }
>  
> -static uint64_t bytes_transferred;
>  
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
> @@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -938,6 +1266,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
>  
> @@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch, 
> uint64_t size)
>      }
>  }
>  
> +QemuThread *decompress_threads;
> +
> +static void *do_data_decompress(void *opaque)
> +{
> +    decompress_param *param = opaque;
> +    while (incomming_migration_done == false) {
> +        if (param->state == COM_START) {
> +            uLong pagesize = TARGET_PAGE_SIZE;
> +            if (uncompress((Bytef *)param->des, &pagesize,
> +                    (const Bytef *)param->compbuf, param->len) != Z_OK) {
> +                error_report("Uncompress Failed!\n");
> +                break;
> +            }
> +            param->state = COM_DONE;
> +        } else {
> +            if (quit_thread) {
> +                break;
> +            }
> +            g_usleep(1);
> +        }
> +    }
> +    return NULL;
> +}
> +
> +void migrate_decompress_threads_create(int count)
> +{
> +    int i;
> +    decompress_thread_count = count;
> +    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
> +    decomp_param = g_malloc0(sizeof(decompress_param) * count);
> +    quit_thread = false;
> +    for (i = 0; i < count; i++) {
> +        qemu_thread_create(decompress_threads + i, "decompress",
> +            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
> +void migrate_decompress_threads_join(void)
> +{
> +    int i;
> +    for (i = 0; i < decompress_thread_count; i++) {
> +        qemu_thread_join(decompress_threads + i);
> +    }
> +    g_free(decompress_threads);
> +    g_free(decomp_param);
> +    decompress_threads = NULL;
> +    decomp_param = NULL;
> +}
> +
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  {
>      int flags = 0, ret = 0;
>      static uint64_t seq_iter;
> +    int len = 0;
> +    uint8_t compbuf[COMPRESS_BUF_SIZE];
>  
>      seq_iter++;
>  
> @@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int 
> version_id)
>              ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
>              break;
>          case RAM_SAVE_FLAG_PAGE:
> +            quit_thread = true;
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
>                  error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> @@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int 
> version_id)
>  
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> +        case RAM_SAVE_FLAG_COMPRESS_PAGE:
> +            host = host_from_stream_offset(f, addr, flags);
> +            if (!host) {
> +                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            len = qemu_get_be32(f);
> +            qemu_get_buffer(f, compbuf, len);
> +            int idx;
> +retry:
> +            for (idx = 0; idx < decompress_thread_count; idx++) {
> +                if (decomp_param[idx].state == COM_DONE)  {
> +                    memcpy(decomp_param[idx].compbuf, compbuf, len);
> +                    decomp_param[idx].des = host;
> +                    decomp_param[idx].len = len;
> +                    decomp_param[idx].state = COM_START;
> +                    break;
> +                }
> +            }
> +            if (idx == decompress_thread_count) {
> +                g_usleep(0);
> +                goto retry;
> +            }
> +            break;
>          case RAM_SAVE_FLAG_XBZRLE:
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
> diff --git a/hmp-commands.hx b/hmp-commands.hx
> index e37bc8b..8b93bed 100644
> --- a/hmp-commands.hx
> +++ b/hmp-commands.hx
> @@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle 
> migrations.
>  ETEXI
>  
>      {
> +        .name       = "migrate_set_compress_level",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set compress level for compress migrations,"
> +                      "the level is a number between 0 and 9, 0 stands for "
> +                      "no compression.\n"
> +                      "1 stands for the fast compress speed while 9 stands 
> for"
> +                      "the highest compress ratio.",
> +        .mhandler.cmd = hmp_migrate_set_compress_level,
> +    },
> +
> +STEXI
> address@hidden migrate_set_compress_level @var{value}
> address@hidden migrate_set_compress_level
> +Set compress level to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_compress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set compress thread count for migrations. "
> +                      "a proper thread count will accelerate the migration 
> speed,"
> +                      "the threads should be between 1 and the CPUS of your 
> system",
> +        .mhandler.cmd = hmp_migrate_set_compress_threads,
> +    },
> +
> +STEXI
> address@hidden migrate_set_compress_threads @var{value}
> address@hidden migrate_set_compress_threads
> +Set compress threads to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_decompress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set decompress thread count for migrations. "
> +                      "a proper thread count will accelerate the migration 
> speed,"
> +                      "the threads should be between 1 and the CPUS of your 
> system",
> +        .mhandler.cmd = hmp_migrate_set_decompress_threads,
> +    },
> +
> +STEXI
> address@hidden migrate_set_decompress_threads @var{value}
> address@hidden migrate_set_decompress_threads
> +Set decompress threads to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",
>          .params     = "value",
> @@ -1766,6 +1816,12 @@ show migration status
>  show current migration capabilities
>  @item info migrate_cache_size
>  show current migration XBZRLE cache size
> address@hidden info migrate_compress_level
> +show current migration compress level
> address@hidden info migrate_compress_threads
> +show current migration compress threads
> address@hidden info migrate_decompress_threads
> +show current migration decompress threads
>  @item info balloon
>  show balloon information
>  @item info qtree
> diff --git a/hmp.c b/hmp.c
> index 63d7686..b1936a3 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const 
> QDict *qdict)
>                     qmp_query_migrate_cache_size(NULL) >> 10);
>  }
>  
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress level: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_level(NULL));
> +}
> +
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_threads(NULL));
> +}
> +
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "decompress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_decompress_threads(NULL));
> +}
> +
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict)
>  {
>      CpuInfoList *cpu_list, *cpu;
> @@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const 
> QDict *qdict)
>      }
>  }
>  
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_level(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_decompress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict)
>  {
>      int64_t value = qdict_get_int(qdict, "value");
> diff --git a/hmp.h b/hmp.h
> index 4bb5dca..b348806 100644
> --- a/hmp.h
> +++ b/hmp.h
> @@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict);
>  void hmp_info_block(Monitor *mon, const QDict *qdict);
>  void hmp_info_blockstats(Monitor *mon, const QDict *qdict);
> @@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict 
> *qdict);
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_set_password(Monitor *mon, const QDict *qdict);
>  void hmp_expire_password(Monitor *mon, const QDict *qdict);
>  void hmp_eject(Monitor *mon, const QDict *qdict);
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3cb5ba8..03c8e0d 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -49,6 +49,9 @@ struct MigrationState
>      QemuThread thread;
>      QEMUBH *cleanup_bh;
>      QEMUFile *file;
> +    QemuThread *compress_thread;
> +    int compress_thread_count;
> +    int compress_level;
>  
>      int state;
>      MigrationParams params;
> @@ -64,6 +67,7 @@ struct MigrationState
>      int64_t dirty_sync_count;
>  };
>  
> +extern bool incomming_migration_done;
>  void process_incoming_migration(QEMUFile *f);
>  
>  void qemu_start_incoming_migration(const char *uri, Error **errp);
> @@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *);
>  bool migration_has_failed(MigrationState *);
>  MigrationState *migrate_get_current(void);
>  
> +void migrate_compress_threads_create(MigrationState *s);
> +void migrate_compress_threads_join(MigrationState *s);
> +void migrate_decompress_threads_create(int count);
> +void migrate_decompress_threads_join(void);
>  uint64_t ram_bytes_remaining(void);
>  uint64_t ram_bytes_transferred(void);
>  uint64_t ram_bytes_total(void);
> @@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason);
>  
>  bool migrate_rdma_pin_all(void);
>  bool migrate_zero_blocks(void);
> -
> +bool migrate_use_compress(void);
>  bool migrate_auto_converge(void);
>  
>  int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
> @@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t 
> *dst, int dlen);
>  
>  int migrate_use_xbzrle(void);
>  int64_t migrate_xbzrle_cache_size(void);
> +int migrate_compress_level(void);
> +int migrate_compress_threads(void);
>  
>  int64_t xbzrle_cache_resize(int64_t new_size);
>  
> diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
> index 401676b..431e6cc 100644
> --- a/include/migration/qemu-file.h
> +++ b/include/migration/qemu-file.h
> @@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer 
> *input);
>  int qemu_get_fd(QEMUFile *f);
>  int qemu_fclose(QEMUFile *f);
>  int64_t qemu_ftell(QEMUFile *f);
> +uint64_t qemu_add_compress(QEMUFile *f,  const uint8_t *p, int size);
>  void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
>  void qemu_put_byte(QEMUFile *f, int v);
>  /*
> diff --git a/migration.c b/migration.c
> index c49a05a..716de97 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -46,6 +46,12 @@ enum {
>  /* Migration XBZRLE default cache size */
>  #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
>  
> +/* Migration compress default thread count */
> +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
> +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
> +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
> +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
> +
>  static NotifierList migration_state_notifiers =
>      NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
>  
> @@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void)
>          .bandwidth_limit = MAX_THROTTLE,
>          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
>          .mbps = -1,
> +        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> +        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
>      };
>  
>      return &current_migration;
> @@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque)
>          error_report("load of migration failed: %s", strerror(-ret));
>          exit(EXIT_FAILURE);
>      }
> +    incomming_migration_done = true;
>      qemu_announce_self();
>  
>      /* Make sure all file formats flush their mutable metadata */
> @@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque)
>      } else {
>          runstate_set(RUN_STATE_PAUSED);
>      }
> +    migrate_decompress_threads_join();
>  }
>  
> +static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
>  void process_incoming_migration(QEMUFile *f)
>  {
> +    incomming_migration_done = false;
> +    migrate_decompress_threads_create(uncompress_thread_count);
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
>      int fd = qemu_get_fd(f);
>  
> @@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque)
>          qemu_thread_join(&s->thread);
>          qemu_mutex_lock_iothread();
>  
> +        migrate_compress_threads_join(s);
>          qemu_fclose(s->file);
>          s->file = NULL;
>      }
> @@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams 
> *params)
>      int64_t bandwidth_limit = s->bandwidth_limit;
>      bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
>      int64_t xbzrle_cache_size = s->xbzrle_cache_size;
> +    int compress_level = s->compress_level;
> +    int compress_thread_count = s->compress_thread_count;
>  
>      memcpy(enabled_capabilities, s->enabled_capabilities,
>             sizeof(enabled_capabilities));
> @@ -383,6 +399,8 @@ static MigrationState *migrate_init(const MigrationParams 
> *params)
>             sizeof(enabled_capabilities));
>      s->xbzrle_cache_size = xbzrle_cache_size;
>  
> +    s->compress_level = compress_level;
> +    s->compress_thread_count = compress_thread_count;
>      s->bandwidth_limit = bandwidth_limit;
>      s->state = MIG_STATE_SETUP;
>      trace_migrate_set_state(MIG_STATE_SETUP);
> @@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp)
>      return migrate_xbzrle_cache_size();
>  }
>  
> +void qmp_migrate_set_compress_level(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 9 || value < 0) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
> +                  "is invalid, please input a integer between 0 and 9. ");
> +        return;
> +    }
> +
> +    s->compress_level = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_level(Error **errp)
> +{
> +    return migrate_compress_level();
> +}
> +
> +void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread 
> count",
> +                  "is invalid, please input a integer between 1 and 255. ");
> +        return;
> +    }
> +
> +    s->compress_thread_count = value;
> +}
> +
> +void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
> +{
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread 
> count",
> +                  "is invalid, please input a integer between 1 and 255. ");
> +        return;
> +    }
> +
> +    uncompress_thread_count = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_threads(Error **errp)
> +{
> +    return migrate_compress_threads();
> +}
> +
> +int64_t qmp_query_migrate_decompress_threads(Error **errp)
> +{
> +    return uncompress_thread_count;
> +}
> +
>  void qmp_migrate_set_speed(int64_t value, Error **errp)
>  {
>      MigrationState *s;
> @@ -555,6 +626,33 @@ bool migrate_zero_blocks(void)
>      return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
>  }
>  
> +bool migrate_use_compress(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
> +}
> +
> +int migrate_compress_level(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_level;
> +}
> +
> +int migrate_compress_threads(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_thread_count;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>  
>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>                         QEMU_THREAD_JOINABLE);
> +    migrate_compress_threads_create(s);


don't create compress_threads always.
It may be better:

if (!migrate_use_xbzrle()) {
    migrate_compress_threads_create(s);
}

BTW, this patch is too big to review. Spliting it into some patch will be 
welcome.

>  }
> diff --git a/monitor.c b/monitor.c
> index 905d8cf..365547e 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = {
>          .mhandler.cmd = hmp_info_migrate_cache_size,
>      },
>      {
> +        .name       = "migrate_compress_level",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress level",
> +        .mhandler.cmd = hmp_info_migrate_compress_level,
> +    },
> +    {
> +        .name       = "migrate_compress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress thread count",
> +        .mhandler.cmd = hmp_info_migrate_compress_threads,
> +    },
> +    {
> +        .name       = "migrate_decompress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration decompress thread count",
> +        .mhandler.cmd = hmp_info_migrate_decompress_threads,
> +    },
> +    {
>          .name       = "balloon",
>          .args_type  = "",
>          .params     = "",
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 24379ab..71a9e0f 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -491,13 +491,17 @@
>  #          to enable the capability on the source VM. The feature is 
> disabled by
>  #          default. (since 1.6)
>  #
> +# @compress: Using the multiple compression threads to accelerate live 
> migration.
> +#          This feature can help to reduce the migration traffic, by sending
> +#          compressed pages. The feature is disabled by default. (since 2.3)
> +#
>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>  #          to speed up convergence of RAM migration. (since 1.6)
>  #
>  # Since: 1.2
>  ##
>  { 'enum': 'MigrationCapability',
> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 
> 'compress'] }
>  
>  ##
>  # @MigrationCapabilityStatus
> @@ -1382,6 +1386,88 @@
>  { 'command': 'query-migrate-cache-size', 'returns': 'int' }
>  
>  ##
> +# @migrate-set-compress-level
> +#
> +# Set compress level
> +#
> +# @value: compress level int
> +#
> +# The compress level will be an integer between 0 and 9.
> +# The compress level can be modified before and during ongoing migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-level
> +#
> +# query compress level
> +#
> +# Returns: compress level int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
> +
> +##
> +# @migrate-set-compress-threads
> +#
> +# Set compress threads
> +#
> +# @value: compress threads int
> +#
> +# The compress thread count is an integer between 1 and 255.
> +# The compress level can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-threads
> +#
> +# query compress threads
> +#
> +# Returns: compress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
> +
> +##
> +##
> +# @migrate-set-decompress-threads
> +#
> +# Set decompress threads
> +#
> +# @value: decompress threads int
> +#
> +# The decompress thread count is an integer between 1 and 255.
> +# The decompress level can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-decompress-threads
> +#
> +# query decompress threads
> +#
> +# Returns: decompress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
> +
> +##
>  # @ObjectPropertyInfo:
>  #
>  # @name: the name of the property
> diff --git a/qmp-commands.hx b/qmp-commands.hx
> index 1abd619..b60fdab 100644
> --- a/qmp-commands.hx
> +++ b/qmp-commands.hx
> @@ -705,7 +705,138 @@ Example:
>  <- { "return": 67108864 }
>  
>  EQMP
> +{
> +        .name       = "migrate-set-compress-level",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
> +    },
> +
> +SQMP
> +migrate-set-compress-level
> +----------------------
> +
> +Set compress level to be used by compress migration, the compress level is 
> an integer
> +between 0 and 9
> +
> +Arguments:
> +
> +- "value": compress level (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-level", "arguments": { "value": 
> 536870912 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-level",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level,
> +    },
> +
> +SQMP
> +query-migrate-compress-level
> +------------------------
> +
> +Show compress level to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-level" }
> +<- { "return": 67108864 }
> +
> +EQMP
> +{
> +        .name       = "migrate-set-compress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads,
> +    },
> +
> +SQMP
> +migrate-set-compress-threads
> +----------------------
> +
> +Set compress thread count to be used by compress migration, the compress 
> thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": compress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 
> 536870912 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
> +    },
> +
> +SQMP
> +query-migrate-compress-threads
> +------------------------
> +
> +Show compress thread count to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 67108864 }
> +
> +EQMP
> +{
> +        .name       = "migrate-set-decompress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads,
> +    },
> +
> +SQMP
> +migrate-set-decompress-threads
> +----------------------
> +
> +Set decompress thread count to be used by compress migration, the decompress 
> thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": decompress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-decompress-threads", "arguments": { "value": 
> 536870912 } }
> +<- { "return": {} }
>  
> +EQMP
> +    {
> +        .name       = "query-migrate-decompress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = 
> qmp_marshal_input_query_migrate_decompress_threads,
> +    },
> +
> +SQMP
> +query-migrate-decompress-threads
> +------------------------
> +
> +Show decompress thread count to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 67108864 }
> +
> +EQMP
>      {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",






reply via email to

[Prev in Thread] Current Thread [Next in Thread]