[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r7082 - in gnuradio/branches/developers/eb/gcell/src: include lib
From: eb
Subject: [Commit-gnuradio] r7082 - in gnuradio/branches/developers/eb/gcell/src: include lib
Date: Thu, 6 Dec 2007 20:33:52 -0700 (MST)
Author: eb
Date: 2007-12-06 20:33:51 -0700 (Thu, 06 Dec 2007)
New Revision: 7082
Modified:
gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h
gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
Log:
work-in-progress on cell job manager
Modified: gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h 2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/include/gc_job_desc.h 2007-12-07 03:33:51 UTC (rev 7082)
@@ -59,10 +59,16 @@
//! final job status
typedef enum {
JS_OK,
+ JS_SHUTTING_DOWN, // job mananger is shutting down
+ JS_TOO_MANY_CLIENTS, // too many client threads
JS_UNKNOWN_METHOD, // didn't recognize the method ID
- JS_BAD_SIGNATURE, // arg counts and/or types are wrong
+ JS_BAD_SIGNATURE, // arg counts and/or types are wrong for given method
JS_BAD_ALIGNMENT, // indirect arg EA not 16-byte aligned
JS_BAD_LENGTH, // indirect arg length not a multiple of 16 bytes long
+ JS_BAD_N_DIRECT, // too many direct args
+ JS_BAD_N_INDIRECT, // too many indirect args
+ JS_ARGS_TOO_LONG, // total length of indirect args exceed limit
+ JS_BAD_JUJU, // misc problem: you're having a bad day
} gc_job_status_t;
#define MAX_ARGS_DIRECT 8
@@ -112,8 +118,8 @@
*/
typedef struct gc_job_args
{
- uint32_t n_by_val; // # of "value" args
- uint32_t n_by_ref; // # of "indirct" args (DMA'd in/out)
+ uint32_t n_direct; // # of "direct" args
+ uint32_t n_indirect; // # of "indirct" args (DMA'd in/out)
uint32_t _pad[2]; // (padding)
gc_tag_t dir_tag[MAX_ARGS_DIRECT] _AL16; // type of "direct" argument[i]
gc_arg_union_t dir_val[MAX_ARGS_DIRECT] _AL16; // "direct" argument values
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h 2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_client_thread_info.h 2007-12-07 03:33:51 UTC (rev 7082)
@@ -48,11 +48,12 @@
d_jobs_were_waiting_for = 0;
}
- volatile uint32_t d_free; // is this cti free? (1->free, 0->in use)
+ uint32_t d_free; // is this cti free? (1->free, 0->in use)
gc_ct_state_t d_state;
omni_mutex d_mutex; // used to suspend/resume client
omni_condition d_cond; // used to suspend/resume client
unsigned long *d_jobs_were_waiting_for; // bitvector
+ uint16_t d_client_id; // which client info are we?
};
#endif /* INCLUDED_GC_CLIENT_THREAD_INFO_H */
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h 2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager.h 2007-12-07 03:33:51 UTC (rev 7082)
@@ -106,6 +106,8 @@
/*!
* \brief Wait for specified job to complete.
+ *
+ * A thread may only wait for jobs which it submitted.
*/
virtual bool wait_job(gc_job_desc *jd) = 0;
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc 2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc 2007-12-07 03:33:51 UTC (rev 7082)
@@ -28,14 +28,23 @@
#include <stdio.h>
#include <stdexcept>
#include <stdlib.h>
+#include <atomic_dec_if_positive.h>
+static const size_t CACHE_LINE_SIZE = 128;
+
static const bool VERBOSE = true;
-static const size_t CACHE_LINE_SIZE = 128;
static const unsigned int DEFAULT_MAX_JOBS = 128;
static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
+// FIXME this really depends on the SPU code...
+static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;
+
+static bool s_key_initialized = false;
+static pthread_key_t s_client_key;
+
+
// custom deleter of gang_contexts for use with boost::shared_ptr
class gang_deleter {
public:
@@ -57,7 +66,18 @@
}
};
+
/*
+ * Called when client thread is destroyed.
+ * We mark our client info free.
+ */
+static void
+client_key_destructor(void *p)
+{
+ ((gc_client_thread_info *) p)->d_free = 1;
+}
+
+/*
* Return pointer to cache-aligned chunk of storage of size size bytes.
* Throw if can't allocate memory. The storage should be freed
* with "free" when done. The memory is initialized to zero.
@@ -81,8 +101,15 @@
: d_spu_args(0),
d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
d_shutdown_requested(false),
- d_nclients(0), d_client_thread(0), d_job_busy(0)
+ d_client_thread(0), d_job_busy(0)
{
+ if (!s_key_initialized){
+ int r = pthread_key_create(&s_client_key, client_key_destructor);
+ if (r != 0)
+ throw std::runtime_error("pthread_key_create");
+ s_key_initialized = true;
+ }
+
if (options != 0)
d_options = *options;
@@ -229,9 +256,16 @@
// ----------------------------------------------------------------
// initialize d_client_thread
- // FIXME robust deletion
- d_client_thread = new gc_client_thread_info[d_options.max_client_threads];
+ {
+ gc_client_thread_info_sa cti(
+ new gc_client_thread_info[d_options.max_client_threads]);
+ d_client_thread.swap(cti);
+
+ for (unsigned int i = 0; i < d_options.max_client_threads; i++)
+ d_client_thread[i].d_client_id = i;
+ }
+
// ----------------------------------------------------------------
// initialize bitvectors
@@ -268,11 +302,6 @@
{
shutdown();
- // FIXME whatever else needs to be done
-
- delete [] d_client_thread;
- d_client_thread = 0;
-
d_jd = 0; // handled via _d_jd_boost
d_job_busy = 0; // handled via _d_all_bitvectors
d_free_list = 0; // handled via _d_free_list_boost
@@ -317,18 +346,100 @@
gc_jd_stack_push(d_free_list, jd);
}
+/*
+ * We check as much as we can here on the PPE side, so that the SPE
+ * doesn't have to.
+ */
+static bool
+check_args(gc_job_desc *jd, gc_job_args *args)
+{
+ if (args->n_direct > MAX_ARGS_DIRECT){
+ jd->status = JS_BAD_N_DIRECT;
+ return false;
+ }
+
+ if (args->n_indirect > MAX_ARGS_INDIRECT){
+ jd->status = JS_BAD_N_INDIRECT;
+ return false;
+ }
+
+ size_t total_len = 0;
+ for (unsigned int i = 0; i < args->n_indirect; i++){
+ if ((args->ind_len[i] & 0xf) != 0){
+ jd->status = JS_BAD_LENGTH;
+ return false;
+ }
+ if ((args->ind_ea[i] & 0xf) != 0){
+ jd->status = JS_BAD_ALIGNMENT;
+ return false;
+ }
+ total_len += args->ind_len[i];
+ }
+
+ if (total_len > MAX_TOTAL_INDIRECT_LENGTH){
+ jd->status = JS_ARGS_TOO_LONG;
+ return false;
+ }
+
+ return true;
+}
+
bool
gc_job_manager_impl::submit_job(gc_job_desc *jd)
{
- if (d_shutdown_requested)
+ if (d_shutdown_requested){
+ jd->status = JS_SHUTTING_DOWN;
return false;
+ }
- return false; // FIXME
+ // Ensure we've got a client_thread_info assigned to this thread.
+
+ gc_client_thread_info *cti =
+ (gc_client_thread_info *) pthread_getspecific(s_client_key);
+ if (cti == 0){
+ if ((cti = alloc_cti()) == 0){
+ fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
+ jd->status = JS_TOO_MANY_CLIENTS;
+ return false;
+ }
+ int r = pthread_setspecific(s_client_key, cti);
+ if (r != 0){
+ jd->status = JS_BAD_JUJU;
+ fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
+ return false;
+ }
+ }
+
+ // FIXME map and/or check method against all known methods
+
+ if (!check_args(jd, &jd->input))
+ return false;
+
+ if (!check_args(jd, &jd->output))
+ return false;
+
+ jd->status = JS_OK;
+ jd->sys.client_id = cti->d_client_id;
+ gc_jd_queue_enqueue(d_queue, jd);
+
+ return true;
}
bool
gc_job_manager_impl::wait_job(gc_job_desc *jd)
{
+ gc_client_thread_info *cti =
+ (gc_client_thread_info *) pthread_getspecific(s_client_key);
+ if (cti == 0)
+ return false;
+
+ if (jd->sys.client_id != cti->d_client_id){
+ fprintf(stderr, "gc_job_manager_impl::wait_job: can't want for job you didn't submit\n");
+ return false;
+ }
+
+ // FIXME...
+
return false; // FIXME
}
@@ -699,7 +810,33 @@
////////////////////////////////////////////////////////////////////////
+gc_client_thread_info *
+gc_job_manager_impl::alloc_cti()
+{
+ for (unsigned int i = 0; i < d_options.max_client_threads; i++){
+ if (d_client_thread[i].d_free){
+ // try to atomically grab it
+ if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
+ // got it...
+ d_client_thread[i].d_state = CT_RUNNING;
+ memset(&d_client_thread[i].d_jobs_were_waiting_for, 0, d_bvlen * sizeof(unsigned long));
+ return &d_client_thread[i];
+ }
+ }
+ }
+ return 0;
+}
+void
+gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
+{
+ assert((size_t) (cti - d_client_thread.get()) < d_options.max_client_threads);
+ cti->d_free = 1;
+}
+
+
+////////////////////////////////////////////////////////////////////////
+
worker_ctx::~worker_ctx()
{
if (spe_ctx){
@@ -711,3 +848,4 @@
}
state = WS_FREE;
}
+
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 2007-12-07 01:11:26 UTC (rev 7081)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h 2007-12-07 03:33:51 UTC (rev 7082)
@@ -30,8 +30,10 @@
#include <libspe2.h>
#include <vector>
#include <boost/shared_ptr.hpp>
+#include <boost/scoped_array.hpp>
typedef boost::shared_ptr<spe_gang_context> spe_gang_context_sptr;
+typedef boost::scoped_array<gc_client_thread_info> gc_client_thread_info_sa;
enum worker_state {
@@ -107,8 +109,7 @@
gc_job_desc_t *d_jd; // [options.max_jobs]
boost::shared_ptr<void> _d_jd_boost; // hack for automatic storage mgmt
- int d_nclients; // active length of d_client_thread
- gc_client_thread_info *d_client_thread; // [options.max_client_threads]
+ gc_client_thread_info_sa d_client_thread; // [options.max_client_threads]
// We use bitvectors to represent the busy/free state of a job. Each
// bitvector is d_bvlen longs in length.
@@ -133,6 +134,9 @@
boost::shared_ptr<void> _d_queue_boost; // hack for automatic storage mgmt
+ gc_client_thread_info *alloc_cti();
+ void free_cti(gc_client_thread_info *cti);
+
void create_event_handler();
void set_eh_state(evt_handler_state s);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r7082 - in gnuradio/branches/developers/eb/gcell/src: include lib,
eb <=