gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33842 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r33842 - gnunet/src/transport
Date: Wed, 25 Jun 2014 16:14:17 +0200

Author: grothoff
Date: 2014-06-25 16:14:16 +0200 (Wed, 25 Jun 2014)
New Revision: 33842

Modified:
   gnunet/src/transport/plugin_transport_http_client.c
   gnunet/src/transport/plugin_transport_http_server.c
Log:
-add monitor support to http server, more code clean up, add http server MHD 
suspend feature, support quota reset API from transport in HTTP server

Modified: gnunet/src/transport/plugin_transport_http_client.c
===================================================================
--- gnunet/src/transport/plugin_transport_http_client.c 2014-06-25 08:24:22 UTC 
(rev 33841)
+++ gnunet/src/transport/plugin_transport_http_client.c 2014-06-25 14:14:16 UTC 
(rev 33842)
@@ -156,9 +156,10 @@
   struct HTTP_Client_Plugin *plugin;
 
   /**
-   * Client send handle
+   * Curl client PUT handle.
+   * FIXME: delta to put.easyhandle?
    */
-  void *client_put;
+  CURL *client_put;
 
   /**
    * Handle for the HTTP PUT request.
@@ -166,14 +167,15 @@
   struct ConnectionHandle put;
 
   /**
-   * Handle for the HTTP GET request.
+   * Curl client GET handle
+   * FIXME: delta to get.easyhandle?
    */
-  struct ConnectionHandle get;
+  CURL *client_get;
 
   /**
-   * Client receive handle
+   * Handle for the HTTP GET request.
    */
