gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r10666 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r10666 - gnunet/src/transport
Date: Fri, 19 Mar 2010 14:22:43 +0100

Author: grothoff
Date: 2010-03-19 14:22:43 +0100 (Fri, 19 Mar 2010)
New Revision: 10666

Modified:
   gnunet/src/transport/gnunet-service-transport.c
   gnunet/src/transport/plugin_transport.h
   gnunet/src/transport/plugin_transport_tcp.c
   gnunet/src/transport/plugin_transport_udp.c
Log:
fixing disconnect handling, making TCP plugin ready for bi-di use

Modified: gnunet/src/transport/gnunet-service-transport.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport.c     2010-03-19 13:21:31 UTC 
(rev 10665)
+++ gnunet/src/transport/gnunet-service-transport.c     2010-03-19 13:22:43 UTC 
(rev 10666)
@@ -305,11 +305,10 @@
   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
 
   /**
-   * Set to GNUNET_YES if we need to scrap the existing
-   * list of "addresses" and start fresh when we receive
-   * the next address update from a transport.  Set to
-   * GNUNET_NO if we should just add the new address
-   * to the list and wait for the commit call.
+   * Set to GNUNET_YES if we need to scrap the existing list of
+   * "addresses" and start fresh when we receive the next address
+   * update from a transport.  Set to GNUNET_NO if we should just add
+   * the new address to the list and wait for the commit call.
    */
   int rebuild;
 

Modified: gnunet/src/transport/plugin_transport.h
===================================================================
--- gnunet/src/transport/plugin_transport.h     2010-03-19 13:21:31 UTC (rev 
10665)
+++ gnunet/src/transport/plugin_transport.h     2010-03-19 13:22:43 UTC (rev 
10666)
@@ -45,7 +45,7 @@
  * @param cls closure
  * @param peer (claimed) identity of the other peer
  * @param message the message, NULL if we only care about
- *                learning about the delay until we should receive again
+ *                learning about the delay until we should receive again -- FIXME!
  * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
  * @param sender_address binary address of the sender (if observed)
  * @param sender_address_len number of bytes in sender_address
@@ -205,8 +205,10 @@
  *                is "on its own" (i.e. re-use existing TCP connection))
  * @param addrlen length of the address in bytes
  * @param force_address GNUNET_YES if the plugin MUST use the given address,
- *                otherwise the plugin may use other addresses or
- *                existing connections (if available)
+ *                GNUNET_NO means the plugin may use any other address and
+ *                GNUNET_SYSERR means that only reliable existing
+ *                bi-directional connections should be used (regardless
+ *                of address)
  * @param cont continuation to call once the message has
  *        been transmitted (or if the transport is ready
  *        for the next transmission call; or if the
@@ -351,7 +353,6 @@
    */
   GNUNET_TRANSPORT_CheckAddress check_address;
 
-
 };
 
 

Modified: gnunet/src/transport/plugin_transport_tcp.c
===================================================================
--- gnunet/src/transport/plugin_transport_tcp.c 2010-03-19 13:21:31 UTC (rev 
10665)
+++ gnunet/src/transport/plugin_transport_tcp.c 2010-03-19 13:22:43 UTC (rev 
10666)
@@ -17,13 +17,11 @@
      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
      Boston, MA 02111-1307, USA.
 */
-
 /**
  * @file transport/plugin_transport_tcp.c
  * @brief Implementation of the TCP transport service
  * @author Christian Grothoff
  */
-
 #include "platform.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_connection_lib.h"
@@ -185,6 +183,11 @@
    */
   int expecting_welcome;
 
+  /**
+   * Was this a connection that was inbound (we accepted)? (GNUNET_YES/GNUNET_NO)
+   */
+  int inbound;
+
 };
 
 
