gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r9034 - in gnunet/src: fs include


From: gnunet
Subject: [GNUnet-SVN] r9034 - in gnunet/src: fs include
Date: Sun, 27 Sep 2009 17:06:56 -0600

Author: grothoff
Date: 2009-09-27 17:06:55 -0600 (Sun, 27 Sep 2009)
New Revision: 9034

Modified:
   gnunet/src/fs/fs.h
   gnunet/src/fs/gnunet-service-fs.c
   gnunet/src/include/gnunet_container_lib.h
   gnunet/src/include/gnunet_core_service.h
Log:
more work on fs service

Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h  2009-09-25 22:10:51 UTC (rev 9033)
+++ gnunet/src/fs/fs.h  2009-09-27 23:06:55 UTC (rev 9034)
@@ -75,6 +75,19 @@
 #define TTL_DECREMENT 5000
 
 /**
+ * Length of the P2P success tracker.  Note that
+ * having a very long list can also hurt performance.
+ */
+#define P2P_SUCCESS_LIST_SIZE 8
+
+
+/**
+ * Length of the CS-2-P success tracker.  Note that
+ * having a very long list can also hurt performance.
+ */
+#define CS2P_SUCCESS_LIST_SIZE 8
+
+/**
  * How long are we willing to wait for the datastore to be ready to
  * process a request for a query without priority?
  */

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2009-09-25 22:10:51 UTC (rev 9033)
+++ gnunet/src/fs/gnunet-service-fs.c   2009-09-27 23:06:55 UTC (rev 9034)
@@ -38,6 +38,7 @@
  * - check that we decrement PIDs always where necessary (can wait)
  */
 #include "platform.h"
+#include <values.h>
 #include "gnunet_core_service.h"
 #include "gnunet_datastore_service.h"
 #include "gnunet_peer_lib.h"
@@ -407,7 +408,10 @@
 
   /**
    * Pending transmission request with the core service for the target
-   * peer (for processing of 'replies_pending').
+   * peer (for processing of 'replies_pending') or Handle for a
+   * pending query-request for P2P-transmission with the core service.
+   * If non-NULL, this request must be cancelled should this struct be
+   * destroyed!
    */
   struct GNUNET_CORE_TransmitHandle *cth;
 
@@ -436,6 +440,12 @@
   GNUNET_HashCode query;
 
   /**
+   * The task responsible for transmitting queries
+   * for this request.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier task;
+
+  /**
    * (Interned) Peer identifier (only valid if "client" is NULL)
    * that identifies a peer that gave us this request.
    */
@@ -608,6 +618,69 @@
 
 
 /**
+ * Information about a peer that we are connected to.
+ * We track data that is useful for determining which
+ * peers should receive our requests.
+ */
+struct ConnectedPeer
+{
+
+  /**
+   * List of the last clients for which this peer
+   * successfully answered a query. 
+   */
+  struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
+
+  /**
+   * List of the last PIDs for which
+   * this peer successfully answered a query;
+   * We use 0 to indicate no successful reply.
+   */
+  GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
+
+  /**
+   * Average delay between sending the peer a request and
+   * getting a reply (only calculated over the requests for
+   * which we actually got a reply).   Calculated
+   * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
+   */ 
+  struct GNUNET_TIME_Relative avg_delay;
+
+  /**
+   * Average priority of successful replies.  Calculated
+   * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
+   */
+  double avg_priority;
+
+  /**
+   * The peer's identity.
+   */
+  GNUNET_PEER_Id pid;  
+
+  /**
+   * Number of requests we have currently pending
+   * with this peer (that is, requests that were
+   * transmitted so recently that we would not retransmit
+   * them right now).
+   */
+  unsigned int pending_requests;
+
+  /**
+   * Which offset in "last_p2p_replies" will be updated next?
+   * (we go round-robin).
+   */
+  unsigned int last_p2p_replies_woff;
+
+  /**
+   * Which offset in "last_client_replies" will be updated next?
+   * (we go round-robin).
+   */
+  unsigned int last_client_replies_woff;
+
+};
+
+
+/**
  * Our connection to the datastore.
  */
 static struct GNUNET_DATASTORE_Handle *dsh;
