Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets


From: Venkateswararao Jujjuri (JV)
Subject: Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
Date: Tue, 19 Oct 2010 14:00:24 -0700
User-agent: Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.9) Gecko/20100915 Thunderbird/3.1.4

On 10/19/2010 11:36 AM, Balbir Singh wrote:
> * Arun R B <address@hidden> [2010-10-19 23:12:45]:
> 
>> From: Aneesh Kumar K.V <address@hidden>
>>
>> This patch creates a generic asynchronous-task-offloading infrastructure named
>> threadlets. The core idea has been borrowed from the threading framework that
>> is being used by paio.
>>
>> The reason for creating this generic infrastructure is so that other subsystems,
>> such as virtio-9p, could make use of it for offloading tasks that could block.
>>
>> The patch creates a global queue onto which subsystems can queue their tasks to
>> be executed asynchronously.
>>
>> The patch also provides APIs that allow a subsystem to create a private queue
>> with an associated pool of threads.
>>
>> [address@hidden: Facelift of the code, Documentation, cancel_threadlet
>> and other helpers]
>>
>> Signed-off-by: Aneesh Kumar K.V <address@hidden>
>> Signed-off-by: Gautham R Shenoy <address@hidden>
>> Signed-off-by: Sripathi Kodi <address@hidden>
>> Signed-off-by: Arun R Bharadwaj <address@hidden>
>> ---
>>  Makefile.objs          |    3 +
>>  docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++++++++
>>  qemu-threadlets.c      |  165 ++++++++++++++++++++++++++++++++++++++++++++++++
>>  qemu-threadlets.h      |   48 ++++++++++++++
>>  4 files changed, 356 insertions(+), 1 deletions(-)
>>  create mode 100644 docs/async-support.txt
>>  create mode 100644 qemu-threadlets.c
>>  create mode 100644 qemu-threadlets.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index cd5a24b..2cf8aba 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>>
>>  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>>  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
>> +block-obj-$(CONFIG_POSIX) += qemu-thread.o
>> +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
>>  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>>
>> @@ -124,7 +126,6 @@ endif
>>  common-obj-y += $(addprefix ui/, $(ui-obj-y))
>>
>>  common-obj-y += iov.o acl.o
>> -common-obj-$(CONFIG_THREAD) += qemu-thread.o
>>  common-obj-y += notify.o event_notifier.o
>>  common-obj-y += qemu-timer.o
>>
>> diff --git a/docs/async-support.txt b/docs/async-support.txt
>> new file mode 100644
>> index 0000000..9f22b9a
>> --- /dev/null
>> +++ b/docs/async-support.txt
>> @@ -0,0 +1,141 @@
>> +== How to use the threadlets infrastructure supported in Qemu ==
>> +
>> +== Threadlets ==
>> +
>> +Q.1: What are threadlets ?
>> +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
>> +     to offload possibly blocking work to a queue to be processed by a pool
>> +     of threads asynchronously.
>> +
>> +Q.2: When would one want to use threadlets ?
>> +A.2: Threadlets are useful when there are operations that can be performed
>> +     outside the context of the VCPU/IO threads in order to free the latter
>> +     to service any other guest requests.
>> +
>> +Q.3: I have some work that can be executed in an asynchronous context. How
>> +     should I go about it ?
>> +A.3: One could follow the steps listed below:
>> +
>> +     - Define a function which would do the asynchronous work.
>> +    static void my_threadlet_func(ThreadletWork *work)
>> +    {
>> +    }
>> +
>> +     - Declare an object of type ThreadletWork;
>> +    ThreadletWork work;
>> +
>> +
>> +     - Assign a value to the "func" member of ThreadletWork object.
>> +    work.func = my_threadlet_func;
>> +
>> +     - Submit the threadlet to the global queue.
>> +    submit_threadletwork(&work);
>> +
>> +     - Continue servicing some other guest operations.
>> +
>> +Q.4: I want my_threadlet_func to access some non-global data. How do I do
>> +     that ?
>> +A.4: Suppose you want my_threadlet_func to access some non-global data-object
>> +     of type MyPrivateData. In that case one could follow the steps below.
>> +
>> +     - Define a member of the type ThreadletWork within MyPrivateData.
>> +    typedef struct MyPrivateData {
>> +            ...;
>> +            ...;
>> +            ...;
>> +            ThreadletWork work;
>> +    } MyPrivateData;
>> +
>> +    MyPrivateData myData;
>> +
>> +     - Initialize myData.work as described in A.3
>> +    myData.work.func = my_threadlet_func;
>> +    submit_threadletwork(&myData.work);
>> +
>> +     - Access the myData object inside my_threadlet_func() using container_of
>> +       primitive
>> +    static void my_threadlet_func(ThreadletWork *work)
>> +    {
>> +            MyPrivateData *mydata_ptr;
>> +            mydata_ptr = container_of(work, MyPrivateData, work);
>> +
>> +            /* mydata_ptr now points to myData object */
>> +    }
>> +
>> +Q.5: Are there any precautions one must take while sharing data with the
>> +     Asynchronous thread-pool ?
>> +A.5: Yes, make sure that the helper function of the type my_threadlet_func()
>> +     does not access/modify data when it can be accessed or modified in the
>> +     context of VCPU thread or IO thread. This is because the asynchronous
>> +     threads in the pool can run in parallel with the VCPU/IOThreads as shown
>> +     in the figure.
>> +
>> +     A typical workflow is as follows:
>> +
>> +              VCPU/IOThread
>> +                   |
>> +                   | (1)
>> +                   |
>> +                   V
>> +                Offload work              (2)
>> +      |-------> to threadlets -----------------------------> Helper thread
>> +      |            |                                               |
>> +      |            |                                               |
>> +      |            | (3)                                           | (4)
>> +      |            |                                               |
>> +      |         Handle other Guest requests                        |
>> +      |            |                                               |
>> +      |            |                                               V
>> +      |            | (3)                                  Signal the I/O Thread
>> +      |(6)         |                                               |
>> +      |            |                                              /
>> +      |            |                                             /
>> +      |            V                                            /
>> +      |          Do the post <---------------------------------/
>> +      |          processing               (5)
>> +      |            |
>> +      |            | (6)
>> +      |            V
>> +      |-Yes------ More async work?
>> +                   |
>> +                   | (7)
>> +               No
>> +                   |
>> +                   |
>> +                   .
>> +                   .
>> +
>> +    Hence one needs to make sure that in the steps (3) and (4) which run in
>> +    parallel, any global data is accessed within only one context.
>> +
>> +Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
>> +A.6: Threadlets framework provides the API cancel_threadletwork:
>> +       - int cancel_threadletwork(ThreadletWork *work)
>> +
>> +     The API scans the ThreadletQueue to see if (work) is present. If it finds
>> +     work, it'll dequeue work and return 0.
>> +
>> +     On the other hand, if it does not find the (work) in the ThreadletQueue,
>> +     then it'll return 1. This can imply two things. Either the work is being
>> +     processed by one of the helper threads or it has been processed. The
>> +     threadlet infrastructure currently _does_not_ distinguish between these
>> +     two and the onus is on the caller to do that.
>> +
>> +Q.7: Apart from the global pool of threads, can I have my own private Queue ?
>> +A.7: Yes, the threadlets framework allows subsystems to create their own private
>> +     queues with associated pools of threads.
>> +
>> +     - Define a PrivateQueue
>> +       ThreadletQueue myQueue;
>> +
>> +     - Initialize it:
>> +       threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
>> +       where my_max_threads is the maximum number of threads that can be in the
>> +       thread pool and my_min_threads is the minimum number of active threads
>> +       that can be in the thread-pool.
>> +
>> +     - Submit work:
>> +       submit_threadletwork_to_queue(&myQueue, &my_work);
>> +
>> +     - Cancel work:
>> +       cancel_threadletwork_on_queue(&myQueue, &my_work);
>> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
>> new file mode 100644
>> index 0000000..fd33752
>> --- /dev/null
>> +++ b/qemu-threadlets.c
>> @@ -0,0 +1,165 @@
>> +/*
>> + * Threadlet support for offloading tasks to be executed asynchronously
>> + *
>> + * Copyright IBM, Corp. 2008
>> + * Copyright IBM, Corp. 2010
>> + *
>> + * Authors:
>> + *  Anthony Liguori     <address@hidden>
>> + *  Aneesh Kumar K.V    <address@hidden>
>> + *  Gautham R Shenoy    <address@hidden>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>> + */
>> +
>> +#include "qemu-threadlets.h"
>> +#include "osdep.h"
>> +
>> +#define MAX_GLOBAL_THREADS  64
>> +#define MIN_GLOBAL_THREADS  64
>> +static ThreadletQueue globalqueue;
>> +static int globalqueue_init;
>> +
>> +static void *threadlet_worker(void *data)
>> +{
>> +    ThreadletQueue *queue = data;
>> +
> Ideally you need
> 
>         s = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
> 
> But qemu will need to wrap this around as well.
>> +    qemu_mutex_lock(&(queue->lock));
>> +    while (1) {
>> +        ThreadletWork *work;
>> +        int ret = 0;
>> +
>> +        while (QTAILQ_EMPTY(&(queue->request_list)) &&
>> +               (ret != ETIMEDOUT)) {
>> +            ret = qemu_cond_timedwait(&(queue->cond),
>> +                                     &(queue->lock), 10*100000);
> 
> Ewww... what is 10*100000, can we use something more meaningful
> please?

Or at least some comment...
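
One way to cover both comments is to give the literal a name and say what it
is for. A minimal sketch: THREADLET_IDLE_TIMEOUT and threadlet_idle_timeout()
are hypothetical names, and the unit of the value is whatever
qemu_cond_timedwait() expects, which this patch does not show.

```c
#include <assert.h>
#include <stdint.h>

/*
 * How long an idle worker waits for new work before it is allowed to
 * exit (it only exits while cur_threads > min_threads).
 *
 * Hypothetical name. The value is copied unchanged from the patch; its
 * unit is defined by qemu_cond_timedwait(), which is not shown here.
 */
#define THREADLET_IDLE_TIMEOUT (10 * 100000)

/* Hypothetical helper so call sites never spell out the literal. */
static inline uint64_t threadlet_idle_timeout(void)
{
    return (uint64_t)THREADLET_IDLE_TIMEOUT;
}
```

The wait in threadlet_worker() would then pass THREADLET_IDLE_TIMEOUT instead
of the bare expression.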
> 
>> +        }
>> +
>> +        assert(queue->idle_threads != 0);
> 
> This assertion holds because we believe one of the idle_threads
> actually did the dequeuing, right?

Correct, or there is no work and we were woken up by the timeout.
Of course, in that case the number of worker threads should be min_threads.
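
The counter bookkeeping behind that assertion can be checked in isolation.
The following is only a toy model with no threads or locks: QueueModel and
the model_* functions are hypothetical names that mirror the counter updates
in spawn_threadlet() and threadlet_worker().

```c
#include <assert.h>

/* Toy model of the counters in ThreadletQueue; no real threads or locks. */
typedef struct {
    int min_threads;
    int cur_threads;
    int idle_threads;
    int pending;        /* stands in for the length of request_list */
} QueueModel;

/* Mirrors spawn_threadlet(): a new worker is counted as idle from birth. */
static void model_spawn(QueueModel *q)
{
    q->cur_threads++;
    q->idle_threads++;
}

/*
 * Mirrors one pass of the worker loop after the wait returns.
 * Returns 1 if the worker took work, 0 if it exited, and -1 if it
 * stays in the pool and goes back to waiting.
 */
static int model_worker_wakeup(QueueModel *q)
{
    /* The waking worker is itself still counted as idle. */
    assert(q->idle_threads != 0);

    if (q->pending == 0) {
        if (q->cur_threads > q->min_threads) {
            /* Surplus worker: leave the pool. */
            q->idle_threads--;
            q->cur_threads--;
            return 0;
        }
        return -1;
    }

    q->pending--;
    q->idle_threads--;
    return 1;
}

/* Mirrors the bookkeeping after work->func() returns. */
static void model_work_done(QueueModel *q)
{
    q->idle_threads++;
}
```

Every worker that reaches the assertion has already been counted in
idle_threads, either at spawn time or after finishing its last job, which is
why the counter cannot be zero at that point.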
> 
>> +        if (QTAILQ_EMPTY(&(queue->request_list))) {
>> +            if (queue->cur_threads > queue->min_threads) {
>> +                /* We retain the minimum number of threads */
>> +                break;
>> +            }
>> +        } else {
>> +            work = QTAILQ_FIRST(&(queue->request_list));
>> +            QTAILQ_REMOVE(&(queue->request_list), work, node);
>> +
>> +            queue->idle_threads--;
>> +            qemu_mutex_unlock(&(queue->lock));
>> +
>> +            /* execute the work function */
>> +            work->func(work);
>> +
>> +            qemu_mutex_lock(&(queue->lock));
>> +            queue->idle_threads++;
>> +        }
>> +    }
>> +
>> +    queue->idle_threads--;
>> +    queue->cur_threads--;
>> +    qemu_mutex_unlock(&(queue->lock));
>> +
>> +    return NULL;
> Does anybody do a join on the exiting thread from the pool?
Not sure what you mean here. We keep min_threads threads around and spawn
additional threads, up to the max limit, as needed.
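
If the question is about reclaiming the resources of workers that exit, one
option is to create them detached so that nobody has to join them. Whether
qemu_thread_create() already does this is not shown in this patch. The sketch
below uses plain POSIX threads; spawn_detached(), Completion, demo_worker()
and wait_for_completion() are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

/* Create a thread whose resources are released automatically on exit. */
static int spawn_detached(void *(*fn)(void *), void *arg)
{
    pthread_attr_t attr;
    pthread_t tid;
    int ret;

    ret = pthread_attr_init(&attr);
    if (ret != 0) {
        return ret;
    }
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret == 0) {
        ret = pthread_create(&tid, &attr, fn, arg);
    }
    pthread_attr_destroy(&attr);
    return ret;
}

/* Demo-only completion flag, since a detached thread cannot be joined. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int done;
} Completion;

static void *demo_worker(void *opaque)
{
    Completion *c = opaque;

    pthread_mutex_lock(&c->lock);
    c->done = 1;
    pthread_cond_signal(&c->cond);
    pthread_mutex_unlock(&c->lock);
    return NULL;
}

static void wait_for_completion(Completion *c)
{
    pthread_mutex_lock(&c->lock);
    while (!c->done) {
        pthread_cond_wait(&c->cond, &c->lock);
    }
    pthread_mutex_unlock(&c->lock);
}
```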
> 
>> +}
>> +
>> +static void spawn_threadlet(ThreadletQueue *queue)
>> +{
>> +    QemuThread thread;
>> +
>> +    queue->cur_threads++;
>> +    queue->idle_threads++;
>> +
>> +    qemu_thread_create(&thread, threadlet_worker, queue);
> 
>> +}
>> +
>> +/**
>> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>> + *                            executed asynchronously.
>> + * @queue: Per-subsystem private queue to which the new task needs
>> + *         to be submitted.
>> + * @work: Contains information about the task that needs to be submitted.
>> + */
>> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>> +{
>> +    qemu_mutex_lock(&(queue->lock));
>> +    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
>> +        spawn_threadlet(queue);
> 
> So we hold queue->lock, spawn the thread, the spawned thread tries to
> acquire queue->lock
> 
>> +    }
>> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>> +    qemu_mutex_unlock(&(queue->lock));
>> +    qemu_cond_signal(&(queue->cond));
> 
> In the case that we just spawned the threadlet, the cond_signal is
> spurious. If we need predictable scheduling behaviour,
> qemu_cond_signal needs to happen with queue->lock held.
> 
> I'd rewrite the function as 
> 
> /**
>  * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>  *                            executed asynchronously.
>  * @queue: Per-subsystem private queue to which the new task needs
>  *         to be submitted.
>  * @work: Contains information about the task that needs to be submitted.
>  */
> void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> {
>     qemu_mutex_lock(&(queue->lock));
>     if (queue->idle_threads == 0 && (queue->cur_threads < queue->max_threads)) {
>         spawn_threadlet(queue);
>     } else {
>         qemu_cond_signal(&(queue->cond));
>     }
>     QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>     qemu_mutex_unlock(&(queue->lock));
> }
> 
This looks fine to me. Maybe cleaner than the previous one, but functionally
not much different.
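
For reference, signalling with the lock held can be sketched with plain POSIX
threads as below. This is not the patch's code: it uses a single long-lived
worker instead of on-demand spawning, and Work, Queue, queue_submit(),
queue_stop(), queue_worker() and the demo_* names are all hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef struct Work {
    struct Work *next;
    void (*func)(struct Work *work);
} Work;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    Work *head;
    Work *tail;
    int stop;
} Queue;

static void queue_submit(Queue *q, Work *w)
{
    pthread_mutex_lock(&q->lock);
    w->next = NULL;
    if (q->tail) {
        q->tail->next = w;
    } else {
        q->head = w;
    }
    q->tail = w;
    /* Signalled with the lock held: the waiter cannot run until we unlock. */
    pthread_cond_signal(&q->cond);
    pthread_mutex_unlock(&q->lock);
}

/* Ask the worker to exit once the queue has been drained. */
static void queue_stop(Queue *q)
{
    pthread_mutex_lock(&q->lock);
    q->stop = 1;
    pthread_cond_broadcast(&q->cond);
    pthread_mutex_unlock(&q->lock);
}

static void *queue_worker(void *opaque)
{
    Queue *q = opaque;

    pthread_mutex_lock(&q->lock);
    for (;;) {
        /* Re-check the predicate in a loop; wakeups may be spurious. */
        while (q->head == NULL && !q->stop) {
            pthread_cond_wait(&q->cond, &q->lock);
        }
        if (q->head == NULL) {
            break;              /* stop requested and nothing left to run */
        }
        Work *w = q->head;
        q->head = w->next;
        if (q->head == NULL) {
            q->tail = NULL;
        }
        /* Run the job without holding the queue lock. */
        pthread_mutex_unlock(&q->lock);
        w->func(w);
        pthread_mutex_lock(&q->lock);
    }
    pthread_mutex_unlock(&q->lock);
    return NULL;
}

/* Demo job: only the worker thread touches the counter. */
static int demo_hits;

static void demo_func(Work *w)
{
    (void)w;
    demo_hits++;
}
```

Because the worker re-checks the list under the lock, an extra signal is
harmless, which is consistent with the two versions behaving the same.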

>> +/**
>> + * submit_threadletwork: Submit to the global queue a new task to be executed
>> + *                   asynchronously.
>> + * @work: Contains information about the task that needs to be submitted.
>> + */
>> +void submit_threadletwork(ThreadletWork *work)
>> +{
>> +    if (unlikely(!globalqueue_init)) {
>> +        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
>> +                                MIN_GLOBAL_THREADS);
>> +        globalqueue_init = 1;
>> +    }
> 
> What protects globalqueue_init?

It should be called in vCPU thread context which is serialized by definition.
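
If that assumption about the calling context ever stops holding, the lazy
initialisation could be made independent of it with pthread_once(). A minimal
sketch only, not something the patch does: globalqueue_once,
globalqueue_do_init(), globalqueue_ensure_init() and the demo counter are
hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

static pthread_once_t globalqueue_once = PTHREAD_ONCE_INIT;
static int globalqueue_init_calls;      /* demo counter only */

static void globalqueue_do_init(void)
{
    /* In the patch this is where threadlet_queue_init() would be called. */
    globalqueue_init_calls++;
}

/* Safe to call from any thread; the init routine runs exactly once. */
static void globalqueue_ensure_init(void)
{
    int ret = pthread_once(&globalqueue_once, globalqueue_do_init);

    assert(ret == 0);
}
```

submit_threadletwork() would call globalqueue_ensure_init() in place of the
globalqueue_init check.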

- JV

> 
>> +
>> +    submit_threadletwork_to_queue(&globalqueue, work);
>> +}
>> +
>> +/**
>> + * cancel_threadletwork_on_queue: Cancel a task queued on a Queue.
>> + * @queue: The queue containing the task to be cancelled.
>> + * @work: Contains the information of the task that needs to be cancelled.
>> + *
>> + * Returns: 0 if the task is successfully cancelled.
>> + *          1 otherwise.
>> + */
>> +int cancel_threadletwork_on_queue(ThreadletQueue *queue, ThreadletWork *work)
>> +{
>> +    ThreadletWork *ret_work;
>> +    int ret = 1;
>> +
>> +    qemu_mutex_lock(&(queue->lock));
>> +    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
>> +        if (ret_work == work) {
>> +            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
>> +            ret = 0;
>> +            break;
>> +        }
>> +    }
>> +    qemu_mutex_unlock(&(queue->lock));
>> +
>> +    return ret;
>> +}
>> +
>> +/**
>> + * cancel_threadletwork: Cancel a task queued on the global queue.
> 
> NOTE: cancel is a confusing term, thread cancel is different from
> cancelling a job on the global queue, I'd preferably call this
> dequeue_threadletwork
> 
> Generic question, is there a reason to use threadletwork as one word,
> instead of threadlet_work? Especially since the data structure is
> called ThreadletWork.
> 
>> + * @work: Contains the information of the task that needs to be cancelled.
>> + *
>> + * Returns: 0 if the task is successfully cancelled.
>> + *          1 otherwise.
>> + */
>> +int cancel_threadletwork(ThreadletWork *work)
>> +{
>> +    return cancel_threadletwork_on_queue(&globalqueue, work);
>> +}
>> +
>> +/**
>> + * threadlet_queue_init: Initialize a threadlet queue.
>> + * @queue: The threadlet queue to be initialized.
>> + * @max_threads: Maximum number of threads processing the queue.
>> + * @min_threads: Minimum number of threads processing the queue.
>> + */
>> +void threadlet_queue_init(ThreadletQueue *queue,
>> +                                int max_threads, int min_threads)
>> +{
>> +    queue->cur_threads  = 0;
>> +    queue->idle_threads = 0;
>> +    queue->max_threads  = max_threads;
>> +    queue->min_threads  = min_threads;
>> +    QTAILQ_INIT(&(queue->request_list));
>> +    qemu_mutex_init(&(queue->lock));
>> +    qemu_cond_init(&(queue->cond));
>> +}
>> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
>> new file mode 100644
>> index 0000000..9c8f9e5
>> --- /dev/null
>> +++ b/qemu-threadlets.h
>> @@ -0,0 +1,48 @@
>> +/*
>> + * Threadlet support for offloading tasks to be executed asynchronously
>> + *
>> + * Copyright IBM, Corp. 2008
>> + * Copyright IBM, Corp. 2010
>> + *
>> + * Authors:
>> + *  Anthony Liguori     <address@hidden>
>> + *  Aneesh Kumar K.V    <address@hidden>
>> + *  Gautham R Shenoy    <address@hidden>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef QEMU_ASYNC_WORK_H
>> +#define QEMU_ASYNC_WORK_H
>> +
>> +#include "qemu-queue.h"
>> +#include "qemu-common.h"
>> +#include "qemu-thread.h"
>> +
>> +typedef struct ThreadletQueue
>> +{
>> +    QemuMutex lock;
>> +    QemuCond cond;
>> +    int max_threads;
>> +    int min_threads;
>> +    int cur_threads;
>> +    int idle_threads;
>> +    QTAILQ_HEAD(, ThreadletWork) request_list;
>> +} ThreadletQueue;
>> +
>> +typedef struct ThreadletWork
>> +{
>> +    QTAILQ_ENTRY(ThreadletWork) node;
>> +    void (*func)(struct ThreadletWork *work);
>> +} ThreadletWork;
>> +
>> +extern void submit_threadletwork_to_queue(ThreadletQueue *queue,
>> +                                      ThreadletWork *work);
>> +extern void submit_threadletwork(ThreadletWork *work);
>> +extern int cancel_threadletwork_on_queue(ThreadletQueue *queue,
>> +                                        ThreadletWork *work);
>> +extern int cancel_threadletwork(ThreadletWork *work);
>> +extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
>> +                                 int min_threads);
>> +#endif
>>
>>
> 




