gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r16473 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r16473 - gnunet/src/transport
Date: Fri, 12 Aug 2011 17:49:57 +0200

Author: grothoff
Date: 2011-08-12 17:49:57 +0200 (Fri, 12 Aug 2011)
New Revision: 16473

Modified:
   gnunet/src/transport/gnunet-service-transport_neighbours.c
   gnunet/src/transport/plugin_transport_tcp.c
Log:
finishing neighbours

Modified: gnunet/src/transport/gnunet-service-transport_neighbours.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport_neighbours.c  2011-08-12 15:00:47 UTC (rev 16472)
+++ gnunet/src/transport/gnunet-service-transport_neighbours.c  2011-08-12 15:49:57 UTC (rev 16473)
@@ -26,6 +26,7 @@
 #include "platform.h"
 #include "gnunet_ats_service.h"
 #include "gnunet-service-transport_neighbours.h"
+#include "gnunet-service-transport_plugins.h"
 #include "gnunet-service-transport_validation.h"
 #include "gnunet-service-transport.h"
 #include "gnunet_peerinfo_service.h"
@@ -45,15 +46,12 @@
 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
 
 
-// TODO:
-// - have a way to access the currently 'connected' session
-//   (for sending and to notice disconnect of it!)
-// - have a way to access/update bandwidth/quota information per peer
-//   (for CostReport/TrafficReport callbacks)
+/**
+ * Entry in neighbours. 
+ */
+struct NeighbourMapEntry;
 
 
-struct NeighbourMapEntry;
-
 /**
  * For each neighbour we keep a list of messages
  * that we still want to transmit to the neighbour.
@@ -72,6 +70,12 @@
   struct MessageQueue *prev;
 
   /**
+   * Once this message is actively being transmitted, which
+   * neighbour is it associated with?
+   */
+  struct NeighbourMapEntry *n;
+
+  /**
    * Function to call once we're done.
    */
   GST_NeighbourSendContinuation cont;
@@ -130,6 +134,11 @@
   struct GNUNET_TRANSPORT_ATS_Information *ats;
 
   /**
+   * Are we currently trying to send a message? If so, which one?
+   */
+  struct MessageQueue *is_active;
+
+  /**
    * Active session for communicating with the peer.
    */
   struct Session *session;
@@ -161,20 +170,12 @@
   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
 
   /**
-   * ID of task scheduled to run when we should retry transmitting
-   * the head of the message queue.  Actually triggered when the
-   * transmission is timing out (we trigger instantly when we have
-   * a chance of success).
+   * ID of task scheduled to run when we should try transmitting
+   * the head of the message queue.  
    */
-  GNUNET_SCHEDULER_TaskIdentifier retry_task;
+  GNUNET_SCHEDULER_TaskIdentifier transmission_task;
 
   /**
-   * How long until we should consider this peer dead (if we don't
-   * receive another message in the meantime)?
-   */
-  struct GNUNET_TIME_Absolute peer_timeout;
-
-  /**
    * Tracker for inbound bandwidth.
    */
   struct GNUNET_BANDWIDTH_Tracker in_tracker;
@@ -192,15 +193,9 @@
   unsigned int ats_count;
 
   /**
-   * Have we seen an PONG from this neighbour in the past (and
-   * not had a disconnect since)?
-   */
-  // int received_pong;
-
-  /**
    * Are we already in the process of disconnecting this neighbour?
    */