@@ -686,6 +759,11 @@
 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
 
 /**
+ * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
+
+/**
  * Maximum number of requests (from other peers) that we're
  * willing to have pending at any given point in time.
  * FIXME: set from configuration (and 32 is a tiny value for testing only).
@@ -1457,6 +1535,279 @@
 
 
 /**
+ * Closure used for "target_peer_select_cb".
+ */
+struct PeerSelectionContext 
+{
+  /**
+   * The request for which we are selecting
+   * peers.
+   */
+  struct PendingRequest *pr;
+
+  /**
+   * Current "prime" target.
+   */
+  struct GNUNET_PeerIdentity target;
+
+  /**
+   * How much do we like this target?
+   */
+  double target_score;
+
+};
+
+
+/**
+ * Function called for each connected peer to determine
+ * which one(s) would make good targets for forwarding.
+ *
+ * @param cls closure (struct PeerSelectionContext)
+ * @param key current key code (peer identity)
+ * @param value value in the hash map (struct ConnectedPeer)
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+static int
+target_peer_select_cb (void *cls,
+                      const GNUNET_HashCode * key,
+                      void *value)
+{
+  struct PeerSelectionContext *psc = cls;
+  // struct ConnectedPeer *cp = value;
+  double score;
+  // FIXME (CRITICAL: would  always sent to same peer without this!)
+  // 1) check if we have already (recently) forwarded to this peer, if so, skip
+  // 2) calculate how much we'd like to forward to this peer
+  score = 0;
+  
+  // 3) store in closure
+  if (score > psc->target_score)
+    {
+      psc->target_score = score;
+      psc->target.hashPubKey = *key; 
+    }
+  return GNUNET_YES;
+}
+
+
+
+
+/**
+ * We use a random delay to make the timing of requests
+ * less predictable.  This function returns such a random
+ * delay.
+ *
+ * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
+ */
+static struct GNUNET_TIME_Relative
+get_processing_delay ()
+{
+  return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                       GNUNET_CRYPTO_random_u32 
(GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                 
TTL_DECREMENT));
+}
+
+
+/**
+ * Task that is run for each request with the
+ * goal of forwarding the associated query to
+ * other peers.  The task should re-schedule
+ * itself to be re-run once the TTL has expired.
+ * (or at a later time if more peers should
+ * be queried earlier).
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param tc task context (unused)
+ */
+static void
+forward_request_task (void *cls,
+                     const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * We've selected a peer for forwarding of a query.
+ * Construct the message and then re-schedule the
+ * task to forward again to (other) peers.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_request_cb (void *cls,
+                    size_t size, 
+                    void *buf)
+{
+  struct PendingRequest *pr = cls;
+  uint16_t msize;
+
+  pr->cth = NULL;
+  /* (1) check for timeout */
+  if (NULL == buf)
+    {
+      /* timeout, try another peer immediately again */
+      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                              GNUNET_NO,
+                                              GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                              GNUNET_SCHEDULER_NO_TASK,
+                                              GNUNET_TIME_UNIT_ZERO,
+                                              &forward_request_task,
+                                              pr);
+      return 0;
+    }
+  /* (2) build query message */
+  msize = 0;
+  // CRITICAL-FIXME! (nothing goes without this!)
+  /* (3) schedule job to do it again (or another peer, etc.) */
+  pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                          GNUNET_NO,
+                                          GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                          GNUNET_SCHEDULER_NO_TASK,
+                                          get_processing_delay (), // FIXME!
+                                          &forward_request_task,
+                                          pr);
+
+  return msize;
+}
+
+
+/**
+ * Function called after we've tried to reserve
+ * a certain amount of bandwidth for a reply.
+ * Check if we succeeded and if so send our query.
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param peer identifies the peer
+ * @param latency current latency estimate, "FOREVER" if we have been
+ *                disconnected
+ * @param bpm_in set to the current bandwidth limit (receiving) for this peer
+ * @param bpm_out set to the current bandwidth limit (sending) for this peer
+ * @param amount set to the amount that was actually reserved or unreserved
+ * @param preference current traffic preference for the given peer
+ */
+static void
+target_reservation_cb (void *cls,
+                      const struct
+                      GNUNET_PeerIdentity * peer,
+                      unsigned int bpm_in,
+                      unsigned int bpm_out,
+                      struct GNUNET_TIME_Relative
+                      latency, int amount,
+                      unsigned long long preference)
+{
+  struct PendingRequest *pr = cls;
+  uint32_t priority;
+  uint16_t size;
+  struct GNUNET_TIME_Relative maxdelay;
+
+  GNUNET_assert (peer != NULL);
+  if ( (amount != DBLOCK_SIZE) ||
+       (pr->cth != NULL) )
+    {
+      /* try again later; FIXME: we may need to un-reserve "amount"? */
+      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                              GNUNET_NO,
+                                              GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                              GNUNET_SCHEDULER_NO_TASK,
+                                              get_processing_delay (), // 
FIXME: longer?
+                                              &forward_request_task,
+                                              pr);
+      return;
+    }
+  // (2) transmit, update ttl/priority
+  // FIXME: calculate priority, maxdelay, size properly!
+  priority = 0;
+  size = 60000;
+  maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
+  pr->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                              priority,
+                                              maxdelay,
+                                              peer,
+                                              size,
+                                              &transmit_request_cb,
+                                              pr);
+  if (pr->cth == NULL)
+    {
+      /* try again later */
+      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                              GNUNET_NO,
+                                              GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                              GNUNET_SCHEDULER_NO_TASK,
+                                              get_processing_delay (), // 
FIXME: longer?
+                                              &forward_request_task,
+                                              pr);
+    }
+}
+
+
+/**
+ * Task that is run for each request with the
+ * goal of forwarding the associated query to
+ * other peers.  The task should re-schedule
+ * itself to be re-run once the TTL has expired.
+ * (or at a later time if more peers should
+ * be queried earlier).
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param tc task context (unused)
+ */
+static void
+forward_request_task (void *cls,
+                     const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct PendingRequest *pr = cls;
+  struct PeerSelectionContext psc;
+
+  pr->task = GNUNET_SCHEDULER_NO_TASK;
+  if (pr->cth != NULL) 
+    {
+      /* we're busy transmitting a result, wait a bit */
+      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                              GNUNET_NO,
+                                              GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                              GNUNET_SCHEDULER_NO_TASK,
+                                              get_processing_delay (), 
+                                              &forward_request_task,
+                                              pr);
+      return;
+    }
+  /* (1) select target */
+  psc.pr = pr;
+  psc.target_score = MINDOUBLE;
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &target_peer_select_cb,
+                                        &psc);
+  if (psc.target_score == MINDOUBLE)
+    {
+      /* no possible target found, wait some time */
+      pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                              GNUNET_NO,
+                                              GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                              GNUNET_SCHEDULER_NO_TASK,
+                                              get_processing_delay (), // 
FIXME: exponential back-off? or at least wait longer...
+                                              &forward_request_task,
+                                              pr);
+      return;
+    }
+  /* (2) reserve reply bandwidth */
+  // FIXME: need a way to cancel; this
+  // async operation is problematic (segv-problematic)
+  // if "pr" is destroyed while it happens!
+  GNUNET_CORE_peer_configure (core,
+                             &psc.target,
+                             GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
+                             -1,
+                             DBLOCK_SIZE, // FIXME: make dependent on type?
+                             0,
+                             &target_reservation_cb,
+                             pr);
+}
+
+
+/**
  * We're processing (local) results for a search request
  * from a (local) client.  Pass applicable results to the
  * client and if we are done either clean up (operation
@@ -1547,8 +1898,13 @@
                                             &pr->query,
                                             pr,
                                             
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-         
-         // FIXME: trigger some processing NOW!
+         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                  GNUNET_NO,
+                                                  
GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                                  GNUNET_SCHEDULER_NO_TASK,
+                                                  get_processing_delay (),
+                                                  &forward_request_task,
+                                                  pr);
          local_get_context_free (lgc);
          return;
        }
@@ -1765,10 +2121,12 @@
                                   cl->tail,
                                   pr->crl_entry);
     }
+  if (GNUNET_SCHEDULER_NO_TASK != pr->task)
+    GNUNET_SCHEDULER_cancel (sched, pr->task);
+  if (NULL != pr->cth)
+    GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
   if (NULL != pr->bf)
     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
-  if (NULL != pr->cth)
-    GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
   if (NULL != pr->th)
     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
   while (NULL != (reply = pr->replies_pending))
@@ -1883,6 +2241,10 @@
   requests_by_peer = NULL;
   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
   requests_by_expiration = NULL;
+  // FIXME: iterate over entries and free individually?
+  // (or do we get disconnect notifications?)
+  GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
+  connected_peers = NULL;
   GNUNET_CONTAINER_multihashmap_destroy (ifm);
   ifm = NULL;
   while (NULL != (pos = indexed_files))
@@ -1917,7 +2279,30 @@
 }
 
 
+
 /**
+ * Method called whenever a given peer connects.
+ *
+ * @param cls closure, not used
+ * @param peer peer identity this notification is about
+ */
+static void 
+peer_connect_handler (void *cls,
+                     const struct
+                     GNUNET_PeerIdentity * peer)
+{
+  struct ConnectedPeer *cp;
+
+  cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
+  cp->pid = GNUNET_PEER_intern (peer);
+  GNUNET_CONTAINER_multihashmap_put (connected_peers,
+                                    &peer->hashPubKey,
+                                    cp,
+                                    
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+}
+
+
+/**
  * Method called whenever a peer disconnects.
  *
  * @param cls closure, not used
@@ -1928,6 +2313,13 @@
                         const struct
                         GNUNET_PeerIdentity * peer)
 {
+  struct ConnectedPeer *cp;
+
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &peer->hashPubKey);
+  GNUNET_PEER_change_rc (cp->pid, -1);
+  GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
+  GNUNET_free (cp);
   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
                                              &peer->hashPubKey,
                                              &destroy_request,
@@ -1993,11 +2385,15 @@
                                            eer);
       destroy_pending_request (eer);     
     }
-  // FIXME: trigger actual forwarding NOW!
+  pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+                                          GNUNET_NO,
+                                          GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                          GNUNET_SCHEDULER_NO_TASK,
+                                          get_processing_delay (),
+                                          &forward_request_task,
+                                          pr);
   GNUNET_free (pgc); 
 }
-
-
 /**
  * Transmit the given message by copying it to
  * the target buffer "buf".  "buf" will be
@@ -2831,7 +3227,7 @@
                       GNUNET_TIME_UNIT_FOREVER_REL,
                       NULL,
                       &core_start_cb,
-                      NULL,
+                      &peer_connect_handler,
                       &peer_disconnect_handler,
                       NULL, 
                       NULL, GNUNET_NO,
@@ -2860,6 +3256,7 @@
   ifm = GNUNET_CONTAINER_multihashmap_create (128);
   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: 
get size from config
   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get 
size from config
+  connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
   requests_by_expiration = GNUNET_CONTAINER_heap_create 
(GNUNET_CONTAINER_HEAP_ORDER_MIN); 
   read_index_list ();
   dsh = GNUNET_DATASTORE_connect (cfg,

Modified: gnunet/src/include/gnunet_container_lib.h
===================================================================
--- gnunet/src/include/gnunet_container_lib.h   2009-09-25 22:10:51 UTC (rev 
9033)
+++ gnunet/src/include/gnunet_container_lib.h   2009-09-27 23:06:55 UTC (rev 
9034)
@@ -472,6 +472,7 @@
   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST
 };
 
+
 /**
  * Iterator over HashCodes.
  *

Modified: gnunet/src/include/gnunet_core_service.h
===================================================================
--- gnunet/src/include/gnunet_core_service.h    2009-09-25 22:10:51 UTC (rev 
9033)
+++ gnunet/src/include/gnunet_core_service.h    2009-09-27 23:06:55 UTC (rev 
9034)
@@ -197,6 +197,7 @@
 /**
  * Function called with statistics about the given peer.
  *
+ * @param cls closure
  * @param peer identifies the peer
  * @param latency current latency estimate, "FOREVER" if we have been
  *                disconnected
@@ -239,6 +240,7 @@
  * @param info function to call with the resulting configuration information
  * @param info_cls closure for info
  */
+// FIXME: should return handle for cancellation!
 void
 GNUNET_CORE_peer_configure (struct GNUNET_CORE_Handle *handle,
                             const struct GNUNET_PeerIdentity *peer,





reply via email to

[Prev in Thread] Current Thread [Next in Thread]