gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] 02/02: add prototypes for handlers for incoming me


From: gnunet
Subject: [GNUnet-SVN] [gnunet] 02/02: add prototypes for handlers for incoming messages
Date: Fri, 25 Jan 2019 18:35:10 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

commit 8e157def22a61958cd4b6c791da505a38062cc1d
Author: Christian Grothoff <address@hidden>
AuthorDate: Fri Jan 25 18:34:41 2019 +0100

    add prototypes for handlers for incoming messages
---
 src/include/gnunet_mq_lib.h        |  45 +++
 src/include/gnunet_protocols.h     |   5 +
 src/transport/gnunet-service-tng.c | 565 ++++++++++++++++++++++++++++++++-----
 src/util/mq.c                      |  44 ++-
 4 files changed, 589 insertions(+), 70 deletions(-)

diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index 3f67dc365..2a459636a 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -527,6 +527,51 @@ struct GNUNET_MQ_MessageHandler
   }
 
 
+/**
+ * Insert code for a "check_" function that verifies that
+ * a given variable-length message received over the network
+ * is followed by another variable-length message that fits
+ * exactly with the given size.  If the message @a m
+ * is not followed by another `struct GNUNET_MessageHeader`
+ * with a size that adds up to the total size, an error is logged
+ * and the function is returned with #GNUNET_NO.
+ *
+ * @param an IPC message with proper type to determine
+ *  the size, starting with a `struct GNUNET_MessageHeader`
+ */
+#define GNUNET_MQ_check_boxed_message(m)                \
+  {                                                     \
+    const struct GNUNET_MessageHeader *inbox =          \
+      (const struct GNUNET_MessageHeader *) &m[1];      \
+    const struct GNUNET_MessageHeader *hdr =            \
+      (const struct GNUNET_MessageHeader *) m;          \
+    uint16_t slen = ntohs (hdr->size) - sizeof (*m);    \
+    if ( (slen < sizeof (struct GNUNET_MessageHeader))||\
+         (slen != ntohs (inbox->size)) )                \
+    {                                                   \
+      GNUNET_break (0);                                 \
+      return GNUNET_NO;                                 \
+    }                                                   \
+  }
+
+
+/**
+ * Call the message message handler that was registered
+ * for the type of the given message in the given @a handlers list.
+ *
+ * This function is indended to be used for the implementation
+ * of message queues.
+ *
+ * @param handlers a set of handlers
+ * @param mh message to dispatch
+ * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
+ *         #GNUNET_SYSERR if message was rejected by check function
+ */
+int
+GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
+                          const struct GNUNET_MessageHeader *mh);
+
+
 /**
  * Create a new envelope.
  *
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 8593005d7..a8d716b3f 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3124,6 +3124,11 @@ extern "C"
  */ 
 #define GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX 1219
 
+/**
+ * Transport affirming receipt of an ephemeral key.
+ */ 
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION 1220
+
 /**
  * Message sent to indicate to the transport that a monitor
  * wants to observe certain events.
diff --git a/src/transport/gnunet-service-tng.c 
b/src/transport/gnunet-service-tng.c
index 8febbdfff..3cccf5173 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -33,7 +33,7 @@
  *       transport-to-transport traffic)
  *
  * Implement:
- * - manage fragmentation/defragmentation, retransmission, track RTT, loss, 
etc.
+ * - manage defragmentation, retransmission, track RTT, loss, etc.
  *
  * Easy:
  * - use ATS bandwidth allocation callback and schedule transmissions!
@@ -165,8 +165,8 @@ struct TransportBackchannelEncapsulationMessage
 
 
 /**
- * Message by which a peer confirms that it is using an
- * ephemeral key.
+ * Body by which a peqer confirms that it is using an ephemeral
+ * key.
  */
 struct EphemeralConfirmation
 {
@@ -191,6 +191,37 @@ struct EphemeralConfirmation
 };
 
 
+/**
+ * Message by which a peqer confirms that it is using an ephemeral
+ * key.
+ */
+struct EphemeralConfirmationMessage
+{
+
+  /**
+   * Message header, type is 
#GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Must be zero.
+   */  
+  uint32_t reserved;
+  
+  /**
+   * How long is this signature over the ephemeral key
+   * valid?
+   */
+  struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
+
+  /**
+   * Ephemeral key setup by the sender for @e target, used
+   * to encrypt the payload.
+   */
+  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+};
+
+
 /**
  * Plaintext of the variable-size payload that is encrypted
  * within a `struct TransportBackchannelEncapsulationMessage`
@@ -863,7 +894,12 @@ struct PendingMessage
    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is 
#PMT_FRAGMENT_BOX)
    */
   struct PendingMessage *prev_frag;
