gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r13200 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r13200 - gnunet/src/fs
Date: Thu, 7 Oct 2010 14:52:19 +0200

Author: grothoff
Date: 2010-10-07 14:52:19 +0200 (Thu, 07 Oct 2010)
New Revision: 13200

Modified:
   gnunet/src/fs/gnunet-service-fs.c
Log:
entry lifetime tracking

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2010-10-07 11:34:28 UTC (rev 13199)
+++ gnunet/src/fs/gnunet-service-fs.c   2010-10-07 12:52:19 UTC (rev 13200)
@@ -24,9 +24,6 @@
  * @author Christian Grothoff
  *
  * TODO:
- * - track per-peer request latency (using new load API)
- * - consider more precise latency estimation (per-peer & request) -- again load API?
- * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
  * - introduce random latency in processing
  * - more statistics
  */
@@ -594,13 +591,18 @@
   /**
    * Remove this request after transmission of the current response.
    */
-  int16_t do_remove;
+  int8_t do_remove;
 
   /**
    * GNUNET_YES if we should not forward this request to other peers.
    */
-  int16_t local_only;
+  int8_t local_only;
 
+  /**
+   * GNUNET_YES if we should only forward this request to other peers (skipping the local datastore lookup).
+   */
+  int8_t forward_only;
+
 };
 
 
@@ -808,6 +810,10 @@
  */
 static struct GNUNET_LOAD_Value *datastore_put_load;
 
+/**
+ * How long do requests typically stay in the routing table?
+ */
+static struct GNUNET_LOAD_Value *rt_entry_lifetime;
 
 /**
  * We've just now completed a datastore request.  Update our
@@ -1378,12 +1384,14 @@
                                -1,
                                GNUNET_NO);
     }
-  /* might have already been removed from map in 'process_reply' (if
-     there was a unique reply) or never inserted if it was a
-     duplicate; hence ignore the return value here */
-  (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
-                                              &pr->query,
-                                              pr);
+  if (GNUNET_YES == 
+      GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+                                           &pr->query,
+                                           pr))
+    {
+      GNUNET_LOAD_update (rt_entry_lifetime,
+                         GNUNET_TIME_absolute_get_duration (pr->start_time).value);
+    }
   if (pr->qe != NULL)
      {
       GNUNET_DATASTORE_cancel (pr->qe);
@@ -1843,6 +1851,8 @@
   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
   query_request_map = NULL;
+  GNUNET_LOAD_value_free (rt_entry_lifetime);
+  rt_entry_lifetime = NULL;
   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
   peer_request_map = NULL;
@@ -2078,19 +2088,72 @@
 
 
 /**
- * Test if the load on this peer is too high
+ * Test if the DATABASE (GET) load on this peer is too high
  * to even consider processing the query at
- * all.
+ * all.  
  * 
- * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ *         GNUNET_NO to process normally (load normal)
+ *         GNUNET_SYSERR to process for free (load low)
  */
 static int
-test_load_too_high ()
+test_get_load_too_high (uint32_t priority)
 {
-  return GNUNET_SYSERR; // FIXME
+  double ld;
+
+  ld = GNUNET_LOAD_get_load (datastore_get_load);
+  if (ld < 1)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests done for free (low load)"),
+                               1,
+                               GNUNET_NO);
+      return GNUNET_SYSERR;
+    }
+  if (ld <= priority)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests done for a price (normal load)"),
+                               1,
+                               GNUNET_NO);
+      return GNUNET_NO;
+    }
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# requests dropped due to high load"),
+                           1,
+                           GNUNET_NO);
+  return GNUNET_YES;
 }
 
 
+
+
+/**
+ * Test if the DATABASE (PUT) load on this peer is too high
+ * to even consider processing the query at
+ * all.  
+ * 
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ *         GNUNET_NO to process normally (load normal or low)
+ */
+static int
+test_put_load_too_high (uint32_t priority)
+{
+  double ld;
+
+  if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
+    return GNUNET_NO; /* very fast */
+  ld = GNUNET_LOAD_get_load (datastore_put_load);
+  if ( (ld < 1) || (ld < priority) )
+    return GNUNET_NO;
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# storage requests dropped due to high load"),
+                           1,
+                           GNUNET_NO);
+  return GNUNET_YES;
+}
+
+
 /* ******************* Pending Request Refresh Task ******************** */
 
 
