gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r36161 - in gnunet/src: include rps


From: gnunet
Subject: [GNUnet-SVN] r36161 - in gnunet/src: include rps
Date: Sun, 2 Aug 2015 16:48:28 +0200

Author: ch3
Date: 2015-08-02 16:48:28 +0200 (Sun, 02 Aug 2015)
New Revision: 36161

Modified:
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/include/gnunet_rps_service.h
   gnunet/src/rps/Makefile.am
   gnunet/src/rps/gnunet-service-rps.c
   gnunet/src/rps/gnunet-service-rps_sampler.c
   gnunet/src/rps/gnunet-service-rps_sampler.h
   gnunet/src/rps/rps.h
   gnunet/src/rps/rps_api.c
   gnunet/src/rps/test_rps.c
Log:
cancellation of request and according test improvements

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2015-07-31 13:53:26 UTC (rev 
36160)
+++ gnunet/src/include/gnunet_protocols.h       2015-08-02 14:48:28 UTC (rev 
36161)
@@ -2706,17 +2706,17 @@
 /**
  * RPS PUSH message to push own ID to another peer
  */
-#define GNUNET_MESSAGE_TYPE_RPS_PP_PUSH         950
+#define GNUNET_MESSAGE_TYPE_RPS_PP_PUSH           950
 
 /**
  * RPS PULL REQUEST message to request the local view of another peer
  */
-#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST 951
+#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST   951
 
 /**
  * RPS PULL REPLY message which contains the view of the other peer
  */
-#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY   952
+#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY     952
 
 
 
@@ -2725,23 +2725,28 @@
 /**
  * RPS CS REQUEST Message for the Client to request (a) random peer(s)
  */
-#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST      953
+#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST        953
 
 /**
  * RPS CS REPLY Message for the Server to send (a) random peer(s)
  */
-#define GNUNET_MESSAGE_TYPE_RPS_CS_REPLY        954
+#define GNUNET_MESSAGE_TYPE_RPS_CS_REPLY          954
 
 /**
+ * RPS CS REQUEST CANCEL Message for the Client to cancel a request
+ */
+#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL 955
+
+/**
  * RPS CS SEED Message for the Client to seed peers into rps
  */
-#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED         955
+#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED           956
 
 #ifdef ENABLE_MALICIOUS
 /**
  * Turn RPS service malicious
  */
-#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS   956
+#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS     957
 
 #endif /* ENABLE_MALICIOUS */
 

Modified: gnunet/src/include/gnunet_rps_service.h
===================================================================
--- gnunet/src/include/gnunet_rps_service.h     2015-07-31 13:53:26 UTC (rev 
36160)
+++ gnunet/src/include/gnunet_rps_service.h     2015-08-02 14:48:28 UTC (rev 
36161)
@@ -56,7 +56,9 @@
  * @param num_peers the number of peers returned
  * @param peers array with num_peers PeerIDs
  */
-typedef void (* GNUNET_RPS_NotifyReadyCB) (void *cls, uint64_t num_peers, 
const struct GNUNET_PeerIdentity *peers);
+typedef void (* GNUNET_RPS_NotifyReadyCB) (void *cls,
+    uint64_t num_peers,
+    const struct GNUNET_PeerIdentity *peers);
 
 /**
  * Connect to the rps service
@@ -125,7 +127,8 @@
 GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
                           uint32_t type,
                           uint32_t num_peers,
-                          const struct GNUNET_PeerIdentity *ids);
+                          const struct GNUNET_PeerIdentity *ids,
+                          const struct GNUNET_PeerIdentity *target_peer);
 #endif /* ENABLE_MALICIOUS */
 
 

Modified: gnunet/src/rps/Makefile.am
===================================================================
--- gnunet/src/rps/Makefile.am  2015-07-31 13:53:26 UTC (rev 36160)
+++ gnunet/src/rps/Makefile.am  2015-08-02 14:48:28 UTC (rev 36161)
@@ -72,7 +72,8 @@
  test_rps_malicious_2 \
  test_rps_malicious_3 \
  test_rps_seed_request \
- test_rps_single_req
+ test_rps_single_req \
+ test_rps_req_cancel
 endif
 
 ld_rps_test_lib = \
@@ -106,6 +107,9 @@
 test_rps_seed_request_SOURCES = $(rps_test_src)
 test_rps_seed_request_LDADD = $(ld_rps_test_lib)
 
+test_rps_req_cancel_SOURCES = $(rps_test_src)
+test_rps_req_cancel_LDADD = $(ld_rps_test_lib)
+
 gnunet_rps_profiler_SOURCES = $(rps_test_src)
 gnunet_rps_profiler_LDADD = $(ld_rps_test_lib)
 

Modified: gnunet/src/rps/gnunet-service-rps.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps.c 2015-07-31 13:53:26 UTC (rev 36160)
+++ gnunet/src/rps/gnunet-service-rps.c 2015-08-02 14:48:28 UTC (rev 36161)
@@ -75,22 +75,73 @@
 ***********************************************************************/
 
 /**
+ * Closure used to pass the client and the id to the callback
+ * that replies to a client's request
+ */
+struct ReplyCls
+{
+  /**
+   * DLL
+   */
+  struct ReplyCls *next;
+  struct ReplyCls *prev;
+
+  /**
+   * The identifier of the request
+   */
+  uint32_t id;
+
+  /**
+   * The handle to the request
+   */
+  struct RPS_SamplerRequestHandle *req_handle;
+
+  /**
+   * The client handle to send the reply to
+   */
+  struct GNUNET_SERVER_Client *client;
+};
+
+
+/**
  * Struct used to store the context of a connected client.
  */
 struct ClientContext
 {
   /**
+   * DLL
+   */
+  struct ClientContext *next;
+  struct ClientContext *prev;
+
+  /**
    * The message queue to communicate with the client.
    */
   struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * DLL with handles to single requests from the client
+   */
+  struct ReplyCls *rep_cls_head;
+  struct ReplyCls *rep_cls_tail;
 };
 
 /**
+ * DLL with all clients currently connected to us
+ */
+struct ClientContext *cli_ctx_head;
+struct ClientContext *cli_ctx_tail;
+
+/**
  * Used to keep track in what lists single peerIDs are.
  */
 enum PeerFlags
 {
+  /**
+   * If we are waiting for a reply from that peer (sent a pull request).
+   */
   PULL_REPLY_PENDING   = 0x01,
+
   IN_OTHER_GOSSIP_LIST = 0x02, // unneeded?
   IN_OWN_SAMPLER_LIST  = 0x04, // unneeded?
   IN_OWN_GOSSIP_LIST   = 0x08, // unneeded?
@@ -365,24 +416,6 @@
 uint32_t num_hist_update_tasks;
 
 
-/**
- * Closure used to pass the client and the id to the callback
- * that replies to a client's request
- */
-struct ReplyCls
-{
-  /**
-   * The identifier of the request
-   */
-  uint32_t id;
-
-  /**
-   * The client handle to send the reply to
-   */
-  struct GNUNET_SERVER_Client *client;
-};
-
-
 #ifdef ENABLE_MALICIOUS
 /**
  * Type of malicious peer
@@ -1234,8 +1267,36 @@
  * /Util functions
 ***********************************************************************/
 
+static void
+destroy_reply_cls (struct ReplyCls *rep_cls)
+{
+  struct ClientContext *cli_ctx;
 
+  cli_ctx = GNUNET_SERVER_client_get_user_context (rep_cls->client,
+                                                   struct ClientContext);
+  GNUNET_assert (NULL != cli_ctx);
+  GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
+                               cli_ctx->rep_cls_tail,
+                               rep_cls);
+  GNUNET_free (rep_cls);
+}
 
