gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] 02/02: clean up transmission logic to have queues


From: gnunet
Subject: [GNUnet-SVN] [gnunet] 02/02: clean up transmission logic to have queues 'pull' for pending messages while control traffic is 'pushed' into queues
Date: Sat, 11 May 2019 22:14:13 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

commit 84b3c87161116786074b16f54f2d22e526421db0
Author: Christian Grothoff <address@hidden>
AuthorDate: Sat May 11 22:13:47 2019 +0200

    clean up transmission logic to have queues 'pull' for pending messages while control traffic is 'pushed' into queues
---
 src/transport/gnunet-service-tng.c | 702 +++++++++++++++++++++++--------------
 1 file changed, 445 insertions(+), 257 deletions(-)

diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index f07e1c88d..56cf61c2b 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -24,15 +24,8 @@
  *
  * TODO:
  * Implement next:
- * - realize "pull" based logic (#handle_client_send()) for
- *   `struct PendingMessage` which waits for a queue on any
- *   applicable route to be 'ready', in contrast
- *   to the 'push' based routing we use for control messages.
- *   Basically, when a queue goes idle, it should "search"
- *   via its neighbour for either virtual links or DVH's that
- *   have it as first hop and then find messages in those
- *   virtual links!
- * - realize transport-to-transport flow control (needed in case
+ * - FIXME-NEXT: logic to decide which pm to pick for a given queue (sorting!)
+ * - FIXME-FC: realize transport-to-transport flow control (needed in case
  *   communicators do not offer flow control).  Note that we may not
  *   want to simply delay the ACKs as that may cause unnecessary
  *   re-transmissions. => Introduce proper flow and congestion window(s)!
@@ -1375,7 +1368,7 @@ struct DistanceVector
    * Do we have a confirmed working queue and are thus visible to
    * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  struct VirtualLink *link;
+  struct VirtualLink *vl;
 
   /**
    * Signature affirming @e ephemeral_key of type
@@ -1565,6 +1558,12 @@ struct Queue
    * Connection status for this queue.
    */
   enum GNUNET_TRANSPORT_ConnectionStatus cs;
+
+  /**
+   * Set to #GNUNET_YES if this queue is idle waiting for some
+   * virtual link to give it a pending message.
+   */
+  int idle;
 };
 
 
@@ -1696,7 +1695,7 @@ struct Neighbour
    * Do we have a confirmed working queue and are thus visible to
    * CORE?  If so, this is the virtual link, otherwise NULL.
    */
-  struct VirtualLink *link;
+  struct VirtualLink *vl;
 
   /**
    * Latest DVLearn monotonic time seen from this peer.  Initialized only
@@ -1766,17 +1765,7 @@ enum PendingMessageType
   /**
    * Reliability box.
    */
-  PMT_RELIABILITY_BOX = 2,
-
-  /**
-   * Any type of acknowledgement.
-   */
-  PMT_ACKNOWLEDGEMENT = 3,
-
-  /**
-   * Control traffic generated by the TRANSPORT service itself.
-   */
-  PMT_CONTROL = 4
+  PMT_RELIABILITY_BOX = 2
 
 };
 
@@ -2751,6 +2740,41 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
 }
 
 
