qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v4 07/11] virtio: fill/flush/pop for packed ring


From: Jason Wang
Subject: Re: [Qemu-devel] [PATCH v4 07/11] virtio: fill/flush/pop for packed ring
Date: Mon, 18 Feb 2019 15:51:05 +0800
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:60.0) Gecko/20100101 Thunderbird/60.4.0


On 2019/2/14 下午12:26, address@hidden wrote:
From: Wei Xu <address@hidden>

last_used_idx/wrap_counter should be equal to last_avail_idx/wrap_counter
after a successful flush.

Batching in vhost-net & dpdk testpmd is not equivalently supported in
userspace backend, but a chained descriptors for Rx is similarly presented
as a lightweight batch, so a write barrier is nailed only for the
first(head) descriptor.

Signed-off-by: Wei Xu <address@hidden>
---
  hw/virtio/virtio.c | 291 +++++++++++++++++++++++++++++++++++++++++++++++++----
  1 file changed, 274 insertions(+), 17 deletions(-)

diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c
index 832287b..7e276b4 100644
--- a/hw/virtio/virtio.c
+++ b/hw/virtio/virtio.c
@@ -379,6 +379,25 @@ static void vring_packed_desc_read(VirtIODevice *vdev, 
VRingPackedDesc *desc,
      virtio_tswap16s(vdev, &desc->id);
  }
+static void vring_packed_desc_write_data(VirtIODevice *vdev,
+                    VRingPackedDesc *desc, MemoryRegionCache *cache, int i)
+{
+    virtio_tswap32s(vdev, &desc->len);
+    virtio_tswap16s(vdev, &desc->id);
+    address_space_write_cached(cache,
+              i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, id),
+              &desc->id, sizeof(desc->id));
+    address_space_cache_invalidate(cache,
+              i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, id),
+              sizeof(desc->id));
+    address_space_write_cached(cache,
+              i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, len),
+              &desc->len, sizeof(desc->len));
+    address_space_cache_invalidate(cache,
+              i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, len),
+              sizeof(desc->len));
+}
+
  static void vring_packed_desc_read_flags(VirtIODevice *vdev,
                      VRingPackedDesc *desc, MemoryRegionCache *cache, int i)
  {
@@ -388,6 +407,18 @@ static void vring_packed_desc_read_flags(VirtIODevice 
*vdev,
      virtio_tswap16s(vdev, &desc->flags);
  }
+static void vring_packed_desc_write_flags(VirtIODevice *vdev,
+                    VRingPackedDesc *desc, MemoryRegionCache *cache, int i)
+{
+    virtio_tswap16s(vdev, &desc->flags);
+    address_space_write_cached(cache,
+              i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, flags),
+              &desc->flags, sizeof(desc->flags));
+    address_space_cache_invalidate(cache,
+              i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, flags),
+              sizeof(desc->flags));
+}
+
  static inline bool is_desc_avail(struct VRingPackedDesc *desc,
                                  bool wrap_counter)
  {
@@ -554,19 +585,11 @@ bool virtqueue_rewind(VirtQueue *vq, unsigned int num)
  }
