gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r34950 - gnunet/src/rps


From: gnunet
Subject: [GNUnet-SVN] r34950 - gnunet/src/rps
Date: Thu, 22 Jan 2015 01:18:44 +0100

Author: ch3
Date: 2015-01-22 01:18:44 +0100 (Thu, 22 Jan 2015)
New Revision: 34950

Modified:
   gnunet/src/rps/gnunet-service-rps.c
   gnunet/src/rps/gnunet-service-rps_sampler.c
   gnunet/src/rps/gnunet-service-rps_sampler.h
Log:
restructured service and sampler

Modified: gnunet/src/rps/gnunet-service-rps.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps.c 2015-01-22 00:18:40 UTC (rev 34949)
+++ gnunet/src/rps/gnunet-service-rps.c 2015-01-22 00:18:44 UTC (rev 34950)
@@ -70,23 +70,7 @@
  */
 static struct GNUNET_PeerIdentity *own_identity;
 
-/**
- * Closure to the callback cadet calls on each peer it passes to us
- */
-struct init_peer_cls
-{
-  /**
-   * The server handle to later listen to client requests
-   */
-  struct GNUNET_SERVER_Handle *server;
 
-  /**
-   * Counts how many peers cadet already passed to us
-   */
-  uint32_t i;
-};
-
-
   struct GNUNET_PeerIdentity *
 get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size);
 
@@ -122,7 +106,30 @@
   LIVING                = 0x10
 };
 
+
 /**
+ * Functions of this type can be stored at a peer for later execution.
+ */
+typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer);
+
+/**
+ * Outstanding operation on peer consisting of callback and closure
+ */
+struct PeerOutstandingOp
+{
+  /**
+   * Callback
+   */
+  PeerOp op;
+
+  /**
+   * Closure
+   */
+  void *op_cls;
+};
+
+
+/**
  * Struct used to keep track of other peer's status
  *
  * This is stored in a multipeermap.
@@ -150,6 +157,17 @@
   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
 
   /**
+   * Array of outstanding operations on this peer.
+   */
+  struct PeerOutstandingOp *outstanding_ops;
+
+  /**
+   * Number of outstanding operations.
+   */
+  unsigned int num_outstanding_ops;
+  //size_t num_outstanding_ops;
+
+  /**
    * This is pobably followed by 'statistical' data (when we first saw
    * him, how did we get his ID, how many pushes (in a timeinterval),
    * ...)
@@ -310,6 +328,12 @@
 static struct GNUNET_TIME_Relative  request_rate;
 
 
+/**
+ * Number of history update tasks.
+ */
+uint32_t num_hist_update_tasks;
+
+
 /***********************************************************************
  * /Globals
 ***********************************************************************/
@@ -398,6 +422,8 @@
     ctx->mq = NULL;
     ctx->send_channel = NULL;
     ctx->recv_channel = NULL;
+    ctx->outstanding_ops = NULL;
+    ctx->num_outstanding_ops = 0;
     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
                                               
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
   }
@@ -406,6 +432,22 @@
 
 
 /**
+ * Put random peer from sampler into the gossip list as history update.
+ */
+  void
+hist_update (void *cls, struct GNUNET_PeerIdentity *ids, uint32_t num_peers)
+{
+  GNUNET_assert (1 == num_peers);
+
+  if (gossip_list_size < sampler_size_est_need)
+    GNUNET_array_append (gossip_list, gossip_list_size, *ids);
+
+  if (0 < num_hist_update_tasks)
+    num_hist_update_tasks--;
+}
+
+
+/**
  * Callback that is called when a channel was effectively established.
  * This is given to ntfy_tmt_rdy and called when the channel was
  * successfully established.
@@ -422,6 +464,15 @@
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer));
 
+  if (0 != peer_ctx->num_outstanding_ops)
+  { /* Call outstanding operations */
+    unsigned int i;
+
+    for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+      peer_ctx->outstanding_ops[i].op (peer_ctx->outstanding_ops[i].op_cls, 
peer);
+    GNUNET_array_grow (peer_ctx->outstanding_ops, 
peer_ctx->num_outstanding_ops, 0);
+  }
+
   GNUNET_free (peer);
 
   buf = NULL;
