[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 04/15] rcu: add call_rcu
From: |
Fam Zheng |
Subject: |
Re: [Qemu-devel] [PATCH 04/15] rcu: add call_rcu |
Date: |
Mon, 26 Jan 2015 14:04:07 +0800 |
User-agent: |
Mutt/1.5.23 (2014-03-12) |
On Thu, 01/22 15:47, Paolo Bonzini wrote:
> Asynchronous callbacks provided by call_rcu are particularly important
> for QEMU, because the BQL makes it hard to use synchronize_rcu.
>
> In addition, the current RCU implementation is not particularly friendly
> to multiple concurrent synchronize_rcu callers, making call_rcu even
> more important.
>
> Signed-off-by: Paolo Bonzini <address@hidden>
> ---
> docs/rcu.txt | 110 +++++++++++++++++++++++++++++++++++++++++++++++--
> include/qemu/rcu.h | 22 ++++++++++
> util/rcu.c | 119
> +++++++++++++++++++++++++++++++++++++++++++++++++++++
> 3 files changed, 247 insertions(+), 4 deletions(-)
>
> diff --git a/docs/rcu.txt b/docs/rcu.txt
> index 9938ad3..61752b9 100644
> --- a/docs/rcu.txt
> +++ b/docs/rcu.txt
> @@ -82,7 +82,50 @@ The core RCU API is small:
> Note that it would be valid for another update to come while
> synchronize_rcu is running. Because of this, it is better that
> the updater releases any locks it may hold before calling
> - synchronize_rcu.
> + synchronize_rcu. If this is not possible (for example, because
> + the updater is protected by the BQL), you can use call_rcu.
> +
> + void call_rcu1(struct rcu_head * head,
> + void (*func)(struct rcu_head *head));
> +
> + This function invokes func(head) after all pre-existing RCU
> + read-side critical sections on all threads have completed. This
> + marks the end of the removal phase, with func taking care
> + asynchronously of the reclamation phase.
> +
> + The foo struct needs to have an rcu_head structure added,
> + perhaps as follows:
> +
> + struct foo {
> + struct rcu_head rcu;
> + int a;
> + char b;
> + long c;
> + };
> +
> + so that the reclaimer function can fetch the struct foo address
> + and free it:
> +
> + call_rcu1(&foo.rcu, foo_reclaim);
> +
> + void foo_reclaim(struct rcu_head *rp)
> + {
> + struct foo *fp = container_of(rp, struct foo, rcu);
> + g_free(fp);
> + }
> +
> + For the common case where the rcu_head member is the first of the
> + struct, you can use the following macro.
> +
> + void call_rcu(T *p,
> + void (*func)(T *p),
> + field-name);
> +
> + call_rcu1 is typically used through this macro, in the common case
> + where the "struct rcu_head" is the first field in the struct. In
> + the above case, one could have written simply:
> +
> + call_rcu(foo_reclaim, g_free, rcu);
>
> typeof(*p) atomic_rcu_read(p);
>
> @@ -153,6 +196,11 @@ DIFFERENCES WITH LINUX
> - atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
> rcu_assign_pointer. They take a _pointer_ to the variable being accessed.
>
> +- call_rcu is a macro that has an extra argument (the name of the first
> + field in the struct, which must be a struct rcu_head), and expects the
> + type of the callback's argument to be the type of the first argument.
> + call_rcu1 is the same as Linux's call_rcu.
> +
>
> RCU PATTERNS
> ============
> @@ -206,7 +254,47 @@ The write side looks simply like this (with appropriate
> locking):
> synchronize_rcu();
> free(old);
>
> -Note that the same idiom would be possible with reader/writer
> +If the processing cannot be done purely within the critical section, it
> +is possible to combine this idiom with a "real" reference count:
> +
> + rcu_read_lock();
> + p = atomic_rcu_read(&foo);
> + foo_ref(p);
> + rcu_read_unlock();
> + /* do something with p. */
> + foo_unref(p);
> +
> +The write side can be like this:
> +
> + qemu_mutex_lock(&foo_mutex);
> + old = foo;
> + atomic_rcu_set(&foo, new);
> + qemu_mutex_unlock(&foo_mutex);
> + synchronize_rcu();
> + foo_unref(old);
> +
> +or with call_rcu:
> +
> + qemu_mutex_lock(&foo_mutex);
> + old = foo;
> + atomic_rcu_set(&foo, new);
> + qemu_mutex_unlock(&foo_mutex);
> + call_rcu(foo_unref, old, rcu);
> +
> +In both cases, the write side only performs removal. Reclamation
> +happens when the last reference to a "foo" object is dropped.
> +Using synchronize_rcu() is undesirably expensive, because the
> +last reference may be dropped on the read side. Hence you can
> +use call_rcu() instead:
> +
> + foo_unref(struct foo *p) {
> + if (atomic_fetch_dec(&p->refcount) == 1) {
> + call_rcu(foo_destroy, p, rcu);
> + }
> + }
> +
> +
> +Note that the same idioms would be possible with reader/writer
> locks:
>
> read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
> @@ -216,13 +304,27 @@ locks:
> write_mutex_unlock(&foo_rwlock);
> free(p);
>
> + ------------------------------------------------------------------
> +
> + read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
> + p = foo; old = foo;
> + foo_ref(p); foo = new;
> + read_unlock(&foo_rwlock); foo_unref(old);
> + /* do something with p. */ write_mutex_unlock(&foo_rwlock);
> + read_lock(&foo_rwlock);
> + foo_unref(p);
> + read_unlock(&foo_rwlock);
> +
> +foo_unref could use a mechanism such as bottom halves to move deallocation
> +out of the write-side critical section.
> +
>
> RCU resizable arrays
> --------------------
>
> Resizable arrays can be used with RCU. The expensive RCU synchronization
> -only needs to take place when the array is resized. The two items to
> -take care of are:
> +(or call_rcu) only needs to take place when the array is resized.
> +The two items to take care of are:
>
> - ensuring that the old version of the array is available between removal
> and reclamation;
> diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> index da043f2..068a279 100644
> --- a/include/qemu/rcu.h
> +++ b/include/qemu/rcu.h
> @@ -118,6 +118,28 @@ extern void synchronize_rcu(void);
> extern void rcu_register_thread(void);
> extern void rcu_unregister_thread(void);
>
> +struct rcu_head;
> +typedef void RCUCBFunc(struct rcu_head *head);
> +
> +struct rcu_head {
> + struct rcu_head *next;
> + RCUCBFunc *func;
> +};
> +
> +extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
> +
> +/* The operands of the minus operator must have the same type,
> + * which must be the one that we specify in the cast.
> + */
> +#define call_rcu(head, func, field) \
> + call_rcu1(({ \
> + char __attribute__((unused)) \
> + offset_must_be_zero[-offsetof(typeof(*(head)), field)], \
> + func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
> + &(head)->field; \
> + }), \
> + (RCUCBFunc *)(func))
> +
> #ifdef __cplusplus
> }
> #endif
> diff --git a/util/rcu.c b/util/rcu.c
> index 1f737d5..c9c3e6e 100644
> --- a/util/rcu.c
> +++ b/util/rcu.c
> @@ -26,6 +26,7 @@
> * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> */
>
> +#include "qemu-common.h"
> #include <stdio.h>
> #include <assert.h>
> #include <stdlib.h>
> @@ -33,6 +34,7 @@
> #include <errno.h>
> #include "qemu/rcu.h"
> #include "qemu/atomic.h"
> +#include "qemu/thread.h"
>
> /*
> * Global grace period counter. Bit 0 is always one in rcu_gp_ctr.
> @@ -149,6 +151,116 @@ void synchronize_rcu(void)
> qemu_mutex_unlock(&rcu_gp_lock);
> }
>
> +
> +#define RCU_CALL_MIN_SIZE 30
> +
> +/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
> + * from liburcu. Note that head is only used by the consumer.
> + */
> +static struct rcu_head dummy;
> +static struct rcu_head *head = &dummy, **tail = &dummy.next;
> +static int rcu_call_count;
> +static QemuEvent rcu_call_ready_event;
> +
> +static void enqueue(struct rcu_head *node)
> +{
> + struct rcu_head **old_tail;
> +
> + node->next = NULL;
> + old_tail = atomic_xchg(&tail, &node->next);
> + atomic_mb_set(old_tail, node);
> +}
> +
> +static struct rcu_head *try_dequeue(void)
> +{
> + struct rcu_head *node, *next;
> +
> +retry:
> + /* Test for an empty list, which we do not expect. Note that for
> + * the consumer head and tail are always consistent. The head
> + * is consistent because only the consumer reads/writes it.
> + * The tail, because it is the first step in the enqueuing.
> + * It is only the next pointers that might be inconsistent.
> + */
> + if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
> + abort();
> + }
> +
> + /* If the head node has NULL in its next pointer, the value is
> + * wrong and we need to wait until its enqueuer finishes the update.
> + */
> + node = head;
> + next = atomic_mb_read(&head->next);
> + if (!next) {
> + return NULL;
> + }
> +
> + /* Since we are the sole consumer, and we excluded the empty case
> + * above, the queue will always have at least two nodes: the
> + * dummy node, and the one being removed. So we do not need to update
> + * the tail pointer.
> + */
> + head = next;
> +
> + /* If we dequeued the dummy node, add it back at the end and retry. */
> + if (node == &dummy) {
> + enqueue(node);
> + goto retry;
> + }
> +
> + return node;
> +}
> +
> +static void *call_rcu_thread(void *opaque)
> +{
> + struct rcu_head *node;
> +
> + for (;;) {
> + int tries = 0;
> + int n = atomic_read(&rcu_call_count);
> +
> + /* Heuristically wait for a decent number of callbacks to pile up.
> + * Fetch rcu_call_count now, we only must process elements that were
> + * added before synchronize_rcu() starts.
> + */
> + while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
> + g_usleep(100000);
> + qemu_event_reset(&rcu_call_ready_event);
> + n = atomic_read(&rcu_call_count);
> + if (n < RCU_CALL_MIN_SIZE) {
> + qemu_event_wait(&rcu_call_ready_event);
> + n = atomic_read(&rcu_call_count);
> + }
> + }
> +
> + atomic_sub(&rcu_call_count, n);
> + synchronize_rcu();
> + while (n > 0) {
> + node = try_dequeue();
> + while (!node) {
> + qemu_event_reset(&rcu_call_ready_event);
> + node = try_dequeue();
> + if (!node) {
> + qemu_event_wait(&rcu_call_ready_event);
> + node = try_dequeue();
> + }
> + }
> +
> + n--;
> + node->func(node);
> + }
> + }
> + abort();
> +}
> +
> +void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
> +{
> + node->func = func;
> + enqueue(node);
> + atomic_inc(&rcu_call_count);
> + qemu_event_set(&rcu_call_ready_event);
> +}
> +
> void rcu_register_thread(void)
> {
> assert(rcu_reader.ctr == 0);
> @@ -166,7 +278,14 @@ void rcu_unregister_thread(void)
>
> static void __attribute__((__constructor__)) rcu_init(void)
> {
> + QemuThread thread;
> +
> qemu_mutex_init(&rcu_gp_lock);
> qemu_event_init(&rcu_gp_event, true);
> +
> + qemu_event_init(&rcu_call_ready_event, false);
> + qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
> + NULL, QEMU_THREAD_DETACHED);
> +
> rcu_register_thread();
> }
> --
> 1.8.3.1
>
>
>
Reviewed-by: Fam Zheng <address@hidden>
- Re: [Qemu-devel] [PATCH 01/15] rcu: add rcu library, (continued)
- [Qemu-devel] [PATCH 03/15] rcu: allow nesting of rcu_read_lock/rcu_read_unlock, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 05/15] memory: remove assertion on memory_region_destroy, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 06/15] memory: protect current_map by RCU, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 08/15] exec: introduce cpu_reload_memory_map, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 04/15] rcu: add call_rcu, Paolo Bonzini, 2015/01/22
- Re: [Qemu-devel] [PATCH 04/15] rcu: add call_rcu,
Fam Zheng <=
- [Qemu-devel] [PATCH 07/15] memory: avoid ref/unref in memory_region_find, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 12/15] exec: protect mru_block with RCU, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 11/15] rcu: introduce RCU-enabled QLIST, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 14/15] exec: convert ram_list to QLIST, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 13/15] cosmetic changes preparing for the following patches, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 09/15] exec: make iotlb RCU-friendly, Paolo Bonzini, 2015/01/22
- [Qemu-devel] [PATCH 15/15] Convert ram_list to RCU, Paolo Bonzini, 2015/01/22