gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r8511 - gnunet/src/core


From: gnunet
Subject: [GNUnet-SVN] r8511 - gnunet/src/core
Date: Tue, 9 Jun 2009 20:19:45 -0600

Author: grothoff
Date: 2009-06-09 20:19:45 -0600 (Tue, 09 Jun 2009)
New Revision: 8511

Modified:
   gnunet/src/core/gnunet-service-core.c
Log:
bound queue size, clean up code

Modified: gnunet/src/core/gnunet-service-core.c
===================================================================
--- gnunet/src/core/gnunet-service-core.c       2009-06-10 02:18:54 UTC (rev 
8510)
+++ gnunet/src/core/gnunet-service-core.c       2009-06-10 02:19:45 UTC (rev 
8511)
@@ -24,16 +24,9 @@
  * @author Christian Grothoff
  *
  * TODO:
- * TESTING:
- * - write test for basic core functions:
- *   + connect to peer
- *   + transmit (encrypted) message [with handshake]
- *   + receive (encrypted) message, forward plaintext to clients
  * POST-TESTING:
  * - revisit API (which arguments are used, needed)?
- * - add code to bound queue size when handling client's SEND message
  * - add code to bound message queue size when passing messages to clients
- * - add code to discard_expired_messages
  * - add code to re-transmit key if first attempt failed
  *   + timeout on connect / key exchange, etc.
  *   + timeout for automatic re-try, etc.
@@ -83,6 +76,16 @@
 
 
 /**
+ * After how much time past the "official" expiration time do
+ * we discard messages?  Should not be zero since we may 
+ * intentionally defer transmission until close to the deadline
+ * and then may be slightly past the deadline due to inaccuracy
+ * in sleep and our own CPU consumption.
+ */
+#define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
+
+
+/**
  * What is the maximum delay for a SET_KEY message?
  */
 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
@@ -125,6 +128,12 @@
 
 
 /**
+ * How many messages do we queue per peer at most?
+ */
+#define MAX_PEER_QUEUE_SIZE 16
+
+
+/**
  * What is the maximum age of a message for us to consider
  * processing it?  Note that this looks at the timestamp used
  * by the other peer, so clock skew between machines does
@@ -933,6 +942,7 @@
 {
   struct Client *pos;
   struct Client *prev;
+  struct Event *e;
 
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -950,6 +960,11 @@
             prev->next = pos->next;
           if (pos->th != NULL)
             GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
+         while (NULL != (e = pos->event_head))
+           {
+             pos->event_head = e->next;
+             GNUNET_free (e);
+           }
           GNUNET_free (pos);
           return;
         }
@@ -1104,6 +1119,8 @@
 static void
 process_encrypted_neighbour_queue (struct Neighbour *n)
 {
+  struct MessageEntry *m;
+ 
   if (n->th != NULL)
     return;  /* request already pending */
   if (n->encrypted_head == NULL)
@@ -1132,7 +1149,13 @@
     {
       /* message request too large (oops) */
       GNUNET_break (0);
-      /* FIXME: handle error somehow! */
+      /* discard encrypted message */
+      GNUNET_assert (NULL != (m = n->encrypted_head));
+      n->encrypted_head = m->next;
+      if (m->next == NULL)
+       n->encrypted_tail = NULL;
+      GNUNET_free (m);
+      process_encrypted_neighbour_queue (n);
     }
 }
 
@@ -1448,7 +1471,29 @@
 static void
 discard_expired_messages (struct Neighbour *n)
 {
-  /* FIXME */
+  struct MessageEntry *prev;
+  struct MessageEntry *next;
+  struct MessageEntry *pos;
+  struct GNUNET_TIME_Absolute cutoff;
+
+  cutoff = GNUNET_TIME_relative_to_absolute(PAST_EXPIRATION_DISCARD_TIME);
+  prev = NULL;
+  pos = n->messages;
+  while (pos != NULL) 
+    {
+      next = pos->next;
+      if (pos->deadline.value < cutoff.value)
+       {
+         if (prev == NULL)
+           n->messages = next;
+         else
+           prev->next = next;
+         GNUNET_free (pos);
+       }
+      else
+       prev = pos;
+      pos = next;
+    }
 }
 
 
@@ -1651,8 +1696,6 @@
                   GNUNET_i2s (&sm->peer));
 #endif
       GNUNET_free (sm);
-      /* FIXME: do we need to do something here to let the
-        client know about the failure!? */
       return 0;
     }
 #if DEBUG_CORE
