Re: [Qemu-devel] [PATCH v1 1/1] Submit the codes for QEMU disk I/O limits.
From: Zhi Yong Wu
Subject: Re: [Qemu-devel] [PATCH v1 1/1] Submit the codes for QEMU disk I/O limits.
Date: Mon, 25 Jul 2011 12:25:56 +0800
On Fri, Jul 22, 2011 at 6:54 PM, Stefan Hajnoczi <address@hidden> wrote:
> On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <address@hidden> wrote:
>> +static void bdrv_block_timer(void *opaque)
>> +{
>> +    BlockDriverState *bs = opaque;
>> +    BlockQueue *queue = bs->block_queue;
>> +    uint64_t intval = 1;
>> +
>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>> +        BlockIORequest *request;
>> +        int ret;
>> +
>> +        request = QTAILQ_FIRST(&queue->requests);
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +
>> +        ret = queue->handler(request);
>
> Please remove the function pointer and call qemu_block_queue_handler()
> directly. This indirection is not needed and makes it harder to
> follow the code.
Should it keep the same style as other queue implementations, such as the
network queue? As you know, the network queue has one queue handler.
>
>> +        if (ret == 0) {
>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>> +            break;
>> +        }
>> +
>> +        qemu_free(request);
>> +    }
>> +
>> +    qemu_mod_timer(bs->block_timer, (intval * 1000000000) +
>> +                   qemu_get_clock_ns(vm_clock));
>
> intval is always 1. The timer should be set to the next earliest deadline.
Please see bdrv_aio_readv/writev:
+    /* throttling disk read I/O */
+    if (bs->io_limits != NULL) {
+        if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
+            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
+                sector_num, qiov, nb_sectors, cb, opaque);
+            qemu_mod_timer(bs->block_timer, wait_time +
+                           qemu_get_clock_ns(vm_clock));
The timer has already been modified when the block request is enqueued.
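For reference, Stefan's suggestion of re-arming with the next earliest deadline, rather than a fixed one-second interval, can be sketched outside QEMU like this (a hypothetical standalone helper, not the patch's code):

```c
#include <stddef.h>
#include <stdint.h>

/* Given the dispatch deadlines (in ns) of all queued requests, the
 * timer should be re-armed with the earliest one; fallback_ns is used
 * when the queue is empty. */
static int64_t earliest_deadline(const int64_t *deadlines, size_t n,
                                 int64_t fallback_ns)
{
    int64_t min = fallback_ns;
    for (size_t i = 0; i < n; i++) {
        if (deadlines[i] < min) {
            min = deadlines[i];
        }
    }
    return min;
}
```

The timer then never fires later than any queued request requires, instead of always firing one second out.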
>
>> +}
>> +
>> void bdrv_register(BlockDriver *bdrv)
>> {
>>     if (!bdrv->bdrv_aio_readv) {
>> @@ -476,6 +508,11 @@ static int bdrv_open_common(BlockDriverState *bs, const char *filename,
>>         goto free_and_fail;
>>     }
>>
>> +    /* throttling disk I/O limits */
>> +    if (bs->io_limits != NULL) {
>> +        bs->block_queue = qemu_new_block_queue(qemu_block_queue_handler);
>> +    }
>> +
>> #ifndef _WIN32
>>     if (bs->is_temporary) {
>>         unlink(filename);
>> @@ -642,6 +679,16 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
>>         bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>>     }
>>
>> +    /* throttling disk I/O limits */
>> +    if (bs->io_limits != NULL) {
>
> block_queue is allocated in bdrv_open_common() but these variables are
> initialized in bdrv_open(). Can you move them together, I don't see a
> reason to keep them apart?
OK, they will be moved together.
>
>> +        bs->io_disps = qemu_mallocz(sizeof(BlockIODisp));
>> +        bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>> +        qemu_mod_timer(bs->block_timer, qemu_get_clock_ns(vm_clock));
>
> Why is the timer being scheduled immediately? There are no queued requests.
OK, you mean that if no request is in the block queue, the timer should be stopped?
>
>> +
>> +        bs->slice_start[0] = qemu_get_clock_ns(vm_clock);
>> +        bs->slice_start[1] = qemu_get_clock_ns(vm_clock);
>> +    }
>> +
>>     return 0;
>>
>> unlink_and_fail:
>> @@ -680,6 +727,15 @@ void bdrv_close(BlockDriverState *bs)
>>         if (bs->change_cb)
>>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>>     }
>> +
>> +    /* throttling disk I/O limits */
>> +    if (bs->block_queue) {
>> +       qemu_del_block_queue(bs->block_queue);
>
> 3 space indent, should be 4 spaces.
done
>
>> +    }
>> +
>> +    if (bs->block_timer) {
>
> qemu_free_timer() will only free() the timer memory but it will not
> cancel the timer. Use qemu_del_timer() first to ensure that the timer
> is not pending:
>
> qemu_del_timer(bs->block_timer);
OK, thanks.
>
>> +        qemu_free_timer(bs->block_timer);
>> +    }
>> }
>>
>> void bdrv_close_all(void)
>> @@ -1312,6 +1368,13 @@ void bdrv_get_geometry_hint(BlockDriverState *bs,
>>     *psecs = bs->secs;
>> }
>>
>> +/* throttling disk io limits */
>> +void bdrv_set_io_limits(BlockDriverState *bs,
>> +                        BlockIOLimit *io_limits)
>> +{
>> +    bs->io_limits = io_limits;
>
> This function takes ownership of io_limits but never frees it. I
> suggest not taking ownership and just copying io_limits into
> bs->io_limits so that the caller does not need to malloc() and the
> lifecycle of bs->io_limits is completely under our control.
>
> Easiest would be to turn BlockDriverState.io_limits into:
>
> BlockIOLimit io_limits;
>
> and just copy in bdrv_set_io_limits():
>
> bs->io_limits = *io_limits;
Good point.
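The value-copy approach can be sketched with simplified stand-in types (not the actual QEMU structs, which have more fields):

```c
#include <stdint.h>

/* Simplified stand-ins for the types under discussion. */
typedef struct BlockIOLimit {
    uint64_t bps[3];
    uint64_t iops[3];
} BlockIOLimit;

typedef struct BlockDriverState {
    BlockIOLimit io_limits;   /* embedded by value, not a pointer */
} BlockDriverState;

/* With the embedded field, setting the limits is a plain struct copy:
 * the caller keeps ownership of its argument and nothing can leak. */
static void bdrv_set_io_limits(BlockDriverState *bs,
                               const BlockIOLimit *io_limits)
{
    bs->io_limits = *io_limits;
}
```

After the copy, later changes to the caller's struct do not affect bs, and bdrv_close() has nothing extra to free.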
>
> bdrv_exceed_io_limits() returns quite quickly if no limits are set, so
> I wouldn't worry about optimizing it out yet.
>
>> +}
>> +
>> /* Recognize floppy formats */
>> typedef struct FDFormat {
>>     FDriveType drive;
>> @@ -2111,6 +2174,154 @@ char *bdrv_snapshot_dump(char *buf, int buf_size, QEMUSnapshotInfo *sn)
>>     return buf;
>> }
>>
>> +bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
>> +                            bool is_write, uint64_t *wait) {
>> +    int64_t real_time;
>> +    uint64_t bps_limit = 0;
>> +    double bytes_limit, bytes_disp, bytes_res, elapsed_time;
>> +    double slice_time = 0.1, wait_time;
>> +
>> +    if (bs->io_limits->bps[2]) {
>> +        bps_limit = bs->io_limits->bps[2];
>
> Please define a constant (IO_LIMIT_TOTAL?) instead of using magic number 2.
OK.
>
>> +    } else if (bs->io_limits->bps[is_write]) {
>> +        bps_limit = bs->io_limits->bps[is_write];
>> +    } else {
>> +        if (wait) {
>> +            *wait = 0;
>> +        }
>> +
>> +        return false;
>> +    }
>> +
>> +    real_time = qemu_get_clock_ns(vm_clock);
>> +    if (bs->slice_start[is_write] + 100000000 <= real_time) {
>
> Please define a constant for the 100 ms time slice instead of using
> 100000000 directly.
OK.
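The two magic numbers flagged above could become, for example (names here are only suggestions, not necessarily what the final patch used):

```c
#include <stdint.h>

/* Array indices for the per-direction and total limits. */
enum {
    BLOCK_IO_LIMIT_READ  = 0,
    BLOCK_IO_LIMIT_WRITE = 1,
    BLOCK_IO_LIMIT_TOTAL = 2,
};

#define BLOCK_IO_SLICE_TIME 100000000LL   /* 100 ms in nanoseconds */

/* bps[is_write] indexing keeps working because READ/WRITE map to 0/1;
 * the total limit, when set, takes precedence. */
static uint64_t effective_bps_limit(const uint64_t bps[3], int is_write)
{
    if (bps[BLOCK_IO_LIMIT_TOTAL]) {
        return bps[BLOCK_IO_LIMIT_TOTAL];
    }
    return bps[is_write];
}
```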
>
>> +        bs->slice_start[is_write] = real_time;
>> +
>> +        bs->io_disps->bytes[is_write] = 0;
>> +        if (bs->io_limits->bps[2]) {
>> +            bs->io_disps->bytes[!is_write] = 0;
>> +        }
>
> All previous data should be discarded when a new time slice starts:
>     bs->io_disps->bytes[IO_LIMIT_READ] = 0;
>     bs->io_disps->bytes[IO_LIMIT_WRITE] = 0;
If only bps_rd is specified, bs->io_disps->bytes[IO_LIMIT_WRITE] will
never be used; I think it should not be cleared here. Right?
>
>> +    }
>
> Please start the new slice in common code. That way you can clear
> bytes[] and ios[] at the same time and don't need to duplicate this
> code in both bdrv_exceed_bps_limits() and bdrv_exceed_iops_limits().
>
>> +
>> +    elapsed_time = (real_time - bs->slice_start[is_write]) / 1000000000.0;
>> +    fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = %g\n",
>> +            real_time, bs->slice_start[is_write], elapsed_time);
>> +
>> +    bytes_limit = bps_limit * slice_time;
>> +    bytes_disp = bs->io_disps->bytes[is_write];
>> +    if (bs->io_limits->bps[2]) {
>> +        bytes_disp += bs->io_disps->bytes[!is_write];
>> +    }
>> +
>> +    bytes_res = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
>> +    fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, elapsed_time = %g\n",
>> +            bytes_limit, bytes_disp, bytes_res, elapsed_time);
>> +
>> +    if (bytes_disp + bytes_res <= bytes_limit) {
>> +        if (wait) {
>> +            *wait = 0;
>> +        }
>> +
>> +        fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n");
>> +        return false;
>> +    }
>> +
>> +    /* Calc approx time to dispatch */
>> +    wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit;
>> +    if (!wait_time) {
>> +        wait_time = 1;
>> +    }
>> +
>> +    wait_time = wait_time + (slice_time - elapsed_time);
>> +    if (wait) {
>> +        *wait = wait_time * 1000000000 + 1;
>> +    }
>> +
>> +    return true;
>> +}
>
> After a slice expires all bytes/iops dispatched data is forgotten,
> even if there are still requests queued. This means that requests
> issued by the guest immediately after a new 100 ms period will be
> issued but existing queued requests will still be waiting.
>
> And since the queued requests don't get their next chance until later,
> it's possible that they will be requeued because the requests that the
> guest snuck in have brought us to the limit again.
>
> In order to solve this problem, we need to extend the current slice if
> there are still requests pending. To prevent extensions from growing
> the slice forever (and keeping too much old data around), it should be
> alright to cull data from 2 slices ago. The simplest way of doing
> that is to subtract the bps/iops limits from the bytes/iops
> dispatched.
>
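The slice-extension idea described above can be sketched standalone (hypothetical simplified state, not the patch's actual code): when a slice expires while requests are still queued, extend it and subtract one slice's worth of budget instead of zeroing the counters, so queued requests are not starved by freshly issued ones.

```c
#include <stdint.h>

#define SLICE_TIME_NS 100000000LL   /* 100 ms */

typedef struct SliceState {
    int64_t  slice_start_ns;
    uint64_t bytes_dispatched;
} SliceState;

static void roll_slice(SliceState *s, int64_t now_ns,
                       uint64_t bps_limit, int queue_empty)
{
    if (s->slice_start_ns + SLICE_TIME_NS > now_ns) {
        return;                     /* current slice still active */
    }
    s->slice_start_ns = now_ns;
    if (queue_empty) {
        s->bytes_dispatched = 0;    /* no history worth keeping */
    } else {
        /* Extend: cull at most one slice's worth of old data. */
        uint64_t budget = bps_limit * SLICE_TIME_NS / 1000000000ULL;
        s->bytes_dispatched = s->bytes_dispatched > budget
                            ? s->bytes_dispatched - budget : 0;
    }
}
```

With the subtraction, requests queued near the end of a slice still count against the budget of the next one, so the guest cannot sneak new requests in ahead of them.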
>> @@ -2129,6 +2341,19 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
>>     if (bdrv_check_request(bs, sector_num, nb_sectors))
>>         return NULL;
>>
>> +    /* throttling disk read I/O */
>> +    if (bs->io_limits != NULL) {
>> +        if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
>> +             ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
>> +                 sector_num, qiov, nb_sectors, cb, opaque);
>
> 5 space indent, should be 4.
>
>> +            fprintf(stderr, "bdrv_aio_readv: wait_time = %ld, timer value = %ld\n",
>> +                    wait_time, wait_time + qemu_get_clock_ns(vm_clock));
>> +            qemu_mod_timer(bs->block_timer, wait_time +
>> +                           qemu_get_clock_ns(vm_clock));
>
> Imagine 3 requests that need to be queued: A, B, and C. Since the
> timer is unconditionally set each time a request is queued, the timer
> will be set to C's wait_time. However A or B's wait_time might be
> earlier and we will miss that deadline!
>
> We really need a priority queue here. QEMU's timers solve the same
> problem with a sorted list, which might actually be faster for short
> lists where a fancy data structure has too much overhead.
>
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                                           BlockDriverState *bs,
>> +                                           BlockRequestHandler *handler,
>> +                                           int64_t sector_num,
>> +                                           QEMUIOVector *qiov,
>> +                                           int nb_sectors,
>> +                                           BlockDriverCompletionFunc *cb,
>> +                                           void *opaque)
>> +{
>> +    BlockIORequest *request;
>> +    BlockDriverAIOCB *acb;
>> +
>> +    request = qemu_malloc(sizeof(BlockIORequest));
>> +    request->bs = bs;
>> +    request->handler = handler;
>> +    request->sector_num = sector_num;
>> +    request->qiov = qiov;
>> +    request->nb_sectors = nb_sectors;
>> +    request->cb = cb;
>> +    request->opaque = opaque;
>> +
>> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> +
>> +    acb = qemu_malloc(sizeof(*acb));
>> +    acb->bs = bs;
>> +    acb->cb = cb;
>> +    acb->opaque = opaque;
>
> AIOPool and qemu_aio_get() should be used instead of manually doing
> this. Otherwise bdrv_aio_cancel() does not work.
>
> In order to free our acb when the I/O request completes we need a cb
> function. Right now acb is leaking.
OK.
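The leak fix can be sketched standalone: give the acb a completion path that both invokes the caller's callback and releases the acb. In QEMU proper this would be qemu_aio_get()/qemu_aio_release() on an AIOPool rather than the plain malloc()/free() used in this hypothetical sketch.

```c
#include <stdlib.h>

typedef void CompletionFunc(void *opaque, int ret);

typedef struct AIOCB {
    CompletionFunc *cb;
    void *opaque;
} AIOCB;

static AIOCB *acb_get(CompletionFunc *cb, void *opaque)
{
    AIOCB *acb = malloc(sizeof(*acb));
    acb->cb = cb;
    acb->opaque = opaque;
    return acb;
}

/* Called when the queued request finally completes: run the user
 * callback, then free the acb so it no longer leaks. */
static void acb_complete(AIOCB *acb, int ret)
{
    acb->cb(acb->opaque, ret);
    free(acb);
}

/* Example user callback: record the completion code. */
static void record_cb(void *opaque, int ret)
{
    *(int *)opaque = ret;
}
```

Using a pool instead of malloc additionally lets bdrv_aio_cancel() find and cancel the pending acb.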
>
>> diff --git a/qemu-config.c b/qemu-config.c
>> index efa892c..ad5f2d9 100644
>> --- a/qemu-config.c
>> +++ b/qemu-config.c
>> @@ -82,6 +82,30 @@ static QemuOptsList qemu_drive_opts = {
>>             .name = "boot",
>>             .type = QEMU_OPT_BOOL,
>>             .help = "make this a boot drive",
>> +        },{
>> +            .name = "iops",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops for this drive",
>
> Let's explain what "iops" means:
>
> "limit total I/O operations per second"
>
>> +        },{
>> +            .name = "iops_rd",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_rd for this drive",
>
> "limit read operations per second"
>
>> +        },{
>> +            .name = "iops_wr",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_wr for this drive",
>
> "limit write operations per second"
>
>> +        },{
>> +            .name = "bps",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its bps for this drive",
>
> "limit total bytes per second"
>
>> +        },{
>> +            .name = "bps_rd",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its bps_rd for this drive",
>
> "limit read bytes per second"
>
>> +        },{
>> +            .name = "bps_wr",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_wr for this drive",
>
> "limit write bytes per second"
Done
>
> Stefan
>
--
Regards,
Zhi Yong Wu