@@ -437,7 +488,7 @@
              const struct GNUNET_PeerIdentity *peer)
 {
   struct PeerContext *ctx;
-  //struct GNUNET_PeerIdentity *tmp_peer;
+  struct GNUNET_PeerIdentity *tmp_peer;
 
   ctx = get_peer_ctx (peer_map, peer);
   if (NULL == ctx->send_channel)
@@ -446,11 +497,14 @@
                                                      GNUNET_RPS_CADET_PORT,
                                                      
GNUNET_CADET_OPTION_RELIABLE);
 
-    //tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity);
-    //*tmp_peer = *peer;
-    //(void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO,
-    //                                         GNUNET_TIME_UNIT_FOREVER_REL,
-    //                                         0, peer_is_live, tmp_peer);
+    if (NULL == ctx->recv_channel)
+    {
+      tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity);
+      *tmp_peer = *peer;
+      (void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO,
+                                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                                 0, peer_is_live, tmp_peer);
+    }
 
     // do I have to explicitly put it in the peer_map?
     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
@@ -509,18 +563,93 @@
   struct GNUNET_TIME_Relative
 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t 
arr_size)
 {
-  return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), 
arr_size); // FIXME find a way to devide that by arr_size
+  return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), 
arr_size);
 }
 
 
-/***********************************************************************
- * /Util functions
-***********************************************************************/
+/**
+ * Insert PeerID in #pull_list
+ *
+ * Called once we know a peer is live.
+ */
+  void
+insert_in_pull_list (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+  if (GNUNET_NO == in_arr (pull_list, pull_list_size, peer))
+    GNUNET_array_append (pull_list, pull_list_size, *peer);
+}
 
 /**
- * Wrapper around _sampler_resize()
+ * Check whether #insert_in_pull_list was already scheduled
  */
+  int
+insert_in_pull_list_scheduled (const struct PeerContext *peer_ctx)
+{
+  unsigned int i;
+
+  for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+    if (insert_in_pull_list == peer_ctx->outstanding_ops[i].op)
+      return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
+/**
+ * Insert PeerID in #gossip_list
+ *
+ * Called once we know a peer is live.
+ */
   void
+insert_in_gossip_list (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+  if (GNUNET_NO == in_arr (gossip_list, gossip_list_size, peer))
+    GNUNET_array_append (gossip_list, gossip_list_size, *peer);
+}
+
+/**
+ * Check whether #insert_in_gossip_list was already scheduled
+ */
+  int
+insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx)
+{
+  unsigned int i;
+
+  for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+    if (insert_in_gossip_list == peer_ctx->outstanding_ops[i].op)
+      return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
+/**
+ * Update sampler with given PeerID.
+ */
+  void
+insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+  RPS_sampler_update_list (peer);
+}
+
+/**
+ * Check whether #insert_in_sampler was already scheduled
+ */
+  int
+insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
+{
+  unsigned int i;
+
+  for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+    if (insert_in_sampler== peer_ctx->outstanding_ops[i].op)
+      return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
+
+/**
+ * Wrapper around #RPS_sampler_resize()
+ */
+  void
 resize_wrapper ()
 {
   uint32_t bigger_size;
@@ -544,6 +673,10 @@
 }
 
 
+/***********************************************************************
+ * /Util functions
+***********************************************************************/
+
 /**
  * Function called by NSE.
  *
@@ -660,7 +793,7 @@
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random 
peer(s).\n", num_peers);
 
-  RPS_sampler_get_n_rand_peers (client_respond, client, num_peers);
+  RPS_sampler_get_n_rand_peers (client_respond, client, num_peers, GNUNET_YES);
 
   GNUNET_SERVER_receive_done (client,
                              GNUNET_OK);
@@ -806,6 +939,8 @@
 
   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
   struct GNUNET_PeerIdentity *peers;
+  struct PeerContext *peer_ctx;
+  struct PeerOutstandingOp out_op;
   uint32_t i;
 
   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
@@ -828,8 +963,19 @@
   peers = (struct GNUNET_PeerIdentity *) &msg[1];
   for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
   {
-    if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i]))
-      GNUNET_array_append (pull_list, pull_list_size, peers[i]);
+    peer_ctx = get_peer_ctx (peer_map, &peers[i]);
+
+    if ((0 != (peer_ctx->peer_flags && LIVING)) ||
+        NULL != peer_ctx->recv_channel)
+    {
+      if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i]))
+        GNUNET_array_append (pull_list, pull_list_size, peers[i]);
+    }
+    else if (GNUNET_NO == insert_in_pull_list_scheduled (peer_ctx))
+    {
+      out_op.op = insert_in_pull_list;
+      GNUNET_array_append (peer_ctx->outstanding_ops, 
peer_ctx->num_outstanding_ops, out_op);
+    }
   }
 
   // TODO check that id is valid - whether it is reachable
@@ -865,44 +1011,50 @@
 
   /* Send PUSHes */
   //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 
(unsigned int) gossip_list_size);
-  n_peers = round (alpha * gossip_list_size);
-  if (0 == n_peers)
-    n_peers = 1;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) 
peers.\n",
-      n_peers, alpha, gossip_list_size);
-  for ( i = 0 ; i < n_peers ; i++ )
+  if (0 != gossip_list_size)
   {
-    peer = get_rand_peer (gossip_list, gossip_list_size);
-    if (own_identity != peer)
-    { // FIXME if this fails schedule/loop this for later
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped 
list.\n", GNUNET_i2s (peer));
+    n_peers = round (alpha * gossip_list_size);
+    if (0 == n_peers)
+      n_peers = 1;
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) 
peers.\n",
+        n_peers, alpha, gossip_list_size);
+    for ( i = 0 ; i < n_peers ; i++ )
+    {
+      peer = get_rand_peer (gossip_list, gossip_list_size);
+      if (own_identity != peer)
+      { // FIXME if this fails schedule/loop this for later
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped 
list.\n", GNUNET_i2s (peer));
 
-      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
-      // FIXME sometimes it returns a pointer to a freed mq
-      mq = get_mq (peer_map, peer);
-      GNUNET_MQ_send (mq, ev);
+        ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
+        // FIXME sometimes it returns a pointer to a freed mq
+        mq = get_mq (peer_map, peer);
+        GNUNET_MQ_send (mq, ev);
+      }
     }
   }
 
 
   /* Send PULL requests */
   //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, 
