Re: [Qemu-devel] [PATCH v4 3/3] The support for queue timer and throttli
From: Zhi Yong Wu
Subject: Re: [Qemu-devel] [PATCH v4 3/3] The support for queue timer and throttling algorithm
Date: Fri, 5 Aug 2011 10:48:11 +0800

On Tue, Aug 2, 2011 at 4:39 AM, Ryan Harper <address@hidden> wrote:
> * Zhi Yong Wu <address@hidden> [2011-08-01 01:32]:
>> Note:
>> 1.) When the bps/iops limits are set to a small value such as 511
>> bytes/s, the VM hangs. We are considering how to handle this scenario.
>> 2.) When the "dd" command is issued in the guest with a large block size
>> such as "bs=1024K", the resulting speed is slightly higher than the
>> limits.
>>
>> If you have any good ideas for these problems, please let us know. :)
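To make the numbers in note 1 concrete, here is a minimal sketch of the byte-budget check that bdrv_exceed_bps_limits() performs, assuming a fresh slice of BLOCK_IO_SLICE_TIME (100 ms) with nothing dispatched yet. The helper names are hypothetical and are not part of the patch. In the patch a request that does not fit is queued and the slice is extended by the computed wait, so this only illustrates the initial check; it is not offered as a full explanation of the hang.

```c
#include <assert.h>
#include <stdint.h>

#define SLICE_NS    100000000LL      /* BLOCK_IO_SLICE_TIME: 100 ms in ns */
#define NS_PER_SEC  (SLICE_NS * 10)  /* the patch divides by SLICE * 10.0 */
#define SECTOR_SIZE 512              /* BDRV_SECTOR_SIZE */

/* Hypothetical helper: byte budget for a slice of the given length,
 * mirroring "bytes_limit = bps_limit * slice_time" in the patch. */
static double slice_byte_budget(uint64_t bps_limit, int64_t slice_ns)
{
    return (double)bps_limit * ((double)slice_ns / NS_PER_SEC);
}

/* Hypothetical helper: non-zero if a request of nb_sectors fits into an
 * otherwise empty slice, i.e. bytes_disp == 0 in the patch's terms. */
static int fits_in_empty_slice(uint64_t bps_limit, int64_t slice_ns,
                               int nb_sectors)
{
    double bytes_res = (double)nb_sectors * SECTOR_SIZE;

    return bytes_res <= slice_byte_budget(bps_limit, slice_ns);
}
```

With a limit of 511 bytes/s the budget of a 100 ms slice is about 51 bytes, which is less than a single 512-byte sector, so even the smallest request cannot pass the check in a fresh slice. Likewise, a 1024K request against a 1 MB/s limit is roughly ten times the budget of one slice.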
>>
>> Signed-off-by: Zhi Yong Wu <address@hidden>
>> ---
>> block.c | 302
>> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
>> block.h | 1 -
>> block_int.h | 29 ++++++
>> 3 files changed, 323 insertions(+), 9 deletions(-)
>>
>> diff --git a/block.c b/block.c
>> index 24a25d5..42763a3 100644
>> --- a/block.c
>> +++ b/block.c
>> @@ -29,6 +29,9 @@
>> #include "module.h"
>> #include "qemu-objects.h"
>>
>> +#include "qemu-timer.h"
>> +#include "block/blk-queue.h"
>> +
>> #ifdef CONFIG_BSD
>> #include <sys/types.h>
>> #include <sys/stat.h>
>> @@ -58,6 +61,13 @@ static int bdrv_read_em(BlockDriverState *bs, int64_t
>> sector_num,
>> static int bdrv_write_em(BlockDriverState *bs, int64_t sector_num,
>> const uint8_t *buf, int nb_sectors);
>>
>> +static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
>> + bool is_write, double elapsed_time, uint64_t *wait);
>> +static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
>> + double elapsed_time, uint64_t *wait);
>> +static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
>> + bool is_write, uint64_t *wait);
>> +
>> static QTAILQ_HEAD(, BlockDriverState) bdrv_states =
>> QTAILQ_HEAD_INITIALIZER(bdrv_states);
>>
>> @@ -90,6 +100,20 @@ int is_windows_drive(const char *filename)
>> }
>> #endif
>>
>> +static int bdrv_io_limits_enable(BlockIOLimit *io_limits)
>> +{
>> + if ((io_limits->bps[0] == 0)
>> + && (io_limits->bps[1] == 0)
>> + && (io_limits->bps[2] == 0)
>> + && (io_limits->iops[0] == 0)
>> + && (io_limits->iops[1] == 0)
>> + && (io_limits->iops[2] == 0)) {
>> + return 0;
>
>
> I'd add a field to BlockIOLimit structure, and just do:
>
> static int bdrv_io_limits_enabled(BlockIOLimit *io_limits)
> {
> return io_limits->enabled;
> }
>
> Update bdrv_set_io_limits() to do the original check you have, and if
> one of the fields is set, update the enabled flag.
>
> We incur that logic on *every* request, so let's make it as cheap as
> possible.
Good point; this has been adopted in the qmp/hmp patch.
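As a rough sketch of that suggestion (not the code from the qmp/hmp patch), the six-way comparison can be done once when the limits are set and cached in a flag. The "enabled" field and the helper names below are assumptions made for illustration; the struct is a stand-in for the patch's BlockIOLimit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for the patch's BlockIOLimit. The "enabled" field is the
 * addition proposed in the review; it is not in the v4 patch. */
typedef struct BlockIOLimit {
    uint64_t bps[3];   /* read, write, total */
    uint64_t iops[3];  /* read, write, total */
    bool enabled;      /* cached: true if any limit is non-zero */
} BlockIOLimit;

/* Slow path: scan all six limits. Runs only when limits change. */
static bool io_limits_any_set(const BlockIOLimit *l)
{
    for (int i = 0; i < 3; i++) {
        if (l->bps[i] != 0 || l->iops[i] != 0) {
            return true;
        }
    }
    return false;
}

/* Models bdrv_set_io_limits(): copy the limits, then refresh the cache. */
static void set_io_limits(BlockIOLimit *dst, const BlockIOLimit *src)
{
    *dst = *src;
    dst->enabled = io_limits_any_set(dst);
}

/* Fast path: called on every request, so it is a single field read. */
static bool io_limits_enabled(const BlockIOLimit *l)
{
    return l->enabled;
}
```

The per-request cost drops from six comparisons to one load, at the price of keeping the flag in sync wherever the limits are modified.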
>
>> + }
>> +
>> + return 1;
>> +}
>> +
>> /* check if the path starts with "<protocol>:" */
>> static int path_has_protocol(const char *path)
>> {
>> @@ -167,6 +191,28 @@ void path_combine(char *dest, int dest_size,
>> }
>> }
>>
>> +static void bdrv_block_timer(void *opaque)
>> +{
>> + BlockDriverState *bs = opaque;
>> + BlockQueue *queue = bs->block_queue;
>> +
>> + while (!QTAILQ_EMPTY(&queue->requests)) {
>> + BlockIORequest *request = NULL;
>> + int ret = 0;
>> +
>> + request = QTAILQ_FIRST(&queue->requests);
>> + QTAILQ_REMOVE(&queue->requests, request, entry);
>> +
>> + ret = qemu_block_queue_handler(request);
>> + if (ret == 0) {
>> + QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>> + break;
>
> btw, I did some tracing and I never saw a request get re-added here.
> Now, ideally, I think you were trying to do the following:
>
> The request is coming from the queue, if we exceed our limits, then we'd
> get back NULL from the handler and we'd requeue the request at the
Right.
> head... but that doesn't actually happen.
It can happen. For example, when block req1 arrives and exceeds the
limits, the block timer is set to fire in 10ms. When block req2 arrives
and also exceeds the limits, the block timer is updated to fire in 3ms.
Assume that no further block requests arrive. When the timer fires,
req2 will be serviced; but when the next enqueued request, req1, is taken
from the block queue, it could still exceed the limits and would need to
be enqueued again, right?
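The intended drain-and-requeue behaviour can be modelled in isolation. The following is a simplified, hypothetical sketch: the queue is a small ring buffer of request sizes and the "limit" is a plain byte budget, neither of which matches the real BlockQueue or the slice accounting in the patch. It only shows a request being pushed back to the head when it still does not fit.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified model; none of these names come from the patch. */
#define MAX_REQS 8

typedef struct Queue {
    int sizes[MAX_REQS];  /* request sizes in bytes */
    size_t head;          /* index of the oldest request */
    size_t count;         /* number of queued requests */
} Queue;

static void queue_push_tail(Queue *q, int size)
{
    q->sizes[(q->head + q->count) % MAX_REQS] = size;
    q->count++;
}

static int queue_pop_head(Queue *q)
{
    int size = q->sizes[q->head];

    q->head = (q->head + 1) % MAX_REQS;
    q->count--;
    return size;
}

static void queue_push_head(Queue *q, int size)
{
    q->head = (q->head + MAX_REQS - 1) % MAX_REQS;
    q->sizes[q->head] = size;
    q->count++;
}

/* Models the timer callback: dispatch from the head until a request no
 * longer fits the remaining budget. That request goes back to the head,
 * so ordering is preserved. Returns the number of requests dispatched. */
static int drain_on_timer(Queue *q, int *budget)
{
    int dispatched = 0;

    while (q->count > 0) {
        int size = queue_pop_head(q);

        if (size > *budget) {
            queue_push_head(q, size);
            break;
        }
        *budget -= size;
        dispatched++;
    }
    return dispatched;
}
```

If the timer fires while the budget only covers the first queued request, the second one is popped, found to be over the limit, and reinserted at the head for the next timer run.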
>
> Rather, if we exceed our limits, we invoke qemu_block_queue_enqueue()
> again, which allocates a whole new request and puts it at the tail.
> I think we want to update the throttling logic in readv/writev to return
> NULL if bs->req_from_queue == true and we exceed the limits. Then this
No, NULL indicates that the block emulation layer failed to start the
readv/writev request.
> logic will do the right thing by inserting the request back to the head.
We use a request pool. Maybe we can release the request back to this pool
when it needs to be freed.
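For illustration only, a request pool of this kind could be as simple as a free list. The types and function names below are hypothetical; the real BlockIORequest and the pool in blk-queue.c have different fields and may work differently.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical request type; stands in for BlockIORequest. */
typedef struct Request {
    struct Request *next_free;  /* link used only while on the free list */
    int payload;
} Request;

typedef struct RequestPool {
    Request *free_list;         /* released requests, ready for reuse */
} RequestPool;

/* Take a request from the pool, allocating only if the pool is empty. */
static Request *pool_get(RequestPool *pool)
{
    Request *req = pool->free_list;

    if (req) {
        pool->free_list = req->next_free;
    } else {
        req = malloc(sizeof(*req));
        if (!req) {
            return NULL;
        }
    }
    req->next_free = NULL;
    req->payload = 0;
    return req;
}

/* Return a request to the pool instead of freeing it. */
static void pool_release(RequestPool *pool, Request *req)
{
    req->next_free = pool->free_list;
    pool->free_list = req;
}

/* Free everything still held by the pool, e.g. when the queue is deleted. */
static void pool_destroy(RequestPool *pool)
{
    while (pool->free_list) {
        Request *req = pool->free_list;

        pool->free_list = req->next_free;
        free(req);
    }
}
```

With such a pool, the timer callback would call the release function where it currently calls qemu_free(), and a later enqueue could reuse the same memory.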
>
>
>> + }
>> +
>> + qemu_free(request);
>
>
> See my email to blk-queue.c on how we can eliminate free'ing the request
> here.
I will check it when I am back in the office next week.
>
>> + }
>> +}
>> +
>> void bdrv_register(BlockDriver *bdrv)
>> {
>> if (!bdrv->bdrv_aio_readv) {
>> @@ -642,6 +688,19 @@ int bdrv_open(BlockDriverState *bs, const char
>> *filename, int flags,
>> bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>> }
>>
>> + /* throttling disk I/O limits */
>> + if (bdrv_io_limits_enable(&bs->io_limits)) {
>> + bs->req_from_queue = false;
>> + bs->block_queue = qemu_new_block_queue();
>> + bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer,
>> bs);
>> +
>> + bs->slice_start[0] = qemu_get_clock_ns(vm_clock);
>> + bs->slice_start[1] = qemu_get_clock_ns(vm_clock);
>> +
>> + bs->slice_end[0] = qemu_get_clock_ns(vm_clock) +
>> BLOCK_IO_SLICE_TIME;
>> + bs->slice_end[1] = qemu_get_clock_ns(vm_clock) +
>> BLOCK_IO_SLICE_TIME;
>> + }
>> +
>> return 0;
>>
>> unlink_and_fail:
>> @@ -680,6 +739,16 @@ void bdrv_close(BlockDriverState *bs)
>> if (bs->change_cb)
>> bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>> }
>> +
>> + /* throttling disk I/O limits */
>> + if (bs->block_queue) {
>> + qemu_del_block_queue(bs->block_queue);
>> + }
>> +
>> + if (bs->block_timer) {
>> + qemu_del_timer(bs->block_timer);
>> + qemu_free_timer(bs->block_timer);
>> + }
>> }
>>
>> void bdrv_close_all(void)
>> @@ -1312,6 +1381,14 @@ void bdrv_get_geometry_hint(BlockDriverState *bs,
>> *psecs = bs->secs;
>> }
>>
>> +/* throttling disk io limits */
>> +void bdrv_set_io_limits(BlockDriverState *bs,
>> + BlockIOLimit *io_limits)
>> +{
>> + memset(&bs->io_limits, 0, sizeof(BlockIOLimit));
>> + bs->io_limits = *io_limits;
>> +}
>> +
>> /* Recognize floppy formats */
>> typedef struct FDFormat {
>> FDriveType drive;
>> @@ -2111,6 +2188,165 @@ char *bdrv_snapshot_dump(char *buf, int buf_size,
>> QEMUSnapshotInfo *sn)
>> return buf;
>> }
>>
>> +static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
>> + bool is_write, double elapsed_time, uint64_t *wait) {
>> + uint64_t bps_limit = 0;
>> + double bytes_limit, bytes_disp, bytes_res;
>> + double slice_time, wait_time;
>> +
>> + if (bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL]) {
>> + bps_limit = bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL];
>> + } else if (bs->io_limits.bps[is_write]) {
>> + bps_limit = bs->io_limits.bps[is_write];
>> + } else {
>> + if (wait) {
>> + *wait = 0;
>> + }
>> +
>> + return false;
>> + }
>> +
>> + slice_time = bs->slice_end[is_write] - bs->slice_start[is_write];
>> + slice_time /= (BLOCK_IO_SLICE_TIME * 10.0);
>> + bytes_limit = bps_limit * slice_time;
>> + bytes_disp = bs->io_disps.bytes[is_write];
>> + if (bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL]) {
>> + bytes_disp += bs->io_disps.bytes[!is_write];
>> + }
>> +
>> + bytes_res = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
>> +
>> + if (bytes_disp + bytes_res <= bytes_limit) {
>> + if (wait) {
>> + *wait = 0;
>> + }
>> +
>> + return false;
>> + }
>> +
>> + /* Calc approx time to dispatch */
>> + wait_time = (bytes_disp + bytes_res) / bps_limit - elapsed_time;
>> +
>> + if (wait) {
>> + *wait = wait_time * BLOCK_IO_SLICE_TIME * 10;
>> + }
>> +
>> + return true;
>> +}
>> +
>> +static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
>> + double elapsed_time, uint64_t *wait) {
>> + uint64_t iops_limit = 0;
>> + double ios_limit, ios_disp;
>> + double slice_time, wait_time;
>> +
>> + if (bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL]) {
>> + iops_limit = bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL];
>> + } else if (bs->io_limits.iops[is_write]) {
>> + iops_limit = bs->io_limits.iops[is_write];
>> + } else {
>> + if (wait) {
>> + *wait = 0;
>> + }
>> +
>> + return false;
>> + }
>> +
>> + slice_time = bs->slice_end[is_write] - bs->slice_start[is_write];
>> + slice_time /= (BLOCK_IO_SLICE_TIME * 10.0);
>> + ios_limit = iops_limit * slice_time;
>> + ios_disp = bs->io_disps.ios[is_write];
>> + if (bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL]) {
>> + ios_disp += bs->io_disps.ios[!is_write];
>> + }
>> +
>> + if (ios_disp + 1 <= ios_limit) {
>> + if (wait) {
>> + *wait = 0;
>> + }
>> +
>> + return false;
>> + }
>> +
>> + /* Calc approx time to dispatch */
>> + wait_time = (ios_disp + 1) / iops_limit;
>> + if (wait_time > elapsed_time) {
>> + wait_time = wait_time - elapsed_time;
>> + } else {
>> + wait_time = 0;
>> + }
>> +
>> + if (wait) {
>> + *wait = wait_time * BLOCK_IO_SLICE_TIME * 10;
>> + }
>> +
>> + return true;
>> +}
>> +
>> +static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
>> + bool is_write, uint64_t *wait) {
>> + int64_t real_time, real_slice;
>> + uint64_t bps_wait = 0, iops_wait = 0, max_wait;
>> + double elapsed_time;
>> + int bps_ret, iops_ret;
>> +
>> + real_time = qemu_get_clock_ns(vm_clock);
>> + real_slice = bs->slice_end[is_write] - bs->slice_start[is_write];
>> + if ((bs->slice_start[is_write] < real_time)
>> + && (bs->slice_end[is_write] > real_time)) {
>> + bs->slice_end[is_write] = real_time + BLOCK_IO_SLICE_TIME;
>> + } else {
>> + bs->slice_start[is_write] = real_time;
>> + bs->slice_end[is_write] = real_time + BLOCK_IO_SLICE_TIME;
>> +
>> + bs->io_disps.bytes[is_write] = 0;
>> + bs->io_disps.bytes[!is_write] = 0;
>> +
>> + bs->io_disps.ios[is_write] = 0;
>> + bs->io_disps.ios[!is_write] = 0;
>> + }
>> +
>> + /* If a limit was exceeded, immediately queue this request */
>> + if ((bs->req_from_queue == false)
>> + && !QTAILQ_EMPTY(&bs->block_queue->requests)) {
>> + if (bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL]
>> + || bs->io_limits.bps[is_write] || bs->io_limits.iops[is_write]
>> + || bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL]) {
>> + if (wait) {
>> + *wait = 0;
>> + }
>> +
>> + return true;
>> + }
>> + }
>> +
>> + elapsed_time = real_time - bs->slice_start[is_write];
>> + elapsed_time /= (BLOCK_IO_SLICE_TIME * 10.0);
>> +
>> + bps_ret = bdrv_exceed_bps_limits(bs, nb_sectors,
>> + is_write, elapsed_time, &bps_wait);
>> + iops_ret = bdrv_exceed_iops_limits(bs, is_write,
>> + elapsed_time, &iops_wait);
>> + if (bps_ret || iops_ret) {
>> + max_wait = bps_wait > iops_wait ? bps_wait : iops_wait;
>> + if (wait) {
>> + *wait = max_wait;
>> + }
>> +
>> + real_time = qemu_get_clock_ns(vm_clock);
>> + if (bs->slice_end[is_write] < real_time + max_wait) {
>> + bs->slice_end[is_write] = real_time + max_wait;
>> + }
>> +
>> + return true;
>> + }
>> +
>> + if (wait) {
>> + *wait = 0;
>> + }
>> +
>> + return false;
>> +}
>>
>> /**************************************************************/
>> /* async I/Os */
>> @@ -2121,13 +2357,28 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState
>> *bs, int64_t sector_num,
>> {
>> BlockDriver *drv = bs->drv;
>> BlockDriverAIOCB *ret;
>> + uint64_t wait_time = 0;
>>
>> trace_bdrv_aio_readv(bs, sector_num, nb_sectors, opaque);
>>
>> - if (!drv)
>> - return NULL;
>> - if (bdrv_check_request(bs, sector_num, nb_sectors))
>> + if (!drv || bdrv_check_request(bs, sector_num, nb_sectors)) {
>> + if (bdrv_io_limits_enable(&bs->io_limits)) {
>> + bs->req_from_queue = false;
>> + }
>> return NULL;
>> + }
>> +
>> + /* throttling disk read I/O */
>> + if (bdrv_io_limits_enable(&bs->io_limits)) {
>> + if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
>
> We need something like:
> if (bs->req_from_queue) {
> return NULL;
> }
>
> This allows the re-queue logic in bdrv_block_timer() to work correctly
> rather than creating an entirely new request at the tail.
Please see the comments above. To the upper layer, NULL indicates that the
request failed to start.
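One possible way to reconcile the two positions, sketched here purely as an assumption and not as something either patch does, is to keep NULL reserved for failure at the public entry point and let an internal helper report "throttled" separately. All names below are hypothetical; Handle stands in for BlockDriverAIOCB.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct Handle { int id; } Handle;  /* stand-in for BlockDriverAIOCB */

typedef enum SubmitStatus {
    SUBMIT_STARTED,    /* request was handed to the driver */
    SUBMIT_THROTTLED,  /* over the limit: the caller should (re)queue it */
    SUBMIT_FAILED      /* request could not be started at all */
} SubmitStatus;

/* Internal helper: reports throttling separately from failure, so a
 * timer callback could requeue at the head without overloading NULL. */
static SubmitStatus try_submit(bool valid, bool over_limit,
                               Handle *started, Handle **out)
{
    *out = NULL;
    if (!valid) {
        return SUBMIT_FAILED;
    }
    if (over_limit) {
        return SUBMIT_THROTTLED;
    }
    *out = started;
    return SUBMIT_STARTED;
}

/* Public entry point keeps the existing contract: NULL only on failure.
 * "queued" models the handle returned for an enqueued request. */
static Handle *submit(bool valid, bool over_limit,
                      Handle *started, Handle *queued)
{
    Handle *out;

    switch (try_submit(valid, over_limit, started, &out)) {
    case SUBMIT_STARTED:
        return out;
    case SUBMIT_THROTTLED:
        return queued;
    default:
        return NULL;
    }
}
```

Callers in the upper layer would still see NULL only when the request could not be started, while the queue code could act on the throttled status directly.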
>
>> + ret = qemu_block_queue_enqueue(bs->block_queue, bs,
>> bdrv_aio_readv,
>> + sector_num, qiov, nb_sectors, cb, opaque);
>> + qemu_mod_timer(bs->block_timer,
>> + wait_time + qemu_get_clock_ns(vm_clock));
>> + bs->req_from_queue = false;
>> + return ret;
>> + }
>> + }
>>
>> ret = drv->bdrv_aio_readv(bs, sector_num, qiov, nb_sectors,
>> cb, opaque);
>> @@ -2136,6 +2387,16 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState
>> *bs, int64_t sector_num,
>> /* Update stats even though technically transfer has not happened. */
>> bs->rd_bytes += (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
>> bs->rd_ops ++;
>> +
>> + if (bdrv_io_limits_enable(&bs->io_limits)) {
>> + bs->io_disps.bytes[BLOCK_IO_LIMIT_READ] +=
>> + (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
>> + bs->io_disps.ios[BLOCK_IO_LIMIT_READ]++;
>> + }
>> + }
>> +
>> + if (bdrv_io_limits_enable(&bs->io_limits)) {
>> + bs->req_from_queue = false;
>> }
>>
>> return ret;
>> @@ -2184,15 +2445,18 @@ BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState
>> *bs, int64_t sector_num,
>> BlockDriver *drv = bs->drv;
>> BlockDriverAIOCB *ret;
>> BlockCompleteData *blk_cb_data;
>> + uint64_t wait_time = 0;
>>
>> trace_bdrv_aio_writev(bs, sector_num, nb_sectors, opaque);
>>
>> - if (!drv)
>> - return NULL;
>> - if (bs->read_only)
>> - return NULL;
>> - if (bdrv_check_request(bs, sector_num, nb_sectors))
>> + if (!drv || bs->read_only
>> + || bdrv_check_request(bs, sector_num, nb_sectors)) {
>> + if (bdrv_io_limits_enable(&bs->io_limits)) {
>> + bs->req_from_queue = false;
>> + }
>> +
>> return NULL;
>> + }
>>
>> if (bs->dirty_bitmap) {
>> blk_cb_data = blk_dirty_cb_alloc(bs, sector_num, nb_sectors, cb,
>> @@ -2201,6 +2465,18 @@ BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState
>> *bs, int64_t sector_num,
>> opaque = blk_cb_data;
>> }
>>
>> + /* throttling disk write I/O */
>> + if (bdrv_io_limits_enable(&bs->io_limits)) {
>> + if (bdrv_exceed_io_limits(bs, nb_sectors, true, &wait_time)) {
>
> same change as readv needed here.
Please see the comments above.
>
>> + ret = qemu_block_queue_enqueue(bs->block_queue, bs,
>> bdrv_aio_writev,
>> + sector_num, qiov, nb_sectors, cb, opaque);
>> + qemu_mod_timer(bs->block_timer,
>> + wait_time + qemu_get_clock_ns(vm_clock));
>> + bs->req_from_queue = false;
>> + return ret;
>> + }
>> + }
>> +
>> ret = drv->bdrv_aio_writev(bs, sector_num, qiov, nb_sectors,
>> cb, opaque);
>>
>> @@ -2211,6 +2487,16 @@ BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState
>> *bs, int64_t sector_num,
>> if (bs->wr_highest_sector < sector_num + nb_sectors - 1) {
>> bs->wr_highest_sector = sector_num + nb_sectors - 1;
>> }
>> +
>> + if (bdrv_io_limits_enable(&bs->io_limits)) {
>> + bs->io_disps.bytes[BLOCK_IO_LIMIT_WRITE] +=
>> + (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
>> + bs->io_disps.ios[BLOCK_IO_LIMIT_WRITE]++;
>> + }
>> + }
>> +
>> + if (bdrv_io_limits_enable(&bs->io_limits)) {
>> + bs->req_from_queue = false;
>> }
>>
>> return ret;
>> diff --git a/block.h b/block.h
>> index 859d1d9..f0dac62 100644
>> --- a/block.h
>> +++ b/block.h
>> @@ -97,7 +97,6 @@ int bdrv_change_backing_file(BlockDriverState *bs,
>> const char *backing_file, const char *backing_fmt);
>> void bdrv_register(BlockDriver *bdrv);
>>
>> -
>> typedef struct BdrvCheckResult {
>> int corruptions;
>> int leaks;
>> diff --git a/block_int.h b/block_int.h
>> index 1e265d2..1ca826b 100644
>> --- a/block_int.h
>> +++ b/block_int.h
>> @@ -27,10 +27,17 @@
>> #include "block.h"
>> #include "qemu-option.h"
>> #include "qemu-queue.h"
>> +#include "block/blk-queue.h"
>>
>> #define BLOCK_FLAG_ENCRYPT 1
>> #define BLOCK_FLAG_COMPAT6 4
>>
>> +#define BLOCK_IO_LIMIT_READ 0
>> +#define BLOCK_IO_LIMIT_WRITE 1
>> +#define BLOCK_IO_LIMIT_TOTAL 2
>> +
>> +#define BLOCK_IO_SLICE_TIME 100000000
>> +
>> #define BLOCK_OPT_SIZE "size"
>> #define BLOCK_OPT_ENCRYPT "encryption"
>> #define BLOCK_OPT_COMPAT6 "compat6"
>> @@ -46,6 +53,16 @@ typedef struct AIOPool {
>> BlockDriverAIOCB *free_aiocb;
>> } AIOPool;
>>
>> +typedef struct BlockIOLimit {
>> + uint64_t bps[3];
>> + uint64_t iops[3];
>> +} BlockIOLimit;
>> +
>> +typedef struct BlockIODisp {
>> + uint64_t bytes[2];
>> + uint64_t ios[2];
>> +} BlockIODisp;
>> +
>> struct BlockDriver {
>> const char *format_name;
>> int instance_size;
>> @@ -175,6 +192,15 @@ struct BlockDriverState {
>>
>> void *sync_aiocb;
>>
>> + /* the time for latest disk I/O */
>> + int64_t slice_start[2];
>> + int64_t slice_end[2];
>> + BlockIOLimit io_limits;
>> + BlockIODisp io_disps;
>> + BlockQueue *block_queue;
>> + QEMUTimer *block_timer;
>> + bool req_from_queue;
>> +
>> /* I/O stats (display with "info blockstats"). */
>> uint64_t rd_bytes;
>> uint64_t wr_bytes;
>> @@ -222,6 +248,9 @@ void qemu_aio_release(void *p);
>>
>> void *qemu_blockalign(BlockDriverState *bs, size_t size);
>>
>> +void bdrv_set_io_limits(BlockDriverState *bs,
>> + BlockIOLimit *io_limits);
>> +
>> #ifdef _WIN32
>> int is_windows_drive(const char *filename);
>> #endif
>> --
>> 1.7.2.3
>>
>
> --
> Ryan Harper
> Software Engineer; Linux Technology Center
> IBM Corp., Austin, Tx
> address@hidden
>
--
Regards,
Zhi Yong Wu