
[Commit-gnuradio] r7018 - gnuradio/branches/developers/eb/gcell/src/lib


From: eb
Subject: [Commit-gnuradio] r7018 - gnuradio/branches/developers/eb/gcell/src/lib
Date: Fri, 23 Nov 2007 23:48:59 -0700 (MST)

Author: eb
Date: 2007-11-23 23:48:59 -0700 (Fri, 23 Nov 2007)
New Revision: 7018

Modified:
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
Log:
work-in-progress on cell job manager

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc        2007-11-24 05:02:17 UTC (rev 7017)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc        2007-11-24 06:48:59 UTC (rev 7018)
@@ -56,9 +56,25 @@
   }
 };
 
+/*
+ * Return pointer to cache-aligned chunk of storage of size size bytes.
+ * Throw if can't allocate memory.  The storage should be freed
+ * using "free" when done.
+ */
+static void *
+aligned_alloc(size_t size)
+{
+  void *p = 0;
+  if (posix_memalign(&p, CACHE_LINE_SIZE, size) != 0){
+    perror("posix_memalign");
+    throw std::runtime_error("memory");
+  }
+  return p;
+}
 
 gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
-  : d_nclients(0), d_client_thread(0), d_job_busy(0)
+  : d_eh_thread(0), d_eh_state(EHS_INIT), d_shutdown_requested(false),
+    d_nclients(0), d_client_thread(0), d_job_busy(0)
 {
   if (options != 0)
     d_options = *options;
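For reference, the new aligned_alloc() helper above just centralizes the posix_memalign() error handling that used to be repeated inline, and its result must be released with free().  A minimal stand-alone sketch of that contract, assuming CACHE_LINE_SIZE is 128 (the cache-line size on Cell); the function and main() harness here are illustrative, not part of the commit:

    #include <stdio.h>
    #include <stdlib.h>
    #include <stdint.h>
    #include <stdexcept>

    #define CACHE_LINE_SIZE 128          // assumption: Cell cache-line size

    // same pattern as the helper added in this commit
    static void *
    aligned_alloc_sketch(size_t size)
    {
      void *p = 0;
      if (posix_memalign(&p, CACHE_LINE_SIZE, size) != 0){
        perror("posix_memalign");
        throw std::runtime_error("memory");
      }
      return p;
    }

    int main()
    {
      void *buf = aligned_alloc_sketch(4096);
      // the returned address is a multiple of the requested alignment
      printf("buf = %p, aligned: %d\n", buf,
             (int)(((uintptr_t) buf % CACHE_LINE_SIZE) == 0));
      free(buf);        // must be free(), not delete
      return 0;
    }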
@@ -106,7 +122,6 @@
     }
   }
 
-
   if (d_options.use_affinity){
     printf("gc_job_manager: warning: affinity request was ignored\n");
   }
@@ -135,32 +150,26 @@
     // FIXME load some code in the SPE
   }
 
-
   // ----------------------------------------------------------------
   // initialize the free list of job descriptors
-  gc_jd_stack_init(&d_free_list);
+  
+  d_free_list = (gc_jd_stack_t *) aligned_alloc(sizeof(gc_jd_stack_t));
+  // This ensures that the memory associated with d_free_list is
+  // automatically freed in the destructor or if an exception occurs
+  // here in the constructor.
+  _d_free_list_boost =
+    boost::shared_ptr<void>((void *) d_free_list, free_deleter());
+  gc_jd_stack_init(d_free_list);
 
   printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
   printf("max_jobs = %u\n", d_options.max_jobs);
 
   // Initialize the array of job descriptors.
-  void *p = 0;
-  if (posix_memalign(&p, CACHE_LINE_SIZE,
-                    sizeof(d_jd[0]) * d_options.max_jobs) != 0){
-    perror("posix_memalign");
-    throw std::runtime_error("memory");
-  }
-  d_jd = (gc_job_desc_t *) p;
+  d_jd = (gc_job_desc_t *) aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs);
+  _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
   memset(d_jd, 0, sizeof(d_jd[0]) * d_options.max_jobs);
 
-  // This ensures that the memory associated with d_jd is
-  // automatically freed in the destructor or if an exception occurs
-  // here in the constructor.  It is admittedly a somewhat strange
-  // use; however we need the the custom deleter, which is only
-  // available with shared_ptr.
-  _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
 
-
   // set unique job_id
   for (int i = 0; i < (int) d_options.max_jobs; i++)
     d_jd[i].sys.job_id = i;
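The _d_jd_boost and _d_free_list_boost members exist only so that boost::shared_ptr's custom deleter releases the malloc'd storage automatically, both in the destructor and if the constructor throws part-way through.  A self-contained sketch of that idiom; free_deleter is defined earlier in this file, so the version below is a plausible stand-in, and the "holder" class is purely illustrative:

    #include <stdlib.h>
    #include <stdexcept>
    #include <boost/shared_ptr.hpp>

    // plausible stand-in for the free_deleter defined earlier in this file
    struct free_deleter {
      void operator()(void *p) { free(p); }
    };

    class holder {
      char                    *d_buf;          // raw pointer used by the hot path
      boost::shared_ptr<void>  _d_buf_boost;   // owns the storage

    public:
      holder(size_t n)
      {
        d_buf = (char *) malloc(n);
        if (d_buf == 0)
          throw std::runtime_error("memory");
        // From here on the chunk is freed automatically: in ~holder(), or
        // if anything later in this constructor throws.
        _d_buf_boost = boost::shared_ptr<void>((void *) d_buf, free_deleter());
      }
      // no explicit free() needed in the destructor
    };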
@@ -169,13 +178,21 @@
   for (int i = d_options.max_jobs - 1; i >= 0; i--)
     free_job_desc(&d_jd[i]);
 
+  // ----------------------------------------------------------------
+  // initialize the job queue
+  
+  d_queue = (gc_jd_queue_t *) aligned_alloc(sizeof(gc_jd_queue_t));
+  _d_queue_boost =
+    boost::shared_ptr<void>((void *) d_queue, free_deleter());
+  gc_jd_queue_init(d_queue);
 
+
   // ----------------------------------------------------------------
   // initialize d_client_thread
 
   // FIXME robust deletion
   d_client_thread = new gc_client_thread_info[d_options.max_client_threads];
-  
+
   // ----------------------------------------------------------------
   // initialize bitvectors
 
@@ -186,16 +203,10 @@
   // allocate all bitvectors in a single cache-aligned chunk
   size_t nlongs = (1 + d_options.max_client_threads) * d_bvlen;
   size_t byte_size = nlongs * sizeof(unsigned long);
-  p = 0;
-  if (posix_memalign(&p, CACHE_LINE_SIZE, byte_size) != 0){
-    perror("posix_memalign");
-    throw std::runtime_error("memory");
-  }
+  void *p = aligned_alloc(byte_size);
+  _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
   memset(p, 0, byte_size);
 
-  // let boost keep track of the storage
-  _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
-
   // now point the other pointers into this storage.
   // d_job_busy is the first one, followed by the ones in the client data
 
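All of the bitvectors are carved out of the single cache-aligned chunk allocated above, d_bvlen unsigned longs apiece: d_job_busy first, then one per client thread.  A hedged sketch of the arithmetic involved, assuming d_bvlen means "longs needed to give each of max_jobs jobs its own bit"; the bv_set/bv_isset helpers are hypothetical, not gcell functions:

    #include <stddef.h>

    static const size_t BITS_PER_LONG = 8 * sizeof(unsigned long);

    // assumed meaning of d_bvlen: longs needed for one bit per job, rounded up
    static size_t
    bitvector_len(size_t max_jobs)
    {
      return (max_jobs + BITS_PER_LONG - 1) / BITS_PER_LONG;
    }

    // hypothetical helpers, shown only to make the layout concrete
    static void
    bv_set(unsigned long *bv, unsigned int bitno)
    {
      bv[bitno / BITS_PER_LONG] |= 1UL << (bitno % BITS_PER_LONG);
    }

    static bool
    bv_isset(const unsigned long *bv, unsigned int bitno)
    {
      return (bv[bitno / BITS_PER_LONG] >> (bitno % BITS_PER_LONG)) & 1;
    }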
@@ -207,10 +218,10 @@
     d_client_thread[i].d_jobs_were_waiting_for = v;
 
   // ----------------------------------------------------------------
+  // create the spe event handler
 
+  create_event_handler();
 
-  // FIXME sequence this...
-  // FIXME create event handler thread
   // FIXME create N worker threads
   // FIXME confirm everything started OK
 
@@ -228,6 +239,8 @@
 
   d_jd = 0;            // handled via _d_jd_boost
   d_job_busy = 0;      // handled via _d_all_bitvectors
+  d_free_list = 0;     // handled via _d_free_list_boost
+  d_queue = 0;         // handled via _d_queue_boost
 }
 
 bool
@@ -246,7 +259,7 @@
 gc_job_manager_impl::alloc_job_desc()
 {
   // stack is lock free, thus safe to call from any thread
-  return gc_jd_stack_pop(&d_free_list);
+  return gc_jd_stack_pop(d_free_list);
 }
 
 void
@@ -254,7 +267,7 @@
 {
   // stack is lock free, thus safe to call from any thread
   if (jd != 0)
-    gc_jd_stack_push(&d_free_list, jd);
+    gc_jd_stack_push(d_free_list, jd);
 }
 
 bool
@@ -269,6 +282,34 @@
   return false;                        // FIXME
 }
 
+void
+gc_job_manager_impl::create_event_handler()
+{
+  // create the SPE event handler and register our interest in all events
+
+  d_spe_event_handler.ptr = spe_event_handler_create();
+  if (d_spe_event_handler.ptr == 0){
+    perror("spe_event_handler_create");
+    throw std::runtime_error("spe_event_handler_create");
+  }
+
+  for (unsigned int i = 0; i < d_options.nspes; i++){
+    spe_event_unit_t   eu;
+    memset(&eu, 0, sizeof(eu));
+    eu.events = SPE_EVENT_ALL_EVENTS;
+    eu.spe = d_worker[i].spe_ctx;
+    eu.data.u32 = i;   // available in events returned by spe_event_wait
+
+    if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
+      perror("spe_event_handler_register");
+      throw std::runtime_error("spe_event_handler_register");
+    }
+  }
+
+  // FIXME create the thread
+
+}
+
 // ------------------------------------------------------------------------
 
 worker_ctx::~worker_ctx()
@@ -283,3 +324,4 @@
   }
   state = WS_FREE;
 }
+
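create_event_handler() registers every SPE context with a single libspe2 event handler, tagging each registration with the SPE's index so it comes back in evt.data.u32.  The thread that the trailing FIXME refers to is not written yet; the following is a hedged sketch of one plausible shape for it, not the committed design -- the event constants and spe_event_wait() are standard libspe2, everything else is illustrative:

    #include <libspe2.h>
    #include <stdio.h>

    // Plausible event-loop body for the (not yet created) handler thread.
    // `shutdown_requested` would point at something like d_shutdown_requested.
    static void
    spe_event_loop(spe_event_handler_ptr_t handler, volatile bool *shutdown_requested)
    {
      const int MAX_EVENTS = 16;
      const int TIMEOUT_MS = 100;        // wake up periodically to check the flag
      spe_event_unit_t evt[MAX_EVENTS];

      while (!*shutdown_requested){
        int n = spe_event_wait(handler, evt, MAX_EVENTS, TIMEOUT_MS);
        if (n < 0){
          perror("spe_event_wait");
          break;
        }
        for (int i = 0; i < n; i++){
          unsigned int spe_index = evt[i].data.u32;   // set at registration time
          if (evt[i].events & SPE_EVENT_SPE_STOPPED)
            printf("spe %u stopped\n", spe_index);               // placeholder
          if (evt[i].events & SPE_EVENT_OUT_INTR_MBOX)
            printf("spe %u: interrupting mailbox\n", spe_index); // placeholder
        }
      }
    }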

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 2007-11-24 05:02:17 UTC (rev 7017)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 2007-11-24 06:48:59 UTC (rev 7018)
@@ -25,6 +25,7 @@
 #include "gc_job_manager.h"
 #include "gc_client_thread_info.h"
 #include "gc_jd_stack.h"
+#include "gc_jd_queue.h"
 #include <libspe2.h>
 #include <vector>
 #include <boost/shared_ptr.hpp>
@@ -42,11 +43,33 @@
 struct worker_ctx {
   worker_state         state;
   spe_context_ptr_t    spe_ctx;
+  pthread_t            thread;
 
-  worker_ctx() : state(WS_FREE), spe_ctx(0) {}
+  worker_ctx() : state(WS_FREE), spe_ctx(0), thread(0) {}
   ~worker_ctx();
 };
 
+enum evt_handler_state {
+  EHS_INIT,            // being initialized
+  EHS_RUNNING,         // thread is running
+  EHS_SHUTTING_DOWN,   // in process of shutting down everything
+  EHS_DEAD,            // thread is dead
+};
+
+struct spe_event_handler {
+  spe_event_handler_ptr_t      ptr;
+
+  spe_event_handler() : ptr(0) {}
+  ~spe_event_handler(){
+    if (ptr){
+      if (spe_event_handler_destroy(ptr) != 0){
+       perror("spe_event_handler_destroy");
+      }
+    }
+  }
+};
+
+
 /*!
  * \brief Concrete class that manages SPE jobs.
  *
@@ -58,8 +81,15 @@
 
   gc_jm_options                 d_options;
   spe_gang_context_sptr  d_gang;               // boost::shared_ptr
-  worker_ctx            d_worker[MAX_SPES];
 
+  worker_ctx            d_worker[MAX_SPES];    // SPE ctx, thread, etc
+
+  pthread_t             d_eh_thread;           // the event handler thread
+  evt_handler_state     d_eh_state;
+  bool                  d_shutdown_requested;
+  spe_event_handler     d_spe_event_handler;
+  
+
   // All of the job descriptors are hung off of here.
   // We allocate them all in a single cache aligned chunk.
   gc_job_desc_t                *d_jd;                  // [options.max_jobs]
@@ -82,11 +112,17 @@
   // This points into _d_all_bitvectors.
   unsigned long                *d_job_busy;
 
-  // The actually allocation chunk is hung off of d_jd.
-  // This is the lock free stack where we keep track of the free ones.
-  gc_jd_stack_t                 d_free_list;           // stack of free job descriptors
+  // Lock free stack where we keep track of the free job descriptors.
+  gc_jd_stack_t                *d_free_list;           // stack of free job descriptors
+  boost::shared_ptr<void> _d_free_list_boost;  // hack for automatic storage mgmt
 
+  // The PPE inserts jobs here; SPEs pull jobs from here.
+  gc_jd_queue_t                *d_queue;               // job queue
+  boost::shared_ptr<void> _d_queue_boost;      // hack for automatic storage mgmt
 
+
+  void create_event_handler();
+
   friend gc_job_manager *gc_make_job_manager(const gc_jm_options *options);
 
   /*!
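Taken together, the manager now owns the free list, the job queue, the SPE event handler and the (future) event-handler thread.  For orientation, a hedged sketch of how a client might drive it: the option field names come from how d_options is used in the .cc changes above, and the assumption that gc_make_job_manager(), alloc_job_desc() and free_job_desc() are the public entry points follows from the declarations in this diff; everything else (default option values, deleting the manager directly) is illustrative only:

    #include "gc_job_manager.h"

    int main()
    {
      gc_jm_options opts;
      opts.nspes = 2;                 // SPEs to manage
      opts.max_jobs = 64;             // size of the job-descriptor pool
      opts.max_client_threads = 4;    // PPE threads allowed to submit work

      gc_job_manager *mgr = gc_make_job_manager(&opts);

      gc_job_desc_t *jd = mgr->alloc_job_desc();   // pops the lock-free free list
      // ... fill in the job; submission to the job queue is not part of this diff ...
      mgr->free_job_desc(jd);                      // pushes it back

      delete mgr;
      return 0;
    }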
