[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path
From: |
David Gibson |
Subject: |
Re: [Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path |
Date: |
Tue, 10 Mar 2015 17:08:24 +1100 |
User-agent: |
Mutt/1.5.23 (2014-03-12) |
On Wed, Feb 25, 2015 at 04:51:35PM +0000, Dr. David Alan Gilbert (git) wrote:
> From: "Dr. David Alan Gilbert" <address@hidden>
>
> Open a return path, and handle messages that are received upon it.
>
> Signed-off-by: Dr. David Alan Gilbert <address@hidden>
> ---
> include/migration/migration.h | 8 ++
> migration/migration.c | 178
> +++++++++++++++++++++++++++++++++++++++++-
> trace-events | 13 +++
> 3 files changed, 198 insertions(+), 1 deletion(-)
>
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 6775747..5242ead 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -73,6 +73,14 @@ struct MigrationState
>
> int state;
> MigrationParams params;
> +
> + /* State related to return path */
> + struct {
> + QEMUFile *file;
> + QemuThread rp_thread;
> + bool error;
> + } rp_state;
> +
> double mbps;
> int64_t total_time;
> int64_t downtime;
> diff --git a/migration/migration.c b/migration/migration.c
> index 80d234c..34cd4fe 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -237,6 +237,23 @@ MigrationCapabilityStatusList
> *qmp_query_migrate_capabilities(Error **errp)
> return head;
> }
>
> +/*
> + * Return true if we're already in the middle of a migration
> + * (i.e. any of the active or setup states)
> + */
> +static bool migration_already_active(MigrationState *ms)
> +{
> + switch (ms->state) {
> + case MIG_STATE_ACTIVE:
> + case MIG_STATE_SETUP:
> + return true;
> +
> + default:
> + return false;
> +
> + }
> +}
> +
> static void get_xbzrle_cache_stats(MigrationInfo *info)
> {
> if (migrate_use_xbzrle()) {
> @@ -362,6 +379,21 @@ static void migrate_set_state(MigrationState *s, int
> old_state, int new_state)
> }
> }
>
> +static void migrate_fd_cleanup_src_rp(MigrationState *ms)
> +{
> + QEMUFile *rp = ms->rp_state.file;
> +
> + /*
> + * When stuff goes wrong (e.g. failing destination) on the rp, it can get
> + * cleaned up from a few threads; make sure not to do it twice in
> parallel
> + */
> + rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL);
A cmpxchg seems dangerously subtle for such a basic and infrequent
operation, but ok.
> + if (rp) {
> + trace_migrate_fd_cleanup_src_rp();
> + qemu_fclose(rp);
> + }
> +}
> +
> static void migrate_fd_cleanup(void *opaque)
> {
> MigrationState *s = opaque;
> @@ -369,6 +401,8 @@ static void migrate_fd_cleanup(void *opaque)
> qemu_bh_delete(s->cleanup_bh);
> s->cleanup_bh = NULL;
>
> + migrate_fd_cleanup_src_rp(s);
> +
> if (s->file) {
> trace_migrate_fd_cleanup();
> qemu_mutex_unlock_iothread();
> @@ -406,6 +440,11 @@ static void migrate_fd_cancel(MigrationState *s)
> QEMUFile *f = migrate_get_current()->file;
> trace_migrate_fd_cancel();
>
> + if (s->rp_state.file) {
> + /* shutdown the rp socket, so causing the rp thread to shutdown */
> + qemu_file_shutdown(s->rp_state.file);
I missed where qemu_file_shutdown() was implemented. Does this
introduce a leftover socket dependency?
> + }
> +
> do {
> old_state = s->state;
> if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) {
> @@ -658,8 +697,145 @@ int64_t migrate_xbzrle_cache_size(void)
> return s->xbzrle_cache_size;
> }
>
> -/* migration thread support */
> +/*
> + * Something bad happened to the RP stream, mark an error
> + * The caller shall print something to indicate why
> + */
> +static void source_return_path_bad(MigrationState *s)
> +{
> + s->rp_state.error = true;
> + migrate_fd_cleanup_src_rp(s);
> +}
> +
> +/*
> + * Handles messages sent on the return path towards the source VM
> + *
> + */
> +static void *source_return_path_thread(void *opaque)
> +{
> + MigrationState *ms = opaque;
> + QEMUFile *rp = ms->rp_state.file;
> + uint16_t expected_len, header_len, header_com;
> + const int max_len = 512;
> + uint8_t buf[max_len];
> + uint32_t tmp32;
> + int res;
> +
> + trace_source_return_path_thread_entry();
> + while (rp && !qemu_file_get_error(rp) &&
> + migration_already_active(ms)) {
> + trace_source_return_path_thread_loop_top();
> + header_com = qemu_get_be16(rp);
> + header_len = qemu_get_be16(rp);
> +
> + switch (header_com) {
> + case MIG_RP_CMD_SHUT:
> + case MIG_RP_CMD_PONG:
> + expected_len = 4;
Could the knowledge of expected lengths be folded into the switch
below? Switching twice on the same thing is a bit icky.
> + break;
> +
> + default:
> + error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
> + header_com, header_len);
> + source_return_path_bad(ms);
> + goto out;
> + }
>
> + if (header_len > expected_len) {
> + error_report("RP: Received command 0x%04x with"
> + "incorrect length %d expecting %d",
> + header_com, header_len,
> + expected_len);
> + source_return_path_bad(ms);
> + goto out;
> + }
> +
> + /* We know we've got a valid header by this point */
> + res = qemu_get_buffer(rp, buf, header_len);
> + if (res != header_len) {
> + trace_source_return_path_thread_failed_read_cmd_data();
> + source_return_path_bad(ms);
> + goto out;
> + }
> +
> + /* OK, we have the command and the data */
> + switch (header_com) {
> + case MIG_RP_CMD_SHUT:
> + tmp32 = be32_to_cpup((uint32_t *)buf);
> + trace_source_return_path_thread_shut(tmp32);
> + if (tmp32) {
> + error_report("RP: Sibling indicated error %d", tmp32);
> + source_return_path_bad(ms);
> + }
> + /*
> + * We'll let the main thread deal with closing the RP
> + * we could do a shutdown(2) on it, but we're the only user
> + * anyway, so there's nothing gained.
> + */
> + goto out;
> +
> + case MIG_RP_CMD_PONG:
> + tmp32 = be32_to_cpup((uint32_t *)buf);
> + trace_source_return_path_thread_pong(tmp32);
> + break;
> +
> + default:
> + /* This shouldn't happen because we should catch this above */
> + trace_source_return_path_bad_header_com();
> + }
> + /* Latest command processed, now leave a gap for the next one */
> + header_com = MIG_RP_CMD_INVALID;
This assignment is dead: header_com is always overwritten by the
qemu_get_be16() at the top of the loop before it is read again.
> + }
> + if (rp && qemu_file_get_error(rp)) {
> + trace_source_return_path_thread_bad_end();
> + source_return_path_bad(ms);
> + }
> +
> + trace_source_return_path_thread_end();
> +out:
> + return NULL;
> +}
> +
> +__attribute__ (( unused )) /* Until later in patch series */
> +static int open_outgoing_return_path(MigrationState *ms)
Uh... surely this should be open_incoming_return_path(); it's designed
to be used on the source side, where the return path is incoming, AFAICT.
> +{
> +
> + ms->rp_state.file = qemu_file_get_return_path(ms->file);
> + if (!ms->rp_state.file) {
> + return -1;
> + }
> +
> + trace_open_outgoing_return_path();
> + qemu_thread_create(&ms->rp_state.rp_thread, "return path",
> + source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
> +
> + trace_open_outgoing_return_path_continue();
> +
> + return 0;
> +}
> +
> +__attribute__ (( unused )) /* Until later in patch series */
> +static void await_outgoing_return_path_close(MigrationState *ms)
Likewise "incoming" here, surely.
> +{
> + /*
> + * If this is a normal exit then the destination will send a SHUT and the
> + * rp_thread will exit, however if there's an error we need to cause
> + * it to exit, which we can do by a shutdown.
> + * (canceling must also shutdown to stop us getting stuck here if
> + * the destination died at just the wrong place)
> + */
> + if (qemu_file_get_error(ms->file) && ms->rp_state.file) {
> + qemu_file_shutdown(ms->rp_state.file);
> + }
> + trace_await_outgoing_return_path_joining();
> + qemu_thread_join(&ms->rp_state.rp_thread);
> + trace_await_outgoing_return_path_close();
> +}
> +
> +/*
> + * Master migration thread on the source VM.
> + * It drives the migration and pumps the data down the outgoing channel.
> + */
> static void *migration_thread(void *opaque)
> {
> MigrationState *s = opaque;
> diff --git a/trace-events b/trace-events
> index 4f3eff8..1951b25 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1374,12 +1374,25 @@ flic_no_device_api(int err) "flic: no Device Contral
> API support %d"
> flic_reset_failed(int err) "flic: reset failed %d"
>
> # migration.c
> +await_outgoing_return_path_close(void) ""
> +await_outgoing_return_path_joining(void) ""
> migrate_set_state(int new_state) "new state %d"
> migrate_fd_cleanup(void) ""
> +migrate_fd_cleanup_src_rp(void) ""
> migrate_fd_error(void) ""
> migrate_fd_cancel(void) ""
> migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max
> %" PRIu64
> migrate_send_rp_message(int cmd, uint16_t len) "cmd=%d, len=%d"
> +open_outgoing_return_path(void) ""
> +open_outgoing_return_path_continue(void) ""
> +source_return_path_thread_bad_end(void) ""
> +source_return_path_bad_header_com(void) ""
> +source_return_path_thread_end(void) ""
> +source_return_path_thread_entry(void) ""
> +source_return_path_thread_failed_read_cmd_data(void) ""
> +source_return_path_thread_loop_top(void) ""
> +source_return_path_thread_pong(uint32_t val) "%x"
> +source_return_path_thread_shut(uint32_t val) "%x"
> migrate_transferred(uint64_t tranferred, uint64_t time_spent, double
> bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 "
> bandwidth %g max_size %" PRId64
>
> # migration/rdma.c
--
David Gibson | I'll have my music baroque, and my code
david AT gibson.dropbear.id.au | minimalist, thank you. NOT _the_ _other_
| _way_ _around_!
http://www.ozlabs.org/~dgibson
pgptCquGKLHqO.pgp
Description: PGP signature
- Re: [Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path,
David Gibson <=