@@ -347,12 +350,17 @@
 do_transmit (void *cls, size_t size, void *buf)
 {
   struct Session *session = cls;
-  struct PendingMessage *pm;
+  struct GNUNET_PeerIdentity pid;
+  struct Plugin *plugin;
+  struct PendingMessage *pos;
+  struct PendingMessage *hd;
+  struct PendingMessage *tl;
+  struct GNUNET_TIME_Absolute now;
   char *cbuf;
-
   size_t ret;
 
   session->transmit_handle = NULL;
+  plugin = session->plugin;
   if (buf == NULL)
     {
 #if DEBUG_TCP
@@ -361,63 +369,97 @@
                        "Timeout trying to transmit to peer `%4s', discarding 
message queue.\n",
                        GNUNET_i2s (&session->target));
 #endif
-      /* timeout */
-      while (NULL != (pm = session->pending_messages_head))
-        {
+      /* timeout; cancel all messages that have already expired */
+      hd = NULL;
+      tl = NULL;
+      ret = 0;
+      now = GNUNET_TIME_absolute_get ();
+      while ( (NULL != (pos = session->pending_messages_head)) &&
+             (pos->timeout.value <= now.value) )
+       {
          GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
                                       session->pending_messages_tail,
-                                      pm);
+                                      pos);
 #if DEBUG_TCP
           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                            "tcp",
                            "Failed to transmit %u byte message to `%4s'.\n",
-                          pm->message_size,
+                          pos->message_size,
                            GNUNET_i2s (&session->target));
 #endif
-         GNUNET_STATISTICS_update (session->plugin->env->stats,
-                                   gettext_noop ("# bytes currently in TCP 
buffers"),
-                                   - (int64_t) pm->message_size,
-                                   GNUNET_NO); 
-         GNUNET_STATISTICS_update (session->plugin->env->stats,
-                                   gettext_noop ("# bytes discarded by TCP 
(timeout)"),
-                                   pm->message_size,
-                                   GNUNET_NO);      
-          if (pm->transmit_cont != NULL)
-            pm->transmit_cont (pm->transmit_cont_cls,
-                               &session->target, GNUNET_SYSERR);
-          GNUNET_free (pm);
+         ret += pos->message_size;
+         GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
         }
+      /* do this call before callbacks (so that if callbacks destroy
+        session, they have a chance to cancel actions done by this
+        call) */
+      process_pending_messages (session);  
+      pid = session->target;
+      /* now do callbacks and do not use session again since
+        the callbacks may abort the session */
+      while (NULL != (pos = hd))
+       {
+         GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+         if (pos->transmit_cont != NULL)
+           pos->transmit_cont (pos->transmit_cont_cls,
+                               &pid, GNUNET_SYSERR);
+         GNUNET_free (pos);
+       }
+      GNUNET_STATISTICS_update (plugin->env->stats,
+                               gettext_noop ("# bytes currently in TCP buffers"),
+                               - (int64_t) ret,
+                               GNUNET_NO); 
+      GNUNET_STATISTICS_update (plugin->env->stats,
+                               gettext_noop ("# bytes discarded by TCP (timeout)"),
+                               ret,
+                               GNUNET_NO);      
       return 0;
     }
+  /* copy all pending messages that would fit */
   ret = 0;
   cbuf = buf;
-  while (NULL != (pm = session->pending_messages_head))
+  hd = NULL;
+  tl = NULL;
+  while (NULL != (pos = session->pending_messages_head)) 
     {
-      if (size < pm->message_size)
+      if (ret + pos->message_size > size) 
        break;
-      memcpy (cbuf, pm->msg, pm->message_size);
-      cbuf += pm->message_size;
-      ret += pm->message_size;
-      size -= pm->message_size;
       GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
                                   session->pending_messages_tail,
-                                  pm);
-      GNUNET_STATISTICS_update (session->plugin->env->stats,
-                               gettext_noop ("# bytes currently in TCP 
buffers"),
-                               - (int64_t) pm->message_size,
-                               GNUNET_NO);       
-      if (pm->transmit_cont != NULL)
-        pm->transmit_cont (pm->transmit_cont_cls,
-                           &session->target, GNUNET_OK);
-      GNUNET_free (pm);
+                                  pos);
+      GNUNET_assert (size >= pos->message_size);
+      memcpy (cbuf, pos->msg, pos->message_size);
+      cbuf += pos->message_size;
+      ret += pos->message_size;
+      size -= pos->message_size;
+      GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
     }
