qemu-block
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-block] [PATCH 18/21] backup: new async architecture


From: Stefan Hajnoczi
Subject: Re: [Qemu-block] [PATCH 18/21] backup: new async architecture
Date: Wed, 1 Feb 2017 16:13:21 +0000
User-agent: Mutt/1.7.1 (2016-10-04)

On Fri, Dec 23, 2016 at 05:29:01PM +0300, Vladimir Sementsov-Ogievskiy wrote:
> @@ -120,11 +257,42 @@ static bool coroutine_fn yield_and_check(BackupBlockJob 
> *job)
>          uint64_t delay_ns = ratelimit_calculate_delay(&job->limit,
>                                                        job->sectors_read);
>          job->sectors_read = 0;
> +        job->delayed = true;
> +        trace_backup_sleep_delay(block_job_is_cancelled(&job->common),
> +                                 block_job_should_pause(&job->common));
>          block_job_sleep_ns(&job->common, QEMU_CLOCK_REALTIME, delay_ns);
> +        job->delayed = false;
>      } else {
> -        block_job_sleep_ns(&job->common, QEMU_CLOCK_REALTIME, 0);
> +        trace_backup_sleep_zero(block_job_is_cancelled(&job->common),
> +                                block_job_should_pause(&job->common));
> +        block_job_pause_point(&job->common);
> +    }
> +
> +    trace_backup_sleep_finish();
> +}
> +
> +/* backup_busy_sleep
> + * Just yield, without setting busy=false

How does this interact with block_job_detach_aio_context() and
block_job_drain()?  Jobs must reach pause points regularly so that
cancellation and detaching AioContexts work.

> @@ -132,69 +300,246 @@ static bool coroutine_fn 
> yield_and_check(BackupBlockJob *job)
>      return false;
>  }
>  
> -static int coroutine_fn backup_do_read(BackupBlockJob *job,
> -                                       int64_t offset, unsigned int bytes,
> -                                       QEMUIOVector *qiov)
> +static void backup_job_wait_workers(BackupBlockJob *job)

Missing coroutine_fn.  This function yields and will crash unless called
from coroutine context.  Please use coroutine_fn throughout the code so
it's clear when a function is only allowed to be called from coroutine
context.

> +static void backup_worker_pause(BackupBlockJob *job)
> +{
> +    job->nb_busy_workers--;
> +
> +    trace_backup_worker_pause(qemu_coroutine_self(), job->nb_busy_workers,
> +                              job->waiting_for_workers);
> +
> +    if (job->nb_busy_workers == 0 && job->waiting_for_workers) {
> +        qemu_coroutine_add_next(job->common.co);
> +    }
> +
> +    qemu_co_queue_wait(&job->paused_workers);
> +
> +    trace_backup_worker_unpause(qemu_coroutine_self());
> +
> +    job->nb_busy_workers++;

This is similar to rwlock.  The main coroutine would use wrlock and the
workers would use rdlock.

rwlock avoids situations where the main blockjob re-enters a worker and
vice versa.  qemu_coroutine_add_next() is a lower-level primitive and
does not prevent this type of bug.

Please use rwlock.

> +static inline bool check_delay(BackupBlockJob *job)
> +{
> +    uint64_t delay_ns;
> +
> +    if (!job->common.speed) {
> +        return false;
> +    }
> +
> +    delay_ns = ratelimit_calculate_delay(&job->limit, job->sectors_read);
> +    job->sectors_read = 0;
> +
> +    if (delay_ns == 0) {
> +        if (job->delayed) {
> +            job->delayed = false;
> +            qemu_co_queue_restart_all(&job->paused_workers);
>          }
> +        return false;
> +    }
>  
> -        return ret;
> +    return job->delayed = true;

This looks like a "== vs =" bug.  Please reformat it so readers don't
have to puzzle out what you meant:

job->delayed = true;
return true;

> +static void coroutine_fn backup_worker_co(void *opaque)
> +{
> +    BackupBlockJob *job = opaque;
> +
> +    job->running_workers++;
> +    job->nb_busy_workers++;
> +
> +    while (true) {
> +        int64_t cluster = backup_get_work(job);
> +        trace_backup_worker_got_work(job, qemu_coroutine_self(), cluster);
> +
> +        switch (cluster) {
> +        case BACKUP_WORKER_STOP:
> +            job->nb_busy_workers--;
> +            job->running_workers--;
> +            if (job->nb_busy_workers == 0 && job->waiting_for_workers) {
> +                qemu_coroutine_add_next(job->common.co);

Is there a reason for using qemu_coroutine_add_next() instead of
qemu_coroutine_enter()?

I think neither function prevents a crash if backup_get_work() returns
BACKUP_WORKER_STOP and this coroutine has never yielded yet.  We would
try to re-enter the main blockjob coroutine.

>  static int coroutine_fn backup_before_write_notify(
>          NotifierWithReturn *notifier,
>          void *opaque)
>  {
> -    BackupBlockJob *job = container_of(notifier, BackupBlockJob, 
> before_write);
> -    BdrvTrackedRequest *req = opaque;
> -    int64_t sector_num = req->offset >> BDRV_SECTOR_BITS;
> -    int nb_sectors = req->bytes >> BDRV_SECTOR_BITS;
> +    BdrvTrackedRequest *tr = opaque;
> +    NotifierRequest *nr;
> +    BackupBlockJob *job = (BackupBlockJob *)tr->bs->job;
> +    int64_t start = tr->offset / job->cluster_size;
> +    int64_t end = DIV_ROUND_UP(tr->offset + tr->bytes, job->cluster_size);
> +    int ret = 0;
> +
> +    assert((tr->offset & (BDRV_SECTOR_SIZE - 1)) == 0);
> +    assert((tr->bytes & (BDRV_SECTOR_SIZE - 1)) == 0);

Are these assertions still necessary?  req->bytes >> BDRV_SECTOR_BITS
rounds down so we needed a multiple of BDRV_SECTOR_SIZE.  Now
DIV_ROUND_UP() is used so it doesn't matter.

> +
> +    nr = add_notif_req(job, start, end, qemu_coroutine_self());
>  
> -    assert(req->bs == blk_bs(job->common.blk));
> -    assert((req->offset & (BDRV_SECTOR_SIZE - 1)) == 0);
> -    assert((req->bytes & (BDRV_SECTOR_SIZE - 1)) == 0);
> +    if (nr == NULL) {
> +        trace_backup_before_write_notify_skip(job, tr->offset, tr->bytes);
> +    } else {
> +        trace_backup_before_write_notify_start(job, tr->offset, tr->bytes, 
> nr,
> +                                               nr->start, nr->end, 
> nr->nb_wait);
> +
> +        if (!job->has_errors) {
> +            qemu_co_queue_restart_all(&job->paused_workers);
> +        }
> +        co_aio_sleep_ns(blk_get_aio_context(job->common.blk),
> +                        QEMU_CLOCK_REALTIME, WRITE_NOTIFY_TIMEOUT_NS);
> +        if (nr->nb_wait > 0) {
> +            /* timer expired and read request not finished */
> +            ret = -EINVAL;

#define EINVAL          22      /* Invalid argument */

Why did you choose this errno?

EIO is the errno that the kernel uses when I/O fails (e.g. hardware timeout).

> @@ -586,50 +923,62 @@ static void coroutine_fn backup_run(void *opaque)
>      BackupCompleteData *data;
>      BlockDriverState *bs = blk_bs(job->common.blk);
>      int64_t end;
> -    int ret = 0;
> +    int i;
> +    bool is_top = job->sync_mode == MIRROR_SYNC_MODE_TOP;
> +    bool is_full = job->sync_mode == MIRROR_SYNC_MODE_FULL;
> +
> +    trace_backup_run();

This trace event isn't useful if the guest has multiple drives.  Please
include arguments that correlate the event with specific objects like
the BlockBackend/BlockDriverState/BlockJob instances, I/O request sector
and length, etc.

>  BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError 
> on_err,
>                                          int is_read, int error)
>  {

block_job_get_error_action() was copy-pasted from
block_job_error_action().  Now there are two functions with similar
names that duplicate code.

Why can't you use the existing block_job_error_action() function?

Attachment: signature.asc
Description: PGP signature


reply via email to

[Prev in Thread] Current Thread [Next in Thread]