+/**
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to tell CORE about the disconnection.
+ *
+ * @param cls a `struct VirtualLink`
+ */
+static void
+check_link_down (void *cls);
+
+
+/**
+ * Send a #GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT message naming @a pid
+ * to every client of type #CT_CORE, telling it that we lost a connection.
+ * @param pid peer the connection was for
+ */
+static void
+cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Informing CORE clients about disconnect from %s\n",
+              GNUNET_i2s (pid));
+  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+  {
+    struct GNUNET_MQ_Envelope *env;
+    struct DisconnectInfoMessage *dim;
+
+    if (CT_CORE != tc->type)
+      continue;
+    env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
+    dim->peer = *pid;
+    GNUNET_MQ_send (tc->mq, env);
+  }
+}
+
+
 /**
  * Free entry in #dv_routes.  First frees all hops to the target, and
  * if there are no entries left, frees @a dv as well.
@@ -2766,11 +2790,33 @@ free_dv_route (struct DistanceVector *dv)
     free_distance_vector_hop (dvh);
   if (NULL == dv->dv_head)
   {
+    struct VirtualLink *vl;
+
     GNUNET_assert (
       GNUNET_YES ==
       GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
+    if (NULL != (vl = dv->vl))
+    {
+      GNUNET_assert (dv == vl->dv);
+      vl->dv = NULL;
+      if (NULL == vl->n)
+      {
+        cores_send_disconnect_info (&dv->target);
+        free_virtual_link (vl);
+      }
+      else
+      {
+        GNUNET_SCHEDULER_cancel (vl->visibility_task);
+        vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+      }
+      dv->vl = NULL;
+    }
+
     if (NULL != dv->timeout_task)
+    {
       GNUNET_SCHEDULER_cancel (dv->timeout_task);
+      dv->timeout_task = NULL;
+    }
     GNUNET_free (dv);
   }
 }
@@ -2950,6 +2996,7 @@ static void
 free_neighbour (struct Neighbour *neighbour)
 {
   struct DistanceVectorHop *dvh;
+  struct VirtualLink *vl;
 
   GNUNET_assert (NULL == neighbour->queue_head);
   GNUNET_assert (GNUNET_YES ==
@@ -2989,6 +3036,22 @@ free_neighbour (struct Neighbour *neighbour)
     GNUNET_PEERSTORE_store_cancel (neighbour->sc);
     neighbour->sc = NULL;
   }
+  if (NULL != (vl = neighbour->vl))
+  {
+    GNUNET_assert (neighbour == vl->n);
+    vl->n = NULL;
+    if (NULL == vl->dv)
+    {
+      cores_send_disconnect_info (&vl->target);
+      free_virtual_link (vl);
+    }
+    else
+    {
+      GNUNET_SCHEDULER_cancel (vl->visibility_task);
+      vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+    }
+    neighbour->vl = NULL;
+  }
   GNUNET_free (neighbour);
 }
 
@@ -3033,31 +3096,6 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
 }
 
 
-/**
- * Send message to CORE clients that we lost a connection.
- *
- * @param pid peer the connection was for
- */
-static void
-cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Informing CORE clients about disconnect from %s\n",
-              GNUNET_i2s (pid));
-  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
-  {
-    struct GNUNET_MQ_Envelope *env;
-    struct DisconnectInfoMessage *dim;
-
-    if (CT_CORE != tc->type)
-      continue;
-    env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
-    dim->peer = *pid;
-    GNUNET_MQ_send (tc->mq, env);
-  }
-}
-
-
 /**
  * We believe we are ready to transmit a message on a queue. Gives the
  * message to the communicator for transmission (updating the tracker,
@@ -3070,19 +3108,15 @@ transmit_on_queue (void *cls);
 
 
 /**
- * Schedule next run of #transmit_on_queue().  Does NOTHING if
- * we should run immediately or if the message queue is empty.
- * Test for no task being added AND queue not being empty to
- * transmit immediately afterwards!  This function must only
- * be called if the message queue is non-empty!
+ * Called whenever something changed that might affect when we
+ * try to do the next transmission on @a queue using #transmit_on_queue().
  *
  * @param queue the queue to do scheduling for
- * @param inside_job set to #GNUNET_YES if called from
- *            #transmit_on_queue() itself and NOT setting
- *            the task means running immediately
+ * @param p task priority to use, if @a queue is scheduled
  */
 static void
-schedule_transmit_on_queue (struct Queue *queue, int inside_job)
+schedule_transmit_on_queue (struct Queue *queue,
+                            enum GNUNET_SCHEDULER_Priority p)
 {
   if (queue->tc->details.communicator.total_queue_length >=
       COMMUNICATOR_TOTAL_QUEUE_LIMIT)
@@ -3092,6 +3126,7 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
       "# Transmission throttled due to communicator queue limit",
       1,
       GNUNET_NO);
+    queue->idle = GNUNET_NO;
     return;
   }
   if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
@@ -3100,38 +3135,18 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
                               "# Transmission throttled due to queue queue limit",
                               1,
                               GNUNET_NO);
+    queue->idle = GNUNET_NO;
     return;
   }
