Re: [Qemu-devel] [PATCH 09/12] ring: introduce lockless ring buffer


From: Peter Xu
Subject: Re: [Qemu-devel] [PATCH 09/12] ring: introduce lockless ring buffer
Date: Wed, 20 Jun 2018 13:55:35 +0800
User-agent: Mutt/1.10.0 (2018-05-17)

On Mon, Jun 04, 2018 at 05:55:17PM +0800, address@hidden wrote:

[...]

(Some more comments/questions for the MP implementation...)

> +static inline int ring_mp_put(Ring *ring, void *data)
> +{
> +    unsigned int index, in, in_next, out;
> +
> +    do {
> +        in = atomic_read(&ring->in);
> +        out = atomic_read(&ring->out);

[0]

Do we need to fetch "out" with load_acquire()?  Otherwise, what does
the store_release() at [1] below pair with?

This barrier exists in the SP-SC case, which makes sense to me.  I
assume it's also needed for the MP-SC case, am I right?
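
To make the pairing I have in mind concrete, here is a minimal sketch
in plain C11 atomics.  It only stands in for QEMU's
atomic_load_acquire()/atomic_store_release(); DemoRing, DEMO_RING_SIZE
and both helper names are made up for illustration and are not part of
the patch.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_RING_SIZE 4u   /* hypothetical size, power of two */

/* Hypothetical stand-in for the patch's Ring structure. */
typedef struct {
    atomic_uint in;
    atomic_uint out;
    _Atomic(void *) data[DEMO_RING_SIZE];
} DemoRing;

/*
 * Consumer side, like [1]: clear the slot first, then publish the new
 * "out" with release semantics so the NULL store cannot be reordered
 * after the index update.
 */
static void demo_consumer_release(DemoRing *r)
{
    unsigned int out = atomic_load_explicit(&r->out, memory_order_relaxed);

    atomic_store_explicit(&r->data[out % DEMO_RING_SIZE], NULL,
                          memory_order_relaxed);
    atomic_store_explicit(&r->out, out + 1, memory_order_release);
}

/*
 * Producer side, like [0]: the acquire load of "out" pairs with the
 * release store above, so once the new "out" is observed the slot is
 * guaranteed to be seen as NULL.
 */
static bool demo_producer_sees_free_slot(DemoRing *r, unsigned int in)
{
    unsigned int out = atomic_load_explicit(&r->out, memory_order_acquire);

    if (in - out >= DEMO_RING_SIZE) {
        return false;               /* ring is full */
    }
    return atomic_load_explicit(&r->data[in % DEMO_RING_SIZE],
                                memory_order_relaxed) == NULL;
}
```

With a relaxed load on the producer side there would be nothing for
the release store to pair with, which is the point of the question.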

> +
> +        if (__ring_is_full(ring, in, out)) {
> +            if (atomic_read(&ring->in) == in &&
> +                atomic_read(&ring->out) == out) {

Why read again?  After all, the ring API seems to be designed as
non-blocking.  E.g., the poll at [2] below makes more sense to me:
when we reach [2] there must be a producer that is in the middle of
queuing, so the poll is very likely to complete quickly.  Here,
however, it seems to be a pure busy poll without any such hint.  So
I'm not sure whether we should just return and let the caller decide
whether it wants to call ring_put() again.
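
Roughly what I mean, as a sketch in plain C11 atomics rather than the
QEMU helpers: the put path reports -ENOBUFS as soon as it sees the
ring full, and any retry policy lives in the caller.  SketchRing,
sketch_try_put() and sketch_put_retry() are hypothetical names, and
the bounded retry is only one possible caller policy.

```c
#include <errno.h>
#include <stdatomic.h>
#include <stddef.h>

#define SKETCH_RING_SIZE 4u   /* hypothetical size, power of two */

/* Hypothetical stand-in for the patch's Ring structure. */
typedef struct {
    atomic_uint in;
    atomic_uint out;
    _Atomic(void *) data[SKETCH_RING_SIZE];
} SketchRing;

/* Non-blocking put: fail at once when the ring looks full. */
static int sketch_try_put(SketchRing *r, void *item)
{
    unsigned int in = atomic_load(&r->in);

    for (;;) {
        unsigned int out = atomic_load_explicit(&r->out,
                                                memory_order_acquire);
        unsigned int next = in + 1;

        if (in - out >= SKETCH_RING_SIZE) {
            return -ENOBUFS;        /* no re-read, the caller decides */
        }
        /* On failure "in" is refreshed with the current value. */
        if (atomic_compare_exchange_weak(&r->in, &in, next)) {
            break;
        }
    }

    /* Slot "in" is now ours; publish the entry. */
    atomic_store_explicit(&r->data[in % SKETCH_RING_SIZE], item,
                          memory_order_release);
    return 0;
}

/* One possible caller policy: retry a bounded number of times. */
static int sketch_put_retry(SketchRing *r, void *item,
                            unsigned int max_tries)
{
    int ret = -ENOBUFS;

    while (max_tries-- > 0) {
        ret = sketch_try_put(r, item);
        if (ret != -ENOBUFS) {
            break;
        }
    }
    return ret;
}
```

A caller that cannot make progress otherwise could still spin on
sketch_put_retry(), while others could drop the item or go to sleep.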

> +                return -ENOBUFS;
> +            }
> +
> +            /* a entry has been fetched out, retry. */
> +            continue;
> +        }
> +
> +        in_next = in + 1;
> +    } while (atomic_cmpxchg(&ring->in, in, in_next) != in);
> +
> +    index = ring_index(ring, in);
> +
> +    /*
> +     * smp_rmb() paired with the memory barrier of (A) in ring_mp_get()
> +     * is implied in atomic_cmpxchg() as we should read ring->out first
> +     * before fetching the entry, otherwise this assert will fail.

Thanks for all these comments!  These are really helpful for
reviewers.

However, I'm not sure I understand correctly what the MB of (A) in
ring_mp_get() pairs with.  AFAIU it should correspond to an smp_rmb()
at [0] above, when reading the "out" variable, rather than to this
assertion, and that's why I thought we should have something like a
load_acquire() at [0] (which contains an rmb()).

Content-wise, I think the code here is correct, since atomic_cmpxchg()
should imply a full smp_mb() anyway, so we don't need any further
barriers here.
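
For reference, the semantics I'm assuming for atomic_cmpxchg() can be
sketched with C11 atomics as below: it returns the value it observed
and is sequentially consistent on both success and failure.
demo_cmpxchg() and demo_claim_slot() are illustrative names only, not
QEMU functions.

```c
#include <stdatomic.h>

/*
 * Returns the value found in *ptr.  The swap happened only if that
 * value equals "expected".  seq_cst ordering acts as a full barrier.
 */
static unsigned int demo_cmpxchg(atomic_uint *ptr, unsigned int expected,
                                 unsigned int desired)
{
    atomic_compare_exchange_strong_explicit(ptr, &expected, desired,
                                            memory_order_seq_cst,
                                            memory_order_seq_cst);
    return expected;    /* unchanged on success, observed value on failure */
}

/* Claim the next producer index, in the shape of the loop in ring_mp_put(). */
static unsigned int demo_claim_slot(atomic_uint *in)
{
    unsigned int cur;

    do {
        cur = atomic_load_explicit(in, memory_order_relaxed);
    } while (demo_cmpxchg(in, cur, cur + 1) != cur);

    return cur;
}
```

Under that assumption, accesses after a successful claim cannot be
reordered before it, which is why no extra barrier looks necessary at
this point.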

> +     */
> +    assert(!atomic_read(&ring->data[index]));
> +
> +    /*
> +     * smp_mb() paired with the memory barrier of (B) in ring_mp_get() is
> +     * implied in atomic_cmpxchg(), that is needed here as  we should read
> +     * ring->out before updating the entry, it is the same as we did in
> +     * __ring_put().
> +     *
> +     * smp_wmb() paired with the memory barrier of (C) in ring_mp_get()
> +     * is implied in atomic_cmpxchg(), that is needed as we should increase
> +     * ring->in before updating the entry.
> +     */
> +    atomic_set(&ring->data[index], data);
> +
> +    return 0;
> +}
> +
> +static inline void *ring_mp_get(Ring *ring)
> +{
> +    unsigned int index, in;
> +    void *data;
> +
> +    do {
> +        in = atomic_read(&ring->in);
> +
> +        /*
> +         * (C) should read ring->in first to make sure the entry pointed by this
> +         * index is available
> +         */
> +        smp_rmb();
> +
> +        if (!__ring_is_empty(in, ring->out)) {
> +            break;
> +        }
> +
> +        if (atomic_read(&ring->in) == in) {
> +            return NULL;
> +        }
> +        /* new entry has been added in, retry. */
> +    } while (1);
> +
> +    index = ring_index(ring, ring->out);
> +
> +    do {
> +        data = atomic_read(&ring->data[index]);
> +        if (data) {
> +            break;
> +        }
> +        /* the producer is updating the entry, retry */
> +        cpu_relax();

[2]

> +    } while (1);
> +
> +    atomic_set(&ring->data[index], NULL);
> +
> +    /*
> +     * (B) smp_mb() is needed as we should read the entry out before
> +     * updating ring->out as we did in __ring_get().
> +     *
> +     * (A) smp_wmb() is needed as we should make the entry be NULL before
> +     * updating ring->out (which will make the entry be visible and usable).
> +     */
> +    atomic_store_release(&ring->out, ring->out + 1);

[1]

> +
> +    return data;
> +}
> +
> +static inline int ring_put(Ring *ring, void *data)
> +{
> +    if (ring->flags & RING_MULTI_PRODUCER) {
> +        return ring_mp_put(ring, data);
> +    }
> +    return __ring_put(ring, data);
> +}
> +
> +static inline void *ring_get(Ring *ring)
> +{
> +    if (ring->flags & RING_MULTI_PRODUCER) {
> +        return ring_mp_get(ring);
> +    }
> +    return __ring_get(ring);
> +}
> +#endif
> -- 
> 2.14.4
> 

Thanks,

-- 
Peter Xu