+static void
+destroy_cli_ctx (struct ClientContext *cli_ctx)
+{
+  GNUNET_assert (NULL != cli_ctx);
+  if (NULL != cli_ctx->rep_cls_head)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Trying to destroy the context of a client that still has pending 
requests. Going to clean those\n");
+    while (NULL != cli_ctx->rep_cls_head)
+      destroy_reply_cls (cli_ctx->rep_cls_head);
+  }
+  GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
+                               cli_ctx_tail,
+                               cli_ctx);
+  GNUNET_free (cli_ctx);
+}
 
 
 /**
@@ -1316,15 +1377,10 @@
           num_peers * sizeof (struct GNUNET_PeerIdentity));
   GNUNET_free (peer_ids);
 
-  cli_ctx = GNUNET_SERVER_client_get_user_context (reply_cls->client, struct 
ClientContext);
-  if (NULL == cli_ctx) {
-    cli_ctx = GNUNET_new (struct ClientContext);
-    cli_ctx->mq = GNUNET_MQ_queue_for_server_client (reply_cls->client);
-    GNUNET_SERVER_client_set_user_context (reply_cls->client, cli_ctx);
-  }
-
-  GNUNET_free (reply_cls);
-
+  cli_ctx = GNUNET_SERVER_client_get_user_context (reply_cls->client,
+                                                   struct ClientContext);
+  GNUNET_assert (NULL != cli_ctx);
+  destroy_reply_cls (reply_cls);
   GNUNET_MQ_send (cli_ctx->mq, ev);
 }
 
@@ -1346,6 +1402,7 @@
   uint32_t size_needed;
   struct ReplyCls *reply_cls;
   uint32_t i;
+  struct ClientContext *cli_ctx;
 
   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
 
@@ -1371,12 +1428,50 @@
   reply_cls = GNUNET_new (struct ReplyCls);
   reply_cls->id = ntohl (msg->id);
   reply_cls->client = client;
+  reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
+                                                        client_respond,
+                                                        reply_cls,
+                                                        num_peers);
 
-  RPS_sampler_get_n_rand_peers (client_sampler,
-                                client_respond,
-                                reply_cls,
-                                num_peers);
+  cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct 
ClientContext);
+  GNUNET_assert (NULL != cli_ctx);
+  GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
+                               cli_ctx->rep_cls_tail,
+                               reply_cls);
+  GNUNET_SERVER_receive_done (client,
+                             GNUNET_OK);
+}
 
+
+/**
+ * @brief Handle a message that requests the cancellation of a request
+ *
+ * @param cls unused
+ * @param client the client that requests the cancellation
+ * @param message the message containing the id of the request
+ */
+static void
+handle_client_request_cancel (void *cls,
+                              struct GNUNET_SERVER_Client *client,
+                              const struct GNUNET_MessageHeader *message)
+{
+  struct GNUNET_RPS_CS_RequestCancelMessage *msg =
+    (struct GNUNET_RPS_CS_RequestCancelMessage *) message;
+  struct ClientContext *cli_ctx;
+  struct ReplyCls *rep_cls;
+
+  cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct 
ClientContext);
+  GNUNET_assert (NULL != cli_ctx->rep_cls_head);
+  rep_cls = cli_ctx->rep_cls_head;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+      "Client cancels request with id %lu\n",
+      ntohl (msg->id));
+  while ( (NULL != rep_cls->next) &&
+          (rep_cls->id != ntohl (msg->id)) )
+    rep_cls = rep_cls->next;
+  GNUNET_assert (rep_cls->id == ntohl (msg->id));
+  RPS_sampler_request_cancel (rep_cls->req_handle);
+  destroy_reply_cls (rep_cls);
   GNUNET_SERVER_receive_done (client,
                              GNUNET_OK);
 }
@@ -2584,6 +2679,30 @@
 
 
 /**
+ * @brief Get informed about a connecting client.
+ *
+ * @param cls unused
+ * @param client the client that connects
+ */
+static void
+handle_client_connect (void *cls,
+                       struct GNUNET_SERVER_Client *client)
+{
+  struct ClientContext *cli_ctx;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Client connected\n");
+  if (NULL == client)
+    return; /* Server was destroyed before a client connected. Shutting down */
+  cli_ctx = GNUNET_new (struct ClientContext);
+  cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
+  GNUNET_SERVER_client_set_user_context (client, cli_ctx);
+  GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
+                               cli_ctx_tail,
+                               cli_ctx);
+}
+
+/**
  * A client disconnected.  Remove all of its data structure entries.
  *
  * @param cls closure, NULL
@@ -2591,8 +2710,20 @@
  */
 static void
 handle_client_disconnect (void *cls,
-                         struct GNUNET_SERVER_Client * client)
+                                           struct GNUNET_SERVER_Client *client)
 {
+  struct ClientContext *cli_ctx;
+
+  if (NULL == client)
+  {/* shutdown task */
+    while (NULL != cli_ctx_head)
+      destroy_cli_ctx (cli_ctx_head);
+  }
+  else
+  {
+    cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct 
ClientContext);
+    destroy_cli_ctx (cli_ctx);
+  }
 }
 
 
