qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v1 1/1] Submit the codes for QEMU disk I/O limits.


From: Stefan Hajnoczi
Subject: Re: [Qemu-devel] [PATCH v1 1/1] Submit the codes for QEMU disk I/O limits.
Date: Fri, 22 Jul 2011 11:54:01 +0100

On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <address@hidden> wrote:
> +static void bdrv_block_timer(void *opaque)
> +{
> +    BlockDriverState *bs = opaque;
> +    BlockQueue *queue = bs->block_queue;
> +    uint64_t intval = 1;
> +
> +    while (!QTAILQ_EMPTY(&queue->requests)) {
> +        BlockIORequest *request;
> +        int ret;
> +
> +        request = QTAILQ_FIRST(&queue->requests);
> +        QTAILQ_REMOVE(&queue->requests, request, entry);
> +
> +        ret = queue->handler(request);

Please remove the function pointer and call qemu_block_queue_handler()
directly.  This indirection is not needed and makes it harder to
follow the code.

> +        if (ret == 0) {
> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
> +            break;
> +        }
> +
> +        qemu_free(request);
> +    }
> +
> +    qemu_mod_timer(bs->block_timer, (intval * 1000000000) + 
> qemu_get_clock_ns(vm_clock));

intval is always 1.  The timer should be set to the next earliest deadline.

> +}
> +
>  void bdrv_register(BlockDriver *bdrv)
>  {
>     if (!bdrv->bdrv_aio_readv) {
> @@ -476,6 +508,11 @@ static int bdrv_open_common(BlockDriverState *bs, const 
> char *filename,
>         goto free_and_fail;
>     }
>
> +    /* throttling disk I/O limits */
> +    if (bs->io_limits != NULL) {
> +       bs->block_queue = qemu_new_block_queue(qemu_block_queue_handler);
> +    }
> +
>  #ifndef _WIN32
>     if (bs->is_temporary) {
>         unlink(filename);
> @@ -642,6 +679,16 @@ int bdrv_open(BlockDriverState *bs, const char 
> *filename, int flags,
>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>     }
>
> +    /* throttling disk I/O limits */
> +    if (bs->io_limits != NULL) {

block_queue is allocated in bdrv_open_common() but these variables are
initialized in bdrv_open().  Can you move them together?  I don't see a
reason to keep them apart.

> +       bs->io_disps    = qemu_mallocz(sizeof(BlockIODisp));
> +       bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
> +       qemu_mod_timer(bs->block_timer, qemu_get_clock_ns(vm_clock));

Why is the timer being scheduled immediately?  There are no queued requests.

> +
> +       bs->slice_start[0] = qemu_get_clock_ns(vm_clock);
> +       bs->slice_start[1] = qemu_get_clock_ns(vm_clock);
> +    }
> +
>     return 0;
>
>  unlink_and_fail:
> @@ -680,6 +727,15 @@ void bdrv_close(BlockDriverState *bs)
>         if (bs->change_cb)
>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>     }
> +
> +    /* throttling disk I/O limits */
> +    if (bs->block_queue) {
> +       qemu_del_block_queue(bs->block_queue);

3 space indent, should be 4 spaces.

> +    }
> +
> +    if (bs->block_timer) {

qemu_free_timer() will only free() the timer memory but it will not
cancel the timer.  Use qemu_del_timer() first to ensure that the timer
is not pending:

qemu_del_timer(bs->block_timer);

> +        qemu_free_timer(bs->block_timer);
> +    }
>  }
>
>  void bdrv_close_all(void)
> @@ -1312,6 +1368,13 @@ void bdrv_get_geometry_hint(BlockDriverState *bs,
>     *psecs = bs->secs;
>  }
>
> +/* throttling disk io limits */
> +void bdrv_set_io_limits(BlockDriverState *bs,
> +                           BlockIOLimit *io_limits)
> +{
> +    bs->io_limits            = io_limits;

This function takes ownership of io_limits but never frees it.  I
suggest not taking ownership and just copying io_limits into
bs->io_limits so that the caller does not need to malloc() and the
lifecycle of bs->io_limits is completely under our control.

Easiest would be to turn BlockDriverState.io_limits into:

BlockIOLimit io_limits;

and just copy in bdrv_set_io_limits():

bs->io_limits = *io_limits;

bdrv_exceed_io_limits() returns quite quickly if no limits are set, so
I wouldn't worry about optimizing it out yet.

> +}
> +
>  /* Recognize floppy formats */
>  typedef struct FDFormat {
>     FDriveType drive;
> @@ -2111,6 +2174,154 @@ char *bdrv_snapshot_dump(char *buf, int buf_size, 
> QEMUSnapshotInfo *sn)
>     return buf;
>  }
>
> +bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
> +                           bool is_write, uint64_t *wait) {
> +    int64_t  real_time;
> +    uint64_t bps_limit   = 0;
> +    double   bytes_limit, bytes_disp, bytes_res, elapsed_time;
> +    double   slice_time = 0.1, wait_time;
> +
> +    if (bs->io_limits->bps[2]) {
> +        bps_limit = bs->io_limits->bps[2];

Please define a constant (IO_LIMIT_TOTAL?) instead of using magic number 2.

> +    } else if (bs->io_limits->bps[is_write]) {
> +        bps_limit = bs->io_limits->bps[is_write];
> +    } else {
> +        if (wait) {
> +            *wait = 0;
> +        }
> +
> +        return false;
> +    }
> +
> +    real_time = qemu_get_clock_ns(vm_clock);
> +    if (bs->slice_start[is_write] + 100000000 <= real_time) {

Please define a constant for the 100 ms time slice instead of using
100000000 directly.

> +        bs->slice_start[is_write] = real_time;
> +
> +        bs->io_disps->bytes[is_write] = 0;
> +        if (bs->io_limits->bps[2]) {
> +            bs->io_disps->bytes[!is_write] = 0;
> +        }

All previous data should be discarded when a new time slice starts:
bs->io_disps->bytes[IO_LIMIT_READ] = 0;
bs->io_disps->bytes[IO_LIMIT_WRITE] = 0;

> +    }

Please start the new slice in common code.  That way you can clear
bytes[] and ios[] at the same time and don't need to duplicate this
code in both bdrv_exceed_bps_limits() and bdrv_exceed_iops_limits().

> +
> +    elapsed_time  = (real_time - bs->slice_start[is_write]) / 1000000000.0;
> +    fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = 
> %g\n", real_time, bs->slice_start[is_write], elapsed_time);
> +
> +    bytes_limit        = bps_limit * slice_time;
> +    bytes_disp  = bs->io_disps->bytes[is_write];
> +    if (bs->io_limits->bps[2]) {
> +        bytes_disp += bs->io_disps->bytes[!is_write];
> +    }
> +
> +    bytes_res   = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
> +    fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, 
> elapsed_time = %g\n", bytes_limit, bytes_disp, bytes_res, elapsed_time);
> +
> +    if (bytes_disp + bytes_res <= bytes_limit) {
> +        if (wait) {
> +            *wait = 0;
> +        }
> +
> +       fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n");
> +        return false;
> +    }
> +
> +    /* Calc approx time to dispatch */
> +    wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit;
> +    if (!wait_time) {
> +        wait_time = 1;
> +    }
> +
> +    wait_time = wait_time + (slice_time - elapsed_time);
> +    if (wait) {
> +       *wait = wait_time * 1000000000 + 1;
> +    }
> +
> +    return true;
> +}

