gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33799 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r33799 - gnunet/src/transport
Date: Mon, 23 Jun 2014 21:58:56 +0200

Author: grothoff
Date: 2014-06-23 21:58:56 +0200 (Mon, 23 Jun 2014)
New Revision: 33799

Modified:
   gnunet/src/transport/plugin_transport_http_client.c
Log:
-add support for 'update_inbound_delay' to HTTP client, complete plugin 
monitoring implementation

Modified: gnunet/src/transport/plugin_transport_http_client.c
===================================================================
--- gnunet/src/transport/plugin_transport_http_client.c 2014-06-23 19:24:14 UTC 
(rev 33798)
+++ gnunet/src/transport/plugin_transport_http_client.c 2014-06-23 19:58:56 UTC 
(rev 33799)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2002-2013 Christian Grothoff (and other contributing authors)
+     (C) 2002-2014 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -22,6 +22,7 @@
  * @file transport/plugin_transport_http_client.c
  * @brief HTTP/S client transport plugin
  * @author Matthias Wachs
+ * @author Christian Grothoff
  */
 
 #if BUILD_HTTPS
@@ -237,7 +238,7 @@
   uint32_t ats_address_network_type;
 
   /**
-   * Is the client PUT handle currently paused
+   * Is the client PUT handle currently paused?
    */
   int put_paused;
 
@@ -394,9 +395,7 @@
   info.is_inbound = GNUNET_SYSERR; /* hard to say */
   info.num_msg_pending = session->msgs_in_queue;
   info.num_bytes_pending = session->bytes_in_queue;
-  /* info.receive_delay remains zero as this is not supported by UDP
-     (cannot selectively not receive from 'some' peer while continuing
-     to receive from others) */
+  info.receive_delay = session->next_receive;
   info.session_timeout = session->timeout;
   info.address = session->address;
   plugin->sic (plugin->sic_cls,
@@ -406,11 +405,16 @@
 
 
 /**
- * Increment session timeout due to activity for a session
+ * Increment session timeout due to activity for session @a s.
+ *
  * @param s the session
  */
 static void
-client_reschedule_session_timeout (struct Session *s);
+client_reschedule_session_timeout (struct Session *s)
+{
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
+  s->timeout = GNUNET_TIME_relative_to_absolute 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+}
 
 
 /**
@@ -564,7 +568,9 @@
   GNUNET_STATISTICS_update (plugin->env->stats,
                             stat_txt, msgbuf_size, GNUNET_NO);
   GNUNET_free (stat_txt);
-
+  notify_session_monitor (plugin,
+                          s,
+                          GNUNET_TRANSPORT_SS_UP);
   if (GNUNET_YES == s->put_tmp_disconnecting)
   {
     /* PUT connection is currently getting disconnected */
@@ -597,12 +603,10 @@
     s->put_tmp_disconnected = GNUNET_NO;
     GNUNET_break (s->client_put == NULL);
     if (GNUNET_SYSERR == client_connect_put (s))
-    {
       return GNUNET_SYSERR;
-    }
   }
-
-  client_schedule (s->plugin, GNUNET_YES);
+  client_schedule (s->plugin,
+                   GNUNET_YES);
   return msgbuf_size;
 }
 
@@ -655,7 +659,11 @@
     s->overhead = 0;
     GNUNET_free (pos);
   }