-  void *client_get;
+  struct ConnectionHandle get;
 
   /**
    * next pointer for double linked list
@@ -415,6 +417,7 @@
   struct HTTP_Client_Plugin *plugin = s->plugin;
   struct HTTP_Message *pos;
   struct HTTP_Message *next;
+  CURLMcode mret;
 
   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
   {
@@ -427,11 +430,49 @@
     GNUNET_SCHEDULER_cancel (s->put_disconnect_task);
     s->put_disconnect_task = GNUNET_SCHEDULER_NO_TASK;
   }
+  if (GNUNET_SCHEDULER_NO_TASK != s->recv_wakeup_task)
+  {
+    GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+    s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+  }
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
                                                        &s->target,
                                                        s));
+  if (NULL != s->client_put)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Session %p/connection %p: disconnecting PUT connection to peer 
`%s'\n",
+         s,
+         s->client_put,
+         GNUNET_i2s (&s->target));
 
+    /* remove curl handle from multi handle */
+    mret = curl_multi_remove_handle (plugin->curl_multi_handle,
+                                     s->client_put);
+    GNUNET_break (CURLM_OK == mret);
+    curl_easy_cleanup (s->client_put);
+    s->client_put = NULL;
+  }
+  if (NULL != s->client_get)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Session %p/connection %p: disconnecting GET connection to peer 
`%s'\n",
+         s, s->client_get,
+         GNUNET_i2s (&s->target));
+    /* remove curl handle from multi handle */
+    mret = curl_multi_remove_handle (plugin->curl_multi_handle,
+                                     s->client_get);
+    GNUNET_break (CURLM_OK == mret);
+    curl_easy_cleanup (s->client_get);
+    GNUNET_assert (plugin->cur_connections > 0);
+    plugin->cur_connections--;
+    s->client_get = NULL;
+  }
+  GNUNET_STATISTICS_set (plugin->env->stats,
+                         HTTP_STAT_STR_CONNECTIONS,
+                         plugin->cur_connections,
+                         GNUNET_NO);
   next = s->msg_head;
   while (NULL != (pos = next))
   {
@@ -757,86 +798,12 @@
                                        struct Session *s)
 {
   struct HTTP_Client_Plugin *plugin = cls;
-  struct HTTP_Message *msg;
-  struct HTTP_Message *t;
-  int res = GNUNET_OK;
-  CURLMcode mret;
 
-  if (NULL != s->client_put)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Session %p/connection %p: disconnecting PUT connection to peer 
`%s'\n",
-         s,
-         s->client_put,
-         GNUNET_i2s (&s->target));
-
-    /* remove curl handle from multi handle */
-    mret = curl_multi_remove_handle (plugin->curl_multi_handle,
-                                     s->client_put);
-    if (mret != CURLM_OK)
-    {
-      /* clean up easy handle, handle is now invalid and free'd */
-      res = GNUNET_SYSERR;
-      GNUNET_break (0);
-    }
-    curl_easy_cleanup (s->client_put);
-    s->client_put = NULL;
-  }
-
-
-  if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
-  {
-    GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
-    s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
-  }
-
-  if (NULL != s->client_get)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Session %p/connection %p: disconnecting GET connection to peer 
`%s'\n",
-         s, s->client_get,
-         GNUNET_i2s (&s->target));
-    /* remove curl handle from multi handle */
-    mret = curl_multi_remove_handle (plugin->curl_multi_handle, s->client_get);
-    if (mret != CURLM_OK)
-    {
-      /* clean up easy handle, handle is now invalid and free'd */
-      res = GNUNET_SYSERR;
-      GNUNET_break (0);
-    }
-    curl_easy_cleanup (s->client_get);
-    s->client_get = NULL;
-  }
-
-  msg = s->msg_head;
-  while (NULL != msg)
-  {
-    t = msg->next;
-    if (NULL != msg->transmit_cont)
-      msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_SYSERR,
-                          msg->size, msg->pos + s->overhead);
-    s->overhead = 0;
-    GNUNET_CONTAINER_DLL_remove (s->msg_head,
-                                 s->msg_tail,
-                                 msg);
-    GNUNET_assert (0 < s->msgs_in_queue);
-    s->msgs_in_queue--;
-    GNUNET_assert (msg->size <= s->bytes_in_queue);
-    s->bytes_in_queue -= msg->size;
-    GNUNET_free (msg);
-    msg = t;
-  }
-
-  GNUNET_assert (plugin->cur_connections >= 2);
-  plugin->cur_connections -= 2;
-  GNUNET_STATISTICS_set (plugin->env->stats,
-                         HTTP_STAT_STR_CONNECTIONS,
-                         plugin->cur_connections,
-                         GNUNET_NO);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Session %p: notifying transport about ending session\n",s);
-
-  plugin->env->session_end (plugin->env->cls, s->address, s);
+  plugin->env->session_end (plugin->env->cls,
+                            s->address,
+                            s);
   client_delete_session (s);
 
   /* Re-schedule since handles have changed */
@@ -847,7 +814,7 @@
   }
   client_schedule (plugin, GNUNET_YES);
 
-  return res;
+  return GNUNET_OK;
 }
 
 
@@ -993,11 +960,13 @@
   s->put_disconnect_task = GNUNET_SCHEDULER_NO_TASK;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Session %p/connection %p: will be disconnected due to no activity\n",
-       s, s->client_put);
+       s,
+       s->client_put);
   s->put_paused = GNUNET_NO;
   s->put_tmp_disconnecting = GNUNET_YES;
   if (NULL != s->client_put)
-    curl_easy_pause (s->client_put, CURLPAUSE_CONT);
+    curl_easy_pause (s->client_put,
+                     CURLPAUSE_CONT);
   client_schedule (s->plugin, GNUNET_YES);
 }
 
@@ -1284,24 +1253,23 @@
   int running;
   long http_statuscode;
   CURLMcode mret;
+  CURLMsg *msg;
+  int msgs_left;
 
   plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-
   do
   {
     running = 0;
     mret = curl_multi_perform (plugin->curl_multi_handle, &running);
 
-    CURLMsg *msg;
-    int msgs_left;
 
     while ((msg = curl_multi_info_read (plugin->curl_multi_handle, 
&msgs_left)))
     {
       CURL *easy_h = msg->easy_handle;
       struct Session *s = NULL;
-      char *d = (char *) s;
+      char *d = NULL; /* curl requires 'd' to be a 'char *' */
 
       if (NULL == easy_h)
       {
@@ -1316,80 +1284,88 @@
       GNUNET_assert (CURLE_OK ==
                      curl_easy_getinfo (easy_h, CURLINFO_PRIVATE, &d));
       s = (struct Session *) d;
-      GNUNET_assert (s != NULL);
+      GNUNET_assert (NULL != s);
       if (msg->msg == CURLMSG_DONE)
       {
-        GNUNET_break (CURLE_OK == curl_easy_getinfo (easy_h,
-            CURLINFO_RESPONSE_CODE, &http_statuscode));
+        GNUNET_break (CURLE_OK ==
+curl_easy_getinfo (easy_h,
+                   CURLINFO_RESPONSE_CODE,
+                   &http_statuscode));
         if (easy_h == s->client_put)
         {
-            if  ((0 != msg->data.result) || (http_statuscode != 200))
-            {
-              LOG (GNUNET_ERROR_TYPE_DEBUG,
-                   "Session %p/connection %p: PUT connection to `%s' ended 
with status %i reason %i: `%s'\n",
-                   s, msg->easy_handle,
-                   GNUNET_i2s (&s->target),
-                   http_statuscode,
-                   msg->data.result,
-                   curl_easy_strerror (msg->data.result));
-            }
-            else
-              LOG (GNUNET_ERROR_TYPE_DEBUG,
-                   "Session %p/connection %p: PUT connection to `%s' ended 
normal\n",
-                   s, msg->easy_handle,
-                   GNUNET_i2s (&s->target));
-            if (NULL == s->client_get)
-            {
-              /* Disconnect other transmission direction and tell transport */
-              /* FIXME? */
-            }
-            curl_multi_remove_handle (plugin->curl_multi_handle, easy_h);
-            curl_easy_cleanup (easy_h);
-            s->put_tmp_disconnecting = GNUNET_NO;
-            s->put_tmp_disconnected = GNUNET_YES;
-            s->client_put = NULL;
-            s->put.easyhandle = NULL;
-            s->put.s = NULL;
+          if  ((0 != msg->data.result) || (http_statuscode != 200))
+          {
+            LOG (GNUNET_ERROR_TYPE_DEBUG,
+                 "Session %p/connection %p: PUT connection to `%s' ended with 
status %i reason %i: `%s'\n",
+                 s, msg->easy_handle,
+                 GNUNET_i2s (&s->target),
+                 http_statuscode,
+                 msg->data.result,
+                 curl_easy_strerror (msg->data.result));
+          }
+          else
+            LOG (GNUNET_ERROR_TYPE_DEBUG,
+                 "Session %p/connection %p: PUT connection to `%s' ended 
normal\n",
+                 s, msg->easy_handle,
+                 GNUNET_i2s (&s->target));
+          if (NULL == s->client_get)
+          {
+            /* Disconnect other transmission direction and tell transport */
+            /* FIXME? */
+          }
+          curl_multi_remove_handle (plugin->curl_multi_handle,
+                                    easy_h);
+          curl_easy_cleanup (easy_h);
+          GNUNET_assert (plugin->cur_connections > 0);
+          plugin->cur_connections--;
+          s->put_tmp_disconnecting = GNUNET_NO;
+          s->put_tmp_disconnected = GNUNET_YES;
+          s->client_put = NULL;
+          s->put.easyhandle = NULL;
+          s->put.s = NULL;
 
-            /*
-             * Handling a rare case:
-             * plugin_send was called during temporary put disconnect,
-             * reconnect required after connection was disconnected
-             */
-            if (GNUNET_YES == s->put_reconnect_required)
+          /*
+           * Handling a rare case:
+           * plugin_send was called during temporary put disconnect,
+           * reconnect required after connection was disconnected
+           */
+          if (GNUNET_YES == s->put_reconnect_required)
+          {
+            s->put_reconnect_required = GNUNET_NO;
+            if (GNUNET_SYSERR == client_connect_put (s))
             {
-              s->put_reconnect_required = GNUNET_NO;
-              if (GNUNET_SYSERR == client_connect_put (s))
-              {
-                GNUNET_break (s->client_put == NULL);
-                GNUNET_break (s->put_tmp_disconnected == GNUNET_NO);
-              }
+              GNUNET_break (s->client_put == NULL);
+              GNUNET_break (s->put_tmp_disconnected == GNUNET_NO);
             }
+          }
         }
         if (easy_h == s->client_get)
         {
-            if  ((0 != msg->data.result) || (http_statuscode != 200))
-            {
-              LOG (GNUNET_ERROR_TYPE_DEBUG,
-                   "Session %p/connection %p: GET connection to `%s' ended 
with status %i reason %i: `%s'\n",
-                   s,
-                   msg->easy_handle,
-                   GNUNET_i2s (&s->target),
-                   http_statuscode,
-                   msg->data.result,
-                   curl_easy_strerror (msg->data.result));
+          if  ((0 != msg->data.result) || (http_statuscode != 200))
+          {
+            LOG (GNUNET_ERROR_TYPE_DEBUG,
+                 "Session %p/connection %p: GET connection to `%s' ended with 
status %i reason %i: `%s'\n",
+                 s,
+                 msg->easy_handle,
+                 GNUNET_i2s (&s->target),
+                 http_statuscode,
+                 msg->data.result,
+                 curl_easy_strerror (msg->data.result));
 
-            }
-            else
-              LOG (GNUNET_ERROR_TYPE_DEBUG,
-                   "Session %p/connection %p: GET connection to `%s' ended 
normal\n",
-                   s,
-                   msg->easy_handle,
-                   GNUNET_i2s (&s->target));
-            /* Disconnect other transmission direction and tell transport */
-            s->get.easyhandle = NULL;
-            s->get.s = NULL;
-            http_client_plugin_session_disconnect (plugin, s);
+          }
+          else
+            LOG (GNUNET_ERROR_TYPE_DEBUG,
+                 "Session %p/connection %p: GET connection to `%s' ended 
normal\n",
+                 s,
+                 msg->easy_handle,
+                 GNUNET_i2s (&s->target));
+          /* Disconnect other transmission direction and tell transport */
+          s->get.easyhandle = NULL;
+          s->get.s = NULL;
+          /* FIXME: who calls curl_multi_remove on 'easy_h' now!? */
+          GNUNET_assert (plugin->cur_connections > 0);
+          plugin->cur_connections--;
+          http_client_plugin_session_disconnect (plugin, s);
         }
       }
     }
