gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28705 - gnunet/src/testbed


From: gnunet
Subject: [GNUnet-SVN] r28705 - gnunet/src/testbed
Date: Mon, 19 Aug 2013 16:13:19 +0200

Author: harsha
Date: 2013-08-19 16:13:19 +0200 (Mon, 19 Aug 2013)
New Revision: 28705

Modified:
   gnunet/src/testbed/gnunet-service-testbed.c
   gnunet/src/testbed/test_testbed_api_operations.c
   gnunet/src/testbed/testbed_api.c
   gnunet/src/testbed/testbed_api_hosts.c
   gnunet/src/testbed/testbed_api_hosts.h
   gnunet/src/testbed/testbed_api_operations.c
   gnunet/src/testbed/testbed_api_operations.h
   gnunet/src/testbed/testbed_api_peers.c
   gnunet/src/testbed/testbed_api_peers.h
   gnunet/src/testbed/testbed_api_statistics.c
Log:
fix 2893: Move adaptive parallelisation mechanism to operation queues


Modified: gnunet/src/testbed/gnunet-service-testbed.c
===================================================================
--- gnunet/src/testbed/gnunet-service-testbed.c 2013-08-19 14:03:01 UTC (rev 
28704)
+++ gnunet/src/testbed/gnunet-service-testbed.c 2013-08-19 14:13:19 UTC (rev 
28705)
@@ -918,7 +918,8 @@
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONFIGURATION_get_value_number (cfg, "TESTBED",
                                                         "MAX_OPEN_FDS", &num));
-  GST_opq_openfds = GNUNET_TESTBED_operation_queue_create_ ((unsigned int) 
num);
+  GST_opq_openfds = GNUNET_TESTBED_operation_queue_create_
+      (OPERATION_QUEUE_TYPE_FIXED, (unsigned int) num);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONFIGURATION_get_value_time (cfg, "TESTBED",
                                                       "OPERATION_TIMEOUT",

Modified: gnunet/src/testbed/test_testbed_api_operations.c
===================================================================
--- gnunet/src/testbed/test_testbed_api_operations.c    2013-08-19 14:03:01 UTC 
(rev 28704)
+++ gnunet/src/testbed/test_testbed_api_operations.c    2013-08-19 14:13:19 UTC 
(rev 28705)
@@ -490,9 +490,9 @@
 run (void *cls, char *const *args, const char *cfgfile,
      const struct GNUNET_CONFIGURATION_Handle *config)
 {
-  q1 = GNUNET_TESTBED_operation_queue_create_ (1);
+  q1 = GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, 1);
   GNUNET_assert (NULL != q1);
-  q2 = GNUNET_TESTBED_operation_queue_create_ (2);
+  q2 = GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, 2);
   GNUNET_assert (NULL != q2);
   op1 = GNUNET_TESTBED_operation_create_ (&op1, start_cb, release_cb);
   GNUNET_assert (NULL != op1);

Modified: gnunet/src/testbed/testbed_api.c
===================================================================
--- gnunet/src/testbed/testbed_api.c    2013-08-19 14:03:01 UTC (rev 28704)
+++ gnunet/src/testbed/testbed_api.c    2013-08-19 14:13:19 UTC (rev 28705)
@@ -860,7 +860,7 @@
     struct OverlayConnectData *data;
 
     data = opc->data;
-    data->failed = GNUNET_YES;
+    GNUNET_TESTBED_operation_mark_failed (opc->op);
     if (NULL != data->cb)
       data->cb (data->cb_cls, opc->op, emsg);
   }
@@ -1486,13 +1486,15 @@
   GNUNET_TESTBED_mark_host_registered_at_ (host, controller);
   controller->host = host;
   controller->opq_parallel_operations =
-      GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
-                                              max_parallel_operations);
+      GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
+                                              (unsigned int) 
max_parallel_operations);
   controller->opq_parallel_service_connections =
-      GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
+      GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
+                                              (unsigned int)
                                               
max_parallel_service_connections);
   controller->opq_parallel_topology_config_operations =
-      GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
+      GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
+                                              (unsigned int)
                                               
max_parallel_topology_config_operations);
   controller_hostname = GNUNET_TESTBED_host_get_hostname (host);
   if (NULL == controller_hostname)