/* Called within rcu_read_lock(). */
-void virtqueue_fill(VirtQueue *vq, const VirtQueueElement *elem,
+static void virtqueue_split_fill(VirtQueue *vq, const VirtQueueElement *elem,
                      unsigned int len, unsigned int idx)
  {
      VRingUsedElem uelem;
- trace_virtqueue_fill(vq, elem, len, idx);
-
-    virtqueue_unmap_sg(vq, elem, len);
-
-    if (unlikely(vq->vdev->broken)) {
-        return;
-    }
-
      if (unlikely(!vq->vring.used)) {
          return;
      }
@@ -578,16 +601,71 @@ void virtqueue_fill(VirtQueue *vq, const VirtQueueElement 
*elem,
      vring_used_write(vq, &uelem, idx);
  }
-/* Called within rcu_read_lock(). */
-void virtqueue_flush(VirtQueue *vq, unsigned int count)
+static void virtqueue_packed_fill(VirtQueue *vq, const VirtQueueElement *elem,
+                        unsigned int len, unsigned int idx)
  {
-    uint16_t old, new;
+    uint16_t head;
+    VRingMemoryRegionCaches *caches;
+    VRingPackedDesc desc = {
+        .flags = 0,
+        .id = elem->index,
+        .len = len,
+    };
+    bool wrap_counter = vq->used_wrap_counter;
+
+    if (unlikely(!vq->vring.desc)) {
+        return;
+    }
+
+    head = vq->used_idx + idx;
+    if (head >= vq->vring.num) {
+        head -= vq->vring.num;
+        wrap_counter ^= 1;
+    }
+    if (wrap_counter) {
+        desc.flags |= (1 << VRING_PACKED_DESC_F_AVAIL);
+        desc.flags |= (1 << VRING_PACKED_DESC_F_USED);
+    } else {
+        desc.flags &= ~(1 << VRING_PACKED_DESC_F_AVAIL);
+        desc.flags &= ~(1 << VRING_PACKED_DESC_F_USED);
+    }
+
+    caches = vring_get_region_caches(vq);
+    vring_packed_desc_write_data(vq->vdev, &desc, &caches->desc, head);
+    if (idx == 0) {
+        /*
+         * Make sure descriptor id and len is written before
+         * flags for the first used buffer.
+         */
+        smp_wmb();
+    }


I suspect you need to do this unconditionally since this function doesn't do any batched writing to used ring. What it did is to write used descriptors at idx. So there's no reason to supress wmb for the idx != 0. See its usage in virtio-net.c


+
+    vring_packed_desc_write_flags(vq->vdev, &desc, &caches->desc, head);
+}
+
+void virtqueue_fill(VirtQueue *vq, const VirtQueueElement *elem,
+                    unsigned int len, unsigned int idx)
+{
+    trace_virtqueue_fill(vq, elem, len, idx);
+
+    virtqueue_unmap_sg(vq, elem, len);
if (unlikely(vq->vdev->broken)) {
-        vq->inuse -= count;
          return;
      }
+ if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_RING_PACKED)) {
+        virtqueue_packed_fill(vq, elem, len, idx);
+    } else {
+        virtqueue_split_fill(vq, elem, len, idx);
+    }
+}
+
+/* Called within rcu_read_lock().  */
+static void virtqueue_split_flush(VirtQueue *vq, unsigned int count)
+{
+    uint16_t old, new;
+
      if (unlikely(!vq->vring.used)) {
          return;
      }
@@ -603,6 +681,31 @@ void virtqueue_flush(VirtQueue *vq, unsigned int count)
          vq->signalled_used_valid = false;
  }