(unsigned int) sampler_list->size);
-  n_peers = round (beta * gossip_list_size);
-  if (0 == n_peers)
-    n_peers = 1;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n",
-      n_peers, beta, gossip_list_size);
-  for ( i = 0 ; i < n_peers ; i++ )
+  if (0 != gossip_list_size)
   {
-    peer = get_rand_peer (gossip_list, gossip_list_size);
-    if (own_identity != peer)
-    { // FIXME if this fails schedule/loop this for later
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of 
gossiped list.\n", GNUNET_i2s (peer));
+    n_peers = round (beta * gossip_list_size);
+    if (0 == n_peers)
+      n_peers = 1;
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) 
peers.\n",
+        n_peers, beta, gossip_list_size);
+    for ( i = 0 ; i < n_peers ; i++ )
+    {
+      peer = get_rand_peer (gossip_list, gossip_list_size);
+      if (own_identity != peer)
+      { // FIXME if this fails schedule/loop this for later
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of 
gossiped list.\n", GNUNET_i2s (peer));
 
-      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
-      //pull_msg = NULL;
-      mq = get_mq (peer_map, peer);
-      GNUNET_MQ_send (mq, ev);
+        ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
+        //pull_msg = NULL;
+        mq = get_mq (peer_map, peer);
+        GNUNET_MQ_send (mq, ev);
+      }
     }
   }
 
@@ -914,14 +1066,16 @@
        push_list_size != 0 &&
        pull_list_size != 0 )
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list.\n");
 
     uint32_t first_border;
     uint32_t second_border;
     
-    GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need);
+    first_border = round (alpha * sampler_size_est_need);
+    second_border = first_border + round (beta * sampler_size_est_need);
 
-    first_border = round (alpha * gossip_list_size);
+    GNUNET_array_grow (gossip_list, gossip_list_size, second_border);
+
     for ( i = 0 ; i < first_border ; i++ )
     { // TODO use RPS_sampler_get_n_rand_peers
       /* Update gossip list with peers received through PUSHes */
@@ -931,7 +1085,6 @@
       // TODO change the peer_flags accordingly
     }
 