@@ -1856,13 +1858,25 @@
 
 
 /**
- * Signal that the information from an operation has been fully
- * processed.  This function MUST be called for each event
- * of type 'operation_finished' to fully remove the operation
- * from the operation queue.  After calling this function, the
- * 'op_result' becomes invalid (!).
+ * This function is used to signal that the event information (struct
+ * GNUNET_TESTBED_EventInformation) from an operation has been fully processed
+ * i.e. if the event callback is ever called for this operation. If the event
+ * callback for this operation has not yet been called, calling this function
+ * cancels the operation, frees its resources and ensures the no event is
+ * generated with respect to this operation. Note that however cancelling an
+ * operation does NOT guarantee that the operation will be fully undone (or 
that
+ * nothing ever happened). 
  *
- * @param operation operation to signal completion for
+ * This function MUST be called for every operation to fully remove the
+ * operation from the operation queue.  After calling this function, if
+ * operation is completed and its event information is of type
+ * GNUNET_TESTBED_ET_OPERATION_FINISHED, the 'op_result' becomes invalid (!).
+
+ * If the operation is generated from GNUNET_TESTBED_service_connect() then
+ * calling this function on such as operation calls the disconnect adapter if
+ * the connect adapter was ever called.
+ *
+ * @param operation operation to signal completion or cancellation
  */
 void
 GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation)

Modified: gnunet/src/testbed/testbed_api_hosts.c
===================================================================
--- gnunet/src/testbed/testbed_api_hosts.c      2013-08-19 14:03:01 UTC (rev 
28704)
+++ gnunet/src/testbed/testbed_api_hosts.c      2013-08-19 14:13:19 UTC (rev 
28705)
@@ -35,7 +35,6 @@
 #include "testbed_api_hosts.h"
 #include "testbed_helper.h"
 #include "testbed_api_operations.h"
-#include "testbed_api_sd.h"
 
 #include <zlib.h>
 
@@ -97,28 +96,6 @@
 
 
 /**
- * A slot to record time taken by an overlay connect operation
- */
-struct TimeSlot
-{
-  /**
-   * A key to identify this timeslot
-   */
-  void *key;
-
-  /**
-   * Time
-   */
-  struct GNUNET_TIME_Relative time;
-
-  /**
-   * Number of timing values accumulated
-   */
-  unsigned int nvals;
-};
-
-
-/**
  * Opaque handle to a host running experiments managed by the testing 
framework.
  * The master process must be able to SSH to this host without password (via
  * ssh-agent).
@@ -161,28 +138,6 @@
   struct OperationQueue *opq_parallel_overlay_connect_operations;
 
   /**
-   * An array of timing slots; size should be equal to the current number of 
parallel
-   * overlay connects
-   */
-  struct TimeSlot *tslots;
-
-  /**
-   * Handle for SD calculations amount parallel overlay connect operation 
finish
-   * times
-   */
-  struct SDHandle *poc_sd;  
-
-  /**
-   * The number of parallel overlay connects we do currently
-   */
-  unsigned int num_parallel_connects;
-
-  /**
-   * Counter to indicate when all the available time slots are filled
-   */
-  unsigned int tslots_filled;
-
-  /**
    * Is a controller started on this host? FIXME: Is this needed?
    */
   int controller_started;
@@ -382,9 +337,8 @@
   host->port = (0 == port) ? 22 : port;
   host->cfg = GNUNET_CONFIGURATION_dup (cfg);
   host->opq_parallel_overlay_connect_operations =
-      GNUNET_TESTBED_operation_queue_create_ (0);
-  GNUNET_TESTBED_set_num_parallel_overlay_connects_ (host, 1);
-  host->poc_sd = GNUNET_TESTBED_SD_init_ (10);
+      GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_ADAPTIVE, 
+                                              UINT_MAX);
   new_size = host_list_size;
   while (id >= new_size)
     new_size += HOST_LIST_GROW_STEP;
@@ -740,8 +694,6 @@
   GNUNET_free_non_null ((char *) host->hostname);
   GNUNET_TESTBED_operation_queue_destroy_
       (host->opq_parallel_overlay_connect_operations);
-  GNUNET_TESTBED_SD_destroy_ (host->poc_sd);
-  GNUNET_free_non_null (host->tslots);
   GNUNET_CONFIGURATION_destroy (host->cfg);
   GNUNET_free (host);
   while (host_list_size >= HOST_LIST_GROW_STEP)