-#if FIXME - NEXT
-  struct Neighbour *n = queue->neighbour;
-  struct GNUNET_TIME_Relative out_delay;
-
-  if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
-  {
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Schedule transmission <%llu> on queue %llu of %s decides to run immediately\n",
-      pm->logging_uuid,
-      (unsigned long long) queue->qid,
-      GNUNET_i2s (&n->pid));
-    return; /* we should run immediately! */
-  }
-  /* queue has changed since we were scheduled, reschedule again */
+  /* queue might indeed be ready, schedule it */
+  if (NULL != queue->transmit_task)
+    GNUNET_SCHEDULER_cancel (queue->transmit_task);
   queue->transmit_task =
-    GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
-  if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Next transmission <%llu> on queue `%s' in %s (high delay)\n",
-                pm->logging_uuid,
-                queue->address,
-                GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Next transmission <%llu> on queue `%s' in %s\n",
-                pm->logging_uuid,
-                queue->address,
-                GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
-#endif
+    GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Considering transmission on queue `%s' to %s\n",
+              queue->address,
+              GNUNET_i2s (&queue->neighbour->pid));
 }
 
 
@@ -3156,15 +3171,21 @@ check_link_down (void *cls)
        pos = pos->next_dv)
     dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
   if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+  {
+    vl->dv->vl = NULL;
     vl->dv = NULL;
+  }
   q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
   for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
     q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
-  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+  if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
+  {
+    vl->n->vl = NULL;
     vl->n = NULL;
+  }
   if ((NULL == vl->n) && (NULL == vl->dv))
   {
-    cores_send_disconnect_info (&dv->target);
+    cores_send_disconnect_info (&vl->target);
     free_virtual_link (vl);
     return;
   }
@@ -3229,7 +3250,7 @@ free_queue (struct Queue *queue)
   if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT <
                    tc->details.communicator.total_queue_length))
   {
-    /* Communicator dropped below threshold, resume all queues */
+    /* Communicator dropped below threshold, resume all _other_ queues */
     GNUNET_STATISTICS_update (
       GST_stats,
       "# Transmission throttled due to communicator queue limit",
@@ -3237,7 +3258,7 @@ free_queue (struct Queue *queue)
       GNUNET_NO);
     for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
          s = s->next_client)
-      schedule_transmit_on_queue (s, GNUNET_NO);
+      schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
   notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
   GNUNET_free (queue);
@@ -3579,6 +3600,79 @@ pick_random_dv_hops (const struct DistanceVector *dv,
 }
 
 
+/**
+ * Some pending message for @a vl may be ready for transmission.
+ * Wake up the queues that could take it by calling
+ * #schedule_transmit_on_queue() on each idle, still validated queue.
+ *
+ * This function must (1) check for flow control to ensure that we can
+ * right now send to @a vl, (2) check that a pending message of @a vl
+ * is actually eligible, (3) determine if any applicable queue
+ * (direct neighbour or DVH path) is ready to accept messages, and
+ * (4) prioritize based on the preferences associated with the
+ * pending message.
+ *
+ * As written, (2) and (3) are done; (1) is still a FIXME-FC and the
+ * message preferences are not consulted here.
+ * @param vl virtual link where we should check for transmission
+ */
+static void
+check_vl_transmission (struct VirtualLink *vl)
+{
+  struct Neighbour *n = vl->n;
+  struct DistanceVector *dv = vl->dv;
+  struct GNUNET_TIME_Absolute now;
+  int elig;
+
+  /* FIXME-FC: need to implement virtual link flow control! */
+
+  /* Check that we have an eligible pending message!
+     (cheaper than having #transmit_on_queue() find out!) */
+  elig = GNUNET_NO;
+  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
+       pm = pm->next_vl)
+  {
+    if (NULL != pm->qe)
+      continue; /* not eligible, is in a queue! */
+    elig = GNUNET_YES;
+    break;
+  }
+  if (GNUNET_NO == elig)
+    return;
+
+  /* Notify idle, validated queues of the direct neighbour (if any) */
+  now = GNUNET_TIME_absolute_get ();
+  if (NULL != n)
+  {
+    for (struct Queue *queue = n->queue_head; NULL != queue;
+         queue = queue->next_neighbour)
+      if ((GNUNET_YES == queue->idle) &&
+          (queue->validated_until.abs_value_us > now.abs_value_us))
+        schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+  }
+  /* Notify idle, validated queues of the next hops of valid DV paths */
+  if (NULL != dv)
+  {
+    /* Do DV with lower scheduler priority, which effectively means that
+       IF a neighbour exists and is available, we prefer it. */
+    for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+         pos = pos->next_dv)
+    {
+      struct Neighbour *nh = pos->next_hop;
+
+      if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
+        continue; /* skip this one: path not validated */
+      for (struct Queue *queue = nh->queue_head; NULL != queue;
+           queue = queue->next_neighbour)
+        if ((GNUNET_YES == queue->idle) &&
+            (queue->validated_until.abs_value_us > now.abs_value_us))
+          schedule_transmit_on_queue (queue,
+                                      GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+    }
+  }
+}
+
+
 /**
  * Client asked for transmission to a peer.  Process the request.
  *
@@ -3594,7 +3688,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
   uint32_t bytes_msg;
   struct VirtualLink *vl;
   enum GNUNET_MQ_PriorityPreferences pp;
-  int was_empty;
 
   GNUNET_assert (CT_CORE == tc->type);
   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
@@ -3631,32 +3724,11 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
                                 tc->details.core.pending_msg_head,
                                 tc->details.core.pending_msg_tail,
                                 pm);
-  was_empty = (NULL == vl->pending_msg_head);
   GNUNET_CONTAINER_MDLL_insert (vl,
                                 vl->pending_msg_head,
                                 vl->pending_msg_tail,
                                 pm);
-  if (! was_empty)
-    return; /* all queues must already be busy */
-#if 0
-  // FIXME: check if any DVH or neighbour queue of 'vl'
-  // is ready for transmission now. If so, encapsulate
-  // 'pm' accordingly and send!
-  for (struct Queue *queue = target->queue_head; NULL != queue;
-       queue = queue->next_neighbour)
-  {
-    /* try transmission on any queue that is idle */
-    if (NULL == queue->transmit_task)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Queue %llu to %s is idle, triggering transmission\n",
-                  (unsigned long long) queue->qid,
-                  GNUNET_i2s (&queue->neighbour->pid));
-      queue->transmit_task =
-        GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
-    }
-  }
-#endif
+  check_vl_transmission (vl);
 }
 
 
@@ -3861,7 +3933,7 @@ update_ephemeral (struct DistanceVector *dv)
 
 
 /**
- * Send the control message @a payload on @a queue.
+ * Send the message @a payload on @a queue.
  *
  * @param queue the queue to use for transmission
  * @param pm pending message to update once transmission is done, may be NULL!
@@ -3879,6 +3951,7 @@ queue_send_msg (struct Queue *queue,
   struct GNUNET_TRANSPORT_SendMessageTo *smt;
   struct GNUNET_MQ_Envelope *env;
 
+  queue->idle = GNUNET_NO;
   GNUNET_log (
     GNUNET_ERROR_TYPE_DEBUG,
     "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
@@ -3910,6 +3983,11 @@ queue_send_msg (struct Queue *queue,
     GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
     queue->queue_length++;
     queue->tc->details.communicator.total_queue_length++;
+    if (COMMUNICATOR_TOTAL_QUEUE_LIMIT ==
+        queue->tc->details.communicator.total_queue_length)
+      queue->idle = GNUNET_NO;
+    if (QUEUE_LENGTH_LIMIT == queue->queue_length)
+      queue->idle = GNUNET_NO;
     GNUNET_MQ_send (queue->tc->mq, env);
   }
 }
@@ -5209,6 +5287,50 @@ update_dvh_performance (struct DistanceVectorHop *dvh,
 }
 
 
+/**
+ * We have completed transmission of @a pm, remove it from
+ * the transmission queues.  If @a pm is a fragment, it is freed and
+ * we continue up the tree, freeing every finished inner node and
+ * answering the client once the root message is complete.
+ * @param pm pending message that was transmitted
+ */
+static void
+completed_pending_message (struct PendingMessage *pm)
+{
+  struct PendingMessage *pos;
+
+  switch (pm->pmt)
+  {
+  case PMT_CORE:
+  case PMT_RELIABILITY_BOX:
+    /* Full message sent, we are done */
+    client_send_response (pm);
+    return;
+  case PMT_FRAGMENT_BOX:
+    /* Fragment sent over reliable channel */
+    free_fragment_tree (pm);
+    pos = pm->frag_parent;
+    GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+    GNUNET_free (pm);
+    /* walk up while the subtree is done; stop at the root (no parent),
+       which is handled below and must not be unlinked from a NULL parent */
+    while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
+           (NULL != pos->frag_parent))
+    {
+      pm = pos;
+      pos = pm->frag_parent;
+      GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+      GNUNET_free (pm);
+    }
+    /* Was this the last applicable fragment? */
+    if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
+        (pos->frag_off == pos->bytes_msg))
+      client_send_response (pos);
+    return;
+  }
+}
+
+
 /**
  * The @a pa was acknowledged, process the acknowledgement.
  *
@@ -5220,7 +5342,6 @@ static void
 handle_acknowledged (struct PendingAcknowledgement *pa,
                      struct GNUNET_TIME_Relative ack_delay)
 {
-  struct PendingMessage *pm = pa->pm;
   struct GNUNET_TIME_Relative delay;
 
   delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
@@ -5232,25 +5353,8 @@ handle_acknowledged (struct PendingAcknowledgement *pa,
     update_queue_performance (pa->queue, delay, pa->message_size);
   if (NULL != pa->dvh)
     update_dvh_performance (pa->dvh, delay, pa->message_size);
-  if (NULL != pm)
-  {
-    if (NULL != pm->frag_parent)
-    {
-      pm = pm->frag_parent;
-      free_fragment_tree (pa->pm);
-    }
-    while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
-    {
-      struct PendingMessage *parent = pm->frag_parent;
-
-      free_fragment_tree (pm);
-      pm = parent;
-    }
-    if (NULL != pm->head_frag)
-      pm = NULL; /* we are done, otherwise free 'pm' below */
-  }
-  if (NULL != pm)
-    free_pending_message (pm);
+  if (NULL != pa->pm)
+    completed_pending_message (pa->pm);
   free_pending_acknowledgement (pa);
 }
 
@@ -5494,6 +5598,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
     GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
   vl->target = dv->target;
   vl->dv = dv;
+  dv->vl = vl;
   vl->core_recv_window = RECV_WINDOW_SIZE;
   vl->visibility_task =
     GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
@@ -7185,6 +7290,7 @@ handle_validation_response (
   vl = GNUNET_new (struct VirtualLink);
   vl->target = n->pid;
   vl->n = n;
+  n->vl = vl;
   vl->core_recv_window = RECV_WINDOW_SIZE;
   vl->visibility_task =
     GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
@@ -7604,10 +7710,134 @@ update_pm_next_attempt (struct PendingMessage *pm,
 
 
 /**
- * We believe we are ready to transmit a message on a queue.
- * Gives the message to the
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).
+ * Context for #select_best_pending_from_link().
+ */
+struct PendingMessageScoreContext
+{
+  /**
+   * Set to the best message that was found, NULL for none.
+   */
+  struct PendingMessage *best;
+
+  /**
+   * DVH that @e best should take, or NULL for direct transmission.
+   */
+  struct DistanceVectorHop *dvh;
+
+  /**
+   * What is the estimated total overhead for this message?
+   */
+  size_t real_overhead;
+
+  /**
+   * Number of pending messages not yet in a queue that we looked at.
+   */
+  unsigned int consideration_counter;
+
+  /**
+   * #GNUNET_YES if @e best has to be fragmented for the queue.
+   */
+  int frag;
+
+  /**
+   * #GNUNET_YES if @e best has to be put into a reliability box.
+   */
+  int relb;
+};
+
+
+/**
+ * Select the best pending message from @a vl for transmission
+ * via @a queue.
+ *
+ * @param[in,out] sc best message so far (NULL for none), plus scoring data
+ * @param queue the queue that will be used for transmission
+ * @param vl the virtual link providing the messages
+ * @param dvh path we are currently considering, or NULL for none
+ * @param overhead number of bytes of overhead to be expected
+ *        from DV encapsulation (0 for without DV)
+ */
+static void
+select_best_pending_from_link (struct PendingMessageScoreContext *sc,
+                               struct Queue *queue,
+                               struct VirtualLink *vl,
+                               struct DistanceVectorHop *dvh,
+                               size_t overhead)
+{
+  /* FIXME-NEXT: right now we ignore all the 'fancy' sorting
+     we do on the pending message list, resulting in a
+     linear time algorithm (PLUS linear time list management).
+     So we should probably either avoid keeping a sorted list,
+     or find a way to make the sorting useful here! */
+  for (struct PendingMessage *pos = vl->pending_msg_head; NULL != pos;
+       pos = pos->next_vl)
+  {
+    size_t real_overhead = overhead;
+    int frag;
+    int relb;
+
+    if (NULL != pos->qe)
+      continue; /* not eligible */
+    sc->consideration_counter++;
+    /* tentatively decide on reliability-boxing and add its overhead; this
+       is revoked below should the message need fragmentation instead
+       ('frag' is not known yet and must not be consulted here) */
+    relb = GNUNET_NO;
+    if ((0 == (pos->prefs & GNUNET_MQ_PREF_UNRELIABLE)) &&
+        (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc))
+    {
+      relb = GNUNET_YES;
+      real_overhead += sizeof (struct TransportReliabilityBoxMessage);
+    }
+    /* determine if we have to fragment, if so add fragmentation
+       overhead! */
+    frag = GNUNET_NO;
+    if ( ( (0 != queue->mtu) &&
+           (pos->bytes_msg + real_overhead > queue->mtu) ) ||
+         (pos->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
+         (NULL != pos->head_frag /* fragments already exist, should
+                                    respect that even if MTU is 0 for
+                                    this queue */) )
+    {
+      frag = GNUNET_YES;
+      relb = GNUNET_NO; /* if we fragment, we never also reliability box */
+      if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
+      {
+        /* FIXME-OPTIMIZE: we could use an optimized, shorter fragmentation
+           header without the ACK UUID when using a *reliable* channel! */
+      }
+      real_overhead = overhead + sizeof (struct TransportFragmentBoxMessage);
+    }
+
+    /* Finally, compare to existing 'best' in sc to see if this 'pos' pending
+       message would beat it! */
+    if (NULL != sc->best)
+    {
+      /* FIXME-NEXT: CHECK if pos fits queue BETTER than sc->best, if not:
+         continue; NOTE: use 'overhead' to estimate need for fragmentation,
+         prefer it if MTU is sufficient and close! */
+    }
+    sc->best = pos;
+    sc->dvh = dvh;
+    sc->frag = frag;
+    sc->relb = relb;
+    sc->real_overhead = real_overhead;
+  }
+}
+
+
+/**
+ * We believe we are ready to transmit a `struct PendingMessage` on a
+ * queue, the big question is which one!  We need to see if there is
+ * one pending that is allowed by flow control and congestion control
+ * and (ideally) matches our queue's performance profile.
+ *
+ * If such a message is found, we give the message to the communicator
+ * for transmission (updating the tracker, and re-scheduling ourselves
+ * if applicable).
+ *
+ * If no such message is found, the queue's `idle` field must be set
+ * to #GNUNET_YES.
  *
  * @param cls the `struct Queue` to process transmissions for
  */
@@ -7615,128 +7845,99 @@ static void
 transmit_on_queue (void *cls)
 {
   struct Queue *queue = cls;
-
-  queue->transmit_task = NULL;
-#if FIXME - NEXT
   struct Neighbour *n = queue->neighbour;
+  struct PendingMessageScoreContext sc;
   struct PendingMessage *pm;
-  struct PendingMessage *s;
-  uint32_t overhead;
 
-  if (NULL == (pm = n->pending_msg_head))
+  queue->transmit_task = NULL;
+  if (NULL == n->vl)
   {
-    /* no message pending, nothing to do here! */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "No messages waiting on queue %s to %s, going to sleep\n",
-                queue->address,
-                GNUNET_i2s (&n->pid));
+                "Virtual link `%s' is down, cannot have PM for queue `%s'\n",
+                GNUNET_i2s (&n->pid),
+                queue->address);
+    queue->idle = GNUNET_YES;
     return;
   }
