gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r13201 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r13201 - gnunet/src/fs
Date: Thu, 7 Oct 2010 15:52:14 +0200

Author: grothoff
Date: 2010-10-07 15:52:14 +0200 (Thu, 07 Oct 2010)
New Revision: 13201

Modified:
   gnunet/src/fs/fs.h
   gnunet/src/fs/gnunet-service-fs.c
Log:
adding support for artificial delays

Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h  2010-10-07 12:52:19 UTC (rev 13200)
+++ gnunet/src/fs/fs.h  2010-10-07 13:52:14 UTC (rev 13201)
@@ -123,12 +123,6 @@
 #define AVAILABILITY_TRIALS_MAX 8
 
 /**
- * By how much (in ms) do we decrement the TTL
- * at each hop?
- */
-#define TTL_DECREMENT 5000
-
-/**
  * Length of the P2P success tracker.  Note that
  * having a very long list can also hurt performance.
  */

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2010-10-07 12:52:19 UTC (rev 13200)
+++ gnunet/src/fs/gnunet-service-fs.c   2010-10-07 13:52:14 UTC (rev 13201)
@@ -45,6 +45,13 @@
 #define DEBUG_FS GNUNET_NO
 
 /**
+ * Should we introduce random latency in processing?  Required for proper
+ * implementation of GAP, but can be disabled for performance evaluation of
+ * the basic routing algorithm.
+ */
+#define SUPPORT_DELAYS GNUNET_NO
+
+/**
  * Maximum number of outgoing messages we queue per peer.
  */
 #define MAX_QUEUE_PER_PEER 16
@@ -140,6 +147,11 @@
   void *cont_cls;
 
   /**
+   * Do not transmit this pending message until this deadline.
+   */
+  struct GNUNET_TIME_Absolute delay_until;
+
+  /**
    * Size of the reply; actual reply message follows
    * at the end of this struct.
    */
@@ -226,6 +238,11 @@
   struct GNUNET_TIME_Absolute last_transmission_request_start;
 
   /**
+   * ID of delay task for scheduling transmission.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
+
+  /**
    * Average priority of successful replies.  Calculated
    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
    */
@@ -976,7 +993,7 @@
     }
 
   /* consider scheduling transmission to cp for content migration */
-  if (cp->cth != NULL)
+  if (cp->cth != NULL)        
     return GNUNET_YES; 
   msize = 0;
   pos = mig_head;
