[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH 6/6] aiopool: protect with a mutex
From: |
Emanuele Giuseppe Esposito |
Subject: |
[PATCH 6/6] aiopool: protect with a mutex |
Date: |
Mon, 10 May 2021 10:59:41 +0200 |
Divide the fields in AioTaskPool into IN and Status, and
introduce a CoQueue instead of .main_co and .waiting to take care of
suspending and resuming the calling coroutine, and a lock to protect the
busy_tasks counter accesses and the AioTask .ret field.
Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
block/aio_task.c | 63 ++++++++++++++++++++++++----------------
include/block/aio_task.h | 2 +-
2 files changed, 39 insertions(+), 26 deletions(-)
diff --git a/block/aio_task.c b/block/aio_task.c
index 88989fa248..7ac6b5dd72 100644
--- a/block/aio_task.c
+++ b/block/aio_task.c
@@ -27,62 +27,70 @@
#include "block/aio_task.h"
struct AioTaskPool {
- Coroutine *main_co;
- int status;
+ /* IN: just set in aio_task_pool_new and never modified */
int max_busy_tasks;
+
+ /* Status: either atomic or protected by the lock */
+ int status;
int busy_tasks;
- bool waiting;
+ CoQueue queue;
+ CoMutex lock;
};
static void coroutine_fn aio_task_co(void *opaque)
{
+ int ret;
AioTask *task = opaque;
AioTaskPool *pool = task->pool;
- assert(pool->busy_tasks < pool->max_busy_tasks);
- pool->busy_tasks++;
+ WITH_QEMU_LOCK_GUARD(&pool->lock) {
+ assert(pool->busy_tasks < pool->max_busy_tasks);
+ pool->busy_tasks++;
- task->ret = task->func(task);
+ ret = task->func(task);
+ task->ret = ret;
- pool->busy_tasks--;
+ pool->busy_tasks--;
+ }
- if (task->ret < 0 && pool->status == 0) {
- pool->status = task->ret;
+ if (ret < 0) {
+ qatomic_cmpxchg(&pool->status, 0, ret);
}
g_free(task);
- if (pool->waiting) {
- pool->waiting = false;
- aio_co_wake(pool->main_co);
- }
+ qemu_co_queue_next(&pool->queue);
}
-void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool)
+/* Called with lock held */
+static void coroutine_fn aio_task_pool_wait_one_unlocked(AioTaskPool *pool)
{
assert(pool->busy_tasks > 0);
- assert(qemu_coroutine_self() == pool->main_co);
-
- pool->waiting = true;
- qemu_coroutine_yield();
-
- assert(!pool->waiting);
+ qemu_co_queue_wait(&pool->queue, &pool->lock);
assert(pool->busy_tasks < pool->max_busy_tasks);
}
+void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool)
+{
+ QEMU_LOCK_GUARD(&pool->lock);
+ aio_task_pool_wait_one_unlocked(pool);
+}
+
void coroutine_fn aio_task_pool_wait_slot(AioTaskPool *pool)
{
+ QEMU_LOCK_GUARD(&pool->lock);
if (pool->busy_tasks < pool->max_busy_tasks) {
return;
}
- aio_task_pool_wait_one(pool);
+ aio_task_pool_wait_one_unlocked(pool);
}
void coroutine_fn aio_task_pool_wait_all(AioTaskPool *pool)
{
+ QEMU_LOCK_GUARD(&pool->lock);
while (pool->busy_tasks > 0) {
- aio_task_pool_wait_one(pool);
+ aio_task_pool_wait_one_unlocked(pool);
}
}
@@ -98,8 +106,8 @@ AioTaskPool *coroutine_fn aio_task_pool_new(int
max_busy_tasks)
{
AioTaskPool *pool = g_new0(AioTaskPool, 1);
- pool->main_co = qemu_coroutine_self();
pool->max_busy_tasks = max_busy_tasks;
+ qemu_co_queue_init(&pool->queue);
return pool;
}
@@ -115,10 +123,15 @@ int aio_task_pool_status(AioTaskPool *pool)
return 0; /* Sugar for lazy allocation of aio pool */
}
- return pool->status;
+ return qatomic_read(&pool->status);
}
bool aio_task_pool_empty(AioTaskPool *pool)
{
- return pool->busy_tasks == 0;
+ int tasks;
+
+ qemu_co_mutex_lock(&pool->lock);
+ tasks = pool->busy_tasks;
+ qemu_co_mutex_unlock(&pool->lock);
+ return tasks == 0;
}
diff --git a/include/block/aio_task.h b/include/block/aio_task.h
index 50bc1e1817..b22a4310aa 100644
--- a/include/block/aio_task.h
+++ b/include/block/aio_task.h
@@ -33,7 +33,7 @@ typedef int coroutine_fn (*AioTaskFunc)(AioTask *task);
struct AioTask {
AioTaskPool *pool;
AioTaskFunc func;
- int ret;
+ int ret; /* written under the AioTaskPool lock */
};
AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks);
--
2.30.2
- [PATCH 3/6] blockjob: let ratelimit handle a speed of 0, (continued)
Re: [PATCH 4/6] progressmeter: protect with a mutex, Stefan Hajnoczi, 2021/05/12
[PATCH 6/6] aiopool: protect with a mutex,
Emanuele Giuseppe Esposito <=
[PATCH 2/6] block-copy: let ratelimit handle a speed of 0, Emanuele Giuseppe Esposito, 2021/05/10
[PATCH 5/6] co-shared-resource: protect with a mutex, Emanuele Giuseppe Esposito, 2021/05/10