-    second_border = first_border + round (beta * gossip_list_size);
     for ( i = first_border ; i < second_border ; i++ )
     {
       /* Update gossip list with peers received through PULLs */
@@ -944,8 +1097,8 @@
     for ( i = second_border ; i < gossip_list_size ; i++ )
     {
       /* Update gossip list with peers from history */
-      peer = RPS_sampler_get_n_rand_peers_ (1);
-      gossip_list[i] = *peer;
+      RPS_sampler_get_n_rand_peers (hist_update, NULL, 1, GNUNET_NO);
+      num_hist_update_tasks++;
       // TODO change the peer_flags accordingly
     }
 
@@ -1058,37 +1211,38 @@
               unsigned int best_path) // "How long is the best path?
                                       // (0 = unknown, 1 = ourselves, 2 = 
neighbor)"
 {
-  struct init_peer_cls *ipc;
+  struct GNUNET_SERVER_Handle *server;
+  struct PeerOutstandingOp out_op;
+  struct PeerContext *peer_ctx;
 
-  ipc = (struct init_peer_cls *) cls;
+  server = (struct GNUNET_SERVER_Handle *) cls;
   if ( NULL != peer )
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Got %" PRIX32 ". peer %s (at %p) from CADET (gossip_list_size: %u)\n",
-        ipc->i, GNUNET_i2s (peer), peer, gossip_list_size);
-    RPS_sampler_update_list (peer);
-    (void) get_peer_ctx (peer_map, peer); // unneeded? -> insertCB
+        "Got peer %s (at %p) from CADET (gossip_list_size: %u)\n",
+        GNUNET_i2s (peer), peer, gossip_list_size);
 
-    if (ipc->i < gossip_list_size)
+    // maybe create a function for that
+    peer_ctx = get_peer_ctx (peer_map, peer);
+    if (GNUNET_NO == insert_in_sampler_scheduled (peer_ctx))
     {
-      gossip_list[ipc->i] = *peer; // FIXME sometimes we're writing to invalid 
space here
-                                   // not sure whether fixed
-      ipc->i++;
+      out_op.op = insert_in_sampler;
+      GNUNET_array_append (peer_ctx->outstanding_ops, 
peer_ctx->num_outstanding_ops, out_op);
     }
 
+    if (GNUNET_NO == insert_in_gossip_list_scheduled (peer_ctx))
+    {
+      out_op.op = insert_in_gossip_list;
+      GNUNET_array_append (peer_ctx->outstanding_ops, 
peer_ctx->num_outstanding_ops, out_op);
+    }
+
+    /* Issue liveness test on peer */
+    (void) get_channel (peer_map, peer);
+
     // send push/pull to each of those peers?
   }
   else
-  {
-    if (ipc->i < gossip_list_size)
-    {
-      memcpy (&gossip_list[ipc->i],
-          RPS_sampler_get_n_rand_peers_ (1),
-          (gossip_list_size - ipc->i) * sizeof (struct GNUNET_PeerIdentity));
-    }
-    rps_start (ipc->server);
-    GNUNET_free (ipc);
-  }
+    rps_start (server);
 }
 
 
@@ -1236,6 +1390,7 @@
   (void) peer_remove_cb (peer, peer, peer_ctx);
 }
 
+
 /**
  * Actually start the service.
  */
@@ -1256,6 +1411,8 @@
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
 
 
+  num_hist_update_tasks = 0;
+
   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
 
@@ -1283,7 +1440,6 @@
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
 
-  struct init_peer_cls *ipc;
 
   cfg = c;
 
@@ -1316,16 +1472,13 @@
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", 
sampler_size_est_need);
 
-  //gossip_list_size = sampler_size; // TODO rename sampler_size
 
   gossip_list = NULL;
-  GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need);
 
 
   /* connect to NSE */
   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
   // TODO check whether that was successful
-  // TODO disconnect on shutdown
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
 
 
@@ -1383,7 +1536,7 @@
   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
   max_round_interval = GNUNET_TIME_relative_add (round_interval, 
half_round_interval);
 
