gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r26819 - gnunet/src/testbed


From: gnunet
Subject: [GNUnet-SVN] r26819 - gnunet/src/testbed
Date: Tue, 9 Apr 2013 15:32:54 +0200

Author: harsha
Date: 2013-04-09 15:32:54 +0200 (Tue, 09 Apr 2013)
New Revision: 26819

Modified:
   gnunet/src/testbed/test_testbed_api_operations.c
   gnunet/src/testbed/testbed_api_operations.c
   gnunet/src/testbed/testbed_api_operations.h
Log:
- maintain separate queues for operations that are in WAITING, READY, and 
STARTED states


Modified: gnunet/src/testbed/test_testbed_api_operations.c
===================================================================
--- gnunet/src/testbed/test_testbed_api_operations.c    2013-04-09 12:08:58 UTC 
(rev 26818)
+++ gnunet/src/testbed/test_testbed_api_operations.c    2013-04-09 13:32:54 UTC 
(rev 26819)
@@ -60,7 +60,7 @@
  * This operation should go into both queues and should consume 2 units of
  * resources on both queues. Since op2 needs a resource from both queues and is
  * queues before this operation, it will be blocked until op2 is released even
- * though q1 has
+ * though q1 has enough free resources
  */
 struct GNUNET_TESTBED_Operation *op3;
 

Modified: gnunet/src/testbed/testbed_api_operations.c
===================================================================
--- gnunet/src/testbed/testbed_api_operations.c 2013-04-09 12:08:58 UTC (rev 
26818)
+++ gnunet/src/testbed/testbed_api_operations.c 2013-04-09 13:32:54 UTC (rev 
26819)
@@ -60,17 +60,7 @@
  */
 struct OperationQueue
 {
- /**
-   * The head of the operation queue
-   */
-  struct QueueEntry *head;
-
   /**
-   * The tail of the operation queue
-   */
-  struct QueueEntry *tail;
-
-  /**
    * DLL head for the wait queue.  Operations which are waiting for this
    * operation queue are put here
    */
@@ -82,6 +72,28 @@
   struct QueueEntry *wq_tail;
 
   /**
+   * DLL head for the ready queue.  Operations which are in this operation 
queue
+   * and are in ready state are put here
+   */
+  struct QueueEntry *rq_head;
+
+  /**
+   * DLL tail for the ready queue
+   */
+  struct QueueEntry *rq_tail;
+
+  /**
+   * DLL head for the active queue.  Operations which are in this operation
+   * queue and are currently active are put here
+   */
+  struct QueueEntry *aq_head;
+
+  /**
+   * DLL tail for the active queue.
+   */
+  struct QueueEntry *aq_tail;
+
+  /**
    * Number of operations that are currently active in this queue.
    */
   unsigned int active;
@@ -214,7 +226,80 @@
  */
 GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
 
+void
+remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
+{
+  struct OperationQueue *opq;
+  struct QueueEntry *entry;
+  
+  opq = op->queues[index];
+  entry = op->qentries[index];
+  switch (op->state)
+  {
+  case OP_STATE_INIT:
+    GNUNET_assert (0);
+    break;
+  case OP_STATE_WAITING:
+    GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
+    break;
+  case OP_STATE_READY:
+    GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
+    break;
+  case OP_STATE_STARTED:
+    GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
+    break;
+  }
+}
 
+void
+change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
+{
+  struct QueueEntry *entry;
+  struct OperationQueue *opq;
+  unsigned int cnt;
+  unsigned int s;
+  
+  GNUNET_assert (OP_STATE_INIT != state);
+  GNUNET_assert (NULL != op->queues);
+  GNUNET_assert (NULL != op->nres);
+  GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
+  GNUNET_assert (op->state != state);
+  for (cnt = 0; cnt < op->nqueues; cnt++)
+  {
+    if (OP_STATE_INIT == op->state)
+    {
+      entry = GNUNET_malloc (sizeof (struct QueueEntry));
+      entry->op = op;
+      entry->nres = op->nres[cnt];
+      s = cnt;
+      GNUNET_array_append (op->qentries, s, entry);      
+    }
+    else
+    {
+      entry = op->qentries[cnt];
+      remove_queue_entry (op, cnt);
+    }
+    opq = op->queues[cnt];
+    switch (state)
+    {
+    case OP_STATE_INIT:
+      GNUNET_assert (0);
+      break;
+    case OP_STATE_WAITING:
+      GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
+      break;
+    case OP_STATE_READY:
+      GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
+      break;
+    case OP_STATE_STARTED:
+      GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
+      break;
+    }
+  }
+  op->state = state;
+}
+
+
 /**
  * Removes an operation from the ready queue.  Also stops the 'process_rq_task'
  * if the given operation is the last one in the queue.
@@ -256,7 +341,7 @@
   rq_remove (op);
   if (NULL != rq_head)
     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
-  op->state = OP_STATE_STARTED;
+  change_state (op, OP_STATE_STARTED);
   if (NULL != op->start)
     op->start (op->cb_cls);  
 }
@@ -282,48 +367,16 @@
 }
 
 
-void
-wq_add (struct GNUNET_TESTBED_Operation *op)
+static int
+is_queue_empty (struct OperationQueue *opq)
 {
-  struct QueueEntry *entry;
-  struct OperationQueue *opq;
-  unsigned int cnt;
-
-  GNUNET_assert (OP_STATE_WAITING == op->state);
-  GNUNET_assert (NULL == op->qentries);
-  for (cnt = 0; cnt < op->nqueues;)
-  {
-    opq = op->queues[cnt];
-    entry = GNUNET_malloc (sizeof (struct QueueEntry));
-    entry->op = op;
-    entry->nres = op->nres[cnt];
-    GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
-    GNUNET_array_append (op->qentries, cnt, entry); /* increments cnt */
-  }
+  if ( (NULL != opq->wq_head)
+       || (NULL != opq->rq_head)
+       || (NULL != opq->aq_head) )
+    return GNUNET_NO;
+  return GNUNET_YES;
 }
 
