commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r7010 - in gnuradio/branches/developers/eb/gcell/src:


From: eb
Subject: [Commit-gnuradio] r7010 - in gnuradio/branches/developers/eb/gcell/src: apps include lib
Date: Tue, 20 Nov 2007 20:00:31 -0700 (MST)

Author: eb
Date: 2007-11-20 20:00:30 -0700 (Tue, 20 Nov 2007)
New Revision: 7010

Added:
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
Removed:
   gnuradio/branches/developers/eb/gcell/src/include/gc_job_manager.h
   gnuradio/branches/developers/eb/gcell/src/include/gc_job_manager_impl.h
Modified:
   gnuradio/branches/developers/eb/gcell/src/apps/Makefile.am
   gnuradio/branches/developers/eb/gcell/src/lib/Makefile.am
   gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.cc
   gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
Log:
work-in-progress on cell job manager

Modified: gnuradio/branches/developers/eb/gcell/src/apps/Makefile.am
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/apps/Makefile.am  2007-11-20 
22:54:51 UTC (rev 7009)
+++ gnuradio/branches/developers/eb/gcell/src/apps/Makefile.am  2007-11-21 
03:00:30 UTC (rev 7010)
@@ -30,6 +30,8 @@
        test_all
 
 
-LDADD = $(top_builddir)/src/lib/libgcell-qa.la 
$(top_builddir)/src/lib/libgcell.la
+LDADD = \
+       $(top_builddir)/src/lib/libgcell-qa.la \
+       $(top_builddir)/src/lib/libgcell.la
 
 test_all_SOURCES = test_all.cc

Deleted: gnuradio/branches/developers/eb/gcell/src/include/gc_job_manager.h

Deleted: gnuradio/branches/developers/eb/gcell/src/include/gc_job_manager_impl.h

Modified: gnuradio/branches/developers/eb/gcell/src/lib/Makefile.am
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/Makefile.am   2007-11-20 
22:54:51 UTC (rev 7009)
+++ gnuradio/branches/developers/eb/gcell/src/lib/Makefile.am   2007-11-21 
03:00:30 UTC (rev 7010)
@@ -31,6 +31,7 @@
        gc_jd_stack.c
 
 libgcell_la_LIBADD = \
+       -lspe2 \
        $(GR_OMNITHREAD_LIBS)
 
 libgcell_qa_la_SOURCES = \

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h       
2007-11-20 22:54:51 UTC (rev 7009)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h       
2007-11-21 03:00:30 UTC (rev 7010)
@@ -41,19 +41,18 @@
 class gc_client_thread_info {
 public:
   gc_client_thread_info() :
-    d_allocated(0), d_cond(&d_mutex),
-    d_state(CT_RUNNING), d_jobs_were_waiting_for(0) { }
+    d_free(1), d_cond(&d_mutex),
+    d_state(CT_RUNNING) { }
 
   ~gc_client_thread_info() {
-    d_jobs_were_waiting_for = 0;
-    d_allocated = 0;
+    d_free = 1;
   }
 
-  gc_atomic_t     d_allocated; // is this gc_client_thread_info allocated ?
-  omni_mutex      d_mutex;     // mutex & condition var used to stop/start 
client
-  omni_condition   d_cond;
-  gc_ct_state_t           d_state;
-  unsigned long          *d_jobs_were_waiting_for;  // bitvector
+  volatile uint32_t    d_free;         // is this cti free? (1->free, 0->in 
use)
+  omni_mutex           d_mutex;        // used to suspend/resume client
+  omni_condition       d_cond;         // used to suspend/resume client
+  gc_ct_state_t                d_state;
+  unsigned long                d_jobs_were_waiting_for[0];  // bitvector
 };
 
 #endif /* INCLUDED_GC_CLIENT_THREAD_INFO_H */

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.cc     
2007-11-20 22:54:51 UTC (rev 7009)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.cc     
2007-11-21 03:00:30 UTC (rev 7010)
@@ -24,7 +24,7 @@
 #endif
 #include "gc_job_manager.h"
 
-gc_job_manager::gc_job_manager(int nspes, const gc_jm_limits *limits)
+gc_job_manager::gc_job_manager(const gc_jm_options *options)
 {
   // nop
 }