@@ -1678,9 +1721,13 @@
   struct SendMessage *smc;
   const struct GNUNET_MessageHeader *mh;
   struct Neighbour *n;
-  struct MessageEntry *pred;
+  struct MessageEntry *prev;
   struct MessageEntry *pos;
-  struct MessageEntry *e;
+  struct MessageEntry *e; 
+  struct MessageEntry *min_prio_entry;
+  struct MessageEntry *min_prio_prev;
+  unsigned int min_prio;
+  unsigned int queue_size;
   uint16_t msize;
 
   msize = ntohs (message->size);
@@ -1714,20 +1761,26 @@
 #endif
       msize += sizeof (struct SendMessage);
       /* ask transport to connect to the peer */
-      /* FIXME: this code does not handle the
-         case where we get multiple SendMessages before
-         transport responds to this request;
-         => need to track pending requests! */
       smc = GNUNET_malloc (msize);
       memcpy (smc, sm, msize);
-      GNUNET_TRANSPORT_notify_transmit_ready (transport,
-                                              &sm->peer,
-                                              0,
-                                              
GNUNET_TIME_absolute_get_remaining
-                                              (GNUNET_TIME_absolute_ntoh
-                                               (sm->deadline)),
-                                              &send_connect_continuation,
-                                              smc);
+      if (NULL ==
+         GNUNET_TRANSPORT_notify_transmit_ready (transport,
+                                                 &sm->peer,
+                                                 0,
+                                                 
GNUNET_TIME_absolute_get_remaining
+                                                 (GNUNET_TIME_absolute_ntoh
+                                                  (sm->deadline)),
+                                                 &send_connect_continuation,
+                                                 smc))
+       {
+         /* transport has already a request pending for this peer! */
+#if DEBUG_CORE
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     "Dropped second message destined for `%4s' since 
connection is still down.\n",
+                     GNUNET_i2s(&sm->peer));
+#endif
+         GNUNET_free (smc);
+       }
       if (client != NULL)
         GNUNET_SERVER_receive_done (client, GNUNET_OK);
       return;
@@ -1739,7 +1792,50 @@
               msize, 
              GNUNET_i2s (&sm->peer));
 #endif
-  /* FIXME: consider bounding queue size */
+  /* bound queue size */
+  discard_expired_messages (n);
+  min_prio = (unsigned int) -1;
+  queue_size = 0;
+  prev = NULL;
+  pos = n->messages;
+  while (pos != NULL) 
+    {
+      if (pos->priority < min_prio)
+       {
+         min_prio_entry = pos;
+         min_prio_prev = prev;
+         min_prio = pos->priority;
+       }
+      queue_size++;
+      prev = pos;
+      pos = pos->next;
+    }
+  if (queue_size >= MAX_PEER_QUEUE_SIZE)
+    {
+      /* queue full */
+      if (ntohl(sm->priority) <= min_prio)
+       {
+         /* discard new entry */
+#if DEBUG_CORE
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Queue full, discarding new request\n");
+#endif
+         if (client != NULL)
+           GNUNET_SERVER_receive_done (client, GNUNET_OK);
+         return;
+       }
+      /* discard "min_prio_entry" */
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Queue full, discarding existing older request\n");
+#endif
+      if (min_prio_prev == NULL)
+       n->messages = min_prio_entry->next;
+      else
+       min_prio_prev->next = min_prio_entry->next;      
+      GNUNET_free (min_prio_entry);    
+    }
+  
   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
   e->priority = ntohl (sm->priority);
@@ -1747,17 +1843,17 @@
   memcpy (&e[1], mh, msize);
 
   /* insert, keep list sorted by deadline */
-  pred = NULL;
+  prev = NULL;
   pos = n->messages;
   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
     {
-      pred = pos;
+      prev = pos;
       pos = pos->next;
     }
-  if (pred == NULL)
+  if (prev == NULL)
     n->messages = e;
   else
-    pred->next = e;
+    prev->next = e;
   e->next = pos;
 
   /* consider scheduling now */
@@ -2813,6 +2909,7 @@
 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Neighbour *n;
+  struct Client *c;
 
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2826,6 +2923,8 @@
       neighbours = n->next;
       free_neighbour (n);
     }
+  while (NULL != (c = clients))
+    handle_client_disconnect (NULL, c->client_handle);
 }
 
 
@@ -2926,10 +3025,6 @@
 
   if (my_private_key != NULL)
     GNUNET_CRYPTO_rsa_key_free (my_private_key);
-  /*
-     FIXME:
-     - free clients
-   */
 }
 
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]