gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: tng: towards communicator flow control:


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: tng: towards communicator flow control:
Date: Thu, 24 Jan 2019 21:08:17 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 0c8d3480c tng: towards communicator flow control:
0c8d3480c is described below

commit 0c8d3480c3cc8bbc1b6bc0a6ae155510aae8f1f3
Author: Christian Grothoff <address@hidden>
AuthorDate: Thu Jan 24 21:08:12 2019 +0100

    tng: towards communicator flow control:
---
 src/transport/gnunet-service-tng.c | 407 ++++++++++++++++++++++++++-----------
 1 file changed, 292 insertions(+), 115 deletions(-)

diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 6c3373013..8febbdfff 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -99,18 +99,26 @@
 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 /**
- * How many messages can we have pending for a given client process
- * before we start to drop incoming messages?  We typically should
- * have only one client and so this would be the primary buffer for
- * messages, so the number should be chosen rather generously.
- *
- * The expectation here is that most of the time the queue is large
- * enough so that a drop is virtually never required.  Note that
- * this value must be about as large as 'TOTAL_MSGS' in the
- * 'test_transport_api_reliability.c', otherwise that testcase may
- * fail.
+ * How many messages can we have pending for a given communicator
+ * process before we start to throttle that communicator?
+ * 
+ * Used if a communicator might be CPU-bound and cannot handle the traffic.
+ */
+#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
+
+/**
+ * How many messages can we have pending for a given session (queue to
+ * a particular peer via a communicator) before we start to
+ * throttle that queue?
+ * 
+ * Used if ATS assigns more bandwidth to a particular transmission
+ * method than that transmission method can right now handle. (Yes,
+ * ATS should eventually notice utilization below allocation and
+ * adjust, but we don't want to queue up tons of messages in the
+ * meantime). Must be significantly below
+ * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
  */
-#define MAX_PENDING (128 * 1024)
+#define SESSION_QUEUE_LIMIT 32
 
 
 GNUNET_NETWORK_STRUCT_BEGIN
@@ -554,6 +562,40 @@ struct TransportClient;
 struct Neighbour;
 
 
+/**
+ * Entry identifying transmission in one of our `struct
+ * GNUNET_ATS_Sessions` which still awaits an ACK.  This is used to
+ * ensure we do not overwhelm a communicator and limit the number of
+ * messages outstanding per communicator (say in case communicator is
+ * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
+ * what the communicator can actually provide towards a particular
+ * peer/target).
+ */
+struct QueueEntry
+{
+
+  /**
+   * Kept as a DLL.
+   */ 
+  struct QueueEntry *next;
+
+  /**
+   * Kept as a DLL.
+   */ 
+  struct QueueEntry *prev;
+
+  /**
+   * ATS session this entry is queued with.
+   */
+  struct GNUNET_ATS_Session *session;
+  
+  /**
+   * Message ID used for this message with the queue used for transmission.
+   */
+  uint64_t mid;
+};
+
+
 /**
  * An ATS session is a message queue provided by a communicator
  * via which we can reach a particular neighbour.
@@ -580,6 +622,16 @@ struct GNUNET_ATS_Session
    */
   struct GNUNET_ATS_Session *next_client;
 
+  /**
+   * Head of DLL of unacked transmission requests.
+   */ 
+  struct QueueEntry *queue_head;
+
+  /**
+   * End of DLL of unacked transmission requests.
+   */ 
+  struct QueueEntry *queue_tail;
+
   /**
    * Which neighbour is this ATS session for?
    */
@@ -611,6 +663,11 @@ struct GNUNET_ATS_Session
    */
   struct GNUNET_TIME_Relative rtt;
 
+  /**
+   * Message ID generator for transmissions on this queue.
+   */ 
+  uint64_t mid_gen;
+  
   /**
    * Unique identifier of this ATS session with the communicator.
    */
@@ -627,24 +684,29 @@ struct GNUNET_ATS_Session
   uint32_t distance;
 
   /**
-   * Network type offered by this ATS session.
+   * Messages pending.
    */
-  enum GNUNET_NetworkType nt;
+  uint32_t num_msg_pending;
 
   /**
-   * Connection status for this ATS session.
+   * Bytes pending.
    */
-  enum GNUNET_TRANSPORT_ConnectionStatus cs;
+  uint32_t num_bytes_pending;
 
   /**
-   * Messages pending.
+   * Length of the DLL starting at @e queue_head.
    */
