From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (49227270d -> 8f49f0228)
Date: Tue, 21 May 2019 17:24:09 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a change to branch master
in repository gnunet.

    from 49227270d use bytes, not kilobytes
     new c6777519b implement GNUNET_TRANSPORT_core_receive_continue
     new 8f49f0228 implement FC window tracking for incoming messages

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/transport/gnunet-service-tng.c | 157 ++++++++++++++++++++++++++++++++-----
 src/transport/transport_api_core.c |  46 +++++++++++
 2 files changed, 184 insertions(+), 19 deletions(-)
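
For orientation before the diff: the second commit introduces per-link
flow-control accounting on the receiving side. Below is a minimal
sketch of that accounting, using the counter names from the patch but
otherwise simplified; this is an illustration, not the actual service
code.

#include <stdint.h>

/* Simplified model of the receiving-side flow control added below;
   the three counter names come from `struct VirtualLink`, everything
   else is illustrative. */
struct FcModel
{
  uint64_t available_fc_window_size;     /* window advertised to the sender */
  uint64_t incoming_fc_window_size_ram;  /* bytes currently queued towards CORE */
  uint64_t incoming_fc_window_size_used; /* bytes CORE has finished processing */
};

/* On receiving a message: reserve RAM for it, or drop it if that
   would exceed the advertised window. */
static int
fc_on_receive (struct FcModel *fc, uint16_t size)
{
  if (fc->incoming_fc_window_size_ram + size > fc->available_fc_window_size)
    return 0;                              /* drop: window overflow */
  fc->incoming_fc_window_size_ram += size; /* reserved while CORE works */
  return 1;                                /* deliver to CORE */
}

/* Once CORE is done with the message: release the reservation and
   count the bytes against the window actually used. */
static void
fc_on_core_done (struct FcModel *fc, uint16_t size)
{
  fc->incoming_fc_window_size_ram -= size;
  fc->incoming_fc_window_size_used += size;
}

In the patch itself the RAM reservation is made once per connected
CORE client, while incoming_fc_window_size_used is bumped only once
per message (the isize field of `struct CoreSentContext` ensures
this), and core_recv_window is decremented once per CORE client, which
is why that window may legitimately go negative.
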

diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 2af699dc1..1e77937e4 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -34,16 +34,6 @@
  *     at the beginning when the limit is zero!)
  *   - Retransmit challenge if it goes unanswered!
  *
- *   for RECEIVING)
- *   - increment incoming_fc_window_size_ram when receiving
- *     incoming packets!
- *   - OR drop if incoming_fc_window_size_ram goes
- *     (significantly?) beyond available_fc_window_size
- *   - decrement incoming_fc_window_size_ram when CORE is done
- *     with incoming packets!
- *   - increment incoming_fc_window_size_used when CORE is done
- *     with incoming packets!
- *
  *   for DV)
  *   - send challenges via DV (when DVH is confirmed *and* we care about
  *     the target to get window size, or when DVH is unconfirmed (passive
@@ -92,7 +82,7 @@
  * - re-sending challenge response without a challenge when we have
  *   significantly increased the FC window (upon CORE being done with messages)
  *   so as to avoid the sender having to give us a fresh challenge [BANDWIDTH]
- *   Also can re-use signature in this case [CPU].
+ *   Also can re-use signature in this case [CPU]. Marked with "TODO-M1".
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
@@ -1118,6 +1108,16 @@ struct PendingMessage;
  */
 struct DistanceVectorHop;
 
+/**
+ * A virtual link is another reachable peer that is known to CORE.  It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`.  With a virtual link we track
+ * per-neighbour data that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink;
+
 
 /**
  * Context from #handle_incoming_msg().  Closure for many
@@ -1156,6 +1156,42 @@ struct CommunicatorMessageContext
 };
 
 
+/**
+ * Closure for #core_env_sent_cb.
+ */
+struct CoreSentContext
+{
+
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *next;
+
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *prev;
+
+  /**
+   * Virtual link this is about.
+   */
+  struct VirtualLink *vl;
+
+  /**
+   * How big was the message.
+   */
+  uint16_t size;
+
+  /**
+   * By how much should we increment @e vl's
+   * incoming_fc_window_size_used once we are done sending to CORE?
+   * Use to ensure we do not increment twice if there is more than one
+   * CORE client.
+   */
+  uint16_t isize;
+};
+
+
 /**
  * A virtual link is another reachable peer that is known to CORE.  It
  * can be either a `struct Neighbour` with at least one confirmed
@@ -1193,6 +1229,16 @@ struct VirtualLink
    */
   struct PendingMessage *pending_msg_tail;
 
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *csc_tail;
+
+  /**
+   * Kept in a DLL to clear @e vl in case @e vl is lost.
+   */
+  struct CoreSentContext *csc_head;
+
   /**
   * Task scheduled to possibly notify core that this peer is no
    * longer counting as confirmed.  Runs the #core_visibility_check(),
@@ -1300,7 +1346,10 @@ struct VirtualLink
    * we must tell communicators to stop providing us more messages
    * for this peer.  In fact, the window can go negative as we
    * have multiple communicators, so per communicator we can go
-   * down by one into the negative range.
+   * down by one into the negative range. Furthermore, we count
+   * delivery per CORE client, so if we had multiple CORE clients, that
+   * might also cause a negative window size here (as one message
+   * would decrement the window by one per CORE client).
    */
   int core_recv_window;
 };
@@ -2846,6 +2895,7 @@ static void
 free_virtual_link (struct VirtualLink *vl)
 {
   struct PendingMessage *pm;
+  struct CoreSentContext *csc;
 
   while (NULL != (pm = vl->pending_msg_head))
     free_pending_message (pm);
@@ -2855,6 +2905,12 @@ free_virtual_link (struct VirtualLink *vl)
     GNUNET_SCHEDULER_cancel (vl->visibility_task);
     vl->visibility_task = NULL;
   }
+  while (NULL != (csc = vl->csc_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc);
+    GNUNET_assert (vl == csc->vl);
+    csc->vl = NULL;
+  }
   GNUNET_break (NULL == vl->n);
   GNUNET_break (NULL == vl->dv);
   GNUNET_free (vl);
@@ -4950,6 +5006,34 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
                       const struct GNUNET_MessageHeader *msg);
 
 
+/**
+ * Function called when we are done giving a message of a certain
+ * size to CORE and should thus decrement the number of bytes of
+ * RAM reserved for that peer's MQ.
+ *
+ * @param cls a `struct CoreSentContext`
+ */
+static void
+core_env_sent_cb (void *cls)
+{
+  struct CoreSentContext *ctx = cls;
+  struct VirtualLink *vl = ctx->vl;
+
+  if (NULL == vl)
+  {
+    /* lost the link in the meantime, ignore */
+    GNUNET_free (ctx);
+    return;
+  }
+  GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, ctx);
+  GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size);
+  vl->incoming_fc_window_size_ram -= ctx->size;
+  vl->incoming_fc_window_size_used += ctx->isize;
+  /* TODO-M1 */
+  GNUNET_free (ctx);
+}
+
+
 /**
  * Communicator gave us an unencapsulated message to pass as-is to
  * CORE.  Process the request.
@@ -4995,30 +5079,65 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
     finish_cmc_handling (cmc);
     return;
   }
+  if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (FC arithmetic 
overflow)",
+                              1,
+                              GNUNET_NO);
+
+    finish_cmc_handling (cmc);
+    return;
+  }
+  if (vl->incoming_fc_window_size_ram + size > vl->available_fc_window_size)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# CORE messages droped (FC window overflow)",
+                              1,
+                              GNUNET_NO);
+    finish_cmc_handling (cmc);
+    return;
+  }
+
   /* Forward to all CORE clients */
   have_core = GNUNET_NO;
   for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
   {
     struct GNUNET_MQ_Envelope *env;
     struct InboundMessage *im;
+    struct CoreSentContext *ctx;
 
     if (CT_CORE != tc->type)
       continue;
-    have_core = GNUNET_YES;
+    vl->incoming_fc_window_size_ram += size;
     env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+    ctx = GNUNET_new (struct CoreSentContext);
+    ctx->vl = vl;
+    ctx->size = size;
+    ctx->isize = (GNUNET_NO == have_core) ? size : 0;
+    have_core = GNUNET_YES;
+    GNUNET_CONTAINER_DLL_insert (vl->csc_head, vl->csc_tail, ctx);
+    GNUNET_MQ_notify_sent (env, &core_env_sent_cb, ctx);
     im->peer = cmc->im.sender;
     memcpy (&im[1], mh, size);
     GNUNET_MQ_send (tc->mq, env);
+    vl->core_recv_window--;
   }
-  vl->core_recv_window--;
   if (GNUNET_NO == have_core)
+  {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "Dropped message to CORE: no CORE client connected!\n");
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Delivered message from %s of type %u to CORE\n",
-                GNUNET_i2s (&cmc->im.sender),
-                ntohs (mh->type));
+    /* Nevertheless, count window as used, as it is from the
+       perspective of the other peer! */
+    vl->incoming_fc_window_size_used += size;
+    /* TODO-M1 */
+    finish_cmc_handling (cmc);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delivered message from %s of type %u to CORE\n",
+              GNUNET_i2s (&cmc->im.sender),
+              ntohs (mh->type));
   if (vl->core_recv_window > 0)
   {
     finish_cmc_handling (cmc);
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c
index 54dc7f4c3..224af5de2 100644
--- a/src/transport/transport_api_core.c
+++ b/src/transport/transport_api_core.c
@@ -179,6 +179,15 @@ struct GNUNET_TRANSPORT_CoreHandle
    */
   struct GNUNET_TIME_Relative reconnect_delay;
 
+  /**
+   * Internal counter to check how many more receive OK messages this
+   * CORE service is allowed to send in total. Just to detect easy
+   * cases of protocol violations by the CORE implementation.
+   * NOTE: we may want to make this stronger by counting per peer
+   * instead of globally.
+   */
+  unsigned int rom_pending;
+
   /**
    * Should we check that @e self matches what the service thinks?
    * (if #GNUNET_NO, then @e self is all zeros!).
@@ -695,6 +704,7 @@ handle_recv (void *cls, const struct InboundMessage *im)
     disconnect_and_schedule_reconnect (h);
     return;
   }
+  h->rom_pending++;
   GNUNET_MQ_inject_message (n->mq, imm);
 }
 
@@ -919,4 +929,40 @@ GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
 }
 
 
+/**
+ * Notification from the CORE service to the TRANSPORT service
+ * that the CORE service has finished processing a message from
+ * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect())
+ * and that it is thus now OK for TRANSPORT to send more messages
+ * for @a pid.
+ *
+ * Used to provide flow control, this is our equivalent to
+ * #GNUNET_SERVICE_client_continue() of an ordinary service.
+ *
+ * Note that due to the use of a window, TRANSPORT may send multiple
+ * messages from the same peer to CORE even without an intermediate
+ * call to this function. However, CORE must still call this function
+ * once per message received, as otherwise eventually the window will
+ * be full and TRANSPORT will stop providing messages to CORE for @a
+ * pid.
+ *
+ * @param ch core handle
+ * @param pid which peer was the message from that was fully processed by CORE
+ */
+void
+GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch,
+                                        const struct GNUNET_PeerIdentity *pid)
+{
+  struct RecvOkMessage *rom;
+  struct GNUNET_MQ_Envelope *env;
+
+  GNUNET_assert (ch->rom_pending > 0);
+  ch->rom_pending--;
+  env = GNUNET_MQ_msg (rom, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK);
+  rom->increase_window_delta = htonl (1);
+  rom->peer = *pid;
+  GNUNET_MQ_send (ch->mq, env);
+}
+
+
 /* end of transport_api_core.c */
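
For completeness, a sketch of the CORE-side usage of the new API; the
function my_process_message() and the MyCoreContext type are
hypothetical, only GNUNET_TRANSPORT_core_receive_continue() comes from
the patch above.

/* Hypothetical CORE-side code illustrating the acknowledgement
   contract of the new API. */
struct MyCoreContext
{
  /* Handle obtained from GNUNET_TRANSPORT_core_connect(). */
  struct GNUNET_TRANSPORT_CoreHandle *ch;
};

static void
my_process_message (struct MyCoreContext *ctx,
                    const struct GNUNET_PeerIdentity *sender,
                    const struct GNUNET_MessageHeader *msg)
{
  /* ... application-specific handling of @a msg ... */

  /* Acknowledge exactly once per delivered message: this sends a
     GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK with a window delta of 1.
     Skipping it eventually fills the window and TRANSPORT stops
     delivering messages from @a sender; calling it more often than
     once per message trips the rom_pending assertion above. */
  GNUNET_TRANSPORT_core_receive_continue (ctx->ch, sender);
}
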

-- 
To stop receiving notification emails like this one, please contact
address@hidden