@@ -2716,16 +2847,21 @@
 rps_start (struct GNUNET_SERVER_Handle *server)
 {
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
-    {&handle_client_request,     NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
+    {&handle_client_request,        NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
       sizeof (struct GNUNET_RPS_CS_RequestMessage)},
-    {&handle_client_seed,        NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0},
+    {&handle_client_request_cancel, NULL, 
GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
+      sizeof (struct GNUNET_RPS_CS_RequestCancelMessage)},
+    {&handle_client_seed,           NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0},
     #ifdef ENABLE_MALICIOUS
-    {&handle_client_act_malicious, NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 
, 0},
+    {&handle_client_act_malicious,  NULL, 
GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0},
     #endif /* ENABLE_MALICIOUS */
     {NULL, NULL, 0, 0}
   };
 
   GNUNET_SERVER_add_handlers (server, handlers);
+  GNUNET_SERVER_connect_notify (server,
+                                &handle_client_connect,
+                                NULL);
   GNUNET_SERVER_disconnect_notify (server,
                                    &handle_client_disconnect,
                                    NULL);

Modified: gnunet/src/rps/gnunet-service-rps_sampler.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.c 2015-07-31 13:53:26 UTC (rev 
36160)
+++ gnunet/src/rps/gnunet-service-rps_sampler.c 2015-08-02 14:48:28 UTC (rev 
36161)
@@ -73,16 +73,12 @@
    * DLL
    */
   struct GetPeerCls *next;
-
-  /**
-   * DLL
-   */
   struct GetPeerCls *prev;
 
   /**
-   * The sampler this function operates on.
+   * The #RPS_SamplerRequestHandle this single request belongs to.
    */
-  struct RPS_Sampler *sampler;
+  struct RPS_SamplerRequestHandle *req_handle;
 
   /**
    * The task for this function.
@@ -166,15 +162,11 @@
   RPS_get_peers_type get_peers;
 
   /**
-   * Head for the DLL to store the closures to pending requests.
+   * Head and tail for the DLL to store the #RPS_SamplerRequestHandle
    */
-  struct GetPeerCls *gpc_head;
+  struct RPS_SamplerRequestHandle *req_handle_head;
+  struct RPS_SamplerRequestHandle *req_handle_tail;
 
-  /**
-   * Tail for the DLL to store the closures to pending requests.
-   */
-  struct GetPeerCls *gpc_tail;
-
   #ifdef TO_FILE
   /**
    * File name to log to
@@ -186,9 +178,15 @@
 /**
  * Closure to _get_n_rand_peers_ready_cb()
  */
-struct NRandPeersReadyCls
+struct RPS_SamplerRequestHandle
 {
   /**
+   * DLL
+   */
+  struct RPS_SamplerRequestHandle *next;
+  struct RPS_SamplerRequestHandle *prev;
+
+  /**
    * Number of peers we are waiting for.
    */
   uint32_t num_peers;
@@ -204,6 +202,17 @@
   struct GNUNET_PeerIdentity *ids;
 
   /**
+   * Head and tail for the DLL to store the tasks for single requests
+   */
+  struct GetPeerCls *gpc_head;
+  struct GetPeerCls *gpc_tail;
+
+  /**
+   * Sampler.
+   */
+  struct RPS_Sampler *sampler;
+
+  /**
    * Callback to be called when all ids are available.
    */
   RPS_sampler_n_rand_peers_ready_cb callback;
@@ -251,23 +260,23 @@
 check_n_peers_ready (void *cls,
     const struct GNUNET_PeerIdentity *id)
 {
-  struct NRandPeersReadyCls *n_peers_cls = cls;
+  struct RPS_SamplerRequestHandle *req_handle = cls;
 
-  n_peers_cls->cur_num_peers++;
+  req_handle->cur_num_peers++;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Got %" PRIX32 ". of %" PRIX32 " peers\n",
-      n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
+      req_handle->cur_num_peers, req_handle->num_peers);
 
-  if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers)
+  if (req_handle->num_peers == req_handle->cur_num_peers)
   { /* All peers are ready -- return those to the client */
-    GNUNET_assert (NULL != n_peers_cls->callback);
+    GNUNET_assert (NULL != req_handle->callback);
 
     LOG (GNUNET_ERROR_TYPE_DEBUG,
         "returning %" PRIX32 " peers to the client\n",
-        n_peers_cls->num_peers);
-    n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, 
n_peers_cls->num_peers);
+        req_handle->num_peers);
+    req_handle->callback (req_handle->cls, req_handle->ids, 
req_handle->num_peers);
 
-    GNUNET_free (n_peers_cls);
+    RPS_sampler_request_cancel (req_handle);
   }
 }
 
@@ -420,12 +429,8 @@
        sampler->file_name);
   #endif /* TO_FILE */
 
-  sampler->sampler_size = 0;
-  sampler->sampler_elements = NULL;
   sampler->max_round_interval = max_round_interval;
   sampler->get_peers = sampler_get_rand_peer;
-  sampler->gpc_head = NULL;
-  sampler->gpc_tail = NULL;
   //sampler->sampler_elements = GNUNET_new_array(init_size, struct 
GNUNET_PeerIdentity);
   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 
min_size);
   RPS_sampler_resize (sampler, init_size);
@@ -530,10 +535,12 @@
 {
   struct GetPeerCls *gpc = cls;
   uint32_t r_index;
+  struct RPS_Sampler *sampler;
 
   gpc->get_peer_task = NULL;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
+  sampler = gpc->req_handle->sampler;
 
   /**;
    * Choose the r_index of the peer we want to return
@@ -540,9 +547,9 @@
    * at random from the interval of the gossip list
    */
   r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
-      gpc->sampler->sampler_size);
+      sampler->sampler_size);
 
-  if (EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty)
+  if (EMPTY == sampler->sampler_elements[r_index]->is_empty)
   {
     //LOG (GNUNET_ERROR_TYPE_DEBUG,
     //     "Not returning randomly selected, empty PeerID. - Rescheduling.\n");
@@ -552,21 +559,19 @@
      * Counter?
      */
     gpc->get_peer_task =
-      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
-                                        GNUNET_TIME_UNIT_SECONDS, 0.1),
-                                    &sampler_get_rand_peer,
-                                    cls);
+      GNUNET_SCHEDULER_add_delayed (
+          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
+          &sampler_get_rand_peer,
+          cls);
     return;
   }
 
-  *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id;
-
+  GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
+                               gpc->req_handle->gpc_tail,
+                               gpc);
+  *gpc->id = sampler->sampler_elements[r_index]->peer_id;
   gpc->cont (gpc->cont_cls, gpc->id);
 
-  GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head,
-                               gpc->sampler->gpc_tail,
-                               gpc);
-
   GNUNET_free (gpc);
 }
 
@@ -584,17 +589,19 @@
   struct GetPeerCls *gpc = cls;
   struct RPS_SamplerElement *s_elem;
   struct GNUNET_TIME_Relative last_request_diff;
+  struct RPS_Sampler *sampler;
 
   gpc->get_peer_task = NULL;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
+  sampler = gpc->req_handle->sampler;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
 
   /* Cycle the #client_get_index one step further */
-  client_get_index = (client_get_index + 1) % gpc->sampler->sampler_size;
+  client_get_index = (client_get_index + 1) % sampler->sampler_size;
 
-  s_elem = gpc->sampler->sampler_elements[client_get_index];
+  s_elem = sampler->sampler_elements[client_get_index];
   *gpc->id = s_elem->peer_id;
   GNUNET_assert (NULL != s_elem);
 
@@ -603,7 +610,7 @@
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, 
rescheduling.\n");
     GNUNET_assert (NULL == gpc->get_peer_task);
     gpc->get_peer_task =
