[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3072 - in freeway/src/org/gnu/freeway: . server test transport/tcp util/net
From: mdonoughe
Subject: [GNUnet-SVN] r3072 - in freeway/src/org/gnu/freeway: . server test transport/tcp util/net
Date: Tue, 27 Jun 2006 21:45:08 -0700 (PDT)
Author: mdonoughe
Date: 2006-06-27 21:45:03 -0700 (Tue, 27 Jun 2006)
New Revision: 3072
Removed:
freeway/src/org/gnu/freeway/util/net/TCPServer.java
freeway/src/org/gnu/freeway/util/net/TCPSession.java
Modified:
freeway/src/org/gnu/freeway/AbstractClient.java
freeway/src/org/gnu/freeway/server/ClientServer.java
freeway/src/org/gnu/freeway/test/TCPTest.java
freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java
freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java
freeway/src/org/gnu/freeway/util/net/CSServer.java
freeway/src/org/gnu/freeway/util/net/CSSession.java
Log:
Move TCPServer and TCPSession into CSServer and CSSession because they
are the only implementations
Modified: freeway/src/org/gnu/freeway/AbstractClient.java
===================================================================
--- freeway/src/org/gnu/freeway/AbstractClient.java 2006-06-28 04:24:55 UTC
(rev 3071)
+++ freeway/src/org/gnu/freeway/AbstractClient.java 2006-06-28 04:45:03 UTC
(rev 3072)
@@ -60,7 +60,7 @@
return null;
}
- session=new TCPSession();
+ session=new CSSession();
if (!session.connect(ip,port,true)) {
log(Level.SEVERE,"Could not connect to gnunetd !");
Modified: freeway/src/org/gnu/freeway/server/ClientServer.java
===================================================================
--- freeway/src/org/gnu/freeway/server/ClientServer.java 2006-06-28
04:24:55 UTC (rev 3071)
+++ freeway/src/org/gnu/freeway/server/ClientServer.java 2006-06-28
04:45:03 UTC (rev 3072)
@@ -34,7 +34,7 @@
private Stat bytesOut;
- private TCPServer server;
+ private CSServer server;
private PersistentDecoder decoder;
@@ -44,7 +44,7 @@
super(true);
handlers=new ArrayList();
exitHandlers=new LinkedList();
- server=new TCPServer("C/S",this);
+ server=new CSServer("C/S",this);
decoder=null;
}
@@ -150,7 +150,7 @@
// debug("Accepted connection from
"+ip.getHostAddress()+":"+socket.socket().getPort()+".");
- hd=new TCPSession(server);
+ hd=new CSSession(server);
return (hd.connect(socket,true) ? hd : null);
}
Modified: freeway/src/org/gnu/freeway/test/TCPTest.java
===================================================================
--- freeway/src/org/gnu/freeway/test/TCPTest.java 2006-06-28 04:24:55 UTC
(rev 3071)
+++ freeway/src/org/gnu/freeway/test/TCPTest.java 2006-06-28 04:45:03 UTC
(rev 3072)
@@ -46,7 +46,7 @@
imax=4;
for (i=0; i<imax; i++) {
- acceptSocket=new TCPSession();
+ acceptSocket=new CSSession();
if (acceptSocket.connect(doAccept(serverSocket),true)) {
acceptSocket.setBlocking(true);
Modified: freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java
===================================================================
--- freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java 2006-06-28
04:24:55 UTC (rev 3071)
+++ freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java 2006-06-28
04:45:03 UTC (rev 3072)
@@ -18,7 +18,7 @@
* Transport Session handle.
*/
-public class TCPSession extends org.gnu.freeway.util.net.TCPSession implements
Session
+public class TCPSession extends org.gnu.freeway.util.net.CSSession implements
Session
{
/** after how much time of the core not being associated with a tcp
connection anymore do we close it ? */
public static final long TIME_OUT =
Scheduler.SECS_30;
@@ -45,7 +45,7 @@
private long lastWrite;
- public TCPSession( TCPServer s, TCPTransport t )
+ public TCPSession( CSServer s, TCPTransport t )
{
super(s);
transport=t;
Modified: freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java
===================================================================
--- freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java 2006-06-28
04:24:55 UTC (rev 3071)
+++ freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java 2006-06-28
04:45:03 UTC (rev 3072)
@@ -26,7 +26,7 @@
private StatusCallsService status;
/** */
- private TCPServer server;
+ private CSServer server;
/** */
private int mtu;
@@ -44,7 +44,7 @@
public TCPTransport()
{
super(TCP_PROTOCOL_NUMBER,"TCP");
- server=new TCPServer("TCP PEER SERVER",this);
+ server=new CSServer("TCP PEER SERVER",this);
}
public String toString()
Modified: freeway/src/org/gnu/freeway/util/net/CSServer.java
===================================================================
--- freeway/src/org/gnu/freeway/util/net/CSServer.java 2006-06-28 04:24:55 UTC
(rev 3071)
+++ freeway/src/org/gnu/freeway/util/net/CSServer.java 2006-06-28 04:45:03 UTC
(rev 3072)
@@ -4,24 +4,645 @@
package org.gnu.freeway.util.net;
+import org.gnu.freeway.util.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.logging.*;
+
/**
*
*/
-public interface CSServer
+public class CSServer extends LoggedObject
{
- public String getLabel();
+ /** */
+ private static final long SELECT_TIMEOUT =
Scheduler.SECS_3;
- public boolean isLaunched();
- public boolean launch( int port );
- public boolean shutdown();
+ /** Maximum of pending unhandled connections. */
+ private static final int MAX_QUEUED_REQUESTS =
5;
+ /** Maximum number of concurrent allowed sessions. */
+ private static final int MAX_SESSIONS
= 64;
+
+ /** Name of this server (for debugging purpose only). */
+ private String label;
+
+ /** Selector of the server thread */
+ private Selector selector;
+
+ /** The TCP socket that we listen on for new inbound connections. */
+ private ServerSocketChannel server;
+
+ /** Thread for listening for new connections. */
+ private MasterTask listenTask;
+
+ /** Thread for accepting new connections. */
+ private SlaveTask acceptTask;
+
+ /** Thread for reading on all open sockets. */
+ private SlaveTask readTask;
+
+ /** Thread for writing on all open sockets. */
+ private SlaveTask writeTask;
+
+ /** Should the listen thread exit ? */
+ private boolean running;
+
+ /** Array of currently active TCP sessions. */
+ private CSSession[] sessions;
+
+ /** */
+ private int sessionCount;
+
+ /** Sessions' current operations. */
+ private int[] sessionsOps;
+
+ /** */
+ private boolean acceptingOp;
+
+ /** Sessions lock. */
+ private Object internal;
+
+ /** */
+ private CSSessionHandler handler;
+
+
+ public CSServer( String str, CSSessionHandler h )
+ {
+ super(true);
+ label=str;
+ selector=null;
+ server=null;
+ listenTask=new MasterTask("TCP-LISTEN("+str+")",new
EvalAction(this,"performListen"));
+ acceptTask=listenTask.create("TCP-ACCEPT("+str+")",new
EvalAction(this,"performAccept"));
+ readTask=listenTask.create("TCP-READ("+str+")",new
EvalAction(this,"performRead"));
+ writeTask=listenTask.create("TCP-WRITE("+str+")",new
EvalAction(this,"performWrite"));
+ running=false;
+ sessions=new CSSession[0];
+ sessionCount=0;
+ sessionsOps=new int[0];
+ acceptingOp=false;
+ internal=new Object();
+ handler=h;
+ }
+
+ public String toString()
+ {
+ return "Abstract TCP server";
+ }
+
+
+
////////////////////////////////////////////////////////////////////////////////////////////////
+
+ public String getLabel()
+ {
+ return label;
+ }
+
+ public boolean isLaunched()
+ {
+ return running;
+ }
+
+ public boolean launch( int port )
+ {
+ int secs;
+
+ log(Level.INFO,label+" Launching TCP server...");
+
+ // open selector
+ try {
+ selector=Selector.open();
+ }
+ catch( IOException x ) {
+ err("Could not create selector !",x);
+ return false;
+ }
+
+ // create server socket
+ secs=5;
+ while (server==null && secs<60) {
+ try {
+ server=ServerSocketChannel.open();
+ server.configureBlocking(false);
+ server.socket().setReuseAddress(true);
+ server.socket().bind(new
InetSocketAddress(port),MAX_QUEUED_REQUESTS);
+ log(Level.INFO,label+" TCP server bound to port
"+port+".");
+ }
+ catch( IOException x ) {
+ err("Failed to open socket at port "+port+".
Trying again in "+secs+" seconds...",x);
+
+ Scheduler.sleep(Scheduler.seconds(secs));
+ secs+=5; // slow progression...
+
+ if (server!=null) {
+ try {
+ server.close();
+ }
+ catch( IOException xx ) {
+ }
+ server=null;
+ }
+ }
+ }
+
+ if (server==null) {
+ log(Level.SEVERE,label+" Could not create socket,
abort.");
+ try {
+ selector.close();
+ }
+ catch( IOException x ) {
+ }
+ selector=null;
+ return false;
+ }
+
+ // start listening thread
+ running=true;
+ listenTask.launch();
+ return true;
+ }
+
+ public boolean shutdown()
+ {
+ int i;
+
+ // signal listening thread
+ running=false;
+ selector.wakeup();
+
+ // stop listening thread
+ listenTask.shutdown();
+
+ try {
+ server.close();
+ }
+ catch( IOException x ) {
+ err("Failed to close socket !",x);
+ return false;
+ }
+ finally {
+ server=null;
+ try {
+ selector.close();
+ }
+ catch( IOException x ) {
+ err("Failed to close selector !",x);
+ return false;
+ }
+ finally {
+ selector=null;
+ }
+ }
+ log(Level.INFO,label+" TCP server stopped.");
+
+ synchronized(internal) {
+ for (i=0; i<sessions.length; i++) {
+ if (sessions[i]!=null) {
+ log(Level.WARNING,label+" Session still
alive : "+sessions[i].getLabel());
+ destroySession(i);
+ }
+ }
+ }
+ return true;
+ }
+
+ public void wakeUp()
+ {
+ selector.wakeup();
+ }
+
/**
- * Add manually a session to the pool of listened sessions.
+ * Add session to the pool of listened sessions. If it can't be added,
session will be disconnected and false returned.
*
- * @param s The session to add.
- * @return True if okay (enough ressources), false otherwise.
+ * @param s
+ * @return
+ * @see CSSession#disconnect()
*/
- public boolean register( CSSession s );
+ public boolean register( CSSession s )
+ {
+ if (addSession(s)>=0) {
+ // signal the thread that is blocked in a select call
that the set of sockets to listen to has changed
+ selector.wakeup();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Listen for incoming connections.
+ * Main method for the thread listening on the tcp socket and all tcp
connections.
+ * Whenever a message is received, it is processed by the handler.
+ * This thread waits for activity on any of the TCP connections and
processes deferred (async) writes and buffers reads
+ * until an entire message has been received.
+ *
+ * @throws IOException
+ */
+
+ public void performListen() throws IOException
+ {
+ SelectionKey key;
+ Iterator iter;
+ int mergedOps,ops,ret,i;
+
+ acceptingOp=true;
+
+ while (running) {
+ synchronized(internal) {
+ server.register(selector,(acceptingOp ?
SelectionKey.OP_ACCEPT : 0));
+
+ for (i=0; i<sessions.length; i++) {
+ if (sessions[i]!=null) {
+ if (sessions[i].isConnected())
{ // always check because impl. may disconnect after timeout...
+
key=sessions[i].registerOps(selector,(sessions[i].getOps() & ~sessionsOps[i]));
+ if (key!=null) {
+ key.attach(new
Integer(i));
+ }
+ else {
+
destroySession(i);
+ }
+ }
+ else {
+ // clean up (depends on
session implementation : timeout detected, other side closed connection...)
+ destroySession(i);
+ }
+ }
+ }
+ }
+
+ // should wake up regularly (to clean up sessions...)
+ ret=selector.select(Scheduler.toMillis(SELECT_TIMEOUT));
+ if (ret==0) {
+ continue;
+ }
+
+ synchronized(internal) {
+ mergedOps=0;
+
+ iter=selector.selectedKeys().iterator();
+ while (iter.hasNext()) {
+ key=(SelectionKey) iter.next();
+ iter.remove();
+
+ if (key.isValid()) {
+ ops=key.readyOps();
+ mergedOps|=ops;
+ if ((ops &
SelectionKey.OP_ACCEPT)==0) { // read or write op
+ i=((Number)
key.attachment()).intValue();
+ sessionsOps[i]|=ops;
+ }
+ }
+ }
+
+ debug(label+" Selected #"+ret+" sockets with
merged ops { "+NetUtils.labelForOps(mergedOps)+" }.");
+
+ // signal appropriate tasks
+ if ((mergedOps & SelectionKey.OP_ACCEPT)!=0) {
+ acceptingOp=false;
+ acceptTask.signal();
+ }
+ if ((mergedOps & SelectionKey.OP_READ)!=0) {
+ readTask.signal();
+ }
+ if ((mergedOps & SelectionKey.OP_WRITE)!=0) {
+ writeTask.signal();
+ }
+ }
+ }
+
+ // shutdown... close all sessions
+ synchronized(internal) {
+ for (i=0; i<sessions.length; i++) {
+ if (sessions[i]!=null) {
+ destroySession(i);
+ }
+ }
+ }
+ }
+
+ public void performAccept()
+ {
+ CSSession s;
+ SocketChannel c;
+
+ try {
+ for (c=server.accept(); c!=null; c=server.accept()) {
+ s=handler.handleAccept(c);
+ if (s!=null) {
+ if (addSession(s)<0) {
+ s.disconnect();
+ }
+ }
+ else {
+ try {
+ c.close();
+ }
+ catch( IOException xx ) {
+ err("Failed to close channel
!",xx);
+ }
+ }
+ }
+ }
+ catch( IOException x ) {
+ err("Failed to accept new connection !",x);
+ }
+
+ synchronized(internal) {
+ acceptingOp=true;
+ }
+
+ selector.wakeup();
+ }
+
+ public void performRead()
+ {
+ CSSession s;
+ int len;
+
+ do {
+ s=firstSessionWithOp(SelectionKey.OP_READ);
+ if (s!=null) {
+ len=s.doReceive();
+ if (len>0 && handler.handleRead(s,len)) {
+ clearSessionOp(s,SelectionKey.OP_READ);
+ }
+ else {
+ debug(s.getLabel()+" End of stream.");
+ destroySession(s);
+ }
+ }
+ }
+ while (s!=null);
+
+ // signal the thread that is blocked in a select call that the
set of sockets to listen to has changed
+ selector.wakeup();
+ }
+
+ public void performWrite()
+ {
+ CSSession s;
+ int len;
+
+ do {
+ s=firstSessionWithOp(SelectionKey.OP_WRITE);
+ if (s!=null) {
+ len=s.doSend();
+ if (len>0 && handler.handleWrite(s,len)) {
+ clearSessionOp(s,SelectionKey.OP_WRITE);
+ }
+ else {
+ debug(s.getLabel()+" End of stream.");
+ destroySession(s);
+ }
+ }
+ }
+ while (s!=null);
+
+ // signal the thread that is blocked in a select call that the
set of sockets to listen to has changed
+ selector.wakeup();
+ }
+
+ /**
+ * Add a new session to the array watched by the select thread. Grows
the array if needed.
+ *
+ * @param s Session to add.
+ * @return Index of added session, or -1 on error.
+ */
+
+ protected int addSession( CSSession s )
+ {
+ CSSession[] tmp;
+ int[] tmp2;
+ int i;
+
+ synchronized(internal) {
+ if (sessionCount==MAX_SESSIONS) {
+ log(Level.WARNING,"Too many sessions
("+MAX_SESSIONS+"), ignore connection.");
+ return -1;
+ }
+
+ for (i=0; i<sessions.length && sessions[i]!=null; i++)
{}
+ if (i==sessions.length) {
+ tmp=new CSSession[sessions.length+16];
+ Arrays.fill(tmp,null);
+
System.arraycopy(sessions,0,tmp,0,sessions.length);
+ sessions=tmp;
+
+ tmp2=new int[sessionsOps.length+16];
+ Arrays.fill(tmp2,0);
+
System.arraycopy(sessionsOps,0,tmp2,0,sessionsOps.length);
+ sessionsOps=tmp2;
+ }
+
+ sessions[i]=s;
+ sessionsOps[i]=0;
+ sessionCount++;
+ debug("Add session at slot #"+i+"
"+Utils.gauge(sessionCount,sessions.length)+".");
+ return i;
+ }
+ }
+
+ protected CSSession firstSessionWithOp( int op )
+ {
+ int i;
+
+ synchronized(internal) {
+ for (i=0; i<sessionsOps.length && (sessionsOps[i] &
op)==0; i++) {}
+ return (i<sessionsOps.length ? sessions[i] : null);
+ }
+ }
+
+ protected boolean clearSessionOp( CSSession s, int op )
+ {
+ int i;
+
+ assert(s!=null);
+
+ synchronized(internal) {
+ for (i=0; i<sessions.length && sessions[i]!=s; i++) {}
+ if (i==sessions.length) {
+ log(Level.WARNING,label+" Session not found :
"+s.getLabel()+".");
+ return false;
+ }
+
+ sessionsOps[i]&=~op;
+ return true;
+ }
+ }
+
+ protected boolean destroySession( CSSession s )
+ {
+ int i;
+
+ assert(s!=null);
+
+ synchronized(internal) {
+ for (i=0; i<sessions.length && sessions[i]!=s; i++) {}
+ if (i==sessions.length) {
+ log(Level.WARNING,label+" Session not found :
"+s.getLabel()+".");
+ return false;
+ }
+
+ return destroySession(i);
+ }
+ }
+
+ /**
+ * The client has disconnected. Close the socket, free the buffers,
unlink session from the linked list.
+ * Remove a session, either the other side closed the connection or we
have otherwise reason to believe
+ * that it should better be killed.
+ *
+ * @param index index to the session handle
+ * @return
+ */
+
+ protected boolean destroySession( int index )
+ {
+ assert(index>=0);
+
+ synchronized(internal) {
+ if (index>=sessions.length || sessions[index]==null) {
+ log(Level.WARNING,label+" No session at slot
"+index+".");
+ return false;
+ }
+
+ if (sessions[index].isConnected()) {
+ sessions[index].disconnect();
+ }
+
+ handler.handleDestroy(sessions[index]);
+
+ sessions[index]=null;
+ sessionsOps[index]=0;
+ sessionCount--;
+ debug("Destroyed session at slot #"+index+"
"+Utils.gauge(sessionCount,sessions.length)+".");
+ }
+ return true;
+ }
}
+
+
+/*
+ public void add( int id, Class c, CommandHandler h )
+ {
+ decoder.add(id,c);
+ setHandler(c,h);
+ }
+
+ public void setDefault( int id, Class c, CommandHandler h )
+ {
+ decoder.add(id,c);
+ setHandler(c,h);
+ decoder.setDefault(id);
+ }
+
+ public void setCorrupted( int id, Class c, CommandHandler h )
+ {
+ decoder.add(id,c);
+ setHandler(c,h);
+ decoder.setCorrupted(id);
+ }
+
+ public boolean hasHandler( Class c )
+ {
+ synchronized(handlers) {
+ return handlers.get(c)!=null;
+ }
+ }
+
+ public CommandHandler getHandler( Class c )
+ {
+ synchronized(handlers) {
+ return (CommandHandler) handlers.get(c);
+ }
+ }
+
+ public boolean setHandler( Class c, CommandHandler h )
+ {
+ synchronized(handlers) {
+ if (handlers.get(c)!=null) {
+ log(Level.WARNING,"Could not assign handler,
class "+c.getName()+" is already registered.");
+ return false;
+ }
+ handlers.put(c,h);
+ return true;
+ }
+ }
+
+ public boolean removeHandler( Class c, CommandHandler h )
+ {
+ synchronized(handlers) {
+ if (handlers.get(c)==null) {
+ log(Level.WARNING,"Could not remove handler,
class "+c.getName()+" is not registered.");
+ return false;
+ }
+ handlers.remove(c);
+ return true;
+ }
+ }
+
+
+deriver de TCP Server:
+ public void start()
+ {
+ add(ProxyCommand.HELLO_ID,ProxyHello.class,this);
+ add(ProxyCommand.CONNECT_ID,ProxyConnect.class,this);
+ add(ProxyCommand.SETBLOCKING_ID,ProxySetBlocking.class,this);
+
+ setDefault(ProxyCommand.UNKNOWN_ID,ProxyUnknown.class,this);
+
setCorrupted(ProxyCommand.CORRUPTED_ID,ProxyCorrupted.class,this);
+
+ super.start();
+ }
+
+ public void stop()
+ {
+ super.stop();
+ }
+
+ public TCP Session createSession( PersistentDecoder decoder )
+ {
+ return new ProxySession(decoder);
+ }
+
+ public boolean handle( TCP Session session, Persistent p )
+ {
+ ProxySession s;
+ ProxyToken token;
+ int id;
+
+ id=((ProxyCommand) p).getID();
+
+ s=(ProxySession) session;
+ if (!s.isWelcomed()) {
+ if (id!=ProxyCommand.HELLO_ID) {
+ log("No hello received, close session.");
+ return false;
+ }
+ s.welcome();
+
+ token=new ProxyToken(s.getToken());
+ token.setAddress("",0);
+ sendToClient(s,token);
+ return true;
+ }
+
+ switch (id) {
+ case ProxyCommand.CONNECT_ID:
+ break;
+
+ case ProxyCommand.SETBLOCKING_ID:
+ break;
+
+ default:
+ log("Unknown message : "+p);
+ return false;
+ }
+ return true;
+ }
+
+*/
Modified: freeway/src/org/gnu/freeway/util/net/CSSession.java
===================================================================
--- freeway/src/org/gnu/freeway/util/net/CSSession.java 2006-06-28 04:24:55 UTC
(rev 3071)
+++ freeway/src/org/gnu/freeway/util/net/CSSession.java 2006-06-28 04:45:03 UTC
(rev 3072)
@@ -4,36 +4,407 @@
package org.gnu.freeway.util.net;
+import org.gnu.freeway.util.*;
+
import java.net.*;
import java.nio.channels.*;
+import java.util.logging.*;
/**
+ * Per-client data structure (kept in linked list). Also: the opaque
+ * handle for client connections passed by the core to the CSHandlers.
+ * Opaque handle for client connections passed by
+ * the core to the CSHandlers.
*
+ * A connection to a freeway client application. To be used in non-blocking
mode.
+ *
+ * Struct to refer to a GNUnet TCP connection.
+ * This is more than just a socket because if the server
+ * drops the connection, the client automatically tries
+ * to reconnect (and for that needs connection information).
+ *
+ * Code for synchronized access to TCP streams
+ *
+ * Generic TCP code for reliable, mostly blocking, record-oriented TCP
+ * connections. GNUnet uses the "tcpio" code for trusted client-server
+ * (e.g. gnunet-gtk to gnunetd via loopback) communications. Note
+ * that an unblocking write is also provided since if both client and
+ * server use blocking IO, both may block on a write and cause a
+ * mutual inter-process deadlock.
+ *
+ * Since we do not want other peers (!) to be able to block a peer by
+ * not reading from the TCP stream, the peer-to-peer TCP transport
+ * uses unreliable, buffered, non-blocking, record-oriented TCP code
+ * with a select call to reduce the number of threads which is
+ * provided in transports/tcp.c.
+ * Generic TCP code. This module is used to receive or send records
+ * (!) from a TCP stream. The code automatically attempts to
+ * re-connect if the other side closes the connection.<br>
+ *
+ * The code can be used on the server- or the client side, just in
+ * case of the server the reconnect can of course not be used. The TCP
+ * stream is broken into records of maximum length MAX_BUFFER_SIZE,
+ * each preceeded by a 16 bits integer (not signed) giving the length of the
+ * following record.<p>
*/
-public interface CSSession
+public class CSSession extends LoggedObject
{
- public String getLabel();
+ /** */
+ private CSServer server;
- public int getOps();
- public SelectionKey registerOps( Selector sel, int ops );
+ /** Socket to communicate with the other side. */
+ private PersistentSocket socket;
- public boolean isConnected();
- public boolean connect( InetAddress ip, int port, boolean careful );
- public boolean connect( SocketChannel channel, boolean careful );
- public boolean disconnect();
+ /** */
+ private String label;
- public boolean isBlocking();
- public void setBlocking( boolean flag );
+ /** Lock used to synchronized read operations. */
+ protected Object readLock;
- public int doReceive();
- public boolean hasReceived();
- public Persistent receive( Class c );
- public Persistent receive( PersistentDecoder decoder );
+ /** Lock used to synchronized write operations. */
+ protected Object writeLock;
- public boolean send( Persistent p );
- public boolean sendAndCheck( Persistent p );
- public boolean flushAndSend( Persistent p );
- public boolean hasToSend();
- public int doSend();
+
+ public CSSession()
+ {
+ this(null);
+ }
+
+ public CSSession( CSServer s )
+ {
+ super(true);
+ server=s;
+ socket=new PersistentSocket();
+ socket.setDebug(false);
+ label=socket.getLabel();
+ readLock=new Object();
+ writeLock=new Object();
+ }
+
+ public String toString()
+ {
+ return "Client/server session";
+ }
+
+
+
////////////////////////////////////////////////////////////////////////////////////////////////
+
+ public String getLabel()
+ {
+ return label;
+ }
+
+ public int getOps()
+ {
+ synchronized(readLock) {
+ synchronized(writeLock) {
+ return (socket.shouldWrite() ?
(SelectionKey.OP_READ | SelectionKey.OP_WRITE) : SelectionKey.OP_READ);
+ }
+ }
+ }
+
+ public SelectionKey registerOps( Selector sel, int ops )
+ {
+ SelectionKey key;
+
+ synchronized(readLock) {
+ synchronized(writeLock) {
+ key=socket.getChannel().keyFor(sel);
+ if (key==null) {
+ try {
+
key=socket.getChannel().register(sel,0);
+ }
+ catch( ClosedChannelException x ) {
+ err(label+" Failed to register
on selector !",x);
+ return null;
+ }
+ }
+ key.interestOps(ops);
+ return key;
+ }
+ }
+ }
+
+ public boolean isConnected()
+ {
+ synchronized(readLock) {
+ synchronized(writeLock) {
+ return !socket.isClosed();
+ }
+ }
+ }
+
+ /**
+ * Connect this session to the specified ip and port in *blocking mode*.
+ * Used when connecting to a server at {ip,port}.
+ *
+ * @param ip IP of the host to connect to.
+ * @param port The port number.
+ * @param careful Should we treat socket with respect (SO_LINGER
not set) ?
+ * @return True if successful, false on failure.
+ */
+
+ public boolean connect( InetAddress ip, int port, boolean careful )
+ {
+ boolean res;
+
+ synchronized(readLock) {
+ synchronized(writeLock) {
+ res=socket.open(ip,port,careful);
+ if (res) {
+ label=socket.getLabel();
+ debug(label+" Connected.");
+ }
+ return res;
+ }
+ }
+ }
+
+ /**
+ * Connect this session to the specified channel in *non blocking* mode.
+ * Used when connecting to a client from a server.
+ *
+ * @param channel The open client socket.
+ * @param careful Should we treat socket with respect (SO_LINGER
not set) ?
+ * @return True if successful, false on failure.
+ */
+
+ public boolean connect( SocketChannel channel, boolean careful )
+ {
+ boolean res;
+
+ synchronized(readLock) {
+ synchronized(writeLock) {
+ res=socket.open(channel,careful);
+ if (res) {
+ label=socket.getLabel();
+ debug(label+" Connected.");
+ }
+ return res;
+ }
+ }
+ }
+
+ /**
+ * Close the session.
+ *
+ * @return True if succeedeed, false otherwise.
+ */
+
+ public boolean disconnect()
+ {
+ boolean res;
+
+ synchronized(readLock) {
+ synchronized(writeLock) {
+ if (socket.isClosed()) {
+ log(Level.WARNING,"Session is already
closed.");
+ return false;
+ }
+
+ res=socket.close();
+ if (res) {
+ debug(label+" Disconnected.");
+ }
+ return res;
+ }
+ }
+ }
+
+ public boolean isBlocking()
+ {
+ synchronized(readLock) {
+ synchronized(writeLock) {
+ return socket.isBlocking();
+ }
+ }
+ }
+
+ public void setBlocking( boolean flag )
+ {
+ synchronized(readLock) {
+ synchronized(writeLock) {
+ socket.setBlocking(flag);
+ }
+ }
+ }
+
+ /**
+ * Buffer data received from the other side.
+ *
+ * @return True if at least one byte has been received, false if
the socket was closed by the other side or if an error occured.
+ */
+
+ public int doReceive()
+ {
+ int len;
+
+ synchronized(readLock) {
+ len=socket.doRead();
+ if (len>0) {
+ debug(label+" Have read "+len+" bytes.");
+ }
+ return len;
+ }
+ }
+
+ public boolean hasReceived()
+ {
+ synchronized(readLock) {
+ return socket.shouldDequeue();
+ }
+ }
+
+ /**
+ * @param c
+ * @return
+ */
+
+ public Persistent receive( Class c )
+ {
+ Persistent p;
+
+ synchronized(readLock) {
+ if (!socket.shouldDequeue() && socket.isBlocking()) {
+ doReceive();
+ }
+
+ p=socket.dequeue(c);
+ if (p!=null) {
+ debug(label+" Received message : "+p+".");
+ }
+ return p;
+ }
+ }
+
+ /**
+ * Decode buffered data. If in blocking mode and no messages are
buffered, an attempt is made to read fresh data.
+ *
+ * @param decoder Decoder used to transform transmitted data into
messages.
+ * @return Any decoded data if available, null
otherwise.
+ */
+
+ public Persistent receive( PersistentDecoder decoder )
+ {
+ Persistent p;
+
+ synchronized(readLock) {
+ if (!socket.shouldDequeue() && socket.isBlocking()) {
+ doReceive();
+ }
+
+ p=socket.dequeue(decoder);
+ if (p!=null) {
+ debug(label+" Received message : "+p+".");
+ }
+ return p;
+ }
+ }
+
+ /**
+ * Add data to the buffer, and if blocking, start transferring buffered
data.
+ *
+ * <div>When in blocking mode, try to also send buffered data to the
other side. Returns true if, at least, one byte
+ * has been transmitted. Please note that it does *not* imply that any
part of the data <code>p</code> has been transmitted,
+ * since other data may had been buffered previously (transfer is
initiated but may be incomplete).</div>
+ *
+ * <div>In non-blocking mode, returns true. The actual transfer happens
asynchronously.</div>
+ *
+ * @param p The data to write (duplicated, because may be buffered
and stored a certain amount of time...).
+ * @return True if in non-blocking mode, or if at least one byte
of buffered data has been transmitted, false otherwise.
+ */
+
+ public boolean send( Persistent p )
+ {
+ p=PersistentHelper.copy(p);
+
+ synchronized(writeLock) {
+ socket.enqueue(p);
+ debug(label+" Sent message : "+p+".");
+
+ if (server!=null) {
+ server.wakeUp();
+ }
+
+ return (socket.isBlocking() ? doSend()>0 : true);
+ }
+ }
+
+ public boolean sendAndCheck( Persistent p )
+ {
+ CSResult res;
+
+ synchronized(writeLock) {
+ if (!send(p)) {
+ return false;
+ }
+
+ res=(CSResult) receive(CSResult.class);
+ return (res!=null && res.isOkay());
+ }
+ }
+
+ /**
+ * Flush buffered data, buffer given data <code>p</code> and try to
initiate transfer of this data.
+ * Note that it is possible that only a part of the message is sent.
+ *
+ * Returning true here means that at least a small part of the message
has been transmitted,
+ * though it may be transmitted entirely a bit later.
+ *
+ * @param p The data to write (duplicated, because may be buffered
and stored a certain amount of time...).
+ * @return False if an I/O error occurred, or if it did not
transmit any byte of the message. Return true otherwise.
+ */
+
+ public boolean flushAndSend( Persistent p )
+ {
+ boolean empty;
+
+ p=PersistentHelper.copy(p);
+
+ synchronized(writeLock) {
+ doSend();
+
+ empty=!socket.shouldWrite();
+
+ socket.enqueue(p);
+ debug(label+" Sent message : "+p+".");
+
+ if (server!=null) {
+ server.wakeUp();
+ }
+
+ return (empty ? doSend()>0 : false);
+ }
+ }
+
+ public boolean hasToSend()
+ {
+ synchronized(writeLock) {
+ return socket.shouldWrite();
+ }
+ }
+
+ /**
+ * Send buffered data, if any.
+ *
+ * @return True if at least one byte has been transmitted, false
otherwise
+ * (an error occured, the other side is not ready,
or there is no data in buffer).
+ */
+
+ public int doSend()
+ {
+ int len;
+
+ synchronized(writeLock) {
+ len=0;
+ if (socket.shouldWrite()) {
+ len=socket.doWrite();
+ if (len>0) {
+ debug(label+" Have written "+len+"
bytes.");
+ }
+ }
+ return len;
+ }
+ }
}
Deleted: freeway/src/org/gnu/freeway/util/net/TCPServer.java
===================================================================
--- freeway/src/org/gnu/freeway/util/net/TCPServer.java 2006-06-28 04:24:55 UTC
(rev 3071)
+++ freeway/src/org/gnu/freeway/util/net/TCPServer.java 2006-06-28 04:45:03 UTC
(rev 3072)
@@ -1,648 +0,0 @@
-/**
- * @PROJECT_INFO@
- */
-
-package org.gnu.freeway.util.net;
-
-import org.gnu.freeway.util.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.channels.*;
-import java.util.*;
-import java.util.logging.*;
-
-/**
- *
- */
-
-public class TCPServer extends LoggedObject implements CSServer
-{
- /** */
- private static final long SELECT_TIMEOUT =
Scheduler.SECS_3;
-
- /** Maximum of pending unhandled connections. */
- private static final int MAX_QUEUED_REQUESTS =
5;
-
- /** Maximum number of concurrent allowed sessions. */
- private static final int MAX_SESSIONS
= 64;
-
- /** Name of this server (for debugging purpose only). */
- private String label;
-
- /** Selector of the server thread */
- private Selector selector;
-
- /** The TCP socket that we listen on for new inbound connections. */
- private ServerSocketChannel server;
-
- /** Thread for listening for new connections. */
- private MasterTask listenTask;
-
- /** Thread for accepting new connections. */
- private SlaveTask acceptTask;
-
- /** Thread for reading on all open sockets. */
- private SlaveTask readTask;
-
- /** Thread for writing on all open sockets. */
- private SlaveTask writeTask;
-
- /** Should the listen thread exit ? */
- private boolean running;
-
- /** Array of currently active TCP sessions. */
- private CSSession[] sessions;
-
- /** */
- private int sessionCount;
-
- /** Sessions' current operations. */
- private int[] sessionsOps;
-
- /** */
- private boolean acceptingOp;
-
- /** Sessions lock. */
- private Object internal;
-
- /** */
- private CSSessionHandler handler;
-
-
- public TCPServer( String str, CSSessionHandler h )
- {
- super(true);
- label=str;
- selector=null;
- server=null;
- listenTask=new MasterTask("TCP-LISTEN("+str+")",new
EvalAction(this,"performListen"));
- acceptTask=listenTask.create("TCP-ACCEPT("+str+")",new
EvalAction(this,"performAccept"));
- readTask=listenTask.create("TCP-READ("+str+")",new
EvalAction(this,"performRead"));
- writeTask=listenTask.create("TCP-WRITE("+str+")",new
EvalAction(this,"performWrite"));
- running=false;
- sessions=new CSSession[0];
- sessionCount=0;
- sessionsOps=new int[0];
- acceptingOp=false;
- internal=new Object();
- handler=h;
- }
-
- public String toString()
- {
- return "Abstract TCP server";
- }
-
-
-
////////////////////////////////////////////////////////////////////////////////////////////////
-
- public String getLabel()
- {
- return label;
- }
-
- public boolean isLaunched()
- {
- return running;
- }
-
- public boolean launch( int port )
- {
- int secs;
-
- log(Level.INFO,label+" Launching TCP server...");
-
- // open selector
- try {
- selector=Selector.open();
- }
- catch( IOException x ) {
- err("Could not create selector !",x);
- return false;
- }
-
- // create server socket
- secs=5;
- while (server==null && secs<60) {
- try {
- server=ServerSocketChannel.open();
- server.configureBlocking(false);
- server.socket().setReuseAddress(true);
- server.socket().bind(new
InetSocketAddress(port),MAX_QUEUED_REQUESTS);
- log(Level.INFO,label+" TCP server bound to port
"+port+".");
- }
- catch( IOException x ) {
- err("Failed to open socket at port "+port+".
Trying again in "+secs+" seconds...",x);
-
- Scheduler.sleep(Scheduler.seconds(secs));
- secs+=5; // slow progression...
-
- if (server!=null) {
- try {
- server.close();
- }
- catch( IOException xx ) {
- }
- server=null;
- }
- }
- }
-
- if (server==null) {
- log(Level.SEVERE,label+" Could not create socket,
abort.");
- try {
- selector.close();
- }
- catch( IOException x ) {
- }
- selector=null;
- return false;
- }
-
- // start listening thread
- running=true;
- listenTask.launch();
- return true;
- }
-
- public boolean shutdown()
- {
- int i;
-
- // signal listening thread
- running=false;
- selector.wakeup();
-
- // stop listening thread
- listenTask.shutdown();
-
- try {
- server.close();
- }
- catch( IOException x ) {
- err("Failed to close socket !",x);
- return false;
- }
- finally {
- server=null;
- try {
- selector.close();
- }
- catch( IOException x ) {
- err("Failed to close selector !",x);
- return false;
- }
- finally {
- selector=null;
- }
- }
- log(Level.INFO,label+" TCP server stopped.");
-
- synchronized(internal) {
- for (i=0; i<sessions.length; i++) {
- if (sessions[i]!=null) {
- log(Level.WARNING,label+" Session still
alive : "+sessions[i].getLabel());
- destroySession(i);
- }
- }
- }
- return true;
- }
-
- public void wakeUp()
- {
- selector.wakeup();
- }
-
- /**
- * Add session to the pool of listened sessions. If it can't be added,
session will be disconnected and false returned.
- *
- * @param s
- * @return
- * @see CSSession#disconnect()
- */
-
- public boolean register( CSSession s )
- {
- if (addSession(s)>=0) {
- // signal the thread that is blocked in a select call
that the set of sockets to listen to has changed
- selector.wakeup();
- return true;
- }
- return false;
- }
-
- /**
- * Listen for incoming connections.
- * Main method for the thread listening on the tcp socket and all tcp
connections.
- * Whenever a message is received, it is processed by the handler.
- * This thread waits for activity on any of the TCP connections and
processes deferred (async) writes and buffers reads
- * until an entire message has been received.
- *
- * @throws IOException
- */
-
- public void performListen() throws IOException
- {
- SelectionKey key;
- Iterator iter;
- int mergedOps,ops,ret,i;
-
- acceptingOp=true;
-
- while (running) {
- synchronized(internal) {
- server.register(selector,(acceptingOp ?
SelectionKey.OP_ACCEPT : 0));
-
- for (i=0; i<sessions.length; i++) {
- if (sessions[i]!=null) {
- if (sessions[i].isConnected())
{ // always check because impl. may disconnect after timeout...
-
key=sessions[i].registerOps(selector,(sessions[i].getOps() & ~sessionsOps[i]));
- if (key!=null) {
- key.attach(new
Integer(i));
- }
- else {
-
destroySession(i);
- }
- }
- else {
- // clean up (depends on
session implementation : timeout detected, other side closed connection...)
- destroySession(i);
- }
- }
- }
- }
-
- // should wake up regularly (to clean up sessions...)
- ret=selector.select(Scheduler.toMillis(SELECT_TIMEOUT));
- if (ret==0) {
- continue;
- }
-
- synchronized(internal) {
- mergedOps=0;
-
- iter=selector.selectedKeys().iterator();
- while (iter.hasNext()) {
- key=(SelectionKey) iter.next();
- iter.remove();
-
- if (key.isValid()) {
- ops=key.readyOps();
- mergedOps|=ops;
- if ((ops &
SelectionKey.OP_ACCEPT)==0) { // read or write op
- i=((Number)
key.attachment()).intValue();
- sessionsOps[i]|=ops;
- }
- }
- }
-
- debug(label+" Selected #"+ret+" sockets with
merged ops { "+NetUtils.labelForOps(mergedOps)+" }.");
-
- // signal appropriate tasks
- if ((mergedOps & SelectionKey.OP_ACCEPT)!=0) {
- acceptingOp=false;
- acceptTask.signal();
- }
- if ((mergedOps & SelectionKey.OP_READ)!=0) {
- readTask.signal();
- }
- if ((mergedOps & SelectionKey.OP_WRITE)!=0) {
- writeTask.signal();
- }
- }
- }
-
- // shutdown... close all sessions
- synchronized(internal) {
- for (i=0; i<sessions.length; i++) {
- if (sessions[i]!=null) {
- destroySession(i);
- }
- }
- }
- }
-
- public void performAccept()
- {
- CSSession s;
- SocketChannel c;
-
- try {
- for (c=server.accept(); c!=null; c=server.accept()) {
- s=handler.handleAccept(c);
- if (s!=null) {
- if (addSession(s)<0) {
- s.disconnect();
- }
- }
- else {
- try {
- c.close();
- }
- catch( IOException xx ) {
- err("Failed to close channel
!",xx);
- }
- }
- }
- }
- catch( IOException x ) {
- err("Failed to accept new connection !",x);
- }
-
- synchronized(internal) {
- acceptingOp=true;
- }
-
- selector.wakeup();
- }
-
- public void performRead()
- {
- CSSession s;
- int len;
-
- do {
- s=firstSessionWithOp(SelectionKey.OP_READ);
- if (s!=null) {
- len=s.doReceive();
- if (len>0 && handler.handleRead(s,len)) {
- clearSessionOp(s,SelectionKey.OP_READ);
- }
- else {
- debug(s.getLabel()+" End of stream.");
- destroySession(s);
- }
- }
- }
- while (s!=null);
-
- // signal the thread that is blocked in a select call that the
set of sockets to listen to has changed
- selector.wakeup();
- }
-
- public void performWrite()
- {
- CSSession s;
- int len;
-
- do {
- s=firstSessionWithOp(SelectionKey.OP_WRITE);
- if (s!=null) {
- len=s.doSend();
- if (len>0 && handler.handleWrite(s,len)) {
- clearSessionOp(s,SelectionKey.OP_WRITE);
- }
- else {
- debug(s.getLabel()+" End of stream.");
- destroySession(s);
- }
- }
- }
- while (s!=null);
-
- // signal the thread that is blocked in a select call that the
set of sockets to listen to has changed
- selector.wakeup();
- }
-
- /**
- * Add a new session to the array watched by the select thread. Grows
the array if needed.
- *
- * @param s Session to add.
- * @return Index of added session, or -1 on error.
- */
-
- protected int addSession( CSSession s )
- {
- CSSession[] tmp;
- int[] tmp2;
- int i;
-
- synchronized(internal) {
- if (sessionCount==MAX_SESSIONS) {
- log(Level.WARNING,"Too many sessions
("+MAX_SESSIONS+"), ignore connection.");
- return -1;
- }
-
- for (i=0; i<sessions.length && sessions[i]!=null; i++)
{}
- if (i==sessions.length) {
- tmp=new CSSession[sessions.length+16];
- Arrays.fill(tmp,null);
-
System.arraycopy(sessions,0,tmp,0,sessions.length);
- sessions=tmp;
-
- tmp2=new int[sessionsOps.length+16];
- Arrays.fill(tmp2,0);
-
System.arraycopy(sessionsOps,0,tmp2,0,sessionsOps.length);
- sessionsOps=tmp2;
- }
-
- sessions[i]=s;
- sessionsOps[i]=0;
- sessionCount++;
- debug("Add session at slot #"+i+"
"+Utils.gauge(sessionCount,sessions.length)+".");
- return i;
- }
- }
-
- protected CSSession firstSessionWithOp( int op )
- {
- int i;
-
- synchronized(internal) {
- for (i=0; i<sessionsOps.length && (sessionsOps[i] &
op)==0; i++) {}
- return (i<sessionsOps.length ? sessions[i] : null);
- }
- }
-
- protected boolean clearSessionOp( CSSession s, int op )
- {
- int i;
-
- assert(s!=null);
-
- synchronized(internal) {
- for (i=0; i<sessions.length && sessions[i]!=s; i++) {}
- if (i==sessions.length) {
- log(Level.WARNING,label+" Session not found :
"+s.getLabel()+".");
- return false;
- }
-
- sessionsOps[i]&=~op;
- return true;
- }
- }
-
- protected boolean destroySession( CSSession s )
- {
- int i;
-
- assert(s!=null);
-
- synchronized(internal) {
- for (i=0; i<sessions.length && sessions[i]!=s; i++) {}
- if (i==sessions.length) {
- log(Level.WARNING,label+" Session not found :
"+s.getLabel()+".");
- return false;
- }
-
- return destroySession(i);
- }
- }
-
- /**
- * The client has disconnected. Close the socket, free the buffers,
unlink session from the linked list.
- * Remove a session, either the other side closed the connection or we
have otherwise reason to believe
- * that it should better be killed.
- *
- * @param index index to the session handle
- * @return
- */
-
- protected boolean destroySession( int index )
- {
- assert(index>=0);
-
- synchronized(internal) {
- if (index>=sessions.length || sessions[index]==null) {
- log(Level.WARNING,label+" No session at slot
"+index+".");
- return false;
- }
-
- if (sessions[index].isConnected()) {
- sessions[index].disconnect();
- }
-
- handler.handleDestroy(sessions[index]);
-
- sessions[index]=null;
- sessionsOps[index]=0;
- sessionCount--;
- debug("Destroyed session at slot #"+index+"
"+Utils.gauge(sessionCount,sessions.length)+".");
- }
- return true;
- }
-}
-
-
-/*
- public void add( int id, Class c, CommandHandler h )
- {
- decoder.add(id,c);
- setHandler(c,h);
- }
-
- public void setDefault( int id, Class c, CommandHandler h )
- {
- decoder.add(id,c);
- setHandler(c,h);
- decoder.setDefault(id);
- }
-
- public void setCorrupted( int id, Class c, CommandHandler h )
- {
- decoder.add(id,c);
- setHandler(c,h);
- decoder.setCorrupted(id);
- }
-
- public boolean hasHandler( Class c )
- {
- synchronized(handlers) {
- return handlers.get(c)!=null;
- }
- }
-
- public CommandHandler getHandler( Class c )
- {
- synchronized(handlers) {
- return (CommandHandler) handlers.get(c);
- }
- }
-
- public boolean setHandler( Class c, CommandHandler h )
- {
- synchronized(handlers) {
- if (handlers.get(c)!=null) {
- log(Level.WARNING,"Could not assign handler,
class "+c.getName()+" is already registered.");
- return false;
- }
- handlers.put(c,h);
- return true;
- }
- }
-
- public boolean removeHandler( Class c, CommandHandler h )
- {
- synchronized(handlers) {
- if (handlers.get(c)==null) {
- log(Level.WARNING,"Could not remove handler,
class "+c.getName()+" is not registered.");
- return false;
- }
- handlers.remove(c);
- return true;
- }
- }
-
-
-deriver de TCP Server:
- public void start()
- {
- add(ProxyCommand.HELLO_ID,ProxyHello.class,this);
- add(ProxyCommand.CONNECT_ID,ProxyConnect.class,this);
- add(ProxyCommand.SETBLOCKING_ID,ProxySetBlocking.class,this);
-
- setDefault(ProxyCommand.UNKNOWN_ID,ProxyUnknown.class,this);
-
setCorrupted(ProxyCommand.CORRUPTED_ID,ProxyCorrupted.class,this);
-
- super.start();
- }
-
- public void stop()
- {
- super.stop();
- }
-
- public TCP Session createSession( PersistentDecoder decoder )
- {
- return new ProxySession(decoder);
- }
-
- public boolean handle( TCP Session session, Persistent p )
- {
- ProxySession s;
- ProxyToken token;
- int id;
-
- id=((ProxyCommand) p).getID();
-
- s=(ProxySession) session;
- if (!s.isWelcomed()) {
- if (id!=ProxyCommand.HELLO_ID) {
- log("No hello received, close session.");
- return false;
- }
- s.welcome();
-
- token=new ProxyToken(s.getToken());
- token.setAddress("",0);
- sendToClient(s,token);
- return true;
- }
-
- switch (id) {
- case ProxyCommand.CONNECT_ID:
- break;
-
- case ProxyCommand.SETBLOCKING_ID:
- break;
-
- default:
- log("Unknown message : "+p);
- return false;
- }
- return true;
- }
-
-*/
Deleted: freeway/src/org/gnu/freeway/util/net/TCPSession.java
===================================================================
--- freeway/src/org/gnu/freeway/util/net/TCPSession.java 2006-06-28
04:24:55 UTC (rev 3071)
+++ freeway/src/org/gnu/freeway/util/net/TCPSession.java 2006-06-28
04:45:03 UTC (rev 3072)
@@ -1,410 +0,0 @@
-/**
- * @PROJECT_INFO@
- */
-
-package org.gnu.freeway.util.net;
-
-import org.gnu.freeway.util.*;
-
-import java.net.*;
-import java.nio.channels.*;
-import java.util.logging.*;
-
-/**
- * Per-client data structure (kept in linked list). Also: the opaque
- * handle for client connections passed by the core to the CSHandlers.
- * Opaque handle for client connections passed by
- * the core to the CSHandlers.
- *
- * A connection to a freeway client application. To be used in non-blocking
mode.
- *
- * Struct to refer to a GNUnet TCP connection.
- * This is more than just a socket because if the server
- * drops the connection, the client automatically tries
- * to reconnect (and for that needs connection information).
- *
- * Code for synchronized access to TCP streams
- *
- * Generic TCP code for reliable, mostly blocking, record-oriented TCP
- * connections. GNUnet uses the "tcpio" code for trusted client-server
- * (e.g. gnunet-gtk to gnunetd via loopback) communications. Note
- * that an unblocking write is also provided since if both client and
- * server use blocking IO, both may block on a write and cause a
- * mutual inter-process deadlock.
- *
- * Since we do not want other peers (!) to be able to block a peer by
- * not reading from the TCP stream, the peer-to-peer TCP transport
- * uses unreliable, buffered, non-blocking, record-oriented TCP code
- * with a select call to reduce the number of threads which is
- * provided in transports/tcp.c.
- * Generic TCP code. This module is used to receive or send records
- * (!) from a TCP stream. The code automatically attempts to
- * re-connect if the other side closes the connection.<br>
- *
- * The code can be used on the server- or the client side, just in
- * case of the server the reconnect can of course not be used. The TCP
- * stream is broken into records of maximum length MAX_BUFFER_SIZE,
- * each preceeded by a 16 bits integer (not signed) giving the length of the
- * following record.<p>
- */
-
-public class TCPSession extends LoggedObject implements CSSession
-{
- /** */
- private TCPServer server;
-
- /** Socket to communicate with the other side. */
- private PersistentSocket socket;
-
- /** */
- private String label;
-
- /** Lock used to synchronized read operations. */
- protected Object readLock;
-
- /** Lock used to synchronized write operations. */
- protected Object writeLock;
-
-
- public TCPSession()
- {
- this(null);
- }
-
- public TCPSession( TCPServer s )
- {
- super(true);
- server=s;
- socket=new PersistentSocket();
- socket.setDebug(false);
- label=socket.getLabel();
- readLock=new Object();
- writeLock=new Object();
- }
-
- public String toString()
- {
- return "Client/server session";
- }
-
-
-
////////////////////////////////////////////////////////////////////////////////////////////////
-
- public String getLabel()
- {
- return label;
- }
-
- public int getOps()
- {
- synchronized(readLock) {
- synchronized(writeLock) {
- return (socket.shouldWrite() ?
(SelectionKey.OP_READ | SelectionKey.OP_WRITE) : SelectionKey.OP_READ);
- }
- }
- }
-
- public SelectionKey registerOps( Selector sel, int ops )
- {
- SelectionKey key;
-
- synchronized(readLock) {
- synchronized(writeLock) {
- key=socket.getChannel().keyFor(sel);
- if (key==null) {
- try {
-
key=socket.getChannel().register(sel,0);
- }
- catch( ClosedChannelException x ) {
- err(label+" Failed to register
on selector !",x);
- return null;
- }
- }
- key.interestOps(ops);
- return key;
- }
- }
- }
-
- public boolean isConnected()
- {
- synchronized(readLock) {
- synchronized(writeLock) {
- return !socket.isClosed();
- }
- }
- }
-
- /**
- * Connect this session to the specified ip and port in *blocking mode*.
- * Used when connecting to a server at {ip,port}.
- *
- * @param ip IP of the host to connect to.
- * @param port The port number.
- * @param careful Should we treat socket with respect (SO_LINGER
not set) ?
- * @return True if successful, false on failure.
- */
-
- public boolean connect( InetAddress ip, int port, boolean careful )
- {
- boolean res;
-
- synchronized(readLock) {
- synchronized(writeLock) {
- res=socket.open(ip,port,careful);
- if (res) {
- label=socket.getLabel();
- debug(label+" Connected.");
- }
- return res;
- }
- }
- }
-
- /**
- * Connect this session to the specified channel in *non blocking* mode.
- * Used when connecting to a client from a server.
- *
- * @param channel The open client socket.
- * @param careful Should we treat socket with respect (SO_LINGER
not set) ?
- * @return True if successful, false on failure.
- */
-
- public boolean connect( SocketChannel channel, boolean careful )
- {
- boolean res;
-
- synchronized(readLock) {
- synchronized(writeLock) {
- res=socket.open(channel,careful);
- if (res) {
- label=socket.getLabel();
- debug(label+" Connected.");
- }
- return res;
- }
- }
- }
-
- /**
- * Close the session.
- *
- * @return True if succeedeed, false otherwise.
- */
-
- public boolean disconnect()
- {
- boolean res;
-
- synchronized(readLock) {
- synchronized(writeLock) {
- if (socket.isClosed()) {
- log(Level.WARNING,"Session is already
closed.");
- return false;
- }
-
- res=socket.close();
- if (res) {
- debug(label+" Disconnected.");
- }
- return res;
- }
- }
- }
-
- public boolean isBlocking()
- {
- synchronized(readLock) {
- synchronized(writeLock) {
- return socket.isBlocking();
- }
- }
- }
-
- public void setBlocking( boolean flag )
- {
- synchronized(readLock) {
- synchronized(writeLock) {
- socket.setBlocking(flag);
- }
- }
- }
-
- /**
- * Buffer data received from the other side.
- *
- * @return True if at least one byte has been received, false if
the socket was closed by the other side or if an error occured.
- */
-
- public int doReceive()
- {
- int len;
-
- synchronized(readLock) {
- len=socket.doRead();
- if (len>0) {
- debug(label+" Have read "+len+" bytes.");
- }
- return len;
- }
- }
-
- public boolean hasReceived()
- {
- synchronized(readLock) {
- return socket.shouldDequeue();
- }
- }
-
- /**
- * @param c
- * @return
- */
-
- public Persistent receive( Class c )
- {
- Persistent p;
-
- synchronized(readLock) {
- if (!socket.shouldDequeue() && socket.isBlocking()) {
- doReceive();
- }
-
- p=socket.dequeue(c);
- if (p!=null) {
- debug(label+" Received message : "+p+".");
- }
- return p;
- }
- }
-
- /**
- * Decode buffered data. If in blocking mode and no messages are
buffered, an attempt is made to read fresh data.
- *
- * @param decoder Decoder used to transform transmitted data into
messages.
- * @return Any decoded data if available, null
otherwise.
- */
-
- public Persistent receive( PersistentDecoder decoder )
- {
- Persistent p;
-
- synchronized(readLock) {
- if (!socket.shouldDequeue() && socket.isBlocking()) {
- doReceive();
- }
-
- p=socket.dequeue(decoder);
- if (p!=null) {
- debug(label+" Received message : "+p+".");
- }
- return p;
- }
- }
-
- /**
- * Add data to the buffer, and if blocking, start transferring buffered
data.
- *
- * <div>When in blocking mode, try to also send buffered data to the
other side. Returns true if, at least, one byte
- * has been transmitted. Please note that it does *not* imply that any
part of the data <code>p</code> has been transmitted,
- * since other data may had been buffered previously (transfer is
initiated but may be incomplete).</div>
- *
- * <div>In non-blocking mode, returns true. The actual transfer happens
asynchronously.</div>
- *
- * @param p The data to write (duplicated, because may be buffered
and stored a certain amount of time...).
- * @return True if in non-blocking mode, or if at least one byte
of buffered data has been transmitted, false otherwise.
- */
-
- public boolean send( Persistent p )
- {
- p=PersistentHelper.copy(p);
-
- synchronized(writeLock) {
- socket.enqueue(p);
- debug(label+" Sent message : "+p+".");
-
- if (server!=null) {
- server.wakeUp();
- }
-
- return (socket.isBlocking() ? doSend()>0 : true);
- }
- }
-
- public boolean sendAndCheck( Persistent p )
- {
- CSResult res;
-
- synchronized(writeLock) {
- if (!send(p)) {
- return false;
- }
-
- res=(CSResult) receive(CSResult.class);
- return (res!=null && res.isOkay());
- }
- }
-
- /**
- * Flush buffered data, buffer given data <code>p</code> and try to
initiate transfer of this data.
- * Note that it is possible that only a part of the message is sent.
- *
- * Returning true here means that at least a small part of the message
has been transmitted,
- * though it may be transmitted entirely a bit later.
- *
- * @param p The data to write (duplicated, because may be buffered
and stored a certain amount of time...).
- * @return False if an I/O error occurred, or if it did not
transmit any byte of the message. Return true otherwise.
- */
-
- public boolean flushAndSend( Persistent p )
- {
- boolean empty;
-
- p=PersistentHelper.copy(p);
-
- synchronized(writeLock) {
- doSend();
-
- empty=!socket.shouldWrite();
-
- socket.enqueue(p);
- debug(label+" Sent message : "+p+".");
-
- if (server!=null) {
- server.wakeUp();
- }
-
- return (empty ? doSend()>0 : false);
- }
- }
-
- public boolean hasToSend()
- {
- synchronized(writeLock) {
- return socket.shouldWrite();
- }
- }
-
- /**
- * Send buffered data, if any.
- *
- * @return True if at least one byte has been transmitted, false
otherwise
- * (an error occured, the other side is not ready,
or there is no data in buffer).
- */
-
- public int doSend()
- {
- int len;
-
- synchronized(writeLock) {
- len=0;
- if (socket.shouldWrite()) {
- len=socket.doWrite();
- if (len>0) {
- debug(label+" Have written "+len+"
bytes.");
- }
- }
- return len;
- }
- }
-}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3072 - in freeway/src/org/gnu/freeway: . server test transport/tcp util/net,
mdonoughe <=