qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v2 4/5] migration: implement bi-directional RDMA QIOChannel


From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH v2 4/5] migration: implement bi-directional RDMA QIOChannel
Date: Thu, 26 Apr 2018 18:36:42 +0100
User-agent: Mutt/1.9.5 (2018-04-13)

* Lidong Chen (address@hidden) wrote:
> This patch implements bi-directional RDMA QIOChannel. Because different
> threads may access RDMAQIOChannel concurrently, this patch use RCU to protect 
> it.
> 
> Signed-off-by: Lidong Chen <address@hidden>

I'm a bit confused by this.

I can see it's adding RCU to protect the rdma structures against
deletion from multiple threads; that I'm OK with in principle; is that
the only locking we need? (I guess the two directions are actually
separate RDMAContexts, so maybe.)

But is there nothing else to make the QIOChannel bidirectional?

Also, a lot seems dependent on listen_id; can you explain how that's
being used?

Finally, I don't think you have anywhere that destroys the new mutex you
added.

Dave
P.S. Please cc Daniel Berrange on this series, since it's so much
IOChannel stuff.

> ---
>  migration/rdma.c | 162 
> +++++++++++++++++++++++++++++++++++++++++++++++++------
>  1 file changed, 146 insertions(+), 16 deletions(-)
> 
> diff --git a/migration/rdma.c b/migration/rdma.c
> index f5c1d02..0652224 100644
> --- a/migration/rdma.c
> +++ b/migration/rdma.c
> @@ -86,6 +86,7 @@ static uint32_t known_capabilities = 
> RDMA_CAPABILITY_PIN_ALL;
>                                  " to abort!"); \
>                  rdma->error_reported = 1; \
>              } \
> +            rcu_read_unlock(); \
>              return rdma->error_state; \
>          } \
>      } while (0)
> @@ -405,6 +406,7 @@ struct QIOChannelRDMA {
>      RDMAContext *rdma;
>      QEMUFile *file;
>      bool blocking; /* XXX we don't actually honour this yet */
> +    QemuMutex lock;
>  };
>  
>  /*
> @@ -2635,12 +2637,29 @@ static ssize_t qio_channel_rdma_writev(QIOChannel 
> *ioc,
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>      QEMUFile *f = rioc->file;
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
>      int ret;
>      ssize_t done = 0;
>      size_t i;
>      size_t len = 0;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
> +    if (rdma->listen_id) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
>      /*
> @@ -2650,6 +2669,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>      ret = qemu_rdma_write_flush(f, rdma);
>      if (ret < 0) {
>          rdma->error_state = ret;
> +        rcu_read_unlock();
>          return ret;
>      }
>  
> @@ -2669,6 +2689,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>  
>              if (ret < 0) {
>                  rdma->error_state = ret;
> +                rcu_read_unlock();
>                  return ret;
>              }
>  
> @@ -2677,6 +2698,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>          }
>      }
>  
> +    rcu_read_unlock();
>      return done;
>  }
>  
> @@ -2710,12 +2732,29 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>                                        Error **errp)
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
>      RDMAControlHeader head;
>      int ret = 0;
>      ssize_t i;
>      size_t done = 0;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
> +    if (!rdma->listen_id) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
>      for (i = 0; i < niov; i++) {
> @@ -2727,7 +2766,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>           * were given and dish out the bytes until we run
>           * out of bytes.
>           */
> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>          done += ret;
>          want -= ret;
>          /* Got what we needed, so go to next iovec */
> @@ -2749,25 +2788,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>  
>          if (ret < 0) {
>              rdma->error_state = ret;
> +            rcu_read_unlock();
>              return ret;
>          }
>  
>          /*
>           * SEND was received with new bytes, now try again.
>           */
> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>          done += ret;
>          want -= ret;
>  
>          /* Still didn't get enough, so lets just return */
>          if (want) {
>              if (done == 0) {
> +                rcu_read_unlock();
>                  return QIO_CHANNEL_ERR_BLOCK;
>              } else {
>                  break;
>              }
>          }
>      }
> +    rcu_read_unlock();
>      return done;
>  }
>  
> @@ -2823,6 +2865,16 @@ qio_channel_rdma_source_prepare(GSource *source,
>      GIOCondition cond = 0;
>      *timeout = -1;
>  
> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        error_report("RDMAContext is NULL when prepare Gsource");
> +        return FALSE;
> +    }
> +
>      if (rdma->wr_data[0].control_len) {
>          cond |= G_IO_IN;
>      }
> @@ -2838,6 +2890,16 @@ qio_channel_rdma_source_check(GSource *source)
>      RDMAContext *rdma = rsource->rioc->rdma;
>      GIOCondition cond = 0;
>  
> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        error_report("RDMAContext is NULL when check Gsource");
> +        return FALSE;
> +    }
> +
>      if (rdma->wr_data[0].control_len) {
>          cond |= G_IO_IN;
>      }
> @@ -2856,6 +2918,16 @@ qio_channel_rdma_source_dispatch(GSource *source,
>      RDMAContext *rdma = rsource->rioc->rdma;
>      GIOCondition cond = 0;
>  
> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        error_report("RDMAContext is NULL when dispatch Gsource");
> +        return FALSE;
> +    }
> +
>      if (rdma->wr_data[0].control_len) {
>          cond |= G_IO_IN;
>      }
> @@ -2905,15 +2977,29 @@ static int qio_channel_rdma_close(QIOChannel *ioc,
>                                    Error **errp)
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> +    RDMAContext *rdma;
>      trace_qemu_rdma_close();
> -    if (rioc->rdma) {
> -        if (!rioc->rdma->error_state) {
> -            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
> -        }
> -        qemu_rdma_cleanup(rioc->rdma);
> -        g_free(rioc->rdma);
> -        rioc->rdma = NULL;
> +
> +    qemu_mutex_lock(&rioc->lock);
> +    rdma = rioc->rdma;
> +    if (!rdma) {
> +        qemu_mutex_unlock(&rioc->lock);
> +        return 0;
> +    }
> +    atomic_rcu_set(&rioc->rdma, NULL);
> +    qemu_mutex_unlock(&rioc->lock);
> +
> +    if (!rdma->error_state) {
> +        rdma->error_state = qemu_file_get_error(rioc->file);
> +    }
> +    qemu_rdma_cleanup(rdma);
> +
> +    if (rdma->return_path) {
> +        qemu_rdma_cleanup(rdma->return_path);
> +        g_free(rdma->return_path);
>      }
> +
> +    g_free(rdma);
>      return 0;
>  }
>  
> @@ -2956,12 +3042,21 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void 
> *opaque,
>                                    size_t size, uint64_t *bytes_sent)
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
>      int ret;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
> +        rcu_read_unlock();
>          return RAM_SAVE_CONTROL_NOT_SUPP;
>      }
>  
> @@ -3046,9 +3141,11 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void 
> *opaque,
>          }
>      }
>  
> +    rcu_read_unlock();
>      return RAM_SAVE_CONTROL_DELAYED;
>  err:
>      rdma->error_state = ret;
> +    rcu_read_unlock();
>      return ret;
>  }
>  
> @@ -3224,8 +3321,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, 
> void *opaque)
>      RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
>                                   .repeat = 1 };
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> -    RDMAContext *rdma = rioc->rdma;
> -    RDMALocalBlocks *local = &rdma->local_ram_blocks;
> +    RDMAContext *rdma;
> +    RDMALocalBlocks *local;
>      RDMAControlHeader head;
>      RDMARegister *reg, *registers;
>      RDMACompress *comp;
> @@ -3238,8 +3335,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, 
> void *opaque)
>      int count = 0;
>      int i = 0;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
> +    local = &rdma->local_ram_blocks;
>      do {
>          trace_qemu_rdma_registration_handle_wait();
>  
> @@ -3469,6 +3575,7 @@ out:
>      if (ret < 0) {
>          rdma->error_state = ret;
>      }
> +    rcu_read_unlock();
>      return ret;
>  }
>  
> @@ -3525,11 +3632,19 @@ static int qemu_rdma_registration_start(QEMUFile *f, 
> void *opaque,
>                                          uint64_t flags, void *data)
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
> +
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
>  
>      CHECK_ERROR_STATE();
>  
>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
> +        rcu_read_unlock();
>          return 0;
>      }
>  
> @@ -3537,6 +3652,7 @@ static int qemu_rdma_registration_start(QEMUFile *f, 
> void *opaque,
>      qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
>      qemu_fflush(f);
>  
> +    rcu_read_unlock();
>      return 0;
>  }
>  
> @@ -3549,13 +3665,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
> void *opaque,
>  {
>      Error *local_err = NULL, **errp = &local_err;
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
>      int ret = 0;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
> +        rcu_read_unlock();
>          return 0;
>      }
>  
> @@ -3587,6 +3711,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
> void *opaque,
>                      qemu_rdma_reg_whole_ram_blocks : NULL);
>          if (ret < 0) {
>              ERROR(errp, "receiving remote info!");
> +            rcu_read_unlock();
>              return ret;
>          }
>  
> @@ -3610,6 +3735,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
> void *opaque,
>                          "not identical on both the source and destination.",
>                          local->nb_blocks, nb_dest_blocks);
>              rdma->error_state = -EINVAL;
> +            rcu_read_unlock();
>              return -EINVAL;
>          }
>  
> @@ -3626,6 +3752,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
> void *opaque,
>                              local->block[i].length,
>                              rdma->dest_blocks[i].length);
>                  rdma->error_state = -EINVAL;
> +                rcu_read_unlock();
>                  return -EINVAL;
>              }
>              local->block[i].remote_host_addr =
> @@ -3643,9 +3770,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, 
> void *opaque,
>          goto err;
>      }
>  
> +    rcu_read_unlock();
>      return 0;
>  err:
>      rdma->error_state = ret;
> +    rcu_read_unlock();
>      return ret;
>  }
>  
> @@ -3707,6 +3836,7 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, 
> const char *mode)
>  
>      rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>      rioc->rdma = rdma;
> +    qemu_mutex_init(&rioc->lock);
>  
>      if (mode[0] == 'w') {
>          rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
> -- 
> 1.8.3.1
> 
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK



reply via email to

[Prev in Thread] Current Thread [Next in Thread]