After a slice expires all bytes/iops dispatched data is forgotten,
even if there are still requests queued.  This means that requests
issued by the guest immediately after a new 100 ms period will be
issued but existing queued requests will still be waiting.

And since the queued requests don't get their next chance until later,
it's possible that they will be requeued because the requests that the
guest snuck in have brought us to the limit again.

In order to solve this problem, we need to extend the current slice if
there are still requests pending.  To prevent extensions from growing
the slice forever (and keeping too much old data around), it should be
alright to cull data from 2 slices ago.  The simplest way of doing
that is to subtract the bps/iops limits from the bytes/iops
dispatched.

> @@ -2129,6 +2341,19 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, 
> int64_t sector_num,
>     if (bdrv_check_request(bs, sector_num, nb_sectors))
>         return NULL;
>
> +    /* throttling disk read I/O */
> +    if (bs->io_limits != NULL) {
> +       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
> +            ret = qemu_block_queue_enqueue(bs->block_queue, bs, 
> bdrv_aio_readv,
> +                               sector_num, qiov, nb_sectors, cb, opaque);

5 space indent, should be 4.

> +           fprintf(stderr, "bdrv_aio_readv: wait_time = %ld, timer value = 
> %ld\n", wait_time, wait_time + qemu_get_clock_ns(vm_clock));
> +           qemu_mod_timer(bs->block_timer, wait_time + 
> qemu_get_clock_ns(vm_clock));

Imagine 3 requests that need to be queued: A, B, and C.  Since the
timer is unconditionally set each time a request is queued, the timer
will be set to C's wait_time.  However A or B's wait_time might be
earlier and we will miss that deadline!

We really need a priority queue here.  QEMU's timers solve the same
problem with a sorted list, which might actually be faster for short
lists where a fancy data structure has too much overhead.

> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
> +                       BlockDriverState *bs,
> +                       BlockRequestHandler *handler,
> +                       int64_t sector_num,
> +                       QEMUIOVector *qiov,
> +                       int nb_sectors,
> +                       BlockDriverCompletionFunc *cb,
> +                       void *opaque)
> +{
> +    BlockIORequest *request;
> +    BlockDriverAIOCB *acb;
> +
> +    request = qemu_malloc(sizeof(BlockIORequest));
> +    request->bs = bs;
> +    request->handler = handler;
> +    request->sector_num = sector_num;
> +    request->qiov = qiov;
> +    request->nb_sectors = nb_sectors;
> +    request->cb = cb;
> +    request->opaque = opaque;
> +
> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
> +
> +    acb = qemu_malloc(sizeof(*acb));
> +    acb->bs = bs;
> +    acb->cb = cb;
> +    acb->opaque = opaque;

AIOPool and qemu_aio_get() should be used instead of manually doing
this.  Otherwise bdrv_aio_cancel() does not work.

In order to free our acb when the I/O request completes we need a cb
function.  Right now acb is leaking.

> diff --git a/qemu-config.c b/qemu-config.c
> index efa892c..ad5f2d9 100644
> --- a/qemu-config.c
> +++ b/qemu-config.c
> @@ -82,6 +82,30 @@ static QemuOptsList qemu_drive_opts = {
>             .name = "boot",
>             .type = QEMU_OPT_BOOL,
>             .help = "make this a boot drive",
> +        },{
> +            .name = "iops",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its iops for this drive",

Let's explain what "iops" means:

"limit total I/O operations per second"

> +        },{
> +            .name = "iops_rd",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its iops_rd for this drive",

"limit read operations per second"

> +        },{
> +            .name = "iops_wr",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its iops_wr for this drive",

"limit write operations per second"

> +        },{
> +            .name = "bps",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its bps for this drive",

"limit total bytes per second"

> +        },{
> +            .name = "bps_rd",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its bps_rd for this drive",

"limit read bytes per second"

> +        },{
> +            .name = "bps_wr",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its iops_wr for this drive",

"limit write bytes per second"

Stefan



reply via email to

[Prev in Thread] Current Thread [Next in Thread]