gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r5517 - GNUnet/src/transports


From: gnunet
Subject: [GNUnet-SVN] r5517 - GNUnet/src/transports
Date: Sun, 19 Aug 2007 02:50:53 -0600 (MDT)

Author: grothoff
Date: 2007-08-19 02:50:43 -0600 (Sun, 19 Aug 2007)
New Revision: 5517

Modified:
   GNUnet/src/transports/http.c
Log:
getting http to pass gnunet-transport-check

Modified: GNUnet/src/transports/http.c
===================================================================
--- GNUnet/src/transports/http.c        2007-08-19 08:48:40 UTC (rev 5516)
+++ GNUnet/src/transports/http.c        2007-08-19 08:50:43 UTC (rev 5517)
@@ -24,19 +24,7 @@
  * @author Christian Grothoff
  *
  * TODO:
- * - connection timeout (shutdown inactive connections)
- *   => CURL and MHD can help do this, we mostly need
- *      to make sure we clean up properly...
- * - proper connection re-establishment (i.e., if a GET times out or
- *      dies otherwise, we need to re-start the TSession if the
- *      core wants to keep using it!)
- * - free resources allocated for PUT inside of CURL
- *      select loop (as soon as PUT is complete)
- * - bound the number of concurrent PUTs for a given
- *      connection (to 1 + urgent?)
- * - why does valgrind show "conditional jump depends on uninit values"
- *   for curl_multi_perform?
- * - where does the 1s loopback-ping latency come from?
+ * - test GETs (gnunet-transport-check does NOT!)
  */
 
 #include "gnunet_util.h"
@@ -52,10 +40,19 @@
 #define DEBUG_HTTP NO
 
 /**
- * after how much time of the core not being associated with a http
+ * Disable GET (for debugging only!).  Must be YES
+ * in production use!
+ */
+#define DO_GET YES
+
+/**
+ * After how much time of the core not being associated with a http
  * connection anymore do we close it?
+ *
+ * Needs to be larger than SECONDS_INACTIVE_DROP in
+ * core's connection.s
  */
-#define HTTP_TIMEOUT (30 * cronSECONDS)
+#define HTTP_TIMEOUT (600 * cronSECONDS)
 
 /**
  * Default maximum size of the HTTP read and write buffer.
@@ -63,6 +60,13 @@
 #define HTTP_BUF_SIZE (64 * 1024)
 
 /**
+ * Text of the response sent back after the last bytes of a PUT
+ * request have been received (just to formally obey the HTTP 
+ * protocol).
+ */
+#define HTTP_PUT_RESPONSE "Thank you!"
+
+/**
  * Host-Address in a HTTP network.
  */
 typedef struct
@@ -84,93 +88,141 @@
 
 } HostAddress;
 
+/**
+ * Client-side data per PUT request.
+ */
 struct HTTPPutData
 {
+  /**
+   * This is a linked list.
+   */
   struct HTTPPutData *next;
 
+  /**
+   * Handle to our CURL request.
+   */
+  CURL *curl_put;  
+
+  /**
+   * Last time we made progress with the PUT.
+   */
+  cron_t last_activity;
+
+  /**
+   * The message we are sending.
+   */
   char *msg;
 
-  CURL *curl_put;
-
+  /**
+   * Size of msg.
+   */
   unsigned int size;
 
+  /**
+   * Current position in msg.
+   */
   unsigned int pos;
 
+  /**
+   * Are we done sending?  Set to 1 after we
+   * completed sending and started to receive
+   * a response ("Thank you!") or once the
+   * timeout has been reached.
+   */
   int done;
 
 };
 
 /**
- * Transport Session handle.
+ * Server-side data per PUT request.
  */
-typedef struct
+struct MHDPutData
 {
+  /**
+   * This is a linked list.
+   */
+  struct MHDPutData *next;
 
   /**
-   * mutex for synchronized access to struct
+   * MHD connection handle for this request.
    */
-  struct MUTEX *lock;
+  struct MHD_Connection * session;
 
   /**
-   * Read buffer for the header.
+   * Last time we received data on this PUT
+   * connection.
    */
-  char rbuff1[sizeof (MESSAGE_HEADER)];
+  cron_t last_activity;
 
   /**
-   * The read buffer (used only for the actual data).
+   * Read buffer for the header (from PUT)
    */
+  char rbuff1[sizeof (MESSAGE_HEADER)];
+  
+  /**
+   * The read buffer (used only receiving PUT data).
+   */
   char *rbuff2;
-
+  
   /**
-   * The write buffer.
+   * Number of valid bytes in rbuff1
    */
-  char *wbuff;
-
+  unsigned int rpos1;
+  
   /**
-   * Last time this connection was used
+   * Number of valid bytes in rbuff2
    */
-  cron_t lastUse;
+  unsigned int rpos2;
+  
 
   /**
-   * To whom are we talking to (set to our identity
-   * if we are still waiting for the welcome message)
+   * Size of the rbuff2 buffer.
    */
-  PeerIdentity sender;
+  unsigned int rsize2;
 
   /**
-   * number of users of this session
+   * Should we sent a response for this PUT yet?
    */
-  unsigned int users;
+  int ready;
 
   /**
-   * Number of valid bytes in rbuff1
+   * Have we sent a response for this PUT yet?
    */
-  unsigned int rpos1;
+  int done;
 
+};
+
+/**
+ * Transport Session handle.
+ */
+typedef struct
+{
+
   /**
-   * Number of valid bytes in rbuff2
+   * TSession for this session.
    */
-  unsigned int rpos2;
+  TSession * tsession;
 
   /**
-   * Current size of the read buffer rbuff2.
+   * mutex for synchronized access to struct
    */
-  unsigned int rsize2;
+  struct MUTEX *lock;
 
   /**
-   * Current write position in wbuff
+   * Last time this connection was used
    */
-  unsigned int woff;
+  cron_t lastUse;
 
   /**
-   * Number of valid bytes in wbuff (starting at woff)
+   * To whom are we talking to (set to our identity
+   * if we are still waiting for the welcome message)
    */
-  unsigned int wpos;
+  PeerIdentity sender;
 
   /**
-   * Size of the write buffer.
+   * number of users of this session
    */
-  unsigned int wsize;
+  unsigned int users;
 
   /**
    * Has this session been destroyed?
@@ -178,16 +230,13 @@
   int destroyed;
 
   /**
-   * Are we client or server?
+   * Are we client or server?  Determines which of the
+   * structs in the union below is being used for this
+   * connection!
    */
   int is_client;
 
   /**
-   * TSession for this session.
-   */
-  TSession *tsession;
-
-  /**
    * Data maintained for the http client-server connection
    * (depends on if we are client or server).
    */
@@ -196,23 +245,92 @@
 
     struct
     {
+      /**
+       * Active PUT requests (linked list).
+       */
+      struct MHDPutData * puts;
 
+#if DO_GET
       /**
        * GET session response handle
        */
-      struct MHD_Response *get;
+      struct MHD_Response * get;
 
+      /**
+       * The write buffer (for sending GET response)
+       */
+      char * wbuff;
+
+      /**
+       * What was the last time we were able to
+       * transmit data using the current get handle?
+       */
+      cron_t last_get_activity;
+
+      /**
+       * Current write position in wbuff
+       */
+      unsigned int woff;
+      
+      /**
+       * Number of valid bytes in wbuff (starting at woff)
+       */
+      unsigned int wpos;
+
+      /**
+       * Size of the write buffer.
+       */
+      unsigned int wsize;
+#endif
+
     } server;
 
     struct
     {
 
       /**
+       * Address of the other peer.
+       */
+      HostAddress address;
+
+#if DO_GET
+      /**
+       * Last time the GET was active.
+       */
+      cron_t last_get_activity;
+
+      /**
        * GET operation
        */
       CURL *get;
 
       /**
+       * Read buffer for the header (from GET).
+       */
+      char rbuff1[sizeof (MESSAGE_HEADER)];
+      
+      /**
+       * The read buffer (used only receiving GET data).
+       */
+      char *rbuff2;
+      
+      /**
+       * Number of valid bytes in rbuff1
+       */
+      unsigned int rpos1;
+      
+      /**
+       * Number of valid bytes in rbuff2
+       */
+      unsigned int rpos2;
+      
+      /**
+       * Current size of the read buffer rbuff2.
+       */
+      unsigned int rsize2;
+#endif
+
+      /**
        * URL of the get and put operations.
        */
       char *url;
@@ -243,6 +361,8 @@
 
 static int stat_bytesDropped;
 
+static int signal_pipe[2];
+
 static char *proxy;
 
 /**
@@ -270,8 +390,14 @@
  */
 static TSession **tsessions;
 
+/**
+ * Number of valid entries in tsessions.
+ */
 static unsigned int tsessionCount;
 
+/**
+ * Sie of the tsessions array.
+ */
 static unsigned int tsessionArrayLength;
 
 /**
@@ -284,7 +410,6 @@
  */
 static UPnP_ServiceAPI *upnp;
 
-
 /**
  * Lock for access to mutable state of the module,
  * that is the configuration and the tsessions array.
@@ -299,7 +424,17 @@
  */
 static struct MUTEX *httplock;
 
+
 /**
+ * Signal select thread that its selector
+ * set may have changed.
+ */
+static void signal_select() {
+  static char c;
+  write(signal_pipe[1], &c, sizeof(c));
+}
+
+/**
  * Check if we are allowed to connect to the given IP.
  */
 static int
@@ -319,13 +454,30 @@
     }
   else
     {
+#if DEBUG_HTTP
+      GE_LOG(coreAPI->ectx,
+            GE_DEBUG | GE_DEVELOPER | GE_BULK,
+            "Rejecting HTTP connection\n");
+#endif
       return MHD_NO;
     }
   MUTEX_LOCK (httplock);
   ret = check_ipv4_listed (filteredNetworks_, ip);
   MUTEX_UNLOCK (httplock);
-  if (YES == ret)
-    return MHD_NO;
+  if (YES == ret) 
+    {
+#if DEBUG_HTTP
+      GE_LOG(coreAPI->ectx,
+            GE_DEBUG | GE_DEVELOPER | GE_BULK,
+            "Rejecting HTTP connection\n");
+#endif
+      return MHD_NO;
+    }
+#if DEBUG_HTTP
+  GE_LOG(coreAPI->ectx,
+        GE_DEBUG | GE_DEVELOPER | GE_BULK,
+        "Accepting HTTP connection\n");
+#endif
   return MHD_YES;
 }
 