-
-void
-wq_remove (struct GNUNET_TESTBED_Operation *op)
-{
-  struct QueueEntry *entry;
-  struct OperationQueue *opq;
-  unsigned int cnt;
-
-  GNUNET_assert (OP_STATE_WAITING == op->state);
-  GNUNET_assert (NULL != op->qentries);
-  for (cnt = 0; cnt < op->nqueues; cnt ++)
-  {
-    opq = op->queues[cnt];
-    entry = op->qentries[cnt];
-    GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
-    GNUNET_free (entry);
-  }
-  GNUNET_free (op->qentries);
-  op->qentries = NULL;
-}
-
-
 /**
  * Checks for the readiness of an operation and schedules a operation start 
task
  *
@@ -342,10 +395,9 @@
     if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active)
       return;
   }
-  wq_remove (op);
   for (i = 0; i < op->nqueues; i++)
     op->queues[i]->active += op->nres[i];
-  op->state = OP_STATE_READY;  
+  change_state (op, OP_STATE_READY);
   rq_add (op);
 }
 
@@ -364,8 +416,7 @@
   rq_remove (op);
   for (i = 0; i < op->nqueues; i++)
     op->queues[i]->active--;
-  op->state = OP_STATE_WAITING;
-  wq_add (op);
+  change_state (op, OP_STATE_WAITING);
 }
 
 
@@ -419,8 +470,7 @@
 void
 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
 {
-  GNUNET_break (NULL == queue->head);
-  GNUNET_break (NULL == queue->tail);
+  GNUNET_break (GNUNET_YES == is_queue_empty (queue));
   GNUNET_free (queue);
 }
 
@@ -435,13 +485,29 @@
 int
 GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
 {
-  if (NULL != queue->head)
+  if (GNUNET_NO == is_queue_empty (queue))
     return GNUNET_NO;
   GNUNET_TESTBED_operation_queue_destroy_ (queue);
   return GNUNET_YES;
 }
 
 
+void
+recheck_waiting (struct OperationQueue *opq)
+{
+  struct QueueEntry *entry;
+  struct QueueEntry *entry2;
+
+  entry = opq->wq_head;
+  while ( (NULL != entry) && (opq->active < opq->max_active) )
+  {
+    entry2 = entry->next;
+    check_readiness (entry->op);
+    entry = entry2;
+  }
+}
+
+
 /**
  * Function to reset the maximum number of operations in the given queue. If
  * max_active is lesser than the number of currently active operations, the
@@ -457,21 +523,10 @@
   struct QueueEntry *entry;
 
   queue->max_active = max_active;
-  entry = queue->head;
-  while ((queue->active > queue->max_active) && (NULL != entry))
-  {
-    if (entry->op->state == OP_STATE_READY)
-      defer (entry->op);
-    entry = entry->next;
-  }
-
-  entry = queue->head;
-  while ((NULL != entry) && (queue->active < queue->max_active))
-  {
-    if (OP_STATE_WAITING == entry->op->state)
-      check_readiness (entry->op);
-    entry = entry->next;
-  }
+  while ( (queue->active > queue->max_active)
+          && (NULL != (entry = queue->rq_head)) )
+    defer (entry->op);
+  recheck_waiting (queue);
 }
 
 
@@ -491,14 +546,9 @@
                                          struct GNUNET_TESTBED_Operation *op,
                                          unsigned int nres)
 {
-  struct QueueEntry *entry;
   unsigned int qsize;
 
   GNUNET_assert (0 < nres);
-  entry = GNUNET_malloc (sizeof (struct QueueEntry));
-  entry->op = op;
-  entry->nres = nres;
-  GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry);
   qsize = op->nqueues;
   GNUNET_array_append (op->queues, op->nqueues, queue);
   GNUNET_array_append (op->nres, qsize, nres);
@@ -538,55 +588,12 @@
 GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
 {
   GNUNET_assert (NULL == op->rq_entry);
-  op->state = OP_STATE_WAITING;
-  wq_add (op);
+  change_state (op, OP_STATE_WAITING);
   check_readiness (op);
 }
 
 
 /**
- * Remove an operation from a queue.  This can be because the
- * oeration was active and has completed (and the resources have
- * been released), or because the operation was cancelled and
- * thus scheduling the operation is no longer required.
- *
- * @param queue queue to add the operation to
- * @param op operation to add to the queue
- */
-void
-GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
-                                        struct GNUNET_TESTBED_Operation
-                                        *op)
-{
-  struct QueueEntry *entry;
-
-  for (entry = queue->head; NULL != entry; entry = entry->next)
-    if (entry->op == op)
-      break;
-  GNUNET_assert (NULL != entry);
-  GNUNET_assert (0 < entry->nres);
-  switch (op->state)
-  {
-  case OP_STATE_INIT:
-  case OP_STATE_WAITING:
-    break;
-  case OP_STATE_READY:
-  case OP_STATE_STARTED:
-    GNUNET_assert (0 != queue->active);
-    GNUNET_assert (queue->active >= entry->nres);
-    queue->active -= entry->nres;
-    break;
-  }
-  GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry);
-  GNUNET_free (entry);
-  entry = queue->wq_head;
-  if (NULL == entry)
-    return;
-  check_readiness (entry->op);
-}
-
-
-/**
  * An operation is 'done' (was cancelled or finished); remove
  * it from the queues and release associated resources.
  *
@@ -595,22 +602,40 @@
 void
 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
 {
+  struct QueueEntry *entry;  
+  struct OperationQueue *opq;
   unsigned int i;
 
-  switch (op->state)
+  if (OP_STATE_INIT == op->state)
   {
-  case OP_STATE_READY:
+    GNUNET_free (op);
+    return;
+  }
+  if (OP_STATE_READY == op->state)
     rq_remove (op);
-    break;
-  case OP_STATE_WAITING:
-    wq_remove (op);
-    break;
-  case OP_STATE_STARTED:
-  case OP_STATE_INIT:
-    break;
+  GNUNET_assert (NULL != op->queues);
+  GNUNET_assert (NULL != op->qentries);
+  for (i = 0; i < op->nqueues; i++)
+  {
+    entry = op->qentries[i];
+    remove_queue_entry (op, i);
+    opq = op->queues[i];
+    switch (op->state)
+    {
+    case OP_STATE_INIT:
+    case OP_STATE_WAITING:
+      break;
+    case OP_STATE_READY:
+    case OP_STATE_STARTED:
+      GNUNET_assert (0 != opq->active);
+      GNUNET_assert (opq->active >= entry->nres);
+      opq->active -= entry->nres;
+      recheck_waiting (opq);
+      break;
+    }    
+    GNUNET_free (entry);
   }
-  for (i = 0; i < op->nqueues; i++)
-    GNUNET_TESTBED_operation_queue_remove_ (op->queues[i], op);
+  GNUNET_free_non_null (op->qentries);
   GNUNET_free (op->queues);
   GNUNET_free (op->nres);
   if (NULL != op->release)

Modified: gnunet/src/testbed/testbed_api_operations.h
===================================================================
--- gnunet/src/testbed/testbed_api_operations.h 2013-04-09 12:08:58 UTC (rev 
26818)
+++ gnunet/src/testbed/testbed_api_operations.h 2013-04-09 13:32:54 UTC (rev 
26819)
@@ -127,21 +127,6 @@
 
 
 /**
- * Remove an operation from a queue.  This can be because the
- * oeration was active and has completed (and the resources have
- * been released), or because the operation was cancelled and
- * thus scheduling the operation is no longer required.
- *
- * @param queue queue to add the operation to
- * @param op operation to add to the queue
- */
-void
-GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
-                                        struct GNUNET_TESTBED_Operation *op);
-
-
-
-/**
  * Function to call to start an operation once all
  * queues the operation is part of declare that the
  * operation can be activated.




reply via email to

[Prev in Thread] Current Thread [Next in Thread]