commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r7243 - in gnuradio/branches/developers/eb/gcell/src/lib: . spu


From: eb
Subject: [Commit-gnuradio] r7243 - in gnuradio/branches/developers/eb/gcell/src/lib: . spu
Date: Sun, 23 Dec 2007 09:04:48 -0700 (MST)

Author: eb
Date: 2007-12-23 09:04:47 -0700 (Sun, 23 Dec 2007)
New Revision: 7243

Modified:
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.cc
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
   gnuradio/branches/developers/eb/gcell/src/lib/qa_job_manager.cc
   gnuradio/branches/developers/eb/gcell/src/lib/spu/test_spu.c
Log:
work-in-progress on cell job manager

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.cc	2007-12-23 02:02:48 UTC (rev 7242)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.cc	2007-12-23 16:04:47 UTC (rev 7243)
@@ -41,3 +41,14 @@
   // nop
 }
 
+void
+gc_job_manager::set_debug(int debug)
+{
+  // nop
+}
+
+int
+gc_job_manager::debug()
+{
+  return 0;
+}

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h	2007-12-23 02:02:48 UTC (rev 7242)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h	2007-12-23 16:04:47 UTC (rev 7243)
@@ -136,6 +136,9 @@
   wait_jobs(unsigned int njobs,
            gc_job_desc *jd[], bool done[], gc_wait_mode mode) = 0;
 
+
+  virtual void set_debug(int debug);
+  virtual int debug();
 };
 
 

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc	2007-12-23 02:02:48 UTC (rev 7242)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc	2007-12-23 16:04:47 UTC (rev 7243)
@@ -32,7 +32,6 @@
 
 static const size_t CACHE_LINE_SIZE = 128;
 
-static const bool VERBOSE = true;
 static const unsigned int DEFAULT_MAX_JOBS = 128;
 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
 
@@ -43,6 +42,7 @@
 static bool          s_key_initialized = false;
 static pthread_key_t s_client_key;
 
+static int s_worker_debug = 0;
 
 // custom deleter of gang_contexts for use with boost::shared_ptr
 class gang_deleter {
@@ -110,7 +110,7 @@
 
 
 gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
-  : d_spu_args(0),
+  : d_debug(0), d_spu_args(0),
     d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
     d_shutdown_requested(false),
     d_client_thread(0)
@@ -122,6 +122,9 @@
     s_key_initialized = true;
   }
 
+  // ensure it's zero
+  pthread_setspecific(s_client_key, 0);
+
   if (options != 0)
     d_options = *options;
 
@@ -134,7 +137,7 @@
   int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
   int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
 