-  RPS_sampler_init (sampler_size_est_need, own_identity, max_round_interval,
+  RPS_sampler_init (sampler_size_est_need, max_round_interval,
       insertCB, NULL, removeCB, NULL);
   sampler_size = sampler_size_est_need;
 
@@ -1394,11 +1547,8 @@
   pull_list_size = 0;
 
 
-  ipc = GNUNET_new (struct init_peer_cls);
-  ipc->server = server;
-  ipc->i = 0;
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
-  GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, ipc);
+  GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, server);
 
   // TODO send push/pull to each of those peers?
 }

Modified: gnunet/src/rps/gnunet-service-rps_sampler.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.c 2015-01-22 00:18:40 UTC (rev 
34949)
+++ gnunet/src/rps/gnunet-service-rps_sampler.c 2015-01-22 00:18:44 UTC (rev 
34950)
@@ -151,7 +151,7 @@
 /**
  * Closure to _get_n_rand_peers_ready_cb()
  */
-struct RPS_GetNRandPeersReadyCls
+struct NRandPeersReadyCls
 {
   /**
    * Number of peers we are waiting for.
@@ -255,15 +255,15 @@
  * give those back.
  */
   void
-RPS_sampler_get_n_rand_peers_ready_cb (void *cls,
+check_n_peers_ready (void *cls,
     const struct GNUNET_PeerIdentity *id)
 {
-  struct RPS_GetNRandPeersReadyCls *n_peers_cls;
+  struct NRandPeersReadyCls *n_peers_cls;
 
-  n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls;
+  n_peers_cls = (struct NRandPeersReadyCls *) cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-      "SAMPLER: Got %" PRIX32 "th of %" PRIX32 " peers\n",
+      "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n",
       n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
 
   if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers)
@@ -297,12 +297,6 @@
 
   sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
 
-  /* We might want to keep the previous peer */
-
-  //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id,
-  //                   sizeof(struct GNUNET_PeerIdentity),
-  //                   &sampler_el->peer_id_hash);
-
   sampler_el->birth = GNUNET_TIME_absolute_get ();
   sampler_el->num_peers = 0;
   sampler_el->num_change = 0;
@@ -479,7 +473,6 @@
  * Initialise a tuple of sampler elements.
  *
  * @param init_size the size the sampler is initialised with
- * @param id with which all newly created sampler elements are initialised
  * @param ins_cb the callback that will be called on every PeerID that is 
  *               newly inserted into a sampler element
  * @param ins_cls the closure given to #ins_cb
@@ -489,7 +482,6 @@
  */
   void
 RPS_sampler_init (size_t init_size,
-    const struct GNUNET_PeerIdentity *id,
     struct GNUNET_TIME_Relative max_round_interval,
     RPS_sampler_insert_cb ins_cb, void *ins_cls,
     RPS_sampler_remove_cb rem_cb, void *rem_cls)
@@ -513,7 +505,6 @@
   //sampler->sampler_elements = GNUNET_new_array(init_size, struct 
GNUNET_PeerIdentity);
   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 
min_size);
   RPS_sampler_resize (init_size);
-  RPS_sampler_update_list (id); // no super nice desing but ok for the moment
 
   client_get_index = 0;
 
@@ -568,13 +559,14 @@
  * corrsponding peer to the client.
  * Only used internally
  */
-  const struct GNUNET_PeerIdentity * 
-RPS_sampler_get_rand_peer_ ()
+  void
+RPS_sampler_get_rand_peer_ (void *cls, const struct 
GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct GetPeerCls *gpc;
   uint32_t r_index;
-  const struct GNUNET_PeerIdentity *peer; // do we have to malloc that?
+  struct GNUNET_HashCode *hash;
 
-  // TODO implement extra logic
+  gpc = (struct GetPeerCls *) cls;
 
   /**;
    * Choose the r_index of the peer we want to return
@@ -583,50 +575,25 @@
   r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
       sampler->sampler_size);
 
-  //if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
-  //  // TODO schedule for later
-  //  peer = NULL;
-  //else
-    peer = &(sampler->sampler_elements[r_index]->peer_id);
-  //sampler->sampler_elements[r_index]->last_client_request = 
GNUNET_TIME_absolute_get();
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", 
GNUNET_i2s(peer));
-
-  return peer;
-}
-
-
-/**
- * Get n random peers out of the sampled peers.
- *
- * We might want to reinitialise this sampler after giving the
- * corrsponding peer to the client.
- * Random with or without consumption?
- * Only used internally
- */
-  const struct GNUNET_PeerIdentity *
-RPS_sampler_get_n_rand_peers_ (uint32_t n)
-{
-  if ( 0 == sampler->sampler_size )
+  if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Sgrp: List empty - Returning NULL\n");
-    return NULL;
+    gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed 
(GNUNET_TIME_relative_multiply(
+                                                                   
GNUNET_TIME_UNIT_SECONDS,
+                                                                   .1),
+                                                       
&RPS_sampler_get_rand_peer_,
+                                                       cls);
+    return;
   }
-  else
-  {
-    // TODO check if we have too much (distinct) sampled peers
-    // If we are not ready yet maybe schedule for later
-    struct GNUNET_PeerIdentity *peers;
-    uint32_t i;
 
-    peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity));
+  *gpc->id = sampler->sampler_elements[r_index]->peer_id;
 