-
+  GNUNET_assert (0 == s->msgs_in_queue);
+  GNUNET_assert (0 == s->bytes_in_queue);
+  notify_session_monitor (plugin,
+                          s,
+                          GNUNET_TRANSPORT_SS_DOWN);
   if (NULL != s->msg_tk)
   {
     GNUNET_SERVER_mst_destroy (s->msg_tk);
@@ -675,8 +683,8 @@
  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  */
 static int
-http_client_session_disconnect (void *cls,
-                                struct Session *s)
+http_client_plugin_session_disconnect (void *cls,
+                                       struct Session *s)
 {
   struct HTTP_Client_Plugin *plugin = cls;
   struct HTTP_Message *msg;
@@ -688,7 +696,9 @@
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Session %p/connection %p: disconnecting PUT connection to peer 
`%s'\n",
-         s, s->client_put, GNUNET_i2s (&s->target));
+         s,
+         s->client_put,
+         GNUNET_i2s (&s->target));
 
     /* remove curl handle from multi handle */
     mret = curl_multi_remove_handle (plugin->curl_multi_handle, s->client_put);
@@ -801,7 +811,7 @@
   struct HTTP_Client_Plugin *plugin = cls;
   struct Session *session = value;
 
-  http_client_session_disconnect (plugin, session);
+  http_client_plugin_session_disconnect (plugin, session);
   return GNUNET_OK;
 }
 
@@ -815,8 +825,8 @@
  * @param target peer from which to disconnect
  */
 static void
-http_client_peer_disconnect (void *cls,
-                             const struct GNUNET_PeerIdentity *target)
+http_client_plugin_peer_disconnect (void *cls,
+                                    const struct GNUNET_PeerIdentity *target)
 {
   struct HTTP_Client_Plugin *plugin = cls;
 
@@ -896,7 +906,12 @@
 
 
 /**
- * FIXME.
+ * When we have nothing to transmit, we pause the HTTP PUT
+ * after a while (so that gnurl stops asking).  This task
+ * is the delayed task that actually pauses the PUT.
+ *
+ * @param cls the `struct Session *` with the put
+ * @param tc scheduler context
  */
 static void
 client_put_disconnect (void *cls,
@@ -952,7 +967,9 @@
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Session %p/connection %p: nothing to send, suspending\n",
          s, s->client_put);
-    s->put_disconnect_task = GNUNET_SCHEDULER_add_delayed 
(PUT_DISCONNECT_TIMEOUT, &client_put_disconnect, s);
+    s->put_disconnect_task = GNUNET_SCHEDULER_add_delayed 
(PUT_DISCONNECT_TIMEOUT,
+                                                           
&client_put_disconnect,
+                                                           s);
     s->put_paused = GNUNET_YES;
     return CURL_READFUNC_PAUSE;
   }
@@ -985,6 +1002,9 @@
     s->overhead = 0;
     GNUNET_free (msg);
   }
+  notify_session_monitor (plugin,
+                          s,
+                          GNUNET_TRANSPORT_SS_UP);
   GNUNET_asprintf (&stat_txt,
                    "# bytes currently in %s_client buffers",
                    plugin->protocol);
@@ -1024,10 +1044,19 @@
        "Session %p/connection %p: Waking up GET handle\n",
        s,
        s->client_get);
-  s->put_paused = GNUNET_NO;
+  if (GNUNET_YES == s->put_paused)
+  {
+    /* PUT connection was paused, unpause */
+    GNUNET_assert (s->put_disconnect_task != GNUNET_SCHEDULER_NO_TASK);
+    GNUNET_SCHEDULER_cancel (s->put_disconnect_task);
+    s->put_disconnect_task = GNUNET_SCHEDULER_NO_TASK;
+    s->put_paused = GNUNET_NO;
+    if (NULL != s->client_put)
+      curl_easy_pause (s->client_put, CURLPAUSE_CONT);
+  }
   if (NULL != s->client_get)
-    curl_easy_pause (s->client_get, CURLPAUSE_CONT);
-
+    curl_easy_pause (s->client_get,
+                     CURLPAUSE_CONT);
 }
 
 
@@ -1055,7 +1084,10 @@
   atsi.value = s->ats_address_network_type;
   GNUNET_break (s->ats_address_network_type != ntohl 
(GNUNET_ATS_NET_UNSPECIFIED));
 
-  delay = s->plugin->env->receive (plugin->env->cls, s->address, s, message);
+  delay = s->plugin->env->receive (plugin->env->cls,
+                                   s->address,
+                                   s,
+                                   message);
   plugin->env->update_address_metrics (plugin->env->cls,
                                       s->address, s,
                                       &atsi, 1);
@@ -1064,12 +1096,12 @@
                    "# bytes received via %s_client",
                    plugin->protocol);
   GNUNET_STATISTICS_update (plugin->env->stats,
-                            stat_txt, ntohs(message->size), GNUNET_NO);
+                            stat_txt,
+                            ntohs (message->size),
+                            GNUNET_NO);
   GNUNET_free (stat_txt);
 
-  s->next_receive =
-      GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), delay);
-
+  s->next_receive = GNUNET_TIME_relative_to_absolute (delay);
   if (GNUNET_TIME_absolute_get ().abs_value_us < s->next_receive.abs_value_us)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1117,7 +1149,10 @@
  * @return bytes read from stream
  */
 static size_t
-client_receive (void *stream, size_t size, size_t nmemb, void *cls)
+client_receive (void *stream,
+                size_t size,
+                size_t nmemb,
+                void *cls)
 {
   struct Session *s = cls;
   struct GNUNET_TIME_Absolute now;
@@ -1131,11 +1166,13 @@
   if (now.abs_value_us < s->next_receive.abs_value_us)
   {
     struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
-    struct GNUNET_TIME_Relative delta =
-        GNUNET_TIME_absolute_get_difference (now, s->next_receive);
+    struct GNUNET_TIME_Relative delta
+      = GNUNET_TIME_absolute_get_difference (now, s->next_receive);
+
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Session %p / connection %p: No inbound bandwidth available! Next 
read was delayed for %s\n",
-         s, s->client_get,
+         s,
+         s->client_get,
          GNUNET_STRINGS_relative_time_to_string (delta,
                                                  GNUNET_YES));
     if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
@@ -1143,13 +1180,21 @@
       GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
       s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
     }
-    s->recv_wakeup_task =
-        GNUNET_SCHEDULER_add_delayed (delta, &client_wake_up, s);
+    s->recv_wakeup_task
+      = GNUNET_SCHEDULER_add_delayed (delta,
+                                      &client_wake_up,
+                                      s);
     return CURL_WRITEFUNC_PAUSE;
   }
   if (NULL == s->msg_tk)
-    s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s);
-  GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, len, GNUNET_NO, GNUNET_NO);
+    s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb,
+                                          s);
+  GNUNET_SERVER_mst_receive (s->msg_tk,
+                             s,
+                             stream,
+                             len,
+                             GNUNET_NO,
+                             GNUNET_NO);
   return len;
 }
 
@@ -1161,7 +1206,8 @@
  * @param tc gnunet scheduler task context
  */
 static void
-client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+client_run (void *cls,
+            const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 /**
@@ -1172,7 +1218,8 @@
  * @return #GNUNET_SYSERR for hard failure, #GNUNET_OK for ok
  */
 static int
-client_schedule (struct HTTP_Client_Plugin *plugin, int now)
+client_schedule (struct HTTP_Client_Plugin *plugin,
+                 int now)
 {
   fd_set rs;
   fd_set ws;
@@ -1212,7 +1259,8 @@
 
   if (mret != CURLM_OK)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("%s failed at %s:%d: `%s'\n"),
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                _("%s failed at %s:%d: `%s'\n"),
                 "curl_multi_timeout", __FILE__, __LINE__,
                 curl_multi_strerror (mret));
     return GNUNET_SYSERR;
@@ -1237,18 +1285,17 @@
  * Task performing curl operations
  *
  * @param cls plugin as closure
- * @param tc gnunet scheduler task context
+ * @param tc scheduler task context
  */
 static void
-client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+client_run (void *cls,
+            const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct HTTP_Client_Plugin *plugin = cls;
   int running;
   long http_statuscode;
   CURLMcode mret;
 
-  GNUNET_assert (cls != NULL);
-
   plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
@@ -1267,7 +1314,7 @@
       struct Session *s = NULL;
       char *d = (char *) s;
 
-      if (easy_h == NULL)
+      if (NULL == easy_h)
       {
         GNUNET_break (0);
         LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1353,7 +1400,7 @@
             /* Disconnect other transmission direction and tell transport */
             s->get.easyhandle = NULL;
             s->get.s = NULL;
-            http_client_session_disconnect (plugin, s);
+            http_client_plugin_session_disconnect (plugin, s);
         }
       }
     }