-  if (session->client != NULL)
-    process_pending_messages (session);
+  /* schedule 'continuation' before callbacks so that callbacks that
+     cancel everything don't cause us to use a session that no longer
+     exists... */
+  process_pending_messages (session);  
+  pid = session->target;
+  /* we'll now call callbacks that may cancel the session; hence
+     we should not use 'session' after this point */
+  while (NULL != (pos = hd))
+    {
+      GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+      if (pos->transmit_cont != NULL)
+        pos->transmit_cont (pos->transmit_cont_cls,
+                           &pid, GNUNET_OK);
+      GNUNET_free (pos);
+    }
+  GNUNET_assert (hd == NULL);
+  GNUNET_assert (tl == NULL);
 #if DEBUG_TCP > 1
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                    "tcp", "Transmitting %u bytes\n", ret);
 #endif
-  GNUNET_STATISTICS_update (session->plugin->env->stats,
+  GNUNET_STATISTICS_update (plugin->env->stats,
+                           gettext_noop ("# bytes currently in TCP buffers"),
+                           - (int64_t) ret,
+                           GNUNET_NO);       
+  GNUNET_STATISTICS_update (plugin->env->stats,
                            gettext_noop ("# bytes transmitted via TCP"),
                            ret,
                            GNUNET_NO);      
@@ -435,6 +477,7 @@
 process_pending_messages (struct Session *session)
 {
   struct PendingMessage *pm;
+
   GNUNET_assert (session->client != NULL);
   if (session->transmit_handle != NULL)
     return;
@@ -518,36 +561,15 @@
                            &session->target, GNUNET_SYSERR);
       GNUNET_free (pm);
     }