@@ -1624,192 +1576,6 @@
 
 
 /**
- * Initializes the operation queue for parallel overlay connects
- *
- * @param h the host handle
- * @param npoc the number of parallel overlay connects - the queue size
- */
-void
-GNUNET_TESTBED_set_num_parallel_overlay_connects_ (struct
-                                                   GNUNET_TESTBED_Host *h,
-                                                   unsigned int npoc)
-{
-  //fprintf (stderr, "%d", npoc);
-  GNUNET_free_non_null (h->tslots);
-  h->tslots_filled = 0;
-  h->num_parallel_connects = npoc;
-  h->tslots = GNUNET_malloc (npoc * sizeof (struct TimeSlot));
-  GNUNET_TESTBED_operation_queue_reset_max_active_
-      (h->opq_parallel_overlay_connect_operations, npoc);
-}
-
-
-/**
- * Returns a timing slot which will be exclusively locked
- *
- * @param h the host handle
- * @param key a pointer which is associated to the returned slot; should not be
- *          NULL. It serves as a key to determine the correct owner of the slot
- * @return the time slot index in the array of time slots in the controller
- *           handle
- */
-unsigned int
-GNUNET_TESTBED_get_tslot_ (struct GNUNET_TESTBED_Host *h, void *key)
-{
-  unsigned int slot;
-
-  GNUNET_assert (NULL != h->tslots);
-  GNUNET_assert (NULL != key);
-  for (slot = 0; slot < h->num_parallel_connects; slot++)
-    if (NULL == h->tslots[slot].key)
-    {
-      h->tslots[slot].key = key;
-      return slot;
-    }
-  GNUNET_assert (0);            /* We should always find a free tslot */
-}
-
-
-/**
- * Decides whether any change in the number of parallel overlay connects is
- * necessary to adapt to the load on the system
- *
- * @param h the host handle
- */
-static void
-decide_npoc (struct GNUNET_TESTBED_Host *h)
-{
-  struct GNUNET_TIME_Relative avg;
-  int sd;
-  unsigned int slot;
-  unsigned int nvals;
-
-  if (h->tslots_filled != h->num_parallel_connects)
-    return;
-  avg = GNUNET_TIME_UNIT_ZERO;
-  nvals = 0;
-  for (slot = 0; slot < h->num_parallel_connects; slot++)
-  {
-    avg = GNUNET_TIME_relative_add (avg, h->tslots[slot].time);
-    nvals += h->tslots[slot].nvals;
-  }
-  GNUNET_assert (nvals >= h->num_parallel_connects);
-  avg = GNUNET_TIME_relative_divide (avg, nvals);
-  GNUNET_assert (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != 
avg.rel_value_us);
-  sd = GNUNET_TESTBED_SD_deviation_factor_ (h->poc_sd, (unsigned int) 
avg.rel_value_us);
-  if ( (sd <= 5) ||
-       (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                      h->num_parallel_connects)) )
-    GNUNET_TESTBED_SD_add_data_ (h->poc_sd, (unsigned int) avg.rel_value_us);
-  if (GNUNET_SYSERR == sd)
-  {
-    GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
-                                                       
h->num_parallel_connects);
-    return;
-  }
-  GNUNET_assert (0 <= sd);
-  if (0 == sd)
-  {
-    GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
-                                                       h->num_parallel_connects
-                                                       * 2);
-    return;
-  }
-  if (1 == sd)
-  {
-    GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
-                                                       h->num_parallel_connects
-                                                       + 1);
-    return;
-  }
-  if (1 == h->num_parallel_connects)
-  {
-    GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h, 1);
-    return;
-  }
-  if (2 == sd)
-  {
-    GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
-                                                       h->num_parallel_connects
-                                                       - 1);
-    return;
-  }
-  GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
-                                                     h->num_parallel_connects /
-                                                     2);
-}
-
-
-/**
- * Releases a time slot thus making it available for be used again
- *
- * @param h the host handle
- * @param index the index of the the time slot
- * @param key the key to prove ownership of the timeslot
- * @return GNUNET_YES if the time slot is successfully removed; GNUNET_NO if 
the
- *           time slot cannot be removed - this could be because of the index
- *           greater than existing number of time slots or `key' being 
different
- */
-int
-GNUNET_TESTBED_release_time_slot_ (struct GNUNET_TESTBED_Host *h,
-                                   unsigned int index, void *key)
-{
-  struct TimeSlot *slot;
-
-  GNUNET_assert (NULL != key);
-  if (index >= h->num_parallel_connects)
-    return GNUNET_NO;
-  slot = &h->tslots[index];
-  if (key != slot->key)
-    return GNUNET_NO;
-  slot->key = NULL;
-  return GNUNET_YES;
-}
-
-
-/**
- * Function to update a time slot
- *
- * @param h the host handle
- * @param index the index of the time slot to update
- * @param key the key to identify ownership of the slot
- * @param time the new time
- * @param failed should this reading be treated as coming from a fail event
- */
-void
-GNUNET_TESTBED_update_time_slot_ (struct GNUNET_TESTBED_Host *h,
-                                  unsigned int index, void *key,
-                                  struct GNUNET_TIME_Relative time, int failed)
-{
-  struct TimeSlot *slot;
-
-  if (GNUNET_YES == failed)
-  {
-    if (1 == h->num_parallel_connects)
-    {
-      GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h, 1);
-      return;
-    }
-    GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
-                                                       h->num_parallel_connects
-                                                       - 1);
-  }
-  if (GNUNET_NO == GNUNET_TESTBED_release_time_slot_ (h, index, key))
-    return;
-  slot = &h->tslots[index];
-  slot->nvals++;
-  if (GNUNET_TIME_UNIT_ZERO.rel_value_us == slot->time.rel_value_us)
-  {
-    slot->time = time;
-    h->tslots_filled++;
-    decide_npoc (h);
-    return;
-  }
-  slot->time = GNUNET_TIME_relative_add (slot->time, time);
-}
-
-
-/**
  * Queues the given operation in the queue for parallel overlay connects of the
  * given host
  *

Modified: gnunet/src/testbed/testbed_api_hosts.h
===================================================================
--- gnunet/src/testbed/testbed_api_hosts.h      2013-08-19 14:03:01 UTC (rev 
28704)
+++ gnunet/src/testbed/testbed_api_hosts.h      2013-08-19 14:13:19 UTC (rev 
28705)
@@ -151,61 +151,6 @@
 
 
 /**
- * (re)sets the operation queue for parallel overlay connects
- *
- * @param h the host handle
- * @param npoc the number of parallel overlay connects - the queue size
- */
-void
-GNUNET_TESTBED_set_num_parallel_overlay_connects_ (struct
-                                                   GNUNET_TESTBED_Host *h,
-                                                   unsigned int npoc);
-
-
-/**
- * Releases a time slot thus making it available for be used again
- *
- * @param h the host handle
- * @param index the index of the the time slot
- * @param key the key to prove ownership of the timeslot
- * @return GNUNET_YES if the time slot is successfully removed; GNUNET_NO if 
the
- *           time slot cannot be removed - this could be because of the index
- *           greater than existing number of time slots or `key' being 
different
- */
-int
-GNUNET_TESTBED_release_time_slot_ (struct GNUNET_TESTBED_Host *h,
-                                   unsigned int index, void *key);
-
-
-/**
- * Function to update a time slot
- *
- * @param h the host handle
- * @param index the index of the time slot to update
- * @param key the key to identify ownership of the slot
- * @param time the new time
- * @param failed should this reading be treated as coming from a fail event
- */
-void
-GNUNET_TESTBED_update_time_slot_ (struct GNUNET_TESTBED_Host *h,
-                                  unsigned int index, void *key,
-                                  struct GNUNET_TIME_Relative time, int 
failed);
-
-
-/**
- * Returns a timing slot which will be exclusively locked
- *
- * @param h the host handle
- * @param key a pointer which is associated to the returned slot; should not be
- *          NULL. It serves as a key to determine the correct owner of the slot
- * @return the time slot index in the array of time slots in the controller
- *           handle
- */
-unsigned int
-GNUNET_TESTBED_get_tslot_ (struct GNUNET_TESTBED_Host *h, void *key);
-
-
-/**
  * Queues the given operation in the queue for parallel overlay connects of the
  * given host
  *

Modified: gnunet/src/testbed/testbed_api_operations.c
===================================================================
--- gnunet/src/testbed/testbed_api_operations.c 2013-08-19 14:03:01 UTC (rev 
28704)
+++ gnunet/src/testbed/testbed_api_operations.c 2013-08-19 14:13:19 UTC (rev 
28705)
@@ -27,6 +27,7 @@
 
 #include "platform.h"
 #include "testbed_api_operations.h"
+#include "testbed_api_sd.h"
 
 
 /**
@@ -60,6 +61,89 @@
  * Queue of operations where we can only support a certain
  * number of concurrent operations of a particular type.
  */