@@ -335,6 +487,9 @@
  * For the core, aquiration means to call associate or
  * connect. The number of disconnects must match the
  * number of calls to connect+associate.
+ * 
+ * Sessions are actually discarded in cleanup_connections.
+ * 
  *
  * @param tsession the session that is closed
  * @return OK on success, SYSERR if the operation failed
@@ -343,10 +498,6 @@
 httpDisconnect (TSession * tsession)
 {
   HTTPSession *httpsession = tsession->internal;
-  struct HTTPPutData *pos;
-  struct HTTPPutData *next;
-  int i;
-
   if (httpsession == NULL)
     {
       FREE (tsession);
@@ -354,13 +505,19 @@
     }
   MUTEX_LOCK (httpsession->lock);
   httpsession->users--;
-  if (httpsession->users > 0)
-    {
-      MUTEX_UNLOCK (httpsession->lock);
-      return OK;
-    }
-  httpsession->destroyed = YES;
   MUTEX_UNLOCK (httpsession->lock);
+  return OK;
+}
+
+static void 
+destroy_tsession(TSession * tsession) {
+  HTTPSession *httpsession = tsession->internal;
+  struct HTTPPutData *pos;
+  struct HTTPPutData *next;
+  struct MHDPutData * mpos;
+  struct MHDPutData * mnext;
+  int i;
+  
   MUTEX_LOCK (httplock);
   for (i = 0; i < tsessionCount; i++)
     {
@@ -373,31 +530,52 @@
   MUTEX_UNLOCK (httplock);
   if (httpsession->is_client)
     {
+#if DO_GET
       curl_multi_remove_handle (curl_multi, httpsession->cs.client.get);
+      signal_select();
       curl_easy_cleanup (httpsession->cs.client.get);
+      GROW (httpsession->cs.client.rbuff2, 
+           httpsession->cs.client.rsize2, 0);
+#endif
       FREE (httpsession->cs.client.url);
       pos = httpsession->cs.client.puts;
       while (pos != NULL)
         {
           next = pos->next;
           curl_multi_remove_handle (curl_multi, pos->curl_put);
+         signal_select();
           curl_easy_cleanup (pos->curl_put);
           FREE (pos->msg);
           FREE (pos);
           pos = next;
         }
-
     }
   else
     {
-      MHD_destroy_response (httpsession->cs.server.get);
+#if DO_GET
+      GROW (httpsession->cs.server.wbuff,
+           httpsession->cs.server.wsize, 0);
+      if (httpsession->cs.server.get != NULL) {
+       MHD_destroy_response (httpsession->cs.server.get);
+       httpsession->cs.server.get = NULL;
+      }
+#endif      
+      mpos = httpsession->cs.server.puts;
+      /* this should be NULL already, but just
+        in case it is not, we free it anyway... */
+      while (mpos != NULL) {
+       mnext = mpos->next;
+       GROW(mpos->rbuff2,
+            mpos->rsize2,
+            0);        
+       FREE(mpos);
+       mpos = mnext;
+      }
+      
     }
-  GROW (httpsession->rbuff2, httpsession->rsize2, 0);
-  GROW (httpsession->wbuff, httpsession->wsize, 0);
   MUTEX_DESTROY (httpsession->lock);
   FREE (httpsession);
   FREE (tsession);
-  return OK;
 }
 
 /**
@@ -479,8 +657,10 @@
       (ntohs (hello->protocol) != HTTP_PROTOCOL_NUMBER) ||
       (MHD_NO == acceptPolicyCallback (NULL,
                                        (const struct sockaddr *) haddr,
-                                       sizeof (IPaddr))))
+                                       sizeof (IPaddr)))) {
+    GE_BREAK_OP(NULL, 0);
     return SYSERR;              /* obviously invalid */
+  }
   return OK;
 }
 
@@ -557,10 +737,22 @@
   return i;
 }
 
+#if DO_GET
+/**
+ * Callback for processing GET requests if our side is the
+ * MHD HTTP server.
+ *
+ * @param cls the HTTP session
+ * @param pos read-offset in the stream
+ * @param buf where to write the data
+ * @param max how much data to write (at most)
+ * @return number of bytes written, 0 is allowed!
+ */
 static int
 contentReaderCallback (void *cls, size_t pos, char *buf, int max)
 {
   HTTPSession *session = cls;
+  cron_t now;
 
   MUTEX_LOCK (session->lock);
   if (session->destroyed)
@@ -568,18 +760,28 @@
       MUTEX_UNLOCK (session->lock);
       return -1;
     }