@@ -1423,6 +1399,7 @@
   curl_easy_setopt (s->client_get, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1);
   {
     struct HttpAddress *ha;
+
     ha = (struct HttpAddress *) s->address->address;
 
     if (HTTP_OPTIONS_VERIFY_CERTIFICATE ==
@@ -1444,7 +1421,7 @@
   curl_easy_setopt (s->client_get, CURLOPT_REDIR_PROTOCOLS, CURLPROTO_HTTP);
 #endif
 
-  if (s->plugin->proxy_hostname != NULL)
+  if (NULL != s->plugin->proxy_hostname)
   {
     curl_easy_setopt (s->client_get, CURLOPT_PROXY, s->plugin->proxy_hostname);
     curl_easy_setopt (s->client_get, CURLOPT_PROXYTYPE, s->plugin->proxytype);
@@ -1478,8 +1455,9 @@
 #endif
   curl_easy_setopt (s->client_get, CURLOPT_FOLLOWLOCATION, 0);
 
-  mret = curl_multi_add_handle (s->plugin->curl_multi_handle, s->client_get);
-  if (mret != CURLM_OK)
+  mret = curl_multi_add_handle (s->plugin->curl_multi_handle,
+                                s->client_get);
+  if (CURLM_OK != mret)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "Session %p : Failed to add GET handle to multihandle: `%s'\n",
@@ -1492,7 +1470,7 @@
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-
+  s->plugin->cur_connections++;
   return GNUNET_OK;
 }
 
@@ -1576,8 +1554,9 @@
 #if CURL_TCP_NODELAY
   curl_easy_setopt (s->client_put, CURLOPT_TCP_NODELAY, 1);
 #endif
-  mret = curl_multi_add_handle (s->plugin->curl_multi_handle, s->client_put);
-  if (mret != CURLM_OK)
+  mret = curl_multi_add_handle (s->plugin->curl_multi_handle,
+                                s->client_put);
+  if (CURLM_OK != mret)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "Session %p : Failed to add PUT handle to multihandle: `%s'\n",
@@ -1591,6 +1570,8 @@
     return GNUNET_SYSERR;
   }
   s->put_tmp_disconnected = GNUNET_NO;
+  s->plugin->cur_connections++;
+
   return GNUNET_OK;
 }
 