-      GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
+      GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
                                     &sampler_mod_get_rand_peer,
                                     cls);
     return;
@@ -617,7 +624,7 @@
                                            GNUNET_TIME_absolute_get ());
     /* We're not going to give it back now if it was
      * already requested by a client this round */
-    if (last_request_diff.rel_value_us < 
gpc->sampler->max_round_interval.rel_value_us)
+    if (last_request_diff.rel_value_us < 
sampler->max_round_interval.rel_value_us)
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
           "Last client request on this sampler was less than max round 
interval ago -- scheduling for later\n");
@@ -629,7 +636,7 @@
       /* Schedule it one round later */
       GNUNET_assert (NULL == gpc->get_peer_task);
       gpc->get_peer_task =
-        GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
+        GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
                                       &sampler_mod_get_rand_peer,
                                       cls);
       return;
@@ -639,8 +646,8 @@
 
   s_elem->last_client_request = GNUNET_TIME_absolute_get ();
 
-  GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head,
-                               gpc->sampler->gpc_tail,
+  GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
+                               gpc->req_handle->gpc_tail,
                                gpc);
   gpc->cont (gpc->cont_cls, gpc->id);
   GNUNET_free (gpc);
@@ -661,7 +668,7 @@
  *                   #GNUNET_NO if used internally
  * @param num_peers the number of peers requested
  */
-  void
+struct RPS_SamplerRequestHandle *
 RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
                               RPS_sampler_n_rand_peers_ready_cb cb,
                               void *cls, uint32_t num_peers)
@@ -668,19 +675,23 @@
 {
   GNUNET_assert (0 != sampler->sampler_size);
   if (0 == num_peers)
-    return;
+    return NULL;
 
   // TODO check if we have too much (distinct) sampled peers
   uint32_t i;
-  struct NRandPeersReadyCls *cb_cls;
+  struct RPS_SamplerRequestHandle *req_handle;
   struct GetPeerCls *gpc;
 
-  cb_cls = GNUNET_new (struct NRandPeersReadyCls);
-  cb_cls->num_peers = num_peers;
-  cb_cls->cur_num_peers = 0;
-  cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
-  cb_cls->callback = cb;
-  cb_cls->cls = cls;
+  req_handle = GNUNET_new (struct RPS_SamplerRequestHandle);
+  req_handle->num_peers = num_peers;
+  req_handle->cur_num_peers = 0;
+  req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
+  req_handle->sampler = sampler;
+  req_handle->callback = cb;
+  req_handle->cls = cls;
+  GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head,
+                               sampler->req_handle_tail,
+                               req_handle);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Scheduling requests for %" PRIu32 " peers\n", num_peers);
@@ -688,18 +699,43 @@
   for (i = 0 ; i < num_peers ; i++)
   {
     gpc = GNUNET_new (struct GetPeerCls);
-    gpc->sampler = sampler;
+    gpc->req_handle = req_handle;
     gpc->cont = check_n_peers_ready;
-    gpc->cont_cls = cb_cls;
-    gpc->id = &cb_cls->ids[i];
+    gpc->cont_cls = req_handle;
+    gpc->id = &req_handle->ids[i];
 
+    GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head,
+                                 req_handle->gpc_tail,
+                                 gpc);
     // maybe add a little delay
     gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc);
+  }
+  return req_handle;
+}
 
-    GNUNET_CONTAINER_DLL_insert (sampler->gpc_head,
-                                 sampler->gpc_tail,
-                                 gpc);
+/**
+ * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
+ *
+ * @param req_handle the handle to the request
+ */
+void
+RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
+{
+  struct GetPeerCls *i;
+
+  while (NULL != (i = req_handle->gpc_head) )
+  {
+    GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head,
+                                 req_handle->gpc_tail,
+                                 i);
+    if (NULL != i->get_peer_task)
+      GNUNET_SCHEDULER_cancel (i->get_peer_task);
+    GNUNET_free (i);
   }
+  GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head,
+                               req_handle->sampler->req_handle_tail,
+                               req_handle);
+  GNUNET_free (req_handle);
 }
 
 
@@ -735,17 +771,13 @@
   void
 RPS_sampler_destroy (struct RPS_Sampler *sampler)
 {
-  struct GetPeerCls *i;
-
-  for (i = sampler->gpc_head; NULL != i; i = sampler->gpc_head)
+  if (NULL != sampler->req_handle_head)
   {
-    GNUNET_CONTAINER_DLL_remove (sampler->gpc_head,
-                                 sampler->gpc_tail,
-                                 i);
-    GNUNET_SCHEDULER_cancel (i->get_peer_task);
-    GNUNET_free (i);
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+        "There are still pending requests. Going to remove them.\n");
+    while (NULL != sampler->req_handle_head)
+      RPS_sampler_request_cancel (sampler->req_handle_head);
   }
-
   sampler_empty (sampler);
   GNUNET_free (sampler);
 }

Modified: gnunet/src/rps/gnunet-service-rps_sampler.h
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.h 2015-07-31 13:53:26 UTC (rev 
36160)
+++ gnunet/src/rps/gnunet-service-rps_sampler.h 2015-08-02 14:48:28 UTC (rev 
36161)
@@ -34,7 +34,12 @@
  */
 struct RPS_Sampler;
 
+/**
+ * A handle to cancel a request.
+ */
+struct RPS_SamplerRequestHandle;
 
+
 /**
  * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready.
  *
@@ -130,12 +135,20 @@
  *                   #GNUNET_NO if used internally
  * @param num_peers the number of peers requested
  */
-    void
+struct RPS_SamplerRequestHandle *
 RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
                               RPS_sampler_n_rand_peers_ready_cb cb,
                               void *cls, uint32_t num_peers);
 
+/**
+ * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
+ *
+ * @param req_handle the handle to the request
+ */
+void
+RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle);
 
+
 /**
  * Counts how many Samplers currently hold a given PeerID.
  *

Modified: gnunet/src/rps/rps.h
===================================================================
--- gnunet/src/rps/rps.h        2015-07-31 13:53:26 UTC (rev 36160)
+++ gnunet/src/rps/rps.h        2015-08-02 14:48:28 UTC (rev 36161)
@@ -106,6 +106,22 @@
 };
 
 /**
+ * Message from client to RPS service to cancel request.
+ */
+struct GNUNET_RPS_CS_RequestCancelMessage
+{
+  /**
+   * Header including size and type in NBO
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Identifyer of the message.
+   */
+  uint32_t id GNUNET_PACKED;
+};
+
+/**
  * Message from client to service with seed of peers.
  */
 struct GNUNET_RPS_CS_SeedMessage

Modified: gnunet/src/rps/rps_api.c
===================================================================
--- gnunet/src/rps/rps_api.c    2015-07-31 13:53:26 UTC (rev 36160)
+++ gnunet/src/rps/rps_api.c    2015-08-02 14:48:28 UTC (rev 36161)
@@ -339,7 +339,8 @@
 GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
                           uint32_t type,
                           uint32_t num_peers,
-                          const struct GNUNET_PeerIdentity *peer_ids)
+                          const struct GNUNET_PeerIdentity *peer_ids,
+                          const struct GNUNET_PeerIdentity *target_peer)
 {
   size_t size_needed;
   uint32_t num_peers_max;
@@ -379,8 +380,8 @@
                               GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
     msg->type = htonl (type);
     msg->num_peers = htonl (num_peers_max);
-    if (2 == type
-        || 3 == type)
+    if ( (2 == type) ||
+         (3 == type) )
       msg->attacked_peer = peer_ids[num_peers];
     memcpy (&msg[1],
             tmp_peer_pointer,
@@ -400,9 +401,9 @@
                             GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
   msg->type = htonl (type);
   msg->num_peers = htonl (num_peers);
-  if (2 == type
-      || 3 == type)
-    msg->attacked_peer = peer_ids[num_peers];
+  if ( (2 == type) ||
+       (3 == type) )
+    msg->attacked_peer = *target_peer;
   memcpy (&msg[1], tmp_peer_pointer, num_peers * sizeof (struct 
GNUNET_PeerIdentity));
 
   GNUNET_MQ_send (h->mq, ev);
@@ -418,7 +419,17 @@
   void
 GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
 {
-  // TODO
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_RPS_CS_RequestCancelMessage*msg;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Cancelling request with id %" PRIu32 "\n",
+       rh->id);
+
+  GNUNET_array_append (req_handlers, req_handlers_size, *rh);
+  ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL);
+  msg->id = htonl (rh->id);
+  GNUNET_MQ_send (rh->rps_handle->mq, ev);
 }
 
 

Modified: gnunet/src/rps/test_rps.c
===================================================================
--- gnunet/src/rps/test_rps.c   2015-07-31 13:53:26 UTC (rev 36160)
+++ gnunet/src/rps/test_rps.c   2015-08-02 14:48:28 UTC (rev 36161)
@@ -46,6 +46,7 @@
 //#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
 static struct GNUNET_TIME_Relative timeout;
 
+
 /**
  * Portion of malicious peers
  */
@@ -106,6 +107,52 @@
 
 
 /**
+ * A pending reply: A request was sent and the reply is pending.
+ */
+struct PendingReply
+{
+  /**
+   * DLL next,prev ptr
+   */
+  struct PendingReply *next;
+  struct PendingReply *prev;
+
+  /**
+   * Handle to the request we are waiting for
+   */
+  struct GNUNET_RPS_Request_Handle *req_handle;
+
+  /**
+   * The peer that requested
+   */
+  struct RPSPeer *rps_peer;
+};
+
+
+/**
+ * A pending request: A request was not made yet but is scheduled for later.
+ */
+struct PendingRequest
+{
+  /**
+   * DLL next,prev ptr
+   */
+  struct PendingRequest *next;
+  struct PendingRequest *prev;
+
+  /**
+   * Handle to the request we are waiting for
+   */
+  struct GNUNET_SCHEDULER_Task *request_task;
+
+  /**
+   * The peer that requested
+   */
+  struct RPSPeer *rps_peer;
+};
+
+
+/**
  * Information we track for each peer.
  */
 struct RPSPeer
@@ -141,6 +188,33 @@
   int online;
 
   /**
+   * Number of Peer IDs to request
+   */
+  unsigned int num_ids_to_request;
+
+  /**
+   * Pending requests DLL
+   */
+  struct PendingRequest *pending_req_head;
+  struct PendingRequest *pending_req_tail;
+
+  /**
+   * Number of pending requests
+   */
+  unsigned int num_pending_reqs;
+
+  /**
+   * Pending replies DLL
+   */
+  struct PendingReply *pending_rep_head;
+  struct PendingReply *pending_rep_tail;
+
+  /**
+   * Number of pending replies
+   */
+  unsigned int num_pending_reps;
+
+  /**
    * Received PeerIDs
    */
   struct GNUNET_PeerIdentity *rec_ids;
@@ -168,6 +242,16 @@
 static struct GNUNET_PeerIdentity *rps_peer_ids;
 
 /**
+ * ID of the targeted peer.
+ */
+static struct GNUNET_PeerIdentity *target_peer;
+
+/**
+ * ID of the peer that requests for the evaluation.
+ */
+static struct RPSPeer *eval_peer;
+
+/**
  * Number of online peers.
  */
 static unsigned int num_peers_online;
@@ -185,6 +269,11 @@
 
 
 /**
+ * Called to initialise the given RPSPeer
+ */
+typedef void (*InitPeer) (struct RPSPeer *rps_peer);
+
+/**
  * Called directly after connecting to the service
  */
 typedef void (*PreTest) (void *cls, struct GNUNET_RPS_Handle *h);
@@ -224,6 +313,11 @@
   char *name;
 
   /**
+   * Called to initialise peer
+   */
+  InitPeer init_peer;
+
+  /**
    * Called directly after connecting to the service
    */
   PreTest pre_test;
@@ -398,80 +492,6 @@
 
 
 /**
- * Callback to be called when RPS service is started or stopped at peers
- *
- * @param cls NULL
- * @param op the operation handle
- * @param emsg NULL on success; otherwise an error description
- */
-static void
-churn_cb (void *cls,
-          struct GNUNET_TESTBED_Operation *op,
-          const char *emsg)
-{
-  // FIXME
-  struct OpListEntry *entry = cls;
-
-  GNUNET_TESTBED_operation_done (entry->op);
-  if (NULL != emsg)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a 
peer\n");
-    GNUNET_SCHEDULER_shutdown ();
-    return;
-  }
-  GNUNET_assert (0 != entry->delta);
-
-  num_peers_online += entry->delta;
-
-  if (0 > entry->delta)
-  { /* Peer hopefully just went offline */
-    if (GNUNET_YES != rps_peers[entry->index].online)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "peer %s was expected to go offline but is still marked as 
online\n",
-                  GNUNET_i2s (rps_peers[entry->index].peer_id));
-      GNUNET_break (0);
-    }
-    else
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "peer %s probably went offline as expected\n",
-                  GNUNET_i2s (rps_peers[entry->index].peer_id));
-    }
-    rps_peers[entry->index].online = GNUNET_NO;
-  }
-
-  else if (0 < entry->delta)
-  { /* Peer hopefully just went online */
-    if (GNUNET_NO != rps_peers[entry->index].online)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "peer %s was expected to go online but is still marked as 
offline\n",
-                  GNUNET_i2s (rps_peers[entry->index].peer_id));
-      GNUNET_break (0);
-    }
-    else
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "peer %s probably went online as expected\n",
-                  GNUNET_i2s (rps_peers[entry->index].peer_id));
-      if (NULL != cur_test_run.pre_test)
-      {
-        cur_test_run.pre_test (&rps_peers[entry->index],
-            rps_peers[entry->index].rps_handle);
-      }
-    }
-    rps_peers[entry->index].online = GNUNET_YES;
-  }
-
-  GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
-  GNUNET_free (entry);
-  //if (num_peers_in_round[current_round] == peers_running)
-  //  run_round ();
-}
-
-
-/**
  * Task run on timeout to shut everything down.
  */
 static void
@@ -543,14 +563,15 @@
       &rps_peer_ids[entry->index],
       &rps_peers[entry->index],
       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-
   tofile ("/tmp/rps/peer_ids",
            "%u\t%s\n",
            entry->index,
            GNUNET_i2s_full (&rps_peer_ids[entry->index]));
 
+  if (NULL != cur_test_run.init_peer)
+    cur_test_run.init_peer (&rps_peers[entry->index]);
+
   GNUNET_TESTBED_operation_done (entry->op);
-
   GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
   GNUNET_free (entry);
 }
@@ -650,10 +671,18 @@
 static int
 no_eval (void)
 {
-  return 1;
+  return 0;
 }
 
 /**
+ * Initialise given RPSPeer
+ */
+static void default_init_peer (struct RPSPeer *rps_peer)
+{
+  rps_peer->num_ids_to_request = 1;
+}
+
+/**
  * Callback to call on receipt of a reply
  *
  * @param cls closure
@@ -665,9 +694,15 @@
                       uint64_t n,
                       const struct GNUNET_PeerIdentity *recv_peers)
 {
-  struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
+  struct RPSPeer *rps_peer;
+  struct PendingReply *pending_rep = (struct PendingReply *) cls;
   unsigned int i;
 
+  rps_peer = pending_rep->rps_peer;
+  GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
+                               rps_peer->pending_rep_tail,
+                               pending_rep);
+  rps_peer->num_pending_reps--;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "[%s] got %" PRIu64 " peers:\n",
               GNUNET_i2s (rps_peer->peer_id),
@@ -691,21 +726,120 @@
 request_peers (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
+  struct RPSPeer *rps_peer;
+  struct PendingRequest *pending_req = (struct PendingRequest *) cls;
+  struct PendingReply *pending_rep;
 
   if (GNUNET_YES == in_shutdown)
     return;
+  rps_peer = pending_req->rps_peer;
+  GNUNET_assert (1 <= rps_peer->num_pending_reqs);
+  GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
+                               rps_peer->pending_req_tail,
+                               pending_req);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Requesting one peer\n");
+  pending_rep = GNUNET_new (struct PendingReply);
+  pending_rep->rps_peer = rps_peer;
+  pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
+      1,
+      cur_test_run.reply_handle,
+      pending_rep);
+  GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
+                                    rps_peer->pending_rep_tail,
+                                    pending_rep);
+  rps_peer->num_pending_reps++;
+  rps_peer->num_pending_reqs--;
+}
 
-  GNUNET_free (GNUNET_RPS_request_peers (rps_peer->rps_handle,
-                                         1,
-                                         cur_test_run.reply_handle,
-                                         rps_peer));
-  //rps_peer->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, 1, 
handle_reply, rps_peer);
+static void
+cancel_pending_req (struct PendingRequest *pending_req)
+{
+  struct RPSPeer *rps_peer;
+
+  rps_peer = pending_req->rps_peer;
+  GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
+                               rps_peer->pending_req_tail,
+                               pending_req);
+  rps_peer->num_pending_reqs--;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Cancelling pending request\n");
+  GNUNET_SCHEDULER_cancel (pending_req->request_task);
+  GNUNET_free (pending_req);
 }
 
+static void
+cancel_request (struct PendingReply *pending_rep)
+{
+  struct RPSPeer *rps_peer;
 
+  rps_peer = pending_rep->rps_peer;
+  GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
+                               rps_peer->pending_rep_tail,
+                               pending_rep);
+  rps_peer->num_pending_reps--;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Cancelling request\n");
+  GNUNET_RPS_request_cancel (pending_rep->req_handle);
+  GNUNET_free (pending_rep);
+}
+
+/**
+ * Cancel a request.
+ */
+static void
+cancel_request_cb (void *cls,
+                const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct PendingReply *pending_rep;
+  struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
+
+  if (GNUNET_YES == in_shutdown)
+    return;
+  pending_rep = rps_peer->pending_rep_head;
+  GNUNET_assert (1 <= rps_peer->num_pending_reps);
+  cancel_request (pending_rep);
+}
+
+
+/**
+ * Schedule requests for peer @a rps_peer that have neither been scheduled, nor
+ * issued, nor replied
+ */
+void
+schedule_missing_requests (struct RPSPeer *rps_peer)
+{
+  unsigned int i;
+  struct PendingRequest *pending_req;
+
+  for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
+       i < rps_peer->num_ids_to_request; i++)
+  {
+    pending_req = GNUNET_new (struct PendingRequest);
+    pending_req->rps_peer = rps_peer;
+    pending_req->request_task = GNUNET_SCHEDULER_add_delayed (
+        GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+          cur_test_run.request_interval * i),
+        request_peers,
+        pending_req);
+    GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head,
+                                      rps_peer->pending_req_tail,
+                                      pending_req);
+    rps_peer->num_pending_reqs++;
+  }
+}
+
+void
+cancel_pending_req_rep (struct RPSPeer *rps_peer)
+{
+  while (NULL != rps_peer->pending_req_head)
+    cancel_pending_req (rps_peer->pending_req_head);
+  GNUNET_assert (0 == rps_peer->num_pending_reqs);
+  while (NULL != rps_peer->pending_rep_head)
+    cancel_request (rps_peer->pending_rep_head);
+  GNUNET_assert (0 == rps_peer->num_pending_reps);
+}
+
 /***********************************
  * MALICIOUS
 ***********************************/
@@ -716,8 +850,8 @@
   uint32_t num_mal_peers;
   struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
 
-  GNUNET_assert (1 >= portion
-                 && 0 <  portion);
+  GNUNET_assert ( (1 >= portion) &&
+                  (0 <  portion) );
   num_mal_peers = round (portion * num_peers);
 
   if (rps_peer->index < num_mal_peers)
@@ -728,7 +862,8 @@
                 GNUNET_i2s (rps_peer->peer_id),
                 num_mal_peers);
 
-    GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers, rps_peer_ids);
+    GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers,
+                              rps_peer_ids, target_peer);
   }
   #endif /* ENABLE_MALICIOUS */
 }
@@ -739,8 +874,8 @@
   uint32_t num_mal_peers;
 
   #ifdef ENABLE_MALICIOUS
-  GNUNET_assert (1 >= portion
-                 && 0 <  portion);
+  GNUNET_assert ( (1 >= portion) &&
+                  (0 <  portion) );
   num_mal_peers = round (portion * num_peers);
 
   if (rps_peer->index >= num_mal_peers)
