[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r13201 - gnunet/src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r13201 - gnunet/src/fs |
Date: |
Thu, 7 Oct 2010 15:52:14 +0200 |
Author: grothoff
Date: 2010-10-07 15:52:14 +0200 (Thu, 07 Oct 2010)
New Revision: 13201
Modified:
gnunet/src/fs/fs.h
gnunet/src/fs/gnunet-service-fs.c
Log:
adding support for artificial delays
Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h 2010-10-07 12:52:19 UTC (rev 13200)
+++ gnunet/src/fs/fs.h 2010-10-07 13:52:14 UTC (rev 13201)
@@ -123,12 +123,6 @@
#define AVAILABILITY_TRIALS_MAX 8
/**
- * By how much (in ms) do we decrement the TTL
- * at each hop?
- */
-#define TTL_DECREMENT 5000
-
-/**
* Length of the P2P success tracker. Note that
* having a very long list can also hurt performance.
*/
Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c 2010-10-07 12:52:19 UTC (rev 13200)
+++ gnunet/src/fs/gnunet-service-fs.c 2010-10-07 13:52:14 UTC (rev 13201)
@@ -45,6 +45,13 @@
#define DEBUG_FS GNUNET_NO
/**
+ * Should we introduce random latency in processing? Required for proper
+ * implementation of GAP, but can be disabled for performance evaluation of
+ * the basic routing algorithm.
+ */
+#define SUPPORT_DELAYS GNUNET_NO
+
+/**
* Maximum number of outgoing messages we queue per peer.
*/
#define MAX_QUEUE_PER_PEER 16
@@ -140,6 +147,11 @@
void *cont_cls;
/**
+ * Do not transmit this pending message until this deadline.
+ */
+ struct GNUNET_TIME_Absolute delay_until;
+
+ /**
* Size of the reply; actual reply message follows
* at the end of this struct.
*/
@@ -226,6 +238,11 @@
struct GNUNET_TIME_Absolute last_transmission_request_start;
/**
+ * ID of delay task for scheduling transmission.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
+
+ /**
* Average priority of successful replies. Calculated
* as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
*/
@@ -976,7 +993,7 @@
}
/* consider scheduling transmission to cp for content migration */
- if (cp->cth != NULL)
+ if (cp->cth != NULL)
return GNUNET_YES;
msize = 0;
pos = mig_head;
@@ -1004,6 +1021,11 @@
msize,
GNUNET_h2s (key));
#endif
+ if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+ cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+ }
cp->cth
= GNUNET_CORE_notify_transmit_ready (core,
0, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1336,12 +1358,12 @@
TransmissionContinuation cont;
void *cont_cls;
+ cont = pm->cont;
+ cont_cls = pm->cont_cls;
if (pml != NULL)
{
GNUNET_assert (pml->pm == pm);
GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
- cont = pm->cont;
- cont_cls = pm->cont_cls;
destroy_pending_message_list_entry (pml);
}
else
@@ -1689,7 +1711,15 @@
GNUNET_PEER_change_rc (cp->pid, -1);
GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
if (NULL != cp->cth)
- GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ cp->cth = NULL;
+ }
+ if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+ cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+ }
while (NULL != (pm = cp->pending_messages_head))
destroy_pending_message (pm, 0 /* delivery failed */);
GNUNET_LOAD_value_free (cp->transmission_delay);
@@ -1894,6 +1924,39 @@
/**
+ * We've had to delay a request for transmission to core, but now
+ * we should be ready. Run it.
+ *
+ * @param cls the 'struct ConnectedPeer' for which a request was delayed
+ * @param tc task context (unused)
+ */
+static void
+delayed_transmission_request (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct ConnectedPeer *cp = cls;
+ struct GNUNET_PeerIdentity pid;
+ struct PendingMessage *pm;
+
+ pm = cp->pending_messages_head;
+ cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (cp->cth == NULL);
+ if (pm == NULL)
+ return;
+ GNUNET_PEER_resolve (cp->pid,
+ &pid);
+ cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+ pm->priority,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ &pid,
+ pm->msize,
+ &transmit_to_peer,
+ cp);
+}
+
+
+/**
* Transmit messages by copying it to the target buffer
* "buf". "buf" will be NULL and "size" zero if the socket was closed
* for writing in the meantime. In that case, do nothing
@@ -1912,13 +1975,16 @@
{
struct ConnectedPeer *cp = cls;
char *cbuf = buf;
- struct GNUNET_PeerIdentity pid;
struct PendingMessage *pm;
+ struct PendingMessage *next_pm;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Relative min_delay;
struct MigrationReadyBlock *mb;
struct MigrationReadyBlock *next;
struct PutMessage migm;
size_t msize;
unsigned int i;
+ struct GNUNET_PeerIdentity pid;
cp->cth = NULL;
if (NULL == buf)
@@ -1930,33 +1996,48 @@
GNUNET_LOAD_update (cp->transmission_delay,
UINT64_MAX);
return 0;
- }
+ }
GNUNET_LOAD_update (cp->transmission_delay,
GNUNET_TIME_absolute_get_duration
(cp->last_transmission_request_start).value);
+ now = GNUNET_TIME_absolute_get ();
msize = 0;
- while ( (NULL != (pm = cp->pending_messages_head) ) &&
+ min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+ next_pm = cp->pending_messages_head;
+ while ( (NULL != (pm = next_pm) ) &&
(pm->msize <= size) )
{
+ next_pm = pm->next;
+ if (pm->delay_until.value > now.value)
+ {
+ min_delay = GNUNET_TIME_relative_min (min_delay,
+
GNUNET_TIME_absolute_get_remaining (pm->delay_until));
+ continue;
+ }
memcpy (&cbuf[msize], &pm[1], pm->msize);
msize += pm->msize;
size -= pm->msize;
+ GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
+ cp->pending_messages_tail,
+ pm);
+ if (NULL == pm->pml)
+ cp->pending_requests--;
destroy_pending_message (pm, cp->pid);
}
- if (NULL != pm)
- {
+ if (pm != NULL)
+ min_delay = GNUNET_TIME_UNIT_ZERO;
+ if (NULL != cp->pending_messages_head)
+ {
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
cp->delayed_transmission_request_task);
+ cp->delayed_transmission_request_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ min_delay,
+ &delayed_transmission_request,
+ cp);
+ }
+ if (pm == NULL)
+ {
GNUNET_PEER_resolve (cp->pid,
&pid);
- cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
- cp->cth = GNUNET_CORE_notify_transmit_ready (core,
- pm->priority,
-
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- &pid,
- pm->msize,
- &transmit_to_peer,
- cp);
- }
- else
- {
next = mig_head;
while (NULL != (mb = next))
{
@@ -1984,7 +2065,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Pushing migration block `%s' (%u bytes) to
`%s'\n",
GNUNET_h2s (&mb->query),
- mb->size,
+ (unsigned int) mb->size,
GNUNET_i2s (&pid));
#endif
break;
@@ -1995,7 +2076,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Migration block `%s' (%u bytes) is not on
migration list for peer `%s'\n",
GNUNET_h2s (&mb->query),
- mb->size,
+ (unsigned int) mb->size,
GNUNET_i2s (&pid));
#endif
}
@@ -2013,9 +2094,9 @@
}
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes to peer %u\n",
- msize,
- cp->pid);
+ "Transmitting %u bytes to peer with PID %u\n",
+ (unsigned int) msize,
+ (unsigned int) cp->pid);
#endif
return msize;
}
@@ -2063,7 +2144,15 @@
destroy_pending_message (cp->pending_messages_tail, 0);
GNUNET_PEER_resolve (cp->pid, &pid);
if (NULL != cp->cth)
- GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ cp->cth = NULL;
+ }
+ if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
+ cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+ }
/* need to schedule transmission */
cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
cp->cth = GNUNET_CORE_notify_transmit_ready (core,
@@ -3119,6 +3208,12 @@
reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
reply->cont = &transmit_reply_continuation;
reply->cont_cls = pr;
+#if SUPPORT_DELAYS
+ reply->delay_until
+ = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_MILLISECONDS,
+
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+
TTL_DECREMENT)));
+#endif
reply->msize = msize;
reply->priority = UINT32_MAX; /* send replies first! */
pm = (struct PutMessage*) &reply[1];
@@ -3557,10 +3652,10 @@
prq.priority = priority;
prq.finished = GNUNET_NO;
prq.request_found = GNUNET_NO;
- process_reply (&prq, key, pr);
if ( (old_rf == 0) &&
- (pr->results_found == 1) )
+ (pr->results_found == 0) )
update_datastore_delays (pr->start_time);
+ process_reply (&prq, key, pr);
if (prq.finished == GNUNET_YES)
return;
if (pr->qe == NULL)
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r13201 - gnunet/src/fs,
gnunet <=