-  if (NULL != pm->qe)
+  memset (&sc, 0, sizeof (sc));
+  select_best_pending_from_link (&sc, queue, n->vl, NULL, 0);
+  if (NULL == sc.best)
+  {
+    /* Also look at DVH that have the n as first hop! */
+    for (struct DistanceVectorHop *dvh = n->dv_head; NULL != dvh;
+         dvh = dvh->next_neighbour)
+    {
+      select_best_pending_from_link (&sc,
+                                     queue,
+                                     dvh->dv->vl,
+                                     dvh,
+                                     sizeof (struct GNUNET_PeerIdentity) *
+                                         (1 + dvh->distance) +
+                                       sizeof (struct TransportDVBoxMessage) +
+                                       sizeof (struct TransportDVBoxPayloadP));
+    }
+  }
+  if (NULL == sc.best)
   {
-    /* message still pending with communciator!
-       LOGGING-FIXME: Use stats? Should this not be rare? */
+    /* no message pending, nothing to do here! */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Waiting on communicator for queue %s to %s, going to sleep\n",
+                "No pending messages, queue `%s' to %s now idle\n",
                 queue->address,
                 GNUNET_i2s (&n->pid));
+    queue->idle = GNUNET_YES;
     return;
   }
-  schedule_transmit_on_queue (queue, GNUNET_YES);
-  if (NULL != queue->transmit_task)
+
+  /* Given selection in `sc`, do transmission */
+  pm = sc.best;
+  if (GNUNET_YES == sc.frag)
   {
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Scheduled transmission on queue %s to %s for later, going to sleep\n",
-      queue->address,
-      GNUNET_i2s (&n->pid));
-    return; /* do it later */
-  }
-  overhead = 0;
-  if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    overhead += sizeof (struct TransportReliabilityBoxMessage);
-  s = pm;
-  if ( ( (0 != queue->mtu) &&
-        (pm->bytes_msg + overhead > queue->mtu) ) ||
-       (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
-       (NULL != pm->head_frag /* fragments already exist, should
-                                respect that even if MTU is 0 for
-                                this queue */) )
-    s = fragment_message (queue, pm->dvh, s);
-  if (NULL == s)
-  {
-    /* Fragmentation failed, try next message... */
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
-                queue->address,
-                GNUNET_i2s (&n->pid),
-                pm->logging_uuid);
-    schedule_transmit_on_queue (queue, GNUNET_NO);
-    return;
+    pm = fragment_message (queue, sc.dvh, sc.best);
+    if (NULL == pm)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
+                  queue->address,
+                  GNUNET_i2s (&n->pid),
+                  pm->logging_uuid);
+      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+    }
   }