+struct OperationQueue;
+
+
+/**
+ * A slot to record time taken by an operation
+ */
+struct TimeSlot
+{
+  /**
+   * DLL next pointer
+   */
+  struct TimeSlot *next;
+
+  /**
+   * DLL prev pointer
+   */
+  struct TimeSlot *prev;
+
+  /**
+   * This operation queue to which this time slot belongs to
+   */
+  struct OperationQueue *queue;
+
+  /**
+   * The operation to which this timeslot is currently allocated to
+   */
+  struct GNUNET_TESTBED_Operation *op;
+
+  /**
+   * Accumulated time
+   */
+  struct GNUNET_TIME_Relative tsum;
+
+  /**
+   * Number of timing values accumulated
+   */
+  unsigned int nvals;
+};
+
+
+/**
+ * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
+ */
+struct FeedbackCtx
+{
+  /**
+   * Handle for calculating standard deviation
+   */
+  struct SDHandle *sd;
+
+  /**
+   * Head for DLL of time slots which are free to be allocated to operations
+   */
+  struct TimeSlot *alloc_head;
+    
+  /**
+   * Tail for DLL of time slots which are free to be allocated to operations
+   */
+  struct TimeSlot *alloc_tail;
+
+  /**
+   * Pointer to the chunk of time slots.  Free all time slots at a time using
+   * this pointer.
+   */
+  struct TimeSlot *tslots_freeptr;
+
+  /**
+   * Number of time slots filled so far
+   */
+  unsigned int tslots_filled;
+  
+  /**
+   * Bound on the maximum number of operations which can be active
+   */
+  unsigned int max_active_bound;
+
+};
+
+
+/**
+ * Queue of operations where we can only support a certain
+ * number of concurrent operations of a particular type.
+ */
 struct OperationQueue
 {
   /**
@@ -108,12 +192,26 @@
   struct QueueEntry *nq_tail;
 
   /**
+   * Feedback context; only relevant for adaptive operation queues.  NULL for
+   * fixed operation queues
+   */
+  struct FeedbackCtx *fctx;
+
+  /**
+   * The type of this opeartion queue
+   */
+  enum OperationQueueType type;
+
+  /**
    * Number of operations that are currently active in this queue.
    */
   unsigned int active;
 
   /**
-   * Max number of operations which can be active at any time in this queue
+   * Max number of operations which can be active at any time in this queue.
+   * This value can be changed either by calling
+   * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
+   * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
    */
   unsigned int max_active;
 
@@ -222,6 +320,21 @@
   struct ReadyQueueEntry *rq_entry;
 
   /**
+   * Head pointer for DLL of tslots allocated to this operation
+   */
+  struct TimeSlot *tslots_head;
+
+  /**
+   * Tail pointer for DLL of tslots allocated to this operation
+   */
+  struct TimeSlot *tslots_tail;
+
+  /**
+   * The time at which the operation is started
+   */
+  struct GNUNET_TIME_Absolute tstart;
+
+  /**
    * Number of queues in the operation queues array
    */
   unsigned int nqueues;
@@ -231,6 +344,11 @@
    */
   enum OperationState state;
 
+  /**
+   * Is this a failed operation?
+   */
+  int failed;
+
 };
 
 /**
@@ -250,6 +368,29 @@
 
 
 /**
+ * Assigns the given operation a time slot from the given operation queue
+ *
+ * @param op the operation
+ * @param queue the operation queue
+ * @return the timeslot
+ */
+static void
+assign_timeslot (struct GNUNET_TESTBED_Operation *op,
+                 struct OperationQueue *queue)
+{
+  struct FeedbackCtx *fctx = queue->fctx;
+  struct TimeSlot *tslot;
+
+  GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
+  tslot = fctx->alloc_head;
+  GNUNET_assert (NULL != tslot);
+  GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
+  GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
+  tslot->op = op;
+}
+
+
+/**
  * Removes a queue entry of an operation from one of the operation queues' 
lists
  * depending on the state of the operation
  *
@@ -378,6 +519,8 @@
 process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_TESTBED_Operation *op;
+  struct OperationQueue *queue;
+  unsigned int cnt;
 
   process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_assert (NULL != rq_head);
@@ -386,8 +529,15 @@
   if (NULL != rq_head)
     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
   change_state (op, OP_STATE_ACTIVE);
+  for (cnt = 0; cnt < op->nqueues; cnt++)
+  {
+    queue = op->queues[cnt];
+    if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
+      assign_timeslot (op, queue);
+  }
+  op->tstart = GNUNET_TIME_absolute_get ();
   if (NULL != op->start)
-    op->start (op->cb_cls);  
+    op->start (op->cb_cls);
 }
 
 
@@ -582,7 +732,7 @@
   if (NULL != evict_ops)
   {
     for (i = 0; i < n_evict_ops; i++)
-      GNUNET_TESTBED_operation_release_ (evict_ops[i]);
+      GNUNET_TESTBED_operation_release_ (evict_ops[i]); 
     GNUNET_free (evict_ops);
     evict_ops = NULL;
     /* Evicting the operations should schedule this operation */