+static void virtqueue_packed_flush(VirtQueue *vq, unsigned int count)
+{
+    if (unlikely(!vq->vring.desc)) {
+        return;
+    }
+
+    vq->inuse -= count;
+    vq->used_idx = vq->last_avail_idx;
+    vq->used_wrap_counter = vq->last_avail_wrap_counter;
+}
+
+void virtqueue_flush(VirtQueue *vq, unsigned int count)
+{
+    if (unlikely(vq->vdev->broken)) {
+        vq->inuse -= count;
+        return;
+    }
+
+    if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_RING_PACKED)) {
+        virtqueue_packed_flush(vq, count);
+    } else {
+        virtqueue_split_flush(vq, count);
+    }
+}
+
  void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
                      unsigned int len)
  {
@@ -1077,7 +1180,7 @@ static void *virtqueue_alloc_element(size_t sz, unsigned 
out_num, unsigned in_nu
      return elem;
  }
-void *virtqueue_pop(VirtQueue *vq, size_t sz)
+static void *virtqueue_split_pop(VirtQueue *vq, size_t sz)
  {
      unsigned int i, head, max;
      VRingMemoryRegionCaches *caches;
@@ -1092,9 +1195,6 @@ void *virtqueue_pop(VirtQueue *vq, size_t sz)
      VRingDesc desc;
      int rc;
- if (unlikely(vdev->broken)) {
-        return NULL;
-    }
      rcu_read_lock();
      if (virtio_queue_empty_rcu(vq)) {
          goto done;
@@ -1212,6 +1312,163 @@ err_undo_map:
      goto done;
  }
+static void *virtqueue_packed_pop(VirtQueue *vq, size_t sz)
+{
+    unsigned int i, head, max;
+    VRingMemoryRegionCaches *caches;
+    MemoryRegionCache indirect_desc_cache = MEMORY_REGION_CACHE_INVALID;
+    MemoryRegionCache *cache;
+    int64_t len;
+    VirtIODevice *vdev = vq->vdev;
+    VirtQueueElement *elem = NULL;
+    unsigned out_num, in_num, elem_entries;
+    hwaddr addr[VIRTQUEUE_MAX_SIZE];
+    struct iovec iov[VIRTQUEUE_MAX_SIZE];
+    VRingPackedDesc desc;
+    uint16_t id;
+
+    rcu_read_lock();
+    if (virtio_queue_packed_empty_rcu(vq)) {
+        goto done;
+    }
+
+    /* When we start there are none of either input nor output. */
+    out_num = in_num = elem_entries = 0;
+
+    max = vq->vring.num;
+
+    if (vq->inuse >= vq->vring.num) {
+        virtio_error(vdev, "Virtqueue size exceeded");
+        goto done;
+    }
+
+    head = vq->last_avail_idx;
+    i = head;
+
+    caches = vring_get_region_caches(vq);
+    cache = &caches->desc;
+
+    /* make sure flags is read before all the other fields */
+    smp_rmb();
+    vring_packed_desc_read(vdev, &desc, cache, i);
+
+    id = desc.id;
+    if (desc.flags & VRING_DESC_F_INDIRECT) {
+
+        if (desc.len % sizeof(VRingPackedDesc)) {
+            virtio_error(vdev, "Invalid size for indirect buffer table");
+            goto done;
+        }
+
+        /* loop over the indirect descriptor table */
+        len = address_space_cache_init(&indirect_desc_cache, vdev->dma_as,
+                                       desc.addr, desc.len, false);
+        cache = &indirect_desc_cache;
+        if (len < desc.len) {
+            virtio_error(vdev, "Cannot map indirect buffer");
+            goto done;
+        }
+
+        max = desc.len / sizeof(VRingPackedDesc);
+        i = 0;
+        vring_packed_desc_read(vdev, &desc, cache, i);
+        /* Make sure we see all the fields*/
+        smp_rmb();


This looks unnecessary, all indirect descriptor should be available now.


+    }
+
+    /* Collect all the descriptors */
+    while (1) {
+        bool map_ok;
+
+        if (desc.flags & VRING_DESC_F_WRITE) {
+            map_ok = virtqueue_map_desc(vdev, &in_num, addr + out_num,
+                                        iov + out_num,
+                                        VIRTQUEUE_MAX_SIZE - out_num, true,
+                                        desc.addr, desc.len);
+        } else {
+            if (in_num) {
+                virtio_error(vdev, "Incorrect order for descriptors");
+                goto err_undo_map;
+            }
+            map_ok = virtqueue_map_desc(vdev, &out_num, addr, iov,
+                                        VIRTQUEUE_MAX_SIZE, false,
+                                        desc.addr, desc.len);
+        }
+        if (!map_ok) {
+            goto err_undo_map;
+        }
+
+        /* If we've got too many, that implies a descriptor loop. */
+        if (++elem_entries > max) {
+            virtio_error(vdev, "Looped descriptor");
+            goto err_undo_map;
+        }
+
+        if (++i >= vq->vring.num) {
+            i -= vq->vring.num;


Do we allow chain more descriptors than vq size in the case of indirect? According to the spec:

"

The device limits the number of descriptors in a list through a
transport-specific and/or device-specific value. If not limited,
the maximum number of descriptors in a list is the virt queue
size.
"

This looks possible, so the above is probably wrong if the max number of chained descriptors is negotiated through a device specific way.


+        }
+
+        if (cache == &indirect_desc_cache) {
+            if (i == max) {
+                break;
+            }
+            vring_packed_desc_read(vq->vdev, &desc, cache, i);
+        } else if (desc.flags & VRING_DESC_F_NEXT) {
+            vring_packed_desc_read(vq->vdev, &desc, cache, i);
+        } else {
+            break;
+        }
+    }
+
+    /* Now copy what we have collected and mapped */
+    elem = virtqueue_alloc_element(sz, out_num, in_num);
+    elem->index = id;
+    for (i = 0; i < out_num; i++) {
+        elem->out_addr[i] = addr[i];
+        elem->out_sg[i] = iov[i];
+    }
+    for (i = 0; i < in_num; i++) {
+        elem->in_addr[i] = addr[head + out_num + i];
+        elem->in_sg[i] = iov[out_num + i];
+    }
+
+    vq->last_avail_idx += (cache == &indirect_desc_cache) ?
+                          1 : elem_entries;
+    if (vq->last_avail_idx >= vq->vring.num) {
+        vq->last_avail_idx -= vq->vring.num;
+        vq->last_avail_wrap_counter ^= 1;
+    }
+    vq->inuse++;
+
+    vq->shadow_avail_idx = vq->last_avail_idx;
+    vq->avail_wrap_counter = vq->last_avail_wrap_counter;


It's better to name this to "shadow_avail_wrap_counter".

Thanks


+
+    trace_virtqueue_pop(vq, elem, elem->in_num, elem->out_num);
+done:
+    address_space_cache_destroy(&indirect_desc_cache);
+    rcu_read_unlock();
+
+    return elem;
+
+err_undo_map:
+    virtqueue_undo_map_desc(out_num, in_num, iov);
+    g_free(elem);
+    goto done;
+}
+
+void *virtqueue_pop(VirtQueue *vq, size_t sz)
+{
+    if (unlikely(vq->vdev->broken)) {
+        return NULL;
+    }
+
+    if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_RING_PACKED)) {
+        return virtqueue_packed_pop(vq, sz);
+    } else {
+        return virtqueue_split_pop(vq, sz);
+    }
+}
+
  /* virtqueue_drop_all:
   * @vq: The #VirtQueue
   * Drops all queued buffers and indicates them to the guest



reply via email to

[Prev in Thread] Current Thread [Next in Thread]