-  uint32_t num_msg_pending;
+  unsigned int queue_length;
+  
+  /**
+   * Network type offered by this ATS session.
+   */
+  enum GNUNET_NetworkType nt;
 
   /**
-   * Bytes pending.
+   * Connection status for this ATS session.
    */
-  uint32_t num_bytes_pending;
+  enum GNUNET_TRANSPORT_ConnectionStatus cs;
 
   /**
    * How much outbound bandwidth do we have available for this session?
@@ -842,11 +904,6 @@ struct PendingMessage
    * initialized if @e msg_uuid_set is #GNUNET_YES).
    */
   struct GNUNET_ShortHashCode msg_uuid;
-
-  /**
-   * Message ID used for this message with the queue used for transmission.
-   */
-  uint64_t mid;
   
   /**
    * Counter incremented per generated fragment.
@@ -1034,6 +1091,13 @@ struct TransportClient
        */
       struct AddressListEntry *addr_tail;
 
+      /**
+       * Number of queue entries in all queues to this communicator. Used
+       * to throttle sending to a communicator if we see that the communicator
+       * is globally unable to keep up.
+       */
+      unsigned int total_queue_length;
+      
       /**
        * Characteristics of this communicator.
        */
@@ -1382,41 +1446,142 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
 
 
 /**
- * Free @a queue.
+ * We believe we are ready to transmit a message on a queue. Double-checks
+ * with the queue's "tracker_out" and then gives the message to the 
+ * communicator for transmission (updating the tracker, and re-scheduling
+ * itself if applicable).  
+ *
+ * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
+ */ 
+static void
+transmit_on_queue (void *cls);
+
+
+/**
+ * Schedule next run of #transmit_on_queue().  Does NOTHING if 
+ * we should run immediately or if the message queue is empty.
+ * Test for no task being added AND queue not being empty to
+ * transmit immediately afterwards!  This function must only
+ * be called if the message queue is non-empty!
  *
- * @param queue the queue to free
+ * @param queue the queue to do scheduling for
+ */ 
+static void
+schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
+{
+  struct Neighbour *n = queue->neighbour;
+  struct PendingMessage *pm = n->pending_msg_head;
+  struct GNUNET_TIME_Relative out_delay;
+  unsigned int wsize;
+
+  GNUNET_assert (NULL != pm);
+  if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                             "# Transmission throttled due to communicator queue limit",
+                             1,
+                             GNUNET_NO);    
+    return;
+  }
+  if (queue->queue_length >= SESSION_QUEUE_LIMIT)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                             "# Transmission throttled due to session queue limit",
+                             1,
+                             GNUNET_NO);    
+    return;
+  }
+      
+  wsize = (0 == queue->mtu)
+    ? pm->bytes_msg /* FIXME: add overheads? */
+    : queue->mtu;
+  out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
+                                                 wsize);
+  out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
+                                       out_delay);
+  if (0 == out_delay.rel_value_us)
+    return; /* we should run immediately! */
+  /* queue has changed since we were scheduled, reschedule again */
+  queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
+                                                      &transmit_on_queue,
+                                                      queue);
+  if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+               "Next transmission on queue `%s' in %s (high delay)\n",
+               queue->address,
+               GNUNET_STRINGS_relative_time_to_string (out_delay,
+                                                       GNUNET_YES));
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Next transmission on queue `%s' in %s\n",
+               queue->address,
+               GNUNET_STRINGS_relative_time_to_string (out_delay,
+                                                       GNUNET_YES));
+}
+
+
+/**
+ * Free @a session.
+ *
+ * @param session the session to free
  */
 static void
-free_queue (struct GNUNET_ATS_Session *queue)
+free_session (struct GNUNET_ATS_Session *session)
 {
-  struct Neighbour *neighbour = queue->neighbour;
-  struct TransportClient *tc = queue->tc;
+  struct Neighbour *neighbour = session->neighbour;
+  struct TransportClient *tc = session->tc;
   struct MonitorEvent me = {
     .cs = GNUNET_TRANSPORT_CS_DOWN,
     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
   };
+  struct QueueEntry *qe;
+  int maxxed;
 
-  if (NULL != queue->transmit_task)
+  if (NULL != session->transmit_task)
   {
-    GNUNET_SCHEDULER_cancel (queue->transmit_task);
-    queue->transmit_task = NULL;
+    GNUNET_SCHEDULER_cancel (session->transmit_task);
+    session->transmit_task = NULL;
   }
   GNUNET_CONTAINER_MDLL_remove (neighbour,
                                neighbour->session_head,
                                neighbour->session_tail,
-                               queue);
+                               session);
   GNUNET_CONTAINER_MDLL_remove (client,
                                tc->details.communicator.session_head,
                                tc->details.communicator.session_tail,
-                               queue);  
+                               session);
+  maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
+  while (NULL != (qe = session->queue_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (session->queue_head,
+                                session->queue_tail,
+                                qe);
+    session->queue_length--;
+    tc->details.communicator.total_queue_length--;
+    GNUNET_free (qe);
+  }
+  GNUNET_assert (0 == session->queue_length);
+  if ( (maxxed) &&
+       (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
+  {
+    /* Communicator dropped below threshold, resume all queues */
+    GNUNET_STATISTICS_update (GST_stats,
+                             "# Transmission throttled due to communicator queue limit",
+                             -1,
+                             GNUNET_NO);
+    for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
+        NULL != s;
+        s = s->next_client)
+      schedule_transmit_on_queue (s);
+  }
   notify_monitors (&neighbour->pid,
-                  queue->address,
-                  queue->nt,
+                  session->address,
+                  session->nt,
                   &me);
-  GNUNET_ATS_session_del (queue->sr);
-  GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
-  GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
-  GNUNET_free (queue);
+  GNUNET_ATS_session_del (session->sr);
+  GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
+  GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
+  GNUNET_free (session);
   if (NULL == neighbour->session_head)
   {
     cores_send_disconnect_info (&neighbour->pid);
@@ -1499,7 +1664,7 @@ client_disconnect_cb (void *cls,
       struct AddressListEntry *ale;
 
       while (NULL != (q = tc->details.communicator.session_head))
-       free_queue (q);
+       free_session (q);
       while (NULL != (ale = tc->details.communicator.addr_head))
        free_address_list_entry (ale);
       GNUNET_free (tc->details.communicator.address_prefix);
@@ -2103,64 +2268,6 @@ tracker_update_in_cb (void *cls)
 }
 
 
-/**
- * We believe we are ready to transmit a message on a queue. Double-checks
- * with the queue's "tracker_out" and then gives the message to the 
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).  
- *
- * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
- */ 
-static void
-transmit_on_queue (void *cls);
-
-
-/**
- * Schedule next run of #transmit_on_queue().  Does NOTHING if 
- * we should run immediately or if the message queue is empty.
- * Test for no task being added AND queue not being empty to
- * transmit immediately afterwards!  This function must only
- * be called if the message queue is non-empty!
- *
- * @param queue the queue to do scheduling for
- */ 
-static void
-schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
-{
-  struct Neighbour *n = queue->neighbour;
-  struct PendingMessage *pm = n->pending_msg_head;
-  struct GNUNET_TIME_Relative out_delay;
-  unsigned int wsize;
-
-  GNUNET_assert (NULL != pm);
-  wsize = (0 == queue->mtu)
-    ? pm->bytes_msg /* FIXME: add overheads? */
-    : queue->mtu;
-  out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
-                                                 wsize);
-  out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
-                                       out_delay);
-  if (0 == out_delay.rel_value_us)
-    return; /* we should run immediately! */
-  /* queue has changed since we were scheduled, reschedule again */
-  queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
-                                                      &transmit_on_queue,
-                                                      queue);
-  if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-               "Next transmission on queue `%s' in %s (high delay)\n",
-               queue->address,
-               GNUNET_STRINGS_relative_time_to_string (out_delay,
-                                                       GNUNET_YES));
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Next transmission on queue `%s' in %s\n",
-               queue->address,
-               GNUNET_STRINGS_relative_time_to_string (out_delay,
-                                                       GNUNET_YES));
-}
-
-
 /**
  * Fragment the given @a pm to the given @a mtu.  Adds 
  * additional fragments to the neighbour as well. If the
@@ -2317,6 +2424,7 @@ transmit_on_queue (void *cls)
 {
   struct GNUNET_ATS_Session *queue = cls;
   struct Neighbour *n = queue->neighbour;
+  struct QueueEntry *qe;
   struct PendingMessage *pm;
   struct PendingMessage *s;
   uint32_t overhead;
@@ -2361,19 +2469,29 @@ transmit_on_queue (void *cls)
     return;
   }
 
-  // pm->mid = queue->mid_gen++;
+  /* Pass 's' for transmission to the communicator */
+  qe = GNUNET_new (struct QueueEntry);
+  qe->mid = queue->mid_gen++;
+  qe->session = queue;
+  // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
+  GNUNET_CONTAINER_DLL_insert (queue->queue_head,
+                              queue->queue_tail,
+                              qe);
   env = GNUNET_MQ_msg_extra (smt,
                             s->bytes_msg,
                             GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
   smt->qid = queue->qid;
-  // smt->mid = pm->mid;
-  // smt->receiver = pid;
+  smt->mid = qe->mid;
+  smt->receiver = n->pid;
   memcpy (&smt[1],
          &s[1],
          s->bytes_msg);
+  GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
+  queue->queue_length++;
+  queue->tc->details.communicator.total_queue_length++;
+  GNUNET_MQ_send (queue->tc->mq,
+                 env);
   
-  // FIXME: actually give 's' to communicator for transmission here!
-
   // FIXME: do something similar to the logic below
   // in defragmentation / reliability ACK handling!
 
@@ -2653,18 +2771,18 @@ handle_del_queue_message (void *cls,
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
-  for (struct GNUNET_ATS_Session *queue = tc->details.communicator.session_head;
-       NULL != queue;
-       queue = queue->next_client)
+  for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
+       NULL != session;
+       session = session->next_client)
   {
-    struct Neighbour *neighbour = queue->neighbour;
+    struct Neighbour *neighbour = session->neighbour;
 
-    if ( (dqm->qid != queue->qid) ||
+    if ( (dqm->qid != session->qid) ||
         (0 != memcmp (&dqm->receiver,
                       &neighbour->pid,
                       sizeof (struct GNUNET_PeerIdentity))) )
       continue;
-    free_queue (queue);
+    free_session (session);
     GNUNET_SERVICE_client_continue (tc->client);
     return;
   }
@@ -2684,20 +2802,79 @@ handle_send_message_ack (void *cls,
                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
 {
   struct TransportClient *tc = cls;
-
+  struct QueueEntry *queue;
+  
   if (CT_COMMUNICATOR != tc->type)
   {
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (tc->client);
     return;
   }
+
+  /* find our queue entry matching the ACK */
+  queue = NULL;
+  for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
+       NULL != session;
+       session = session->next_client)
+  {
+    if (0 != memcmp (&session->neighbour->pid,
+                    &sma->receiver,
+                    sizeof (struct GNUNET_PeerIdentity)))
+      continue;
+    for (struct QueueEntry *qe = session->queue_head;
+        NULL != qe;
+        qe = qe->next)
+    {
+      if (qe->mid != sma->mid)
+       continue;
+      queue = qe;
+      break;
+    }
+    break;     
+  }
+  if (NULL == queue)
+  {
+    /* this should never happen */
+    GNUNET_break (0); 
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
+  GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
+                              queue->session->queue_tail,
+                              queue);
+  queue->session->queue_length--;
+  tc->details.communicator.total_queue_length--;
+  GNUNET_SERVICE_client_continue (tc->client);
+
+  /* if applicable, resume transmissions that waited on ACK */
+  if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
+  {
+    /* Communicator dropped below threshold, resume all queues */
+    GNUNET_STATISTICS_update (GST_stats,
+                             "# Transmission throttled due to communicator queue limit",
+                             -1,
+                             GNUNET_NO);
+    for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
+        NULL != session;
+        session = session->next_client)
+      schedule_transmit_on_queue (session);
+  }
+  else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
+  {
+    /* queue dropped below threshold; only resume this one queue */
+    GNUNET_STATISTICS_update (GST_stats,
+                             "# Transmission throttled due to session queue limit",
+                             -1,
+                             GNUNET_NO);    
+    schedule_transmit_on_queue (queue->session);
+  }
+ 
+  /* TODO: we also should react on the status! */
+  // FIXME: this probably requires queue->pm = s assignment!
   // FIXME: react to communicator status about transmission request. We got:
   sma->status; // OK success, SYSERR failure
-  sma->mid; // message ID of original message
-  sma->receiver; // receiver of original message
 
-  
-  GNUNET_SERVICE_client_continue (tc->client);
+  GNUNET_free (queue);
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]