[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 05/10] block: add block job transactions
From: Fam Zheng
Subject: Re: [Qemu-devel] [PATCH 05/10] block: add block job transactions
Date: Fri, 26 Jun 2015 14:41:04 +0800
User-agent: Mutt/1.5.23 (2014-03-12)
On Thu, 06/25 13:12, Stefan Hajnoczi wrote:
> Sometimes block jobs must execute as a transaction group. Finishing
> jobs wait until all other jobs are ready to complete successfully.
> Failure or cancellation of one job cancels the other jobs in the group.
>
> Signed-off-by: Stefan Hajnoczi <address@hidden>
Reviewed-by: Fam Zheng <address@hidden>
> ---
> blockjob.c | 160 ++++++++++++++++++++++++++++++++++++++++++++++
> include/block/block.h | 1 +
> include/block/block_int.h | 3 +-
> include/block/blockjob.h | 49 ++++++++++++++
> trace-events | 4 ++
> 5 files changed, 216 insertions(+), 1 deletion(-)
>
> diff --git a/blockjob.c b/blockjob.c
> index ec46fad..3c6f1d4 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -400,3 +400,163 @@ void block_job_defer_to_main_loop(BlockJob *job,
>
> qemu_bh_schedule(data->bh);
> }
> +
> +/* Transactional group of block jobs */
> +struct BlockJobTxn {
> + /* Jobs may be in different AioContexts so protect all fields */
> + QemuMutex lock;
> +
> + /* Reference count for txn object */
> + unsigned int ref;
> +
> + /* Is this txn cancelling its jobs? */
> + bool aborting;
> +
> + /* Number of jobs still running */
> + unsigned int jobs_pending;
> +
> + /* List of jobs */
> + QLIST_HEAD(, BlockJob) jobs;
> +};
> +
> +BlockJobTxn *block_job_txn_new(void)
> +{
> + BlockJobTxn *txn = g_new(BlockJobTxn, 1);
> + qemu_mutex_init(&txn->lock);
> + txn->ref = 1; /* dropped by block_job_txn_begin() */
> + txn->aborting = false;
> + txn->jobs_pending = 0;
> + QLIST_INIT(&txn->jobs);
> + return txn;
> +}
> +
> +static void block_job_txn_unref(BlockJobTxn *txn)
> +{
> + qemu_mutex_lock(&txn->lock);
> +
> + if (--txn->ref > 0) {
> + qemu_mutex_unlock(&txn->lock);
> + return;
> + }
> +
> + qemu_mutex_unlock(&txn->lock);
> + qemu_mutex_destroy(&txn->lock);
> + g_free(txn);
> +}
> +
> +/* The purpose of this is to keep txn alive until all jobs have been added */
> +void block_job_txn_begin(BlockJobTxn *txn)
> +{
> + block_job_txn_unref(txn);
> +}
> +
> +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
> +{
> + if (!txn) {
> + return;
> + }
> +
> + assert(!job->txn);
> + job->txn = txn;
> +
> + qemu_mutex_lock(&txn->lock);
> + txn->ref++;
> + txn->jobs_pending++;
> + QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
> + qemu_mutex_unlock(&txn->lock);
> +}
> +
> +/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
> + * successful completion. Runs from main loop.
> + */
> +static void block_job_txn_complete(BlockJob *job, void *opaque)
> +{
> + BlockJobTxn *txn = opaque;
> + BlockJob *other_job;
> + bool aborting = txn->aborting;
> +
> + qemu_mutex_lock(&txn->lock);
> + txn->ref++; /* keep txn alive until the end of this loop */
> +
> + QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
> + AioContext *ctx;
> +
> + qemu_mutex_unlock(&txn->lock);
> + ctx = bdrv_get_aio_context(other_job->bs);
> + aio_context_acquire(ctx);
> +
> + /* Cancel all other jobs if aborting. Don't cancel our own failed job
> + * since cancellation throws away the error value.
> + */
> + if (aborting && other_job != job) {
> + block_job_cancel(other_job);
> + } else {
> + block_job_enter(other_job);
> + }
> +
> + aio_context_release(ctx);
> + qemu_mutex_lock(&txn->lock);
> + }
> +
> + qemu_mutex_unlock(&txn->lock);
> + block_job_txn_unref(txn);
> +}
> +
> +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
> + BlockJob *job,
> + int ret)
> +{
> + if (!txn) {
> + return;
> + }
> +
> + qemu_mutex_lock(&txn->lock);
> +
> + /* This function is entered in 3 cases:
> + *
> + * 1. Successful job completion - wait for other jobs
> + * 2. First failed/cancelled job in txn - cancel other jobs and wait
> + * 3. Subsequent cancelled jobs - finish immediately, don't wait
> + */
> + trace_block_job_txn_prepare_to_complete_entry(txn, job, ret,
> + block_job_is_cancelled(job),
> + txn->aborting,
> + txn->jobs_pending);
> +
> + if (txn->aborting) { /* Case 3 */
> + assert(block_job_is_cancelled(job));
> + goto out; /* already cancelled, don't yield */
> + }
> +
> + if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
> +abort:
> + txn->aborting = true;
> + block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
> + } else { /* Case 1 */
> + if (--txn->jobs_pending == 0) {
> + block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
> + }
> + }
> +
> + /* Wait for block_job_txn_complete() */
> + do {
> + qemu_mutex_unlock(&txn->lock);
> + job->busy = false;
> + qemu_coroutine_yield();
> + job->busy = true;
> + qemu_mutex_lock(&txn->lock);
> +
> + if (block_job_is_cancelled(job) && !txn->aborting) {
> + goto abort; /* this job just got cancelled by the user */
> + }
> + } while (!txn->aborting && txn->jobs_pending > 0);
> +
> +out:
> + trace_block_job_txn_prepare_to_complete_return(txn, job, ret,
> + block_job_is_cancelled(job),
> + txn->aborting,
> + txn->jobs_pending);
> +
> + qemu_mutex_unlock(&txn->lock);
> + block_job_txn_unref(txn);
> +}
> diff --git a/include/block/block.h b/include/block/block.h
> index a4c505d..cb19c73 100644
> --- a/include/block/block.h
> +++ b/include/block/block.h
> @@ -13,6 +13,7 @@
> typedef struct BlockDriver BlockDriver;
> typedef struct BlockJob BlockJob;
> typedef struct BdrvChildRole BdrvChildRole;
> +typedef struct BlockJobTxn BlockJobTxn;
>
> typedef struct BlockDriverInfo {
> /* in bytes, 0 if irrelevant */
> diff --git a/include/block/block_int.h b/include/block/block_int.h
> index ea3e7f0..812a18a 100644
> --- a/include/block/block_int.h
> +++ b/include/block/block_int.h
> @@ -639,6 +639,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState *target,
> * @on_source_error: The action to take upon error reading from the source.
> * @on_target_error: The action to take upon error writing to the target.
> * @cb: Completion function for the job.
> + * @txn: Transaction that this job is part of (may be NULL).
> * @opaque: Opaque pointer value passed to @cb.
> *
> * Start a backup operation on @bs. Clusters in @bs are written to @target
> @@ -650,7 +651,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState *target,
> BlockdevOnError on_source_error,
> BlockdevOnError on_target_error,
> BlockCompletionFunc *cb, void *opaque,
> - Error **errp);
> + BlockJobTxn *txn, Error **errp);
>
> void blk_dev_change_media_cb(BlockBackend *blk, bool load);
> bool blk_dev_has_removable_media(BlockBackend *blk);
> diff --git a/include/block/blockjob.h b/include/block/blockjob.h
> index 57d8ef1..ce57e98 100644
> --- a/include/block/blockjob.h
> +++ b/include/block/blockjob.h
> @@ -122,6 +122,10 @@ struct BlockJob {
>
> /** The opaque value that is passed to the completion function. */
> void *opaque;
> +
> + /** Non-NULL if this job is part of a transaction */
> + BlockJobTxn *txn;
> + QLIST_ENTRY(BlockJob) txn_list;
> };
>
> /**
> @@ -348,4 +352,49 @@ void block_job_defer_to_main_loop(BlockJob *job,
> BlockJobDeferToMainLoopFn *fn,
> void *opaque);
>
> +/**
> + * block_job_txn_new:
> + *
> + * Allocate and return a new block job transaction. Jobs can be added to the
> + * transaction using block_job_txn_add_job(). block_job_txn_begin() must be
> + * called when all jobs (if any) have been added.
> + *
> + * All jobs in the transaction either complete successfully or fail/cancel as a
> + * group. Jobs wait for each other before completing. Cancelling one job
> + * cancels all jobs in the transaction.
> + */
> +BlockJobTxn *block_job_txn_new(void);
> +
> +/**
> + * block_job_txn_add_job:
> + * @txn: The transaction
> + * @job: Job to add to the transaction
> + *
> + * Add @job to the transaction. The @job must not already be in a transaction.
> + * The block job driver must call block_job_txn_prepare_to_complete() before
> + * final cleanup and completion.
> + */
> +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
> +
> +/**
> + * block_job_txn_begin:
> + * @txn: The transaction
> + *
> + * Call this to mark the end of adding jobs to the transaction. This must be
> + * called even if no jobs were added.
> + */
> +void block_job_txn_begin(BlockJobTxn *txn);
> +
> +/**
> + * block_job_txn_prepare_to_complete:
> + * @txn: The transaction
> + * @job: The block job
> + * @ret: Block job return value (0 for success, otherwise job failure)
> + *
> + * Wait for other jobs in the transaction to complete. If @ret is non-zero or
> + * @job is cancelled, all other jobs in the transaction will be cancelled.
> + */
> +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
> + BlockJob *job, int ret);
> +
> #endif
> diff --git a/trace-events b/trace-events
> index 52b7efa..b6a43a0 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p"
> virtio_blk_data_plane_stop(void *s) "dataplane %p"
> virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
>
> +# blockjob.c
> +block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
> +block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
> +
> # hw/virtio/dataplane/vring.c
> vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
>
> --
> 2.4.3
>
>
- [Qemu-devel] [PATCH 03/10] block: rename BlkTransactionState and BdrvActionOps, (continued)
- [Qemu-devel] [PATCH 05/10] block: add block job transactions, Stefan Hajnoczi, 2015/06/25
- [Qemu-devel] [PATCH 09/10] qmp-commands.hx: Update the supported 'transaction' operations, Stefan Hajnoczi, 2015/06/25
- [Qemu-devel] [PATCH 08/10] iotests: 124 - transactional failure test, Stefan Hajnoczi, 2015/06/25
- [Qemu-devel] [PATCH 10/10] tests: add BlockJobTxn unit test, Stefan Hajnoczi, 2015/06/25
- Re: [Qemu-devel] [PATCH 00/10] block: incremental backup transactions using BlockJobTxn, Stefan Hajnoczi, 2015/06/29