-  if (session->wpos < max)
-    max = session->wpos;
-  memcpy (buf, &session->wbuff[session->woff], max);
-  session->wpos -= max;
-  session->woff += max;
-  session->lastUse = get_time ();
-  if (session->wpos == 0)
-    session->woff = 0;
+  if (session->cs.server.wpos < max)
+    max = session->cs.server.wpos;
+  memcpy (buf, &session->cs.server.wbuff[session->cs.server.woff], max);
+  session->cs.server.wpos -= max;
+  session->cs.server.woff += max;
+  now = get_time ();
+  session->lastUse = now;
+  session->cs.server.last_get_activity = now;
+  if (session->cs.server.wpos == 0)
+    session->cs.server.woff = 0;
   MUTEX_UNLOCK (session->lock);
+#if DEBUG_HTTP
+  GE_LOG (coreAPI->ectx,
+          GE_DEBUG | GE_REQUEST | GE_USER,
+          "HTTP returns %u bytes in MHD GET handler.\n",
+          max);
+#endif
   return max;
 }
+#endif
 
+#if DO_GET
 /**
  * Notification that libmicrohttpd no longer needs the
  * response object.
@@ -587,15 +789,18 @@
 static void
 contentReaderFreeCallback (void *cls)
 {
-  HTTPSession *session = cls;
+  HTTPSession * session = cls;
 
-  session->destroyed = YES;
+  GE_ASSERT(NULL, session->cs.server.get == NULL);
 }
+#endif
 
 /**
- * Create a new session for an inbound connection on the given
- * socket. Adds the session to the array of sessions watched
- * by the select thread.
+ * Process GET or PUT request received via MHD.  For
+ * GET, queue response that will send back our pending
+ * messages.  For PUT, process incoming data and send
+ * to GNUnet core.  In either case, check if a session
+ * already exists and create a new one if not.
  */
 static int
 accessHandlerCallback (void *cls,
@@ -606,8 +811,9 @@
                        const char *upload_data,
                        unsigned int *upload_data_size)
 {
-  TSession *tsession;
-  struct MHD_Response *response;
+  TSession * tsession;
+  struct MHD_Response * response;
+  struct MHDPutData * put;
   HTTPSession *httpSession;
   HashCode512 client;
   int i;
@@ -617,123 +823,192 @@
   unsigned int cpy;
   unsigned int poff;
 
+  /* convert URL to sender peer id */
   if ((strlen (url) < 2) || (OK != enc2hash (&url[1], &client)))
     {
+      /* invalid request */
+      GE_BREAK_OP(NULL, 0);
       return MHD_NO;
     }
 
   /* check if we already have a session for this */
+  httpSession = NULL;
   MUTEX_LOCK (httplock);
   for (i = 0; i < tsessionCount; i++)
     {
       tsession = tsessions[i];
       httpSession = tsession->internal;
-      if (0 == memcmp (&httpSession->sender, &client, sizeof (HashCode512)))
+      if ( (0 == memcmp (&httpSession->sender, &client, sizeof (HashCode512))) 
&&
+          (httpSession->is_client == NO) )
         break;
       tsession = NULL;
       httpSession = NULL;
     }
-  if (tsession != NULL)
-    {
-      MUTEX_LOCK (httpSession->lock);
-      httpSession->users++;
-      MUTEX_UNLOCK (httpSession->lock);
-    }
   MUTEX_UNLOCK (httplock);
 
+  /* create new session if necessary */
   if (httpSession == NULL)
     {
+#if DEBUG_HTTP
+      GE_LOG (coreAPI->ectx,
+             GE_DEBUG | GE_REQUEST | GE_USER,
+             "HTTP/MHD creates new session for request from `%s'.\n",
+             &url[1]);
+#endif 
       httpSession = MALLOC (sizeof (HTTPSession));
       memset (httpSession, 0, sizeof (HTTPSession));
-      httpSession->sender = *(coreAPI->myIdentity);
+      httpSession->sender.hashPubKey = client;
       httpSession->lock = MUTEX_CREATE (YES);
-      httpSession->users = 1;   /* us only, core has not seen this tsession! */
-      httpSession->lastUse = get_time ();
+      httpSession->users = 0;   /* nobody yet */
       tsession = MALLOC (sizeof (TSession));
       memset (tsession, 0, sizeof (TSession));
       tsession->ttype = HTTP_PROTOCOL_NUMBER;
       tsession->internal = httpSession;
-      tsession->peer = *(coreAPI->myIdentity);
+      tsession->peer.hashPubKey = client;
       httpSession->tsession = tsession;
       addTSession (tsession);
     }
-  if (0 == strcmp ("GET", method))
+  MUTEX_LOCK (httpSession->lock);
+  httpSession->lastUse = get_time ();
+#if DO_GET
+  if (0 == strcasecmp (MHD_HTTP_METHOD_GET, method))
     {
-      /* handle get */
-      response = MHD_create_response_from_callback (-1,
-                                                   64 * 1024,
-                                                    contentReaderCallback,
-                                                    httpSession,
-                                                    contentReaderFreeCallback);
-      httpSession->cs.client.get = response;
+#if DEBUG_HTTP
+      GE_LOG (coreAPI->ectx,
+             GE_DEBUG | GE_REQUEST | GE_USER,
+             "HTTP/MHD receives GET request from `%s'.\n",
+             &url[1]);
+#endif
+
+      /* handle get; create response object if we do not 
+        have one already */
+      response = httpSession->cs.server.get;      
+      if (response == NULL) {
+       response = MHD_create_response_from_callback (-1,
+                                                     64 * 1024,
+                                                     contentReaderCallback,
+                                                     httpSession,
+                                                     
contentReaderFreeCallback);
+       httpSession->cs.server.get = response;
+      }
+      httpSession->cs.server.last_get_activity = get_time();
       MHD_queue_response (session, MHD_HTTP_OK, response);
+      MUTEX_UNLOCK (httpSession->lock);
+      return MHD_YES;
     }
-  else if (0 == strcmp ("PUT", method))
+#endif
+  if (0 == strcasecmp (MHD_HTTP_METHOD_PUT, method))
     {
+#if DEBUG_HTTP
+      GE_LOG (coreAPI->ectx,
+             GE_DEBUG | GE_REQUEST | GE_USER,
+             "HTTP/MHD receives PUT request from `%s' with %u bytes.\n",
+             &url[1],
+             *upload_data_size);
+#endif
+      put = httpSession->cs.server.puts;
+      while ( (put != NULL) &&
+             (put->session != session) )
+       put = put->next;
+      if (put == NULL) {
+       put = MALLOC(sizeof(struct MHDPutData));
+       memset(put, 0, sizeof(struct MHDPutData));
+       put->next = httpSession->cs.server.puts;
+       httpSession->cs.server.puts = put;
+       put->session = session;
+      }
+      put->last_activity = get_time();
+       
       /* handle put (upload_data!) */
-      MUTEX_LOCK (httpSession->lock);
       poff = 0;
       have = *upload_data_size;
       *upload_data_size = 0;    /* we will always process everything */
+      if ( (have == 0) &&
+          (put->done == NO) &&
+          (put->ready == YES) ) {
+       put->done = YES;
+       /* end of upload, send response! */
+#if DEBUG_HTTP
+       GE_LOG (coreAPI->ectx,
+               GE_DEBUG | GE_REQUEST | GE_USER,
+               "HTTP/MHD queues dummy response to completed PUT request.\n");
+#endif
+       response = MHD_create_response_from_data (strlen(HTTP_PUT_RESPONSE),
+                                                 HTTP_PUT_RESPONSE,
+                                                 MHD_NO,
+                                                 MHD_NO);
+       MHD_queue_response (session, MHD_HTTP_OK, response);
+       MHD_destroy_response(response);
+       MUTEX_UNLOCK (httpSession->lock);
+       return MHD_YES;
+      }
       while (have > 0)
         {
-          if (httpSession->rpos1 < sizeof (MESSAGE_HEADER))
+         put->ready = NO;
+          if (put->rpos1 < sizeof (MESSAGE_HEADER))
             {
-              cpy = sizeof (MESSAGE_HEADER) - httpSession->rpos1;
+              cpy = sizeof (MESSAGE_HEADER) - put->rpos1;
               if (cpy > have)
                 cpy = have;
-              memcpy (&httpSession->rbuff1[httpSession->rpos1],
+              memcpy (&put->rbuff1[put->rpos1],
                       &upload_data[poff], cpy);
-              httpSession->rpos1 += cpy;
+              put->rpos1 += cpy;
               have -= cpy;
               poff += cpy;
-              httpSession->rpos2 = 0;
+              put->rpos2 = 0;
             }
-          if (httpSession->rpos1 < sizeof (MESSAGE_HEADER))
+          if (put->rpos1 < sizeof (MESSAGE_HEADER))
             break;
-          hdr = (MESSAGE_HEADER *) httpSession->rbuff1;
-          GROW (httpSession->rbuff2,
-                httpSession->rsize2,
+          hdr = (MESSAGE_HEADER *) put->rbuff1;
+          GROW (put->rbuff2,
+                put->rsize2,
                 ntohs (hdr->size) - sizeof (MESSAGE_HEADER));
-          if (httpSession->rpos2 <
+          if (put->rpos2 <
               ntohs (hdr->size) - sizeof (MESSAGE_HEADER))
             {
               cpy =
                 ntohs (hdr->size) - sizeof (MESSAGE_HEADER) -
-                httpSession->rpos2;
+                put->rpos2;
               if (cpy > have)
                 cpy = have;
-              memcpy (&httpSession->rbuff2[httpSession->rpos2],
+              memcpy (&put->rbuff2[put->rpos2],
                       &upload_data[poff], cpy);
               have -= cpy;
               poff += cpy;
-              httpSession->rpos2 += cpy;
+              put->rpos2 += cpy;
             }
-          if (httpSession->rpos2 <
+          if (put->rpos2 <
               ntohs (hdr->size) - sizeof (MESSAGE_HEADER))
             break;
           mp = MALLOC (sizeof (P2P_PACKET));
-          mp->msg = httpSession->rbuff2;
+          mp->msg = put->rbuff2;
           mp->sender = httpSession->sender;
           mp->tsession = httpSession->tsession;
           mp->size = ntohs (hdr->size) - sizeof (MESSAGE_HEADER);
+#if DEBUG_HTTP
+       GE_LOG (coreAPI->ectx,
+               GE_DEBUG | GE_REQUEST | GE_USER,
+               "HTTP/MHD passes %u bytes to core (received via PUT 
request).\n",
+               mp->size);
+#endif
           coreAPI->receive (mp);
-          httpSession->rbuff2 = NULL;
-          httpSession->rpos2 = 0;
-          httpSession->rsize2 = 0;
-          httpSession->rpos1 = 0;
+          put->rbuff2 = NULL;
+          put->rpos2 = 0;
+          put->rsize2 = 0;
+          put->rpos1 = 0;
+         put->ready = YES;
         }
       MUTEX_UNLOCK (httpSession->lock);
+      return MHD_YES;
     }
-  else
-    {
-      return MHD_NO;            /* must be get or put! */
-    }
-  return MHD_YES;
+  MUTEX_UNLOCK (httpSession->lock);
+  GE_BREAK_OP(NULL, 0); /* invalid request */
+  return MHD_NO; 
 }
 
+#if DO_GET
 /**
- * Process downloaded bits
+ * Process downloaded bits (from GET via CURL).
  */
 static size_t
 receiveContentCallback (void *ptr, size_t size, size_t nmemb, void *ctx)
@@ -746,58 +1021,66 @@
   MESSAGE_HEADER *hdr;
   P2P_PACKET *mp;
 
-  printf ("Receiving %u bytes from GET\n", have);
+  httpSession->cs.client.last_get_activity = get_time();
+#if DEBUG_HTTP
+  GE_LOG (coreAPI->ectx,
+         GE_DEBUG | GE_REQUEST | GE_USER,
+         "HTTP/CURL receives %u bytes as response to GET.\n",
+         size * nmemb);
+#endif
   while (have > 0)
     {
-      if (httpSession->rpos1 < sizeof (MESSAGE_HEADER))
+      if (httpSession->cs.client.rpos1 < sizeof (MESSAGE_HEADER))
         {
-          cpy = sizeof (MESSAGE_HEADER) - httpSession->rpos1;
+          cpy = sizeof (MESSAGE_HEADER) - httpSession->cs.client.rpos1;
           if (cpy > have)
             cpy = have;
-          memcpy (&httpSession->rbuff1[httpSession->rpos1],
+          memcpy (&httpSession->cs.client.rbuff1[httpSession->cs.client.rpos1],
                   &inbuf[poff], cpy);
-          httpSession->rpos1 += cpy;
+          httpSession->cs.client.rpos1 += cpy;
           have -= cpy;
           poff += cpy;
-          httpSession->rpos2 = 0;
+          httpSession->cs.client.rpos2 = 0;
         }
-      if (httpSession->rpos1 < sizeof (MESSAGE_HEADER))
-        return size * nmemb;
-      hdr = (MESSAGE_HEADER *) httpSession->rbuff1;
-      GROW (httpSession->rbuff2,
-            httpSession->rsize2, ntohs (hdr->size) - sizeof (MESSAGE_HEADER));
+      if (httpSession->cs.client.rpos1 < sizeof (MESSAGE_HEADER))
+        break;
+      hdr = (MESSAGE_HEADER *) httpSession->cs.client.rbuff1;
+      GROW (httpSession->cs.client.rbuff2,
+            httpSession->cs.client.rsize2, ntohs (hdr->size) - sizeof 
(MESSAGE_HEADER));
       printf ("Expecting message of %u bytes via GET\n", ntohs (hdr->size));
-      if (httpSession->rpos2 < ntohs (hdr->size) - sizeof (MESSAGE_HEADER))
+      if (httpSession->cs.client.rpos2 < ntohs (hdr->size) - sizeof 
(MESSAGE_HEADER))
         {
           cpy =
-            ntohs (hdr->size) - sizeof (MESSAGE_HEADER) - httpSession->rpos2;
+            ntohs (hdr->size) - sizeof (MESSAGE_HEADER) - 
httpSession->cs.client.rpos2;
           if (cpy > have)
             cpy = have;
-          memcpy (&httpSession->rbuff2[httpSession->rpos2],
+          memcpy (&httpSession->cs.client.rbuff2[httpSession->cs.client.rpos2],
                   &inbuf[poff], cpy);
           have -= cpy;
           poff += cpy;
-          httpSession->rpos2 += cpy;
+          httpSession->cs.client.rpos2 += cpy;
         }
-      if (httpSession->rpos2 < ntohs (hdr->size) - sizeof (MESSAGE_HEADER))
-        return size * nmemb;
+      if (httpSession->cs.client.rpos2 < ntohs (hdr->size) - sizeof 
(MESSAGE_HEADER))
+        break;
       mp = MALLOC (sizeof (P2P_PACKET));
-      mp->msg = httpSession->rbuff2;
+      mp->msg = httpSession->cs.client.rbuff2;
       mp->sender = httpSession->sender;
       mp->tsession = httpSession->tsession;
       mp->size = ntohs (hdr->size) - sizeof (MESSAGE_HEADER);
       printf ("Passing message from GET to core!\n");
       coreAPI->receive (mp);
-      httpSession->rbuff2 = NULL;
-      httpSession->rpos2 = 0;
-      httpSession->rsize2 = 0;
-      httpSession->rpos1 = 0;
+      httpSession->cs.client.rbuff2 = NULL;
+      httpSession->cs.client.rpos2 = 0;
+      httpSession->cs.client.rsize2 = 0;
+      httpSession->cs.client.rpos1 = 0;
     }
   return size * nmemb;
 }
+#endif
 
 /**
- * Provide bits for upload
+ * Provide bits for upload: we're using CURL for a PUT request
+ * and now need to provide data from the message we are transmitting.
  */
 static size_t
 sendContentCallback (void *ptr, size_t size, size_t nmemb, void *ctx)
@@ -805,87 +1088,80 @@
   struct HTTPPutData *put = ctx;
   size_t max = size * nmemb;
 
+  put->last_activity = get_time();
   if (max > put->size - put->pos)
     max = put->size - put->pos;
   memcpy (ptr, &put->msg[put->pos], max);
   put->pos += max;
+#if DEBUG_HTTP
+  GE_LOG (coreAPI->ectx,
+         GE_DEBUG | GE_REQUEST | GE_USER,
+         "HTTP/CURL sends %u bytes in PUT request.\n",
+         max);
+#endif
   return max;
 }
 
 #define CURL_EASY_SETOPT(c, a, b) do { ret = curl_easy_setopt(c, a, b); if 
(ret != CURLE_OK) GE_LOG(coreAPI->ectx, GE_WARNING | GE_USER | GE_BULK, _("%s 
failed at %s:%d: `%s'\n"), "curl_easy_setopt", __FILE__, __LINE__, 
curl_easy_strerror(ret)); } while (0);
 
+static void 
+create_session_url(HTTPSession * httpSession) {
+  char *url;
+  EncName enc;
+
+  url = httpSession->cs.client.url;
+  if (url == NULL) {
+    hash2enc (&httpSession->sender.hashPubKey, &enc);
+    url = MALLOC (64 + sizeof (EncName));
+    SNPRINTF (url,
+             64 + sizeof (EncName),
+             "http://%u.%u.%u.%u:%u/%s";,
+             PRIP (ntohl (*(int *) &httpSession->cs.client.address.ip.addr)),
+             ntohs (httpSession->cs.client.address.port), &enc);
+    httpSession->cs.client.url = url;
+  }
+}
+
+#if DO_GET
 /**
- * Establish a connection to a remote node.
- *
- * @param hello the hello-Message for the target node
- * @param tsessionPtr the session handle that is set
- * @return OK on success, SYSERR if the operation failed
+ * Try to do a GET on the other peer of the given
+ * http session.
+ * 
+ * @return OK on success, SYSERR on error
  */
 static int
-httpConnect (const P2P_hello_MESSAGE * hello, TSession ** tsessionPtr,
-             int may_reuse)
-{
-  const HostAddress *haddr = (const HostAddress *) &hello[1];
-  TSession *tsession;
-  HTTPSession *httpSession;
+create_curl_get(HTTPSession * httpSession) {
   CURL *curl_get;
   CURLcode ret;
   CURLMcode mret;
-  char *url;
-  EncName enc;
-  int i;
 
-  /* check if we have a session pending for this peer */
-  tsession = NULL;
-  MUTEX_LOCK (httplock);
-  for (i = 0; i < tsessionCount; i++)
-    {
-      if (0 == memcmp (&hello->senderIdentity,
-                       &tsessions[i]->peer, sizeof (PeerIdentity)))
-        {
-          tsession = tsessions[i];
-          break;
-        }
-    }
-  if ((tsession != NULL) && (OK == httpAssociate (tsession)))
-    {
-      *tsessionPtr = tsession;
-      MUTEX_UNLOCK (httplock);
-      return OK;
-    }
-  MUTEX_UNLOCK (httplock);
-
-  /* no session pending, initiate a new one! */
+  curl_get = httpSession->cs.client.get;
+  if (curl_get != NULL) {
+    curl_multi_remove_handle(curl_multi,
+                            curl_get);
+    signal_select();
+    curl_easy_cleanup(curl_get);                            
+    httpSession->cs.client.get = NULL;
+  }    
   curl_get = curl_easy_init ();
   if (curl_get == NULL)
     return SYSERR;
-
-  hash2enc (&hello->senderIdentity.hashPubKey, &enc);
-  url = MALLOC (64 + sizeof (EncName));
-  SNPRINTF (url,
-            64 + sizeof (EncName),
-            "http://%u.%u.%u.%u:%u/%s";,
-            PRIP (ntohl (*(int *) &haddr->ip.addr)),
-            ntohs (haddr->port), &enc);
-
   /* create GET */
   CURL_EASY_SETOPT (curl_get, CURLOPT_FAILONERROR, 1);
-  CURL_EASY_SETOPT (curl_get, CURLOPT_URL, url);
+  CURL_EASY_SETOPT (curl_get, CURLOPT_URL, httpSession->cs.client.url);
   if (strlen (proxy) > 0)
     CURL_EASY_SETOPT (curl_get, CURLOPT_PROXY, proxy);
   CURL_EASY_SETOPT (curl_get, CURLOPT_BUFFERSIZE, 32 * 1024);
-  if (0 == strncmp (url, "http", 4))
+  if (0 == strncmp (httpSession->cs.client.url, "http", 4))
     CURL_EASY_SETOPT (curl_get, CURLOPT_USERAGENT, "GNUnet-http");
   CURL_EASY_SETOPT (curl_get, CURLOPT_CONNECTTIMEOUT, 150L);
   CURL_EASY_SETOPT (curl_get, CURLOPT_TIMEOUT, 150L);
   CURL_EASY_SETOPT (curl_get, CURLOPT_WRITEFUNCTION, &receiveContentCallback);
-
-  httpSession = MALLOC (sizeof (HTTPSession));
-  memset (httpSession, 0, sizeof (HTTPSession));
-  httpSession->cs.client.url = url;
   CURL_EASY_SETOPT (curl_get, CURLOPT_WRITEDATA, httpSession);
-  if (ret != CURLE_OK)
-    goto cleanup;
+  if (ret != CURLE_OK) {
+    curl_easy_cleanup(curl_get);                            
+    return SYSERR;
+  }
   mret = curl_multi_add_handle (curl_multi, curl_get);
   if (mret != CURLM_OK)
     {
@@ -894,60 +1170,174 @@
               _("%s failed at %s:%d: `%s'\n"),
               "curl_multi_add_handle",
               __FILE__, __LINE__, curl_multi_strerror (mret));
-      goto cleanup;
+      curl_easy_cleanup(curl_get);                          
+      return SYSERR;
     }
+  signal_select();
+  httpSession->cs.client.last_get_activity = get_time();
+  httpSession->cs.client.get = curl_get;
+#if DEBUG_HTTP
+  GE_LOG (coreAPI->ectx,
+         GE_DEBUG | GE_REQUEST | GE_USER,
+         "HTTP/CURL initiated GET request.\n");
+#endif
+  return OK;
+}
+#endif
 
-  /* create SESSION */
+/**
+ * Establish a connection to a remote node.
+ *
+ * @param hello the hello-Message for the target node
+ * @param tsessionPtr the session handle that is set
+ * @return OK on success, SYSERR if the operation failed
+ */
+static int
+httpConnect (const P2P_hello_MESSAGE * hello, TSession ** tsessionPtr,
+             int may_reuse)
+{
+  const HostAddress *haddr = (const HostAddress *) &hello[1];
+  TSession *tsession;
+  HTTPSession *httpSession;
+  int i;
+
+  /* check if we have a session pending for this peer */
+  tsession = NULL;
+  if (may_reuse) {
+    MUTEX_LOCK (httplock);  
+    for (i = 0; i < tsessionCount; i++)
+      {
+       if (0 == memcmp (&hello->senderIdentity,
+                        &tsessions[i]->peer, sizeof (PeerIdentity)))
+         {
+           tsession = tsessions[i];
+           break;
+         }
+      }
+    if ((tsession != NULL) && (OK == httpAssociate (tsession)))
+      {
+       *tsessionPtr = tsession;
+       MUTEX_UNLOCK (httplock);
+       return OK;
+      }
+    MUTEX_UNLOCK (httplock);
+  }
+  /* no session pending, initiate a new one! */
+  httpSession = MALLOC (sizeof (HTTPSession));  
+  memset (httpSession, 0, sizeof (HTTPSession));
   httpSession->sender = hello->senderIdentity;
   httpSession->lock = MUTEX_CREATE (YES);
   httpSession->users = 1;       /* us only, core has not seen this tsession! */
   httpSession->lastUse = get_time ();
   httpSession->is_client = YES;
-  httpSession->cs.client.get = curl_get;
+  httpSession->cs.client.address = *haddr;
   tsession = MALLOC (sizeof (TSession));
   memset (tsession, 0, sizeof (TSession));
   httpSession->tsession = tsession;
   tsession->ttype = HTTP_PROTOCOL_NUMBER;
   tsession->internal = httpSession;
+  create_session_url(httpSession);
+#if DO_GET
+  if (OK != create_curl_get(httpSession)) {
+    FREE(tsession);
+    FREE(httpSession);
+    return SYSERR;
+  }
+#endif
+  /* PUTs will be created as needed */
   addTSession (tsession);
   *tsessionPtr = tsession;
+#if DEBUG_HTTP
+  GE_LOG (coreAPI->ectx,
+         GE_DEBUG | GE_REQUEST | GE_USER,
+         "HTTP/CURL initiated connection to `%s'.\n",
+         httpSession->cs.client.url);
+#endif
   return OK;
-cleanup:
-  curl_easy_cleanup (curl_get);
-  FREE (url);
-  FREE (proxy);
-  FREE (httpSession);
-  return SYSERR;
 }
 
-static CURL *
+/**
+ * We received the "Thank you!" response to a PUT.
+ * Discard the data (not useful) and mark the PUT
+ * operation as completed.
+ */
+static size_t
+discardContentCallback(void * data,
+                      size_t size,
+                      size_t nmemb,
+                      void * put_cls) {
+  struct HTTPPutData * put = put_cls;
+  /* this condition should pretty much always be
+     true; just checking here in case the PUT 
+     response comes early somehow */
+  if (put->pos == put->size) 
+   put->done = YES;  
+  return size * nmemb;
+}
+
+/**
+ * Create a new PUT request for the given PUT data.
+ */
+static int
 create_curl_put (HTTPSession * httpSession,
-                 struct HTTPPutData *put, unsigned int size)
+                 struct HTTPPutData *put)
 {
   CURL *curl_put;
   CURLcode ret;
+  CURLMcode mret;
+  long size;
 
+  /* we should have initiated a GET earlier,
+     so URL must not be NULL here */
+  GE_ASSERT(NULL,
+           httpSession->cs.client.url != NULL);
   curl_put = curl_easy_init ();
   if (curl_put == NULL)
-    return NULL;
+    return SYSERR;
   CURL_EASY_SETOPT (curl_put, CURLOPT_FAILONERROR, 1);
   CURL_EASY_SETOPT (curl_put, CURLOPT_URL, httpSession->cs.client.url);
   if (strlen (proxy) > 0)
     CURL_EASY_SETOPT (curl_put, CURLOPT_PROXY, proxy);
-  CURL_EASY_SETOPT (curl_put, CURLOPT_BUFFERSIZE, 32 * 1024);
+  CURL_EASY_SETOPT (curl_put, CURLOPT_BUFFERSIZE, put->size);
   if (0 == strncmp (httpSession->cs.client.url, "http", 4))
     CURL_EASY_SETOPT (curl_put, CURLOPT_USERAGENT, "GNUnet-http");
   CURL_EASY_SETOPT (curl_put, CURLOPT_UPLOAD, 1);
+#if 0
+  CURL_EASY_SETOPT (curl_put, CURLOPT_VERBOSE, 1);
+#endif
   CURL_EASY_SETOPT (curl_put, CURLOPT_CONNECTTIMEOUT, 150L);
-  CURL_EASY_SETOPT (curl_put, CURLOPT_INFILESIZE_LARGE, size);
+  size = put->size;
+  CURL_EASY_SETOPT (curl_put, CURLOPT_INFILESIZE, size);
   CURL_EASY_SETOPT (curl_put, CURLOPT_READFUNCTION, &sendContentCallback);
   CURL_EASY_SETOPT (curl_put, CURLOPT_READDATA, put);
+  CURL_EASY_SETOPT (curl_put, CURLOPT_WRITEFUNCTION, &discardContentCallback);
+  CURL_EASY_SETOPT (curl_put, CURLOPT_WRITEDATA, put);
+  CURL_EASY_SETOPT (curl_put, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
   if (ret != CURLE_OK)
     {
       curl_easy_cleanup (curl_put);
-      return NULL;
+      return SYSERR;
     }
-  return curl_put;
+  mret = curl_multi_add_handle (curl_multi, curl_put);
+  if (mret != CURLM_OK)
+    {
+      GE_LOG (coreAPI->ectx,
+             GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+             _("%s failed at %s:%d: `%s'\n"),
+             "curl_multi_add_handle",
+             __FILE__, __LINE__, curl_multi_strerror (mret));
+      MUTEX_UNLOCK (httplock);
+      return SYSERR;
+    }
+  signal_select();
+  put->curl_put = curl_put;
+#if DEBUG_HTTP
+  GE_LOG (coreAPI->ectx,
+         GE_DEBUG | GE_REQUEST | GE_USER,
+         "HTTP/CURL initiated PUT request to `%s'.\n",
+         httpSession->cs.client.url);
+#endif
+  return OK;
 }
 
 /**
@@ -964,13 +1354,14 @@
 {
   HTTPSession *httpSession = tsession->internal;
   struct HTTPPutData *putData;
-  CURL *curl_put;
-  CURLMcode mret;
   MESSAGE_HEADER *hdr;
+#if DO_GET
   char *tmp;
+#endif
 
   if (httpSession->is_client)
     {
+      /* we need to do a PUT (we are the client) */
       if (size >= MAX_BUFFER_SIZE)
         return SYSERR;
       if (size == 0)
@@ -978,6 +1369,17 @@
           GE_BREAK (NULL, 0);
           return SYSERR;
         }
+      if (important != YES) {
+       MUTEX_LOCK (httpSession->lock);
+       if (httpSession->cs.client.puts != NULL) {
+         /* do not queue more than one unimportant PUT at a time */
+         if (httpSession->cs.client.puts->done == YES)
+           signal_select(); /* do clean up now! */
+         MUTEX_UNLOCK (httpSession->lock);
+         return NO;
+       }
+       MUTEX_UNLOCK (httpSession->lock);
+      }      
       putData = MALLOC (sizeof (struct HTTPPutData));
       memset (putData, 0, sizeof (struct HTTPPutData));
       putData->msg = MALLOC (size + sizeof (MESSAGE_HEADER));
@@ -986,84 +1388,75 @@
       hdr->type = htons (0);
       memcpy (&putData->msg[sizeof (MESSAGE_HEADER)], msg, size);
       putData->size = size + sizeof (MESSAGE_HEADER);
+      if (OK != create_curl_put (httpSession,
+                                putData)) {
+       FREE (putData->msg);
+       FREE (putData);
+       return SYSERR;
+      }
       MUTEX_LOCK (httpSession->lock);
-      curl_put = create_curl_put (httpSession,
-                                  putData, size + sizeof (MESSAGE_HEADER));
-      if (curl_put == NULL)
-        {
-          MUTEX_UNLOCK (httpSession->lock);
-          FREE (putData->msg);
-          FREE (putData);
-          return SYSERR;
-        }
-      putData->curl_put = curl_put;
       putData->next = httpSession->cs.client.puts;
       httpSession->cs.client.puts = putData;
       MUTEX_UNLOCK (httpSession->lock);
-      MUTEX_LOCK (httplock);
-      mret = curl_multi_add_handle (curl_multi, curl_put);
-      if (mret != CURLM_OK)
-        {
-          GE_LOG (coreAPI->ectx,
-                  GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
-                  _("%s failed at %s:%d: `%s'\n"),
-                  "curl_multi_add_handle",
-                  __FILE__, __LINE__, curl_multi_strerror (mret));
-          putData->done = YES;
-          MUTEX_UNLOCK (httplock);
-          return SYSERR;
-        }
-      MUTEX_UNLOCK (httplock);
       return OK;
     }
+  
+  /* httpSession->isClient == false, respond to a GET (we
+     hopefully have one or will have one soon) */
+#if DEBUG_HTTP
+  GE_LOG (coreAPI->ectx,
+         GE_DEBUG | GE_REQUEST | GE_USER,
+         "HTTP/MHD queues %u bytes to be sent as response to GET as soon as 
possible.\n",
+         size);
+#endif
+#if DO_GET
+  MUTEX_LOCK (httpSession->lock);
+  if (httpSession->cs.server.wsize == 0)
+    GROW (httpSession->cs.server.wbuff, httpSession->cs.server.wsize, 
HTTP_BUF_SIZE);
+  if (httpSession->cs.server.wpos + size > httpSession->cs.server.wsize)
+    {
+      /* need to grow or discard */
+      if (!important)
+       {
+         MUTEX_UNLOCK (httpSession->lock);
+         return NO;
+       }
+      tmp = MALLOC (httpSession->cs.server.wpos + size);
+      memcpy (tmp,
+             &httpSession->cs.server.wbuff[httpSession->cs.server.woff], 
httpSession->cs.server.wpos);
+      FREE (httpSession->cs.server.wbuff);
+      httpSession->cs.server.wbuff = tmp;
+      httpSession->cs.server.wsize = httpSession->cs.server.wpos + size;
+      httpSession->cs.server.woff = 0;
+      httpSession->cs.server.wpos = httpSession->cs.server.wpos + size;
+    }
   else
-    {                           /* httpSession->isClient == false */
-      MUTEX_LOCK (httpSession->lock);
-      if (httpSession->wsize == 0)
-        GROW (httpSession->wbuff, httpSession->wsize, HTTP_BUF_SIZE);
-      if (httpSession->wpos + size > httpSession->wsize)
-        {
-          /* need to grow or discard */
-          if (!important)
-            {
-              MUTEX_UNLOCK (httpSession->lock);
-              return NO;
-            }
-          tmp = MALLOC (httpSession->wpos + size);
-          memcpy (tmp,
-                  &httpSession->wbuff[httpSession->woff], httpSession->wpos);
-          FREE (httpSession->wbuff);
-          httpSession->wbuff = tmp;
-          httpSession->wsize = httpSession->wpos + size;
-          httpSession->woff = 0;
-          httpSession->wpos = httpSession->wpos + size;
-        }
-      else
-        {
-          /* fits without growing */
-          if (httpSession->wpos + httpSession->woff + size >
-              httpSession->wsize)
-            {
-              /* need to compact first */
-              memmove (httpSession->wbuff,
-                       &httpSession->wbuff[httpSession->woff],
-                       httpSession->wpos);
-              httpSession->woff = 0;
-            }
-          /* append */
-          memcpy (&httpSession->wbuff[httpSession->woff + httpSession->wpos],
-                  msg, size);
-          httpSession->wpos += size;
-        }
-      MUTEX_UNLOCK (httpSession->lock);
-      return OK;
+    {
+      /* fits without growing */
+      if (httpSession->cs.server.wpos + httpSession->cs.server.woff + size >
+         httpSession->cs.server.wsize)
+       {
+         /* need to compact first */
+         memmove (httpSession->cs.server.wbuff,
+                  &httpSession->cs.server.wbuff[httpSession->cs.server.woff],
+                  httpSession->cs.server.wpos);
+         httpSession->cs.server.woff = 0;
+       }
+      /* append */
+      memcpy (&httpSession->cs.server.wbuff[httpSession->cs.server.woff + 
httpSession->cs.server.wpos],
+             msg, size);
+      httpSession->cs.server.wpos += size;
     }
+  MUTEX_UNLOCK (httpSession->lock);
+#endif
+  return OK;
 }
 
 /**
  * Function called to cleanup dead connections
  * (completed PUTs, GETs that have timed out,
- * etc.).
+ * etc.).  Also re-vives GETs that have timed out
+ * if we are still interested in the connection.
  */
 static void
 cleanup_connections ()
@@ -1072,24 +1465,44 @@
   HTTPSession *s;
   struct HTTPPutData *prev;
   struct HTTPPutData *pos;
+  struct MHDPutData * mpos;
+  struct MHDPutData * mprev;
+  cron_t now;
 
   MUTEX_LOCK (httplock);
+  now = get_time();     
   for (i = 0; i < tsessionCount; i++)
     {
       s = tsessions[i]->internal;
       MUTEX_LOCK (s->lock);
       if (s->is_client)
         {
+         if ( (s->cs.client.puts == NULL) &&
+              (s->users == 0)
+#if DO_GET
+              && (s->cs.client.last_get_activity + HTTP_TIMEOUT < now)
+#endif
+              ) {
+           MUTEX_UNLOCK (s->lock);
+#if DO_GET
+#if DEBUG_HTTP
+           GE_LOG (coreAPI->ectx,
+                   GE_DEBUG | GE_REQUEST | GE_USER,
+                   "HTTP transport destroys old (%llu ms) unused client 
session\n",
+                   now - s->cs.client.last_get_activity);
+#endif
+#endif
+           destroy_tsession(tsessions[i]);
+           i--;
+           continue;
+         }
+
           prev = NULL;
           pos = s->cs.client.puts;
           while (pos != NULL)
             {
-              /* FIXME: check if CURL has timed out
-                 the GET operation! If so, clean up!
-                 (and make sure we re-establish GET
-                 as needed!) */
-
-
+             if (pos->last_activity + HTTP_TIMEOUT < now)
+               pos->done = YES;
               if (pos->done)
                 {
                   if (prev == NULL)
@@ -1098,6 +1511,7 @@
                     prev->next = pos->next;
                   FREE (pos->msg);
                   curl_multi_remove_handle (curl_multi, pos->curl_put);
+                 signal_select();        
                   curl_easy_cleanup (pos->curl_put);
                   FREE (pos);
                   if (prev == NULL)
@@ -1109,21 +1523,67 @@
               prev = pos;
               pos = pos->next;
             }
+#if DO_GET
+         if ( (s->cs.client.last_get_activity + HTTP_TIMEOUT < now) &&
+              ( (s->users > 0) ||
+                (s->cs.client.puts != NULL) ) )
+           create_curl_get(s);
+#endif
         }
-      else
-        {
-          /* FIXME: add code to close MHD connection
-             from the server side (timeout!); need
-             to
-             A) tell GET callback to return "end of transmission"
-             B) destroy response object
-           */
-        }
+      else 
+       {
+         mpos = s->cs.server.puts;
+         mprev = NULL;
+         while (mpos != NULL) {
+           if ( (mpos->done == YES) ||
+                (mpos->last_activity + HTTP_TIMEOUT < now) ) {
+             if (mprev == NULL)
+               s->cs.server.puts = mpos->next;
+             else
+               mprev->next = mpos->next;
+             GROW(mpos->rbuff2,
+                  mpos->rsize2,
+                  0); 
+             FREE(mpos);
+             if (mprev == NULL)
+               mpos = s->cs.server.puts;
+             else
+               mpos = mprev->next;
+             continue;
+           }
+           mprev = mpos;
+           mpos = mpos->next;
+         }
+
+         /* ! s->is_client */
+         if ( 
+#if DO_GET
+             (s->cs.server.last_get_activity + HTTP_TIMEOUT < now) &&
+#endif
+              (s->users == 0) 
+              ) {
+           MUTEX_UNLOCK (s->lock);
+#if DO_GET
+#if DEBUG_HTTP
+           GE_LOG (coreAPI->ectx,
+                   GE_DEBUG | GE_REQUEST | GE_USER,
+                   "HTTP transport destroys old (%llu ms) unused server 
session\n",
+                   now - s->cs.server.last_get_activity);
+#endif
+#endif
+           destroy_tsession(tsessions[i]);
+           i--;
+           continue;
+         }
+       }
       MUTEX_UNLOCK (s->lock);
     }
   MUTEX_UNLOCK (httplock);
 }
 
+/**
+ * Thread that runs the CURL and MHD requests.
+ */
 static void *
 curl_runner (void *unused)
 {
@@ -1134,7 +1594,16 @@
   int max;
   struct timeval tv;
   int running;
+  unsigned long long timeout;
+  long ms;
+  int have_tv;
+  char buf[128]; /* for reading from pipe */
 
+#if DEBUG_HTTP
+  GE_LOG (coreAPI->ectx,
+         GE_DEBUG | GE_REQUEST | GE_USER,
+         "HTTP transport select thread started\n");
+#endif
   while (YES == http_running)
     {
       max = 0;
@@ -1153,21 +1622,50 @@
         }
       if (mhd_daemon != NULL)
         MHD_get_fdset (mhd_daemon, &rs, &ws, &es, &max);
-      /* CURL requires a regular timeout... */
-      tv.tv_sec = 0;
-      tv.tv_usec = 1000;
-      SELECT (max + 1, &rs, &ws, &es, &tv);
+      timeout = 0;
+      have_tv = MHD_get_timeout(mhd_daemon,
+                               &timeout);
+      if ( (CURLM_OK == curl_multi_timeout(curl_multi, &ms)) &&
+          ( (ms < timeout) ||
+            (have_tv == MHD_NO) ) ) {
+       timeout = ms;
+       have_tv = MHD_YES;
+      }
+      FD_SET(signal_pipe[0], &rs);
+      if (max < signal_pipe[0])
+       max = signal_pipe[0];
+      tv.tv_sec = timeout / 1000;
+      tv.tv_usec = (timeout % 1000) * 1000;
+      SELECT (max + 1, &rs, &ws, &es, (have_tv == MHD_YES) ? &tv : NULL);
       if (YES != http_running)
         break;
       running = 0;
-      curl_multi_perform (curl_multi, &running);
-      if (mhd_daemon != NULL)
-        MHD_run (mhd_daemon);
+      do {
+       mret = curl_multi_perform (curl_multi, &running);
+      } while ( (mret == CURLM_CALL_MULTI_PERFORM) &&
+               (http_running == YES) );
+      if ( FD_ISSET(signal_pipe[0], &rs)) 
+       read(signal_pipe[0], buf, sizeof(buf));      
+      if ( (mret != CURLM_OK) &&
+          (mret != CURLM_CALL_MULTI_PERFORM) ) 
+       GE_LOG (coreAPI->ectx,
+               GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+               _("%s failed at %s:%d: `%s'\n"),
+               "curl_multi_perform",
+               __FILE__, __LINE__, curl_multi_strerror (mret));      
+      if (mhd_daemon != NULL) 
+        MHD_run (mhd_daemon);  
       cleanup_connections ();
     }
+#if DEBUG_HTTP
+  GE_LOG (coreAPI->ectx,
+         GE_DEBUG | GE_REQUEST | GE_USER,
+         "HTTP transport select thread exits.\n");
+#endif
   return NULL;
 }
 
