[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r7087 - in gnuradio/branches/developers/eb/gcell/src:
From: |
eb |
Subject: |
[Commit-gnuradio] r7087 - in gnuradio/branches/developers/eb/gcell/src: include lib |
Date: |
Fri, 7 Dec 2007 22:24:40 -0700 (MST) |
Author: eb
Date: 2007-12-07 22:24:39 -0700 (Fri, 07 Dec 2007)
New Revision: 7087
Modified:
gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h
gnuradio/branches/developers/eb/gcell/src/include/gc_mbox.h
gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
Log:
work-in-progress on cell job manager. Implemented wait_job and
wait_jobs methods.
Modified: gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h 2007-12-07 20:16:10 UTC (rev 7086)
+++ gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h 2007-12-08 05:24:39 UTC (rev 7087)
@@ -67,7 +67,7 @@
JS_BAD_LENGTH, // indirect arg length not a multiple of 16 bytes long
JS_BAD_N_DIRECT, // too many direct args
JS_BAD_N_INDIRECT, // too many indirect args
- JS_ARGS_TOO_LONG, // total length of indirect args exceed limit
+ JS_ARGS_TOO_LONG, // total length of indirect args exceeds limit
JS_BAD_JUJU, // misc problem: you're having a bad day
} gc_job_status_t;
Modified: gnuradio/branches/developers/eb/gcell/src/include/gc_mbox.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/include/gc_mbox.h 2007-12-07 20:16:10 UTC (rev 7086)
+++ gnuradio/branches/developers/eb/gcell/src/include/gc_mbox.h 2007-12-08 05:24:39 UTC (rev 7087)
@@ -40,12 +40,10 @@
// PPE to SPE (sent via SPE Read Inbound Mailbox)
#define OP_EXIT 0x0 // exit now
-#define OP_PING 0x1 // send PING_REPLY with arg
-#define OP_PING_REPLY 0x2 //
// SPE to PPE (sent via SPE Write Outbound Interrupt Mailbox)
-#define OP_JOB_DONE 0x3 // arg is job_id from gc_job_desc_private
+#define OP_JOB_DONE 0x1 // arg is job_id from gc_job_desc_private
#endif /* INCLUDED_GC_MBOX_H */
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h 2007-12-07 20:16:10 UTC (rev 7086)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h 2007-12-08 05:24:39 UTC (rev 7087)
@@ -24,36 +24,58 @@
#include <omnithread.h>
#include <boost/utility.hpp>
-enum gc_ct_state_t {
- CT_RUNNING,
- CT_WAITING_JOB,
+enum gc_ct_state {
+ CT_NOT_WAITING,
+ CT_WAIT_ALL,
+ CT_WAIT_ANY,
};
/*
* \brief per client-thread data used by gc_job_manager
*
- * "Client threads" are any threads that invoke methods
- * on gc_job_manager. We use pthread_set_specific to
- * store a pointer to one of these for each thread
- * that comes our way.
+ * "Client threads" are any threads that invoke methods on
+ * gc_job_manager. We use pthread_set_specific to store a pointer to
+ * one of these for each thread that comes our way.
*/
class gc_client_thread_info : boost::noncopyable {
public:
gc_client_thread_info() :
- d_free(1), d_state(CT_RUNNING), d_cond(&d_mutex),
- d_jobs_were_waiting_for(0){ }
+ d_free(1), d_cond(&d_mutex), d_state(CT_NOT_WAITING),
+ d_jobs_done(0), d_njobs_waiting_for(0),
+ d_jobs_waiting_for(0){ }
~gc_client_thread_info() {
d_free = 1;
- d_jobs_were_waiting_for = 0;
+ d_state = CT_NOT_WAITING;
+ d_jobs_done = 0;
+ d_njobs_waiting_for = 0;
+ d_jobs_waiting_for = 0;
}
- uint32_t d_free; // is this cti free? (1->free, 0->in use)
- gc_ct_state_t d_state;
- omni_mutex d_mutex; // used to suspend/resume client
- omni_condition d_cond; // used to suspend/resume client
- unsigned long *d_jobs_were_waiting_for; // bitvector
- uint16_t d_client_id; // which client info are we?
+ //! is this cti free? (1->free, 0->in use)
+ uint32_t d_free;
+
+ //! which client info are we?
+ uint16_t d_client_id;
+
+ //! hold this mutex to manipulate anything below here
+ omni_mutex d_mutex;
+
+ //! signaled by event handler to wake client thread up
+ omni_condition d_cond;
+
+ //! Is this client waiting?
+ gc_ct_state d_state;
+
+ //! Jobs that have finished and not yet been waited for (bitvector)
+ unsigned long *d_jobs_done;
+
+ //! # of jobs we're waiting for
+ unsigned int d_njobs_waiting_for;
+
+ //! Jobs that client thread is waiting for
+ gc_job_desc **d_jobs_waiting_for;
+
};
#endif /* INCLUDED_GC_CLIENT_THREAD_INFO_H */
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h 2007-12-07 20:16:10 UTC (rev 7086)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h 2007-12-08 05:24:39 UTC (rev 7087)
@@ -27,6 +27,11 @@
class gc_job_manager;
+enum gc_wait_mode {
+ GC_WAIT_ANY,
+ GC_WAIT_ALL,
+};
+
/*
* \brief Options that configure the job_manager.
* The default values are reasonable.
@@ -105,29 +110,31 @@
virtual bool submit_job(gc_job_desc *jd) = 0;
/*!
- * \brief Wait for specified job to complete.
+ * \brief Wait for job to complete.
*
* A thread may only wait for jobs which it submitted.
- */
- virtual bool wait_job(gc_job_desc *jd) = 0;
-
-#if 0
- /*!
- * \brief Wait for any job to complete.
- * \param[in] jobs vector of jobs
*
- * FIXME need to return info about which jobs completed.
+ * \returns true if successful, else false.
*/
- int wait_any_jobs(const std::vector<gc_job_desc *> &jobs);
+ virtual bool
+ wait_job(gc_job_desc *jd) = 0;
/*!
- * \brief Wait for alls jobs to complete.
- * \param[in] jobs vector of jobs
+ * \brief wait for 1 or more jobs to complete.
*
- * FIXME need to return info about which jobs completed.
+ * \param[in] njobs is the length of arrays \p jd and \p done.
+ * \param[in] jd are the jobs that are to be waited for.
+ * \param[out] done indicates whether the corresponding job is complete.
+ * \param[in] mode indicates whether to wait for ALL or ANY of the jobs
+ * in \p jd to complete.
+ *
+ * A thread may only wait for jobs which it submitted.
+ *
+ * \returns number of jobs completed, or -1 if error.
*/
- int wait_all_jobs(const std::vector<gc_job_desc *> &jobs);
-#endif
+ virtual int
+ wait_jobs(unsigned int njobs,
+ gc_job_desc *jd[], bool done[], gc_wait_mode mode) = 0;
};
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc 2007-12-07 20:16:10 UTC (rev 7086)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc 2007-12-08 05:24:39 UTC (rev 7087)
@@ -113,7 +113,7 @@
: d_spu_args(0),
d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
d_shutdown_requested(false),
- d_client_thread(0), d_job_busy(0)
+ d_client_thread(0)
{
if (!s_key_initialized){
int r = pthread_key_create(&s_client_key, client_key_destructor);
@@ -286,21 +286,17 @@
d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;
// allocate all bitvectors in a single cache-aligned chunk
- size_t nlongs = (1 + d_options.max_client_threads) * d_bvlen;
- size_t byte_size = nlongs * sizeof(unsigned long);
- void *p = aligned_alloc(byte_size);
+ size_t nlongs = d_bvlen * d_options.max_client_threads;
+ void *p = aligned_alloc(nlongs * sizeof(unsigned long));
_d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
- // now point the other pointers into this storage.
- // d_job_busy is the first one, followed by the ones in the client data
-
+ // Now point the gc_client_thread_info bitvectors into this storage
unsigned long *v = (unsigned long *) p;
- d_job_busy = v;
- v += d_bvlen;
for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
- d_client_thread[i].d_jobs_were_waiting_for = v;
+ d_client_thread[i].d_jobs_done = v;
+
// ----------------------------------------------------------------
// create the spe event handler & worker (SPE) threads
@@ -315,7 +311,6 @@
shutdown();
d_jd = 0; // handled via _d_jd_boost
- d_job_busy = 0; // handled via _d_all_bitvectors
d_free_list = 0; // handled via _d_free_list_boost
d_queue = 0; // handled via _d_queue_boost
}
@@ -343,6 +338,48 @@
return d_options.nspes;
}
+////////////////////////////////////////////////////////////////////////
+
+void
+gc_job_manager_impl::bv_zero(unsigned long *bv)
+{
+ memset(bv, 0, sizeof(unsigned long) * d_bvlen);
+}
+
+inline void
+gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
+{
+ unsigned int wi = bitno / (sizeof (unsigned long) * 8);
+ unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
+ bv[wi] &= ~(1UL << bi);
+}
+
+inline void
+gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
+{
+ unsigned int wi = bitno / (sizeof (unsigned long) * 8);
+ unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
+ bv[wi] |= (1UL << bi);
+}
+
+inline bool
+gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
+{
+ unsigned int wi = bitno / (sizeof (unsigned long) * 8);
+ unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
+ return (bv[wi] & (1UL << bi)) != 0;
+}
+
+inline bool
+gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
+{
+ unsigned int wi = bitno / (sizeof (unsigned long) * 8);
+ unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
+ return (bv[wi] & (1UL << bi)) == 0;
+}
+
+////////////////////////////////////////////////////////////////////////
+
gc_job_desc *
gc_job_manager_impl::alloc_job_desc()
{
@@ -358,6 +395,8 @@
gc_jd_stack_push(d_free_list, jd);
}
+////////////////////////////////////////////////////////////////////////
+
/*
* We check as much as we can here on the PPE side, so that the SPE
* doesn't have to.
@@ -365,30 +404,30 @@
static bool
check_args(gc_job_desc *jd, gc_job_args *args)
{
- if (args->n_direct > MAX_ARGS_DIRECT){
+ if (unlikely(args->n_direct > MAX_ARGS_DIRECT)){
jd->status = JS_BAD_N_DIRECT;
return false;
}
- if (args->n_indirect > MAX_ARGS_INDIRECT){
+ if (unlikely(args->n_indirect > MAX_ARGS_INDIRECT)){
jd->status = JS_BAD_N_INDIRECT;
return false;
}
size_t total_len = 0;
for (unsigned int i = 0; i < args->n_indirect; i++){
- if ((args->ind_len[i] & 0xf) != 0){
+ if (unlikely((args->ind_len[i] & 0xf) != 0)){
jd->status = JS_BAD_LENGTH;
return false;
}
- if ((args->ind_ea[i] & 0xf) != 0){
+ if (unlikely((args->ind_ea[i] & 0xf) != 0)){
jd->status = JS_BAD_ALIGNMENT;
return false;
}
total_len += args->ind_len[i];
}
- if (total_len > MAX_TOTAL_INDIRECT_LENGTH){
+ if (unlikely(total_len > MAX_TOTAL_INDIRECT_LENGTH)){
jd->status = JS_ARGS_TOO_LONG;
return false;
}
@@ -399,7 +438,7 @@
bool
gc_job_manager_impl::submit_job(gc_job_desc *jd)
{
- if (d_shutdown_requested){
+ if (unlikely(d_shutdown_requested)){
jd->status = JS_SHUTTING_DOWN;
return false;
}
@@ -408,7 +447,7 @@
gc_client_thread_info *cti =
(gc_client_thread_info *) pthread_getspecific(s_client_key);
- if (cti == 0){
+ if (unlikely(cti == 0)){
if ((cti = alloc_cti()) == 0){
fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
jd->status = JS_TOO_MANY_CLIENTS;
@@ -432,29 +471,85 @@
jd->status = JS_OK;
jd->sys.client_id = cti->d_client_id;
+
+ // FIXME keep count of jobs in progress?
+
gc_jd_queue_enqueue(d_queue, jd);
-
return true;
}
bool
gc_job_manager_impl::wait_job(gc_job_desc *jd)
{
+ bool done;
+ return wait_jobs(1, &jd, &done, GC_WAIT_ANY) == 1;
+}
+
+int
+gc_job_manager_impl::wait_jobs(unsigned int njobs,
+ gc_job_desc *jd[],
+ bool done[],
+ gc_wait_mode mode)
+{
+ unsigned int i;
+
gc_client_thread_info *cti =
(gc_client_thread_info *) pthread_getspecific(s_client_key);
- if (cti == 0)
- return false;
+ if (unlikely(cti == 0))
+ return -1;
- if (jd->sys.client_id != cti->d_client_id){
- fprintf(stderr, "gc_job_manager_impl::wait_job: can't wait for a job you didn't submit\n");
- return false;
+ for (i = 0; i < njobs; i++){
+ done[i] = false;
+ if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
+ fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
+ return -1;
+ }
}
- // FIXME...
-
- return false; // FIXME
+ {
+ omni_mutex_lock l(cti->d_mutex);
+
+ // setup info for event handler
+ cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
+ cti->d_njobs_waiting_for = njobs;
+ cti->d_jobs_waiting_for = jd;
+
+ unsigned int ndone = 0;
+
+ // wait for jobs to complete
+
+ while (1){
+ ndone = 0;
+ for (i= 0; i < njobs; i++){
+ if (done[i])
+ ndone++;
+ else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
+ bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
+ done[i] = true;
+ ndone++;
+ }
+ }
+
+ if (mode == GC_WAIT_ANY && ndone > 0)
+ break;
+
+ if (mode == GC_WAIT_ALL && ndone == njobs)
+ break;
+
+ // FIXME what happens when somebody calls shutdown?
+
+ cti->d_cond.wait(); // wait for event handler to wake us up
+ }
+
+ cti->d_state = CT_NOT_WAITING;
+ cti->d_njobs_waiting_for = 0; // tidy up (not reqd)
+ cti->d_jobs_waiting_for = 0; // tidy up (not reqd)
+ return ndone;
+ }
}
+////////////////////////////////////////////////////////////////////////
+
bool
gc_job_manager_impl::send_all_spes(uint32_t msg)
{
@@ -623,6 +718,45 @@
}
void
+gc_job_manager_impl::notify_client_job_is_done(unsigned int job_id)
+{
+ const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";
+ if (job_id >= d_options.max_jobs){
+ // internal error, shouldn't happen
+ fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
+ return;
+ }
+ gc_job_desc *jd = &d_jd[job_id];
+
+ if (jd->sys.client_id >= d_options.max_client_threads){
+ // internal error, shouldn't happen
+ fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
+ return;
+ }
+ gc_client_thread_info *cti = &d_client_thread[jd->sys.client_id];
+
+ {
+ omni_mutex_lock l(cti->d_mutex);
+
+ // mark job done
+ bv_set(cti->d_jobs_done, job_id);
+
+ // FIXME we could/should distinguish between CT_WAIT_ALL & CT_WAIT_ANY
+
+ switch (cti->d_state){
+ case CT_WAIT_ANY:
+ case CT_WAIT_ALL:
+ cti->d_cond.signal(); // wake client thread up
+ break;
+
+ case CT_NOT_WAITING:
+ default:
+ break; // nop
+ }
+ }
+}
+
+void
gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
{
print_event(evt);
@@ -643,14 +777,10 @@
switch(MBOX_MSG_OP(msg[i])){
case OP_JOB_DONE:
printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
+ notify_client_job_is_done(MBOX_MSG_ARG(msg[i]));
break;
- case OP_PING_REPLY:
- printf("eh: ping_reply (0x%08x) from spu[%d]\n", msg[i], spe_num);
- break;
-
case OP_EXIT:
- case OP_PING:
default:
printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[i], spe_num);
break;
@@ -831,9 +961,13 @@
// try to atomically grab it
if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
// got it...
- d_client_thread[i].d_state = CT_RUNNING;
- memset(&d_client_thread[i].d_jobs_were_waiting_for, 0, d_bvlen * sizeof(unsigned long));
- return &d_client_thread[i];
+ gc_client_thread_info *cti = &d_client_thread[i];
+ cti->d_state = CT_NOT_WAITING;
+ bv_zero(cti->d_jobs_done);
+ cti->d_njobs_waiting_for = 0;
+ cti->d_jobs_waiting_for = 0;
+
+ return cti;
}
}
}
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 2007-12-07 20:16:10 UTC (rev 7086)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 2007-12-08 05:24:39 UTC (rev 7087)
@@ -113,20 +113,15 @@
gc_client_thread_info_sa d_client_thread; // [options.max_client_threads]
- // We use bitvectors to represent the busy/free state of a job. Each
+ // We use bitvectors to represent the completion state of a job. Each
// bitvector is d_bvlen longs in length.
int d_bvlen; // bit vector length in longs
// This contains the storage for all the bitvectors used by the job
- // manager. There's 1 for d_job_busy and 1 for each client thread
- // [options.max_client_threads]. We allocate them all in a single
- // cache aligned chunk.
+ // manager. There's 1 for each client thread, in the d_jobs_done
+ // field. We allocate them all in a single cache aligned chunk.
boost::shared_ptr<void> _d_all_bitvectors; // hack for automatic storage mgmt
- // bitvector that describes that state of all jobs (1=BUSY).
- // This points into _d_all_bitvectors.
- unsigned long *d_job_busy;
-
// Lock free stack where we keep track of the free job descriptors.
gc_jd_stack_t *d_free_list; // stack of free job descriptors
boost::shared_ptr<void> _d_free_list_boost; // hack for automatic storage mgmt
@@ -141,6 +136,7 @@
void create_event_handler();
void set_eh_state(evt_handler_state s);
+ void notify_client_job_is_done(unsigned int job_id);
public:
void event_handler_loop(); // really private
@@ -151,6 +147,13 @@
void print_event(spe_event_unit_t *evt);
void handle_event(spe_event_unit_t *evt);
+ // bitvector ops
+ void bv_zero(unsigned long *bv);
+ void bv_clr(unsigned long *bv, unsigned int bitno);
+ void bv_set(unsigned long *bv, unsigned int bitno);
+ bool bv_isset(unsigned long *bv, unsigned int bitno);
+ bool bv_isclr(unsigned long *bv, unsigned int bitno);
+
friend gc_job_manager *gc_make_job_manager(const gc_jm_options *options);
gc_job_manager_impl(const gc_jm_options *options = 0);
@@ -197,28 +200,31 @@
virtual bool submit_job(gc_job_desc *jd);
/*!
- * \brief Wait for specified job to complete.
- */
- virtual bool wait_job(gc_job_desc *jd);
-
-#if 0
- /*!
- * \brief Wait for any job to complete.
- * \param[in] jobs vector of jobs
+ * \brief Wait for job to complete.
*
- * FIXME need to return info about which jobs completed.
+ * A thread may only wait for jobs which it submitted.
+ *
+ * \returns true if successful, else false.
*/
- int wait_any_jobs(const std::vector<gc_job_desc *> &jobs);
+ virtual bool
+ wait_job(gc_job_desc *jd);
/*!
- * \brief Wait for alls jobs to complete.
- * \param[in] jobs vector of jobs
+ * \brief wait for 1 or more jobs to complete.
*
- * FIXME need to return info about which jobs completed.
+ * \param[in] njobs is the length of arrays \p jd and \p done.
+ * \param[in] jd are the jobs that are to be waited for.
+ * \param[out] done indicates whether the corresponding job is complete.
+ * \param[in] mode indicates whether to wait for ALL or ANY of the jobs
+ * in \p jd to complete.
+ *
+ * A thread may only wait for jobs which it submitted.
+ *
+ * \returns number of jobs completed, or -1 if error.
*/
- int wait_all_jobs(const std::vector<gc_job_desc *> &jobs);
-#endif
-
+ virtual int
+ wait_jobs(unsigned int njobs,
+ gc_job_desc *jd[], bool done[], gc_wait_mode mode);
};
#endif /* INCLUDED_GC_JOB_MANAGER_IMPL_H */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r7087 - in gnuradio/branches/developers/eb/gcell/src: include lib,
eb <=