@@ -1632,11 +1613,7 @@
 
   if ((GNUNET_SYSERR == client_connect_get (s)) ||
       (GNUNET_SYSERR == client_connect_put (s)))
-  {
-    plugin->env->session_end (plugin->env->cls, s->address, s);
-    client_delete_session (s);
     return GNUNET_SYSERR;
-  }
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Session %p: connected with connections GET %p and PUT %p\n",
@@ -1644,7 +1621,6 @@
        s->client_get,
        s->client_put);
   /* Perform connect */
-  plugin->cur_connections += 2;
   GNUNET_STATISTICS_set (plugin->env->stats,
                          HTTP_STAT_STR_CONNECTIONS,
                          plugin->cur_connections,

Modified: gnunet/src/transport/plugin_transport_http_server.c
===================================================================
--- gnunet/src/transport/plugin_transport_http_server.c 2014-06-25 08:24:22 UTC 
(rev 33841)
+++ gnunet/src/transport/plugin_transport_http_server.c 2014-06-25 14:14:16 UTC 
(rev 33842)
@@ -23,6 +23,7 @@
  * @brief HTTP/S server transport plugin
  * @author Matthias Wachs
  * @author David Barksdale
+ * @author Christian Grothoff
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
@@ -223,11 +224,31 @@
   struct GNUNET_TIME_Absolute next_receive;
 
   /**
+   * Absolute time when this connection will time out.
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
    * Session timeout task
    */
   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
 
   /**
+   * Task to resume MHD handling when receiving is allowed again
+   */
+  GNUNET_SCHEDULER_TaskIdentifier recv_wakeup_task;
+
+  /**
+   * Number of bytes waiting for transmission to this peer.
+   */
+  unsigned long long bytes_in_queue;
+
+  /**
+   * Number of messages waiting for transmission to this peer.
+   */
+  unsigned int msgs_in_queue;
+
+  /**
    * Unique HTTP/S connection tag for this connection
    */
   uint32_t tag;
@@ -295,6 +316,7 @@
    * NAT handle & address management
    */
   struct GNUNET_NAT_Handle *nat;
+
   /**
    * IPv4 addresses DLL head
    */
@@ -453,10 +475,10 @@
   memset (&info, 0, sizeof (info));
   info.state = state;
   info.is_inbound = GNUNET_YES;
-  // info.num_msg_pending = session->msgs_in_queue; // FIXME
-  // info.num_bytes_pending = session->bytes_in_queue; // FIXME
-  // info.receive_delay = session->next_receive; // FIXME
-  // info.session_timeout = session->timeout; // FIXME
+  info.num_msg_pending = session->msgs_in_queue;
+  info.num_bytes_pending = session->bytes_in_queue;
+  info.receive_delay = session->next_receive;
+  info.session_timeout = session->timeout;
   info.address = session->address;
   plugin->sic (plugin->sic_cls,
                session,
@@ -465,6 +487,28 @@
 
 
 /**
+ * Wake up an MHD connection which was suspended
+ *
+ * @param cls the session
+ * @param tc task context
+ */
+static void
+server_wake_up (void *cls,
+                const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Session *s = cls;
+
+  s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Session %p: Waking up PUT handle\n",
+       s);
+  MHD_resume_connection (s->server_recv->mhd_conn);
+}
+
+
+/**
  * Reschedule the execution of both IPv4 and IPv6 server.
  *
  * @param plugin the plugin
@@ -488,7 +532,6 @@
 {
   struct HTTP_Server_Plugin *plugin = s->plugin;
   struct HTTP_Message *msg;
-  struct HTTP_Message *tmp;
   struct ServerConnection *send;
   struct ServerConnection *recv;
 
@@ -496,15 +539,21 @@
   {
     GNUNET_SCHEDULER_cancel (s->timeout_task);
     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    s->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
   }
+  if (GNUNET_SCHEDULER_NO_TASK != s->recv_wakeup_task)
+  {
+    GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+    s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
+    if (NULL != s->server_recv)
+      MHD_resume_connection (s->server_recv->mhd_conn);
+  }
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
                                                        &s->target,
                                                        s));
-  msg = s->msg_head;
-  while (NULL != msg)
+  while (NULL != (msg = s->msg_head))
   {
-    tmp = msg->next;
     GNUNET_CONTAINER_DLL_remove (s->msg_head,
                                  s->msg_tail,
                                  msg);
@@ -514,9 +563,14 @@
                           GNUNET_SYSERR,
                           msg->size,
                           msg->pos + msg->overhead);
+    GNUNET_assert (s->msgs_in_queue > 0);
+    s->msgs_in_queue--;
+    GNUNET_assert (s->bytes_in_queue >= msg->size);
+    s->bytes_in_queue -= msg->size;
     GNUNET_free (msg);
-    msg = tmp;
   }
+  GNUNET_assert (0 == s->msgs_in_queue);
+  GNUNET_assert (0 == s->bytes_in_queue);
   send = s->server_send;
   if (NULL != send)
   {
@@ -546,6 +600,9 @@
                        recv->mhd_daemon,
                        GNUNET_YES);
   }
+  notify_session_monitor (plugin,
+                          s,
+                          GNUNET_TRANSPORT_SS_DOWN);
   if (GNUNET_YES == s->known_to_service)
     plugin->env->session_end (plugin->env->cls,
                               s->address,
@@ -591,8 +648,22 @@
                         const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Session *s = cls;
+  struct GNUNET_TIME_Relative left;
 
   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  left = GNUNET_TIME_absolute_get_remaining (s->timeout);
+  if (0 != left.rel_value_us)
+  {
+    /* not actually our turn yet, but let's at least update
+       the monitor, it may think we're about to die ... */
+    notify_session_monitor (s->plugin,
+                            s,
+                            GNUNET_TRANSPORT_SS_UP);
+    s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
+                                                    &server_session_timeout,
+                                                    s);
+    return;
+  }
   GNUNET_log (TIMEOUT_LOG,
               "Session %p was idle for %s, disconnecting\n",
               s,
@@ -611,15 +682,7 @@
 server_reschedule_session_timeout (struct Session *s)
 {
  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
- GNUNET_SCHEDULER_cancel (s->timeout_task);
- s->timeout_task = GNUNET_SCHEDULER_add_delayed (HTTP_SERVER_SESSION_TIMEOUT,
-                                                 &server_session_timeout,
-                                                 s);
- GNUNET_log (TIMEOUT_LOG,
-             "Timeout rescheduled for session %p set to %s\n",
-             s,
-            GNUNET_STRINGS_relative_time_to_string 
(HTTP_SERVER_SESSION_TIMEOUT,
-                                                    GNUNET_YES));
+  s->timeout = GNUNET_TIME_relative_to_absolute 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
 }
 
 
@@ -681,10 +744,17 @@
   msg->buf = (char *) &msg[1];
   msg->transmit_cont = cont;
   msg->transmit_cont_cls = cont_cls;
-  memcpy (msg->buf, msgbuf, msgbuf_size);
+  memcpy (msg->buf,
+          msgbuf,
+          msgbuf_size);
   GNUNET_CONTAINER_DLL_insert_tail (session->msg_head,
                                     session->msg_tail,
                                     msg);
+  session->msgs_in_queue++;
+  session->bytes_in_queue += msg->size;
+  notify_session_monitor (plugin,
+                          session,
+                          GNUNET_TRANSPORT_SS_UP);
   GNUNET_asprintf (&stat_txt,
                    "# bytes currently in %s_server buffers",
                    plugin->protocol);
@@ -1018,6 +1088,15 @@
 }
 
 
