[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [Qemu-block] [PATCH v5 3/9] block: Add VFIO based NVMe driver
From: Paolo Bonzini
Subject: Re: [Qemu-devel] [Qemu-block] [PATCH v5 3/9] block: Add VFIO based NVMe driver
Date: Fri, 12 Jan 2018 12:00:10 +0100
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Thunderbird/52.5.0
On 12/01/2018 10:44, Stefan Hajnoczi wrote:
>> + if (progress) {
>> + /* Notify the device so it can post more completions. */
>> + smp_mb_release();
>> + *q->cq.doorbell = cpu_to_le32(q->cq.head);
>> + if (!qemu_co_queue_empty(&q->free_req_queue)) {
>> + aio_bh_schedule_oneshot(s->aio_context, nvme_free_req_queue_cb,
>> q);
>> + }
> This is not thread-safe because the queue producer does:
>
> 1 qemu_mutex_unlock(&q->lock);
> 2 qemu_co_queue_wait(&q->free_req_queue, NULL);
> 3 qemu_mutex_lock(&q->lock);
>
> We fail to call nvme_free_req_queue_cb() when the check
> if (!qemu_co_queue_empty(&q->free_req_queue)) runs after step 1 but before step 2.
Yes, it can happen. The right solution would be to do what block/curl.c
does (it has more or less the same scenario):
next = QSIMPLEQ_FIRST(&s->s->free_state_waitq);
if (next) {
QSIMPLEQ_REMOVE_HEAD(&s->s->free_state_waitq, next);
qemu_mutex_unlock(&s->s->mutex);
aio_co_wake(next->co);
qemu_mutex_lock(&s->s->mutex);
}
(where the "if" would be here, and the QSIMPLEQ_REMOVE_HEAD in
nvme_free_req_queue_cb; by the way, nvme_free_req_queue_cb also needs to
take q->lock).
I think we should have a variant of CoQueue that uses a QemuMutex. Of
course in C you have to choose between code duplication, lack of
type-safety, or ugly code, while in C++ it would be an easy application
of templates. But maybe something like this (with QemuTypedMutex moved
to another header file) is acceptable:
diff --git a/block/curl.c b/block/curl.c
index 35cf417..cd578d3 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -101,8 +101,6 @@ typedef struct CURLAIOCB {
size_t start;
size_t end;
-
- QSIMPLEQ_ENTRY(CURLAIOCB) next;
} CURLAIOCB;
typedef struct CURLSocket {
@@ -138,7 +136,7 @@ typedef struct BDRVCURLState {
bool accept_range;
AioContext *aio_context;
QemuMutex mutex;
- QSIMPLEQ_HEAD(, CURLAIOCB) free_state_waitq;
+ CoQueue free_state_waitq;
char *username;
char *password;
char *proxyusername;
@@ -538,7 +536,6 @@ static int curl_init_state(BDRVCURLState *s, CURLState
*state)
/* Called with s->mutex held. */
static void curl_clean_state(CURLState *s)
{
- CURLAIOCB *next;
int j;
for (j = 0; j < CURL_NUM_ACB; j++) {
assert(!s->acb[j]);
@@ -556,13 +553,7 @@ static void curl_clean_state(CURLState *s)
s->in_use = 0;
- next = QSIMPLEQ_FIRST(&s->s->free_state_waitq);
- if (next) {
- QSIMPLEQ_REMOVE_HEAD(&s->s->free_state_waitq, next);
- qemu_mutex_unlock(&s->s->mutex);
- aio_co_wake(next->co);
- qemu_mutex_lock(&s->s->mutex);
- }
+ qemu_co_enter_next(&s->s->free_state_waitq, &s->s->mutex);
}
static void curl_parse_filename(const char *filename, QDict *options,
@@ -784,7 +775,7 @@ static int curl_open(BlockDriverState *bs, QDict *options,
int flags,
}
DPRINTF("CURL: Opening %s\n", file);
- QSIMPLEQ_INIT(&s->free_state_waitq);
+ qemu_co_queue_init(&s->free_state_waitq);
s->aio_context = bdrv_get_aio_context(bs);
s->url = g_strdup(file);
qemu_mutex_lock(&s->mutex);
@@ -888,10 +879,7 @@ static void curl_setup_preadv(BlockDriverState *bs,
CURLAIOCB *acb)
if (state) {
break;
}
- QSIMPLEQ_INSERT_TAIL(&s->free_state_waitq, acb, next);
- qemu_mutex_unlock(&s->mutex);
- qemu_coroutine_yield();
- qemu_mutex_lock(&s->mutex);
+ qemu_co_queue_wait(&s->free_state_waitq, &s->mutex);
}
if (curl_init_state(s, state) < 0) {
diff --git a/fsdev/qemu-fsdev-throttle.c b/fsdev/qemu-fsdev-throttle.c
index 49eebb5..1dc07fb 100644
--- a/fsdev/qemu-fsdev-throttle.c
+++ b/fsdev/qemu-fsdev-throttle.c
@@ -20,13 +20,13 @@
static void fsdev_throttle_read_timer_cb(void *opaque)
{
FsThrottle *fst = opaque;
- qemu_co_enter_next(&fst->throttled_reqs[false]);
+ qemu_co_enter_next(&fst->throttled_reqs[false], NULL);
}
static void fsdev_throttle_write_timer_cb(void *opaque)
{
FsThrottle *fst = opaque;
- qemu_co_enter_next(&fst->throttled_reqs[true]);
+ qemu_co_enter_next(&fst->throttled_reqs[true], NULL);
}
void fsdev_throttle_parse_opts(QemuOpts *opts, FsThrottle *fst, Error **errp)
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index ce2eb73..ec2831a 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -178,15 +178,46 @@ typedef struct CoQueue {
*/
void qemu_co_queue_init(CoQueue *queue);
+typedef struct QemuTypedMutex {
+ void *mutex;
+ void (*lock)(void *);
+ void (*unlock)(void *);
+} QemuTypedMutex;
+
+extern void *qemu_unknown_mutex_type(void *);
+#define QEMU_LOCK_FUNC(mutex) \
+ ((void (*)(void *)) \
+ __builtin_choose_expr( \
+ __builtin_types_compatible_p(typeof(mutex), QemuMutex *), qemu_mutex_lock, \
+ __builtin_choose_expr( \
+ __builtin_types_compatible_p(typeof(mutex), CoMutex *), qemu_co_mutex_lock, \
+ (mutex) ? qemu_unknown_mutex_type : NULL)))
+
+#define QEMU_UNLOCK_FUNC(mutex) \
+ ((void (*)(void *)) \
+ __builtin_choose_expr( \
+ __builtin_types_compatible_p(typeof(mutex), QemuMutex *), qemu_mutex_unlock, \
+ __builtin_choose_expr( \
+ __builtin_types_compatible_p(typeof(mutex), CoMutex *), qemu_co_mutex_unlock, \
+ (mutex) ? qemu_unknown_mutex_type : NULL)))
+
+#define QEMU_TYPED_MUTEX(mutex) \
+ ((QemuTypedMutex) { mutex, QEMU_LOCK_FUNC(mutex), QEMU_UNLOCK_FUNC(mutex) })
+
+
/**
* Adds the current coroutine to the CoQueue and transfers control to the
* caller of the coroutine. The mutex is unlocked during the wait and
* locked again afterwards.
*/
-void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex);
+void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuTypedMutex mutex);
+
+#define qemu_co_queue_wait(queue, mutex) \
+ qemu_co_queue_wait_impl(queue, QEMU_TYPED_MUTEX(mutex))
/**
* Restarts the next coroutine in the CoQueue and removes it from the queue.
+ * The mutex passed to qemu_co_queue_wait must be taken.
*
* Returns true if a coroutine was restarted, false if the queue is empty.
*/
@@ -198,9 +229,15 @@ bool coroutine_fn qemu_co_queue_next(CoQueue *queue);
void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue);
/**
- * Enter the next coroutine in the queue
+ * Enter the next coroutine in the queue.
+ * The mutex passed to qemu_co_queue_wait must be taken.
+ *
+ * Returns true if a coroutine was restarted, false if the queue is empty.
*/
-bool qemu_co_enter_next(CoQueue *queue);
+bool qemu_co_enter_next_impl(CoQueue *queue, QemuTypedMutex mutex);
+
+#define qemu_co_enter_next(queue, mutex) \
+ qemu_co_enter_next_impl(queue, QEMU_TYPED_MUTEX(mutex))
/**
* Checks if the CoQueue is empty.
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 846ff91..f6bf952 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -40,13 +40,13 @@ void qemu_co_queue_init(CoQueue *queue)
QSIMPLEQ_INIT(&queue->entries);
}
-void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex)
+void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuTypedMutex mutex)
{
Coroutine *self = qemu_coroutine_self();
QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
- if (mutex) {
- qemu_co_mutex_unlock(mutex);
+ if (mutex.mutex) {
+ mutex.unlock(mutex.mutex);
}
/* There is no race condition here. Other threads will call
@@ -61,8 +61,8 @@ void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex
*mutex)
* primitive automatically places the woken coroutine on the
* mutex's queue. This avoids the thundering herd effect.
*/
- if (mutex) {
- qemu_co_mutex_lock(mutex);
+ if (mutex.mutex) {
+ mutex.lock(mutex.mutex);
}
}
@@ -130,7 +130,7 @@ void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
qemu_co_queue_do_restart(queue, false);
}
-bool qemu_co_enter_next(CoQueue *queue)
+bool qemu_co_enter_next_impl(CoQueue *queue, QemuTypedMutex mutex)
{
Coroutine *next;
@@ -140,7 +140,16 @@ bool qemu_co_enter_next(CoQueue *queue)
}
QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
- qemu_coroutine_enter(next);
+ /* The coroutine will need the mutex: release it to
+ * avoid a deadlock.
+ */
+ if (mutex.mutex) {
+ mutex.unlock(mutex.mutex);
+ }
+ aio_co_wake(next);
+ if (mutex.mutex) {
+ mutex.lock(mutex.mutex);
+ }
return true;
}
If that's okay, I can prepare a proper series for Fam next week.
Paolo
- [Qemu-devel] [PATCH v5 0/9] block: Add VFIO based driver for NVMe device, Fam Zheng, 2018/01/12
- [Qemu-devel] [PATCH v5 1/9] stubs: Add stubs for ram block API, Fam Zheng, 2018/01/12
- [Qemu-devel] [PATCH v5 2/9] util: Introduce vfio helpers, Fam Zheng, 2018/01/12
- [Qemu-devel] [PATCH v5 4/9] block: Introduce buf register API, Fam Zheng, 2018/01/12
- [Qemu-devel] [PATCH v5 3/9] block: Add VFIO based NVMe driver, Fam Zheng, 2018/01/12
- [Qemu-devel] [PATCH v5 5/9] block/nvme: Implement .bdrv_(un)register_buf, Fam Zheng, 2018/01/12
- [Qemu-devel] [PATCH v5 6/9] qemu-img: Map bench buffer, Fam Zheng, 2018/01/12
- [Qemu-devel] [PATCH v5 8/9] docs: Add section for NVMe VFIO driver, Fam Zheng, 2018/01/12
- [Qemu-devel] [PATCH v5 7/9] block: Move NVMe constants to a separate header, Fam Zheng, 2018/01/12
- [Qemu-devel] [PATCH v5 9/9] qapi: Add NVMe driver options to the schema, Fam Zheng, 2018/01/12
- Re: [Qemu-devel] [PATCH v5 0/9] block: Add VFIO based driver for NVMe device, Stefan Hajnoczi, 2018/01/12