-  if (GNUNET_NO == session->expecting_welcome)
-    {
-#if DEBUG_TCP
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                       "tcp",
-                       "Notifying transport service about loss of connection 
with `%4s'.\n",
-                       GNUNET_i2s (&session->target));
-#endif
-      /* Data session that actually went past the initial handshake;
-         transport service may know about this one, so we need to
-         notify transport service about disconnect */
-      // FIXME: we should have a very clear connect-disconnect
-      // protocol with gnunet-service-transport! 
-      // FIXME: but this is not possible for all plugins, so what gives?
-    }
+  GNUNET_break (session->client != NULL);
   if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK)
     {
       GNUNET_SCHEDULER_cancel (session->plugin->env->sched,
                               session->receive_delay_task);
-      if (session->client != NULL)
-       {
-         GNUNET_SERVER_receive_done (session->client, 
-                                     GNUNET_SYSERR);
-       }
+      GNUNET_SERVER_receive_done (session->client, 
+                                 GNUNET_SYSERR);       
     }
-  if (session->client != NULL)
-    {
-      GNUNET_SERVER_client_drop (session->client);
-      session->client = NULL;
-    } 
+  GNUNET_SERVER_client_drop (session->client);
   GNUNET_STATISTICS_update (session->plugin->env->stats,
                            gettext_noop ("# TCP sessions active"),
                            -1,
@@ -579,8 +601,10 @@
  *                is "on its own" (i.e. re-use existing TCP connection))
  * @param addrlen length of the address in bytes
  * @param force_address GNUNET_YES if the plugin MUST use the given address,
- *                otherwise the plugin may use other addresses or
- *                existing connections (if available)
+ *                GNUNET_NO means the plugin may use any other address and
+ *                GNUNET_SYSERR means that only reliable existing
+ *                bi-directional connections should be used (regardless
+ *                of address)
  * @param cont continuation to call once the message has
  *        been transmitted (or if the transport is ready
  *        for the next transmission call; or if the
@@ -604,6 +628,7 @@
 {
   struct Plugin *plugin = cls;
   struct Session *session;
+  struct Session *next;
   struct PendingMessage *pm;
   struct GNUNET_CONNECTION_Handle *sa;
   int af;
@@ -612,22 +637,43 @@
                            gettext_noop ("# bytes TCP was asked to transmit"),
                            msgbuf_size,
                            GNUNET_NO);      
-  session = plugin->sessions;
   /* FIXME: we could do this a cheaper with a hash table
      where we could restrict the iteration to entries that match
      the target peer... */
-  while ( (session != NULL) &&
-         ( (session->client == NULL) ||
-           (0 != memcmp (target,
-                         &session->target, 
-                         sizeof (struct GNUNET_PeerIdentity))) ||
-           ( (GNUNET_YES == force_address) &&
-             (addr != NULL) &&
-             ( (addrlen != session->connect_alen) ||
-               (0 != memcmp (session->connect_addr,
-                             addr,
-                             addrlen)) ) ) ) )
-    session = session->next;
+  next = plugin->sessions;
+  while (NULL != (session = next)) 
+    {
+      next = session->next;
+      if (session->client == NULL) 
+       continue;
+      if (0 != memcmp (target,
+                      &session->target, 
+                      sizeof (struct GNUNET_PeerIdentity)))
+       continue;
+      if (GNUNET_SYSERR == force_address) 
+       {
+         if (session->expecting_welcome == GNUNET_NO)
+           break; /* established and reliable (TCP!) */
+         else
+           continue; /* not established */
+       }
+      if (GNUNET_NO == force_address)
+       break;
+      GNUNET_break (GNUNET_YES == force_address);
+      if (addr == NULL)
+       {
+         GNUNET_break (0);
+         break;
+       }
+      if (session->inbound == GNUNET_YES) 
+       continue;
+      if (addrlen != session->connect_alen)
+       continue;
+      if (0 == memcmp (session->connect_addr,
+                      addr,
+                      addrlen))
+       break;
+    }
   if ( (session == NULL) &&
        (addr == NULL) )
     {
@@ -646,9 +692,13 @@
   if (session == NULL)
     {
       if (sizeof (struct sockaddr_in) == addrlen)
-       af = AF_INET;
+       {
+         af = AF_INET;
+       }
       else if (sizeof (struct sockaddr_in6) == addrlen)
-       af = AF_INET6;
+       {
+         af = AF_INET6;
+       }
       else
        {
          GNUNET_break_op (0);
@@ -672,7 +722,6 @@
                                    GNUNET_NO);      
          return -1;
        }
-
 #if DEBUG_TCP
       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                        "tcp",
@@ -692,7 +741,6 @@
     }
   GNUNET_assert (session != NULL);
   GNUNET_assert (session->client != NULL);
-
   GNUNET_STATISTICS_update (plugin->env->stats,
                            gettext_noop ("# bytes currently in TCP buffers"),
                            msgbuf_size,
@@ -740,10 +788,12 @@
  *        to be cancelled
  */
 static void
-tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
+tcp_plugin_disconnect (void *cls,
+                      const struct GNUNET_PeerIdentity *target)
 {
   struct Plugin *plugin = cls;
   struct Session *session;
+  struct Session *next;
   struct PendingMessage *pm;
 
 #if DEBUG_TCP
@@ -752,44 +802,53 @@
                    "Asked to cancel session with `%4s'\n",
                    GNUNET_i2s (target));
 #endif