+/**
+ * Function that will be called whenever the transport service wants to
+ * notify the plugin that a session is still active and in use and
+ * therefore the session timeout for this session has to be updated
+ *
+ * @param cls closure
+ * @param peer which peer was the session for
+ * @param session which session is being updated
+ */
 static void
 http_server_plugin_update_session_timeout (void *cls,
                                            const struct GNUNET_PeerIdentity 
*peer,
@@ -1094,8 +1173,8 @@
 
   if (NULL == url)
   {
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
 
   if (regexec(&plugin->url_regex, url, 4, matches, 0))
@@ -1162,9 +1241,12 @@
        GNUNET_i2s_full (target));
 
   /* convert options */
-  if (-1 == matches[3].rm_so) {
-      *options = 0;
-  } else {
+  if (-1 == matches[3].rm_so)
+  {
+    *options = 0;
+  }
+  else
+  {
     rc = strtoul (&url[matches[3].rm_so + 1], &options_end, 10);
     if (&url[matches[3].rm_eo] != options_end)
     {
@@ -1184,9 +1266,10 @@
            "URL options > UINT32_MAX\n");
       return GNUNET_SYSERR;
     }
-    (*options) = (uint32_t)rc;
+    (*options) = (uint32_t) rc;
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Found options `%u' in url\n", *options);
+         "Found options `%u' in url\n",
+         *options);
   }
   return GNUNET_OK;
 }
@@ -1344,6 +1427,7 @@
     s->ats_address_network_type = ats.value;
     s->next_receive = GNUNET_TIME_UNIT_ZERO_ABS;
     s->tag = stc.tag;
+    s->timeout = GNUNET_TIME_relative_to_absolute 
(HTTP_SERVER_SESSION_TIMEOUT);
     s->timeout_task = GNUNET_SCHEDULER_add_delayed 
(HTTP_SERVER_SESSION_TIMEOUT,
                                                     &server_session_timeout,
                                                     s);
@@ -1351,7 +1435,9 @@
                                               &s->target,
                                               s,
                                               
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
+    notify_session_monitor (plugin,
+                            s,
+                            GNUNET_TRANSPORT_SS_HANDSHAKE);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Creating new session %p for peer `%s' connecting from `%s'\n",
          s, GNUNET_i2s (&target),
@@ -1361,7 +1447,8 @@
     GNUNET_free_non_null (addr);
   }
 
