commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r7082 - in gnuradio/branches/developers/eb/gcell/src:


From: eb
Subject: [Commit-gnuradio] r7082 - in gnuradio/branches/developers/eb/gcell/src: include lib
Date: Thu, 6 Dec 2007 20:33:52 -0700 (MST)

Author: eb
Date: 2007-12-06 20:33:51 -0700 (Thu, 06 Dec 2007)
New Revision: 7082

Modified:
   gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h
   gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
Log:
work-in-progress on cell job manager

Modified: gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h     
2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h     
2007-12-07 03:33:51 UTC (rev 7082)
@@ -59,10 +59,16 @@
 //! final job status
 typedef enum {
   JS_OK,
+  JS_SHUTTING_DOWN,        // job manager is shutting down
+  JS_TOO_MANY_CLIENTS,     // too many client threads
   JS_UNKNOWN_METHOD,       // didn't recognize the method ID
-  JS_BAD_SIGNATURE,        // arg counts and/or types are wrong
+  JS_BAD_SIGNATURE,        // arg counts and/or types are wrong for given method
   JS_BAD_ALIGNMENT,        // indirect arg EA not 16-byte aligned
   JS_BAD_LENGTH,           // indirect arg length not a multiple of 16 bytes long
+  JS_BAD_N_DIRECT,         // too many direct args
+  JS_BAD_N_INDIRECT,       // too many indirect args
+  JS_ARGS_TOO_LONG,        // total length of indirect args exceeds limit
+  JS_BAD_JUJU,             // misc problem: you're having a bad day
 } gc_job_status_t;
 
 #define MAX_ARGS_DIRECT                8
@@ -112,8 +118,8 @@
  */
 typedef struct gc_job_args
 {
-  uint32_t      n_by_val;                         // # of "value" args
-  uint32_t      n_by_ref;                         // # of "indirct" args 
(DMA'd in/out)
+  uint32_t      n_direct;                         // # of "direct" args
+  uint32_t      n_indirect;                       // # of "indirect" args (DMA'd in/out)
   uint32_t      _pad[2];                          // (padding)
   gc_tag_t      dir_tag[MAX_ARGS_DIRECT] _AL16;   // type of "direct" 
argument[i]
   gc_arg_union_t dir_val[MAX_ARGS_DIRECT] _AL16;   // "direct" argument values

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h       
2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h       
2007-12-07 03:33:51 UTC (rev 7082)
@@ -48,11 +48,12 @@
     d_jobs_were_waiting_for = 0;
   }
 
-  volatile uint32_t    d_free;         // is this cti free? (1->free, 0->in 
use)
+  uint32_t             d_free;         // is this cti free? (1->free, 0->in 
use)
   gc_ct_state_t                d_state;
   omni_mutex           d_mutex;        // used to suspend/resume client
   omni_condition       d_cond;         // used to suspend/resume client
   unsigned long               *d_jobs_were_waiting_for;  // bitvector
+  uint16_t             d_client_id;    // which client info are we?
 };
 
 #endif /* INCLUDED_GC_CLIENT_THREAD_INFO_H */

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h      
2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h      
2007-12-07 03:33:51 UTC (rev 7082)
@@ -106,6 +106,8 @@
 
   /*!
    * \brief Wait for specified job to complete.
+   *
+   * A thread may only wait for jobs which it submitted.
    */
   virtual bool wait_job(gc_job_desc *jd) = 0;
 

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc        
2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc        
2007-12-07 03:33:51 UTC (rev 7082)
@@ -28,14 +28,23 @@
 #include <stdio.h>
 #include <stdexcept>
 #include <stdlib.h>
+#include <atomic_dec_if_positive.h>
 
+static const size_t CACHE_LINE_SIZE = 128;
+
 static const bool VERBOSE = true;
 
-static const size_t CACHE_LINE_SIZE = 128;
 static const unsigned int DEFAULT_MAX_JOBS = 128;
 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
 
+// FIXME this really depends on the SPU code...
+static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;
 
+
+static bool          s_key_initialized = false;
+static pthread_key_t s_client_key;
+
+
 // custom deleter of gang_contexts for use with boost::shared_ptr
 class gang_deleter {
 public:
@@ -57,7 +66,18 @@
   }
 };
 
