[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [PATCH v2 09/20] util/dsa: Implement DSA task asynchronous completio
From: |
Fabiano Rosas |
Subject: |
Re: [PATCH v2 09/20] util/dsa: Implement DSA task asynchronous completion thread model. |
Date: |
Tue, 12 Dec 2023 16:36:13 -0300 |
Hao Xiang <hao.xiang@bytedance.com> writes:
> * Create a dedicated thread for DSA task completion.
> * DSA completion thread runs a loop and poll for completed tasks.
> * Start and stop DSA completion thread during DSA device start stop.
>
> User space application can directly submit task to Intel DSA
> accelerator by writing to DSA's device memory (mapped in user space).
> Once a task is submitted, the device starts processing it and write
> the completion status back to the task. A user space application can
> poll the task's completion status to check for completion. This change
> uses a dedicated thread to perform DSA task completion checking.
>
> Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
> ---
> util/dsa.c | 243 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
> 1 file changed, 242 insertions(+), 1 deletion(-)
>
> diff --git a/util/dsa.c b/util/dsa.c
> index f82282ce99..0e68013ffb 100644
> --- a/util/dsa.c
> +++ b/util/dsa.c
> @@ -44,6 +44,7 @@
>
> #define DSA_WQ_SIZE 4096
> #define MAX_DSA_DEVICES 16
> +#define DSA_COMPLETION_THREAD "dsa_completion"
>
> typedef QSIMPLEQ_HEAD(dsa_task_queue, buffer_zero_batch_task) dsa_task_queue;
>
> @@ -61,8 +62,18 @@ struct dsa_device_group {
> dsa_task_queue task_queue;
> };
>
> +struct dsa_completion_thread {
> + bool stopping;
> + bool running;
> + QemuThread thread;
> + int thread_id;
> + QemuSemaphore sem_init_done;
> + struct dsa_device_group *group;
> +};
> +
> uint64_t max_retry_count;
> static struct dsa_device_group dsa_group;
> +static struct dsa_completion_thread completion_thread;
>
>
> /**
> @@ -439,6 +450,234 @@ submit_batch_wi_async(struct buffer_zero_batch_task
> *batch_task)
> return dsa_task_enqueue(device_group, batch_task);
> }
>
> +/**
> + * @brief Poll for the DSA work item completion.
> + *
> + * @param completion A pointer to the DSA work item completion record.
> + * @param opcode The DSA opcode.
> + *
> + * @return Zero if successful, non-zero otherwise.
> + */
> +static int
> +poll_completion(struct dsa_completion_record *completion,
> + enum dsa_opcode opcode)
> +{
> + uint8_t status;
> + uint64_t retry = 0;
> +
> + while (true) {
> + // The DSA operation completes successfully or fails.
> + status = completion->status;
> + if (status == DSA_COMP_SUCCESS ||
Should we read directly from completion->status or is the compiler smart
enough to not optimize 'status' out?
> + status == DSA_COMP_PAGE_FAULT_NOBOF ||
> + status == DSA_COMP_BATCH_PAGE_FAULT ||
> + status == DSA_COMP_BATCH_FAIL) {
> + break;
> + } else if (status != DSA_COMP_NONE) {
> + /* TODO: Error handling here on unexpected failure. */
Let's make sure this is dealt with before merging.
> + fprintf(stderr, "DSA opcode %d failed with status = %d.\n",
> + opcode, status);
> + exit(1);
return instead of exiting.
> + }
> + retry++;
> + if (retry > max_retry_count) {
> + fprintf(stderr, "Wait for completion retry %lu times.\n", retry);
> + exit(1);
same here
> + }
> + _mm_pause();
> + }
> +
> + return 0;
> +}
> +
> +/**
> + * @brief Complete a single DSA task in the batch task.
> + *
> + * @param task A pointer to the batch task structure.
> + */
> +static void
> +poll_task_completion(struct buffer_zero_batch_task *task)
> +{
> + assert(task->task_type == DSA_TASK);
> +
> + struct dsa_completion_record *completion = &task->completions[0];
> + uint8_t status;
> +
> + poll_completion(completion, task->descriptors[0].opcode);
> +
> + status = completion->status;
> + if (status == DSA_COMP_SUCCESS) {
> + task->results[0] = (completion->result == 0);
> + return;
> + }
> +
> + assert(status == DSA_COMP_PAGE_FAULT_NOBOF);
> +}
> +
> +/**
> + * @brief Poll a batch task status until it completes. If DSA task doesn't
> + * complete properly, use CPU to complete the task.
> + *
> + * @param batch_task A pointer to the DSA batch task.
> + */
> +static void
> +poll_batch_task_completion(struct buffer_zero_batch_task *batch_task)
> +{
> + struct dsa_completion_record *batch_completion =
> &batch_task->batch_completion;
> + struct dsa_completion_record *completion;
> + uint8_t batch_status;
> + uint8_t status;
> + bool *results = batch_task->results;
> + uint32_t count = batch_task->batch_descriptor.desc_count;
> +
> + poll_completion(batch_completion,
> + batch_task->batch_descriptor.opcode);
> +
> + batch_status = batch_completion->status;
> +
> + if (batch_status == DSA_COMP_SUCCESS) {
> + if (batch_completion->bytes_completed == count) {
> + // Let's skip checking for each descriptors' completion status
> + // if the batch descriptor says all succedded.
> + for (int i = 0; i < count; i++) {
> + assert(batch_task->completions[i].status ==
> DSA_COMP_SUCCESS);
> + results[i] = (batch_task->completions[i].result == 0);
> + }
> + return;
> + }
> + } else {
> + assert(batch_status == DSA_COMP_BATCH_FAIL ||
> + batch_status == DSA_COMP_BATCH_PAGE_FAULT);
> + }
> +
> + for (int i = 0; i < count; i++) {
> +
extra whitespace
> + completion = &batch_task->completions[i];
> + status = completion->status;
> +
> + if (status == DSA_COMP_SUCCESS) {
> + results[i] = (completion->result == 0);
> + continue;
> + }
> +
> + if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> + fprintf(stderr,
> + "Unexpected completion status = %u.\n", status);
> + assert(false);
return here
> + }
> + }
> +}
> +
> +/**
> + * @brief Handles an asynchronous DSA batch task completion.
> + *
> + * @param task A pointer to the batch buffer zero task structure.
> + */
> +static void
> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> +{
> + batch_task->status = DSA_TASK_COMPLETION;
> + batch_task->completion_callback(batch_task);
> +}
> +
> +/**
> + * @brief The function entry point called by a dedicated DSA
> + * work item completion thread.
> + *
> + * @param opaque A pointer to the thread context.
> + *
> + * @return void* Not used.
> + */
> +static void *
> +dsa_completion_loop(void *opaque)
> +{
> + struct dsa_completion_thread *thread_context =
> + (struct dsa_completion_thread *)opaque;
> + struct buffer_zero_batch_task *batch_task;
> + struct dsa_device_group *group = thread_context->group;
> +
> + rcu_register_thread();
> +
> + thread_context->thread_id = qemu_get_thread_id();
> + qemu_sem_post(&thread_context->sem_init_done);
> +
> + while (thread_context->running) {
> + batch_task = dsa_task_dequeue(group);
> + assert(batch_task != NULL || !group->running);
> + if (!group->running) {
> + assert(!thread_context->running);
This is racy if the compiler reorders "thread_context->running = false"
and "group->running = false". I'd put this under the task_queue_lock or
add a compiler barrier at dsa_completion_thread_stop().
> + break;
> + }
> + if (batch_task->task_type == DSA_TASK) {
> + poll_task_completion(batch_task);
> + } else {
> + assert(batch_task->task_type == DSA_BATCH_TASK);
> + poll_batch_task_completion(batch_task);
> + }
> +
> + dsa_batch_task_complete(batch_task);
> + }
> +
> + rcu_unregister_thread();
> + return NULL;
> +}
> +
> +/**
> + * @brief Initializes a DSA completion thread.
> + *
> + * @param completion_thread A pointer to the completion thread context.
> + * @param group A pointer to the DSA device group.
> + */
> +static void
> +dsa_completion_thread_init(
> + struct dsa_completion_thread *completion_thread,
> + struct dsa_device_group *group)
> +{
> + completion_thread->stopping = false;
> + completion_thread->running = true;
> + completion_thread->thread_id = -1;
> + qemu_sem_init(&completion_thread->sem_init_done, 0);
> + completion_thread->group = group;
> +
> + qemu_thread_create(&completion_thread->thread,
> + DSA_COMPLETION_THREAD,
> + dsa_completion_loop,
> + completion_thread,
> + QEMU_THREAD_JOINABLE);
> +
> + /* Wait for initialization to complete */
> + while (completion_thread->thread_id == -1) {
> + qemu_sem_wait(&completion_thread->sem_init_done);
> + }
This is racy, the thread can set 'thread_id' before this enters the loop
and the semaphore will be left unmatched. Not a huge deal but it might
cause confusion when debugging the initialization.
> +}
> +
> +/**
> + * @brief Stops the completion thread (and implicitly, the device group).
> + *
> + * @param opaque A pointer to the completion thread.
> + */
> +static void dsa_completion_thread_stop(void *opaque)
> +{
> + struct dsa_completion_thread *thread_context =
> + (struct dsa_completion_thread *)opaque;
> +
> + struct dsa_device_group *group = thread_context->group;
> +
> + qemu_mutex_lock(&group->task_queue_lock);
> +
> + thread_context->stopping = true;
> + thread_context->running = false;
> +
> + dsa_device_group_stop(group);
> +
> + qemu_cond_signal(&group->task_queue_cond);
> + qemu_mutex_unlock(&group->task_queue_lock);
> +
> + qemu_thread_join(&thread_context->thread);
> +
> + qemu_sem_destroy(&thread_context->sem_init_done);
> +}
> +
> /**
> * @brief Check if DSA is running.
> *
> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task
> *batch_task)
> */
> bool dsa_is_running(void)
> {
> - return false;
> + return completion_thread.running;
> }
>
> static void
> @@ -481,6 +720,7 @@ void dsa_start(void)
> return;
> }
> dsa_device_group_start(&dsa_group);
> + dsa_completion_thread_init(&completion_thread, &dsa_group);
> }
>
> /**
> @@ -496,6 +736,7 @@ void dsa_stop(void)
> return;
> }
>
> + dsa_completion_thread_stop(&completion_thread);
> dsa_empty_task_queue(group);
> }
- Re: [PATCH v2 09/20] util/dsa: Implement DSA task asynchronous completion thread model.,
Fabiano Rosas <=