[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [PATCH 4/6] coroutine-lock: reimplement CoRwLock to fix downgrade bu
From: |
David Edmondson |
Subject: |
Re: [PATCH 4/6] coroutine-lock: reimplement CoRwLock to fix downgrade bug |
Date: |
Wed, 17 Mar 2021 15:17:33 +0000 |
On Wednesday, 2021-03-17 at 13:16:39 +01, Paolo Bonzini wrote:
> An invariant of the current rwlock is that if multiple coroutines hold a
> reader lock, all must be runnable. The unlock implementation relies on
> this, choosing to wake a single coroutine when the final read lock
> holder exits the critical section, assuming that it will wake a
> coroutine attempting to acquire a write lock.
>
> The downgrade implementation violates this assumption by creating a
> read lock owning coroutine that is exclusively runnable - any other
> coroutines that are waiting to acquire a read lock are *not* made
> runnable when the write lock holder converts its ownership to read
> only.
>
> More in general, the old implementation had lots of other fairness bugs.
> The root cause of the bugs was that CoQueue would wake up readers even
> if there were pending writers, and would wake up writers even if there
> were readers. In that case, the coroutine would go back to sleep *at
> the end* of the CoQueue, losing its place at the head of the line.
>
> To fix this, keep the queue of waiters explicitly in the CoRwLock
> instead of using CoQueue, and store for each whether it is a
> potential reader or a writer. This way, downgrade can look at the
> first queued coroutines and wake it only if it is a reader, causing
> all other readers in line to be released in turn.
>
> Reported-by: David Edmondson <david.edmondson@oracle.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
A couple of nits below, but it looks good overall.
Reviewed-by: David Edmondson <david.edmondson@oracle.com>
> ---
> v3->v4: clean up the code and fix upgrade logic. Fix upgrade comment too.
>
> include/qemu/coroutine.h | 17 +++--
> util/qemu-coroutine-lock.c | 148 ++++++++++++++++++++++++-------------
> 2 files changed, 106 insertions(+), 59 deletions(-)
>
> diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
> index 84eab6e3bf..7919d3bb62 100644
> --- a/include/qemu/coroutine.h
> +++ b/include/qemu/coroutine.h
> @@ -237,11 +237,15 @@ bool qemu_co_enter_next_impl(CoQueue *queue,
> QemuLockable *lock);
> bool qemu_co_queue_empty(CoQueue *queue);
>
>
> +typedef struct CoRwTicket CoRwTicket;
> typedef struct CoRwlock {
> - int pending_writer;
> - int reader;
> CoMutex mutex;
> - CoQueue queue;
> +
> + /* Number of readers, of -1 if owned for writing. */
> + int owners;
s/, of/, or/
> +
> + /* Waiting coroutines. */
> + QSIMPLEQ_HEAD(, CoRwTicket) tickets;
> } CoRwlock;
>
> /**
> @@ -260,10 +264,9 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock);
> /**
> * Write Locks the CoRwlock from a reader. This is a bit more efficient than
> * @qemu_co_rwlock_unlock followed by a separate @qemu_co_rwlock_wrlock.
> - * However, if the lock cannot be upgraded immediately, control is
> transferred
> - * to the caller of the current coroutine. Also, @qemu_co_rwlock_upgrade
> - * only overrides CoRwlock fairness if there are no concurrent readers, so
> - * another writer might run while @qemu_co_rwlock_upgrade blocks.
> + * Note that if the lock cannot be upgraded immediately, control is
> transferred
> + * to the caller of the current coroutine; another writer might run while
> + * @qemu_co_rwlock_upgrade blocks.
This is better, thanks.
> */
> void qemu_co_rwlock_upgrade(CoRwlock *lock);
>
> diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
> index eb73cf11dc..2669403839 100644
> --- a/util/qemu-coroutine-lock.c
> +++ b/util/qemu-coroutine-lock.c
> @@ -327,11 +327,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
> trace_qemu_co_mutex_unlock_return(mutex, self);
> }
>
> +struct CoRwTicket {
> + bool read;
> + Coroutine *co;
> + QSIMPLEQ_ENTRY(CoRwTicket) next;
> +};
> +
> void qemu_co_rwlock_init(CoRwlock *lock)
> {
> - memset(lock, 0, sizeof(*lock));
> - qemu_co_queue_init(&lock->queue);
> qemu_co_mutex_init(&lock->mutex);
> + lock->owners = 0;
> + QSIMPLEQ_INIT(&lock->tickets);
> +}
> +
> +/* Releases the internal CoMutex. */
> +static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
> +{
> + CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
> + Coroutine *co = NULL;
> +
> + /*
> + * Setting lock->owners here prevents rdlock and wrlock from
> + * sneaking in between unlock and wake.
> + */
> +
> + if (tkt) {
> + if (tkt->read) {
> + if (lock->owners >= 0) {
> + lock->owners++;
> + co = tkt->co;
> + }
> + } else {
> + if (lock->owners == 0) {
> + lock->owners = -1;
> + co = tkt->co;
> + }
> + }
> + }
> +
> + if (co) {
> + QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
> + qemu_co_mutex_unlock(&lock->mutex);
> + aio_co_wake(co);
> + } else {
> + qemu_co_mutex_unlock(&lock->mutex);
> + }
This block could be pushed up into the earlier block, but I imagine that
the compiler will do it for you.
> }
>
> void qemu_co_rwlock_rdlock(CoRwlock *lock)
> @@ -340,13 +380,22 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
>
> qemu_co_mutex_lock(&lock->mutex);
> /* For fairness, wait if a writer is in line. */
> - while (lock->pending_writer) {
> - qemu_co_queue_wait(&lock->queue, &lock->mutex);
> + if (lock->owners == 0 || (lock->owners > 0 &&
> QSIMPLEQ_EMPTY(&lock->tickets))) {
> + lock->owners++;
> + qemu_co_mutex_unlock(&lock->mutex);
> + } else {
> + CoRwTicket my_ticket = { true, self };
> +
> + QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
> + qemu_co_mutex_unlock(&lock->mutex);
> + qemu_coroutine_yield();
> + assert(lock->owners >= 1);
> +
> + /* Possibly wake another reader, which will wake the next in line.
> */
> + qemu_co_mutex_lock(&lock->mutex);
> + qemu_co_rwlock_maybe_wake_one(lock);
> }
> - lock->reader++;
> - qemu_co_mutex_unlock(&lock->mutex);
>
> - /* The rest of the read-side critical section is run without the mutex.
> */
> self->locks_held++;
> }
>
> @@ -355,69 +404,64 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
> Coroutine *self = qemu_coroutine_self();
>
> assert(qemu_in_coroutine());
> - if (!lock->reader) {
> - /* The critical section started in qemu_co_rwlock_wrlock. */
> - qemu_co_queue_restart_all(&lock->queue);
> - } else {
> - self->locks_held--;
> + self->locks_held--;
>
> - qemu_co_mutex_lock(&lock->mutex);
> - lock->reader--;
> - assert(lock->reader >= 0);
> - /* Wakeup only one waiting writer */
> - if (!lock->reader) {
> - qemu_co_queue_next(&lock->queue);
> - }
> + qemu_co_mutex_lock(&lock->mutex);
> + if (lock->owners > 0) {
> + lock->owners--;
> + } else {
> + assert(lock->owners == -1);
> + lock->owners = 0;
> }
> - qemu_co_mutex_unlock(&lock->mutex);
> +
> + qemu_co_rwlock_maybe_wake_one(lock);
> }
>
> void qemu_co_rwlock_downgrade(CoRwlock *lock)
> {
> - Coroutine *self = qemu_coroutine_self();
> -
> - /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
> - * qemu_co_rwlock_upgrade.
> - */
> - assert(lock->reader == 0);
> - lock->reader++;
> - qemu_co_mutex_unlock(&lock->mutex);
> + qemu_co_mutex_lock(&lock->mutex);
> + assert(lock->owners == -1);
> + lock->owners = 1;
>
> - /* The rest of the read-side critical section is run without the mutex.
> */
> - self->locks_held++;
> + /* Possibly wake another reader, which will wake the next in line. */
> + qemu_co_rwlock_maybe_wake_one(lock);
> }
>
> void qemu_co_rwlock_wrlock(CoRwlock *lock)
> {
> + Coroutine *self = qemu_coroutine_self();
> +
> qemu_co_mutex_lock(&lock->mutex);
> - lock->pending_writer++;
> - while (lock->reader) {
> - qemu_co_queue_wait(&lock->queue, &lock->mutex);
> + if (lock->owners == 0) {
> + lock->owners = -1;
> + qemu_co_mutex_unlock(&lock->mutex);
> + } else {
> + CoRwTicket my_ticket = { false, qemu_coroutine_self() };
> +
> + QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
> + qemu_co_mutex_unlock(&lock->mutex);
> + qemu_coroutine_yield();
> + assert(lock->owners == -1);
> }
> - lock->pending_writer--;
>
> - /* The rest of the write-side critical section is run with
> - * the mutex taken, so that lock->reader remains zero.
> - * There is no need to update self->locks_held.
> - */
> + self->locks_held++;
> }
>
> void qemu_co_rwlock_upgrade(CoRwlock *lock)
> {
> - Coroutine *self = qemu_coroutine_self();
> -
> qemu_co_mutex_lock(&lock->mutex);
> - assert(lock->reader > 0);
> - lock->reader--;
> - lock->pending_writer++;
> - while (lock->reader) {
> - qemu_co_queue_wait(&lock->queue, &lock->mutex);
> - }
> - lock->pending_writer--;
> + assert(lock->owners > 0);
> + /* For fairness, wait if a writer is in line. */
> + if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
> + lock->owners = -1;
> + qemu_co_mutex_unlock(&lock->mutex);
> + } else {
> + CoRwTicket my_ticket = { false, qemu_coroutine_self() };
>
> - /* The rest of the write-side critical section is run with
> - * the mutex taken, similar to qemu_co_rwlock_wrlock. Do
> - * not account for the lock twice in self->locks_held.
> - */
> - self->locks_held--;
> + lock->owners--;
> + QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
> + qemu_co_rwlock_maybe_wake_one(lock);
> + qemu_coroutine_yield();
> + assert(lock->owners == -1);
> + }
> }
> --
> 2.29.2
dme.
--
Today is different. Today is not the same.
- [PATCH v4 0/6] coroutine rwlock downgrade fix, minor VDI changes, Paolo Bonzini, 2021/03/17
- [PATCH 3/6] coroutine/mutex: Store the coroutine in the CoWaitRecord only once, Paolo Bonzini, 2021/03/17
- [PATCH 1/6] block/vdi: When writing new bmap entry fails, don't leak the buffer, Paolo Bonzini, 2021/03/17
- [PATCH 5/6] test-coroutine: add rwlock upgrade test, Paolo Bonzini, 2021/03/17
- [PATCH 4/6] coroutine-lock: reimplement CoRwLock to fix downgrade bug, Paolo Bonzini, 2021/03/17
- Re: [PATCH 4/6] coroutine-lock: reimplement CoRwLock to fix downgrade bug,
David Edmondson <=
- [PATCH 2/6] block/vdi: Don't assume that blocks are larger than VdiHeader, Paolo Bonzini, 2021/03/17
- [PATCH 6/6] test-coroutine: Add rwlock downgrade test, Paolo Bonzini, 2021/03/17