-  if (VERBOSE){
+  if (debug()){
     printf("cpu_nodes = %d\n", ncpu_nodes);
     for (int i = 0; i < ncpu_nodes; i++){
       printf("node[%d].physical_spes = %2d\n", i,
@@ -249,8 +252,10 @@
     boost::shared_ptr<void>((void *) d_free_list, free_deleter());
   gc_jd_stack_init(d_free_list);
 
-  printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
-  printf("max_jobs = %u\n", d_options.max_jobs);
+  if (debug()){
+    printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
+    printf("max_jobs = %u\n", d_options.max_jobs);
+  }
 
   // Initialize the array of job descriptors.
   d_jd = (gc_job_desc_t *) aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs);
@@ -313,6 +318,9 @@
   d_jd = 0;            // handled via _d_jd_boost
   d_free_list = 0;     // handled via _d_free_list_boost
   d_queue = 0;         // handled via _d_queue_boost
+
+  // clear cti, since we've deleted the underlying data
+  pthread_setspecific(s_client_key, 0);
 }
 
 bool
@@ -513,6 +521,7 @@
     cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
     cti->d_njobs_waiting_for = njobs;
     cti->d_jobs_waiting_for = jd;
+    assert(cti->d_jobs_done != 0);
 
     unsigned int ndone = 0;
 
@@ -759,24 +768,26 @@
 void
 gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
 {
-  print_event(evt);
+  // print_event(evt);
 
   int spe_num = evt->data.u32;
 
-  // we assume that only a single event type can be signaled at once
+  // only a single event type can be signaled at a time
   
   if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
     static const int NMSGS = 32;
     unsigned int msg[NMSGS];
-    int n = spe_out_mbox_read(evt->spe, msg, NMSGS);
+    int n = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
+    // printf("spe_out_intr_mbox_read = %d\n", n);
     if (n < 0){
-      perror("spe_out_mbox_read");
+      perror("spe_out_intr_mbox_read");
     }
     else {
       for (int i = 0; i < n; i++){
        switch(MBOX_MSG_OP(msg[i])){
        case OP_JOB_DONE:
-         printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
+         if (debug())
+           printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
          notify_client_job_is_done(MBOX_MSG_ARG(msg[i]));
          break;
 
@@ -797,8 +808,10 @@
     else {
       switch (si.stop_reason){
       case SPE_EXIT:
-       printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
-              spe_num, si.result.spe_exit_code);
+       if (debug()){
+         printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
+                spe_num, si.result.spe_exit_code);
+       }
        break;
       case SPE_STOP_AND_SIGNAL:
        printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
@@ -856,7 +869,8 @@
 
   spe_event_unit_t events[MAX_EVENTS];
 
-  printf("event_handler_loop: starting\n");
+  if (d_debug)
+    printf("event_handler_loop: starting\n");
 
   set_eh_state(EHS_RUNNING);
 
@@ -885,7 +899,8 @@
 
        if (all_dead){
          set_eh_state(EHS_DEAD);
-         printf("event_handler_loop: exiting\n");
+         if (d_debug)
+           printf("event_handler_loop: exiting\n");
          return;
        }
       }
@@ -920,7 +935,8 @@
   spe_stop_info_t      si;
 
   w->state = WS_RUNNING;
-  printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
+  if (s_worker_debug)
+    printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
 
   unsigned int entry = SPE_DEFAULT_ENTRY;
   int r = spe_context_run(w->spe_ctx,  &entry, 0, w->spu_args, 0, &si);
@@ -932,8 +948,9 @@
   }
   else if (r == 0){
     // spe program called exit.
-    printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
-          w->spe_idx, si.result.spe_exit_code);
+    if (s_worker_debug)
+      printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
+            w->spe_idx, si.result.spe_exit_code);
   }
   else {
     // called stop_and_signal
@@ -945,7 +962,8 @@
   }
 
   // in any event, we're committing suicide now ;)
-  printf("worker[%d]: WS_DEAD\n", w->spe_idx);
+  if (s_worker_debug)
+    printf("worker[%d]: WS_DEAD\n", w->spe_idx);
 
   w->state = WS_DEAD;
   return 0;
@@ -982,6 +1000,18 @@
 }
 
 
+void
+gc_job_manager_impl::set_debug(int debug)
+{
+  d_debug = debug;
+  s_worker_debug = debug;
+}
+
+int
+gc_job_manager_impl::debug()
+{
+  return d_debug;
+}
 ////////////////////////////////////////////////////////////////////////
 
 worker_ctx::~worker_ctx()

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h	2007-12-23 02:02:48 UTC (rev 7242)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h	2007-12-23 16:04:47 UTC (rev 7243)
@@ -88,7 +88,7 @@
 {
   enum { MAX_SPES =  16 };
 
-
+  int                    d_debug;
   gc_jm_options                  d_options;
   spe_program_handle_sptr d_spe_image;
   spe_gang_context_sptr   d_gang;              // boost::shared_ptr
@@ -225,6 +225,9 @@
   virtual int
   wait_jobs(unsigned int njobs,
            gc_job_desc *jd[], bool done[], gc_wait_mode mode);
+
+  virtual void set_debug(int debug);
+  virtual int debug();
 };
 
 #endif /* INCLUDED_GC_JOB_MANAGER_IMPL_H */

Modified: gnuradio/branches/developers/eb/gcell/src/lib/qa_job_manager.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/qa_job_manager.cc	2007-12-23 02:02:48 UTC (rev 7242)
+++ gnuradio/branches/developers/eb/gcell/src/lib/qa_job_manager.cc	2007-12-23 16:04:47 UTC (rev 7243)
@@ -75,7 +75,7 @@
 void
 qa_job_manager::t1()
 {
-  t1_body();           // leaks 800 bytes first time, could be static inits
+  t1_body();           // leaks 800 bytes first time, could be one-time inits
   leak_check(&qa_job_manager::t1_body, "t1");
 }
 
@@ -158,8 +158,9 @@
 void
 qa_job_manager::t3_body()
 {
-  // This seems to leak memory, but I'm not sure if it's us or the
-  // underlying exception handling mechanism
+  // This leaks memory the first time it's invoked, but I'm not sure
+  // if it's us or the underlying exception handling mechanism, or
+  // cppunit.  cppunit is the prime suspect.
 
   gc_job_manager *mgr;
   gc_jm_options opts;
@@ -168,14 +169,82 @@
   CPPUNIT_ASSERT_THROW(mgr = gc_make_job_manager(&opts), std::out_of_range);
 }
 
+static void
+init_jd(gc_job_desc *jd, gc_method_t method)
+{
+  jd->method = method;
+  jd->input.n_direct = 0;
+  jd->input.n_indirect = 0;
+  jd->output.n_direct = 0;
+  jd->output.n_indirect = 0;
+}
+
 void
 qa_job_manager::t4_body()
 {
+  gc_job_manager *mgr;
+  gc_jm_options opts;
+  opts.nspes = 1;
+  opts.gang_schedule = true;
+  mgr = gc_make_job_manager(&opts);
+  //mgr->set_debug(-1);
+  static const int NJOBS = 32;
+  gc_job_desc *jds[NJOBS];
+  bool done[NJOBS];
+
+  for (int i = 0; i < NJOBS; i++){
+    jds[i] = mgr->alloc_job_desc();
+    init_jd(jds[i], i);
+  }
+
+  for (int i = 0; i < NJOBS; i++){
+    if (!mgr->submit_job(jds[i])){
+      printf("submit_job(jds[%d]) failed, status = %d\n", i, jds[i]->status);
+    }
+  }
+
+  int n = mgr->wait_jobs(NJOBS, jds, done, GC_WAIT_ALL);
+  CPPUNIT_ASSERT_EQUAL(NJOBS, n);
+
+  for (int i = 0; i < NJOBS; i++){
+    mgr->free_job_desc(jds[i]);
+  }
+
+  delete mgr;
 }
 
 void
 qa_job_manager::t5_body()
 {
+  gc_job_manager *mgr;
+  gc_jm_options opts;
+  opts.nspes = 0;      // use them all
+  opts.gang_schedule = true;
+  mgr = gc_make_job_manager(&opts);
+  //mgr->set_debug(-1);
+  static const int NJOBS = 32;
+  gc_job_desc *jds[NJOBS];
+  bool done[NJOBS];
+
+  for (int i = 0; i < NJOBS; i++){
+    jds[i] = mgr->alloc_job_desc();
+    init_jd(jds[i], i);
+  }
+
+  for (int i = 0; i < NJOBS; i++){
+    if (!mgr->submit_job(jds[i])){
+      printf("submit_job(jds[%d]) failed, status = %d\n", i, jds[i]->status);
+    }
+  }
+
+  int n = mgr->wait_jobs(NJOBS, jds, done, GC_WAIT_ALL);
+  CPPUNIT_ASSERT_EQUAL(NJOBS, n);
+
+  for (int i = 0; i < NJOBS; i++){
+    mgr->free_job_desc(jds[i]);
+  }
+
+  delete mgr;
 }
 
 void

Modified: gnuradio/branches/developers/eb/gcell/src/lib/spu/test_spu.c
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/spu/test_spu.c	2007-12-23 02:02:48 UTC (rev 7242)
+++ gnuradio/branches/developers/eb/gcell/src/lib/spu/test_spu.c	2007-12-23 16:04:47 UTC (rev 7243)
@@ -35,10 +35,10 @@
 {
   // FIXME do something useful ;)
 
-  printf("spu[%d]: job_id = %3d  client_id = %3d\n",
-        spu_args.spu_idx, jd->sys.job_id, jd->sys.client_id);
+  if (0)
+    printf("spu[%d]: job_id = %3d  client_id = %3d\n",
+          spu_args.spu_idx, jd->sys.job_id, jd->sys.client_id);
 
-
   // FIXME lookup method
   // FIXME copy indirect args in
   // FIXME invoke method
@@ -63,7 +63,7 @@
     int cnt = spu_readchcnt(SPU_RdInMbox);
     if (unlikely(cnt > 0)){
       int msg = spu_readch(SPU_RdInMbox);
-      printf("spu[%d] mbox_msg: 0x%08x\n", spu_args.spu_idx, msg);
+      // printf("spu[%d] mbox_msg: 0x%08x\n", spu_args.spu_idx, msg);
       if (MBOX_MSG_OP(msg) == OP_EXIT)
        return;
     }
@@ -83,13 +83,12 @@
 {
   sys_tags_init();
 
-  // start the dma to get the args
+  // dma the args in
   mfc_get(&spu_args, argp, sizeof(spu_args), sys_tag, 0, 0);
-
   mfc_write_tag_mask(1 << sys_tag);    // the tag we're interested in
   mfc_read_tag_status_all();           // wait for DMA to complete
 
-  printf("spu[%d] queue = 0x%llx\n", spu_args.spu_idx, spu_args.queue);
+  // printf("spu[%d] queue = 0x%llx\n", spu_args.spu_idx, spu_args.queue);
 
   main_loop();
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]