-  if ( (_RECEIVE == direction) && (NULL != s->server_recv) )
+  if ( (_RECEIVE == direction) &&
+       (NULL != s->server_recv) )
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Duplicate PUT connection from `%s' tag %u, dismissing new 
connection\n",
@@ -1392,13 +1479,15 @@
   if (direction == _RECEIVE)
     s->server_recv = sc;
 
-  if ((NULL != s->server_send) && (NULL != s->server_recv))
+  if ( (NULL != s->server_send) &&
+       (NULL != s->server_recv) )
   {
     s->known_to_service = GNUNET_YES;
     plugin->env->session_start (NULL, s->address ,s, NULL, 0);
   }
 
-  if ((NULL == s->server_recv) || (NULL == s->server_send))
+  if ( (NULL == s->server_recv) ||
+       (NULL == s->server_send) )
   {
     to = (HTTP_SERVER_NOT_VALIDATED_TIMEOUT.rel_value_us / 1000LL / 1000LL);
     MHD_set_connection_option (mhd_connection,
@@ -1425,8 +1514,8 @@
  * @param cls current session
  * @param pos position in buffer
  * @param buf the buffer to write data to
- * @param max max number of bytes available in buffer
- * @return bytes written to buffer
+ * @param max max number of bytes available in @a buf
+ * @return bytes written to @a buf
  */
 static ssize_t
 server_send_callback (void *cls,
@@ -1461,7 +1550,14 @@
       if (NULL != msg->transmit_cont)
         msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK,
                             msg->size, msg->size + msg->overhead);
+      GNUNET_assert (s->msgs_in_queue > 0);
+      s->msgs_in_queue--;
+      GNUNET_assert (s->bytes_in_queue >= msg->size);
+      s->bytes_in_queue -= msg->size;
       GNUNET_free (msg);
+      notify_session_monitor (s->plugin,
+                              s,
+                              GNUNET_TRANSPORT_SS_UP);
     }
   }
   if (0 < bytes_read)
@@ -1469,13 +1565,19 @@
     sc->connected = GNUNET_YES;
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Sent %u bytes to peer `%s' with session %p \n",
-         bytes_read, GNUNET_i2s (&s->target), s);
-    GNUNET_asprintf (&stat_txt, "# bytes currently in %s_server buffers",
+         bytes_read,
+         GNUNET_i2s (&s->target),
+         s);
+    GNUNET_asprintf (&stat_txt,
+                     "# bytes currently in %s_server buffers",
                      s->plugin->protocol);
     GNUNET_STATISTICS_update (s->plugin->env->stats,
-                              stat_txt, -bytes_read, GNUNET_NO);
+                              stat_txt,
+                              - bytes_read,
+                              GNUNET_NO);
     GNUNET_free (stat_txt);
-    GNUNET_asprintf (&stat_txt, "# bytes transmitted via %s_server",
+    GNUNET_asprintf (&stat_txt,
+                     "# bytes transmitted via %s_server",
                      s->plugin->protocol);
     GNUNET_STATISTICS_update (s->plugin->env->stats,
                               stat_txt, bytes_read, GNUNET_NO);
@@ -1484,8 +1586,9 @@
   else if ((sc->options & OPTION_LONG_POLL) && sc->connected)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Completing GET response to peer `%s' with session %p \n",
-         GNUNET_i2s (&s->target), s);
+         "Completing GET response to peer `%s' with session %p\n",
+         GNUNET_i2s (&s->target),
+         s);
     return MHD_CONTENT_READER_END_OF_STREAM;
   }
   return bytes_read;
@@ -1519,7 +1622,14 @@
   if (GNUNET_NO == s->known_to_service)
   {
     s->known_to_service = GNUNET_YES;
-    plugin->env->session_start (NULL, s->address, s, NULL, 0);
+    plugin->env->session_start (NULL,
+                                s->address,
+                                s,
+                                NULL,
+                                0);
+    notify_session_monitor (plugin,
+                            s,
+                            GNUNET_TRANSPORT_SS_UP);
   }
   delay = plugin->env->receive (plugin->env->cls,
                                 s->address,
@@ -1554,6 +1664,8 @@
 /**
  * Add headers to a request indicating that we allow Cross-Origin Resource
  * Sharing.
+ *
+ * @param response response object to modify
  */
 static void
 add_cors_headers(struct MHD_Response *response)
@@ -1579,7 +1691,7 @@
  * @param method GET or PUT
  * @param version HTTP version
  * @param upload_data upload data
- * @param upload_data_size sizeof upload data
+ * @param upload_data_size size of @a upload_data
  * @param httpSessionCache the session cache to remember the connection
  * @return MHD_YES if connection is accepted, MHD_NO on reject
  */
@@ -1705,6 +1817,8 @@
     }
     else if ((*upload_data_size > 0) && (sc->connected == GNUNET_YES))
     {
+      struct GNUNET_TIME_Relative delay;
+
       /* (*upload_data_size > 0) for every segment received */
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "Session %p / Connection %p: Peer `%s' PUT on address `%s' received 
%u bytes\n",
@@ -1714,9 +1828,8 @@
                                                  s->address->address,
                                                  s->address->address_length),
            *upload_data_size);
-      struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
-
-      if ((s->next_receive.abs_value_us <= now.abs_value_us))
+      delay = GNUNET_TIME_absolute_get_remaining (s->next_receive);
+      if (0 == delay.rel_value_us)
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "PUT with %u bytes forwarded to MST\n",
@@ -1725,19 +1838,26 @@
         {
           s->msg_tk = GNUNET_SERVER_mst_create (&server_receive_mst_cb, s);
         }