@@ -619,6 +769,162 @@
 
 
 /**
+ * Cleanups the array of timeslots of an operation queue.  For each time slot 
in
+ * the array, if it is allocated to an operation, it will be deallocated from
+ * the operation
+ *
+ * @param queue the operation queue
+ */
+static void
+cleanup_tslots (struct OperationQueue *queue)
+{
+  struct FeedbackCtx *fctx = queue->fctx;
+  struct TimeSlot *tslot;
+  struct GNUNET_TESTBED_Operation *op;
+  unsigned int cnt;
+
+  GNUNET_assert (NULL != fctx);
+  for (cnt = 0; cnt < queue->max_active; cnt++)
+  {
+    tslot = &fctx->tslots_freeptr[cnt];
+    op = tslot->op;
+    if (NULL == op)
+      continue;
+    GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+  }
+  GNUNET_free_non_null (fctx->tslots_freeptr);
+  fctx->tslots_freeptr = NULL;
+  fctx->alloc_head = NULL;
+  fctx->alloc_tail = NULL;
+  fctx->tslots_filled = 0;
+}
+
+
+/**
+ * Initializes the operation queue for parallel overlay connects
+ *
+ * @param h the host handle
+ * @param npoc the number of parallel overlay connects - the queue size
+ */
+static void
+adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
+{
+  struct FeedbackCtx *fctx = queue->fctx;
+  struct TimeSlot *tslot;
+  unsigned int cnt;
+  
+  cleanup_tslots (queue);
+  n = GNUNET_MIN (n ,fctx->max_active_bound);
+  fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
+  for (cnt = 0; cnt < n; cnt++)
+  {
+    tslot = &fctx->tslots_freeptr[cnt];
+    tslot->queue = queue;
+    GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, 
tslot);
+  }
+  GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
+}
+
+
+/**
+ * Adapts parallelism in an adaptive queue by using the statistical data from
+ * the feedback context.
+ *
+ * @param queue the queue
+ * @param fail GNUNET_YES if the last operation failed; GNUNET_NO if not;
+ */
+static void
+adapt_parallelism (struct OperationQueue *queue, int fail)
+{
+  struct GNUNET_TIME_Relative avg;
+  struct FeedbackCtx *fctx;
+  struct TimeSlot *tslot;
+  int sd;
+  unsigned int nvals;
+  unsigned int cnt;
+
+  avg = GNUNET_TIME_UNIT_ZERO;
+  nvals = 0;
+  fctx = queue->fctx;
+  for (cnt = 0; cnt < queue->max_active; cnt++)
+  {
+    tslot = &fctx->tslots_freeptr[cnt];
+    avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
+    nvals += tslot->nvals;
+  }
+  GNUNET_assert (nvals >= queue->max_active);
+  avg = GNUNET_TIME_relative_divide (avg, nvals);
+  sd = GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, (unsigned int)
+                                            avg.rel_value_us);
+  if ( (sd <= 5) ||
+       (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                      queue->max_active)) )
+    GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+  if (GNUNET_SYSERR == sd)
+  {
+    adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
+    return;
+  }
+  GNUNET_assert (0 <= sd);
+  if ((0 == sd) && (! fail))
+  {
+    adaptive_queue_set_max_active (queue, queue->max_active * 2);
+    return;
+  }
+  if ((1 == sd) && (! fail))
+  {
+    adaptive_queue_set_max_active (queue, queue->max_active + 1);
+    return;
+  }
+  if (1 == queue->max_active)
+  {
+    adaptive_queue_set_max_active (queue, 1);
+    return;
+  }
+  if (((sd < 2) && (fail)) || (2 == sd))
+  {
+    adaptive_queue_set_max_active (queue, queue->max_active - 1);
+    return;
+  }
+  adaptive_queue_set_max_active (queue, queue->max_active / 2);
+}
+
+
+/**
+ * update tslots with the operation's completion time.  Additionally, if
+ * updating a timeslot makes all timeslots filled in an adaptive operation
+ * queue, call adapt_parallelism() for that queue.
+ *
+ * @param op the operation
+ */
+static void
+update_tslots (struct GNUNET_TESTBED_Operation *op)
+{
+  struct OperationQueue *queue;
+  struct GNUNET_TIME_Relative t;
+  struct TimeSlot *tslot;
+  struct FeedbackCtx *fctx;
+  
+  t = GNUNET_TIME_absolute_get_duration (op->tstart);
+  while (NULL != (tslot = op->tslots_head)) /* update time slots */
+  {
+    queue = tslot->queue;
+    fctx = queue->fctx;
+    tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
+    GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+    tslot->op = NULL;    
+    GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
+  tslot);
+    if (0 != tslot->nvals++)
+      continue;
+    fctx->tslots_filled++;
+    if (queue->max_active == fctx->tslots_filled)
+      adapt_parallelism (queue, op->failed);
+  }
+}
+
+
+/**
  * Create an 'operation' to be performed.
  *
  * @param cls closure for the callbacks
@@ -644,17 +950,32 @@
 /**
  * Create an operation queue.
  *
+ * @param type the type of operation queue
  * @param max_active maximum number of operations in this
  *        queue that can be active in parallel at the same time
  * @return handle to the queue
  */
 struct OperationQueue *
-GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
+GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
+                                        unsigned int max_active)
 {
   struct OperationQueue *queue;
+  struct FeedbackCtx *fctx;
 
   queue = GNUNET_malloc (sizeof (struct OperationQueue));
-  queue->max_active = max_active;
+  queue->type = type;
+  if (OPERATION_QUEUE_TYPE_FIXED == type)
+  {
+    queue->max_active = max_active;
+  }
+  else
+  {
+    fctx = GNUNET_malloc (sizeof (struct FeedbackCtx));
+    fctx->max_active_bound = max_active;
+    fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
+    queue->fctx = fctx;
+    adaptive_queue_set_max_active (queue, 1); /* start with 1 */
+  }
   return queue;
 }
 
@@ -668,7 +989,16 @@
 void
 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
 {
+  struct FeedbackCtx *fctx;
+  
   GNUNET_break (GNUNET_YES == is_queue_empty (queue));
+  if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
+  {
+    cleanup_tslots (queue);
+    fctx = queue->fctx;
+    GNUNET_TESTBED_SD_destroy_ (fctx->sd);
+    GNUNET_free (fctx);
+  }
   GNUNET_free (queue);
 }
 
@@ -867,8 +1197,10 @@
     rq_remove (op);
   if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
     GNUNET_TESTBED_operation_activate_ (op);