-  if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    // FIXME-OPTIMIZE: and if reliability was requested for 's' by core!
-    s = reliability_box_message (queue, pm->dvh, s);
-  if (NULL == s)
+  else if (GNUNET_YES == sc.relb)
   {
-    /* Reliability boxing failed, try next message... */
-    GNUNET_log (
-      GNUNET_ERROR_TYPE_DEBUG,
-      "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
-      queue->address,
-      GNUNET_i2s (&n->pid),
-      pm->logging_uuid);
-    schedule_transmit_on_queue (queue, GNUNET_NO);
-    return;
+    pm = reliability_box_message (queue, sc.dvh, sc.best);
+    if (NULL == pm)
+    {
+      /* Reliability boxing failed, try next message... */
+      GNUNET_log (
+        GNUNET_ERROR_TYPE_DEBUG,
+        "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
+        queue->address,
+        GNUNET_i2s (&n->pid),
+        pm->logging_uuid);
+      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+      return;
+    }
   }
+  else
+    pm = sc.best; /* no boxing required */
 
-  /* Pass 's' for transission to the communicator */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Passing message <%llu> to queue %s for peer %s\n",
-              s->logging_uuid,
-              queue->address,
-              GNUNET_i2s (&n->pid));
-  queue_send_msg (queue, s, &s[1], s->bytes_msg);
-  // FIXME: do something similar to the logic below
-  // in defragmentation / reliability ACK handling!
+  /* Pass 'pm' for transission to the communicator */
+  GNUNET_log (
+    GNUNET_ERROR_TYPE_DEBUG,
+    "Passing message <%llu> to queue %s for peer %s (considered %u others)\n",
+    pm->logging_uuid,
+    queue->address,
+    GNUNET_i2s (&n->pid),
+    sc.consideration_counter);
+  queue_send_msg (queue, pm, &pm[1], pm->bytes_msg);
 
   /* Check if this transmission somehow conclusively finished handing 'pm'
      even without any explicit ACKs */