@@ -1568,7 +1615,6 @@
 static int
 client_connect (struct Session *s)
 {
-
   struct HTTP_Client_Plugin *plugin = s->plugin;
   int res = GNUNET_OK;
 
@@ -1584,8 +1630,9 @@
   }
 
   GNUNET_asprintf (&s->url, "%s/%s;%u",
-                  http_common_plugin_address_to_url (NULL, s->address->address,
-                      s->address->address_length),
+                  http_common_plugin_address_to_url (NULL,
+                                                      s->address->address,
+                                                      
s->address->address_length),
                   GNUNET_i2s_full (plugin->env->my_identity),
                   plugin->last_tag);
 
@@ -1613,7 +1660,6 @@
                          HTTP_STAT_STR_CONNECTIONS,
                          plugin->cur_connections,
                          GNUNET_NO);
-
   /* Re-schedule since handles have changed */
   if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK)
   {
@@ -1633,8 +1679,8 @@
  * @return the network type
  */
 static enum GNUNET_ATS_Network_Type
-http_client_get_network (void *cls,
-                        struct Session *session)
+http_client_plugin_get_network (void *cls,
+                                struct Session *session)
 {
   return ntohl (session->ats_address_network_type);
 }
@@ -1673,7 +1719,7 @@
        GNUNET_STRINGS_relative_time_to_string (HTTP_CLIENT_SESSION_TIMEOUT,
                                                GNUNET_YES));
   GNUNET_assert (GNUNET_OK ==
-                 http_client_session_disconnect (s->plugin,
+                 http_client_plugin_session_disconnect (s->plugin,
                                                  s));
 }
 
@@ -1732,7 +1778,6 @@
       salen = sizeof (struct sockaddr_in6);
     }
     ats = plugin->env->get_address_type (plugin->env->cls, sa, salen);
