[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r5094 - GNUnet/src/transports
From: |
gnunet |
Subject: |
[GNUnet-SVN] r5094 - GNUnet/src/transports |
Date: |
Sat, 16 Jun 2007 18:29:30 -0600 (MDT) |
Author: grothoff
Date: 2007-06-16 18:29:29 -0600 (Sat, 16 Jun 2007)
New Revision: 5094
Modified:
GNUnet/src/transports/http.c
Log:
http improvements
Modified: GNUnet/src/transports/http.c
===================================================================
--- GNUnet/src/transports/http.c 2007-06-16 23:23:19 UTC (rev 5093)
+++ GNUnet/src/transports/http.c 2007-06-17 00:29:29 UTC (rev 5094)
@@ -25,11 +25,17 @@
*
* TODO:
* - connection timeout (shutdown inactive connections)
- * - proper connection re-establishment
- * - nothing copies TO wbuff, only from (see FIXMEs)
- * - free resources allocated for PUT!
- * - integrate MHD thread into CURL thread
- * - why does valgrind show "conditional jump depends on uninit values" for curl_multi_perform?
+ * => CURL can help do this automatically, need to do it with MHD
+ * and query CURL for timed-out connections (and then clean up)
+ * - proper connection re-establishment (i.e., if a GET times out or
+ * dies otherwise, we need to re-start the TSession if the
+ * core wants to keep using it!)
+ * - free resources allocated for PUT inside of CURL
+ * select loop (as soon as PUT is complete)
+ * - bound the number of concurrent PUTs for a given
+ * connection (to 1 + urgent?)
+ * - why does valgrind show "conditional jump depends on uninit values"
+ * for curl_multi_perform?
* - where does the 1s loopback-ping latency come from?
*/
@@ -77,6 +83,21 @@
} HostAddress;
+struct HTTPPutData {
+ struct HTTPPutData * next;
+
+ char * msg;
+
+ CURL * curl_put;
+
+ unsigned int size;
+
+ unsigned int pos;
+
+ int done;
+
+};
+
/**
* Transport Session handle.
*/
@@ -190,25 +211,17 @@
*/
char * url;
+ /**
+ * Linked list of PUT operations.
+ */
+ struct HTTPPutData * puts;
+
} client;
} cs;
} HTTPSession;
-struct HTTPPutData {
- struct HTTPPutData * next;
-
- char * msg;
-
- CURL * curl_put;
-
- unsigned int size;
-
- unsigned int pos;
-
-};
-
/* *********** globals ************* */
/**
@@ -265,10 +278,6 @@
*/
static UPnP_ServiceAPI * upnp;
-/**
- * List of active PUT requests.
- */
-static struct HTTPPutData * putHead;
/**
* Lock for access to mutable state of the module,
@@ -325,6 +334,8 @@
*/
static int httpDisconnect(TSession * tsession) {
HTTPSession * httpsession = tsession->internal;
+ struct HTTPPutData * pos;
+ struct HTTPPutData * next;
if (httpsession != NULL) {
MUTEX_LOCK(httpsession->lock);
@@ -340,6 +351,17 @@
httpsession->cs.client.get);
curl_easy_cleanup(httpsession->cs.client.get);
FREE(httpsession->cs.client.url);
+ pos = httpsession->cs.client.puts;
+ while (pos != NULL) {
+ next = pos->next;
+ curl_multi_remove_handle(curl_multi,
+ pos->curl_put);
+ curl_easy_cleanup(pos->curl_put);
+ FREE(pos->msg);
+ FREE(pos);
+ pos = next;
+ }
+
} else {
MHD_destroy_response(httpsession->cs.server.get);
}
@@ -400,7 +422,7 @@
GE_BREAK(NULL, 0);
return SYSERR;
}
- httpSession = (HTTPSession*) tsession->internal;
+ httpSession = tsession->internal;
MUTEX_LOCK(httpSession->lock);
if (httpSession->destroyed == YES) {
MUTEX_UNLOCK(httpSession->lock);
@@ -455,7 +477,7 @@
"HTTP port is 0, will only send using HTTP.\n");
return NULL; /* HTTP transport is configured SEND-only! */
}
- msg = (P2P_hello_MESSAGE *) MALLOC(sizeof(P2P_hello_MESSAGE) + sizeof(HostAddress));
+ msg = MALLOC(sizeof(P2P_hello_MESSAGE) + sizeof(HostAddress));
haddr = (HostAddress*) &msg[1];
if (! ( ( (upnp != NULL) &&
@@ -590,21 +612,13 @@
if (httpSession == NULL) {
httpSession = MALLOC(sizeof(HTTPSession));
- httpSession->destroyed = NO;
- httpSession->rpos1 = 0;
- httpSession->rpos2 = 0;
- httpSession->rsize2 = 0;
- httpSession->rbuff2 = NULL;
- httpSession->wsize = 0;
- httpSession->woff = 0;
- httpSession->wpos = 0;
- httpSession->wbuff = NULL;
+ memset(httpSession,
+ 0,
+ sizeof(HTTPSession));
httpSession->sender = *(coreAPI->myIdentity);
httpSession->lock = MUTEX_CREATE(YES);
httpSession->users = 1; /* us only, core has not seen this tsession! */
httpSession->lastUse = get_time();
- httpSession->is_client = NO;
- httpSession->cs.client.get = NULL;
tsession = MALLOC(sizeof(TSession));
tsession->ttype = HTTP_PROTOCOL_NUMBER;
tsession->internal = httpSession;
@@ -784,10 +798,28 @@
CURLMcode mret;
char * url;
EncName enc;
+ int i;
- /* FIXME: check if we have a GET pending for
- this peer, and if so, use that! */
+ /* check if we have a session pending for this peer */
+ tsession = NULL;
+ MUTEX_LOCK(httplock);
+ for (i=0;i<tsessionCount;i++) {
+ if (0 == memcmp(&hello->senderIdentity,
+ &tsessions[i]->peer,
+ sizeof(PeerIdentity))) {
+ tsession = tsessions[i];
+ break;
+ }
+ }
+ if ( (tsession != NULL) &&
+ (OK == httpAssociate(tsession)) ) {
+ *tsessionPtr = tsession;
+ MUTEX_UNLOCK(httplock);
+ return OK;
+ }
+ MUTEX_UNLOCK(httplock);
+ /* no session pending, initiate a new one! */
curl_get = curl_easy_init();
if (curl_get == NULL)
return SYSERR;
@@ -833,6 +865,9 @@
&receiveContentCallback);
httpSession = MALLOC(sizeof(HTTPSession));
+ memset(httpSession,
+ 0,
+ sizeof(HTTPSession));
httpSession->cs.client.url = url;
CURL_EASY_SETOPT(curl_get,
CURLOPT_WRITEDATA,
@@ -852,15 +887,6 @@
}
/* create SESSION */
- httpSession->destroyed = NO;
- httpSession->rpos1 = 0;
- httpSession->rpos2 = 0;
- httpSession->rsize2 = 0;
- httpSession->rbuff2 = NULL;
- httpSession->wsize = 0;
- httpSession->woff = 0;
- httpSession->wpos = 0;
- httpSession->wbuff = NULL;
httpSession->sender = hello->senderIdentity;
httpSession->lock = MUTEX_CREATE(YES);
httpSession->users = 1; /* us only, core has not seen this tsession! */
@@ -950,57 +976,152 @@
CURL * curl_put;
CURLMcode mret;
MESSAGE_HEADER * hdr;
+ char * tmp;
- /* FIXME: check if we have a GET pending for
- this peer, and if so, use that! */
+ if (httpSession->is_client) {
+ if (size >= MAX_BUFFER_SIZE)
+ return SYSERR;
+ if (size == 0) {
+ GE_BREAK(NULL, 0);
+ return SYSERR;
+ }
+ putData = MALLOC(sizeof(struct HTTPPutData));
+ memset(putData,
+ 0,
+ sizeof(struct HTTPPutData));
+ putData->msg = MALLOC(size + sizeof(MESSAGE_HEADER));
+ hdr = (MESSAGE_HEADER*) putData->msg;
+ hdr->size = htons(size + sizeof(MESSAGE_HEADER));
+ hdr->type = htons(0);
+ memcpy(&putData->msg[sizeof(MESSAGE_HEADER)],
+ msg,
+ size);
+ putData->size = size + sizeof(MESSAGE_HEADER);
+ MUTEX_LOCK(httpSession->lock);
+ curl_put = create_curl_put(httpSession,
+ putData,
+ size + sizeof(MESSAGE_HEADER));
+ if (curl_put == NULL) {
+ MUTEX_UNLOCK(httpSession->lock);
+ FREE(putData->msg);
+ FREE(putData);
+ return SYSERR;
+ }
+ putData->curl_put = curl_put;
+ putData->next = httpSession->cs.client.puts;
+ httpSession->cs.client.puts = putData;
+ MUTEX_UNLOCK(httpSession->lock);
+ MUTEX_LOCK(httplock);
+ mret = curl_multi_add_handle(curl_multi, curl_put);
+ if (mret != CURLM_OK) {
+ GE_LOG(coreAPI->ectx,
+ GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+ _("%s failed at %s:%d: `%s'\n"),
+ "curl_multi_add_handle",
+ __FILE__,
+ __LINE__,
+ curl_multi_strerror(mret));
+ putData->done = YES;
+ MUTEX_UNLOCK(httplock);
+ return SYSERR;
+ }
+ MUTEX_UNLOCK(httplock);
+ return OK;
+ } else { /* httpSession->isClient == false */
+ MUTEX_LOCK(httpSession->lock);
+ if (httpSession->wsize == 0)
+ GROW(httpSession->wbuff,
+ httpSession->wsize,
+ HTTP_BUF_SIZE);
+ if (httpSession->wpos + size > httpSession->wsize) {
+ /* need to grow or discard */
+ if (! important) {
+ MUTEX_UNLOCK(httpSession->lock);
+ return NO;
+ }
+ tmp = MALLOC(httpSession->wpos + size);
+ memcpy(tmp,
+ &httpSession->wbuff[httpSession->woff],
+ httpSession->wpos);
+ FREE(httpSession->wbuff);
+ httpSession->wbuff = tmp;
+ httpSession->wsize = httpSession->wpos + size;
+ httpSession->woff = 0;
+ httpSession->wpos = httpSession->wpos + size;
+ } else {
+ /* fits without growing */
+ if (httpSession->wpos + httpSession->woff + size > httpSession->wsize) {
+ /* need to compact first */
+ memmove(httpSession->wbuff,
+ &httpSession->wbuff[httpSession->woff],
+ httpSession->wpos);
+ httpSession->woff = 0;
+ }
+ /* append */
+ memcpy(&httpSession->wbuff[httpSession->woff + httpSession->wpos],
+ msg,
+ size);
+ httpSession->wpos += size;
+ }
+ MUTEX_UNLOCK(httpSession->lock);
+ return OK;
+ }
+}
- if (size >= MAX_BUFFER_SIZE)
- return SYSERR;
- if (size == 0) {
- GE_BREAK(NULL, 0);
- return SYSERR;
- }
- putData = MALLOC(sizeof(struct HTTPPutData));
- putData->msg = MALLOC(size + sizeof(MESSAGE_HEADER));
- hdr = (MESSAGE_HEADER*) putData->msg;
- hdr->size = htons(size + sizeof(MESSAGE_HEADER));
- hdr->type = htons(0);
- memcpy(&putData->msg[sizeof(MESSAGE_HEADER)],
- msg,
- size);
- putData->size = size + sizeof(MESSAGE_HEADER);
- MUTEX_LOCK(httpSession->lock);
- curl_put = create_curl_put(httpSession,
- putData,
- size + sizeof(MESSAGE_HEADER));
- MUTEX_UNLOCK(httpSession->lock);
- putData->curl_put = curl_put;
- if (curl_put == NULL) {
- FREE(putData->msg);
- FREE(putData);
- return SYSERR;
- }
+/**
+ * Function called to cleanup dead connections
+ * (completed PUTs, GETs that have timed out,
+ * etc.).
+ */
+static void
+cleanup_connections() {
+ int i;
+ HTTPSession * s;
+ struct HTTPPutData * prev;
+ struct HTTPPutData * pos;
+
MUTEX_LOCK(httplock);
- putData->next = putHead;
- putHead = putData;
- mret = curl_multi_add_handle(curl_multi, curl_put);
- if (mret != CURLM_OK) {
- GE_LOG(coreAPI->ectx,
- GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
- _("%s failed at %s:%d: `%s'\n"),
- "curl_multi_add_handle",
- __FILE__,
- __LINE__,
- curl_multi_strerror(mret));
- putHead = putData->next;
- curl_easy_cleanup(curl_put);
- FREE(putData->msg);
- FREE(putData);
- MUTEX_UNLOCK(httplock);
- return SYSERR;
+ for (i=0;i<tsessionCount;i++) {
+ s = tsessions[i]->internal;
+ if (s->is_client) {
+ prev = NULL;
+ pos = s->cs.client.puts;
+ while (pos != NULL) {
+ /* FIXME: check if CURL has timed out
+ the GET operation! If so, clean up!
+ (and make sure we re-establish GET
+ as needed!) */
+
+
+ if (pos->done) {
+ if (prev == NULL)
+ s->cs.client.puts = pos->next;
+ else
+ prev->next = pos->next;
+ FREE(pos->msg);
+ curl_multi_remove_handle(curl_multi,
+ pos->curl_put);
+ curl_easy_cleanup(pos->curl_put);
+ FREE(pos);
+ if (prev == NULL)
+ pos = s->cs.client.puts;
+ else
+ pos = pos->next;
+ continue;
+ }
+ prev = pos;
+ pos = pos->next;
+ }
+ } else {
+ /* FIXME: add code to close MHD connection
+ from the server side (timeout!); need
+ to
+ A) tell GET callback to return "end of transmission"
+ B) destroy response object
+ */
+ }
}
- MUTEX_UNLOCK(httplock);
- return OK;
+ MUTEX_UNLOCK(httplock);
}
static void *
@@ -1033,6 +1154,12 @@
curl_multi_strerror(mret));
break;
}
+ if (mhd_daemon != NULL)
+ MHD_get_fdset(mhd_daemon,
+ &rs,
+ &ws,
+ &es,
+ &max);
/* CURL requires a regular timeout... */
tv.tv_sec = 0;
tv.tv_usec = 1000;
@@ -1045,6 +1172,9 @@
break;
running = 0;
curl_multi_perform(curl_multi, &running);
+ if (mhd_daemon != NULL)
+ MHD_run(mhd_daemon);
+ cleanup_connections();
}
return NULL;
}
@@ -1065,7 +1195,7 @@
port = getGNUnetHTTPPort();
if ( (mhd_daemon == NULL) &&
(port != 0) ) {
- mhd_daemon = MHD_start_daemon(MHD_USE_SELECT_INTERNALLY | MHD_USE_IPv4,
+ mhd_daemon = MHD_start_daemon(MHD_USE_IPv4,
port,
&acceptPolicyCallback,
NULL,
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r5094 - GNUnet/src/transports,
gnunet <=