[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r9034 - in gnunet/src: fs include
From: |
gnunet |
Subject: |
[GNUnet-SVN] r9034 - in gnunet/src: fs include |
Date: |
Sun, 27 Sep 2009 17:06:56 -0600 |
Author: grothoff
Date: 2009-09-27 17:06:55 -0600 (Sun, 27 Sep 2009)
New Revision: 9034
Modified:
gnunet/src/fs/fs.h
gnunet/src/fs/gnunet-service-fs.c
gnunet/src/include/gnunet_container_lib.h
gnunet/src/include/gnunet_core_service.h
Log:
more work on fs service
Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h 2009-09-25 22:10:51 UTC (rev 9033)
+++ gnunet/src/fs/fs.h 2009-09-27 23:06:55 UTC (rev 9034)
@@ -75,6 +75,19 @@
#define TTL_DECREMENT 5000
/**
+ * Length of the P2P success tracker. Note that
+ * having a very long list can also hurt performance.
+ */
+#define P2P_SUCCESS_LIST_SIZE 8
+
+
+/**
+ * Length of the CS-2-P success tracker. Note that
+ * having a very long list can also hurt performance.
+ */
+#define CS2P_SUCCESS_LIST_SIZE 8
+
+/**
* How long are we willing to wait for the datastore to be ready to
* process a request for a query without priority?
*/
Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c 2009-09-25 22:10:51 UTC (rev 9033)
+++ gnunet/src/fs/gnunet-service-fs.c 2009-09-27 23:06:55 UTC (rev 9034)
@@ -38,6 +38,7 @@
* - check that we decrement PIDs always where necessary (can wait)
*/
#include "platform.h"
+#include <values.h>
#include "gnunet_core_service.h"
#include "gnunet_datastore_service.h"
#include "gnunet_peer_lib.h"
@@ -407,7 +408,10 @@
/**
* Pending transmission request with the core service for the target
- * peer (for processing of 'replies_pending').
+ * peer (for processing of 'replies_pending') or Handle for a
+ * pending query-request for P2P-transmission with the core service.
+ * If non-NULL, this request must be cancelled should this struct be
+ * destroyed!
*/
struct GNUNET_CORE_TransmitHandle *cth;
@@ -436,6 +440,12 @@
GNUNET_HashCode query;
/**
+ * The task responsible for transmitting queries
+ * for this request.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier task;
+
+ /**
* (Interned) Peer identifier (only valid if "client" is NULL)
* that identifies a peer that gave us this request.
*/
@@ -608,6 +618,69 @@
/**
+ * Information about a peer that we are connected to.
+ * We track data that is useful for determining which
+ * peers should receive our requests.
+ */
+struct ConnectedPeer
+{
+
+ /**
+ * List of the last clients for which this peer
+ * successfully answered a query.
+ */
+ struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
+
+ /**
+ * List of the last PIDs for which
+ * this peer successfully answered a query;
+ * We use 0 to indicate no successful reply.
+ */
+ GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
+
+ /**
+ * Average delay between sending the peer a request and
+ * getting a reply (only calculated over the requests for
+ * which we actually got a reply). Calculated
+ * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
+ */
+ struct GNUNET_TIME_Relative avg_delay;
+
+ /**
+ * Average priority of successful replies. Calculated
+ * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
+ */
+ double avg_priority;
+
+ /**
+ * The peer's identity.
+ */
+ GNUNET_PEER_Id pid;
+
+ /**
+ * Number of requests we have currently pending
+ * with this peer (that is, requests that were
+ * transmitted so recently that we would not retransmit
+ * them right now).
+ */
+ unsigned int pending_requests;
+
+ /**
+ * Which offset in "last_p2p_replies" will be updated next?
+ * (we go round-robin).
+ */
+ unsigned int last_p2p_replies_woff;
+
+ /**
+ * Which offset in "last_client_replies" will be updated next?
+ * (we go round-robin).
+ */
+ unsigned int last_client_replies_woff;
+
+};
+
+
+/**
* Our connection to the datastore.
*/
static struct GNUNET_DATASTORE_Handle *dsh;
@@ -686,6 +759,11 @@
static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
/**
+ * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
+
+/**
* Maximum number of requests (from other peers) that we're
* willing to have pending at any given point in time.
* FIXME: set from configuration (and 32 is a tiny value for testing only).
@@ -1457,6 +1535,279 @@
/**
+ * Closure used for "target_peer_select_cb".
+ */
+struct PeerSelectionContext
+{
+ /**
+ * The request for which we are selecting
+ * peers.
+ */
+ struct PendingRequest *pr;
+
+ /**
+ * Current "prime" target.
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * How much do we like this target?
+ */
+ double target_score;
+
+};
+
+
+/**
+ * Function called for each connected peer to determine
+ * which one(s) would make good targets for forwarding.
+ *
+ * @param cls closure (struct PeerSelectionContext)
+ * @param key current key code (peer identity)
+ * @param value value in the hash map (struct ConnectedPeer)
+ * @return GNUNET_YES if we should continue to
+ * iterate,
+ * GNUNET_NO if not.
+ */
+static int
+target_peer_select_cb (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct PeerSelectionContext *psc = cls;
+ // struct ConnectedPeer *cp = value;
+ double score;
+ // FIXME (CRITICAL: would always sent to same peer without this!)
+ // 1) check if we have already (recently) forwarded to this peer, if so, skip
+ // 2) calculate how much we'd like to forward to this peer
+ score = 0;
+
+ // 3) store in closure
+ if (score > psc->target_score)
+ {
+ psc->target_score = score;
+ psc->target.hashPubKey = *key;
+ }
+ return GNUNET_YES;
+}
+
+
+
+
+/**
+ * We use a random delay to make the timing of requests
+ * less predictable. This function returns such a random
+ * delay.
+ *
+ * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
+ */
+static struct GNUNET_TIME_Relative
+get_processing_delay ()
+{
+ return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ GNUNET_CRYPTO_random_u32
(GNUNET_CRYPTO_QUALITY_WEAK,
+
TTL_DECREMENT));
+}
+
+
+/**
+ * Task that is run for each request with the
+ * goal of forwarding the associated query to
+ * other peers. The task should re-schedule
+ * itself to be re-run once the TTL has expired.
+ * (or at a later time if more peers should
+ * be queried earlier).
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param tc task context (unused)
+ */
+static void
+forward_request_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * We've selected a peer for forwarding of a query.
+ * Construct the message and then re-schedule the
+ * task to forward again to (other) peers.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_request_cb (void *cls,
+ size_t size,
+ void *buf)
+{
+ struct PendingRequest *pr = cls;
+ uint16_t msize;
+
+ pr->cth = NULL;
+ /* (1) check for timeout */
+ if (NULL == buf)
+ {
+ /* timeout, try another peer immediately again */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ GNUNET_TIME_UNIT_ZERO,
+ &forward_request_task,
+ pr);
+ return 0;
+ }
+ /* (2) build query message */
+ msize = 0;
+ // CRITICAL-FIXME! (nothing goes without this!)
+ /* (3) schedule job to do it again (or another peer, etc.) */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (), // FIXME!
+ &forward_request_task,
+ pr);
+
+ return msize;
+}
+
+
+/**
+ * Function called after we've tried to reserve
+ * a certain amount of bandwidth for a reply.
+ * Check if we succeeded and if so send our query.
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param peer identifies the peer
+ * @param latency current latency estimate, "FOREVER" if we have been
+ * disconnected
+ * @param bpm_in set to the current bandwidth limit (receiving) for this peer
+ * @param bpm_out set to the current bandwidth limit (sending) for this peer
+ * @param amount set to the amount that was actually reserved or unreserved
+ * @param preference current traffic preference for the given peer
+ */
+static void
+target_reservation_cb (void *cls,
+ const struct
+ GNUNET_PeerIdentity * peer,
+ unsigned int bpm_in,
+ unsigned int bpm_out,
+ struct GNUNET_TIME_Relative
+ latency, int amount,
+ unsigned long long preference)
+{
+ struct PendingRequest *pr = cls;
+ uint32_t priority;
+ uint16_t size;
+ struct GNUNET_TIME_Relative maxdelay;
+
+ GNUNET_assert (peer != NULL);
+ if ( (amount != DBLOCK_SIZE) ||
+ (pr->cth != NULL) )
+ {
+ /* try again later; FIXME: we may need to un-reserve "amount"? */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (), //
FIXME: longer?
+ &forward_request_task,
+ pr);
+ return;
+ }
+ // (2) transmit, update ttl/priority
+ // FIXME: calculate priority, maxdelay, size properly!
+ priority = 0;
+ size = 60000;
+ maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
+ pr->cth = GNUNET_CORE_notify_transmit_ready (core,
+ priority,
+ maxdelay,
+ peer,
+ size,
+ &transmit_request_cb,
+ pr);
+ if (pr->cth == NULL)
+ {
+ /* try again later */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (), //
FIXME: longer?
+ &forward_request_task,
+ pr);
+ }
+}
+
+
+/**
+ * Task that is run for each request with the
+ * goal of forwarding the associated query to
+ * other peers. The task should re-schedule
+ * itself to be re-run once the TTL has expired.
+ * (or at a later time if more peers should
+ * be queried earlier).
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param tc task context (unused)
+ */
+static void
+forward_request_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PendingRequest *pr = cls;
+ struct PeerSelectionContext psc;
+
+ pr->task = GNUNET_SCHEDULER_NO_TASK;
+ if (pr->cth != NULL)
+ {
+ /* we're busy transmitting a result, wait a bit */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
+ return;
+ }
+ /* (1) select target */
+ psc.pr = pr;
+ psc.target_score = MINDOUBLE;
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &target_peer_select_cb,
+ &psc);
+ if (psc.target_score == MINDOUBLE)
+ {
+ /* no possible target found, wait some time */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (), //
FIXME: exponential back-off? or at least wait longer...
+ &forward_request_task,
+ pr);
+ return;
+ }
+ /* (2) reserve reply bandwidth */
+ // FIXME: need a way to cancel; this
+ // async operation is problematic (segv-problematic)
+ // if "pr" is destroyed while it happens!
+ GNUNET_CORE_peer_configure (core,
+ &psc.target,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ -1,
+ DBLOCK_SIZE, // FIXME: make dependent on type?
+ 0,
+ &target_reservation_cb,
+ pr);
+}
+
+
+/**
* We're processing (local) results for a search request
* from a (local) client. Pass applicable results to the
* client and if we are done either clean up (operation
@@ -1547,8 +1898,13 @@
&pr->query,
pr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- // FIXME: trigger some processing NOW!
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+
GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
local_get_context_free (lgc);
return;
}
@@ -1765,10 +2121,12 @@
cl->tail,
pr->crl_entry);
}
+ if (GNUNET_SCHEDULER_NO_TASK != pr->task)
+ GNUNET_SCHEDULER_cancel (sched, pr->task);
+ if (NULL != pr->cth)
+ GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
if (NULL != pr->bf)
GNUNET_CONTAINER_bloomfilter_free (pr->bf);
- if (NULL != pr->cth)
- GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
if (NULL != pr->th)
GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
while (NULL != (reply = pr->replies_pending))
@@ -1883,6 +2241,10 @@
requests_by_peer = NULL;
GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
requests_by_expiration = NULL;
+ // FIXME: iterate over entries and free individually?
+ // (or do we get disconnect notifications?)
+ GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
+ connected_peers = NULL;
GNUNET_CONTAINER_multihashmap_destroy (ifm);
ifm = NULL;
while (NULL != (pos = indexed_files))
@@ -1917,7 +2279,30 @@
}
+
/**
+ * Method called whenever a given peer connects.
+ *
+ * @param cls closure, not used
+ * @param peer peer identity this notification is about
+ */
+static void
+peer_connect_handler (void *cls,
+ const struct
+ GNUNET_PeerIdentity * peer)
+{
+ struct ConnectedPeer *cp;
+
+ cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
+ cp->pid = GNUNET_PEER_intern (peer);
+ GNUNET_CONTAINER_multihashmap_put (connected_peers,
+ &peer->hashPubKey,
+ cp,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+}
+
+
+/**
* Method called whenever a peer disconnects.
*
* @param cls closure, not used
@@ -1928,6 +2313,13 @@
const struct
GNUNET_PeerIdentity * peer)
{
+ struct ConnectedPeer *cp;
+
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &peer->hashPubKey);
+ GNUNET_PEER_change_rc (cp->pid, -1);
+ GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
+ GNUNET_free (cp);
GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
&peer->hashPubKey,
&destroy_request,
@@ -1993,11 +2385,15 @@
eer);
destroy_pending_request (eer);
}
- // FIXME: trigger actual forwarding NOW!
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
GNUNET_free (pgc);
}
-
-
/**
* Transmit the given message by copying it to
* the target buffer "buf". "buf" will be
@@ -2831,7 +3227,7 @@
GNUNET_TIME_UNIT_FOREVER_REL,
NULL,
&core_start_cb,
- NULL,
+ &peer_connect_handler,
&peer_disconnect_handler,
NULL,
NULL, GNUNET_NO,
@@ -2860,6 +3256,7 @@
ifm = GNUNET_CONTAINER_multihashmap_create (128);
requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME:
get size from config
requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get
size from config
+ connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
requests_by_expiration = GNUNET_CONTAINER_heap_create
(GNUNET_CONTAINER_HEAP_ORDER_MIN);
read_index_list ();
dsh = GNUNET_DATASTORE_connect (cfg,
Modified: gnunet/src/include/gnunet_container_lib.h
===================================================================
--- gnunet/src/include/gnunet_container_lib.h 2009-09-25 22:10:51 UTC (rev
9033)
+++ gnunet/src/include/gnunet_container_lib.h 2009-09-27 23:06:55 UTC (rev
9034)
@@ -472,6 +472,7 @@
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST
};
+
/**
* Iterator over HashCodes.
*
Modified: gnunet/src/include/gnunet_core_service.h
===================================================================
--- gnunet/src/include/gnunet_core_service.h 2009-09-25 22:10:51 UTC (rev
9033)
+++ gnunet/src/include/gnunet_core_service.h 2009-09-27 23:06:55 UTC (rev
9034)
@@ -197,6 +197,7 @@
/**
* Function called with statistics about the given peer.
*
+ * @param cls closure
* @param peer identifies the peer
* @param latency current latency estimate, "FOREVER" if we have been
* disconnected
@@ -239,6 +240,7 @@
* @param info function to call with the resulting configuration information
* @param info_cls closure for info
*/
+// FIXME: should return handle for cancellation!
void
GNUNET_CORE_peer_configure (struct GNUNET_CORE_Handle *handle,
const struct GNUNET_PeerIdentity *peer,
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r9034 - in gnunet/src: fs include,
gnunet <=