-    //fprintf (stderr, "Address %s is in %s\n", GNUNET_a2s (sa,salen), 
GNUNET_ATS_print_network_type(ntohl(ats.value)));
     GNUNET_free (sa);
   }
   else if (GNUNET_NO == res)
@@ -1783,6 +1828,9 @@
     client_delete_session (s);
     return NULL;
   }
+  notify_session_monitor (plugin,
+                          s,
+                          GNUNET_TRANSPORT_SS_UP); /* or handshake? */
   return s;
 }
 
@@ -1811,19 +1859,6 @@
 
 
 /**
- * Increment session timeout due to activity for session @a s.
- *
- * @param s the session
- */
-static void
-client_reschedule_session_timeout (struct Session *s)
-{
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
-  s->timeout = GNUNET_TIME_relative_to_absolute 
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-}
-
-
-/**
  * Another peer has suggested an address for this
  * peer and transport plugin.  Check that this could be a valid
  * address.  If so, consider adding it to the list
@@ -1907,9 +1942,11 @@
 
 
   /* Optional parameters */
-  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg,
-                      plugin->name,
-                      "MAX_CONNECTIONS", &max_connections))
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg,
+                                             plugin->name,
+                                             "MAX_CONNECTIONS",
+                                             &max_connections))
     max_connections = 128;
   plugin->max_connections = max_connections;
 
@@ -1990,14 +2027,15 @@
     }
 
     /* proxy http tunneling */
-    if (GNUNET_SYSERR == (plugin->proxy_use_httpproxytunnel = 
GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg,
-        plugin->name, "PROXY_HTTP_TUNNELING")))
+    plugin->proxy_use_httpproxytunnel
+      = GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg,
+                                              plugin->name,
+                                              "PROXY_HTTP_TUNNELING");
+    if (GNUNET_SYSERR == plugin->proxy_use_httpproxytunnel)
       plugin->proxy_use_httpproxytunnel = GNUNET_NO;
 
     GNUNET_free_non_null (proxy_type);
   }
-
-
   return GNUNET_OK;
 }
 
@@ -2046,6 +2084,38 @@
 
 
 /**
+ * Function that will be called whenever the transport service wants to
+ * notify the plugin that the inbound quota changed and that the plugin
+ * should update it's delay for the next receive value
+ *
+ * @param cls closure
+ * @param peer which peer was the session for
+ * @param session which session is being updated
+ * @param delay new delay to use for receiving
+ */
+static void
+http_client_plugin_update_inbound_delay (void *cls,
+                                         const struct GNUNET_PeerIdentity 
*peer,
+                                         struct Session *s,
+                                         struct GNUNET_TIME_Relative delay)
+{
+  s->next_receive = GNUNET_TIME_relative_to_absolute (delay);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "New inbound delay %s\n",
+       GNUNET_STRINGS_relative_time_to_string (delay,
+                                               GNUNET_NO));
+  if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
+    s->recv_wakeup_task
+      = GNUNET_SCHEDULER_add_delayed (delay,
+                                      &client_wake_up,
+                                      s);
+  }
+}
+
+
+/**
  * Return information about the given session to the
  * monitor callback.
  *
@@ -2131,16 +2201,17 @@
   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
   api->cls = plugin;
   api->send = &http_client_plugin_send;
-  api->disconnect_session = &http_client_session_disconnect;
+  api->disconnect_session = &http_client_plugin_session_disconnect;
   api->query_keepalive_factor = &http_client_query_keepalive_factor;
-  api->disconnect_peer = &http_client_peer_disconnect;
+  api->disconnect_peer = &http_client_plugin_peer_disconnect;
   api->check_address = &http_client_plugin_address_suggested;
   api->get_session = &http_client_plugin_get_session;
   api->address_to_string = &http_client_plugin_address_to_string;
   api->string_to_address = &http_common_plugin_string_to_address;
   api->address_pretty_printer = &http_common_plugin_address_pretty_printer;
-  api->get_network = &http_client_get_network;
+  api->get_network = &http_client_plugin_get_network;
   api->update_session_timeout = &http_client_plugin_update_session_timeout;
+  api->update_inbound_delay = &http_client_plugin_update_inbound_delay;
   api->setup_monitor = &http_client_plugin_setup_monitor;
 #if BUILD_HTTPS
   plugin->name = "transport-https_client";
@@ -2150,7 +2221,6 @@
   plugin->protocol = "http";
 #endif
   plugin->last_tag = 1;
-  plugin->options = 0; /* Setup options */
 
   if (GNUNET_SYSERR == client_configure_plugin (plugin))
   {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]