+
 /*
+ * Destructor registered for s_client_key via pthread_key_create.
+ * p is the gc_client_thread_info that was stored under the key for the
+ * thread; flag that slot as free again so it can be handed out anew.
+ */
+static void
+client_key_destructor(void *p)
+{
+  gc_client_thread_info *cti = (gc_client_thread_info *) p;
+  cti->d_free = 1;
+}
+
+/*
  * Return pointer to cache-aligned chunk of storage of size size bytes.
  * Throw if can't allocate memory.  The storage should be freed
  * with "free" when done.  The memory is initialized to zero.
@@ -81,8 +101,15 @@
   : d_spu_args(0),
     d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
     d_shutdown_requested(false),
-    d_nclients(0), d_client_thread(0), d_job_busy(0)
+    d_client_thread(0), d_job_busy(0)
 {
+  if (!s_key_initialized){
+    int r = pthread_key_create(&s_client_key, client_key_destructor);
+    if (r != 0)
+      throw std::runtime_error("pthread_key_create");
+    s_key_initialized = true;
+  }
+
   if (options != 0)
     d_options = *options;
 
@@ -229,9 +256,16 @@
   // ----------------------------------------------------------------
   // initialize d_client_thread
 
-  // FIXME robust deletion
-  d_client_thread = new gc_client_thread_info[d_options.max_client_threads];
+  {
+    gc_client_thread_info_sa cti(
+         new gc_client_thread_info[d_options.max_client_threads]);
 
+    d_client_thread.swap(cti);
+
+    for (unsigned int i = 0; i < d_options.max_client_threads; i++)
+      d_client_thread[i].d_client_id = i;
+  }
+
   // ----------------------------------------------------------------
   // initialize bitvectors
 
@@ -268,11 +302,6 @@
 {
   shutdown();
 
-  // FIXME whatever else needs to be done
-
-  delete [] d_client_thread;
-  d_client_thread = 0;
-
   d_jd = 0;            // handled via _d_jd_boost
   d_job_busy = 0;      // handled via _d_all_bitvectors
   d_free_list = 0;     // handled via _d_free_list_boost
@@ -317,18 +346,100 @@
     gc_jd_stack_push(d_free_list, jd);
 }
 
+/*
+ * Validate one argument block on the PPE side so that the SPE doesn't
+ * have to repeat these checks.
+ *
+ * Checks, in order: the direct arg count, the indirect arg count, then
+ * for each indirect arg its length (multiple of 16) and its EA (16-byte
+ * aligned), and finally the summed indirect length against
+ * MAX_TOTAL_INDIRECT_LENGTH.
+ *
+ * Returns true if everything passes (jd->status is left alone).
+ * Otherwise stores the JS_* code of the first failing check in
+ * jd->status and returns false.
+ */
+static bool
+check_args(gc_job_desc *jd, gc_job_args *args)
+{
+  gc_job_status_t verdict = JS_OK;
+
+  if (args->n_direct > MAX_ARGS_DIRECT)
+    verdict = JS_BAD_N_DIRECT;
+  else if (args->n_indirect > MAX_ARGS_INDIRECT)
+    verdict = JS_BAD_N_INDIRECT;
+  else {
+    size_t nbytes = 0;
+
+    // stop scanning as soon as one indirect arg is rejected
+    for (unsigned int k = 0; verdict == JS_OK && k < args->n_indirect; ++k){
+      if ((args->ind_len[k] & 0xf) != 0)
+        verdict = JS_BAD_LENGTH;
+      else if ((args->ind_ea[k] & 0xf) != 0)
+        verdict = JS_BAD_ALIGNMENT;
+      else
+        nbytes += args->ind_len[k];
+    }
+
+    if (verdict == JS_OK && nbytes > MAX_TOTAL_INDIRECT_LENGTH)
+      verdict = JS_ARGS_TOO_LONG;
+  }
+
+  if (verdict == JS_OK)
+    return true;
+
+  jd->status = verdict;
+  return false;
+}
+
 bool
 gc_job_manager_impl::submit_job(gc_job_desc *jd)
 {
-  if (d_shutdown_requested)
+  if (d_shutdown_requested){
+    jd->status = JS_SHUTTING_DOWN;
     return false;
+  }
 
-  return false;                        // FIXME
+  // Ensure we've got a client_thread_info assigned to this thread.
+  
+  gc_client_thread_info *cti =
+    (gc_client_thread_info *) pthread_getspecific(s_client_key);
+  if (cti == 0){
+    if ((cti = alloc_cti()) == 0){
+      fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client 
threads.\n");
+      jd->status = JS_TOO_MANY_CLIENTS;
+      return false;
+    }
+    int r = pthread_setspecific(s_client_key, cti);
+    if (r != 0){
+      jd->status = JS_BAD_JUJU;
+      fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
+      return false;
+    }
+  }
+
+  // FIXME map and/or check method against all known methods
+
+  if (!check_args(jd, &jd->input))
+    return false;
+
+  if (!check_args(jd, &jd->output))
+    return false;
+
+  jd->status = JS_OK;
+  jd->sys.client_id = cti->d_client_id;
+  gc_jd_queue_enqueue(d_queue, jd);
+
+  return true;
 }
 
 bool
 gc_job_manager_impl::wait_job(gc_job_desc *jd)
 {
+  gc_client_thread_info *cti =
+    (gc_client_thread_info *) pthread_getspecific(s_client_key);
+  if (cti == 0)
+    return false;
+
+  if (jd->sys.client_id != cti->d_client_id){
+    fprintf(stderr, "gc_job_manager_impl::wait_job: can't want for job you 
didn't submit\n");
+    return false;
+  }
+
+  // FIXME...
+  
   return false;                        // FIXME
 }
 
@@ -699,7 +810,33 @@
 
 ////////////////////////////////////////////////////////////////////////
 
+/*
+ * Claim a free client-thread-info slot.
+ *
+ * Scans d_client_thread[0 .. max_client_threads) and tries to atomically
+ * grab the first entry whose d_free flag is set.  On success the entry is
+ * put in state CT_RUNNING, its d_jobs_were_waiting_for bitvector
+ * (d_bvlen longs) is zeroed, and a pointer to the entry is returned.
+ * Returns 0 if no slot could be claimed.
+ */
+gc_client_thread_info *
+gc_job_manager_impl::alloc_cti()
+{
+  for (unsigned int i = 0; i < d_options.max_client_threads; i++){
+    gc_client_thread_info *cti = &d_client_thread[i];
+    if (cti->d_free){
+      // try to atomically grab it
+      if (_atomic_dec_if_positive(ptr_to_ea(&cti->d_free)) == 0){
+        // got it...
+        cti->d_state = CT_RUNNING;
+
+        // Zero the bitvector the member points at, not the member itself:
+        // clearing d_bvlen longs starting at &d_jobs_were_waiting_for would
+        // overwrite the pointer and whatever follows it in the struct.
+        // The pointer starts out as 0 in the cti constructor, so skip the
+        // clear if no bitvector has been attached.
+        if (cti->d_jobs_were_waiting_for != 0)
+          memset(cti->d_jobs_were_waiting_for, 0, d_bvlen * sizeof(unsigned long));
+        return cti;
+      }
+    }
+  }
+  return 0;
+}
 
+/*
+ * Release a slot obtained from alloc_cti by setting its d_free flag.
+ * Asserts that cti's offset from the start of d_client_thread is below
+ * max_client_threads (a pointer before the array wraps to a huge size_t
+ * and trips the assert as well).
+ */
+void
+gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
+{
+  assert(d_options.max_client_threads > (size_t) (cti - d_client_thread.get()));
+  cti->d_free = 1;
+}
+
+
+////////////////////////////////////////////////////////////////////////
+
 worker_ctx::~worker_ctx()
 {
   if (spe_ctx){
@@ -711,3 +848,4 @@
   }
   state = WS_FREE;
 }
+

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 
2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 
2007-12-07 03:33:51 UTC (rev 7082)
@@ -30,8 +30,10 @@
 #include <libspe2.h>
 #include <vector>
 #include <boost/shared_ptr.hpp>
+#include <boost/scoped_array.hpp>
 
 typedef boost::shared_ptr<spe_gang_context> spe_gang_context_sptr;
+typedef boost::scoped_array<gc_client_thread_info> gc_client_thread_info_sa;
 
 
 enum worker_state {
@@ -107,8 +109,7 @@
   gc_job_desc_t                *d_jd;                  // [options.max_jobs]
   boost::shared_ptr<void> _d_jd_boost;         // hack for automatic storage 
mgmt
 
-  int                   d_nclients;            // active length of 
d_client_thread
-  gc_client_thread_info *d_client_thread;      // [options.max_client_threads]
+  gc_client_thread_info_sa d_client_thread;    // [options.max_client_threads]
 
   // We use bitvectors to represent the busy/free state of a job.  Each
   // bitvector is d_bvlen longs in length.
@@ -133,6 +134,9 @@
   boost::shared_ptr<void> _d_queue_boost;      // hack for automatic storage 
mgmt
 
 
+  gc_client_thread_info *alloc_cti();
+  void free_cti(gc_client_thread_info *cti);
+
   void create_event_handler();
   void set_eh_state(evt_handler_state s);
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]