[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28350 - gnunet/src/experimentation
From: gnunet
Subject: [GNUnet-SVN] r28350 - gnunet/src/experimentation
Date: Thu, 1 Aug 2013 14:25:15 +0200
Author: wachs
Date: 2013-08-01 14:25:15 +0200 (Thu, 01 Aug 2013)
New Revision: 28350
Modified:
gnunet/src/experimentation/gnunet-daemon-experimentation.h
gnunet/src/experimentation/gnunet-daemon-experimentation_nodes.c
gnunet/src/experimentation/gnunet-daemon-experimentation_scheduler.c
Log:
changes to scheduler
Modified: gnunet/src/experimentation/gnunet-daemon-experimentation.h
===================================================================
--- gnunet/src/experimentation/gnunet-daemon-experimentation.h 2013-08-01
11:30:57 UTC (rev 28349)
+++ gnunet/src/experimentation/gnunet-daemon-experimentation.h 2013-08-01
12:25:15 UTC (rev 28350)
@@ -399,10 +399,14 @@
/**
- * Start the scheduler component
+ * Add a new experiment for a node
+ *
+ * @param n the node
+ * @param e the experiment
+ * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
*/
void
-GED_scheduler_add (struct Node *n, struct Experiment *e);
+GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound);
/**
* Start the scheduler component
Modified: gnunet/src/experimentation/gnunet-daemon-experimentation_nodes.c
===================================================================
--- gnunet/src/experimentation/gnunet-daemon-experimentation_nodes.c
2013-08-01 11:30:57 UTC (rev 28349)
+++ gnunet/src/experimentation/gnunet-daemon-experimentation_nodes.c
2013-08-01 12:25:15 UTC (rev 28350)
@@ -317,7 +317,7 @@
GNUNET_i2s (&n->id));
/* Tell the scheduler to add a node with an experiment */
- GED_scheduler_add (n, e);
+ GED_scheduler_add (n, e, GNUNET_YES);
counter ++;
}
@@ -786,9 +786,6 @@
GNUNET_break (0);
return;
}
-
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer
%s for experiment `%s'\n"),
- "STOP", GNUNET_i2s (peer), name);
GED_scheduler_handle_stop (n, e);
}
Modified: gnunet/src/experimentation/gnunet-daemon-experimentation_scheduler.c
===================================================================
--- gnunet/src/experimentation/gnunet-daemon-experimentation_scheduler.c
2013-08-01 11:30:57 UTC (rev 28349)
+++ gnunet/src/experimentation/gnunet-daemon-experimentation_scheduler.c
2013-08-01 12:25:15 UTC (rev 28350)
@@ -64,19 +64,41 @@
struct Experiment *e;
struct Node *n;
int state;
+ int outbound;
GNUNET_SCHEDULER_TaskIdentifier task;
};
-struct ScheduledExperiment *waiting_head;
-struct ScheduledExperiment *waiting_tail;
+struct ScheduledExperiment *waiting_in_head;
+struct ScheduledExperiment *waiting_in_tail;
-struct ScheduledExperiment *running_head;
-struct ScheduledExperiment *running_tail;
+struct ScheduledExperiment *running_in_head;
+struct ScheduledExperiment *running_in_tail;
+struct ScheduledExperiment *waiting_out_head;
+struct ScheduledExperiment *waiting_out_tail;
+
+struct ScheduledExperiment *running_out_head;
+struct ScheduledExperiment *running_out_tail;
+
+
static unsigned int experiments_scheduled;
static unsigned int experiments_running;
static unsigned int experiments_requested;
+
+static struct ScheduledExperiment *
+find_experiment (struct ScheduledExperiment *head, struct ScheduledExperiment
*tail,
+ struct Node
*n, struct Experiment *e, int outbound)
+{
+ struct ScheduledExperiment *cur;
+ for (cur = head; NULL != cur; cur = cur->next)
+ {
+ if ((cur->n == n) && (cur->e == e) && (cur->outbound ==
outbound)) /* Node and experiment are equal */
+ break;
+ }
+ return cur;
+}
+
static void
request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
{
@@ -86,19 +108,19 @@
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer `%s' did not respond to
request for experiment `%s'\n",
GNUNET_i2s (&se->n->id), se->e->name);
- GNUNET_CONTAINER_DLL_remove (waiting_head, waiting_tail, se);
+ GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
GNUNET_free (se);
/* Remove experiment */
-
GNUNET_assert (experiments_requested > 0);
experiments_requested --;
GNUNET_STATISTICS_set (GED_stats, "# experiments requested",
experiments_requested, GNUNET_NO);
}
-static void start_experiment (void *cls,const struct
GNUNET_SCHEDULER_TaskContext* tc)
+static void run_experiment_inbound (void *cls,const struct
GNUNET_SCHEDULER_TaskContext* tc)
{
struct ScheduledExperiment *se = cls;
+ struct GNUNET_TIME_Relative start;
struct GNUNET_TIME_Relative end;
struct GNUNET_TIME_Relative backoff;
@@ -111,15 +133,78 @@
backoff.rel_value += GNUNET_CRYPTO_random_u32
(GNUNET_CRYPTO_QUALITY_WEAK, 1000);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to
peer `%s' for `%s' for %llu ms\n",
GNUNET_i2s (&se->n->id), se->e->name, (unsigned
long long) backoff.rel_value);
- se->task = GNUNET_SCHEDULER_add_delayed (backoff,
&start_experiment, se);
+ se->task = GNUNET_SCHEDULER_add_delayed (backoff,
&run_experiment_inbound, se);
return;
}
else if (BUSY == se->state)
se->state = NOT_RUNNING;
- if (NOT_RUNNING == se->state)
+ switch (se->state) {
+ case NOT_RUNNING:
+ /* Send START_ACK message */
+ //GED_nodes_request_start (se->n, se->e);
+ se->state = REQUESTED;
+ /* Schedule to run */
+ start =
GNUNET_TIME_absolute_get_remaining(se->e->start);
+ if (0 == start.rel_value)
+ se->task = GNUNET_SCHEDULER_add_now
(&run_experiment_inbound, se);
+ else
+ se->task = GNUNET_SCHEDULER_add_delayed
(start, &run_experiment_inbound, se);
+ break;
+ case REQUESTED:
+ /* Already requested */
+ se->state = STARTED;
+ case STARTED:
+ /* Experiment is running */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment
`%s' peer for `%s'\n",
+ GNUNET_i2s (&se->n->id), se->e->name);
+
+ /* do work here */
+
+ /* Reschedule */
+ end =
GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop,
se->e->frequency));
+ if (0 == end.rel_value)
+ {
+ se->state = STOPPED;
+ return; /* End of experiment is reached */
+ }
+ /* Reschedule */
+ se->task = GNUNET_SCHEDULER_add_delayed
(se->e->frequency, &run_experiment_inbound, se);
+ break;
+ case STOPPED:
+ /* Experiment expired */
+ break;
+ default:
+ break;
+ }
+
+}
+
+static void run_experiment_outbound (void *cls,const struct
GNUNET_SCHEDULER_TaskContext* tc)
+{
+ struct ScheduledExperiment *se = cls;
+ struct GNUNET_TIME_Relative end;
+ struct GNUNET_TIME_Relative backoff;
+
+ se->task = GNUNET_SCHEDULER_NO_TASK;
+
+ if (GNUNET_NO == GED_nodes_rts (se->n))
{
- /* Send start message */
+ /* Cannot send to peer, core is busy */
+ se->state = BUSY;
+ backoff = GNUNET_TIME_UNIT_SECONDS;
+ backoff.rel_value += GNUNET_CRYPTO_random_u32
(GNUNET_CRYPTO_QUALITY_WEAK, 1000);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to
peer `%s' for `%s' for %llu ms\n",
+ GNUNET_i2s (&se->n->id), se->e->name, (unsigned
long long) backoff.rel_value);
+ se->task = GNUNET_SCHEDULER_add_delayed (backoff,
&run_experiment_outbound, se);
+ return;
+ }
+ else if (BUSY == se->state)
+ se->state = NOT_RUNNING; /* Not busy anymore, can send
*/
+
+ switch (se->state) {
+ case NOT_RUNNING:
+ /* Send START message */
GED_nodes_request_start (se->n, se->e);
se->state = REQUESTED;
se->task = GNUNET_SCHEDULER_add_delayed
(EXP_RESPONSE_TIMEOUT, &request_timeout, se);
@@ -128,15 +213,12 @@
GNUNET_i2s (&se->n->id), se->e->name);
experiments_requested ++;
GNUNET_STATISTICS_set (GED_stats, "# experiments
requested", experiments_requested, GNUNET_NO);
- return;
- }
- else if (REQUESTED == se->state)
- {
- /* Already requested */
- return;
- }
- else if (STARTED == se->state)
- {
+ break;
+ case REQUESTED:
+ /* Expecting START_ACK */
+ GNUNET_break (0);
+ break;
+ case STARTED:
/* Experiment is running */
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment
`%s' peer for `%s'\n",
GNUNET_i2s (&se->n->id), se->e->name);
@@ -151,15 +233,17 @@
return; /* End of experiment is reached */
}
/* Reschedule */
- se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency,
&start_experiment, se);
- }
-
- else if (STOPPED == se->state)
- {
+ se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency,
&run_experiment_outbound, se);
+ break;
+ case STOPPED:
/* Experiment expired */
+ break;
+ default:
+ break;
}
}
+
/**
* Handle a START message from a remote node
*
@@ -169,7 +253,21 @@
void
GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
{
+ struct ScheduledExperiment *se;
+ if ((NULL != (se = find_experiment (waiting_in_head, waiting_in_tail,
n, e, GNUNET_NO))) ||
+ (NULL != (se = find_experiment (running_in_head,
running_in_tail, n, e, GNUNET_NO))))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received duplicate %s
message from peer %s for experiment `%s'\n"),
+ "START", GNUNET_i2s (&n->id), e->name);
+ GNUNET_break_op (0);
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer
%s for experiment `%s'\n"),
+ "START", GNUNET_i2s (&n->id), e->name);
+
+ GED_scheduler_add (n, e, GNUNET_NO);
}
/**
@@ -181,7 +279,20 @@
void
GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
{
+ struct ScheduledExperiment *se;
+ if (NULL == (se = find_experiment (waiting_in_head, waiting_in_tail, n,
e, GNUNET_NO)))
+ {
+ GNUNET_break (0);
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer
%s for requested experiment `%s'\n"),
+ "START_ACK", GNUNET_i2s (&n->id), e->name);
+
+ if (GNUNET_SCHEDULER_NO_TASK != se->task)
+ GNUNET_SCHEDULER_cancel (se->task);
+ se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
}
@@ -194,7 +305,23 @@
void
GED_scheduler_handle_stop (struct Node *n, struct Experiment *e)
{
+ struct ScheduledExperiment *se;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer
%s for experiment `%s'\n"),
+ "STOP", GNUNET_i2s (&n->id), e->name);
+
+ if (NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n,
e, GNUNET_NO)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message
from peer %s for waiting experiment `%s'\n"),
+ "STOP", GNUNET_i2s (&n->id), e->name);
+ }
+
+ if (NULL != (se = find_experiment (running_in_head, running_in_tail, n,
e, GNUNET_NO)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message
from peer %s for running experiment `%s'\n"),
+ "STOP", GNUNET_i2s (&n->id), e->name);
+ }
+
}
/**
@@ -202,35 +329,51 @@
*
* @param n the node
* @param e the experiment
+ * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
*/
void
-GED_scheduler_add (struct Node *n, struct Experiment *e)
+GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
{
struct ScheduledExperiment *se;
struct GNUNET_TIME_Relative start;
struct GNUNET_TIME_Relative end;
+ GNUNET_assert ((GNUNET_YES == outbound) || (GNUNET_NO == outbound));
+
start = GNUNET_TIME_absolute_get_remaining(e->start);
end = GNUNET_TIME_absolute_get_remaining(e->stop);
if (0 == end.rel_value)
return; /* End of experiment is reached */
/* Add additional checks here if required */
-
se = GNUNET_malloc (sizeof (struct ScheduledExperiment));
se->state = NOT_RUNNING;
+ se->outbound = outbound;
se->e = e;
se->n = n;
- if (0 == start.rel_value)
- se->task = GNUNET_SCHEDULER_add_now (&start_experiment,
se);
+
+ if (GNUNET_YES == outbound)
+ {
+ if (0 == start.rel_value)
+ se->task = GNUNET_SCHEDULER_add_now
(&run_experiment_outbound, se);
+ else
+ se->task = GNUNET_SCHEDULER_add_delayed (start,
&run_experiment_outbound, se);
+ GNUNET_CONTAINER_DLL_insert (waiting_out_head,
waiting_out_tail, se);
+ }
else
- se->task = GNUNET_SCHEDULER_add_delayed (start,
&start_experiment, se);
+ {
+ if (0 == start.rel_value)
+ se->task = GNUNET_SCHEDULER_add_now
(&run_experiment_inbound, se);
+ else
+ se->task = GNUNET_SCHEDULER_add_delayed (start,
&run_experiment_inbound, se);
+ GNUNET_CONTAINER_DLL_insert (waiting_in_head, waiting_in_tail,
se);
+ }
- GNUNET_CONTAINER_DLL_insert (waiting_head, waiting_tail, se);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Added experiment `%s' for node to
be scheduled\n",
- e->name, GNUNET_i2s(&se->n->id));
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Added %s experiment `%s' for node
to be scheduled\n",
+ (GNUNET_YES == outbound) ? "outbound" : "inbound",
e->name, GNUNET_i2s(&se->n->id));
experiments_scheduled ++;
GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled",
experiments_scheduled, GNUNET_NO);
+
}
/**
@@ -253,11 +396,11 @@
struct ScheduledExperiment *cur;
struct ScheduledExperiment *next;
- next = waiting_head;
+ next = waiting_in_head;
while (NULL != (cur = next))
{
next = cur->next;
- GNUNET_CONTAINER_DLL_remove (waiting_head,
waiting_tail, cur);
+ GNUNET_CONTAINER_DLL_remove (waiting_in_head,
waiting_in_tail, cur);
if (GNUNET_SCHEDULER_NO_TASK != cur->task)
{
GNUNET_SCHEDULER_cancel (cur->task);
@@ -269,11 +412,11 @@
GNUNET_STATISTICS_set (GED_stats, "# experiments
scheduled", experiments_scheduled, GNUNET_NO);
}
- next = running_head;
+ next = running_in_head;
while (NULL != (cur = next))
{
next = cur->next;
- GNUNET_CONTAINER_DLL_remove (running_head,
running_tail, cur);
+ GNUNET_CONTAINER_DLL_remove (running_in_head,
running_in_tail, cur);
if (GNUNET_SCHEDULER_NO_TASK != cur->task)
{
GNUNET_SCHEDULER_cancel (cur->task);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28350 - gnunet/src/experimentation,
gnunet <=