@@ -2329,8 +2392,6 @@
                                                     pr);
          return;  /* this target round failed */
        }
-      /* FIXME: if we are "quite" busy, we may still want to skip
-        this round; need more load detection code! */
       no_route = GNUNET_YES;
     }
   
@@ -2486,7 +2547,7 @@
                                        P2P_SUCCESS_LIST_SIZE);
       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
        if (cp->last_p2p_replies[i] == pr->cp->pid)
-         score += 1; /* likely successful based on hot path */
+         score += 1.0; /* likely successful based on hot path */
     }
   else
     {
@@ -2494,14 +2555,14 @@
                                        CS2P_SUCCESS_LIST_SIZE);
       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
        if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
-         score += 1; /* likely successful based on hot path */
+         score += 1.0; /* likely successful based on hot path */
     }
   /* 3b) include latency */
   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
-    score += 1; /* likely fast based on latency */
+    score += 1.0; /* likely fast based on latency */
   /* 3c) include priorities */
   if (cp->avg_priority <= pr->remaining_priority / 2.0)
-    score += 1; /* likely successful based on priorities */
+    score += 1.0; /* likely successful based on priorities */
   /* 3d) penalize for queue size */  
   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
   /* 3e) include peer proximity */
@@ -2616,6 +2677,7 @@
     return; /* configured to not do P2P search */
   /* (0) try DHT */
   if ( (0 == pr->anonymity_level) &&
+       (GNUNET_YES != pr->forward_only) &&
        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
     {
@@ -2677,18 +2739,28 @@
     }
 
   /* (3) reserve reply bandwidth */
-  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
-                                         &psc.target.hashPubKey);
-  GNUNET_assert (NULL != cp);
-  pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
-                                               &psc.target,
-                                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                               GNUNET_BANDWIDTH_value_init (UINT32_MAX),
-                                               DBLOCK_SIZE * 2, 
-                                               cp->inc_preference,
-                                               &target_reservation_cb,
-                                               pr);
-  cp->inc_preference = 0;
+  if (GNUNET_NO == pr->forward_only)
+    {
+      cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                             &psc.target.hashPubKey);
+      GNUNET_assert (NULL != cp);
+      pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
+                                                   &psc.target,
+                                                   GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
+                                                   GNUNET_BANDWIDTH_value_init (UINT32_MAX),
+                                                   DBLOCK_SIZE * 2, 
+                                                   cp->inc_preference,
+                                                   &target_reservation_cb,
+                                                   pr);
+      cp->inc_preference = 0;
+    }
+  else
+    {
+      /* force forwarding */
+      static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
+      target_reservation_cb (pr, &psc.target,
+                            zerobw, zerobw, 0, 0.0);
+    }
 }
 
 