-  if ((PMT_CORE == s->pmt) &&
+  if ((PMT_CORE == pm->pmt) ||
       (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
   {
-    /* Full message sent, and over reliabile channel */
-    client_send_response (pm);
-  }
-  else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
-            queue->tc->details.communicator.cc) &&
-           (PMT_FRAGMENT_BOX == s->pmt))
-  {
-    struct PendingMessage *pos;
-
-    /* Fragment sent over reliabile channel */
-    free_fragment_tree (s);
-    pos = s->frag_parent;
-    GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
-    GNUNET_free (s);
-    /* check if subtree is done */
-    while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
-           (pos != pm))
-    {
-      s = pos;
-      pos = s->frag_parent;
-      GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
-      GNUNET_free (s);
-    }
-
-    /* Was this the last applicable fragmment? */
-    if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
-      client_send_response (pm);
-  }
-  else if (PMT_CORE != pm->pmt)
-  {
-    /* This was an acknowledgement of some type, always free */
-    free_pending_message (pm);
+    completed_pending_message (pm);
   }
   else
   {
@@ -7748,15 +7949,13 @@ transmit_on_queue (void *cls)
        retransmitting.  Note that in the future this heuristic should
        likely be improved further (measure RTT stability, consider
        message urgency and size when delaying ACKs, etc.) */
-    update_pm_next_attempt (s,
+    update_pm_next_attempt (pm,
                             GNUNET_TIME_relative_to_absolute (
                               GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
                                                              4)));
   }
