[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
From: Stefan Hajnoczi
Subject: Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
Date: Wed, 10 Nov 2010 13:45:29 +0000
On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
<address@hidden> wrote:
> @@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb
> *aiocb)
> return nbytes;
> }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
> {
> pid_t pid;
> + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> + ssize_t ret = 0;
>
> pid = getpid();
>
> - while (1) {
> - struct qemu_paiocb *aiocb;
> - ssize_t ret = 0;
> - qemu_timeval tv;
> - struct timespec ts;
> -
> - qemu_gettimeofday(&tv);
> - ts.tv_sec = tv.tv_sec + 10;
> - ts.tv_nsec = 0;
> -
> - mutex_lock(&lock);
> -
> - while (QTAILQ_EMPTY(&request_list) &&
> - !(ret == ETIMEDOUT)) {
> - ret = cond_timedwait(&cond, &lock, &ts);
> - }
> -
> - if (QTAILQ_EMPTY(&request_list))
> - break;
> -
> - aiocb = QTAILQ_FIRST(&request_list);
> - QTAILQ_REMOVE(&request_list, aiocb, node);
> - aiocb->active = 1;
> - idle_threads--;
> - mutex_unlock(&lock);
> -
> - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> - case QEMU_AIO_READ:
> - case QEMU_AIO_WRITE:
> - ret = handle_aiocb_rw(aiocb);
> - break;
> - case QEMU_AIO_FLUSH:
> - ret = handle_aiocb_flush(aiocb);
> - break;
> - case QEMU_AIO_IOCTL:
> - ret = handle_aiocb_ioctl(aiocb);
> - break;
> - default:
> - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> - ret = -EINVAL;
> - break;
> - }
> -
> - mutex_lock(&lock);
> - aiocb->ret = ret;
> - idle_threads++;
> - mutex_unlock(&lock);
> -
> - if (kill(pid, aiocb->ev_signo)) die("kill failed");
> + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> + case QEMU_AIO_READ:
> + case QEMU_AIO_WRITE:
> + ret = handle_aiocb_rw(aiocb);
> + break;
> + case QEMU_AIO_FLUSH:
> + ret = handle_aiocb_flush(aiocb);
> + break;
> + case QEMU_AIO_IOCTL:
> + ret = handle_aiocb_ioctl(aiocb);
> + break;
> + default:
> + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> + ret = -EINVAL;
> + break;
> }
>
> - idle_threads--;
> - cur_threads--;
> - mutex_unlock(&lock);
> -
> - return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> - sigset_t set, oldset;
> -
> - cur_threads++;
> - idle_threads++;
> + qemu_mutex_lock(&aiocb_mutex);
> + aiocb->ret = ret;
This is where qemu_cond_broadcast() is needed.
> + qemu_mutex_unlock(&aiocb_mutex);
>
> - /* block all signals */
> - if (sigfillset(&set)) die("sigfillset");
> - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> - thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> + if (kill(pid, aiocb->ev_signo)) {
> + die("kill failed");
> + }
> }
>
> static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> {
> + qemu_mutex_lock(&aiocb_mutex);
> aiocb->ret = -EINPROGRESS;
> - aiocb->active = 0;
> - mutex_lock(&lock);
> - if (idle_threads == 0 && cur_threads < max_threads)
> - spawn_thread();
> - QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> - mutex_unlock(&lock);
> - cond_signal(&cond);
> + qemu_mutex_unlock(&aiocb_mutex);
> +
> + aiocb->work.func = aio_thread;
> + submit_work(&aiocb->work);
> }
>
> static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> {
> ssize_t ret;
>
> - mutex_lock(&lock);
> + qemu_mutex_lock(&aiocb_mutex);
> ret = aiocb->ret;
> - mutex_unlock(&lock);
> -
> + qemu_mutex_unlock(&aiocb_mutex);
> + qemu_cond_broadcast(&aiocb_completion);
qemu_paio_return() gets the return value from qemu_paiocb. It
shouldn't have side effects and is the wrong place to broadcast
completion.
> return ret;
> }
>
> @@ -534,22 +616,14 @@ static void paio_remove(struct qemu_paiocb *acb)
> static void paio_cancel(BlockDriverAIOCB *blockacb)
> {
> struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> - int active = 0;
> -
> - mutex_lock(&lock);
> - if (!acb->active) {
> - QTAILQ_REMOVE(&request_list, acb, node);
> - acb->ret = -ECANCELED;
> - } else if (acb->ret == -EINPROGRESS) {
> - active = 1;
> - }
> - mutex_unlock(&lock);
>
> - if (active) {
> - /* fail safe: if the aio could not be canceled, we wait for
> - it */
> - while (qemu_paio_error(acb) == EINPROGRESS)
> - ;
> + if (dequeue_work(&acb->work) != 0) {
> + /* Wait for running work item to complete */
> + qemu_mutex_lock(&aiocb_mutex);
> + while (acb->ret == -EINPROGRESS) {
> + qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> + }
> + qemu_mutex_unlock(&aiocb_mutex);
I wonder if the condition variable has a measurable performance
overhead. We unconditionally broadcast on paiocb completion. One
idea would be to keep a counter of waiters (should only ever be 0 or
1) protected by aiocb_mutex and broadcast only when there is a waiter.
I just want to share this idea, I don't know if it's necessary to
implement it or if it could even work without a race condition.
> }
>
> paio_remove(acb);
> @@ -618,11 +692,12 @@ int paio_init(void)
> struct sigaction act;
> PosixAioState *s;
> int fds[2];
> - int ret;
>
> if (posix_aio_state)
> return 0;
>
> + qemu_mutex_init(&aiocb_mutex);
Also needs qemu_cond_init(&aiocb_completion).
Stefan
[Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c, Arun R Bharadwaj, 2010/11/10
[Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets, Arun R Bharadwaj, 2010/11/10