[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
many-to-many thread/socket model
From: |
Ricardo Gameiro |
Subject: |
many-to-many thread/socket model |
Date: |
Wed, 01 Sep 2004 12:50:28 +0100 |
User-agent: |
Mozilla Thunderbird 0.6 (X11/20040502) |
Hello,
I've been trying to implement a many-to-many thread/socket relation
using common c++, and I'm having a strange problem which is showing in
Solaris, though it is working on Linux.
My goal is to have a list of "workable" sockets, that will be "worked
on" by one arbitrary thread (executing a state machine) from a thread
pool, until it is not possible for the thread, to do any additional work
on the socket (due to not having all required input available, etc),
making the thread available again for work on any "workable" socket.
To implement this, I'm using the SocketPort/SocketService class. Still,
I'm not using the "pending" method to perform work on the socket, I'm
using it to add "this" socket to the list of workable sockets
(workToDo), and post a Semaphore, on which every thread in the worker
thread pool is waiting.
This is working... more or less :-(... because if the "service" thread
polls the socket again, before the worker thread does it's job, it will
queue the socket in the workable list multiple times. Well to solve
this, I added a "setDetectPending( false )" to the "pending" method in
the SocketPort class, and added a "setDetectPending( true )" after the
worker thread finishes working on the socket.
On Linux it worked, but on Solaris it is possible to send the first
chunk of data to the socket (it is polled by the service thread the
first time), but any subsequent data will not trigger the "pending"
method on the SocketPort object.
I'm not even sure if this should be working on Linux, or if it is a
Solaris related problem, or if it is a Common C++ problem.
I'm suspecting that for some reason, the "service" thread is not aware
of the latest "setDetectPending( true )" which was executed on another
thread, though after a first quick glance on the Common C++ code, I
think it should (??).
I'm attaching a simple trimmed down piece of code wich will reproduce
the problem (on Solaris). It's an echo server.
Did this make any sense? Any ideias, help, suggestions would be appreciated.
TIA,
Ricardo Gameiro
#include <list>
#include <iostream>
#include <cc++/thread.h>
#include <cc++/socket.h>
using namespace std;
using namespace ost;
class Sock;
Mutex wtdM;
list<Sock *> workToDo;
Semaphore workAvail( 0 );
class Sock : public virtual SocketPort {
friend class SockWorker;
public:
Sock( SocketService * pService, TCPSocket &pServer ) :
SocketPort( pService, pServer )
{
tpport_t port;
cerr << "got connection from " << getPeer( &port );
cerr << ":" << port << endl;
setCompletion( false );
setTimer( 20000 ); // Hardcoded, must be changed
}
virtual void pending()
{
tpport_t port;
cerr << "pending data from " << getPeer( &port );
cerr << ":" << port << endl;
setTimer( 20000 ); // Reset timer
setDetectPending( false );
wtdM.enter();
workToDo.insert( workToDo.end(), this );
wtdM.leave();
workAvail.post();
}
virtual void expired()
{
tpport_t port;
cerr << "connection from " << getPeer( &port );
cerr << ":" << port << " expired" << endl;
delete this;
}
virtual void disconnect()
{
tpport_t port;
cerr << "connection from " << getPeer( &port );
cerr << ":" << port << " expired" << endl;
delete this;
}
};
class SockWorker : public Thread {
public:
void run()
{
while( true ) {
cerr << "child " << getId() << " is waiting for work"
<< endl;
workAvail.wait();
cerr << "child " << getId() << " was scheduled to work"
<< endl;
wtdM.enter();
list<Sock *>::iterator myWork = workToDo.begin();
workToDo.pop_front();
wtdM.leave();
Sock * mySock = *myWork;
try {
int bytesRead;
unsigned int totalRead = 0;
char ioBuffer[ 4096 ];
while( ( bytesRead = mySock->receive( ioBuffer,
sizeof( ioBuffer ) ) ) > 0 ) {
cerr << "child " << getId() << " got "
<< bytesRead << " bytes" << endl;
mySock->send( ioBuffer, bytesRead );
totalRead += bytesRead;
}
if( totalRead == 0 ) {
cerr << "child " << getId() << " got
broken sock" << endl;
delete this;
}
}
catch ( ... )
{
cerr << "sock write threw an exception" << endl;
}
mySock->setDetectPending( true );
}
}
};
class SockScheduler : public virtual Thread, public virtual TCPSocket {
public:
SockScheduler( IPV4Address & machine, int port ) :
TCPSocket( machine, port ), Thread()
{
pollSvc = new SocketService( 0 );
start();
}
void run()
{
while ( 1 ) {
try {
new Sock( pollSvc, * ( TCPSocket * ) this );
}
catch ( ... )
{
cerr << "sock accept failed" << endl;
exit();
}
}
}
private:
SocketService * pollSvc;
};
int main( int argc, char ** argv ) {
try {
IPV4Address listen_address( "0.0.0.0" );
SockScheduler * X = new SockScheduler( listen_address, 4000 );
for( int i = 0; i < 2; i++ ) {
SockWorker * Y = new SockWorker;
Y->start();
}
( ( Thread * ) X )->join();
}
catch(...) {
cerr << "caught someting" << endl;
}
}
- many-to-many thread/socket model,
Ricardo Gameiro <=