Copied: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h (from 
rev 7006, gnuradio/branches/developers/eb/gcell/src/include/gc_job_manager.h)
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h              
                (rev 0)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h      
2007-11-21 03:00:30 UTC (rev 7010)
@@ -0,0 +1,133 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef INCLUDED_GC_JOB_MANAGER_H
+#define INCLUDED_GC_JOB_MANAGER_H
+
+#include <boost/utility.hpp>
+#include "gc_job_desc.h"
+
+class gc_job_manager;
+
+/*
+ * \brief Options that configure the job_manager.
+ * The default values are reasonable.
+ */
+struct gc_jm_options {
+  unsigned int max_jobs;           // max # of job descriptors in system
+  unsigned int max_client_threads;  // max # of client threads of job manager
+  unsigned int nspes;              // how many SPEs shall we use? 0 -> all of 
them
+  bool gang_schedule;              // shall we gang schedule?
+  bool use_affinity;               // shall we try for affinity (FIXME not 
implmented)
+
+  gc_jm_options() :
+    max_jobs(0), max_client_threads(0), nspes(0),
+    gang_schedule(true), use_affinity(false)
+  {
+  }
+};
+
+
+/*
+ * \brief Create an instance of the job manager
+ */
+gc_job_manager *
+gc_make_job_manager(const gc_jm_options *options = 0);
+
+
+/*!
+ * \brief Abstract class that manages SPE jobs.
+ *
+ * There is typically a single instance derived from this class.
+ * It is safe to call its methods from any thread.
+ */
+class gc_job_manager : boost::noncopyable
+{
+public:
+  gc_job_manager(const gc_jm_options *options = 0); 
+
+  virtual ~gc_job_manager();
+
+  /*!
+   * Stop accepting new jobs.  Wait for existing jobs to complete.
+   * Return all managed SPE's to the system.
+   */
+  virtual bool shutdown() = 0;
+
+  /*!
+   * \brief Return number of SPE's currently allocated to job manager.
+   */
+  virtual int nspes() const = 0;
+
+  /*!
+   * \brief Return a pointer to a properly aligned job descriptor.
+   * Throws if none are available.
+   */
+  virtual gc_job_desc *alloc_job_desc() = 0;
+
+  /*
+   *! Return a job descriptor previously allocated with alloc_job_desc()
+   *
+   * \param[in] jd pointer to job descriptor to free.
+   */
+  virtual void free_job_desc(gc_job_desc *jd) = 0;
+
+  /*!
+   * \brief Submit a job for asynchronous processing on an SPE.
+   *
+   * \param[in] jd pointer to job description
+   *
+   * The caller must not read or write the job description
+   * or any of the memory associated with any indirect arguments
+   * until after calling wait_job.
+   *
+   * \returns true iff the job was successfully enqueued.
+   * If submit_job returns false, check jd->status for additional info.
+   */
+  virtual bool submit_job(gc_job_desc *jd) = 0;
+
+  /*!
+   * \brief Wait for specified job to complete.
+   */
+  virtual bool wait_job(gc_job_desc *jd) = 0;
+
+#if 0
+  /*!
+   * \brief Wait for any job to complete.
+   * \param[in] jobs vector of jobs
+   *
+   * FIXME need to return info about which jobs completed.
+   */
+  int wait_any_jobs(const std::vector<gc_job_desc *> &jobs);
+
+  /*!
+   * \brief Wait for alls jobs to complete.
+   * \param[in] jobs vector of jobs
+   *
+   * FIXME need to return info about which jobs completed.
+   */
+  int wait_all_jobs(const std::vector<gc_job_desc *> &jobs);
+#endif
+
+};
+
+
+#endif /* INCLUDED_GC_JOB_MANAGER_H */

Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc        
2007-11-20 22:54:51 UTC (rev 7009)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc        
2007-11-21 03:00:30 UTC (rev 7010)
@@ -24,25 +24,87 @@
 #endif
 #include "gc_job_manager_impl.h"
 
+//#define _XOPEN_SOURCE 600    // to get XSI version of strerror_r
+//#include <string.h>
+#include <stdio.h>
+#include <stdexcept>
+
+static const bool VERBOSE = true;
+
 static const unsigned int DEFAULT_MAX_JOBS = 256;
 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
 
