From: Wang, Lei
Subject: Re: [External] Re: [PATCH v2 09/20] util/dsa: Implement DSA task asynchronous completion thread model.
Date: Tue, 19 Dec 2023 09:33:29 +0800
User-agent: Mozilla Thunderbird

On 12/19/2023 2:57, Hao Xiang wrote:
> On Sun, Dec 17, 2023 at 7:11 PM Wang, Lei <lei4.wang@intel.com> wrote:
>> On 11/14/2023 13:40, Hao Xiang wrote:
>>> * Create a dedicated thread for DSA task completion.
>>> * DSA completion thread runs a loop and polls for completed tasks.
>>> * Start and stop DSA completion thread during DSA device start stop.
>>> User space application can directly submit task to Intel DSA
>>> accelerator by writing to DSA's device memory (mapped in user space).
>>> +            }
>>> +            return;
>>> +        }
>>> +    } else {
>>> +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
>>> +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
>> Nit: indentation is broken here.
>>> +    }
>>> +
>>> +    for (int i = 0; i < count; i++) {
>>> +
>>> +        completion = &batch_task->completions[i];
>>> +        status = completion->status;
>>> +
>>> +        if (status == DSA_COMP_SUCCESS) {
>>> +            results[i] = (completion->result == 0);
>>> +            continue;
>>> +        }
>>> +
>>> +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
>>> +            fprintf(stderr,
>>> +                    "Unexpected completion status = %u.\n", status);
>>> +            assert(false);
>>> +        }
>>> +    }
>>> +}
>>> +
>>> +/**
>>> + * @brief Handles an asynchronous DSA batch task completion.
>>> + *
>>> + * @param task A pointer to the batch buffer zero task structure.
>>> + */
>>> +static void
>>> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
>>> +{
>>> +    batch_task->status = DSA_TASK_COMPLETION;
>>> +    batch_task->completion_callback(batch_task);
>>> +}
>>> +
>>> +/**
>>> + * @brief The function entry point called by a dedicated DSA
>>> + *        work item completion thread.
>>> + *
>>> + * @param opaque A pointer to the thread context.
>>> + *
>>> + * @return void* Not used.
>>> + */
>>> +static void *
>>> +dsa_completion_loop(void *opaque)
>> Per my understanding, if a multifd sending thread corresponds to a DSA device,
>> then the batch tasks are executed in parallel which means a task may be
>> completed slower than another even if this task is enqueued earlier than it. If
>> we poll on the slower task first it will block the handling of the faster one,
>> even if the zero checking task for that thread is finished and it can go ahead
>> and send the data to the wire, this may lower the network resource utilization.
> Hi Lei, thanks for reviewing. You are correct that we can keep polling
> a task enqueued first while others in the queue have already been
> completed. In fact, only one DSA completion thread (polling thread) is
> used here even when multiple DSA devices are used. The polling loop is
> the most CPU intensive activity in the DSA workflow and that acts
> directly against the goal of saving CPU usage. The trade-off I want to
> take here is a slightly higher latency on DSA task completion but more
> CPU savings. A single DSA engine can reach 30 GB/s throughput on
> memory comparison operation. We use the kernel TCP stack for network
> transfer. The best I see is around 10 GB/s throughput. RDMA can
> potentially go higher but I am not sure if it can go higher than 30
> GB/s throughput anytime soon.

Hi Hao, that makes sense. If the DSA is faster than the network, then a little
bit of latency in DSA checking is tolerable. In the long term, I think the best
form of the DSA task checking thread would use an fd or a similar mechanism that
can multiplex completion checking across different DSA devices, so that we can
serve DSA tasks in the order they complete rather than FCFS.
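
To make the multiplexing idea concrete, here is a rough sketch that is not tied
to the patch. It assumes each device could expose some pollable fd that becomes
readable on completion; whether DSA can provide that is not established in this
thread, so pipes stand in for the devices. All names (fake_device,
wait_any_completion, serve_in_completion_order) are hypothetical.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

#define NUM_DEVICES 3

/*
 * Hypothetical stand-in for a DSA device: a pipe whose read end becomes
 * readable once the device has "completed" its task.
 */
struct fake_device {
    int fds[2]; /* fds[0]: polled by the completion side, fds[1]: device side */
};

/* Simulate the device signalling that its task is done. */
static void fake_device_complete(struct fake_device *dev)
{
    char token = 1;
    ssize_t n = write(dev->fds[1], &token, 1);
    assert(n == 1);
}

/*
 * Block until any device signals completion, consume the signal and
 * return that device's index. Returns -1 on error.
 */
static int wait_any_completion(struct fake_device *devs, int count)
{
    struct pollfd pfds[NUM_DEVICES];
    char token;

    for (int i = 0; i < count; i++) {
        pfds[i].fd = devs[i].fds[0];
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }
    if (poll(pfds, count, -1) <= 0) {
        return -1;
    }
    for (int i = 0; i < count; i++) {
        if ((pfds[i].revents & POLLIN) &&
            read(pfds[i].fd, &token, 1) == 1) {
            return i;
        }
    }
    return -1;
}

/*
 * Tasks are "enqueued" on devices 0..count-1 in that order, but complete
 * in completion_order[]. served[] records the order in which the single
 * completion loop handles them. Returns 0 on success, -1 on error.
 */
static int serve_in_completion_order(const int *completion_order,
                                     int count, int *served)
{
    struct fake_device devs[NUM_DEVICES];

    if (count > NUM_DEVICES) {
        return -1;
    }
    for (int i = 0; i < count; i++) {
        if (pipe(devs[i].fds) != 0) {
            return -1;
        }
    }
    for (int step = 0; step < count; step++) {
        fake_device_complete(&devs[completion_order[step]]);
        served[step] = wait_any_completion(devs, count);
    }
    for (int i = 0; i < count; i++) {
        close(devs[i].fds[0]);
        close(devs[i].fds[1]);
    }
    return 0;
}
```

With this shape, a task that finishes early on one device is handled as soon as
it completes, even if a task enqueued before it on another device is still
running. The completion thread also sleeps in poll() instead of spinning, but
that only helps if the completion notification itself is cheap.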

>>> +{
>>> +    struct dsa_completion_thread *thread_context =
>>> +        (struct dsa_completion_thread *)opaque;
>>> +    struct buffer_zero_batch_task *batch_task;
>>> +    struct dsa_device_group *group = thread_context->group;
>>> +
>>> +    rcu_register_thread();
>>> +
>>> +    thread_context->thread_id = qemu_get_thread_id();
>>> +    qemu_sem_post(&thread_context->sem_init_done);
>>> +
>>> +    while (thread_context->running) {
>>> +        batch_task = dsa_task_dequeue(group);
>>> +        assert(batch_task != NULL || !group->running);
>>> +        if (!group->running) {
>>> +            assert(!thread_context->running);
>>> +            break;
>>> +        }
>>> +        if (batch_task->task_type == DSA_TASK) {
>>> +            poll_task_completion(batch_task);
>>> +        } else {
>>> +            assert(batch_task->task_type == DSA_BATCH_TASK);
>>> +            poll_batch_task_completion(batch_task);
>>> +        }
>>> +
>>> +        dsa_batch_task_complete(batch_task);
>>> +    }
>>> +
>>> +    rcu_unregister_thread();
>>> +    return NULL;
>>> +}
>>> +
>>> +/**
>>> + * @brief Initializes a DSA completion thread.
>>> + *
>>> + * @param completion_thread A pointer to the completion thread context.
>>> + * @param group A pointer to the DSA device group.
>>> + */
>>> +static void
>>> +dsa_completion_thread_init(
>>> +    struct dsa_completion_thread *completion_thread,
>>> +    struct dsa_device_group *group)
>>> +{
>>> +    completion_thread->stopping = false;
>>> +    completion_thread->running = true;
>>> +    completion_thread->thread_id = -1;
>>> +    qemu_sem_init(&completion_thread->sem_init_done, 0);
>>> +    completion_thread->group = group;
>>> +
>>> +    qemu_thread_create(&completion_thread->thread,
>>> +                       DSA_COMPLETION_THREAD,
>>> +                       dsa_completion_loop,
>>> +                       completion_thread,
>>> +                       QEMU_THREAD_JOINABLE);
>>> +
>>> +    /* Wait for initialization to complete */
>>> +    while (completion_thread->thread_id == -1) {
>>> +        qemu_sem_wait(&completion_thread->sem_init_done);
>>> +    }
>>> +}
>>> +
>>> +/**
>>> + * @brief Stops the completion thread (and implicitly, the device group).
>>> + *
>>> + * @param opaque A pointer to the completion thread.
>>> + */
>>> +static void dsa_completion_thread_stop(void *opaque)
>>> +{
>>> +    struct dsa_completion_thread *thread_context =
>>> +        (struct dsa_completion_thread *)opaque;
>>> +
>>> +    struct dsa_device_group *group = thread_context->group;
>>> +
>>> +    qemu_mutex_lock(&group->task_queue_lock);
>>> +
>>> +    thread_context->stopping = true;
>>> +    thread_context->running = false;
>>> +
>>> +    dsa_device_group_stop(group);
>>> +
>>> +    qemu_cond_signal(&group->task_queue_cond);
>>> +    qemu_mutex_unlock(&group->task_queue_lock);
>>> +
>>> +    qemu_thread_join(&thread_context->thread);
>>> +
>>> +    qemu_sem_destroy(&thread_context->sem_init_done);
>>> +}
>>> +
>>>  /**
>>>   * @brief Check if DSA is running.
>>>   *
>>> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
>>>   */
>>>  bool dsa_is_running(void)
>>>  {
>>> -    return false;
>>> +    return completion_thread.running;
>>>  }
>>>  static void
>>> @@ -481,6 +720,7 @@ void dsa_start(void)
>>>          return;
>>>      }
>>>      dsa_device_group_start(&dsa_group);
>>> +    dsa_completion_thread_init(&completion_thread, &dsa_group);
>>>  }
>>>  /**
>>> @@ -496,6 +736,7 @@ void dsa_stop(void)
>>>          return;
>>>      }
>>> +    dsa_completion_thread_stop(&completion_thread);
>>>      dsa_empty_task_queue(group);
>>>  }
