Re: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
Date: Fri, 11 May 2018 17:32:19 +0100
User-agent: Mutt/1.9.5 (2018-04-13)
* Juan Quintela (address@hidden) wrote:
> "Dr. David Alan Gilbert" <address@hidden> wrote:
> > * Juan Quintela (address@hidden) wrote:
> >> We synchronize all threads at each RAM_SAVE_FLAG_EOS. Bitmap
> >> synchronizations don't happen inside a RAM section, so we are safe
> >> from two channels trying to overwrite the same memory.
> >
> > OK, that's quite neat - so you don't need any extra flags in the stream
> > to do the sync; it probably needs a comment in the code somewhere so we
> > don't forget!
>
> Thanks.
>
> >> Signed-off-by: Juan Quintela <address@hidden>
> >> ---
> >> migration/ram.c | 118 +++++++++++++++++++++++++++++++++++++----
> >> migration/trace-events | 6 +++
> >> 2 files changed, 113 insertions(+), 11 deletions(-)
> >>
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index c4c185cc4c..398cb0af3b 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
> >> #define MULTIFD_MAGIC 0x11223344U
> >> #define MULTIFD_VERSION 1
> >>
> >> +#define MULTIFD_FLAG_SYNC (1 << 0)
> >> +
> >> typedef struct {
> >> uint32_t magic;
> >> uint32_t version;
> >> @@ -471,6 +473,8 @@ typedef struct {
> >> uint32_t num_packets;
> >> /* pages sent through this channel */
> >> uint32_t num_pages;
> >> + /* syncs main thread and channels */
> >> + QemuSemaphore sem_sync;
> >> } MultiFDSendParams;
> >>
> >> typedef struct {
> >> @@ -507,6 +511,8 @@ typedef struct {
> >> uint32_t num_packets;
> >> /* pages sent through this channel */
> >> uint32_t num_pages;
> >> + /* syncs main thread and channels */
> >> + QemuSemaphore sem_sync;
> >> } MultiFDRecvParams;
> >>
> >> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> >> @@ -682,6 +688,10 @@ struct {
> >> int count;
> >> /* array of pages to sent */
> >> MultiFDPages_t *pages;
> >> + /* syncs main thread and channels */
> >> + QemuSemaphore sem_sync;
> >> + /* global number of generated multifd packets */
> >> + uint32_t seq;
> >
> > It's interesting you use the same comment for 'seq' in
> > MultiFDSendParams - but I guess that means only this one is the global
> > version and the others aren't really global number - they're just
> > local to that thread?
>
> The only place that "increases/generates" seq is multifd_send_pages();
> that is what creates a new packet to be sent. So, if we see _any_ packet
> on the wire, we know the real global ordering. They are only used for
> traces, to see that packet 42 was sent through channel 3, and on
> reception you check that packet 42 is what you received through channel
> 3. They only appear in traces, but I find them useful for debugging
> synchronization errors.
Ah, and multifd_send_pages runs in the main thread, and it always operates
on multifd_send_state->seq and then passes it to the SendParams; OK.
I'm not sure how to explain that better, but it's a little confusing.
> >> + for (i = 0; i < migrate_multifd_channels(); i++) {
> >> + MultiFDSendParams *p = &multifd_send_state->params[i];
> >> +
> >> + trace_multifd_send_sync_main_signal(p->id);
> >> +
> >> + qemu_mutex_lock(&p->mutex);
> >> + p->flags |= MULTIFD_FLAG_SYNC;
> >> + p->pending_job++;
> >> + qemu_mutex_unlock(&p->mutex);
> >> + qemu_sem_post(&p->sem);
> >> + }
>
> [1]
>
> >> + for (i = 0; i < migrate_multifd_channels(); i++) {
> >> + MultiFDSendParams *p = &multifd_send_state->params[i];
> >> +
> >> + trace_multifd_send_sync_main_wait(p->id);
> >> + qemu_sem_wait(&multifd_send_state->sem_sync);
> >> + }
>
> [2]
>
> >> + trace_multifd_send_sync_main(multifd_send_state->seq);
> >> +}
> >> +
> >
> > OK, so this just makes each of the sending threads ack, so that seems
> > OK.
> > But what happens with an error? multifd_send_sync_main exits its
> > loop with a 'break' if the writes fail, and that could mean they never
> > come and post the flag-sync sem.
>
> Let's see.
>
> [1]: we are just doing mutex_lock()/sem_post(); if we are not able to do
> that, we have a big race that needs to be fixed. So that bit is OK.
>
> [2]: We do an unconditional sem_wait(). Looking at the worker code: at
> this patch level we are OK, but I agree with you that in later patches
> we also need to do the post in the error case. Changing.
K.
> >> +
> >> + trace_multifd_recv_sync_main_wait(p->id);
> >> + qemu_sem_wait(&multifd_recv_state->sem_sync);
> >> + qemu_mutex_lock(&p->mutex);
> >> + if (multifd_recv_state->seq < p->seq) {
> >> + multifd_recv_state->seq = p->seq;
> >> + }
> >
> > Can you explain what this is for?
> > Something like the latest received block?
>
> When we are at a synchronization point, the main thread doesn't know
> when that synchronization happened (at which packet, treating the
> packets as one logical list). So, we choose 'seq' from the channel with
> the highest number. That is the one that we want. We only use this
> for tracing, so we can "match" that we did a synchronization on the send
> side at packet N and see the trace on the reception side showing that
> we did it at packet N too.
OK, I think I see; again, this code runs in the main thread, and it's
going around all the subthreads, so it's updating the central copy
with what has been received - OK.
> Remember that in a previous patch you asked me what happened if this
> wraps around? At that point, nothing. But now I need to change
> this code to:
>
>
>     multifd_recv_state->seq = 0;
>     for (i = 0; i < migrate_multifd_channels(); i++) {
>         MultiFDRecvParams *p = &multifd_recv_state->params[i];
>         ...
>         if (multifd_recv_state->seq < p->seq) {
>             multifd_recv_state->seq = p->seq;
>         }
>     }
>
> And I have fixed the wrap-around problem, no?
Yes. Adding a note somewhere saying it's just for debug would help as
well.
> >> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
> >> trace_multifd_recv_thread_start(p->id);
> >>
> >> while (true) {
> >> - qemu_sem_wait(&p->sem);
> >> qemu_mutex_lock(&p->mutex);
> >> - if (p->pending_job) {
> >> + if (true || p->pending_job) {
> >
> > A TODO I guess???
>
> Oops, that should be out.
>
> Fixed in the next version.
>
> >> uint32_t used;
> >> uint32_t flags;
> >> qemu_mutex_unlock(&p->mutex);
> >> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
> >> p->num_packets++;
> >> p->num_pages += used;
> >> qemu_mutex_unlock(&p->mutex);
> >> +
> >> + if (flags & MULTIFD_FLAG_SYNC) {
> >> + qemu_sem_post(&multifd_recv_state->sem_sync);
> >> + qemu_sem_wait(&p->sem_sync);
> >> + }
> >
> > Can you explain the receive side logic - I think this is waiting for all
> > receive threads to 'ack' - but how do we know that they've finished
> > receiving all data that was sent?
>
> Because each of them needs to receive a packet with MULTIFD_FLAG_SYNC
> set. If they receive that flag, we know it is the last one of the sequence.
>
> Synchronization works like this (2 channels to keep things easy):
>
> main thread:
>     we finish a RAM_SECTION
>     flush pending packets to one of the channels
>     send a packet with MULTIFD_FLAG_SYNC on all the channels
>     wait until all the channels have processed the FLAG_SYNC
>     at this point, send the RAM_SECTION_EOS footer
>
> worker1 and worker2, each on its own channel:
>     if there is a pending packet, send it
>     (notice that there can never be more than one)
>     send a packet with the SYNC flag set
>
> On the reception side:
>
> main thread:
>     receive the RAM_SECTION_EOS footer
>     wait for the workers to receive a sync
>
> worker1 and worker2, each on its own channel:
>     process any pending packets (no sync)
>     process the packet with SYNC
>     post the main thread
>
> Now the main thread can continue.
>
> Notice that we don't care what happens first, receiving the packet with
> SYNC in the workers or RAM_SECTION_EOS in the main thread; everything
> works as expected.
>
> Noticing how long it took to explain this, I think I am going to add
> this to the migration documentation. I will wait for any questions you
> have before adding it.
Thanks; that I think makes sense.
Dave
> Later, Juan.
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK