[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r34922 - gnunet/src/rps
From: gnunet
Subject: [GNUnet-SVN] r34922 - gnunet/src/rps
Date: Sun, 18 Jan 2015 04:19:02 +0100
Author: ch3
Date: 2015-01-18 04:19:02 +0100 (Sun, 18 Jan 2015)
New Revision: 34922
Modified:
gnunet/src/rps/gnunet-service-rps.c
gnunet/src/rps/gnunet-service-rps_sampler.c
gnunet/src/rps/gnunet-service-rps_sampler.h
Log:
schedule some requests for later
Modified: gnunet/src/rps/gnunet-service-rps.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps.c 2015-01-18 00:33:51 UTC (rev 34921)
+++ gnunet/src/rps/gnunet-service-rps.c 2015-01-18 03:19:02 UTC (rev 34922)
@@ -226,7 +226,7 @@
/**
* Identifier for the main task that runs periodically.
*/
-static struct GNUNET_SCHEDULER_Task * do_round_task;
+static struct GNUNET_SCHEDULER_Task *do_round_task;
/**
* Time inverval the do_round task runs in.
@@ -1346,7 +1346,14 @@
/* Initialise sampler */
- RPS_sampler_init (sampler_size_est_need, own_identity, insertCB, NULL, removeCB, NULL);
+ struct GNUNET_TIME_Relative half_round_interval;
+ struct GNUNET_TIME_Relative max_round_interval;
+
+ half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
+ max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
+
+ RPS_sampler_init (sampler_size_est_need, own_identity, max_round_interval,
+ insertCB, NULL, removeCB, NULL);
sampler_size = sampler_size_est_need;
/* Initialise push and pull maps */
Modified: gnunet/src/rps/gnunet-service-rps_sampler.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.c 2015-01-18 00:33:51 UTC (rev 34921)
+++ gnunet/src/rps/gnunet-service-rps_sampler.c 2015-01-18 03:19:02 UTC (rev 34922)
@@ -77,15 +77,31 @@
*/
struct GNUNET_HashCode peer_id_hash;
+
/**
* Time of last request.
*/
- struct GNUNET_TIME_Absolute last_request;
+ struct GNUNET_TIME_Absolute last_client_request;
/**
* Flag that indicates that we are not holding a valid PeerID right now.
*/
enum RPS_SamplerEmpty is_empty;
+
+ /**
+ * 'Birth'
+ */
+ struct GNUNET_TIME_Absolute birth;
+
+ /**
+ * How many times a PeerID was put in this sampler.
+ */
+ uint32_t num_peers;
+
+ /**
+ * How many times this sampler changed the peer_id.
+ */
+ uint32_t num_change;
};
/**
@@ -112,6 +128,13 @@
uint64_t sampler_elem_index;
/**
+ * Max time a round takes
+ *
+ * Used in the context of RPS
+ */
+ struct GNUNET_TIME_Relative max_round_interval;
+
+ /**
* Callback to be called when a peer gets inserted into a sampler.
*/
RPS_sampler_insert_cb insert_cb;
@@ -174,6 +197,38 @@
const struct GNUNET_PeerIdentity *id);
/**
+ * Closure to #RPS_sampler_get_rand_peer()
+ */
+struct GetPeerCls
+{
+ /**
+ * The task for this function.
+ */
+ struct GNUNET_SCHEDULER_Task *get_peer_task;
+
+ /**
+ * The callback
+ */
+ RPS_sampler_rand_peer_ready_cb cb;
+
+ /**
+ * The closure to the callback
+ */
+ void *cb_cls;
+
+ /**
+ * The address of the id to be stored at
+ */
+ struct GNUNET_PeerIdentity *id;
+};
+
+/**
+ * Multihashmap that keeps track of all get_peer_tasks that are still scheduled.
+ */
+struct GNUNET_CONTAINER_MultiHashMap *get_peer_tasks;
+
+
+/**
* Global sampler variable.
*/
struct RPS_Sampler *sampler;
@@ -214,10 +269,17 @@
n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "SAMPLER: Got %" PRIX64 "th of %" PRIX64 " peers\n",
+ n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
+
if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers)
- {
+ { /* All peers are ready -- return those to the client */
GNUNET_assert (NULL != n_peers_cls->callback);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "SAMPLER: returning %" PRIX64 " peers to the client\n",
+ n_peers_cls->num_peers);
n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids,
n_peers_cls->num_peers);
GNUNET_free (n_peers_cls);
@@ -240,13 +302,17 @@
&(sampler_el->auth_key.key),
GNUNET_CRYPTO_HASH_LENGTH);
- sampler_el->last_request = GNUNET_TIME_UNIT_FOREVER_ABS;
+ sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
/* We might want to keep the previous peer */
//GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id,
// sizeof(struct GNUNET_PeerIdentity),
// &sampler_el->peer_id_hash);
+
+ sampler_el->birth = GNUNET_TIME_absolute_get ();
+ sampler_el->num_peers = 0;
+ sampler_el->num_change = 0;
}
@@ -282,12 +348,14 @@
{
struct GNUNET_HashCode other_hash;
- if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, &(s_elem->peer_id)) )
+ s_elem->num_peers++;
+
+ if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) )
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
- GNUNET_i2s(other));
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
- GNUNET_i2s(&(s_elem->peer_id)));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
+ GNUNET_i2s (other));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
+ GNUNET_i2s (&(s_elem->peer_id)));
}
else
{
@@ -297,48 +365,48 @@
&other_hash);
if ( EMPTY == s_elem->is_empty )
- { // Or whatever is a valid way to say
- // "we have no PeerID at the moment"
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n",
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n",
GNUNET_i2s(other));
s_elem->peer_id = *other;
- //s_elem->peer_id = other;
s_elem->peer_id_hash = other_hash;
+
if (NULL != sampler->insert_cb)
- {
- sampler->insert_cb(sampler->insert_cls, &(s_elem->peer_id));
- }
+ sampler->insert_cb (sampler->insert_cls, &(s_elem->peer_id));
+
+ s_elem->num_change++;
}
- else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s_elem->peer_id_hash) )
+ else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) )
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
- GNUNET_i2s(other));
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
- GNUNET_i2s(&s_elem->peer_id));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
+ GNUNET_i2s (other));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
+ GNUNET_i2s (&s_elem->peer_id));
if ( NULL != sampler->remove_cb )
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
- GNUNET_i2s(&s_elem->peer_id));
- sampler->remove_cb(sampler->remove_cls, &s_elem->peer_id);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
+ GNUNET_i2s (&s_elem->peer_id));
+ sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id);
}
- memcpy(&s_elem->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
- //s_elem->peer_id = other;
+ s_elem->peer_id = *other;
s_elem->peer_id_hash = other_hash;
if ( NULL != sampler->insert_cb )
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
- GNUNET_i2s(&s_elem->peer_id));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
+ GNUNET_i2s (&s_elem->peer_id));
sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id);
}
+
+ s_elem->num_change++;
}
else
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
GNUNET_i2s(other));
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
GNUNET_i2s(&s_elem->peer_id));
}
}
@@ -410,7 +478,7 @@
}
GNUNET_assert(sampler->sampler_size == new_size);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove
}
@@ -427,7 +495,9 @@
* @param rem_cls the closure given to #rem_cb
*/
void
-RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id,
+RPS_sampler_init (size_t init_size,
+ const struct GNUNET_PeerIdentity *id,
+ struct GNUNET_TIME_Relative max_round_interval,
RPS_sampler_insert_cb ins_cb, void *ins_cls,
RPS_sampler_remove_cb rem_cb, void *rem_cls)
{
@@ -441,10 +511,12 @@
sampler = GNUNET_new (struct RPS_Sampler);
sampler->sampler_size = 0;
sampler->sampler_elements = NULL;
+ sampler->max_round_interval = max_round_interval;
sampler->insert_cb = ins_cb;
sampler->insert_cls = ins_cls;
sampler->remove_cb = rem_cb;
sampler->remove_cls = rem_cls;
+ get_peer_tasks = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
//sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
//GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
RPS_sampler_resize (init_size);
@@ -489,7 +561,7 @@
{
if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
}
}
@@ -523,8 +595,8 @@
// peer = NULL;
//else
peer = &(sampler->sampler_elements[r_index]->peer_id);
- sampler->sampler_elements[r_index]->last_request = GNUNET_TIME_absolute_get();
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
+ //sampler->sampler_elements[r_index]->last_client_request = GNUNET_TIME_absolute_get();
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
return peer;
}
@@ -574,22 +646,66 @@
* @return a random PeerID of the PeerIDs previously put into the sampler.
*/
void
-RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb,
- void *cls, struct GNUNET_PeerIdentity *id)
+//RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb,
+// void *cls, struct GNUNET_PeerIdentity *id)
+RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ struct GetPeerCls *gpc;
+ struct RPS_SamplerElement *s_elem;
+ struct GNUNET_TIME_Relative last_request_diff;
+ struct GNUNET_HashCode *hash;
+ //struct GNUNET_TIME_Relative inv_last_request_diff;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n");
+
+ gpc = (struct GetPeerCls *) cls;
+ hash = GNUNET_new (struct GNUNET_HashCode);
+
do
- {
- // TODO check if we can actually return that now - otherwise wait
- *id = sampler->sampler_elements[client_get_index]->peer_id;
+ { /* Get first non empty sampler */
+ // TODO schedule for later if all samplers are empty
+ *gpc->id = sampler->sampler_elements[client_get_index]->peer_id;
RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
if ( client_get_index == sampler->sampler_size )
client_get_index = 0;
else
client_get_index++;
- } while (NOT_EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
+ } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
- cb (cls, id);
+ s_elem = sampler->sampler_elements[client_get_index];
+
+ /* Check whether we may use this sampler to give it back to the client */
+ if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
+ {
+ last_request_diff = GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
+ GNUNET_TIME_absolute_get ());
+ /* We're not going to give it back now if it was already requested by a client this round */
+ if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "SAMPLER: Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
+ ///* How many time remains untile the next round has started? */
+ //inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff,
+ // sampler->max_round_interval);
+ // add a little delay
+ /* Schedule it one round later */
+ gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
+ &RPS_sampler_get_rand_peer,
+ cls);
+ return;
+ }
+ // TODO add other reasons to wait here
+ }
+
+ GNUNET_CRYPTO_hash (gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
+ if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, gpc->get_peer_task))
+ LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
+ GNUNET_free (gpc->get_peer_task);
+
+ s_elem->last_client_request = GNUNET_TIME_absolute_get ();
+
+ gpc->cb (gpc->cb_cls, gpc->id);
}
@@ -608,11 +724,11 @@
RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
void *cls, uint64_t num_peers)
{
- // use _get_rand_peers_ ?
if ( 0 == sampler->sampler_size )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sgrp: List empty - Returning NULL\n");
+ cb (cls, NULL, 0);
}
else
{
@@ -621,18 +737,37 @@
struct GNUNET_PeerIdentity *peers;
uint64_t i;
struct RPS_GetNRandPeersReadyCls *cb_cls;
+ struct GetPeerCls *gpc;
+ struct GNUNET_HashCode *hash;
peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
+ hash = GNUNET_new (struct GNUNET_HashCode);
cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls);
cb_cls->num_peers = num_peers;
cb_cls->cur_num_peers = 0;
- cb_cls->callback = NULL;
- cb_cls->cls = NULL;
+ cb_cls->ids = peers;
+ cb_cls->callback = cb;
+ cb_cls->cls = cls;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "SAMPLER: Scheduling requests for %" PRIX64 " peers\n", num_peers);
+
for ( i = 0 ; i < num_peers ; i++ )
- RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb,
- cb_cls, &peers[i]);
+ {
+ gpc = GNUNET_new (struct GetPeerCls);
+ gpc->cb = RPS_sampler_get_n_rand_peers_ready_cb;
+ gpc->cb_cls = cb_cls;
+ gpc->id = &peers[i];
+
+ // maybe add a little delay
+ gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc);
+ GNUNET_CRYPTO_hash (gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
+ (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, gpc->get_peer_task,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ //RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb,
+ // cb_cls, &peers[i]);
+ }
}
}
@@ -662,11 +797,33 @@
/**
+ * Callback to iterate over the hashmap to cancle the get_peer_tasks.
+ */
+ int
+clear_get_peer_tasks (void *cls, const struct GNUNET_HashCode *key, void *value)
+{
+ struct GNUNET_SCHEDULER_Task *task;
+
+ task = (struct GNUNET_SCHEDULER_Task *) value;
+ GNUNET_SCHEDULER_cancel (task);
+
+ GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, key, value);
+
+ return GNUNET_YES;
+}
+
+
+/**
* Cleans the sampler.
*/
void
RPS_sampler_destroy ()
{
+ if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_iterate (get_peer_tasks,
+ clear_get_peer_tasks,
+ NULL))
+ LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: iteration over hashmap was cancelled\n");
+ GNUNET_CONTAINER_multihashmap_destroy (get_peer_tasks);
RPS_sampler_resize (0);
GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0);
}
Modified: gnunet/src/rps/gnunet-service-rps_sampler.h
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.h 2015-01-18 00:33:51 UTC (rev 34921)
+++ gnunet/src/rps/gnunet-service-rps_sampler.h 2015-01-18 03:19:02 UTC (rev 34922)
@@ -88,7 +88,9 @@
* @param rem_cls the closure given to #rem_cb
*/
void
-RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id,
+RPS_sampler_init (size_t init_size,
+ const struct GNUNET_PeerIdentity *id,
+ struct GNUNET_TIME_Relative max_round_interval,
RPS_sampler_insert_cb ins_cb, void *ins_cls,
RPS_sampler_remove_cb rem_cb, void *rem_cls);
[Prev in Thread] | Current Thread | [Next in Thread]
- [GNUnet-SVN] r34922 - gnunet/src/rps, gnunet <=