+  if (OP_STATE_ACTIVE == op->state)
+    update_tslots (op);
   GNUNET_assert (NULL != op->queues);
-  GNUNET_assert (NULL != op->qentries);
+  GNUNET_assert (NULL != op->qentries);  
   for (i = 0; i < op->nqueues; i++)
   {
     entry = op->qentries[i];
@@ -882,8 +1214,8 @@
       break;
     case OP_STATE_WAITING:      
       break;
+    case OP_STATE_ACTIVE:
     case OP_STATE_READY:
-    case OP_STATE_ACTIVE:
       GNUNET_assert (0 != opq->active);
       GNUNET_assert (opq->active >= entry->nres);
       opq->active -= entry->nres;
@@ -901,4 +1233,16 @@
 }
 
 
+/**
+ * Marks an operation as failed
+ *
+ * @param op the operation to be marked as failed
+ */
+void
+GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
+{
+  op->failed = GNUNET_YES;
+}
+
+
 /* end of testbed_api_operations.c */

Modified: gnunet/src/testbed/testbed_api_operations.h
===================================================================
--- gnunet/src/testbed/testbed_api_operations.h 2013-08-19 14:03:01 UTC (rev 
28704)
+++ gnunet/src/testbed/testbed_api_operations.h 2013-08-19 14:13:19 UTC (rev 
28705)
@@ -38,14 +38,35 @@
 
 
 /**
+ * The type of operation queue
+ */
+enum OperationQueueType
+{
+  /**
+   * Operation queue which permits a fixed maximum number of operations to be
+   * active at any time
+   */
+  OPERATION_QUEUE_TYPE_FIXED,
+
+  /**
+   * Operation queue which adapts the number of operations to be active based 
on
+   * the operation completion times of previously executed operation in it
+   */
+  OPERATION_QUEUE_TYPE_ADAPTIVE
+};
+
+
+/**
  * Create an operation queue.
  *
- * @param max_active maximum number of operations in this
- *        queue that can be active in parallel at the same time
+ * @param type the type of operation queue
+ * @param max_active maximum number of operations in this queue that can be
+ *   active in parallel at the same time.
  * @return handle to the queue
  */
 struct OperationQueue *
-GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active);
+GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
+                                        unsigned int max_active);
 
 
 /**
@@ -199,5 +220,14 @@
 GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op);
 
 
+/**
+ * Marks an operation as failed
+ *
+ * @param op the operation to be marked as failed
+ */
+void
+GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op);
+
+
 #endif
 /* end of testbed_api_operations.h */

Modified: gnunet/src/testbed/testbed_api_peers.c
===================================================================
--- gnunet/src/testbed/testbed_api_peers.c      2013-08-19 14:03:01 UTC (rev 
28704)
+++ gnunet/src/testbed/testbed_api_peers.c      2013-08-19 14:13:19 UTC (rev 
28705)
@@ -410,8 +410,6 @@
   opc->state = OPC_STATE_STARTED;
   data = opc->data;
   GNUNET_assert (NULL != data);
-  data->tslot_index = GNUNET_TESTBED_get_tslot_ (data->p1->host, data);
-  data->tstart = GNUNET_TIME_absolute_get ();
   msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
   msg->header.size =
       htons (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
@@ -434,7 +432,6 @@
 oprelease_overlay_connect (void *cls)
 {
   struct OperationContext *opc = cls;
-  struct GNUNET_TIME_Relative duration;
   struct OverlayConnectData *data;
 
   data = opc->data;
@@ -443,14 +440,10 @@
   case OPC_STATE_INIT:
     break;
   case OPC_STATE_STARTED:
-    (void) GNUNET_TESTBED_release_time_slot_ (data->p1->host, 
data->tslot_index,
-                                              data);
     GNUNET_TESTBED_remove_opc_ (opc->c, opc);
     break;
   case OPC_STATE_FINISHED:
-    duration = GNUNET_TIME_absolute_get_duration (data->tstart);
-    GNUNET_TESTBED_update_time_slot_ (data->p1->host, data->tslot_index, data,
-                                      duration, data->failed);
+    break;
   }
   GNUNET_free (data);
   GNUNET_free (opc);

Modified: gnunet/src/testbed/testbed_api_peers.h
===================================================================
--- gnunet/src/testbed/testbed_api_peers.h      2013-08-19 14:03:01 UTC (rev 
28704)
+++ gnunet/src/testbed/testbed_api_peers.h      2013-08-19 14:13:19 UTC (rev 
28705)
@@ -250,21 +250,6 @@
    */
   struct OperationContext *sub_opc;
 
-  /**
-   * The starting time of this operation
-   */
-  struct GNUNET_TIME_Absolute tstart;
-
-  /**
-   * Has this operation failed
-   */
-  int failed;
-
-  /**
-   * The timing slot index for this operation
-   */
-  unsigned int tslot_index;
-
 };
 
 

Modified: gnunet/src/testbed/testbed_api_statistics.c
===================================================================
--- gnunet/src/testbed/testbed_api_statistics.c 2013-08-19 14:03:01 UTC (rev 
28704)
+++ gnunet/src/testbed/testbed_api_statistics.c 2013-08-19 14:13:19 UTC (rev 
28705)
@@ -415,8 +415,8 @@
   GNUNET_assert (NULL != proc);
   GNUNET_assert (NULL != cont);
   if (NULL == no_wait_queue)
-    no_wait_queue =
-        GNUNET_TESTBED_operation_queue_create_ (UINT_MAX);
+    no_wait_queue = GNUNET_TESTBED_operation_queue_create_ 
+        (OPERATION_QUEUE_TYPE_FIXED, UINT_MAX);
   sc = GNUNET_malloc (sizeof (struct GetStatsContext));
   sc->peers = peers;
   sc->subsystem = (NULL == subsystem) ? NULL : GNUNET_strdup (subsystem);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]