@@ -1004,6 +1021,11 @@
              msize,
              GNUNET_h2s (key));
 #endif
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   cp->cth 
     = GNUNET_CORE_notify_transmit_ready (core,
                                         0, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1336,12 +1358,12 @@
   TransmissionContinuation cont;
   void *cont_cls;
 
+  cont = pm->cont;
+  cont_cls = pm->cont_cls;
   if (pml != NULL)
     {
       GNUNET_assert (pml->pm == pm);
       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
-      cont = pm->cont;
-      cont_cls = pm->cont_cls;
       destroy_pending_message_list_entry (pml);
     }
   else
@@ -1689,7 +1711,15 @@
   GNUNET_PEER_change_rc (cp->pid, -1);
   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
   if (NULL != cp->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+    {
+      GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+      cp->cth = NULL;
+    }
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   while (NULL != (pm = cp->pending_messages_head))
     destroy_pending_message (pm, 0 /* delivery failed */);
   GNUNET_LOAD_value_free (cp->transmission_delay);
@@ -1894,6 +1924,39 @@
 
 
 /**
+ * We've had to delay a request for transmission to core, but now
+ * we should be ready.  Run it.
+ *
+ * @param cls the 'struct ConnectedPeer' for which a request was delayed
+ * @param tc task context (unused)
+ */
+static void
+delayed_transmission_request (void *cls,
+                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct ConnectedPeer *cp = cls;
+  struct GNUNET_PeerIdentity pid;
+  struct PendingMessage *pm;
+
+  pm = cp->pending_messages_head;
+  cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (cp->cth == NULL);
+  if (pm == NULL)
+    return;
+  GNUNET_PEER_resolve (cp->pid,
+                      &pid);
+  cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
+  cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                              pm->priority,
+                                              GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+                                              &pid,
+                                              pm->msize,
+                                              &transmit_to_peer,
+                                              cp);
+}
+
+
+/**
  * Transmit messages by copying it to the target buffer
  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
  * for writing in the meantime.  In that case, do nothing
@@ -1912,13 +1975,16 @@
 {
   struct ConnectedPeer *cp = cls;
   char *cbuf = buf;
-  struct GNUNET_PeerIdentity pid;
   struct PendingMessage *pm;
+  struct PendingMessage *next_pm;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_TIME_Relative min_delay;
   struct MigrationReadyBlock *mb;
   struct MigrationReadyBlock *next;
   struct PutMessage migm;
   size_t msize;
   unsigned int i;
+  struct GNUNET_PeerIdentity pid;
  
   cp->cth = NULL;
   if (NULL == buf)
@@ -1930,33 +1996,48 @@
       GNUNET_LOAD_update (cp->transmission_delay,
                          UINT64_MAX);
       return 0;
-    }
+    }  
   GNUNET_LOAD_update (cp->transmission_delay,
                      GNUNET_TIME_absolute_get_duration 
(cp->last_transmission_request_start).value);
+  now = GNUNET_TIME_absolute_get ();
   msize = 0;
-  while ( (NULL != (pm = cp->pending_messages_head) ) &&
+  min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+  next_pm = cp->pending_messages_head;
+  while ( (NULL != (pm = next_pm) ) &&
          (pm->msize <= size) )
     {
+      next_pm = pm->next;
+      if (pm->delay_until.value > now.value)
+       {
+         min_delay = GNUNET_TIME_relative_min (min_delay,
+                                               
GNUNET_TIME_absolute_get_remaining (pm->delay_until));
+         continue;
+       }
       memcpy (&cbuf[msize], &pm[1], pm->msize);
       msize += pm->msize;
       size -= pm->msize;
+      GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
+                                  cp->pending_messages_tail,
+                                  pm);
+      if (NULL == pm->pml)
+       cp->pending_requests--;
       destroy_pending_message (pm, cp->pid);
     }
-  if (NULL != pm)
-    {
+  if (pm != NULL)
+    min_delay = GNUNET_TIME_UNIT_ZERO;
+  if (NULL != cp->pending_messages_head)
+    {     
+      GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == 
cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task
+       = GNUNET_SCHEDULER_add_delayed (sched,
+                                       min_delay,
+                                       &delayed_transmission_request,
+                                       cp);
+    }
+  if (pm == NULL)
+    {      
       GNUNET_PEER_resolve (cp->pid,
                           &pid);
-      cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
-      cp->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                                  pm->priority,
-                                                  
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
-                                                  &pid,
-                                                  pm->msize,
-                                                  &transmit_to_peer,
-                                                  cp);
-    }
-  else
-    {      
       next = mig_head;
       while (NULL != (mb = next))
        {
@@ -1984,7 +2065,7 @@
                  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                              "Pushing migration block `%s' (%u bytes) to 
`%s'\n",
                              GNUNET_h2s (&mb->query),
-                             mb->size,
+                             (unsigned int) mb->size,
                              GNUNET_i2s (&pid));
 #endif   
                  break;
@@ -1995,7 +2076,7 @@
                  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                              "Migration block `%s' (%u bytes) is not on 
migration list for peer `%s'\n",
                              GNUNET_h2s (&mb->query),
-                             mb->size,
+                             (unsigned int) mb->size,
                              GNUNET_i2s (&pid));
 #endif   
                }
@@ -2013,9 +2094,9 @@
     }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u bytes to peer %u\n",
-             msize,
-             cp->pid);
+             "Transmitting %u bytes to peer with PID %u\n",
+             (unsigned int) msize,
+             (unsigned int) cp->pid);
 #endif
   return msize;
 }
@@ -2063,7 +2144,15 @@
     destroy_pending_message (cp->pending_messages_tail, 0);  
   GNUNET_PEER_resolve (cp->pid, &pid);
   if (NULL != cp->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+    {
+      GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+      cp->cth = NULL;
+    }
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   /* need to schedule transmission */
   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
@@ -3119,6 +3208,12 @@
       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
       reply->cont = &transmit_reply_continuation;
       reply->cont_cls = pr;
+#if SUPPORT_DELAYS
+      reply->delay_until 
+       = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_MILLISECONDS,
+                                                                          
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                               
                     TTL_DECREMENT)));
+#endif
       reply->msize = msize;
       reply->priority = UINT32_MAX; /* send replies first! */
       pm = (struct PutMessage*) &reply[1];
@@ -3557,10 +3652,10 @@
   prq.priority = priority;  
   prq.finished = GNUNET_NO;
   prq.request_found = GNUNET_NO;
-  process_reply (&prq, key, pr);
   if ( (old_rf == 0) &&
-       (pr->results_found == 1) )
+       (pr->results_found == 0) )
     update_datastore_delays (pr->start_time);
+  process_reply (&prq, key, pr);
   if (prq.finished == GNUNET_YES)
     return;
   if (pr->qe == NULL)




reply via email to

[Prev in Thread] Current Thread [Next in Thread]