-
   /* finally, re-schedule queue transmission task itself */
-  schedule_transmit_on_queue (queue, GNUNET_NO);
-#endif
+  schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
 }
 
 
@@ -7871,7 +8070,7 @@ handle_send_message_ack (void *cls,
     for (struct Queue *queue = tc->details.communicator.queue_head;
          NULL != queue;
          queue = queue->next_client)
-      schedule_transmit_on_queue (queue, GNUNET_NO);
+      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
   {
@@ -7880,7 +8079,7 @@ handle_send_message_ack (void *cls,
                               "# Transmission throttled due to queue queue limit",
                               -1,
                               GNUNET_NO);
-    schedule_transmit_on_queue (qe->queue, GNUNET_NO);
+    schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
 
   if (NULL != (pm = qe->pm))
@@ -7894,21 +8093,7 @@ handle_send_message_ack (void *cls,
        transmit on queue for queues of the neighbour */
     vl = pm->vl;
     if (vl->pending_msg_head == pm)
-    {
-#if FIXME - NEXT
-      for (struct Queue *queue = n->queue_head; NULL != queue;
-           queue = queue->next_neighbour)
-        schedule_transmit_on_queue (queue, GNUNET_NO);
-#endif
-    }
-    if (GNUNET_OK != ntohl (sma->status))
-    {
-      GNUNET_log (
-        GNUNET_ERROR_TYPE_INFO,
-        "Queue failed in transmission <%llu>, will try retransmission immediately\n",
-        pm->logging_uuid);
-      update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
-    }
+      check_vl_transmission (vl);
   }
   GNUNET_free (qe);
 }
@@ -8431,6 +8616,7 @@ handle_add_queue_message (void *cls,
   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
   queue->neighbour = neighbour;
+  queue->idle = GNUNET_YES;
   memcpy (&queue[1], addr, addr_len);
   /* notify monitors about new queue */
   {
@@ -8452,6 +8638,8 @@ handle_add_queue_message (void *cls,
                                                 &aqm->receiver,
                                                 &check_validation_request_pending,
                                                 queue);
+  /* look for traffic for this queue */
+  schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   /* might be our first queue, try launching DV learning */
   if (NULL == dvlearn_task)
     dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]