qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH] curl: fix curl read


From: Fam Zheng
Subject: [Qemu-devel] [PATCH] curl: fix curl read
Date: Fri, 3 May 2013 16:00:09 +0800

The CURL library API has changed, and the current curl driver no longer
works. This patch rewrites the driver's use of the API as well as the
structure of its internal state. (It is hard to split this into multiple
patches, as basically all of these changes need to work together.)

BDRVCURLState holds the pointer to the curl multi interface (man 3
libcurl-multi) and four lists of internal state:
 - CURLState holds the state of a libcurl connection (man 3 libcurl-easy).
 - CURLSockInfo holds information for the libcurl socket interface (man 3
   curl_multi_socket_action).
 - CURLDataCache holds the user data read from libcurl. The caches are kept
   in a list ordered by access: a cache is moved to the list head when it
   is used, so the tail element is freed first.
   BDRVCURLState.cache_quota is the threshold at which cache entries start
   being freed.
 - CURLAIOCB holds information about an ongoing AIO request.

Signed-off-by: Fam Zheng <address@hidden>
---
 block/curl.c | 553 ++++++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 336 insertions(+), 217 deletions(-)

diff --git a/block/curl.c b/block/curl.c
index b8935fd..e5ad36f 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -38,14 +38,9 @@
                    CURLPROTO_FTP | CURLPROTO_FTPS | \
                    CURLPROTO_TFTP)
 
-#define CURL_NUM_STATES 8
-#define CURL_NUM_ACB    8
 #define SECTOR_SIZE     512
 #define READ_AHEAD_SIZE (256 * 1024)
-
-#define FIND_RET_NONE   0
-#define FIND_RET_OK     1
-#define FIND_RET_WAIT   2
+#define CURL_CACHE_QUOTA 10
 
 struct BDRVCURLState;
 
@@ -56,171 +51,219 @@ typedef struct CURLAIOCB {
 
     int64_t sector_num;
     int nb_sectors;
-
-    size_t start;
-    size_t end;
+    QLIST_ENTRY(CURLAIOCB) next;
 } CURLAIOCB;
 