-gc_job_manager_impl::gc_job_manager_impl(int nspes, const gc_jm_limits *limits)
+#if 0
+static char *
+gc_strerror(int errnum, char *buf, size_t buflen)
+{
+  // we're using the XSI version, not the GNU-specific version
+  int r = strerror_r(errnum, buf, sizeof(buf));
+  if (r != 0){                 // trouble
+    snprintf(buf, buflen, "Unknown error %d", errnum);
+  }
+  return buf;
+}
+#endif
+
+
+gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
   : d_nspes(0), d_nclients(0)
 {
-  d_limits.max_jobs = DEFAULT_MAX_JOBS;
-  d_limits.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
+  if (options != 0)
+    d_options = *options;
 
-  if (limits != 0){
-    if (limits->max_jobs != 0)
-      d_limits.max_jobs = limits->max_jobs;
-    if (limits->max_client_threads != 0)
-      d_limits.max_client_threads = limits->max_client_threads;
+  // provide the real default for those indicated with a zero
+  if (d_options.max_jobs == 0)
+    d_options.max_jobs = DEFAULT_MAX_JOBS;
+  if (d_options.max_client_threads == 0)
+    d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
+
+  int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
+  int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
+
+  if (VERBOSE){
+    printf("cpu_nodes = %d\n", ncpu_nodes);
+    for (int i = 0; i < ncpu_nodes; i++){
+      printf("node[%d].physical_spes = %2d\n", i,
+            spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
+      printf("node[%d].usable_spes   = %2d\n", i,
+            spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
+    }
   }
 
-  for (unsigned int i = 0; i < MAX_SPES; i++)
-    d_spe_context[i] = 0;
+  //
+  // sanity check requested number of spes.
+  //
+  if (d_options.nspes == 0)    // use all of them
+    d_options.nspes = nusable_spes;
+  else {
+    if (d_options.nspes > (unsigned int) nusable_spes){
+      fprintf(stderr,
+             "gc_job_manager: caller requested %d spes.  There are only %d 
available.\n",
+             d_options.nspes, nusable_spes);
+      if (d_options.gang_schedule){
+       // If we're gang scheduling we'll never get scheduled if we
+       // ask for more than are available.
+       throw std::out_of_range("not enough spes available");
+      }
+    }
+  }
 
+  if (d_options.use_affinity){
+    printf("gc_job_manager: warning: affinity request was ignored\n");
+  }
+
+  // we probably want to get smarter about partioning these on the blade server
+
+  if (d_options.gang_schedule){
+    d_gang = spe_gang_context_create(0);
+    if (d_gang == 0){
+      perror("gc_job_manager_impl[spe_gang_context_create]");
+      throw std::runtime_error("spe_gang_context_create");
+    }
+  }
+
   // FIXME more storage init.  Think about shared pointers w/ custom 
destructors
 
   gc_jd_stack_init(&d_free_list);
@@ -61,25 +123,22 @@
 int
 gc_job_manager_impl::nspes() const
 {
-  return 0;                    // FIXME
+  return d_options.nspes;
 }
 
-bool
-gc_job_manager_impl::set_nspes(int nspes)
-{
-  return false;                        // FIXME
-}
-
 gc_job_desc *
 gc_job_manager_impl::alloc_job_desc()
 {
-  return 0;
+  // stack is lock free, thus safe to call from any thread
+  return gc_jd_stack_pop(&d_free_list);
 }
 
 void
 gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
 {
-  // FIXME
+  // stack is lock free, thus safe to call from any thread
+  if (jd != 0)
+    gc_jd_stack_push(&d_free_list, jd);
 }
 
 bool
@@ -94,3 +153,17 @@
   return false;                        // FIXME
 }
 
+// ------------------------------------------------------------------------
+
+worker_ctx::~worker_ctx()
+{
+  fprintf(stderr, "\n~worker_ctx\n");
+  if (spe_ctx){
+    int r = spe_context_destroy(spe_ctx);
+    if (r != 0){
+      perror("spe_context_destroy");
+    }
+    spe_ctx = 0;
+  }
+}
+

Copied: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 
(from rev 7006, 
gnuradio/branches/developers/eb/gcell/src/include/gc_job_manager_impl.h)
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h         
                (rev 0)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 
2007-11-21 03:00:30 UTC (rev 7010)
@@ -0,0 +1,146 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef INCLUDED_GC_JOB_MANAGER_IMPL_H
+#define INCLUDED_GC_JOB_MANAGER_IMPL_H
+
+#include "gc_job_manager.h"
+#include "gc_client_thread_info.h"
+#include "gc_jd_stack.h"
+#include <libspe2.h>
+
+class gc_job_manager_impl;
+
+enum worker_state {
+  WS_FREE,     // not in use
+  WS_INIT,     // allocated and being initialized
+  WS_RUNNING,  // the thread is running
+  WS_DEAD,     // the thread is dead
+};
+
+struct worker_ctx {
+  worker_state         state;
+  spe_context_ptr_t    spe_ctx;
+
+  worker_ctx() : state(WS_FREE), spe_ctx(0) {}
+  ~worker_ctx();
+};
+
+/*!
+ * \brief Concrete class that manages SPE jobs.
+ *
+ * This class contains all the implementation details.
+ */
+class gc_job_manager_impl : public gc_job_manager
+{
+  static const unsigned int MAX_SPES =  16;
+
+  gc_jm_options                 d_options;
+  spe_gang_context_ptr_t d_gang;
+  int                   d_nspes;               // number of SPEs we've 
allocated
+  worker_ctx            d_worker[MAX_SPES];
+
+  gc_job_desc_t                *d_jd;                  // [limits.max_jobs]
+
+  int                   d_nclients;            // # of used entries in 
d_client_thread
+  gc_client_thread_info        *d_client_thread;       // 
[limits.max_client_threads]
+
+  int                   d_bvlen;               // bit vector length in longs
+  unsigned long                *d_job_busy;            // bitvector of job's 
status 1=BUSY
+
+  gc_jd_stack_t                 d_free_list;           // stack of free job 
descriptors
+
+
+  friend gc_job_manager *gc_make_job_manager(const gc_jm_options *options);
+
+  /*!
+   * \param nspes number of SPEs job manager should use.
+   * If nspes == -1, all available SPEs will be allocated.
+   */
+  gc_job_manager_impl(const gc_jm_options *options = 0);
+
+public:
+  virtual ~gc_job_manager_impl();
+
+  /*!
+   * Stop accepting new jobs.  Wait for existing jobs to complete.
+   * Return all managed SPE's to the system.
+   */
+  virtual bool shutdown();
+
+  /*!
+   * \brief Return number of SPE's currently allocated to job manager.
+   */
+  virtual int nspes() const;
+
+  /*!
+   * \brief Return a pointer to a properly aligned job descriptor.
+   * Throws if none are available.
+   */
+  virtual gc_job_desc *alloc_job_desc();
+
+  /*
+   *! Return a job descriptor previously allocated with alloc_job_desc()
+   *
+   * \param[in] jd pointer to job descriptor to free.
+   */
+  virtual void free_job_desc(gc_job_desc *jd);
+
+  /*!
+   * \brief Submit a job for asynchronous processing on an SPE.
+   *
+   * \param[in] jd pointer to job description
+   *
+   * The caller must not read or write the job description
+   * or any of the memory associated with any indirect arguments
+   * until after calling wait_job.
+   *
+   * \returns true iff the job was successfully enqueued.
+   * If submit_job returns false, check jd->status for additional info.
+   */
+  virtual bool submit_job(gc_job_desc *jd);
+
+  /*!
+   * \brief Wait for specified job to complete.
+   */
+  virtual bool wait_job(gc_job_desc *jd);
+
+#if 0
+  /*!
+   * \brief Wait for any job to complete.
+   * \param[in] jobs vector of jobs
+   *
+   * FIXME need to return info about which jobs completed.
+   */
+  int wait_any_jobs(const std::vector<gc_job_desc *> &jobs);
+
+  /*!
+   * \brief Wait for alls jobs to complete.
+   * \param[in] jobs vector of jobs
+   *
+   * FIXME need to return info about which jobs completed.
+   */
+  int wait_all_jobs(const std::vector<gc_job_desc *> &jobs);
+#endif
+
+};
+
+#endif /* INCLUDED_GC_JOB_MANAGER_IMPL_H */





reply via email to

[Prev in Thread] Current Thread [Next in Thread]