[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH RFC 4/4] Curling: the receiver
From: |
Juan Quintela |
Subject: |
Re: [Qemu-devel] [PATCH RFC 4/4] Curling: the receiver |
Date: |
Tue, 10 Sep 2013 16:19:48 +0200 |
User-agent: |
Gnus/5.13 (Gnus v5.13) Emacs/24.3 (gnu/linux) |
Jules Wang <address@hidden> wrote:
> The receiver runs the migration loop until the migration connection is
> lost. Then it is started as the backup.
>
> The receiver does not load vm state as soon as a migration begins;
> instead, it prefetches one whole migration's data into a buffer,
> then loads vm state from that buffer afterwards.
>
> Signed-off-by: Jules Wang <address@hidden>
> ---
> include/migration/qemu-file.h | 1 +
> include/sysemu/sysemu.h | 1 +
> migration.c | 22 ++++--
> savevm.c | 154
> ++++++++++++++++++++++++++++++++++++++++--
> 4 files changed, 168 insertions(+), 10 deletions(-)
>
> diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
> index 0f757fb..f01ff10 100644
> --- a/include/migration/qemu-file.h
> +++ b/include/migration/qemu-file.h
> @@ -92,6 +92,7 @@ typedef struct QEMUFileOps {
> QEMURamHookFunc *after_ram_iterate;
> QEMURamHookFunc *hook_ram_load;
> QEMURamSaveFunc *save_page;
> + QEMUFileGetBufferFunc *get_prefetch_buffer;
> } QEMUFileOps;
>
> QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
> diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h
> index b1aa059..44f23d0 100644
> --- a/include/sysemu/sysemu.h
> +++ b/include/sysemu/sysemu.h
> @@ -81,6 +81,7 @@ void qemu_savevm_state_complete(QEMUFile *f);
> void qemu_savevm_state_cancel(void);
> uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size);
> int qemu_loadvm_state(QEMUFile *f);
> +int qemu_loadvm_state_ft(QEMUFile *f);
>
> /* SLIRP */
> void do_info_slirp(Monitor *mon);
> diff --git a/migration.c b/migration.c
> index d8a9b2d..9be22a4 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -19,6 +19,7 @@
> #include "monitor/monitor.h"
> #include "migration/qemu-file.h"
> #include "sysemu/sysemu.h"
> +#include "sysemu/cpus.h"
> #include "block/block.h"
> #include "qemu/sockets.h"
> #include "migration/block.h"
> @@ -112,13 +113,24 @@ static void process_incoming_migration_co(void *opaque)
> {
> QEMUFile *f = opaque;
> int ret;
> + int count = 0;
>
> - ret = qemu_loadvm_state(f);
> - qemu_fclose(f);
> - if (ret < 0) {
> - fprintf(stderr, "load of migration failed\n");
> - exit(EXIT_FAILURE);
> + if (ft_enabled()) {
> + while (qemu_loadvm_state_ft(f) >= 0) {
> + count++;
> + DPRINTF("incoming count %d\r", count);
> + }
> + qemu_fclose(f);
> + fprintf(stderr, "ft connection lost, launching self..\n");
Obviously, we need something more than an fprintf here, right?
We are also not checking whether the loop ended because of an error.
> + } else {
> + ret = qemu_loadvm_state(f);
> + qemu_fclose(f);
> + if (ret < 0) {
> + fprintf(stderr, "load of migration failed\n");
> + exit(EXIT_FAILURE);
> + }
> }
> + cpu_synchronize_all_post_init();
> qemu_announce_self();
> DPRINTF("successfully loaded vm state\n");
>
> diff --git a/savevm.c b/savevm.c
> index 6daf690..d5bf153 100644
> --- a/savevm.c
> +++ b/savevm.c
> @@ -52,6 +52,8 @@
> #define ARP_PTYPE_IP 0x0800
> #define ARP_OP_REQUEST_REV 0x3
>
> +#define PFB_SIZE 0x010000
> +
> static int announce_self_create(uint8_t *buf,
> uint8_t *mac_addr)
> {
> @@ -135,6 +137,10 @@ struct QEMUFile {
> unsigned int iovcnt;
>
> int last_error;
> +
> + uint8_t *pfb; /* pfb -> PerFetch Buffer */
s/PerFetch/PreFetch/
What about prefetch_buffer as the name? It is not used in many places;
would that make things clearer or more convoluted? Other comments?
> +static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf,
> + int64_t pos, int size)
> +{
> + QEMUFile *f = opaque;
> +
> + if (f->pfb_size - pos <= 0) {
> + return 0;
> + }
> +
> + if (f->pfb_size - pos < size) {
> + size = f->pfb_size - pos;
> + }
> +
> + memcpy(buf, f->pfb+pos, size);
> +
> + return size;
> +}
> +
> +
> static int socket_close(void *opaque)
> {
> QEMUFileSocket *s = opaque;
> @@ -440,6 +465,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode)
> static const QEMUFileOps socket_read_ops = {
> .get_fd = socket_get_fd,
> .get_buffer = socket_get_buffer,
> + .get_prefetch_buffer = socket_get_prefetch_buffer,
> .close = socket_close
> };
>
> if (f->last_error) {
> ret = f->last_error;
> }
> +
> + if (f->pfb) {
> + g_free(f->pfb);
Just call g_free(f->pfb) unconditionally;
it already checks for NULL.
> + }
> +
> g_free(f);
> return ret;
> }
> @@ -822,6 +853,14 @@ void qemu_put_byte(QEMUFile *f, int v)
>
> static void qemu_file_skip(QEMUFile *f, int size)
> {
> + if (f->pfb_index + size <= f->pfb_size) {
> + f->pfb_index += size;
> + return;
> + } else {
> + size -= f->pfb_size - f->pfb_index;
> + f->pfb_index = f->pfb_size;
> + }
> +
> if (f->buf_index + size <= f->buf_size) {
> f->buf_index += size;
> }
> @@ -831,6 +870,21 @@ static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf,
> int size, size_t offset)
> {
> int pending;
> int index;
> + int done;
> +
> + if (f->ops->get_prefetch_buffer) {
> + if (f->pfb_index + offset < f->pfb_size) {
> + done = f->ops->get_prefetch_buffer(f, buf, f->pfb_index + offset,
> + size);
> + if (done == size) {
> + return size;
> + }
> + size -= done;
> + buf += done;
> + } else {
> + offset -= f->pfb_size - f->pfb_index;
> + }
> + }
>
> assert(!qemu_file_is_writable(f));
>
> @@ -875,7 +929,15 @@ int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size)
>
> static int qemu_peek_byte(QEMUFile *f, int offset)
> {
> - int index = f->buf_index + offset;
> + int index;
> +
> + if (f->pfb_index + offset < f->pfb_size) {
> + return f->pfb[f->pfb_index + offset];
> + } else {
> + offset -= f->pfb_size - f->pfb_index;
> + }
> +
> + index = f->buf_index + offset;
>
> assert(!qemu_file_is_writable(f));
>
> @@ -1851,7 +1913,7 @@ void qemu_savevm_state_begin(QEMUFile *f,
> }
> se->ops->set_params(params, se->opaque);
> }
> -
> +
> qemu_put_be32(f, QEMU_VM_FILE_MAGIC);
> qemu_put_be32(f, QEMU_VM_FILE_VERSION);
>
> @@ -2294,8 +2356,6 @@ int qemu_loadvm_state(QEMUFile *f)
> }
> }
>
> - cpu_synchronize_all_post_init();
> -
> ret = 0;
>
> out:
> @@ -2311,6 +2371,89 @@ out:
> return ret;
> }
>
> +int qemu_loadvm_state_ft(QEMUFile *f)
> +{
> + int ret = 0;
> + int i = 0;
> + int j = 0;
> + int done = 0;
> + uint64_t size = 0;
> + uint64_t count = 0;
> + uint8_t *pfb = NULL;
> + uint8_t *buf = NULL;
> +
> + uint64_t max_mem = last_ram_offset() * 1.5;
> +
> + if (!f->ops->get_prefetch_buffer) {
> + fprintf(stderr, "Fault tolerant is not supported by this
> protocol.\n");
> + return EINVAL;
> + }
> +
> + size = PFB_SIZE;
> + pfb = g_malloc(size);
> +
> + while (true) {
> + if (count + TARGET_PAGE_SIZE >= size) {
> + if (size*2 > max_mem) {
> + fprintf(stderr, "qemu_loadvm_state_ft: warning:" \
> + "Prefetch buffer becomes too large.\n" \
> + "Fault tolerant is unstable when you see this,\n" \
> + "please increase the bandwidth or increase " \
> + "the max down time.\n");
> + break;
> + }
> + size = size * 2;
> + buf = g_try_realloc(pfb, size);
> + if (!buf) {
> + error_report("qemu_loadvm_state_ft: out of memory.\n");
> + g_free(pfb);
> + return ENOMEM;
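You are not handling this error in the caller. Notice that qemu
normally uses negative error codes: the caller loops while
qemu_loadvm_state_ft(f) >= 0, so a positive ENOMEM (or EINVAL) return is
treated as success.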
> + }
> +
> + pfb = buf;
> + }
> +
> + done = qemu_get_buffer(f, pfb + count, TARGET_PAGE_SIZE);
> +
> + ret = qemu_file_get_error(f);
> + if (ret != 0) {
> + g_free(pfb);
> + return ret;
> + }
> +
> + buf = pfb + count;
> + count += done;
> + for (i = 0; i < done; i++) {
> + if (buf[i] != 0xfe) {
> + continue;
> + }
> + if (buf[i-1] != 0xCa) {
> + continue;
> + }
> + if (buf[i-2] != 0xed) {
> + continue;
> + }
> + if (buf[i-3] == 0xFe) {
> + goto out;
> + }
Use consistent capitalization here (0xfe / 0xCa / 0xed / 0xFe)?
Is there a better way to look for the signature? Also, what happens if the
data just happens to contain that magic constant?
> + }
> + }
> + out:
> + if (f->pfb) {
> + free(f->pfb);
> + }
> + f->pfb_size = count;
> + f->pfb_index = 0;
> + f->pfb = pfb;
> +
> + ret = qemu_loadvm_state(f);
> +
> + /* Skip magic number */
> + qemu_get_be32(f);
> +
> + return ret;
> +}
> +
> static BlockDriverState *find_vmstate_bs(void)
> {
> BlockDriverState *bs = NULL;
> @@ -2419,6 +2562,7 @@ void do_savevm(Monitor *mon, const QDict *qdict)
> goto the_end;
> }
> ret = qemu_savevm_state(f);
> + cpu_synchronize_all_post_init();
> vm_state_size = qemu_ftell(f);
> qemu_fclose(f);
> if (ret < 0) {