-  // int in_disconnect;
+  int in_disconnect;
 
   /**
    * Do we currently consider this neighbour connected? (as far as
@@ -246,8 +241,50 @@
 }
 
 
-#if 0
 /**
+ * Task invoked to start a transmission to another peer.
+ *
+ * @param cls the 'struct NeighbourMapEntry'
+ * @param tc scheduler context
+ */
+static void
+transmission_task (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Finish a transmission attempt: clear the neighbour's 'is_active' marker,
+ * report the result to the message's continuation and free the entry.
+ * @param cls the 'struct MessageQueue' of the message (freed here)
+ * @param receiver intended receiver (not used by this function)
+ * @param success whether it worked or not, passed on to the continuation
+ */
+static void
+transmit_send_continuation (void *cls,
+                           const struct GNUNET_PeerIdentity *receiver,
+                           int success)
+{
+  struct MessageQueue *mq;
+  struct NeighbourMapEntry *n;
+  
+  mq = cls;
+  n = mq->n; /* may be NULL; then only the continuation and the free below happen */
+  if (NULL != n) 
+    {
+      GNUNET_assert (n->is_active == mq); /* must be the message marked active */
+      n->is_active = NULL;
+      GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); /* no task may be pending here */
+      n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+                                                      n); /* schedule the next transmission attempt */
+    }
+  if (NULL != mq->cont) /* the continuation is optional */
+    mq->cont (mq->cont_cls,
+             success);
+  GNUNET_free (mq); /* NOTE(review): 'mq' is not unlinked from any list here -- confirm callers remove it first */
+}
+
+
+/**
  * Check the ready list for the given neighbour and if a plugin is
  * ready for transmission (and if we have a message), do so!
  *
@@ -259,44 +296,76 @@
   struct MessageQueue *mq;
   struct GNUNET_TIME_Relative timeout;
   ssize_t ret;
+  struct GNUNET_TRANSPORT_PluginFunctions *papi;
 
-  if (n->messages_head == NULL)
+  if (n->is_active != NULL)
+    return; /* transmission already pending */
+  if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
+    return; /* currently waiting for bandwidth */
+  mq = n->messages_head;
+  while (NULL != (mq = n->messages_head))
     {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Transmission queue for `%4s' is empty\n",
-                 GNUNET_i2s (&n->id));
-#endif
-      return;                     /* nothing to do */
+      timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+      if (timeout.rel_value > 0)
+       break;
+      transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */
     }
-  mq = n->messages_head;
+  if (NULL == mq)
+    return; /* no more messages */
+
+  papi = GST_plugins_find (n->plugin_name);
+  if (papi == NULL)
+    {
+      GNUNET_break (0);
+      return;
+    }
   GNUNET_CONTAINER_DLL_remove (n->messages_head,
                               n->messages_tail,
                               mq);
+  n->is_active = mq;
+  mq->n = n;
   ret = papi->send (papi->cls,
-                   &n->pid,
+                   &n->id,
                    mq->message_buf,
                    mq->message_buf_size,
-                   mq->priority,
-                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                   0 /* priority -- remove from plugin API? */,
+                   timeout,
                    n->session,
                    n->addr,
                    n->addrlen,
-                   GNUNET_YES /*?*/,
+                   GNUNET_YES,
                    &transmit_send_continuation, mq);
   if (ret == -1)
     {
       /* failure, but 'send' would not call continuation in this case,
         so we need to do it here! */
       transmit_send_continuation (mq,
-                                 &mq->neighbour_id,
+                                 &n->id,
                                  GNUNET_SYSERR);
+      n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+                                                      n);
     }
 }
-#endif
 
 
 /**
+ * Scheduler task: clears the neighbour's 'transmission_task' handle and
+ * then calls 'try_transmission_to_peer' for that neighbour.
+ * @param cls the 'struct NeighbourMapEntry'
+ * @param tc scheduler context (not used by this function)
+ */
+static void
+transmission_task (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct NeighbourMapEntry *n = cls;
+
+  n->transmission_task = GNUNET_SCHEDULER_NO_TASK; /* reset before the call below */
+  try_transmission_to_peer (n);
+}
+
+
+/**
  * Initialize the neighbours subsystem.
  *
  * @param cls closure for callbacks
@@ -325,23 +394,42 @@
 {
   struct MessageQueue *mq;
 
-  if (n->is_connected)
+  if (GNUNET_YES == n->in_disconnect)
+    return;
+  n->in_disconnect = GNUNET_YES;
+  while (NULL != (mq = n->messages_head))
     {
+      GNUNET_CONTAINER_DLL_remove (n->messages_head,
+                                  n->messages_tail,
+                                  mq);
+      mq->cont (mq->cont_cls, GNUNET_SYSERR);
+      GNUNET_free (mq);
+    }
+  if (NULL != n->is_active)
+    {
+      n->is_active->n = NULL;
+      n->is_active = NULL;
+    }
+  if (GNUNET_YES == n->is_connected)
+    {
+      n->is_connected = GNUNET_NO;
       disconnect_notify_cb (callback_cls,
                            &n->id);
-      n->is_connected = GNUNET_NO;
     }
   GNUNET_assert (GNUNET_YES ==
                 GNUNET_CONTAINER_multihashmap_remove (neighbours,
                                                       &n->id.hashPubKey,
                                                       n));
-  while (NULL != (mq = n->messages_head))
+  if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
     {
-      GNUNET_CONTAINER_DLL_remove (n->messages_head,
-                                  n->messages_tail,
-                                  mq);
-      GNUNET_free (mq);
+      GNUNET_SCHEDULER_cancel (n->timeout_task);
+      n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
     }
+  if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
+    {
+      GNUNET_SCHEDULER_cancel (n->timeout_task);
+      n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   if (NULL != n->asc)
     {
       GNUNET_ATS_suggest_address_cancel (n->asc);
@@ -446,6 +534,7 @@
                                  uint32_t ats_count)
 {
   struct NeighbourMapEntry *n;
+  struct GNUNET_MessageHeader connect_msg;
 
   n = lookup_neighbour (peer);
   if (NULL == n)
@@ -466,6 +555,17 @@
          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
   GNUNET_free_non_null (n->plugin_name);
   n->plugin_name = GNUNET_strdup (plugin_name);
+  GNUNET_SCHEDULER_cancel (n->timeout_task);
+  n->timeout_task =
+    GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                 &neighbour_timeout_task, n);
+  connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+  connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+  GST_neighbours_send (peer,
+                      &connect_msg,
+                      sizeof (connect_msg),
+                      GNUNET_TIME_UNIT_FOREVER_REL,
+                      NULL, NULL);
 }
 
 
@@ -564,7 +664,7 @@
 
   n = lookup_neighbour (target);
   if ( (NULL == n) ||
-       (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) )
+       (n->is_connected == GNUNET_YES) )
        return GNUNET_NO; /* not connected */
   return GNUNET_YES;
 }
@@ -593,7 +693,7 @@
 
   n = lookup_neighbour (target);
   if ( (n == NULL) ||
-       (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) ) 
+       (GNUNET_YES != n->is_connected) )
     {
       GNUNET_STATISTICS_update (GST_stats,
                                gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
@@ -620,7 +720,10 @@
   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head,
                                    n->messages_tail,
                                    mq);
-  // try_transmission_to_peer (n);
+  if ( (GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
+       (NULL == n->is_active) )
+    n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+                                                    n);
 }
 
 
@@ -667,9 +770,6 @@
          n->quota_violation_count--;
        }
     }
-  n->peer_timeout =
-    GNUNET_TIME_relative_to_absolute
-    (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
   GNUNET_SCHEDULER_cancel (n->timeout_task);
   n->timeout_task =
     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
@@ -773,8 +873,8 @@
   struct IteratorContext *ic = cls;
   struct NeighbourMapEntry *n = value;
 
-  if (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0)
-    return GNUNET_OK; /* not connected */
+  if (GNUNET_YES != n->is_connected)
+    return GNUNET_OK; 
   GNUNET_assert (n->ats_count > 0);
   ic->cb (ic->cb_cls,
          &n->id,
@@ -813,9 +913,25 @@
 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
 {
   struct NeighbourMapEntry *n;
+  struct GNUNET_TRANSPORT_PluginFunctions *papi;
+  struct GNUNET_MessageHeader disconnect_msg;
 
   n = lookup_neighbour (target);
-  /* FIXME: send disconnect message to target... */
+  disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+  disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+  papi = GST_plugins_find (n->plugin_name);
+  if (papi != NULL)
+    papi->send (papi->cls,
+               target,
+               (const void*) &disconnect_msg,
+               sizeof (struct GNUNET_MessageHeader),
+               UINT32_MAX /* priority */,
+               GNUNET_TIME_UNIT_FOREVER_REL,
+               n->session,
+               n->addr,
+               n->addrlen,
+               GNUNET_YES,
+               NULL, NULL);
   disconnect_neighbour (n);
 }
 

Modified: gnunet/src/transport/plugin_transport_tcp.c
===================================================================
--- gnunet/src/transport/plugin_transport_tcp.c 2011-08-12 15:00:47 UTC (rev 16472)
+++ gnunet/src/transport/plugin_transport_tcp.c 2011-08-12 15:49:57 UTC (rev 16473)
@@ -834,6 +834,9 @@
         (session->transmit_handle);
       session->transmit_handle = NULL;
     }
+  session->plugin->env->session_end (session->plugin->env->cls,
+                                     &session->target,
+                                     session);
   while (NULL != (pm = session->pending_messages_head))
     {
 #if DEBUG_TCP
@@ -878,9 +881,6 @@
                            -1,
                            GNUNET_NO);
   GNUNET_free_non_null (session->connect_addr);
-  session->plugin->env->session_end (session->plugin->env->cls,
-                                     &session->target,
-                                     session);
   GNUNET_assert (NULL == session->transmit_handle);
   GNUNET_free (session);
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]