-typedef struct CURLState
-{
+typedef struct CURLDataCache {
+    char *data;
+    size_t base_pos;
+    size_t data_len;
+    size_t write_pos;
+    /* Ref count for CURLState */
+    int use_count;
+    QLIST_ENTRY(CURLDataCache) next;
+} CURLDataCache;
+
+typedef struct CURLState {
     struct BDRVCURLState *s;
-    CURLAIOCB *acb[CURL_NUM_ACB];
     CURL *curl;
-    char *orig_buf;
-    size_t buf_start;
-    size_t buf_off;
-    size_t buf_len;
-    char range[128];
+#define CURL_RANGE_SIZE 128
+    char range[CURL_RANGE_SIZE];
     char errmsg[CURL_ERROR_SIZE];
-    char in_use;
+    CURLDataCache *cache;
+    QLIST_ENTRY(CURLState) next;
 } CURLState;
 
+typedef struct CURLSockInfo {
+    curl_socket_t fd;
+    int action;
+    struct BDRVCURLState *s;
+    QLIST_ENTRY(CURLSockInfo) next;
+} CURLSockInfo;
+
 typedef struct BDRVCURLState {
     CURLM *multi;
     size_t len;
-    CURLState states[CURL_NUM_STATES];
+    QLIST_HEAD(, CURLState) curl_states;
+    QLIST_HEAD(, CURLAIOCB) acbs;
+    QLIST_HEAD(, CURLSockInfo) socks;
     char *url;
     size_t readahead_size;
+    QEMUTimer *timer;
+    /* List of data cache ordered by access, freed from tail */
+    QLIST_HEAD(, CURLDataCache) cache;
+    /* Threshold to release unused cache when cache list is longer than it */
+    int cache_quota;
+    /* Whether http server accept range in header */
+    bool accept_range;
+    /* Whether certificated ssl only */
+    bool ssl_no_cert;
 } BDRVCURLState;
 
 static void curl_clean_state(CURLState *s);
-static void curl_multi_do(void *arg);
+static void curl_fd_handler(void *arg);
 static int curl_aio_flush(void *opaque);
 
+static CURLDataCache *curl_find_cache(BDRVCURLState *bs,
+                                      size_t start, size_t len)
+{
+    CURLDataCache *c;
+    QLIST_FOREACH(c, &bs->cache, next) {
+        if (start >= c->base_pos &&
+            start + len <= c->base_pos + c->write_pos) {
+            return c;
+        }
+    }
+    return NULL;
+}
+
 static int curl_sock_cb(CURL *curl, curl_socket_t fd, int action,
                         void *s, void *sp)
 {
+    BDRVCURLState *bs = (BDRVCURLState *)s;
+    /*int running;*/
     DPRINTF("CURL (AIO): Sock action %d on fd %d\n", action, fd);
+    CURLSockInfo *sock = (CURLSockInfo *)sp;
+    if (!sp) {
+        sock = g_malloc0(sizeof(CURLSockInfo));
+        sock->fd = fd;
+        sock->s = bs;
+        QLIST_INSERT_HEAD(&bs->socks, sock, next);
+        curl_multi_assign(bs->multi, fd, sock);
+    }
     switch (action) {
         case CURL_POLL_IN:
-            qemu_aio_set_fd_handler(fd, curl_multi_do, NULL, curl_aio_flush, 
s);
+            qemu_aio_set_fd_handler(fd, curl_fd_handler, NULL,
+                                    curl_aio_flush, sock);
+            sock->action |= CURL_CSELECT_IN;
             break;
         case CURL_POLL_OUT:
-            qemu_aio_set_fd_handler(fd, NULL, curl_multi_do, curl_aio_flush, 
s);
+            qemu_aio_set_fd_handler(fd, NULL, curl_fd_handler, curl_aio_flush,
+                                    sock);
+            sock->action |= CURL_CSELECT_OUT;
             break;
         case CURL_POLL_INOUT:
-            qemu_aio_set_fd_handler(fd, curl_multi_do, curl_multi_do,
-                                    curl_aio_flush, s);
+            qemu_aio_set_fd_handler(fd, curl_fd_handler, curl_fd_handler,
+                                    curl_aio_flush, sock);
+            sock->action |= CURL_CSELECT_IN | CURL_CSELECT_OUT;
             break;
         case CURL_POLL_REMOVE:
             qemu_aio_set_fd_handler(fd, NULL, NULL, NULL, NULL);
+            sock->action = 0;
             break;
     }
 
     return 0;
 }
 
-static size_t curl_size_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
+static size_t curl_header_cb(void *ptr, size_t size, size_t nmemb, void 
*opaque)
 {
-    CURLState *s = ((CURLState*)opaque);
+    BDRVCURLState *s = (BDRVCURLState *)opaque;
     size_t realsize = size * nmemb;
-    size_t fsize;
+    const char *accept_line = "Accept-Ranges: bytes";
 
-    if(sscanf(ptr, "Content-Length: %zd", &fsize) == 1) {
-        s->s->len = fsize;
+    if (strncmp((char *)ptr, accept_line, strlen(accept_line)) == 0) {
+        s->accept_range = true;
     }
 
     return realsize;
 }
 
-static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
+static void curl_timer_cb(void *opaque)
 {
-    CURLState *s = ((CURLState*)opaque);
-    size_t realsize = size * nmemb;
-    int i;
-
-    DPRINTF("CURL: Just reading %zd bytes\n", realsize);
-
-    if (!s || !s->orig_buf)
-        goto read_end;
-
-    memcpy(s->orig_buf + s->buf_off, ptr, realsize);
-    s->buf_off += realsize;
-
-    for(i=0; i<CURL_NUM_ACB; i++) {
-        CURLAIOCB *acb = s->acb[i];
-
-        if (!acb)
-            continue;
+    int running;
+    BDRVCURLState *bs = (BDRVCURLState *)opaque;
+    DPRINTF("curl timeout!\n");
+    curl_multi_socket_action(bs->multi, CURL_SOCKET_TIMEOUT, 0, &running);
+}
 
-        if ((s->buf_off >= acb->end)) {
-            qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start,
-                                acb->end - acb->start);
-            acb->common.cb(acb->common.opaque, 0);
-            qemu_aio_release(acb);
-            s->acb[i] = NULL;
+/* Call back for curl_multi interface */
+static int curl_multi_timer_cb(CURLM *multi, long timeout_ms, void *s)
+{
+    BDRVCURLState *bs = (BDRVCURLState *)s;
+    DPRINTF("curl multi timer cb, timeout: %ld (ms)\n", timeout_ms);
+    if (timeout_ms < 0) {
+        if (bs->timer) {
+            qemu_del_timer(bs->timer);
+            qemu_free_timer(bs->timer);
+            bs->timer = NULL;
         }
+    } else if (timeout_ms == 0) {
+        curl_timer_cb(bs);
+    } else {
+        if (!bs->timer) {
+            bs->timer = qemu_new_timer_ms(host_clock, curl_timer_cb, s);
+            assert(bs->timer);
+        }
+        qemu_mod_timer(bs->timer, qemu_get_clock_ms(host_clock) + timeout_ms);
     }
 
-read_end:
-    return realsize;
+    return 0;
 }
 
-static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len,
-                         CURLAIOCB *acb)
+static void curl_complete_io(BDRVCURLState *bs, CURLAIOCB *acb,
+                             CURLDataCache *cache)
 {
-    int i;
-    size_t end = start + len;
-
-    for (i=0; i<CURL_NUM_STATES; i++) {
-        CURLState *state = &s->states[i];
-        size_t buf_end = (state->buf_start + state->buf_off);
-        size_t buf_fend = (state->buf_start + state->buf_len);
-
-        if (!state->orig_buf)
-            continue;
-        if (!state->buf_off)
-            continue;
-
-        // Does the existing buffer cover our section?
-        if ((start >= state->buf_start) &&
-            (start <= buf_end) &&
-            (end >= state->buf_start) &&
-            (end <= buf_end))
-        {
-            char *buf = state->orig_buf + (start - state->buf_start);
-
-            qemu_iovec_from_buf(acb->qiov, 0, buf, len);
-            acb->common.cb(acb->common.opaque, 0);
-
-            return FIND_RET_OK;
-        }
+    size_t aio_base = acb->sector_num * SECTOR_SIZE;
+    size_t aio_bytes = acb->nb_sectors * SECTOR_SIZE;
+    size_t off = aio_base - cache->base_pos;
+
+    qemu_iovec_from_buf(acb->qiov, 0, cache->data + off, aio_bytes);
+    acb->common.cb(acb->common.opaque, 0);
+    DPRINTF("AIO Request OK: %10zd %10zd\n", aio_base, aio_bytes);
+    qemu_aio_release(acb);
+    acb = NULL;
+    /* Move cache next in the list */
+    QLIST_REMOVE(cache, next);
+    QLIST_INSERT_HEAD(&bs->cache, cache, next);
+}
 
-        // Wait for unfinished chunks
-        if ((start >= state->buf_start) &&
-            (start <= buf_fend) &&
-            (end >= state->buf_start) &&
-            (end <= buf_fend))
-        {
-            int j;
+static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
+{
+    CURLState *s = (CURLState *)opaque;
+    CURLDataCache *c = s->cache;
+    size_t realsize = size * nmemb;
+    CURLAIOCB *acb;
 
-            acb->start = start - state->buf_start;
-            acb->end = acb->start + len;
+    if (!c || !c->data) {
+        goto read_end;
+    }
+    if (c->write_pos >= c->data_len) {
+        goto read_end;
+    }
+    memcpy(c->data + c->write_pos, ptr,
+           MIN(realsize, c->data_len - c->write_pos));
+    c->write_pos += realsize;
+    if (c->write_pos >= c->data_len) {
+        c->write_pos = c->data_len;
+    }
 
-            for (j=0; j<CURL_NUM_ACB; j++) {
-                if (!state->acb[j]) {
-                    state->acb[j] = acb;
-                    return FIND_RET_WAIT;
-                }
-            }
+    QLIST_FOREACH(acb, &s->s->acbs, next) {
+        size_t aio_base = acb->sector_num * SECTOR_SIZE;
+        size_t aio_len = acb->nb_sectors * SECTOR_SIZE;
+        if (aio_base >= c->base_pos &&
+            aio_base + aio_len <= c->base_pos + c->write_pos) {
+            QLIST_REMOVE(acb, next);
+            curl_complete_io(s->s, acb, c);
         }
     }
 
-    return FIND_RET_NONE;
+read_end:
+    return realsize;
 }
 
-static void curl_multi_do(void *arg)
+static void curl_fd_handler(void *arg)
 {
-    BDRVCURLState *s = (BDRVCURLState *)arg;
+    CURLSockInfo *sock = (CURLSockInfo *)arg;
+    BDRVCURLState *s = sock->s;
     int running;
     int r;
     int msgs_in_queue;
 
-    if (!s->multi)
-        return;
-
     do {
-        r = curl_multi_socket_all(s->multi, &running);
+        r = curl_multi_socket_action(s->multi,
+                sock->fd, sock->action,
+                &running);
     } while(r == CURLM_CALL_MULTI_PERFORM);
 
     /* Try to find done transfers, so we can free the easy
@@ -229,10 +272,12 @@ static void curl_multi_do(void *arg)
         CURLMsg *msg;
         msg = curl_multi_info_read(s->multi, &msgs_in_queue);
 
-        if (!msg)
+        if (!msg) {
             break;
-        if (msg->msg == CURLMSG_NONE)
+        }
+        if (msg->msg == CURLMSG_NONE) {
             break;
+        }
 
         switch (msg->msg) {
             case CURLMSG_DONE:
@@ -240,23 +285,24 @@ static void curl_multi_do(void *arg)
                 CURLState *state = NULL;
                 curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, 
(char**)&state);
 
-                /* ACBs for successful messages get completed in curl_read_cb 
*/
+                /* ACBs for successful messages get completed in curl_read_cb,
+                 * fail existing acbs for now */
                 if (msg->data.result != CURLE_OK) {
-                    int i;
-                    for (i = 0; i < CURL_NUM_ACB; i++) {
-                        CURLAIOCB *acb = state->acb[i];
-
-                        if (acb == NULL) {
-                            continue;
-                        }
-
+                    CURLAIOCB *acb = QLIST_FIRST(&s->acbs);
+                    while (acb) {
+                        CURLAIOCB *next = QLIST_NEXT(acb, next);
+                        DPRINTF("EIO, %s\n", state->errmsg);
                         acb->common.cb(acb->common.opaque, -EIO);
+                        QLIST_REMOVE(acb, next);
                         qemu_aio_release(acb);
-                        state->acb[i] = NULL;
+                        acb = next;
                     }
                 }
 
                 curl_clean_state(state);
+                QLIST_REMOVE(state, next);
+                g_free(state);
+                state = NULL;
                 break;
             }
             default:
@@ -268,33 +314,17 @@ static void curl_multi_do(void *arg)
 
 static CURLState *curl_init_state(BDRVCURLState *s)
 {
-    CURLState *state = NULL;
-    int i, j;
-
-    do {
-        for (i=0; i<CURL_NUM_STATES; i++) {
-            for (j=0; j<CURL_NUM_ACB; j++)
-                if (s->states[i].acb[j])
-                    continue;
-            if (s->states[i].in_use)
-                continue;
-
-            state = &s->states[i];
-            state->in_use = 1;
-            break;
-        }
-        if (!state) {
-            g_usleep(100);
-            curl_multi_do(s);
-        }
-    } while(!state);
-
-    if (state->curl)
-        goto has_curl;
+    CURLState *state;
 
+    state = g_malloc0(sizeof(CURLState));
+    state->s = s;
     state->curl = curl_easy_init();
-    if (!state->curl)
-        return NULL;
+    if (!state->curl) {
+        DPRINTF("CURL: curl_easy_init failed\n");
+        g_free(state);
+        state = NULL;
+        goto out;
+    }
     curl_easy_setopt(state->curl, CURLOPT_URL, s->url);
     curl_easy_setopt(state->curl, CURLOPT_TIMEOUT, 5);
     curl_easy_setopt(state->curl, CURLOPT_WRITEFUNCTION, (void *)curl_read_cb);
@@ -305,6 +335,8 @@ static CURLState *curl_init_state(BDRVCURLState *s)
     curl_easy_setopt(state->curl, CURLOPT_NOSIGNAL, 1);
     curl_easy_setopt(state->curl, CURLOPT_ERRORBUFFER, state->errmsg);
     curl_easy_setopt(state->curl, CURLOPT_FAILONERROR, 1);
+    curl_easy_setopt(state->curl, CURLOPT_SSL_VERIFYPEER,
+                     s->ssl_no_cert ? 0 : 1);
 
     /* Restrict supported protocols to avoid security issues in the more
      * obscure protocols.  For example, do not allow POP3/SMTP/IMAP see
@@ -320,19 +352,23 @@ static CURLState *curl_init_state(BDRVCURLState *s)
 #ifdef DEBUG_VERBOSE
     curl_easy_setopt(state->curl, CURLOPT_VERBOSE, 1);
 #endif
-
-has_curl:
-
-    state->s = s;
-
+out:
     return state;
 }
 
 static void curl_clean_state(CURLState *s)
 {
-    if (s->s->multi)
-        curl_multi_remove_handle(s->s->multi, s->curl);
-    s->in_use = 0;
+    if (s->curl) {
+        if (s->s->multi) {
+            curl_multi_remove_handle(s->s->multi, s->curl);
+        }
+        curl_easy_cleanup(s->curl);
+        s->curl = NULL;
+    }
+    if (s->cache) {
+        s->cache->use_count--;
+        assert(s->cache->use_count >= 0);
+    }
 }
 
 static void curl_parse_filename(const char *filename, QDict *options,
@@ -391,7 +427,12 @@ static QemuOptsList runtime_opts = {
             .type = QEMU_OPT_SIZE,
             .help = "Readahead size",
         },
-        { /* end of list */ }
+        {
+            .name = "ssl_no_cert",
+            .type = QEMU_OPT_BOOL,
+            .help = "SSL certificate check",
+        },
+        { /* End of list */ }
     },
 };
 
@@ -403,6 +444,7 @@ static int curl_open(BlockDriverState *bs, QDict *options, 
int flags)
     Error *local_err = NULL;
     const char *file;
     double d;
+    int running;
 
     static int inited = 0;
 
@@ -428,6 +470,7 @@ static int curl_open(BlockDriverState *bs, QDict *options, 
int flags)
         goto out_noclean;
     }
 
+    s->ssl_no_cert = qemu_opt_get_bool(opts, "ssl_no_cert", true);
     if (!inited) {
         curl_global_init(CURL_GLOBAL_ALL);
         inited = 1;
@@ -442,31 +485,49 @@ static int curl_open(BlockDriverState *bs, QDict 
*options, int flags)
     // Get file size
 
     curl_easy_setopt(state->curl, CURLOPT_NOBODY, 1);
-    curl_easy_setopt(state->curl, CURLOPT_WRITEFUNCTION, (void *)curl_size_cb);
-    if (curl_easy_perform(state->curl))
+    curl_easy_setopt(state->curl, CURLOPT_HEADERFUNCTION,
+                     curl_header_cb);
+    curl_easy_setopt(state->curl, CURLOPT_HEADERDATA, s);
+    if (curl_easy_perform(state->curl)) {
         goto out;
+    }
     curl_easy_getinfo(state->curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
-    curl_easy_setopt(state->curl, CURLOPT_WRITEFUNCTION, (void *)curl_read_cb);
     curl_easy_setopt(state->curl, CURLOPT_NOBODY, 0);
-    if (d)
+#if LIBCURL_VERSION_NUM > 0x071304
+    if (d != -1) {
+#else
+    if (d) {
+#endif
         s->len = (size_t)d;
-    else if(!s->len)
+    } else if (!s->len) {
         goto out;
+    }
+    if (!strncmp(s->url, "http://";, strlen("http://";)) && !s->accept_range) {
+        strncpy(state->errmsg, "Server not supporting range.", 
CURL_ERROR_SIZE);
+        goto out;
+    }
     DPRINTF("CURL: Size = %zd\n", s->len);
 
     curl_clean_state(state);
     curl_easy_cleanup(state->curl);
-    state->curl = NULL;
+    g_free(state);
+    state = NULL;
 
     // Now we know the file exists and its size, so let's
     // initialize the multi interface!
-
     s->multi = curl_multi_init();
-    curl_multi_setopt( s->multi, CURLMOPT_SOCKETDATA, s); 
-    curl_multi_setopt( s->multi, CURLMOPT_SOCKETFUNCTION, curl_sock_cb ); 
-    curl_multi_do(s);
+    if (!s->multi) {
+        goto out_noclean;
+    }
+    curl_multi_setopt(s->multi, CURLMOPT_SOCKETDATA, s);
+    curl_multi_setopt(s->multi, CURLMOPT_SOCKETFUNCTION, curl_sock_cb);
+    curl_multi_setopt(s->multi, CURLMOPT_TIMERDATA, s);
+    curl_multi_setopt(s->multi, CURLMOPT_TIMERFUNCTION, curl_multi_timer_cb);
+    curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
 
     qemu_opts_del(opts);
+    s->cache_quota = CURL_CACHE_QUOTA;
+
     return 0;
 
 out:
@@ -474,22 +535,17 @@ out:
     curl_easy_cleanup(state->curl);
     state->curl = NULL;
 out_noclean:
-    g_free(s->url);
     qemu_opts_del(opts);
+    g_free(s->url);
+    s->url = NULL;
     return -EINVAL;
 }
 
 static int curl_aio_flush(void *opaque)
 {
     BDRVCURLState *s = opaque;
-    int i, j;
-
-    for (i=0; i < CURL_NUM_STATES; i++) {
-        for(j=0; j < CURL_NUM_ACB; j++) {
-            if (s->states[i].acb[j]) {
-                return 1;
-            }
-        }
+    if (!QLIST_EMPTY(&s->acbs)) {
+        return 1;
     }
     return 0;
 }
@@ -504,59 +560,90 @@ static const AIOCBInfo curl_aiocb_info = {
     .cancel             = curl_aio_cancel,
 };
 
-
 static void curl_readv_bh_cb(void *p)
 {
     CURLState *state;
-
     CURLAIOCB *acb = p;
     BDRVCURLState *s = acb->common.bs->opaque;
+    CURLDataCache *cache = NULL;
+    int running;
+    int aio_base, aio_bytes;
 
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
 
-    size_t start = acb->sector_num * SECTOR_SIZE;
-    size_t end;
-
-    // In case we have the requested data already (e.g. read-ahead),
-    // we can just call the callback and be done.
-    switch (curl_find_buf(s, start, acb->nb_sectors * SECTOR_SIZE, acb)) {
-        case FIND_RET_OK:
-            qemu_aio_release(acb);
-            // fall through
-        case FIND_RET_WAIT:
-            return;
-        default:
-            break;
+    aio_base = acb->sector_num * SECTOR_SIZE;
+    aio_bytes = acb->nb_sectors * SECTOR_SIZE;
+
+    if (aio_base + aio_bytes > s->len) {
+        goto err_release;
+    }
+
+    cache = curl_find_cache(s, aio_base, aio_bytes);
+    if (cache) {
+        curl_complete_io(s, acb, cache);
+        return;
     }
 
     // No cache found, so let's start a new request
     state = curl_init_state(s);
     if (!state) {
-        acb->common.cb(acb->common.opaque, -EIO);
-        qemu_aio_release(acb);
-        return;
+        goto err_release;
     }
 
-    acb->start = 0;
-    acb->end = (acb->nb_sectors * SECTOR_SIZE);
-
-    state->buf_off = 0;
-    if (state->orig_buf)
-        g_free(state->orig_buf);
-    state->buf_start = start;
-    state->buf_len = acb->end + s->readahead_size;
-    end = MIN(start + state->buf_len, s->len) - 1;
-    state->orig_buf = g_malloc(state->buf_len);
-    state->acb[0] = acb;
+    cache = g_malloc0(sizeof(CURLDataCache));
+    cache->base_pos = acb->sector_num * SECTOR_SIZE;
+    cache->data_len = aio_bytes + s->readahead_size;
+    cache->write_pos = 0;
+    cache->data = g_malloc(cache->data_len);
+    /* Try to release some cache */
+    while (0 && s->cache_quota <= 0) {
+        CURLDataCache *p;
+        CURLDataCache *q = NULL;
+        assert(!QLIST_EMPTY(&s->cache));
+        for (p = QLIST_FIRST(&s->cache);
+             p; p = QLIST_NEXT(p, next)) {
+            if (p->use_count == 0) {
+                q = p;
+            }
+        }
+        if (!q) {
+            break;
+        }
+        QLIST_REMOVE(q, next);
+        g_free(q->data);
+        q->data = NULL;
+        g_free(q);
+        q = NULL;
+        s->cache_quota++;
+    }
 
-    snprintf(state->range, 127, "%zd-%zd", start, end);
-    DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n",
-            (acb->nb_sectors * SECTOR_SIZE), start, state->range);
+    QLIST_INSERT_HEAD(&s->acbs, acb, next);
+    snprintf(state->range, CURL_RANGE_SIZE - 1, "%zd-%zd", cache->base_pos,
+             cache->base_pos + cache->data_len);
+    DPRINTF("Reading range: %s\n", state->range);
     curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
-
+    QLIST_INSERT_HEAD(&s->curl_states, state, next);
+    QLIST_INSERT_HEAD(&s->cache, cache, next);
+    state->cache = cache;
+    cache->use_count++;
+    s->cache_quota--;
     curl_multi_add_handle(s->multi, state->curl);
-    curl_multi_do(s);
+    curl_multi_socket_action(s->multi, 0, CURL_SOCKET_TIMEOUT, &running);
+    return;
+
+err_release:
+    if (cache) {
+        if (cache->data) {
+            g_free(cache->data);
+            cache->data = NULL;
+        }
+        g_free(cache);
+        cache = NULL;
+    }
+    acb->common.cb(acb->common.opaque, -EIO);
+    qemu_aio_release(acb);
+    return;
 
 }
 
@@ -586,24 +673,56 @@ static BlockDriverAIOCB *curl_aio_readv(BlockDriverState 
*bs,
 static void curl_close(BlockDriverState *bs)
 {
     BDRVCURLState *s = bs->opaque;
-    int i;
 
     DPRINTF("CURL: Close\n");
-    for (i=0; i<CURL_NUM_STATES; i++) {
-        if (s->states[i].in_use)
-            curl_clean_state(&s->states[i]);
-        if (s->states[i].curl) {
-            curl_easy_cleanup(s->states[i].curl);
-            s->states[i].curl = NULL;
-        }
-        if (s->states[i].orig_buf) {
-            g_free(s->states[i].orig_buf);
-            s->states[i].orig_buf = NULL;
-        }
+    if (s->timer) {
+        qemu_del_timer(s->timer);
+        qemu_free_timer(s->timer);
+        s->timer = NULL;
+    }
+
+    while (!QLIST_EMPTY(&s->curl_states)) {
+        CURLState *state = QLIST_FIRST(&s->curl_states);
+        /* Remove and clean curl easy handles */
+        curl_clean_state(state);
+        QLIST_REMOVE(state, next);
+        g_free(state);
+        state = NULL;
     }
-    if (s->multi)
+
+    if (s->multi) {
         curl_multi_cleanup(s->multi);
+    }
+
+    while (!QLIST_EMPTY(&s->acbs)) {
+        CURLAIOCB *acb = QLIST_FIRST(&s->acbs);
+        acb->common.cb(acb->common.opaque, -EIO);
+        QLIST_REMOVE(acb, next);
+        g_free(acb);
+        acb = NULL;
+    }
+
+    while (!QLIST_EMPTY(&s->cache)) {
+        CURLDataCache *cache = QLIST_FIRST(&s->cache);
+        assert(cache->use_count == 0);
+        if (cache->data) {
+            g_free(cache->data);
+            cache->data = NULL;
+        }
+        QLIST_REMOVE(cache, next);
+        g_free(cache);
+        cache = NULL;
+    }
+
+    while (!QLIST_EMPTY(&s->socks)) {
+        CURLSockInfo *sock = QLIST_FIRST(&s->socks);
+        QLIST_REMOVE(sock, next);
+        g_free(sock);
+        sock = NULL;
+    }
+
     g_free(s->url);
+    s->url = NULL;
 }
 
 static int64_t curl_getlength(BlockDriverState *bs)
-- 
1.8.1.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]