@@ -748,8 +883,7 @@
        it's not sampling */
     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 2),
                                   seed_peers, rps_peer);
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 10),
-                                  request_peers, rps_peer);
+    schedule_missing_requests (rps_peer);
   }
   #endif /* ENABLE_MALICIOUS */
 }
@@ -772,8 +906,7 @@
 static void
 single_req_cb (struct RPSPeer *rps_peer)
 {
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 5),
-                                request_peers, rps_peer);
+  schedule_missing_requests (rps_peer);
 }
 
 /***********************************
@@ -782,10 +915,7 @@
 static void
 delay_req_cb (struct RPSPeer *rps_peer)
 {
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 5),
-                                request_peers, rps_peer);
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 10),
-                                request_peers, rps_peer);
+  schedule_missing_requests (rps_peer);
 }
 
 /***********************************
@@ -824,8 +954,7 @@
 {
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 2),
                                 seed_peers, rps_peer);
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 15),
-                                request_peers, rps_peer);
+  schedule_missing_requests (rps_peer);
 }
 
 //TODO start big mal
@@ -836,16 +965,128 @@
 static void
 req_cancel_cb (struct RPSPeer *rps_peer)
 {
-  // TODO
+  schedule_missing_requests (rps_peer);
+  GNUNET_SCHEDULER_add_delayed (
+      GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                     (cur_test_run.request_interval + 1)),
+      cancel_request_cb, rps_peer);
 }
 
 /***********************************
  * PROFILER
 ***********************************/
+
+/**
+ * Callback to be called when RPS service is started or stopped at peers
+ *
+ * @param cls NULL
+ * @param op the operation handle
+ * @param emsg NULL on success; otherwise an error description
+ */
 static void
+churn_cb (void *cls,
+          struct GNUNET_TESTBED_Operation *op,
+          const char *emsg)
+{
+  // FIXME
+  struct OpListEntry *entry = cls;
+
+  GNUNET_TESTBED_operation_done (entry->op);
+  if (NULL != emsg)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a 
peer\n");
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  GNUNET_assert (0 != entry->delta);
+
+  num_peers_online += entry->delta;
+
+  if (0 > entry->delta)
+  { /* Peer hopefully just went offline */
+    if (GNUNET_YES != rps_peers[entry->index].online)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "peer %s was expected to go offline but is still marked as 
online\n",
+                  GNUNET_i2s (rps_peers[entry->index].peer_id));
+      GNUNET_break (0);
+    }
+    else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "peer %s probably went offline as expected\n",
+                  GNUNET_i2s (rps_peers[entry->index].peer_id));
+    }
+    rps_peers[entry->index].online = GNUNET_NO;
+  }
+
+  else if (0 < entry->delta)
+  { /* Peer hopefully just went online */
+    if (GNUNET_NO != rps_peers[entry->index].online)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "peer %s was expected to go online but is still marked as 
offline\n",
+                  GNUNET_i2s (rps_peers[entry->index].peer_id));
+      GNUNET_break (0);
+    }
+    else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "peer %s probably went online as expected\n",
+                  GNUNET_i2s (rps_peers[entry->index].peer_id));
+      if (NULL != cur_test_run.pre_test)
+      {
+        cur_test_run.pre_test (&rps_peers[entry->index],
+            rps_peers[entry->index].rps_handle);
+        schedule_missing_requests (&rps_peers[entry->index]);
+      }
+    }
+    rps_peers[entry->index].online = GNUNET_YES;
+  }
+
+  GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
+  GNUNET_free (entry);
+  //if (num_peers_in_round[current_round] == peers_running)
+  //  run_round ();
+}
+
+static void
+manage_service_wrapper (unsigned int i, unsigned int j, int delta,
+    double prob_go_on_off)
+{
+  struct OpListEntry *entry;
+  uint32_t prob;
+
+  prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                   UINT32_MAX);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%u. selected peer (%u: %s) is %s.\n",
+              i,
+              j,
+              GNUNET_i2s (rps_peers[j].peer_id),
+              (delta < 0)? "online" : "offline");
+  if (prob < prob_go_on_off * UINT32_MAX)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%s goes %s\n",
+                GNUNET_i2s (rps_peers[j].peer_id),
+                (delta < 0) ? "offline" : "online");
+
+    entry = make_oplist_entry ();
+    entry->delta = delta;
+    entry->index = j;
+    entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
+                                                    testbed_peers[j],
+                                                    "rps",
+                                                    &churn_cb,
+                                                    entry,
+                                                    (delta < 0) ? 0 : 1);
+  }
+}
+
+static void
 churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct OpListEntry *entry;
   unsigned int i;
   unsigned int j;
   double portion_online;
@@ -853,7 +1094,6 @@
   double prob_go_offline;
   double portion_go_online;
   double portion_go_offline;
-  uint32_t prob;
 
   /* Compute the probability for an online peer to go offline
    * this round */
@@ -878,7 +1118,7 @@
                                          (unsigned int) num_peers);
 
   /* Go over 50% randomly chosen peers */
-  for (i = 0 ; i < .5 * num_peers ; i++)
+  for (i = 0; i < .5 * num_peers; i++)
   {
     j = permut[i];
 
@@ -885,58 +1125,15 @@
     /* If online, shut down with certain probability */
     if (GNUNET_YES == rps_peers[j].online)
     {
-      prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                       UINT32_MAX);
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%u. selected peer (%u: %s) is online.\n",
-                  i,
-                  j,
-                  GNUNET_i2s (rps_peers[j].peer_id));
-      if (prob < prob_go_offline * UINT32_MAX)
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "%s goes offline\n",
-                    GNUNET_i2s (rps_peers[j].peer_id));
+      cancel_pending_req_rep (&rps_peers[j]);
+      manage_service_wrapper (i, j, -1, prob_go_offline);
+    }
 
-        entry = make_oplist_entry ();
-        entry->delta = -1;
-        entry->index = j;
-        entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
-                                                        testbed_peers[j],
-                                                        "rps",
-                                                        &churn_cb,
-                                                        entry,
-                                                        0);
-      }
-   }
-
-   /* If offline, restart with certain probability */
-   else if (GNUNET_NO == rps_peers[j].online)
-   {
-     prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                      UINT32_MAX);
-     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "%u. selected peer (%u: %s) is offline.\n",
-                 i,
-                 j,
-                 GNUNET_i2s (rps_peers[j].peer_id));
-     if (prob < .66 * UINT32_MAX)
-     {
-       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                   "%s goes online\n",
-                   GNUNET_i2s (rps_peers[j].peer_id));
-
-       entry = make_oplist_entry ();
-       entry->delta = 1;
-       entry->index = j;
-       entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
-                                                       testbed_peers[j],
-                                                       "rps",
-                                                       &churn_cb,
-                                                       entry,
-                                                       1);
-     }
-   }
+    /* If offline, restart with certain probability */
+    else if (GNUNET_NO == rps_peers[j].online)
+    {
+      manage_service_wrapper (i, j, 1, 0.66);
+    }
   }
 
   GNUNET_free (permut);