-    
+
+  /**
+   * This message, reliability boxed. Only possibly available if @e pmt is 
#PMT_CORE.
+   */ 
+  struct PendingMessage *bpm;
+  
   /**
    * Target of the request.
    */
@@ -1797,6 +1833,36 @@ free_fragment_tree (struct PendingMessage *root)
 }
 
 
+/**
+ * Release memory associated with @a pm and remove @a pm from associated
+ * data structures.  @a pm must be a top-level pending message and not
+ * a fragment in the tree.  The entire tree is freed (if applicable).
+ *
+ * @param pm the pending message to free
+ */
+static void
+free_pending_message (struct PendingMessage *pm)
+{
+  struct TransportClient *tc = pm->client;
+  struct Neighbour *target = pm->target;
+
+  if (NULL != tc)
+  {
+    GNUNET_CONTAINER_MDLL_remove (client,
+                                 tc->details.core.pending_msg_head,
+                                 tc->details.core.pending_msg_tail,
+                                 pm);
+  }
+  GNUNET_CONTAINER_MDLL_remove (neighbour,
+                               target->pending_msg_head,
+                               target->pending_msg_tail,
+                               pm);
+  free_fragment_tree (pm);
+  GNUNET_free_non_null (pm->bpm);
+  GNUNET_free (pm);
+}
+
+
 /**
  * Send a response to the @a pm that we have processed a
  * "send" request with status @a success. We
@@ -1829,17 +1895,8 @@ client_send_response (struct PendingMessage *pm,
     som->peer = target->pid;
     GNUNET_MQ_send (tc->mq,
                    env);
-    GNUNET_CONTAINER_MDLL_remove (client,
-                                 tc->details.core.pending_msg_head,
-                                 tc->details.core.pending_msg_tail,
-                                 pm);
   }
-  GNUNET_CONTAINER_MDLL_remove (neighbour,
-                               target->pending_msg_head,
-                               target->pending_msg_tail,
-                               pm);
-  free_fragment_tree (pm);
-  GNUNET_free (pm);
+  free_pending_message (pm);
 }
 
 
@@ -2175,37 +2232,292 @@ handle_del_address (void *cls,
 }
 
 
+/**
+ * Context from #handle_incoming_msg().  Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
+{
+  /**
+   * Which communicator provided us with the message.
+   */
+  struct TransportClient *tc;
+
+  /**
+   * Additional information for flow control and about the sender.
+   */
+  struct GNUNET_TRANSPORT_IncomingMessage im;
+};
+
+
+/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+  // FIXME: if (0 != ntohl (im->fc_on)) => send ACK when done to communicator 
for flow control!
+  GNUNET_SERVICE_client_continue (cmc->tc->client);
+
+  GNUNET_free (cmc);
+}
+
+
+/**
+ * Communicator gave us an unencapsulated message to pass
+ * as-is to CORE.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param mh the message that was received
+ */
+static void
+handle_raw_message (void *cls,
+                   const struct GNUNET_MessageHeader *mh)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a fragment box.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param fb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_fragment_box (void *cls,
+                   const struct TransportFragmentBox *fb)
+{
+  // FIXME! check that off + size-of-payload <= total-length!
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a fragment.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param fb the message that was received
+ */
+static void
+handle_fragment_box (void *cls,
+                    const struct TransportFragmentBox *fb)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a fragment acknowledgement.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param fa the message that was received
+ */
+static void
+handle_fragment_ack (void *cls,
+                    const struct TransportFragmentAckMessage *fa)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a reliability box.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param rb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_reliability_box (void *cls,
+                      const struct TransportReliabilityBox *rb)
+{
+  GNUNET_MQ_check_boxed_message (rb);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a reliability box.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param rb the message that was received
+ */
+static void
+handle_reliability_box (void *cls,
+                       const struct TransportReliabilityBox *rb)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a reliability ack.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param ra the message that was received
+ */
+static void
+handle_reliability_ack (void *cls,
+                       const struct TransportReliabilityAckMessage *ra)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a backchannel encapsulation.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param be the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_backchannel_encapsulation (void *cls,
+                                const struct 
TransportBackchannelEncapsulationMessage *be)
+{
+  // FIXME: do work!
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a backchannel encapsulation.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param be the message that was received
+ */
+static void
+handle_backchannel_encapsulation (void *cls,
+                                 const struct 
TransportBackchannelEncapsulationMessage *be)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us an ephemeral confirmation.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param ec the message that was received
+ */
+static void
+handle_ephemeral_confirmation (void *cls,
+                              const struct EphemeralConfirmationMessage *ec)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a DV learn message.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvl the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_dv_learn (void *cls,
+               const struct TransportDVLearn *dvl)
+{
+  // FIXME: do work!
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a DV learn message.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param dvl the message that was received
+ */
+static void
+handle_dv_learn (void *cls,
+                const struct TransportDVLearn *dvl)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a DV box.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_dv_box (void *cls,
+             const struct TransportDVBox *dvb)
+{
+  // FIXME: do work!
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a DV box.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param dvb the message that was received
+ */
+static void
+handle_dv_box (void *cls,
+              const struct TransportDVBox *dvb)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
 /**
  * Client notified us about transmission from a peer.  Process the request.
  *
- * @param cls the client
+ * @param cls a `struct TransportClient` which sent us the message
  * @param obm the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
  */
 static int
 check_incoming_msg (void *cls,
                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
 {
   struct TransportClient *tc = cls;
-  uint16_t size;
-  const struct GNUNET_MessageHeader *obmm;
 
   if (CT_COMMUNICATOR != tc->type)
   {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  size = ntohs (im->header.size) - sizeof (*im);
-  if (size < sizeof (struct GNUNET_MessageHeader))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  obmm = (const struct GNUNET_MessageHeader *) &im[1];
-  if (size != ntohs (obmm->size))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
+  GNUNET_MQ_check_boxed_message (im);
   return GNUNET_OK;
 }
 
@@ -2213,7 +2525,6 @@ check_incoming_msg (void *cls,
 /**
  * Incoming meessage.  Process the request.
  *
- * @param cls the client
  * @param im the send message that was received
  */
 static void
@@ -2221,8 +2532,61 @@ handle_incoming_msg (void *cls,
                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
 {
   struct TransportClient *tc = cls;
+  struct CommunicatorMessageContext *cmc = GNUNET_new (struct 
CommunicatorMessageContext);
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (fragment_box,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
+                          struct TransportFragmentBox,
+                          &cmc),
+    GNUNET_MQ_hd_fixed_size (fragment_ack,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
+                            struct TransportFragmentAckMessage,
+                            &cmc),
+    GNUNET_MQ_hd_var_size (reliability_box,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
+                          struct TransportReliabilityBox,
+                          &cmc),
+    GNUNET_MQ_hd_fixed_size (reliability_ack,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+                            struct TransportReliabilityAckMessage,
+                            &cmc),
+    GNUNET_MQ_hd_var_size (backchannel_encapsulation,
+                          
GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
+                          struct TransportBackchannelEncapsulationMessage,
+                          &cmc),
+    GNUNET_MQ_hd_fixed_size (ephemeral_confirmation,
+                            
GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION,
+                            struct EphemeralConfirmationMessage,
+                            &cmc),
+    GNUNET_MQ_hd_var_size (dv_learn,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
+                          struct TransportDVLearn,
+                          &cmc),
+    GNUNET_MQ_hd_var_size (dv_box,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
+                          struct TransportDVBox,
+                          &cmc),
+    GNUNET_MQ_handler_end()
+  };
+  int ret;
 
-  GNUNET_SERVICE_client_continue (tc->client);
+  cmc->tc = tc;
+  cmc->im = *im;
+  ret = GNUNET_MQ_handle_message (handlers,
+                                 (const struct GNUNET_MessageHeader *) &im[1]);
+  if (GNUNET_SYSERR == ret)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    GNUNET_free (cmc);
+    return;
+  }
+  if (GNUNET_NO == ret)
+  {
+    /* unencapsulated 'raw' message */
+    handle_raw_message (&cmc,
+                       (const struct GNUNET_MessageHeader *) &im[1]);
+  }
 }
 
 
@@ -2268,6 +2632,23 @@ tracker_update_in_cb (void *cls)
 }
 
 
+/**
+ * If necessary, generates the UUID for a @a pm
+ *
+ * @param pm pending message to generate UUID for.
+ */
+static void
+set_pending_message_uuid (struct PendingMessage *pm)
+{
+  if (pm->msg_uuid_set)
+    return;
+  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                             &pm->msg_uuid,
+                             sizeof (pm->msg_uuid));
+  pm->msg_uuid_set = GNUNET_YES;
+}
+
+
 /**
  * Fragment the given @a pm to the given @a mtu.  Adds 
  * additional fragments to the neighbour as well. If the
@@ -2284,13 +2665,7 @@ fragment_message (struct PendingMessage *pm,
 {
   struct PendingMessage *ff;
 
-  if (GNUNET_NO == pm->msg_uuid_set)
-  {
-    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
-                               &pm->msg_uuid,
-                               sizeof (pm->msg_uuid));
-    pm->msg_uuid_set = GNUNET_YES;
-  }
+  set_pending_message_uuid (pm);
   
   /* This invariant is established in #handle_add_queue_message() */
   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
@@ -2390,24 +2765,50 @@ fragment_message (struct PendingMessage *pm,
 static struct PendingMessage *
 reliability_box_message (struct PendingMessage *pm)
 {
-  if (PMT_CORE != pm->pmt) 
-  {
-    /* already fragmented or reliability boxed, or control message: do nothing 
*/
-    return pm;
-  }
-  
-  if (0) // FIXME
+  struct TransportReliabilityBox rbox;
+  struct PendingMessage *bpm;
+  char *msg;
+
+  if (PMT_CORE != pm->pmt)
+    return pm;  /* already fragmented or reliability boxed, or control 
message: do nothing */
+  if (NULL != pm->bpm)
+    return pm->bpm; /* already computed earlier: do nothing */
+  GNUNET_assert (NULL == pm->head_frag);
+  if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX) 
   {
     /* failed hard */
-    // FIMXE: bitch
+    GNUNET_break (0);
     client_send_response (pm,
                          GNUNET_NO,
                          0);
     return NULL;
   }
-
-  /* FIXME: return boxed PM here! */
-  return NULL;
+  bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
+                      sizeof (rbox) + 
+                      pm->bytes_msg);
+  bpm->target = pm->target;
+  bpm->frag_parent = pm;
+  GNUNET_CONTAINER_MDLL_insert (frag,
+                               pm->head_frag,
+                               pm->tail_frag,
+                               bpm);
+  bpm->timeout = pm->timeout;
+  bpm->pmt = PMT_RELIABILITY_BOX;
+  bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
+  set_pending_message_uuid (bpm);
+  rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
+  rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
+  rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
+  rbox.msg_uuid = pm->msg_uuid;
+  msg = (char *) &bpm[1];
+  memcpy (msg,
+         &rbox,
+         sizeof (rbox));
+  memcpy (&msg[sizeof (rbox)],
+         &pm[1],
+         pm->bytes_msg);
+  pm->bpm = bpm;
+  return bpm;
 }
 
 
@@ -2542,26 +2943,64 @@ transmit_on_queue (void *cls)
   else if (PMT_CORE != pm->pmt)
   {
     /* This was an acknowledgement of some type, always free */
-
-    struct Neighbour *neighbour = pm->target;
-    GNUNET_CONTAINER_MDLL_remove (neighbour,
-                                 neighbour->pending_msg_head,
-                                 neighbour->pending_msg_tail,
-                                 pm);
-    GNUNET_free (pm);
+    free_pending_message (pm);
   }
   else
   {
     /* message not finished, waiting for acknowledgement */
-    // FIXME: update time by which we might retransmit 's' based on
-    // queue characteristics (i.e. RTT)
-    
-    // FIXME: move 'pm' back in the transmission queue (simplistic: to
-    // the end, better: with position depending on type, timeout,
-    // etc.)
+    struct Neighbour *neighbour = pm->target;
+    /* Update time by which we might retransmit 's' based on queue
+       characteristics (i.e. RTT); it takes one RTT for the message to
+       arrive and the ACK to come back in the best case; but the other
+       side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
+       retransmitting.  Note that in the future this heuristic should
+       likely be improved further (measure RTT stability, consider
+       message urgency and size when delaying ACKs, etc.) */
+    s->next_attempt = GNUNET_TIME_relative_to_absolute
+      (GNUNET_TIME_relative_multiply (queue->rtt,
+                                     4));
+    if (s == pm)
+    {
+      struct PendingMessage *pos;
+
+      /* re-insert sort in neighbour list */
+      GNUNET_CONTAINER_MDLL_remove (neighbour,
+                                   neighbour->pending_msg_head,
+                                   neighbour->pending_msg_tail,
+                                   pm);
+      pos = neighbour->pending_msg_tail;
+      while ( (NULL != pos) &&
+             (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
+       pos = pos->prev_neighbour;
+      GNUNET_CONTAINER_MDLL_insert_after (neighbour,
+                                         neighbour->pending_msg_head,
+                                         neighbour->pending_msg_tail,
+                                         pos,
+                                         pm);
+    }
+    else
+    {
+      /* re-insert sort in fragment list */
+      struct PendingMessage *fp = s->frag_parent;
+      struct PendingMessage *pos;
+
+      GNUNET_CONTAINER_MDLL_remove (frag,
+                                   fp->head_frag,
+                                   fp->tail_frag,
+                                   s);
+      pos = fp->tail_frag;
+      while ( (NULL != pos) &&
+             (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
+       pos = pos->prev_frag;
+      GNUNET_CONTAINER_MDLL_insert_after (frag,
+                                         fp->head_frag,
+                                         fp->tail_frag,
+                                         pos,
+                                         s);
+    }
   }
   
-  /* finally, re-schedule self */
+  /* finally, re-schedule queue transmission task itself */
   schedule_transmit_on_queue (queue);
 }
 
diff --git a/src/util/mq.c b/src/util/mq.c
index 4dfcb72be..d2f5add19 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -214,6 +214,35 @@ struct GNUNET_MQ_Handle
 void
 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
                           const struct GNUNET_MessageHeader *mh)
+{
+  int ret;
+
+  ret = GNUNET_MQ_handle_message (mq->handlers,
+                                 mh);
+  if (GNUNET_SYSERR == ret)
+  {
+    GNUNET_MQ_inject_error (mq,
+                           GNUNET_MQ_ERROR_MALFORMED);
+    return;
+  }
+}
+
+
+/**
+ * Call the message message handler that was registered
+ * for the type of the given message in the given @a handlers list.
+ *
+ * This function is indended to be used for the implementation
+ * of message queues.
+ *
+ * @param handlers a set of handlers
+ * @param mh message to dispatch
+ * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
+ *         #GNUNET_SYSERR if message was rejected by check function
+ */
+int
+GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
+                          const struct GNUNET_MessageHeader *mh)
 {
   const struct GNUNET_MQ_MessageHandler *handler;
   int handled = GNUNET_NO;
@@ -224,9 +253,9 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
        "Received message of type %u and size %u\n",
        mtype, msize);
 
-  if (NULL == mq->handlers)
+  if (NULL == handlers)
     goto done;
-  for (handler = mq->handlers; NULL != handler->cb; handler++)
+  for (handler = handlers; NULL != handler->cb; handler++)
   {
     if (handler->type == mtype)
     {
@@ -240,9 +269,7 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
         LOG (GNUNET_ERROR_TYPE_ERROR,
              "Received malformed message of type %u\n",
              (unsigned int) handler->type);
-       GNUNET_MQ_inject_error (mq,
-                               GNUNET_MQ_ERROR_MALFORMED);
-       break;
+       return GNUNET_SYSERR;
       }
       if ( (NULL == handler->mv) ||
           (GNUNET_OK ==
@@ -257,17 +284,20 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
         LOG (GNUNET_ERROR_TYPE_ERROR,
              "Received malformed message of type %u\n",
              (unsigned int) handler->type);
-       GNUNET_MQ_inject_error (mq,
-                               GNUNET_MQ_ERROR_MALFORMED);
+       return GNUNET_SYSERR;
       }
       break;
     }
   }
  done:
   if (GNUNET_NO == handled)
+  {
     LOG (GNUNET_ERROR_TYPE_INFO,
          "No handler for message of type %u and size %u\n",
          mtype, msize);
+    return GNUNET_NO;
+  }
+  return GNUNET_OK;
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]