+
 /**
  * Start the server process to receive inbound traffic.
  * @return OK on success, SYSERR if the operation failed
@@ -1185,7 +1683,7 @@
   port = getGNUnetHTTPPort ();
   if ((mhd_daemon == NULL) && (port != 0))
     {
-      mhd_daemon = MHD_start_daemon (MHD_NO_FLAG,
+      mhd_daemon = MHD_start_daemon (MHD_USE_DEBUG,
                                      port,
                                      &acceptPolicyCallback,
                                      NULL, &accessHandlerCallback, NULL,
@@ -1197,7 +1695,14 @@
                                     128,
                                     MHD_OPTION_END);
     }
-  http_running = YES;
+  if (0 != PIPE(signal_pipe)) {
+    MHD_stop_daemon(mhd_daemon);
+    curl_multi_cleanup(curl_multi);
+    curl_multi = NULL;
+    mhd_daemon = NULL;
+    return SYSERR;
+  }
+  http_running = YES;  
   curl_thread = PTHREAD_CREATE (&curl_runner, NULL, 32 * 1024);
   if (curl_thread == NULL)
     GE_DIE_STRERROR (coreAPI->ectx,
@@ -1217,8 +1722,11 @@
   if ((http_running == NO) || (curl_multi == NULL))
     return SYSERR;
   http_running = NO;
+  signal_select();
   PTHREAD_STOP_SLEEP (curl_thread);
   PTHREAD_JOIN (curl_thread, &unused);
+  CLOSE(signal_pipe[0]);
+  CLOSE(signal_pipe[1]);
   if (mhd_daemon != NULL)
     {
       MHD_stop_daemon (mhd_daemon);





reply via email to

[Prev in Thread] Current Thread [Next in Thread]