-    for ( i = 0 ; i < n ; i++ ) {
-      //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements);
-      memcpy (&peers[i], RPS_sampler_get_rand_peer_ (), sizeof (struct 
GNUNET_PeerIdentity));
-    }
-    return peers;
-  }
+  hash = GNUNET_new (struct GNUNET_HashCode);
+  GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct 
GNUNET_SCHEDULER_Task *), hash);
+  if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, 
&gpc->get_peer_task))
+      LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the 
hashmap\n");
+  GNUNET_free (gpc->get_peer_task);
+
+  gpc->cb (gpc->cb_cls, gpc->id);
 }
 
 
@@ -639,28 +606,28 @@
  * @return a random PeerID of the PeerIDs previously put into the sampler.
  */
   void
-//RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb,
-//    void *cls, struct GNUNET_PeerIdentity *id)
 RPS_sampler_get_rand_peer (void *cls, const struct 
GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GetPeerCls *gpc;
+  struct GNUNET_PeerIdentity tmp_id;
   struct RPS_SamplerElement *s_elem;
   struct GNUNET_TIME_Relative last_request_diff;
   struct GNUNET_HashCode *hash;
   uint32_t tmp_client_get_index;
-  //struct GNUNET_TIME_Relative inv_last_request_diff;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n");
 
   gpc = (struct GetPeerCls *) cls;
   hash = GNUNET_new (struct GNUNET_HashCode);
+
+  /* Store the index preceding #client_get_index to check whether we cycled over the whole list */
   if (0 < client_get_index)
     tmp_client_get_index = client_get_index - 1;
   else
     tmp_client_get_index = sampler->sampler_size - 1;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-      "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler 
size: %" PRIX32 ".\n",
+      "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler 
size: %" PRIX32 ").\n",
       tmp_client_get_index, sampler->sampler_size);
 
   do
@@ -674,17 +641,22 @@
       return;
     }
 
-    *gpc->id = sampler->sampler_elements[client_get_index]->peer_id;
+    tmp_id = sampler->sampler_elements[client_get_index]->peer_id;
+    RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
+    RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], 
&tmp_id,
+                           NULL, NULL, NULL, NULL);
 
-    RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
+    /* Cycle the #client_get_index one step further */
     if ( client_get_index == sampler->sampler_size - 1 )
       client_get_index = 0;
     else
       client_get_index++;
+
     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 
".\n", client_get_index);
   } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
 
   s_elem = sampler->sampler_elements[client_get_index];
+  *gpc->id = s_elem->peer_id;
 
   /* Check whether we may use this sampler to give it back to the client */
   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != 
s_elem->last_client_request.abs_value_us)
@@ -729,54 +701,49 @@
  *
  * @param cb callback that will be called once the ids are ready.
  * @param cls closure given to @a cb
+ * @param for_client #GNUNET_YES if result is used for client,
+ *                   #GNUNET_NO if used internally
  * @param num_peers the number of peers requested
  */
   void
 RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
