[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH v7 07/13] curl: make use of CURLDataCache.
From: |
Fam Zheng |
Subject: |
[Qemu-devel] [PATCH v7 07/13] curl: make use of CURLDataCache. |
Date: |
Thu, 6 Jun 2013 14:25:53 +0800 |
Make subsequent changes to make use of the introduced CURLDataCache. Moved
the acb struct from CURLState to BDRVCURLState, and changed it to a list.
Signed-off-by: Fam Zheng <address@hidden>
---
block/curl.c | 170 ++++++++++++++++++++++++++++++++---------------------------
1 file changed, 92 insertions(+), 78 deletions(-)
diff --git a/block/curl.c b/block/curl.c
index a99d8b5..5405485 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -39,7 +39,6 @@
CURLPROTO_TFTP)
#define CURL_NUM_STATES 8
-#define CURL_NUM_ACB 8
#define SECTOR_SIZE 512
#define READ_AHEAD_SIZE (256 * 1024)
@@ -52,9 +51,7 @@ typedef struct CURLAIOCB {
int64_t sector_num;
int nb_sectors;
-
- size_t start;
- size_t end;
+ QLIST_ENTRY(CURLAIOCB) next;
} CURLAIOCB;
typedef struct CURLDataCache {
@@ -62,20 +59,18 @@ typedef struct CURLDataCache {
int64_t base_pos;
size_t data_len;
int64_t write_pos;
+ /* Ref count for CURLState */
+ int use_count;
QLIST_ENTRY(CURLDataCache) next;
} CURLDataCache;
typedef struct CURLState
{
struct BDRVCURLState *s;
- CURLAIOCB *acb[CURL_NUM_ACB];
CURL *curl;
- char *orig_buf;
- size_t buf_start;
- size_t buf_off;
- size_t buf_len;
char range[128];
char errmsg[CURL_ERROR_SIZE];
+ CURLDataCache *cache;
char in_use;
} CURLState;
@@ -90,6 +85,7 @@ typedef struct BDRVCURLState {
CURLM *multi;
size_t len;
CURLState states[CURL_NUM_STATES];
+ QLIST_HEAD(, CURLAIOCB) acbs;
QLIST_HEAD(, CURLSockInfo) socks;
char *url;
size_t readahead_size;
@@ -219,31 +215,35 @@ static void curl_complete_io(BDRVCURLState *bs, CURLAIOCB
*acb,
static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
{
- CURLState *s = ((CURLState*)opaque);
+ CURLState *s = opaque;
+ CURLDataCache *c = s->cache;
size_t realsize = size * nmemb;
- int i;
-
- DPRINTF("CURL: Just reading %zd bytes\n", realsize);
+ CURLAIOCB *acb;
- if (!s || !s->orig_buf)
+ if (!c || !c->data) {
goto read_end;
+ }
+ if (c->write_pos >= c->data_len) {
+ goto read_end;
+ }
+ memcpy(c->data + c->write_pos, ptr,
+ MIN(realsize, c->data_len - c->write_pos));
+ c->write_pos += realsize;
+ if (c->write_pos >= c->data_len) {
+ c->write_pos = c->data_len;
+ }
- memcpy(s->orig_buf + s->buf_off, ptr, realsize);
- s->buf_off += realsize;
-
- for(i=0; i<CURL_NUM_ACB; i++) {
- CURLAIOCB *acb = s->acb[i];
-
- if (!acb)
- continue;
-
- if ((s->buf_off >= acb->end)) {
- qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start,
- acb->end - acb->start);
- acb->common.cb(acb->common.opaque, 0);
- qemu_aio_release(acb);
- s->acb[i] = NULL;
+ acb = QLIST_FIRST(&s->s->acbs);
+ while (acb) {
+ int64_t aio_base = acb->sector_num * SECTOR_SIZE;
+ size_t aio_len = acb->nb_sectors * SECTOR_SIZE;
+ CURLAIOCB *next = QLIST_NEXT(acb, next);
+ if (aio_base >= c->base_pos &&
+ aio_base + aio_len <= c->base_pos + c->write_pos) {
+ QLIST_REMOVE(acb, next);
+ curl_complete_io(s->s, acb, c);
}
+ acb = next;
}
read_end:
@@ -273,10 +273,12 @@ static void curl_fd_handler(void *arg)
CURLMsg *msg;
msg = curl_multi_info_read(s->multi, &msgs_in_queue);
- if (!msg)
+ if (!msg) {
break;
- if (msg->msg == CURLMSG_NONE)
+ }
+ if (msg->msg == CURLMSG_NONE) {
break;
+ }
switch (msg->msg) {
case CURLMSG_DONE:
@@ -286,19 +288,17 @@ static void curl_fd_handler(void *arg)
CURLINFO_PRIVATE,
(char **)&state);
- /* ACBs for successful messages get completed in curl_read_cb
*/
+ /* ACBs for successful messages get completed in curl_read_cb,
+ * fail existing acbs for now */
if (msg->data.result != CURLE_OK) {
- int i;
- for (i = 0; i < CURL_NUM_ACB; i++) {
- CURLAIOCB *acb = state->acb[i];
-
- if (acb == NULL) {
- continue;
- }
-
+ CURLAIOCB *acb = QLIST_FIRST(&s->acbs);
+ while (acb) {
+ CURLAIOCB *next = QLIST_NEXT(acb, next);
+ DPRINTF("EIO, %s\n", state->errmsg);
acb->common.cb(acb->common.opaque, -EIO);
+ QLIST_REMOVE(acb, next);
qemu_aio_release(acb);
- state->acb[i] = NULL;
+ acb = next;
}
}
@@ -315,13 +315,10 @@ static void curl_fd_handler(void *arg)
static CURLState *curl_init_state(BDRVCURLState *s)
{
CURLState *state = NULL;
- int i, j;
+ int i;
do {
for (i=0; i<CURL_NUM_STATES; i++) {
- for (j=0; j<CURL_NUM_ACB; j++)
- if (s->states[i].acb[j])
- continue;
if (s->states[i].in_use)
continue;
@@ -378,6 +375,10 @@ static void curl_clean_state(CURLState *s)
if (s->s->multi)
curl_multi_remove_handle(s->s->multi, s->curl);
s->in_use = 0;
+ if (s->cache) {
+ s->cache->use_count--;
+ assert(s->cache->use_count >= 0);
+ }
}
static void curl_parse_filename(const char *filename, QDict *options,
@@ -481,6 +482,7 @@ static int curl_open(BlockDriverState *bs, QDict *options,
int flags)
QLIST_INIT(&s->socks);
QLIST_INIT(&s->cache);
+ QLIST_INIT(&s->acbs);
DPRINTF("CURL: Opening %s\n", file);
s->url = g_strdup(file);
@@ -549,14 +551,8 @@ out_noclean:
static int curl_aio_flush(void *opaque)
{
BDRVCURLState *s = opaque;
- int i, j;
-
- for (i=0; i < CURL_NUM_STATES; i++) {
- for(j=0; j < CURL_NUM_ACB; j++) {
- if (s->states[i].acb[j]) {
- return 1;
- }
- }
+ if (!QLIST_EMPTY(&s->acbs)) {
+ return 1;
}
return 0;
}
@@ -579,7 +575,7 @@ static void curl_readv_bh_cb(void *p)
CURLAIOCB *acb = p;
BDRVCURLState *s = acb->common.bs->opaque;
int64_t aio_base, aio_bytes;
- int64_t start, end;
+ int running;
qemu_bh_delete(acb->bh);
acb->bh = NULL;
@@ -587,7 +583,9 @@ static void curl_readv_bh_cb(void *p)
aio_base = acb->sector_num * SECTOR_SIZE;
aio_bytes = acb->nb_sectors * SECTOR_SIZE;
- start = acb->sector_num * SECTOR_SIZE;
+ if (aio_base + aio_bytes > s->len) {
+ goto err_release;
+ }
cache = curl_find_cache(s, aio_base, aio_bytes);
if (cache) {
@@ -598,29 +596,41 @@ static void curl_readv_bh_cb(void *p)
// No cache found, so let's start a new request
state = curl_init_state(s);
if (!state) {
- acb->common.cb(acb->common.opaque, -EIO);
- qemu_aio_release(acb);
- return;
+ goto err_release;
}
- acb->start = 0;
- acb->end = (acb->nb_sectors * SECTOR_SIZE);
-
- state->buf_off = 0;
- if (state->orig_buf)
- g_free(state->orig_buf);
- state->buf_start = start;
- state->buf_len = acb->end + s->readahead_size;
- end = MIN(start + state->buf_len, s->len) - 1;
- state->orig_buf = g_malloc(state->buf_len);
- state->acb[0] = acb;
-
- snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", start, end);
- DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n",
- (acb->nb_sectors * SECTOR_SIZE), start, state->range);
- curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
+ cache = g_malloc0(sizeof(CURLDataCache));
+ cache->base_pos = acb->sector_num * SECTOR_SIZE;
+ cache->data_len = aio_bytes + s->readahead_size;
+ cache->write_pos = 0;
+ cache->data = g_malloc(cache->data_len);
+ QLIST_INSERT_HEAD(&s->acbs, acb, next);
+ snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd",
cache->base_pos,
+ cache->base_pos + cache->data_len);
+ DPRINTF("Reading range: %s\n", state->range);
+ curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
+ QLIST_INSERT_HEAD(&s->cache, cache, next);
+ state->cache = cache;
+ cache->use_count++;
curl_multi_add_handle(s->multi, state->curl);
+ /* kick off curl to start the action */
+ curl_multi_socket_action(s->multi, 0, CURL_SOCKET_TIMEOUT, &running);
+ return;
+
+err_release:
+ if (cache) {
+ if (cache->data) {
+ g_free(cache->data);
+ cache->data = NULL;
+ }
+ g_free(cache);
+ cache = NULL;
+ }
+ acb->common.cb(acb->common.opaque, -EIO);
+ qemu_aio_release(acb);
+ return;
+
}
@@ -667,14 +677,18 @@ static void curl_close(BlockDriverState *bs)
curl_easy_cleanup(s->states[i].curl);
s->states[i].curl = NULL;
}
- if (s->states[i].orig_buf) {
- g_free(s->states[i].orig_buf);
- s->states[i].orig_buf = NULL;
- }
}
if (s->multi)
curl_multi_cleanup(s->multi);
+ while (!QLIST_EMPTY(&s->acbs)) {
+ CURLAIOCB *acb = QLIST_FIRST(&s->acbs);
+ acb->common.cb(acb->common.opaque, -EIO);
+ QLIST_REMOVE(acb, next);
+ qemu_aio_release(acb);
+ acb = NULL;
+ }
+
while (!QLIST_EMPTY(&s->cache)) {
CURLDataCache *cache = QLIST_FIRST(&s->cache);
if (cache->data) {
--
1.8.3
- [Qemu-devel] [PATCH v7 00/13] curl: fix curl read, Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 01/13] curl: introduce CURLSockInfo to BDRVCURLState., Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 02/13] curl: change magic number to sizeof, Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 03/13] curl: change curl_multi_do to curl_fd_handler, Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 04/13] curl: fix curl_open, Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 05/13] curl: add timer to BDRVCURLState, Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 06/13] curl: introduce CURLDataCache, Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 07/13] curl: make use of CURLDataCache.,
Fam Zheng <=
- [Qemu-devel] [PATCH v7 08/13] curl: use list to store CURLState, Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 09/13] curl: add cache quota., Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 10/13] curl: introduce ssl_no_cert runtime option., Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 11/13] block/curl.c: Refuse to open the handle for writes., Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 12/13] curl: set s->url to NULL after free., Fam Zheng, 2013/06/06
- [Qemu-devel] [PATCH v7 13/13] curl: change timeout to 30 seconds, Fam Zheng, 2013/06/06
- Re: [Qemu-devel] [PATCH v7 00/13] curl: fix curl read, Richard W.M. Jones, 2013/06/06