gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: reassembly logic


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: reassembly logic
Date: Sat, 26 Jan 2019 17:20:52 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new f8cea0ca2 reassembly logic
f8cea0ca2 is described below

commit f8cea0ca2c5a683cdd0209e3027599c56b2ec4f3
Author: Christian Grothoff <address@hidden>
AuthorDate: Sat Jan 26 17:20:47 2019 +0100

    reassembly logic
---
 src/transport/gnunet-service-tng.c | 123 +++++++++++++++++++++++++++++++++++--
 1 file changed, 117 insertions(+), 6 deletions(-)

diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index ac4a262d7..5a8bf5bc1 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -50,6 +50,9 @@
  *   directions, allow multiple messages per peer simultaneously (tag
  *   confirmations with unique message ID), and replace quota-out with
  *   proper flow control;
+ * - if messages are below MTU, consider adding ACKs and other stuff
+ *   (requires planning at receiver, and additional MST-style demultiplex
+ *    at receiver!)
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
@@ -2799,6 +2802,37 @@ check_fragment_box (void *cls,
 }
 
 
+/**
+ * Generate a fragment acknowledgement for an @a rc.
+ *
+ * @param rc context to generate ACK for, @a rc ACK state is reset
+ */
+static void
+send_fragment_ack (struct ReassemblyContext *rc)
+{
+  struct TransportFragmentAckMessage *ack;
+
+  ack = GNUNET_new (struct TransportFragmentAckMessage);
+  ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
+  ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
+  ack->frag_uuid = htonl (rc->frag_uuid);
+  ack->extra_acks = GNUNET_htonll (rc->extra_acks);
+  ack->msg_uuid = rc->msg_uuid;
+  ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
+  if (0 == rc->msg_missing)
+    ack->reassembly_timeout
+      = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
+  else
+    ack->reassembly_timeout
+      = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
+  route_message (&rc->neighbour->pid,
+                &ack->header);
+  rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
+  rc->num_acks = 0;
+  rc->extra_acks = 0LLU;
+}
+
+
 /**
  * Communicator gave us a fragment.  Process the request.
  *
@@ -2814,6 +2848,12 @@ handle_fragment_box (void *cls,
   struct ReassemblyContext *rc;
   const struct GNUNET_MessageHeader *msg;
   uint16_t msize;
+  uint16_t fsize;
+  uint16_t frag_off;
+  uint32_t frag_uuid;
+  char *target;
+  struct GNUNET_TIME_Relative cdelay;
+  int ack_now;
 
   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
                                         &cmc->im.sender);
@@ -2856,9 +2896,14 @@ handle_fragment_box (void *cls,
                                                       &rc->msg_uuid,
                                                      rc,
                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-    rc->bitfield = (uint8_t *) (((char *) &rc[1]) + rc->msg_size);
+    target = (char *) &rc[1];
+    rc->bitfield = (uint8_t *) (target + rc->msg_size);
     rc->msg_missing = rc->msg_size;
   }
+  else
+  {
+    target = (char *) &rc[1];
+  }
   if (msize != rc->msg_size)
   {
     GNUNET_break (0);
@@ -2866,12 +2911,78 @@ handle_fragment_box (void *cls,
     return;
   }
   
-  // FIXME: do work: reassemble
-
+  /* reassemble */
+  fsize = ntohs (fb->header.size) - sizeof (*fb);
+  frag_off = ntohs (fb->frag_off);
+  memcpy (&target[frag_off],
+         &fb[1],
+         fsize);
+  /* update bitfield and msg_missing */
+  for (unsigned int i=frag_off;i<frag_off+fsize;i++)
+  {
+    if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
+    {
+      rc->bitfield[i / 8] |= (1 << (i % 8));
+      rc->msg_missing--;
+    }
+  }
+                                   
+  /* Compute cumulative ACK */
+  frag_uuid = ntohl (fb->frag_uuid);
+  cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
+  cdelay = GNUNET_TIME_relative_multiply (cdelay,
+                                         rc->num_acks);
+  rc->last_frag = GNUNET_TIME_absolute_get ();
+  rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
+                                               cdelay);
+  ack_now = GNUNET_NO;
+  if (0 == rc->num_acks)
+  {
+    /* case one: first ack */
+    rc->frag_uuid = frag_uuid;
+    rc->extra_acks = 0LLU;
+    rc->num_acks = 1;
+  }
+  else if ( (frag_uuid >= rc->frag_uuid) &&
+           (frag_uuid <= rc->frag_uuid + 64) )
+  {
+    /* case two: ack fits after existing min UUID */
+    if ( (frag_uuid == rc->frag_uuid) ||
+        (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
+    {
+      /* duplicate fragment, ack now! */
+      ack_now = GNUNET_YES;
+    }
+    else
+    {
+      rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
+      rc->num_acks++;
+    }
+  }
+  else if ( (rc->frag_uuid > frag_uuid) &&
+           ( ( (rc->frag_uuid == frag_uuid + 64) &&
+               (0 == rc->extra_acks) ) ||
+             ( (rc->frag_uuid < frag_uuid + 64) &&
+               (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
+  {
+    /* can fit ack by shifting extra acks and starting at 
+       frag_uuid, test above ensured that the bits we will
+       shift 'extra_acks' by are all zero. */
+    rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
+    rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
+    rc->frag_uuid = frag_uuid;
+    rc->num_acks++;
+  }
+  if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
+    ack_now = GNUNET_YES; /* maximum acks received */
+  // FIXME: possibly also ACK based on RTT (but for that we'd need to
+  // determine the session used for the ACK first!)  
+  
   /* is reassembly complete? */
   if (0 != rc->msg_missing)
   {
-    /* FIXME: possibly send ACK! */
+    if (ack_now)
+      send_fragment_ack (rc);
     finish_cmc_handling (cmc);
     return;
   }
@@ -2885,7 +2996,7 @@ handle_fragment_box (void *cls,
     return;
   }
   /* successful reassembly */
-  /* FIXME: definitively send ACK! */
+  send_fragment_ack (rc);
   demultiplex_with_cmc (cmc,
                        msg);
   /* FIXME: really free here? Might be bad if fragments are still
@@ -3146,7 +3257,7 @@ handle_dv_box (void *cls,
               const struct TransportDVBox *dvb)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  uint16_t size = ntohs (dvb->header.size);
+  uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
   uint16_t num_hops = ntohs (dvb->num_hops);
   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]