@@ -948,21 +1145,13 @@
 }
 
 
-static void
-profiler_pre (void *cls, struct GNUNET_RPS_Handle *h)
+/**
+ * Initialise given RPSPeer
+ */
+static void profiler_init_peer (struct RPSPeer *rps_peer)
 {
-  //churn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS,
-  //                                                                          
10),
-  //                                           churn, NULL);
-  mal_pre (cls, h);
-
-  /* if (NULL == churn_task)
-  {
-    churn_task = GNUNET_SCHEDULER_add_delayed (
-          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
-          churn,
-          NULL);
-  } */
+  if (num_peers - 1 == rps_peer->index)
+    rps_peer->num_ids_to_request = cur_test_run.num_requests;
 }
 
 
@@ -978,38 +1167,39 @@
                       uint64_t n,
                       const struct GNUNET_PeerIdentity *recv_peers)
 {
-  struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
+  struct RPSPeer *rps_peer;
   struct RPSPeer *rcv_rps_peer;
   char *file_name;
   char *file_name_dh;
   unsigned int i;
+  struct PendingReply *pending_rep = (struct PendingReply *) cls;
 
+  rps_peer = pending_rep->rps_peer;
   file_name = "/tmp/rps/received_ids";
   file_name_dh = "/tmp/rps/diehard_input";
-
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "[%s] got %" PRIu64 " peers:\n",
               GNUNET_i2s (rps_peer->peer_id),
               n);
-  
-  for (i = 0 ; i < n ; i++)
+  for (i = 0; i < n; i++)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "%u: %s\n",
                 i,
                 GNUNET_i2s (&recv_peers[i]));
-
-    /* GNUNET_array_append (rps_peer->rec_ids, rps_peer->num_rec_ids, 
recv_peers[i]); */
     tofile (file_name,
              "%s\n",
              GNUNET_i2s_full (&recv_peers[i]));
-
     rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, 
&recv_peers[i]);
-
     tofile (file_name_dh,
              "%" PRIu32 "\n",
              (uint32_t) rcv_rps_peer->index);
   }
+  /* Find #PendingReply holding the request handle */
+  GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
+                               rps_peer->pending_rep_tail,
+                               pending_rep);
+  rps_peer->num_pending_reps--;
 }
 
 
@@ -1016,14 +1206,11 @@
 static void
 profiler_cb (struct RPSPeer *rps_peer)
 {
-  uint32_t i;
-
-  /* Churn only at peers that do not request peers for evaluation */
-  if (NULL == churn_task &&
-      rps_peer->index != num_peers - 2)
+  /* Start churn */
+  if (NULL == churn_task)
   {
     churn_task = GNUNET_SCHEDULER_add_delayed (
-          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
+          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
           churn,
           NULL);
   }
@@ -1031,17 +1218,8 @@
   /* Only request peer ids at one peer.
    * (It's the before-last because last one is target of the focussed attack.)
    */
-  if (rps_peer->index == num_peers - 2)
-  {
-    for (i = 0 ; i < cur_test_run.num_requests ; i++)
-    {
-      GNUNET_SCHEDULER_add_delayed (
-          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
-                                         cur_test_run.request_interval * i),
-          request_peers,
-          rps_peer);
-    }
-  }
+  if (eval_peer == rps_peer)
+    schedule_missing_requests (rps_peer);
 }
 
 /**
@@ -1126,7 +1304,6 @@
 
   testbed_peers = peers;
   num_peers_online = 0;
-
   for (i = 0 ; i < num_peers ; i++)
   {
     entry = make_oplist_entry ();
@@ -1137,16 +1314,6 @@
                                                      entry);
   }
 
-
-  // This seems not to work
-  //if (NULL != strstr (cur_test_run.name, "profiler"))
-  //{
-  //  churn_task = GNUNET_SCHEDULER_add_delayed (
-  //      GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
-  //      churn,
-  //      NULL);
-  //}
-
   GNUNET_assert (num_peers == n_peers);
   for (i = 0 ; i < n_peers ; i++)
   {
@@ -1161,6 +1328,9 @@
                                       &rps_disconnect_adapter,
                                       &rps_peers[i]);
   }
+
+  if (NULL != churn_task)
+    GNUNET_SCHEDULER_cancel (churn_task);
   GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_task, NULL);
 }
 
@@ -1177,13 +1347,13 @@
 {
   int ret_value;
 
+  num_peers = 5;
   cur_test_run.name = "test-rps-default";
+  cur_test_run.init_peer = default_init_peer;
   cur_test_run.pre_test = NULL;
   cur_test_run.reply_handle = default_reply_handle;
   cur_test_run.eval_cb = default_eval_cb;
   churn_task = NULL;
-
-  num_peers = 5;
   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
 
   if (strstr (argv[0], "malicious") != NULL)
@@ -1259,7 +1429,9 @@
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n");
     cur_test_run.name = "test-rps-req-cancel";
+    num_peers = 1;
     cur_test_run.main_test = req_cancel_cb;
+    cur_test_run.eval_cb = no_eval;
   }
 
   else if (strstr (argv[0], "profiler") != NULL)
@@ -1266,24 +1438,30 @@
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
     cur_test_run.name = "test-rps-profiler";
+    num_peers = 10;
     mal_type = 3;
-    cur_test_run.pre_test = profiler_pre;
+    cur_test_run.init_peer = profiler_init_peer;
+    cur_test_run.pre_test = mal_pre;
     cur_test_run.main_test = profiler_cb;
     cur_test_run.reply_handle = profiler_reply_handle;
     cur_test_run.eval_cb = profiler_eval;
     cur_test_run.request_interval = 2;
-    cur_test_run.num_requests = 50;
+    cur_test_run.num_requests = 5;
+    timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90);
 
-    num_peers = 50;
-
+    /* 'Clean' directory */
     (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
     GNUNET_DISK_directory_create ("/tmp/rps/");
-    timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90);
   }
 
   rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
+  peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
   rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
-  peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
+  if ( (2 == mal_type) ||
+       (3 == mal_type))
+    target_peer = &rps_peer_ids[num_peers - 2];
+  if (profiler_eval == cur_test_run.eval_cb)
+    eval_peer = &rps_peers[num_peers - 1];
 
   ok = 1;
   (void) GNUNET_TESTBED_test_run (cur_test_run.name,
@@ -1293,11 +1471,9 @@
                                   &run, NULL);
 
   ret_value = cur_test_run.eval_cb();
-
   GNUNET_free (rps_peers );
   GNUNET_free (rps_peer_ids);
   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
-
   return ret_value;
 }
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]