[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r7018 - gnuradio/branches/developers/eb/gcell/src/lib
From: |
eb |
Subject: |
[Commit-gnuradio] r7018 - gnuradio/branches/developers/eb/gcell/src/lib |
Date: |
Fri, 23 Nov 2007 23:48:59 -0700 (MST) |
Author: eb
Date: 2007-11-23 23:48:59 -0700 (Fri, 23 Nov 2007)
New Revision: 7018
Modified:
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
Log:
work-in-progress on cell job manager
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
2007-11-24 05:02:17 UTC (rev 7017)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
2007-11-24 06:48:59 UTC (rev 7018)
@@ -56,9 +56,25 @@
}
};
+/*
+ * Return a pointer to a chunk of storage of `size` bytes, aligned to
+ * CACHE_LINE_SIZE.  If posix_memalign fails, report it via perror and throw
+ * std::runtime_error("memory").  Free the storage with "free" when done.
+ */
+static void *
+aligned_alloc(size_t size)
+{
+ void *p = 0;
+ if (posix_memalign(&p, CACHE_LINE_SIZE, size) != 0){
+ perror("posix_memalign"); // NOTE(review): posix_memalign returns its error code; errno may be stale here -- confirm
+ throw std::runtime_error("memory");
+ }
+ return p;
+}
gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
- : d_nclients(0), d_client_thread(0), d_job_busy(0)
+ : d_eh_thread(0), d_eh_state(EHS_INIT), d_shutdown_requested(false),
+ d_nclients(0), d_client_thread(0), d_job_busy(0)
{
if (options != 0)
d_options = *options;
@@ -106,7 +122,6 @@
}
}
-
if (d_options.use_affinity){
printf("gc_job_manager: warning: affinity request was ignored\n");
}
@@ -135,32 +150,26 @@
// FIXME load some code in the SPE
}
-
// ----------------------------------------------------------------
// initalize the free list of job descriptors
- gc_jd_stack_init(&d_free_list);
+
+ d_free_list = (gc_jd_stack_t *) aligned_alloc(sizeof(gc_jd_stack_t));
+ // This ensures that the memory associated with d_free_list is
+ // automatically freed in the destructor or if an exception occurs
+ // here in the constructor.
+ _d_free_list_boost =
+ boost::shared_ptr<void>((void *) d_free_list, free_deleter());
+ gc_jd_stack_init(d_free_list);
printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
printf("max_jobs = %u\n", d_options.max_jobs);
// Initialize the array of job descriptors.
- void *p = 0;
- if (posix_memalign(&p, CACHE_LINE_SIZE,
- sizeof(d_jd[0]) * d_options.max_jobs) != 0){
- perror("posix_memalign");
- throw std::runtime_error("memory");
- }
- d_jd = (gc_job_desc_t *) p;
+ d_jd = (gc_job_desc_t *) aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs);
+ _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
memset(d_jd, 0, sizeof(d_jd[0]) * d_options.max_jobs);
- // This ensures that the memory associated with d_jd is
- // automatically freed in the destructor or if an exception occurs
- // here in the constructor. It is admittedly a somewhat strange
- // use; however we need the the custom deleter, which is only
- // available with shared_ptr.
- _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
-
// set unique job_id
for (int i = 0; i < (int) d_options.max_jobs; i++)
d_jd[i].sys.job_id = i;
@@ -169,13 +178,21 @@
for (int i = d_options.max_jobs - 1; i >= 0; i--)
free_job_desc(&d_jd[i]);
+ // ----------------------------------------------------------------
+ // initialize the job queue
+
+ d_queue = (gc_jd_queue_t *) aligned_alloc(sizeof(gc_jd_queue_t));
+ _d_queue_boost =
+ boost::shared_ptr<void>((void *) d_queue, free_deleter());
+ gc_jd_queue_init(d_queue);
+
// ----------------------------------------------------------------
// initialize d_client_thread
// FIXME robust deletion
d_client_thread = new gc_client_thread_info[d_options.max_client_threads];
-
+
// ----------------------------------------------------------------
// initialize bitvectors
@@ -186,16 +203,10 @@
// allocate all bitvectors in a single cache-aligned chunk
size_t nlongs = (1 + d_options.max_client_threads) * d_bvlen;
size_t byte_size = nlongs * sizeof(unsigned long);
- p = 0;
- if (posix_memalign(&p, CACHE_LINE_SIZE, byte_size) != 0){
- perror("posix_memalign");
- throw std::runtime_error("memory");
- }
+ void *p = aligned_alloc(byte_size);
+ _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
memset(p, 0, byte_size);
- // let boost keep track of the storage
- _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
-
// now point the other pointers into this storage.
// d_job_busy is the first one, followed by the ones in the client data
@@ -207,10 +218,10 @@
d_client_thread[i].d_jobs_were_waiting_for = v;
// ----------------------------------------------------------------
+ // create the spe event handler
+ create_event_handler();
- // FIXME sequence this...
- // FIXME create event handler thread
// FIXME create N worker threads
// FIXME confirm everything started OK
@@ -228,6 +239,8 @@
d_jd = 0; // handled via _d_jd_boost
d_job_busy = 0; // handled via _d_all_bitvectors
+ d_free_list = 0; // handled via _d_free_list_boost
+ d_queue = 0; // handled via _d_queue_boost
}
bool
@@ -246,7 +259,7 @@
gc_job_manager_impl::alloc_job_desc()
{
// stack is lock free, thus safe to call from any thread
- return gc_jd_stack_pop(&d_free_list);
+ return gc_jd_stack_pop(d_free_list);
}
void
@@ -254,7 +267,7 @@
{
// stack is lock free, thus safe to call from any thread
if (jd != 0)
- gc_jd_stack_push(&d_free_list, jd);
+ gc_jd_stack_push(d_free_list, jd);
}
bool
@@ -269,6 +282,34 @@
return false; // FIXME
}
+void
+gc_job_manager_impl::create_event_handler()
+{
+ // Create the SPE event handler and register our interest in all events
+ // from the SPE context of each of the first d_options.nspes workers.  Throws std::runtime_error if either call fails.
+ d_spe_event_handler.ptr = spe_event_handler_create();
+ if (d_spe_event_handler.ptr == 0){
+ perror("spe_event_handler_create");
+ throw std::runtime_error("spe_event_handler_create");
+ }
+
+ for (unsigned int i = 0; i < d_options.nspes; i++){ // one registration per d_worker[i]
+ spe_event_unit_t eu;
+ memset(&eu, 0, sizeof(eu));
+ eu.events = SPE_EVENT_ALL_EVENTS;
+ eu.spe = d_worker[i].spe_ctx;
+ eu.data.u32 = i; // index into d_worker; available in events returned by spe_event_wait
+
+ if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
+ perror("spe_event_handler_register");
+ throw std::runtime_error("spe_event_handler_register");
+ }
+ }
+
+ // FIXME create the thread
+
+}
+
// ------------------------------------------------------------------------
worker_ctx::~worker_ctx()
@@ -283,3 +324,4 @@
}
state = WS_FREE;
}
+
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
2007-11-24 05:02:17 UTC (rev 7017)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
2007-11-24 06:48:59 UTC (rev 7018)
@@ -25,6 +25,7 @@
#include "gc_job_manager.h"
#include "gc_client_thread_info.h"
#include "gc_jd_stack.h"
+#include "gc_jd_queue.h"
#include <libspe2.h>
#include <vector>
#include <boost/shared_ptr.hpp>
@@ -42,11 +43,33 @@
struct worker_ctx {
worker_state state;
spe_context_ptr_t spe_ctx;
+ pthread_t thread;
- worker_ctx() : state(WS_FREE), spe_ctx(0) {}
+ worker_ctx() : state(WS_FREE), spe_ctx(0), thread(0) {}
~worker_ctx();
};
+enum evt_handler_state { // state of the event handler thread, stored in d_eh_state
+ EHS_INIT, // being initialized (the value d_eh_state is constructed with)
+ EHS_RUNNING, // thread is running
+ EHS_SHUTTING_DOWN, // in process of shutting down everything
+ EHS_DEAD, // thread is dead
+};
+
+struct spe_event_handler { // holds an SPE event handler and destroys it when this object is destroyed
+ spe_event_handler_ptr_t ptr; // 0 means "no handler"; assigned by the code that creates the handler
+ // NOTE(review): no copy ctor/assignment is declared; a copy would destroy the same handler twice -- confirm it is never copied
+ spe_event_handler() : ptr(0) {}
+ ~spe_event_handler(){ // destroys the handler if one is held; a failure is only reported via perror
+ if (ptr){
+ if (spe_event_handler_destroy(ptr) != 0){
+ perror("spe_event_handler_destroy");
+ }
+ }
+ }
+};
+
+
/*!
* \brief Concrete class that manages SPE jobs.
*
@@ -58,8 +81,15 @@
gc_jm_options d_options;
spe_gang_context_sptr d_gang; // boost::shared_ptr
- worker_ctx d_worker[MAX_SPES];
+ worker_ctx d_worker[MAX_SPES]; // SPE ctx, thread, etc
+
+ pthread_t d_eh_thread; // the event handler thread
+ evt_handler_state d_eh_state;
+ bool d_shutdown_requested;
+ spe_event_handler d_spe_event_handler;
+
+
// All of the job descriptors are hung off of here.
// We allocate them all in a single cache aligned chunk.
gc_job_desc_t *d_jd; // [options.max_jobs]
@@ -82,11 +112,17 @@
// This points into _d_all_bitvectors.
unsigned long *d_job_busy;
- // The actually allocation chunk is hung off of d_jd.
- // This is the lock free stack where we keep track of the free ones.
- gc_jd_stack_t d_free_list; // stack of free job
descriptors
+ // Lock free stack where we keep track of the free job descriptors.
+ gc_jd_stack_t *d_free_list; // stack of free job
descriptors
+ boost::shared_ptr<void> _d_free_list_boost; // hack for automatic storage
mgmt
+ // The PPE inserts jobs here; SPEs pull jobs from here.
+ gc_jd_queue_t *d_queue; // job queue
+ boost::shared_ptr<void> _d_queue_boost; // hack for automatic storage
mgmt
+
+ void create_event_handler();
+
friend gc_job_manager *gc_make_job_manager(const gc_jm_options *options);
/*!
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r7018 - gnuradio/branches/developers/eb/gcell/src/lib,
eb <=