qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v2 4/5] migration: implement bi-directional RDMA


From: 858585 jemmy
Subject: Re: [Qemu-devel] [PATCH v2 4/5] migration: implement bi-directional RDMA QIOChannel
Date: Fri, 27 Apr 2018 15:56:38 +0800

On Fri, Apr 27, 2018 at 1:36 AM, Dr. David Alan Gilbert
<address@hidden> wrote:
> * Lidong Chen (address@hidden) wrote:
>> This patch implements bi-directional RDMA QIOChannel. Because different
>> threads may access RDMAQIOChannel concurrently, this patch use RCU to 
>> protect it.
>>
>> Signed-off-by: Lidong Chen <address@hidden>
>
> I'm a bit confused by this.
>
> I can see it's adding RCU to protect the rdma structures against
> deletion from multiple threads; that I'm OK with in principle; is that
> the only locking we need? (I guess the two directions are actually
> separate RDMAContext's so maybe).

The qio_channel_rdma_close may be invoked by the migration thread and
the return path thread
concurrently, so I use a mutex to protect it.

If one thread invokes qio_channel_rdma_writev while another thread invokes
qio_channel_rdma_readv,
the two threads will use separate RDMAContexts, so no lock is needed.

If two threads invoked qio_channel_rdma_writev concurrently, a lock
would be needed to protect it.
But I find that the source qemu migration thread only invokes
qio_channel_rdma_writev, and the return path
thread only invokes qio_channel_rdma_readv.

The destination qemu only invokes qio_channel_rdma_readv, from the main
thread before postcopy or
from the listen thread after postcopy.

The destination qemu has already protected it by using
qemu_mutex_lock(&mis->rp_mutex) when writing data to the
source qemu.

But should we use qemu_mutex_lock to protect qio_channel_rdma_writev
and qio_channel_rdma_readv anyway,
to guard against some future change that invokes qio_channel_rdma_writev or
qio_channel_rdma_readv concurrently?

>
> But is there nothing else to make the QIOChannel bidirectional?
>
> Also, a lot seems dependent on listen_id, can you explain how that's
> being used.

The destination qemu is the server side, so its listen_id is not zero; the
source qemu is the client side,
so its listen_id is zero.
I use listen_id to determine whether qemu is the destination or the source.

For the destination qemu, writing data to the source needs to use the
return_path rdma, like this:
    if (rdma->listen_id) {
        rdma = rdma->return_path;
    }

For the source qemu, reading data from the destination also needs to use
the return_path rdma.
    if (!rdma->listen_id) {
        rdma = rdma->return_path;
    }

>
> Finally, I don't think you have anywhere that destroys the new mutex you
> added.
I will fix this in the next version.

>
> Dave
> P.S. Please cc Daniel Berrange on this series, since it's so much
> IOChannel stuff.
>
>> ---
>>  migration/rdma.c | 162 
>> +++++++++++++++++++++++++++++++++++++++++++++++++------
>>  1 file changed, 146 insertions(+), 16 deletions(-)
>>
>> diff --git a/migration/rdma.c b/migration/rdma.c
>> index f5c1d02..0652224 100644
>> --- a/migration/rdma.c
>> +++ b/migration/rdma.c
>> @@ -86,6 +86,7 @@ static uint32_t known_capabilities = 
>> RDMA_CAPABILITY_PIN_ALL;
>>                                  " to abort!"); \
>>                  rdma->error_reported = 1; \
>>              } \
>> +            rcu_read_unlock(); \
>>              return rdma->error_state; \
>>          } \
>>      } while (0)
>> @@ -405,6 +406,7 @@ struct QIOChannelRDMA {
>>      RDMAContext *rdma;
>>      QEMUFile *file;
>>      bool blocking; /* XXX we don't actually honour this yet */
>> +    QemuMutex lock;
>>  };
>>
>>  /*
>> @@ -2635,12 +2637,29 @@ static ssize_t qio_channel_rdma_writev(QIOChannel 
>> *ioc,
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>>      QEMUFile *f = rioc->file;
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      int ret;
>>      ssize_t done = 0;
>>      size_t i;
>>      size_t len = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>> +    if (rdma->listen_id) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      /*
>> @@ -2650,6 +2669,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>      ret = qemu_rdma_write_flush(f, rdma);
>>      if (ret < 0) {
>>          rdma->error_state = ret;
>> +        rcu_read_unlock();
>>          return ret;
>>      }
>>
>> @@ -2669,6 +2689,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>
>>              if (ret < 0) {
>>                  rdma->error_state = ret;
>> +                rcu_read_unlock();
>>                  return ret;
>>              }
>>
>> @@ -2677,6 +2698,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>          }
>>      }
>>
>> +    rcu_read_unlock();
>>      return done;
>>  }
>>
>> @@ -2710,12 +2732,29 @@ static ssize_t qio_channel_rdma_readv(QIOChannel 
>> *ioc,
>>                                        Error **errp)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      RDMAControlHeader head;
>>      int ret = 0;
>>      ssize_t i;
>>      size_t done = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>> +    if (!rdma->listen_id) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      for (i = 0; i < niov; i++) {
>> @@ -2727,7 +2766,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>>           * were given and dish out the bytes until we run
>>           * out of bytes.
>>           */
>> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
>> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>>          done += ret;
>>          want -= ret;
>>          /* Got what we needed, so go to next iovec */
>> @@ -2749,25 +2788,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel 
>> *ioc,
>>
>>          if (ret < 0) {
>>              rdma->error_state = ret;
>> +            rcu_read_unlock();
>>              return ret;
>>          }
>>
>>          /*
>>           * SEND was received with new bytes, now try again.
>>           */
>> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
>> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>>          done += ret;
>>          want -= ret;
>>
>>          /* Still didn't get enough, so lets just return */
>>          if (want) {
>>              if (done == 0) {
>> +                rcu_read_unlock();
>>                  return QIO_CHANNEL_ERR_BLOCK;
>>              } else {
>>                  break;
>>              }
>>          }
>>      }
>> +    rcu_read_unlock();
>>      return done;
>>  }
>>
>> @@ -2823,6 +2865,16 @@ qio_channel_rdma_source_prepare(GSource *source,
>>      GIOCondition cond = 0;
>>      *timeout = -1;
>>
>> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        error_report("RDMAContext is NULL when prepare Gsource");
>> +        return FALSE;
>> +    }
>> +
>>      if (rdma->wr_data[0].control_len) {
>>          cond |= G_IO_IN;
>>      }
>> @@ -2838,6 +2890,16 @@ qio_channel_rdma_source_check(GSource *source)
>>      RDMAContext *rdma = rsource->rioc->rdma;
>>      GIOCondition cond = 0;
>>
>> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        error_report("RDMAContext is NULL when check Gsource");
>> +        return FALSE;
>> +    }
>> +
>>      if (rdma->wr_data[0].control_len) {
>>          cond |= G_IO_IN;
>>      }
>> @@ -2856,6 +2918,16 @@ qio_channel_rdma_source_dispatch(GSource *source,
>>      RDMAContext *rdma = rsource->rioc->rdma;
>>      GIOCondition cond = 0;
>>
>> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        error_report("RDMAContext is NULL when dispatch Gsource");
>> +        return FALSE;
>> +    }
>> +
>>      if (rdma->wr_data[0].control_len) {
>>          cond |= G_IO_IN;
>>      }
>> @@ -2905,15 +2977,29 @@ static int qio_channel_rdma_close(QIOChannel *ioc,
>>                                    Error **errp)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>> +    RDMAContext *rdma;
>>      trace_qemu_rdma_close();
>> -    if (rioc->rdma) {
>> -        if (!rioc->rdma->error_state) {
>> -            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
>> -        }
>> -        qemu_rdma_cleanup(rioc->rdma);
>> -        g_free(rioc->rdma);
>> -        rioc->rdma = NULL;
>> +
>> +    qemu_mutex_lock(&rioc->lock);
>> +    rdma = rioc->rdma;
>> +    if (!rdma) {
>> +        qemu_mutex_unlock(&rioc->lock);
>> +        return 0;
>> +    }
>> +    atomic_rcu_set(&rioc->rdma, NULL);
>> +    qemu_mutex_unlock(&rioc->lock);
>> +
>> +    if (!rdma->error_state) {
>> +        rdma->error_state = qemu_file_get_error(rioc->file);
>> +    }
>> +    qemu_rdma_cleanup(rdma);
>> +
>> +    if (rdma->return_path) {
>> +        qemu_rdma_cleanup(rdma->return_path);
>> +        g_free(rdma->return_path);
>>      }
>> +
>> +    g_free(rdma);
>>      return 0;
>>  }
>>
>> @@ -2956,12 +3042,21 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void 
>> *opaque,
>>                                    size_t size, uint64_t *bytes_sent)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      int ret;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> +        rcu_read_unlock();
>>          return RAM_SAVE_CONTROL_NOT_SUPP;
>>      }
>>
>> @@ -3046,9 +3141,11 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void 
>> *opaque,
>>          }
>>      }
>>
>> +    rcu_read_unlock();
>>      return RAM_SAVE_CONTROL_DELAYED;
>>  err:
>>      rdma->error_state = ret;
>> +    rcu_read_unlock();
>>      return ret;
>>  }
>>
>> @@ -3224,8 +3321,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, 
>> void *opaque)
>>      RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
>>                                   .repeat = 1 };
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> -    RDMALocalBlocks *local = &rdma->local_ram_blocks;
>> +    RDMAContext *rdma;
>> +    RDMALocalBlocks *local;
>>      RDMAControlHeader head;
>>      RDMARegister *reg, *registers;
>>      RDMACompress *comp;
>> @@ -3238,8 +3335,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, 
>> void *opaque)
>>      int count = 0;
>>      int i = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>> +    local = &rdma->local_ram_blocks;
>>      do {
>>          trace_qemu_rdma_registration_handle_wait();
>>
>> @@ -3469,6 +3575,7 @@ out:
>>      if (ret < 0) {
>>          rdma->error_state = ret;
>>      }
>> +    rcu_read_unlock();
>>      return ret;
>>  }
>>
>> @@ -3525,11 +3632,19 @@ static int qemu_rdma_registration_start(QEMUFile *f, 
>> void *opaque,
>>                                          uint64_t flags, void *data)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>> +
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>>
>>      CHECK_ERROR_STATE();
>>
>>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> +        rcu_read_unlock();
>>          return 0;
>>      }
>>
>> @@ -3537,6 +3652,7 @@ static int qemu_rdma_registration_start(QEMUFile *f, 
>> void *opaque,
>>      qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
>>      qemu_fflush(f);
>>
>> +    rcu_read_unlock();
>>      return 0;
>>  }
>>
>> @@ -3549,13 +3665,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
>> void *opaque,
>>  {
>>      Error *local_err = NULL, **errp = &local_err;
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
>>      int ret = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> +        rcu_read_unlock();
>>          return 0;
>>      }
>>
>> @@ -3587,6 +3711,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
>> void *opaque,
>>                      qemu_rdma_reg_whole_ram_blocks : NULL);
>>          if (ret < 0) {
>>              ERROR(errp, "receiving remote info!");
>> +            rcu_read_unlock();
>>              return ret;
>>          }
>>
>> @@ -3610,6 +3735,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
>> void *opaque,
>>                          "not identical on both the source and destination.",
>>                          local->nb_blocks, nb_dest_blocks);
>>              rdma->error_state = -EINVAL;
>> +            rcu_read_unlock();
>>              return -EINVAL;
>>          }
>>
>> @@ -3626,6 +3752,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
>> void *opaque,
>>                              local->block[i].length,
>>                              rdma->dest_blocks[i].length);
>>                  rdma->error_state = -EINVAL;
>> +                rcu_read_unlock();
>>                  return -EINVAL;
>>              }
>>              local->block[i].remote_host_addr =
>> @@ -3643,9 +3770,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
>> void *opaque,
>>          goto err;
>>      }
>>
>> +    rcu_read_unlock();
>>      return 0;
>>  err:
>>      rdma->error_state = ret;
>> +    rcu_read_unlock();
>>      return ret;
>>  }
>>
>> @@ -3707,6 +3836,7 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, 
>> const char *mode)
>>
>>      rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>>      rioc->rdma = rdma;
>> +    qemu_mutex_init(&rioc->lock);
>>
>>      if (mode[0] == 'w') {
>>          rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
>> --
>> 1.8.3.1
>>
> --
> Dr. David Alan Gilbert / address@hidden / Manchester, UK



reply via email to

[Prev in Thread] Current Thread [Next in Thread]