-  session = plugin->sessions;
-  while (NULL != session)
+  next = plugin->sessions;
+  while (NULL != (session = next))
     {
-      if (0 == memcmp (target,
+      next = session->next;
+      if (0 != memcmp (target,
                       &session->target,
                       sizeof (struct GNUNET_PeerIdentity)))
+       continue;
+      pm = session->pending_messages_head;
+      while (pm != NULL)
        {
-         pm = session->pending_messages_head;
-         while (pm != NULL)
-           {
-             pm->transmit_cont = NULL;
-             pm->transmit_cont_cls = NULL;
-             pm = pm->next;
-           }
-         if (session->client != NULL)
-           {
-             GNUNET_SERVER_client_drop (session->client);
-             session->client = NULL;
-           }
-         /* rest of the clean-up of the session will be done as part of
-            disconnect_notify which should be triggered any time now
-            (or which may be triggering this call in the first place) */
+         pm->transmit_cont = NULL;
+         pm->transmit_cont_cls = NULL;
+         pm = pm->next;
        }
-      session = session->next;
+      disconnect_session (session);
     }
 }
 
 
+/**
+ * Context for address to string conversion.
+ */
 struct PrettyPrinterContext
 {
+  /**
+   * Function to call with the result.
+   */
   GNUNET_TRANSPORT_AddressStringCallback asc;
+
+  /**
+   * Closure for 'asc'.
+   */
   void *asc_cls;
+
+  /**
+   * Port to add after the IP address.
+   */
   uint16_t port;
 };
 
 
 /**
  * Append our port and forward the result.
+ *
+ * @param cls the 'struct PrettyPrinterContext*'
+ * @param hostname hostname part of the address
  */
 static void
 append_port (void *cls, const char *hostname)
@@ -873,6 +932,8 @@
  * our listen port or our advertised port).  If it is
  * neither, we return one of these two ports at random.
  *
+ * @param plugin global variables
+ * @param in_port port number to check
  * @return either in_port or a more plausible port
  */
 static uint16_t
@@ -890,7 +951,7 @@
  * Another peer has suggested an address for this peer and transport
  * plugin.  Check that this could be a valid address.
  *
- * @param cls closure
+ * @param cls closure, our 'struct Plugin*'
  * @param addr pointer to the address
  * @param addrlen length of addr
  * @return GNUNET_OK if this is a plausible address for this peer
@@ -967,6 +1028,7 @@
       GNUNET_SERVER_client_keep (client);
       session = create_session (plugin,
                                &wm->clientIdentity, client);
+      session->inbound = GNUNET_YES;
       if (GNUNET_OK ==
          GNUNET_SERVER_client_get_address (client, &vaddr, &alen))
        {
@@ -1010,6 +1072,9 @@
 /**
  * Task to signal the server that we can continue
  * receiving from the TCP client now.
+ *
+ * @param cls the 'struct Session*'
+ * @param tc task context (unused)
  */
 static void
 delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -1056,7 +1121,6 @@
     }    
   if ( (NULL == session) || (GNUNET_NO != session->expecting_welcome))
     {
-      GNUNET_break_op (0);
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
@@ -1264,10 +1328,10 @@
   if (aport != bport)
     GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
                      "tcp",
-                     _
-                     ("TCP transport advertises itself as being on port 
%llu\n"),
+                     _("TCP transport advertises itself as being on port %llu\n"),
                      aport);
-  GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify,
+  GNUNET_SERVER_disconnect_notify (plugin->server, 
+                                  &disconnect_notify,
                                    plugin);
   /* FIXME: do the two calls below periodically again and
      not just once (since the info we get might change...) */

Modified: gnunet/src/transport/plugin_transport_udp.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp.c 2010-03-19 13:21:31 UTC (rev 
10665)
+++ gnunet/src/transport/plugin_transport_udp.c 2010-03-19 13:22:43 UTC (rev 
10666)
@@ -186,8 +186,11 @@
  * @param timeout when should we time out (give up) if we can not transmit?
  * @param addr the addr to send the message to, needs to be a sockaddr for us
  * @param addrlen the len of addr
- * @param force_address not used, we had better have an address to send to
- *        because we are stateless!!
+ * @param force_address GNUNET_YES if the plugin MUST use the given address,
+ *                GNUNET_NO means the plugin may use any other address and
+ *                GNUNET_SYSERR means that only reliable existing
+ *                bi-directional connections should be used (regardless
+ *                of address)
  * @param cont continuation to call once the message has
  *        been transmitted (or if the transport is ready
  *        for the next transmission call; or if the
@@ -222,6 +225,8 @@
 #endif
       return -1; /* Can never send if we don't have an address!! */
     }
+  if (force_address == GNUNET_SYSERR)
+    return -1; /* never reliable */
 
   /* Build the message to be sent */
   message = GNUNET_malloc (sizeof (struct UDPMessage) + msgbuf_size);





reply via email to

[Prev in Thread] Current Thread [Next in Thread]