@@ -2871,8 +2943,6 @@
                            GNUNET_NO);
   if (prq->sender != NULL)
     {
-      /* FIXME: should we be more precise here and not use
-        "start_time" but a peer-specific time stamp? */
       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
       prq->sender->avg_delay.value
        = (prq->sender->avg_delay.value * 
@@ -2936,6 +3006,8 @@
                    GNUNET_CONTAINER_multihashmap_remove (query_request_map,
                                                          key,
                                                          pr));
+      GNUNET_LOAD_update (rt_entry_lifetime,
+                         GNUNET_TIME_absolute_get_duration (pr->start_time).value);
       break;
     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
       GNUNET_STATISTICS_update (stats,
@@ -3216,8 +3288,9 @@
       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
       prq.sender->trust += prq.priority;
     }
-  if (GNUNET_YES == active_migration)
-    {
+  if ( (GNUNET_YES == active_migration) &&
+       (GNUNET_NO == test_put_load_too_high (prq.priority)) )
+    {      
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Replicating result for query `%s' with priority %u\n",
@@ -3498,7 +3571,7 @@
       return;
     }
   if ( (pr->client_request_list == NULL) &&
-       ( (GNUNET_YES == test_load_too_high()) ||
+       ( (GNUNET_YES == test_get_load_too_high (0)) ||
         (pr->results_found > 5 + 2 * pr->priority) ) )
     {
 #if DEBUG_FS > 2
@@ -3524,7 +3597,7 @@
  * @param cp the peer making the request
  * @return effective priority
  */
-static uint32_t
+static int32_t
 bound_priority (uint32_t prio_in,
                struct ConnectedPeer *cp)
 {
@@ -3533,7 +3606,7 @@
   double rret;
   int ld;
 
-  ld = test_load_too_high ();
+  ld = test_get_load_too_high (0);
   if (ld == GNUNET_SYSERR)
     return 0; /* excess resources */
   ret = change_host_trust (cp, prio_in);
@@ -3546,6 +3619,18 @@
       current_priorities 
        = (current_priorities * (N-1) + rret)/N;
     }
+  if ( (ld == GNUNET_YES) && (ret > 0) )
+    {
+      /* try with charging */
+      ld = test_get_load_too_high (ret);
+    }
+  if (ld == GNUNET_YES)
+    {
+      /* undo charge */
+      if (ret != 0)
+       change_host_trust (cp, -ret);
+      return -1; /* not enough resources */
+    }
 #undef N
   return ret;
 }
@@ -3612,9 +3697,9 @@
   uint32_t bm;
   size_t bfsize;
   uint32_t ttl_decrement;
+  int32_t priority;
   enum GNUNET_BLOCK_Type type;
   int have_ns;
-  int ld;
 
   msize = ntohs(message->size);
   if (msize < sizeof (struct GetMessage))
@@ -3680,11 +3765,8 @@
   /* note that we can really only check load here since otherwise
      peers could find out that we are overloaded by not being
      disconnected after sending us a malformed query... */
-
-  /* FIXME: query priority should play
-     a major role here! */
-  ld = test_load_too_high ();
-  if (GNUNET_YES == ld)
+  priority = bound_priority (ntohl (gm->priority), cps);
+  if (priority < 0)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -3697,9 +3779,6 @@
                                GNUNET_NO);
       return GNUNET_OK;
     }
-  /* FIXME: if ld == GNUNET_NO, forward
-     instead of indirecting! */
-
 #if DEBUG_FS 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
@@ -3716,12 +3795,21 @@
       pr->namespace = (GNUNET_HashCode*) &pr[1];
       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
     }
+  if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3) ||
+       (GNUNET_LOAD_get_average (cp->transmission_delay) > 
+       GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
+    {
+      /* don't have BW to send to peer, or would likely take longer than we have for it,
+        so at best indirect the query */
+      priority = 0;
+      pr->forward_only = GNUNET_YES;
+    }
   pr->type = type;
   pr->mingle = ntohl (gm->filter_mutator);
   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
   pr->anonymity_level = 1;
-  pr->priority = bound_priority (ntohl (gm->priority), cps);
+  pr->priority = (uint32_t) priority;
   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
   pr->query = gm->query;
   /* decrement ttl (always) */
@@ -3828,22 +3916,29 @@
     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
                                           (pr->priority + 1)); 
-  pr->qe = GNUNET_DATASTORE_get (dsh,
-                                &gm->query,
-                                type,                         
-                                pr->priority + 1,
-                                MAX_DATASTORE_QUEUE,                           
 
-                                timeout,
-                                &process_local_reply,
-                                pr);
+  if (GNUNET_YES != pr->forward_only)
+    pr->qe = GNUNET_DATASTORE_get (dsh,
+                                  &gm->query,
+                                  type,                               
+                                  pr->priority + 1,
+                                  MAX_DATASTORE_QUEUE,                         
 
+                                  timeout,
+                                  &process_local_reply,
+                                  pr);
+  else
+    GNUNET_STATISTICS_update (stats,
+                             gettext_noop ("# requests forwarded due to high load"),
+                             1,
+                             GNUNET_NO);
 
-  /* Are multiple results possible?  If so, start processing remotely now! */
+  /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
   switch (pr->type)
     {
     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
       /* only one result, wait for datastore */
-      break;
+      if (GNUNET_YES != pr->forward_only)
+       break;
     default:
       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
        pr->task = GNUNET_SCHEDULER_add_now (sched,
@@ -4085,6 +4180,7 @@
     }
   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
+  rt_entry_lifetime = GNUNET_LOAD_value_init ();
   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
   core = GNUNET_CORE_connect (sched,
@@ -4107,6 +4203,8 @@
       connected_peers = NULL;
       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
       query_request_map = NULL;
+      GNUNET_LOAD_value_free (rt_entry_lifetime);
+      rt_entry_lifetime = NULL;
       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
       requests_by_expiration_heap = NULL;
       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]