-    void *cls, uint32_t num_peers)
+    void *cls, uint32_t num_peers, int for_client)
 {
-  if ( 0 == sampler->sampler_size )
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Sgrp: List empty - Returning NULL\n");
-    cb (cls, NULL, 0);
-  }
-  else
-  {
-    // TODO check if we have too much (distinct) sampled peers
-    // If we are not ready yet maybe schedule for later
-    uint32_t i;
-    struct RPS_GetNRandPeersReadyCls *cb_cls;
-    struct GetPeerCls *gpc;
-    struct GNUNET_HashCode *hash;
-    
-    hash = GNUNET_new (struct GNUNET_HashCode);
+  GNUNET_assert (0 != sampler->sampler_size);
 
-    cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls);
-    cb_cls->num_peers = num_peers;
-    cb_cls->cur_num_peers = 0;
-    cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
-    cb_cls->callback = cb;
-    cb_cls->cls = cls;
+  // TODO check if we have too much (distinct) sampled peers
+  uint32_t i;
+  struct NRandPeersReadyCls *cb_cls;
+  struct GetPeerCls *gpc;
+  struct GNUNET_HashCode *hash;
 
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
+  hash = GNUNET_new (struct GNUNET_HashCode);
 
-    for ( i = 0 ; i < num_peers ; i++ )
-    {
-      gpc = GNUNET_new (struct GetPeerCls);
-      gpc->cb = RPS_sampler_get_n_rand_peers_ready_cb;
-      gpc->cb_cls = cb_cls;
-      gpc->id = &cb_cls->ids[i];
+  cb_cls = GNUNET_new (struct NRandPeersReadyCls);
+  cb_cls->num_peers = num_peers;
+  cb_cls->cur_num_peers = 0;
+  cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
+  cb_cls->callback = cb;
+  cb_cls->cls = cls;
 
-      // maybe add a little delay
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+      "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
+
+  for ( i = 0 ; i < num_peers ; i++ )
+  {
+    gpc = GNUNET_new (struct GetPeerCls);
+    gpc->cb = check_n_peers_ready;
+    gpc->cb_cls = cb_cls;
+    gpc->id = &cb_cls->ids[i];
+
+    // maybe add a little delay
+    if (GNUNET_YES == for_client)
       gpc->get_peer_task = GNUNET_SCHEDULER_add_now 
(&RPS_sampler_get_rand_peer, gpc);
-      GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct 
GNUNET_SCHEDULER_Task *), hash);
-      (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, 
&gpc->get_peer_task,
-                                                
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-      //RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb,
-      //    cb_cls, &peers[i]);
-    }
+    else if (GNUNET_NO == for_client)
+      gpc->get_peer_task = GNUNET_SCHEDULER_add_now 
(&RPS_sampler_get_rand_peer_, gpc);
+    GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct 
GNUNET_SCHEDULER_Task *), hash);
+    (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, 
&gpc->get_peer_task,
+        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   }
 }
 

Modified: gnunet/src/rps/gnunet-service-rps_sampler.h
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.h 2015-01-22 00:18:40 UTC (rev 
34949)
+++ gnunet/src/rps/gnunet-service-rps_sampler.h 2015-01-22 00:18:44 UTC (rev 
34950)
@@ -89,7 +89,6 @@
  */
   void
 RPS_sampler_init (size_t init_size,
-    const struct GNUNET_PeerIdentity *id,
     struct GNUNET_TIME_Relative max_round_interval,
     RPS_sampler_insert_cb ins_cb, void *ins_cls,
     RPS_sampler_remove_cb rem_cb, void *rem_cls);
@@ -121,26 +120,16 @@
  * We might want to reinitialise this sampler after giving the
  * corrsponding peer to the client.
  * Random with or without consumption?
- * Only used internally
- */
-  const struct GNUNET_PeerIdentity *
-RPS_sampler_get_n_rand_peers_ (uint32_t n);
-
-
-/**
- * Get n random peers out of the sampled peers.
  *
- * We might want to reinitialise this sampler after giving the
- * corrsponding peer to the client.
- * Random with or without consumption?
- *
  * @param cb callback that will be called once the ids are ready.
  * @param cls closure given to @a cb
+ * @param for_client #GNUNET_YES if result is used for client,
+ *                   #GNUNET_NO if used internally
  * @param num_peers the number of peers requested
  */
     void
 RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
-    void *cls, uint32_t num_peers);
+    void *cls, uint32_t num_peers, int for_client);
 
 
 /**




reply via email to

[Prev in Thread] Current Thread [Next in Thread]