Re: [Qemu-block] [PATCH 18/21] backup: new async architecture
From: Stefan Hajnoczi
Subject: Re: [Qemu-block] [PATCH 18/21] backup: new async architecture
Date: Wed, 1 Feb 2017 16:13:21 +0000
User-agent: Mutt/1.7.1 (2016-10-04)
On Fri, Dec 23, 2016 at 05:29:01PM +0300, Vladimir Sementsov-Ogievskiy wrote:
> @@ -120,11 +257,42 @@ static bool coroutine_fn yield_and_check(BackupBlockJob *job)
> uint64_t delay_ns = ratelimit_calculate_delay(&job->limit,
> job->sectors_read);
> job->sectors_read = 0;
> + job->delayed = true;
> + trace_backup_sleep_delay(block_job_is_cancelled(&job->common),
> + block_job_should_pause(&job->common));
> block_job_sleep_ns(&job->common, QEMU_CLOCK_REALTIME, delay_ns);
> + job->delayed = false;
> } else {
> - block_job_sleep_ns(&job->common, QEMU_CLOCK_REALTIME, 0);
> + trace_backup_sleep_zero(block_job_is_cancelled(&job->common),
> + block_job_should_pause(&job->common));
> + block_job_pause_point(&job->common);
> + }
> +
> + trace_backup_sleep_finish();
> +}
> +
> +/* backup_busy_sleep
> + * Just yield, without setting busy=false
How does this interact with block_job_detach_aio_context() and
block_job_drain()? Jobs must reach pause points regularly so that
cancellation and detaching AioContexts work.
> @@ -132,69 +300,246 @@ static bool coroutine_fn yield_and_check(BackupBlockJob *job)
> return false;
> }
>
> -static int coroutine_fn backup_do_read(BackupBlockJob *job,
> - int64_t offset, unsigned int bytes,
> - QEMUIOVector *qiov)
> +static void backup_job_wait_workers(BackupBlockJob *job)
Missing coroutine_fn. This function yields and will crash unless called
from coroutine context. Please use coroutine_fn throughout the code so
it's clear when a function is only allowed to be called from coroutine
context.
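To illustrate the annotation being asked for, here is a minimal sketch. It assumes coroutine_fn is a marker macro that expands to nothing (it is redefined locally so the example compiles on its own). The reduced BackupBlockJob struct and the fake_yield() helper are hypothetical stand-ins, not code from the patch.

```c
#include <stdbool.h>

/* Assumption: local stand-in for the marker macro. It expands to nothing
 * and only documents that the function needs coroutine context. */
#define coroutine_fn

/* Hypothetical, reduced stand-in for the job structure in the patch. */
typedef struct BackupBlockJob {
    int nb_busy_workers;
    bool waiting_for_workers;
} BackupBlockJob;

/* Hypothetical placeholder for a real yield: pretend that all workers
 * finished while this coroutine was suspended. */
static void coroutine_fn fake_yield(BackupBlockJob *job)
{
    job->nb_busy_workers = 0;
}

/* The annotation tells readers that this function may yield and must
 * therefore only be called from coroutine context. */
static void coroutine_fn backup_job_wait_workers(BackupBlockJob *job)
{
    job->waiting_for_workers = true;
    while (job->nb_busy_workers > 0) {
        fake_yield(job);
    }
    job->waiting_for_workers = false;
}
```

Callers that are themselves marked coroutine_fn can call such a function directly; everything else has to enter a coroutine first.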
> +static void backup_worker_pause(BackupBlockJob *job)
> +{
> + job->nb_busy_workers--;
> +
> + trace_backup_worker_pause(qemu_coroutine_self(), job->nb_busy_workers,
> + job->waiting_for_workers);
> +
> + if (job->nb_busy_workers == 0 && job->waiting_for_workers) {
> + qemu_coroutine_add_next(job->common.co);
> + }
> +
> + qemu_co_queue_wait(&job->paused_workers);
> +
> + trace_backup_worker_unpause(qemu_coroutine_self());
> +
> + job->nb_busy_workers++;
This is similar to rwlock. The main coroutine would use wrlock and the
workers would use rdlock.
rwlock avoids situations where the main blockjob re-enters a worker and
vice versa. qemu_coroutine_add_next() is a lower-level primitive and
does not prevent this type of bug.
Please use rwlock.
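A toy model of the invariant may help here. This is not QEMU's CoRwlock (whose qemu_co_rwlock_rdlock(), qemu_co_rwlock_wrlock() and qemu_co_rwlock_unlock() yield instead of failing). It is a single-threaded sketch with hypothetical try-lock helpers that only shows why the main coroutine (writer) cannot proceed while any worker (reader) is still busy.

```c
#include <stdbool.h>

/* Hypothetical toy lock: counters only, no waiting or yielding. */
typedef struct ToyRwlock {
    int readers;   /* workers currently holding the read side */
    bool writer;   /* true while the main coroutine holds the write side */
} ToyRwlock;

/* A worker takes the read side while it is busy. */
static bool toy_rdlock_try(ToyRwlock *l)
{
    if (l->writer) {
        return false;          /* main coroutine has exclusive access */
    }
    l->readers++;
    return true;
}

/* The main coroutine takes the write side to wait for all workers. */
static bool toy_wrlock_try(ToyRwlock *l)
{
    if (l->writer || l->readers > 0) {
        return false;          /* some worker is still busy */
    }
    l->writer = true;
    return true;
}

/* Release whichever side is currently held. */
static void toy_unlock(ToyRwlock *l)
{
    if (l->writer) {
        l->writer = false;
    } else if (l->readers > 0) {
        l->readers--;
    }
}
```

With a real rwlock the failed attempts above become waits, so the "all workers idle" condition is enforced by the lock itself rather than by hand-maintained counters and manual re-entry.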
> +static inline bool check_delay(BackupBlockJob *job)
> +{
> + uint64_t delay_ns;
> +
> + if (!job->common.speed) {
> + return false;
> + }
> +
> + delay_ns = ratelimit_calculate_delay(&job->limit, job->sectors_read);
> + job->sectors_read = 0;
> +
> + if (delay_ns == 0) {
> + if (job->delayed) {
> + job->delayed = false;
> + qemu_co_queue_restart_all(&job->paused_workers);
> }
> + return false;
> + }
>
> - return ret;
> + return job->delayed = true;
This looks like a "== vs =" bug. Please reformat it so readers don't
have to puzzle out what you meant:
    job->delayed = true;
    return true;
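For context, a reduced sketch of the function with the assignment and the return separated. ToyJob is a hypothetical stand-in, the delay is passed in instead of being computed by ratelimit_calculate_delay(), and the worker queue restart from the patch is left out.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, reduced stand-in for the job state used by check_delay(). */
typedef struct ToyJob {
    uint64_t speed;   /* 0 means no rate limit is configured */
    bool delayed;     /* true while workers should stay paused */
} ToyJob;

/* Sketch only: delay_ns is supplied by the caller here. */
static bool check_delay_sketch(ToyJob *job, uint64_t delay_ns)
{
    if (!job->speed) {
        return false;
    }

    if (delay_ns == 0) {
        /* The patch also restarts the paused workers at this point. */
        job->delayed = false;
        return false;
    }

    /* Two statements: nobody has to wonder whether "==" was intended. */
    job->delayed = true;
    return true;
}
```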
> +static void coroutine_fn backup_worker_co(void *opaque)
> +{
> + BackupBlockJob *job = opaque;
> +
> + job->running_workers++;
> + job->nb_busy_workers++;
> +
> + while (true) {
> + int64_t cluster = backup_get_work(job);
> + trace_backup_worker_got_work(job, qemu_coroutine_self(), cluster);
> +
> + switch (cluster) {
> + case BACKUP_WORKER_STOP:
> + job->nb_busy_workers--;
> + job->running_workers--;
> + if (job->nb_busy_workers == 0 && job->waiting_for_workers) {
> + qemu_coroutine_add_next(job->common.co);
Is there a reason for using qemu_coroutine_add_next() instead of
qemu_coroutine_enter()?
I think neither function prevents a crash if backup_get_work() returns
BACKUP_WORKER_STOP before this coroutine has yielded for the first time.
We would try to re-enter the main blockjob coroutine, which is still
running at that point.
> static int coroutine_fn backup_before_write_notify(
> NotifierWithReturn *notifier,
> void *opaque)
> {
> - BackupBlockJob *job = container_of(notifier, BackupBlockJob, before_write);
> - BdrvTrackedRequest *req = opaque;
> - int64_t sector_num = req->offset >> BDRV_SECTOR_BITS;
> - int nb_sectors = req->bytes >> BDRV_SECTOR_BITS;
> + BdrvTrackedRequest *tr = opaque;
> + NotifierRequest *nr;
> + BackupBlockJob *job = (BackupBlockJob *)tr->bs->job;
> + int64_t start = tr->offset / job->cluster_size;
> + int64_t end = DIV_ROUND_UP(tr->offset + tr->bytes, job->cluster_size);
> + int ret = 0;
> +
> + assert((tr->offset & (BDRV_SECTOR_SIZE - 1)) == 0);
> + assert((tr->bytes & (BDRV_SECTOR_SIZE - 1)) == 0);
Are these assertions still necessary? req->bytes >> BDRV_SECTOR_BITS
rounds down so we needed a multiple of BDRV_SECTOR_SIZE. Now
DIV_ROUND_UP() is used so it doesn't matter.
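A small sketch of the arithmetic, to show that unaligned requests are still covered. DIV_ROUND_UP is defined locally in its usual form so the example stands alone; the helper name and the output parameters are hypothetical.

```c
#include <stdint.h>

/* Usual round-up division, defined locally for a self-contained example. */
#define DIV_ROUND_UP(n, d) (((n) + (d) - 1) / (d))

/* Hypothetical helper: map a byte range to the half-open cluster range
 * [*start, *end) that contains it. */
static void request_to_clusters(int64_t offset, int64_t bytes,
                                int64_t cluster_size,
                                int64_t *start, int64_t *end)
{
    *start = offset / cluster_size;                     /* rounds down */
    *end = DIV_ROUND_UP(offset + bytes, cluster_size);  /* rounds up */
}
```

For example, with 64 KiB clusters a 1000-byte request at offset 65000 crosses a cluster boundary and maps to clusters [0, 2), even though neither the offset nor the length is a multiple of the sector size.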
> +
> + nr = add_notif_req(job, start, end, qemu_coroutine_self());
>
> - assert(req->bs == blk_bs(job->common.blk));
> - assert((req->offset & (BDRV_SECTOR_SIZE - 1)) == 0);
> - assert((req->bytes & (BDRV_SECTOR_SIZE - 1)) == 0);
> + if (nr == NULL) {
> + trace_backup_before_write_notify_skip(job, tr->offset, tr->bytes);
> + } else {
> + trace_backup_before_write_notify_start(job, tr->offset, tr->bytes, nr,
> + nr->start, nr->end, nr->nb_wait);
> +
> + if (!job->has_errors) {
> + qemu_co_queue_restart_all(&job->paused_workers);
> + }
> + co_aio_sleep_ns(blk_get_aio_context(job->common.blk),
> + QEMU_CLOCK_REALTIME, WRITE_NOTIFY_TIMEOUT_NS);
> + if (nr->nb_wait > 0) {
> + /* timer expired and read request not finished */
> + ret = -EINVAL;
#define EINVAL 22 /* Invalid argument */
Why did you choose this errno?
EIO is the errno that the kernel uses when I/O fails (e.g. hardware timeout).
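A minimal sketch of what the timeout path could return instead. The function name and its parameter are hypothetical; only the choice of errno is the point.

```c
#include <errno.h>

/* Hypothetical helper: nb_wait is the number of reads still outstanding
 * when the notifier's timer expires. */
static int notify_wait_result(int nb_wait)
{
    if (nb_wait > 0) {
        /* Timer expired and the read request has not finished: report
         * an I/O failure rather than an invalid argument. */
        return -EIO;
    }
    return 0;
}
```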
> @@ -586,50 +923,62 @@ static void coroutine_fn backup_run(void *opaque)
> BackupCompleteData *data;
> BlockDriverState *bs = blk_bs(job->common.blk);
> int64_t end;
> - int ret = 0;
> + int i;
> + bool is_top = job->sync_mode == MIRROR_SYNC_MODE_TOP;
> + bool is_full = job->sync_mode == MIRROR_SYNC_MODE_FULL;
> +
> + trace_backup_run();
This trace event isn't useful if the guest has multiple drives. Please
include arguments that correlate the event with specific objects like
the BlockBackend/BlockDriverState/BlockJob instances, I/O request sector
and length, etc.
> BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
> int is_read, int error)
> {
block_job_get_error_action() was copy-pasted from
block_job_error_action(). Now there are two functions with similar
names that duplicate code.
Why can't you use the existing block_job_error_action() function?