-            GNUNET_SERVER_mst_receive (s->msg_tk, s, upload_data,
-                                       *upload_data_size, GNUNET_NO, 
GNUNET_NO);
+        GNUNET_SERVER_mst_receive (s->msg_tk, s, upload_data,
+                                   *upload_data_size, GNUNET_NO, GNUNET_NO);
         server_mhd_connection_timeout (plugin, s,
                                       
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value_us / 1000LL / 1000LL);
         (*upload_data_size) = 0;
       }
       else
       {
+        /* delay processing */
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                     "Session %p / Connection %p: no inbound bandwidth 
available! Next read was delayed by %s\n",
                     s, sc,
-                   GNUNET_STRINGS_relative_time_to_string 
(GNUNET_TIME_absolute_get_duration (s->next_receive),
+                   GNUNET_STRINGS_relative_time_to_string (delay,
                                                            GNUNET_YES));
+        GNUNET_assert (s->server_recv->mhd_conn == mhd_connection);
+        MHD_suspend_connection (s->server_recv->mhd_conn);
+        if (GNUNET_SCHEDULER_NO_TASK == s->recv_wakeup_task)
+          s->recv_wakeup_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                                              &server_wake_up,
+                                                              s);
       }
       return MHD_YES;
     }
@@ -1832,7 +1952,7 @@
  *
  * @param cls plugin as closure
  * @param addr address of incoming connection
- * @param addr_len address length of incoming connection
+ * @param addr_len number of bytes in @a addr
  * @return MHD_YES if connection is accepted, MHD_NO if connection is rejected
  */
 static int
@@ -1860,6 +1980,14 @@
 }
 
 
+/**
+ * Log function called by MHD.
+ *
+ * @param arg NULL
+ * @param fmt format string
+ * @param ap arguments for the format string (va_start() and va_end()
+ *           will be called by MHD)
+ */
 static void
 server_log (void *arg,
             const char *fmt,
@@ -1867,8 +1995,10 @@
 {
   char text[1024];
 
-  vsnprintf (text, sizeof (text), fmt, ap);
-  va_end (ap);
+  vsnprintf (text,
+             sizeof (text),
+             fmt,
+             ap);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Server: %s\n",
               text);
@@ -1896,7 +2026,7 @@
   gn_file =
       GNUNET_DISK_file_open (file, GNUNET_DISK_OPEN_READ,
                              GNUNET_DISK_PERM_USER_READ);
-  if (gn_file == NULL)
+  if (NULL == gn_file)
   {
     GNUNET_free (text);
     return NULL;
@@ -2114,7 +2244,9 @@
            plugin->name, plugin->port);
     }
     else
-       server_reschedule (plugin, plugin->server_v4, GNUNET_NO);
+      server_reschedule (plugin,
+                         plugin->server_v4,
+                         GNUNET_NO);
   }
 
 
@@ -2186,44 +2318,6 @@
 }
 
 
-static void
-server_stop (struct HTTP_Server_Plugin *plugin)
-{
-  if (plugin->server_v4 != NULL)
-  {
-    MHD_stop_daemon (plugin->server_v4);
-    plugin->server_v4 = NULL;
-  }
-  if ( plugin->server_v6 != NULL)
-  {
-    MHD_stop_daemon (plugin->server_v6);
-    plugin->server_v6 = NULL;
-  }
-
-
-  if (plugin->server_v4_task != GNUNET_SCHEDULER_NO_TASK)
-  {
-    GNUNET_SCHEDULER_cancel (plugin->server_v4_task);
-    plugin->server_v4_task = GNUNET_SCHEDULER_NO_TASK;
-  }
-
-  if (plugin->server_v6_task != GNUNET_SCHEDULER_NO_TASK)
-  {
-    GNUNET_SCHEDULER_cancel (plugin->server_v6_task);
-    plugin->server_v6_task = GNUNET_SCHEDULER_NO_TASK;
-  }
-#if BUILD_HTTPS
-  GNUNET_free_non_null (plugin->crypto_init);
-  GNUNET_free_non_null (plugin->cert);
-  GNUNET_free_non_null (plugin->key);
-#endif
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s server component stopped\n",
-       plugin->name);
-}
-
-
 /**
  * Add an address to the server's set of addresses and notify transport
  *
@@ -2456,8 +2550,7 @@
     if (port > 65535)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  _
-                  ("Require valid port number for service in 
configuration!\n"));
+                  _("Require valid port number for service in 
configuration!\n"));
       return GNUNET_SYSERR;
     }
   }
@@ -2469,7 +2562,8 @@
   }
 
 
-  if (GNUNET_CONFIGURATION_have_value (cfg, service_name, "BINDTO"))
+  if (GNUNET_CONFIGURATION_have_value (cfg, service_name,
+                                       "BINDTO"))
   {
     GNUNET_break (GNUNET_OK ==
                   GNUNET_CONFIGURATION_get_value_string (cfg, service_name,
@@ -2478,7 +2572,7 @@
   else
     hostname = NULL;
 
-  if (hostname != NULL)
+  if (NULL != hostname)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Resolving `%s' since that is where `%s' will bind to.\n",
@@ -2487,10 +2581,12 @@
     if (disablev6)
       hints.ai_family = AF_INET;
     if ((0 != (ret = getaddrinfo (hostname, NULL, &hints, &res))) ||
-        (res == NULL))
+        (NULL == res))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to resolve `%s': %s\n"),
-                  hostname, gai_strerror (ret));
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  _("Failed to resolve `%s': %s\n"),
+                  hostname,
+                  gai_strerror (ret));
       GNUNET_free (hostname);
       return GNUNET_SYSERR;
     }
@@ -2522,9 +2618,9 @@
       next = pos->ai_next;
       if ((disablev6) && (pos->ai_family == AF_INET6))
         continue;
-      if ((pos->ai_protocol != IPPROTO_TCP) && (pos->ai_protocol != 0))
+      if ((pos->ai_protocol != IPPROTO_TCP) && (0 != pos->ai_protocol))
         continue;               /* not TCP */
