commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r7020 - gnuradio/branches/developers/eb/gcell/src/lib


From: eb
Subject: [Commit-gnuradio] r7020 - gnuradio/branches/developers/eb/gcell/src/lib
Date: Sat, 24 Nov 2007 19:48:29 -0700 (MST)

Author: eb
Date: 2007-11-24 19:48:29 -0700 (Sat, 24 Nov 2007)
New Revision: 7020

Modified:
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
Log:
work-in-progress on cell job manager.

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc	2007-11-24 09:29:11 UTC (rev 7019)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc	2007-11-25 02:48:29 UTC (rev 7020)
@@ -102,8 +102,6 @@
 
   // clamp nspes
   d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
-
-  // FIXME when we get that next gen Cell
   nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
 
   //
@@ -253,7 +251,7 @@
   d_shutdown_requested = true;         // set flag for event handler thread
 
   // should only happens during early QA code
-  if (d_eh_state == EHS_INIT)
+  if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
     return false;
 
   while (d_eh_state != EHS_DEAD)       // wait for it to finish
@@ -286,6 +284,9 @@
 bool
 gc_job_manager_impl::submit_job(gc_job_desc *jd)
 {
+  if (d_shutdown_requested)
+    return false;
+
   return false;                        // FIXME
 }
 
@@ -295,7 +296,52 @@
   return false;                        // FIXME
 }
 
-extern "C" static void *
+static void
+pthread_create_failure_msg(int r, const char *which)
+{
+  char buf[256];
+  char *s = 0;
+
+  switch (r){
+  case EAGAIN: s = "EAGAIN"; break;
+  case EINVAL: s = "EINVAL"; break;
+  case EPERM:  s = "EPERM";  break;
+  default:
+    snprintf(buf, sizeof(buf), "Unknown error %d", r);
+    s = buf;
+    break;
+  }
+  fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
+}
+
+
+static bool
+start_thread(pthread_t *thread,
+            void *(*start_routine)(void *),  void *arg,
+            const char *msg)
+{
+  pthread_attr_t attr;
+  pthread_attr_init(&attr);
+  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
+  // FIXME save sigprocmask
+  // FIXME set sigprocmask
+
+  int r = pthread_create(thread, &attr, start_routine, arg);
+    
+  // FIXME restore sigprocmask
+
+  if (r != 0){
+    pthread_create_failure_msg(r, msg);
+    return false;
+  }
+  return true;
+}
+
+
+
+//extern "C" 
+static void *
 start_event_handler(void *arg)
 {
   gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
@@ -303,6 +349,24 @@
   return 0;
 }
 
+//extern "C" 
+static void *
+worker_loop(void *arg)
+{
+  // FIXME need to pass additional info.
+  
+  worker_ctx *w = (worker_ctx *) arg;
+
+  w->state = WS_RUNNING;
+
+  printf("worker running\n");
+
+  // FIXME spe_context_run
+
+  w->state = WS_DEAD;
+  return 0;
+}
+
 void
 gc_job_manager_impl::create_event_handler()
 {
@@ -329,8 +393,32 @@
 
   // create our event handling thread
 
-  
+  if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
+    throw std::runtime_error("pthread_create");
+  }
 
+  // create the SPE worker threads
+
+  bool ok = true;
+  for (unsigned int i = 0; ok && i < d_options.nspes; i++){
+    char name[256];
+    snprintf(name, sizeof(name), "worker[%d]", i);
+    ok &= start_thread(&d_worker[i].thread, worker_loop,
+                      &d_worker[i], name);
+  }
+
+  if (!ok){
+    //
+    // FIXME Clean up the mess.  Need to terminate event handler and all workers.
+    //
+    // this should cause the workers to exit, unless they're really broken
+    send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
+
+    shutdown();
+    sleep(1);          // FIXME remove when shutdown waits for workers
+
+    throw std::runtime_error("pthread_create");
+  }
 }
 
 bool
@@ -347,7 +435,17 @@
 bool
 gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
 {
-  return false;
+  if (spe >= d_options.nspes)
+    return false;
+
+  int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
+                           SPE_MBOX_ALL_BLOCKING);
+  if (r < 0){
+    perror("spe_in_mbox_write");
+    return false;
+  }
+
+  return r == 1;
 }
 
 void
@@ -379,6 +477,100 @@
 }
 
 void
+gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
+{
+  print_event(evt);
+
+  int spe_num = evt->data.u32;
+
+  // we assume that only a single event type can be signaled at once
+  
+  if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
+    static const int NMSGS = 8;
+    unsigned int msg[NMSGS];
+    int n = spe_out_mbox_read(evt->spe, msg, NMSGS);
+    if (n < 0){
+      perror("spe_out_mbox_read");
+    }
+    else {
+      for (int i = 0; i < n; i++){
+       switch(MBOX_MSG_OP(msg[i])){
+       case OP_JOB_DONE:
+         printf("job_done (0x%08x) from spe[%d]\n", msg[i], spe_num);
+         break;
+
+       case OP_PING_REPLY:
+         printf("ping_reply (0x%08x) from spe[%d]\n", msg[i], spe_num);
+         break;
+
+       case OP_EXIT:
+       case OP_PING:
+       default:
+         printf("Unexpected msg (0x%08x) from spe[%d]\n", msg[i], spe_num);
+         break;
+       }
+      }
+    }
+  }
+  else if (evt->events == SPE_EVENT_IN_MBOX){   // there's room to write to SPE
+    // spe_in_mbox_write (ignore)
+  }
+  else if (evt->events == SPE_EVENT_TAG_GROUP){         // our DMA completed
+    // spe_mfcio_tag_status_read
+  }
+  else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
+    spe_stop_info_t si;
+    int r = spe_stop_info_read(evt->spe, &si);
+    if (r < 0){
+      perror("spe_stop_info_read");
+    }
+    else {
+      switch (si.stop_reason){
+      case SPE_EXIT:
+       printf("spe[%d] SPE_EXIT w/ exit_code = %d\n",
+              spe_num, si.result.spe_exit_code);
+       break;
+      case SPE_STOP_AND_SIGNAL:
+       printf("spe[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
+              spe_num, si.result.spe_signal_code);
+       break;
+      case SPE_RUNTIME_ERROR:
+       printf("spe[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
+              spe_num, si.result.spe_runtime_error);
+       break;
+      case SPE_RUNTIME_EXCEPTION:
+       printf("spe[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
+              spe_num, si.result.spe_runtime_exception);
+       break;
+      case SPE_RUNTIME_FATAL:
+       printf("spe[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
+              spe_num, si.result.spe_runtime_fatal);
+       break;
+      case SPE_CALLBACK_ERROR:
+       printf("spe[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
+              spe_num, si.result.spe_callback_error);
+       break;
+      case SPE_ISOLATION_ERROR:
+       printf("spe[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
+              spe_num, si.result.spe_isolation_error);
+       break;
+      default:
+       printf("spe[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
+              spe_num, si.stop_reason, si.spu_status);
+       break;
+      }
+    }
+  }
+  else {
+    fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
+    return;
+  }
+}
+
+//
+// This is the "main program" of the event handling thread
+//
+void
 gc_job_manager_impl::event_handler_loop()
 {
   static const int MAX_EVENTS = 16;
@@ -386,6 +578,8 @@
 
   spe_event_unit_t events[MAX_EVENTS];
 
+  printf("event_handler_loop: starting\n");
+
   set_eh_state(EHS_RUNNING);
 
   while (1){
@@ -394,18 +588,25 @@
     case EHS_RUNNING:      // normal stuff
       if (d_shutdown_requested) {
        set_eh_state(EHS_SHUTTING_DOWN);
-       send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
       }
       break;
 
     case EHS_SHUTTING_DOWN:
-      break;
+      // FIXME wait until job queue is empty, then tell them to exit
+      send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
 
-    case EHS_DEAD:
+      // FIXME when all workers have died, we return.
+      // Not sure yet if we see the SPE_EXIT events.
+      // If we do, we can keep track here.
+      // In the meanwhile...
+
+      set_eh_state(EHS_DEAD);  // FIXME
+      printf("event_handler_loop: exiting\n");
       return;
 
     default:
       set_eh_state(EHS_DEAD);
+      printf("event_handler_loop: exiting\n");
       return;
     }
 
@@ -417,8 +618,7 @@
       // FIXME bail?
     }
     for (int i = 0; i < nevents; i++){
-      print_event(&events[i]);
-      // FIXME do the real work
+      handle_event(&events[i]);
     }
   }
 }

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h	2007-11-24 09:29:11 UTC (rev 7019)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h	2007-11-25 02:48:29 UTC (rev 7020)
@@ -41,7 +41,7 @@
 };
 
 struct worker_ctx {
-  worker_state         state;
+  volatile worker_state        state;
   spe_context_ptr_t    spe_ctx;
   pthread_t            thread;
 
@@ -135,6 +135,7 @@
   bool send_all_spes(uint32_t msg);
   bool send_spe(unsigned int spe, uint32_t msg);
   void print_event(spe_event_unit_t *evt);
+  void handle_event(spe_event_unit_t *evt);
 
 
   friend gc_job_manager *gc_make_job_manager(const gc_jm_options *options);





reply via email to

[Prev in Thread] Current Thread [Next in Thread]