From: Hao Xiang
Subject: Re: [External] Re: [PATCH v2 09/20] util/dsa: Implement DSA task asynchronous completion thread model.
Date: Mon, 18 Dec 2023 21:12:15 -0800
On Mon, Dec 18, 2023 at 5:34 PM Wang, Lei <lei4.wang@intel.com> wrote:
>
> On 12/19/2023 2:57, Hao Xiang wrote:
> > On Sun, Dec 17, 2023 at 7:11 PM Wang, Lei <lei4.wang@intel.com> wrote:
> >>
> >> On 11/14/2023 13:40, Hao Xiang wrote:
> >>> * Create a dedicated thread for DSA task completion.
> >>> * The DSA completion thread runs a loop and polls for completed tasks.
> >>> * Start and stop the DSA completion thread during DSA device start/stop.
> >>>
> >>> User space application can directly submit task to Intel DSA
> >>> accelerator by writing to DSA's device memory (mapped in user space).
> >>
> >>> +            }
> >>> +            return;
> >>> +        }
> >>> +    } else {
> >>> +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
> >>> +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
> >>
> >> Nit: indentation is broken here.
> >>
> >>> +    }
> >>> +
> >>> +    for (int i = 0; i < count; i++) {
> >>> +
> >>> +        completion = &batch_task->completions[i];
> >>> +        status = completion->status;
> >>> +
> >>> +        if (status == DSA_COMP_SUCCESS) {
> >>> +            results[i] = (completion->result == 0);
> >>> +            continue;
> >>> +        }
> >>> +
> >>> +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> >>> +            fprintf(stderr,
> >>> +                    "Unexpected completion status = %u.\n", status);
> >>> +            assert(false);
> >>> +        }
> >>> +    }
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief Handles an asynchronous DSA batch task completion.
> >>> + *
> >>> + * @param task A pointer to the batch buffer zero task structure.
> >>> + */
> >>> +static void
> >>> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> >>> +{
> >>> +    batch_task->status = DSA_TASK_COMPLETION;
> >>> +    batch_task->completion_callback(batch_task);
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief The function entry point called by a dedicated DSA
> >>> + * work item completion thread.
> >>> + *
> >>> + * @param opaque A pointer to the thread context.
> >>> + *
> >>> + * @return void* Not used.
> >>> + */
> >>> +static void *
> >>> +dsa_completion_loop(void *opaque)
> >>
> >> Per my understanding, if a multifd sending thread corresponds to a DSA
> >> device, then the batch tasks are executed in parallel, which means a task
> >> may complete later than another even if it was enqueued earlier. If we
> >> poll on the slower task first, it will block the handling of the faster
> >> one; even though that thread's zero-checking task is finished and it could
> >> go ahead and send the data to the wire, this may lower network resource
> >> utilization.
> >>
> >
> > Hi Lei, thanks for reviewing. You are correct that we can keep polling
> > a task enqueued first while others in the queue have already completed.
> > In fact, only one DSA completion thread (polling thread) is used here
> > even when multiple DSA devices are in use. The polling loop is the most
> > CPU-intensive activity in the DSA workflow, and that acts directly
> > against the goal of saving CPU usage. The trade-off I want to make here
> > is slightly higher latency on DSA task completion in exchange for more
> > CPU savings. A single DSA engine can reach 30 GB/s throughput on the
> > memory-comparison operation. We use the kernel TCP stack for network
> > transfer, and the best I see is around 10 GB/s throughput. RDMA can
> > potentially go higher, but I am not sure it can exceed 30 GB/s anytime
> > soon.
>
> Hi Hao, that makes sense: if the DSA is faster than the network, then a
> little bit of latency in DSA checking is tolerable. In the long term, I
> think the best form of the DSA task-checking thread is to use an fd or a
> similar mechanism that can multiplex the checking of different DSA
> devices; then we can serve DSA tasks in the order they complete rather
> than FCFS.
>
I have experimented with using N completion threads, where each thread
polls tasks submitted to a particular DSA device. That approach uses too
many CPU cycles. If Intel can come up with a better completion workflow
for DSA, there is definitely room for improvement here.
> >
> >>> +{
> >>> +    struct dsa_completion_thread *thread_context =
> >>> +        (struct dsa_completion_thread *)opaque;
> >>> +    struct buffer_zero_batch_task *batch_task;
> >>> +    struct dsa_device_group *group = thread_context->group;
> >>> +
> >>> +    rcu_register_thread();
> >>> +
> >>> +    thread_context->thread_id = qemu_get_thread_id();
> >>> +    qemu_sem_post(&thread_context->sem_init_done);
> >>> +
> >>> +    while (thread_context->running) {
> >>> +        batch_task = dsa_task_dequeue(group);
> >>> +        assert(batch_task != NULL || !group->running);
> >>> +        if (!group->running) {
> >>> +            assert(!thread_context->running);
> >>> +            break;
> >>> +        }
> >>> +        if (batch_task->task_type == DSA_TASK) {
> >>> +            poll_task_completion(batch_task);
> >>> +        } else {
> >>> +            assert(batch_task->task_type == DSA_BATCH_TASK);
> >>> +            poll_batch_task_completion(batch_task);
> >>> +        }
> >>> +
> >>> +        dsa_batch_task_complete(batch_task);
> >>> +    }
> >>> +
> >>> +    rcu_unregister_thread();
> >>> +    return NULL;
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief Initializes a DSA completion thread.
> >>> + *
> >>> + * @param completion_thread A pointer to the completion thread context.
> >>> + * @param group A pointer to the DSA device group.
> >>> + */
> >>> +static void
> >>> +dsa_completion_thread_init(
> >>> +    struct dsa_completion_thread *completion_thread,
> >>> +    struct dsa_device_group *group)
> >>> +{
> >>> +    completion_thread->stopping = false;
> >>> +    completion_thread->running = true;
> >>> +    completion_thread->thread_id = -1;
> >>> +    qemu_sem_init(&completion_thread->sem_init_done, 0);
> >>> +    completion_thread->group = group;
> >>> +
> >>> +    qemu_thread_create(&completion_thread->thread,
> >>> +                       DSA_COMPLETION_THREAD,
> >>> +                       dsa_completion_loop,
> >>> +                       completion_thread,
> >>> +                       QEMU_THREAD_JOINABLE);
> >>> +
> >>> +    /* Wait for initialization to complete */
> >>> +    while (completion_thread->thread_id == -1) {
> >>> +        qemu_sem_wait(&completion_thread->sem_init_done);
> >>> +    }
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief Stops the completion thread (and implicitly, the device group).
> >>> + *
> >>> + * @param opaque A pointer to the completion thread.
> >>> + */
> >>> +static void dsa_completion_thread_stop(void *opaque)
> >>> +{
> >>> +    struct dsa_completion_thread *thread_context =
> >>> +        (struct dsa_completion_thread *)opaque;
> >>> +
> >>> +    struct dsa_device_group *group = thread_context->group;
> >>> +
> >>> +    qemu_mutex_lock(&group->task_queue_lock);
> >>> +
> >>> +    thread_context->stopping = true;
> >>> +    thread_context->running = false;
> >>> +
> >>> +    dsa_device_group_stop(group);
> >>> +
> >>> +    qemu_cond_signal(&group->task_queue_cond);
> >>> +    qemu_mutex_unlock(&group->task_queue_lock);
> >>> +
> >>> +    qemu_thread_join(&thread_context->thread);
> >>> +
> >>> +    qemu_sem_destroy(&thread_context->sem_init_done);
> >>> +}
> >>> +
> >>> /**
> >>> * @brief Check if DSA is running.
> >>> *
> >>> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
> >>>  */
> >>> bool dsa_is_running(void)
> >>> {
> >>> -    return false;
> >>> +    return completion_thread.running;
> >>> }
> >>>
> >>> static void
> >>> @@ -481,6 +720,7 @@ void dsa_start(void)
> >>>          return;
> >>>      }
> >>>      dsa_device_group_start(&dsa_group);
> >>> +    dsa_completion_thread_init(&completion_thread, &dsa_group);
> >>> }
> >>>
> >>> /**
> >>> @@ -496,6 +736,7 @@ void dsa_stop(void)
> >>>          return;
> >>>      }
> >>>
> >>> +    dsa_completion_thread_stop(&completion_thread);
> >>>      dsa_empty_task_queue(group);
> >>> }
> >>>