-      if ((pos->ai_socktype != SOCK_STREAM) && (pos->ai_socktype != 0))
+      if ((pos->ai_socktype != SOCK_STREAM) && (0 != pos->ai_socktype))
         continue;               /* huh? */
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "Service will bind to `%s'\n",
@@ -2638,7 +2734,7 @@
   while (res > 0)
   {
     res--;
-    GNUNET_assert (addrs[res] != NULL);
+    GNUNET_assert (NULL != addrs[res]);
     GNUNET_free (addrs[res]);
   }
   GNUNET_free_non_null (addrs);
@@ -2733,7 +2829,10 @@
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
 
-  GNUNET_asprintf(&url, "%s://%s", plugin->protocol, 
plugin->external_hostname);
+  GNUNET_asprintf(&url,
+                  "%s://%s",
+                  plugin->protocol,
+                  plugin->external_hostname);
 
   urlen = strlen (url) + 1;
   ext_addr = GNUNET_malloc (sizeof (struct HttpAddress) + urlen);
@@ -3048,7 +3147,32 @@
 
   /* Stop to report addresses to transport service */
   server_stop_report_addresses (plugin);
-  server_stop (plugin);
+  if (NULL != plugin->server_v4)
+  {
+    MHD_stop_daemon (plugin->server_v4);
+    plugin->server_v4 = NULL;
+  }
+  if (NULL != plugin->server_v6)
+  {
+    MHD_stop_daemon (plugin->server_v6);
+    plugin->server_v6 = NULL;
+  }
+  if (GNUNET_SCHEDULER_NO_TASK != plugin->server_v4_task)
+  {
+    GNUNET_SCHEDULER_cancel (plugin->server_v4_task);
+    plugin->server_v4_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+
+  if (GNUNET_SCHEDULER_NO_TASK != plugin->server_v6_task)
+  {
+    GNUNET_SCHEDULER_cancel (plugin->server_v6_task);
+    plugin->server_v6_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+#if BUILD_HTTPS
+  GNUNET_free_non_null (plugin->crypto_init);
+  GNUNET_free_non_null (plugin->cert);
+  GNUNET_free_non_null (plugin->key);
+#endif
   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
                                          &destroy_session_cb,
                                          plugin);
@@ -3090,7 +3214,9 @@
                                       const void *addr,
                                       size_t addrlen)
 {
-  return http_common_plugin_address_to_string (PLUGIN_NAME, addr, addrlen);
+  return http_common_plugin_address_to_string (PLUGIN_NAME,
+                                               addr,
+                                               addrlen);
 }
 
 
@@ -3099,7 +3225,7 @@
  *
  * @param cls closure ('struct HTTP_Server_Plugin*')
  * @param session the session
- * @return the network type in HBO or GNUNET_SYSERR
+ * @return the network type in HBO or #GNUNET_SYSERR
  */
 static enum GNUNET_ATS_Network_Type
 http_server_get_network (void *cls,
@@ -3110,6 +3236,38 @@
 
 
 /**
+ * Function that will be called whenever the transport service wants to
+ * notify the plugin that the inbound quota changed and that the plugin
+ * should update it's delay for the next receive value
+ *
+ * @param cls closure
+ * @param peer which peer was the session for
+ * @param session which session is being updated
+ * @param delay new delay to use for receiving
+ */
+static void
+http_server_plugin_update_inbound_delay (void *cls,
+                                         const struct GNUNET_PeerIdentity 
*peer,
+                                         struct Session *s,
+                                         struct GNUNET_TIME_Relative delay)
+{
+  s->next_receive = GNUNET_TIME_relative_to_absolute (delay);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "New inbound delay %s\n",
+       GNUNET_STRINGS_relative_time_to_string (delay,
+                                               GNUNET_NO));
+  if (GNUNET_SCHEDULER_NO_TASK != s->recv_wakeup_task)
+  {
+    GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+    s->recv_wakeup_task
+      = GNUNET_SCHEDULER_add_delayed (delay,
+                                      &server_wake_up,
+                                      s);
+  }
+}
+
+
+/**
  * Return information about the given session to the
  * monitor callback.
  *
@@ -3207,7 +3365,7 @@
   api->address_pretty_printer = &http_common_plugin_address_pretty_printer;
   api->get_network = &http_server_get_network;
   api->update_session_timeout = &http_server_plugin_update_session_timeout;
-  // api->update_inbound_delay = &http_server_plugin_update_inbound_delay; // 
FIXME: implement/support!
+  api->update_inbound_delay = &http_server_plugin_update_inbound_delay;
   api->setup_monitor = &http_server_plugin_setup_monitor;
 #if BUILD_HTTPS
   plugin->name = "transport-https_server";
@@ -3218,7 +3376,9 @@
 #endif
 
   /* Compile URL regex */
-  if (regcomp(&plugin->url_regex, URL_REGEX, REG_EXTENDED))
+  if (regcomp(&plugin->url_regex,
+              URL_REGEX,
+              REG_EXTENDED))
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
                      _("Unable to compile URL regex\n"));




reply via email to

[Prev in Thread] Current Thread [Next in Thread]