[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r21640 - in gnunet-java: . src/org/gnunet/construct src/org
From: |
gnunet |
Subject: |
[GNUnet-SVN] r21640 - in gnunet-java: . src/org/gnunet/construct src/org/gnunet/core src/org/gnunet/util src/org/grothoff test/org/gnunet/util |
Date: |
Wed, 30 May 2012 14:42:36 +0200 |
Author: dold
Date: 2012-05-30 14:42:36 +0200 (Wed, 30 May 2012)
New Revision: 21640
Added:
gnunet-java/src/org/gnunet/util/UnknownMessageBody.java
gnunet-java/test/org/gnunet/util/ServerExample.java
Removed:
gnunet-java/test/org/gnunet/util/ServerTest.java
Modified:
gnunet-java/ISSUES
gnunet-java/src/org/gnunet/construct/MessageLoader.java
gnunet-java/src/org/gnunet/core/Core.java
gnunet-java/src/org/gnunet/util/Connection.java
gnunet-java/src/org/gnunet/util/Scheduler.java
gnunet-java/src/org/gnunet/util/Server.java
gnunet-java/src/org/grothoff/Runabout.java
Log:
most of the server lib implemented, some fixes
Modified: gnunet-java/ISSUES
===================================================================
--- gnunet-java/ISSUES 2012-05-30 11:21:31 UTC (rev 21639)
+++ gnunet-java/ISSUES 2012-05-30 12:42:36 UTC (rev 21640)
@@ -71,6 +71,9 @@
* alternative 2: pass a special UnknownMessage to the message handler, filter
it for higher-level
APIs and signal an error.
+
+* the recvDone is kind of clunky (see test/org.gnunet.util.ServerExample)
+
* finally core/statistics/dht/resolver/... have unit tests!
(and coverage works again, but i can't access the cobertura via ssh yet)
* currently most test rely on a running gnunet, and use the default
configuration
Modified: gnunet-java/src/org/gnunet/construct/MessageLoader.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/MessageLoader.java 2012-05-30
11:21:31 UTC (rev 21639)
+++ gnunet-java/src/org/gnunet/construct/MessageLoader.java 2012-05-30
12:42:36 UTC (rev 21640)
@@ -43,6 +43,19 @@
private static final Logger logger = LoggerFactory
.getLogger(MessageLoader.class);
+
+
+
+ public static class UnknownUnionException extends RuntimeException {
+ public UnknownUnionException(String msg) {
+ super(msg);
+ }
+ }
+
+ public static class UnknownUnionIdException extends RuntimeException {
+
+ }
+
/**
* Maps a class and tag to the corresponding union case.
* <p/>
@@ -146,7 +159,7 @@
public static Class<? extends MessageUnion> getUnionClass(Class<? extends
MessageUnion> unionInterface, int tag) {
Map<Integer, Class<? extends MessageUnion>> map =
unionmap.get(unionInterface);
if (map == null) {
- throw new AssertionError("don't know how to handle unions of type
'" + unionInterface + "'");
+ throw new UnknownUnionException("don't know how to handle unions
of type '" + unionInterface + "'");
}
Class<? extends MessageUnion> cls = map.get(tag);
@@ -157,6 +170,7 @@
return cls;
}
+
public static int getUnionTag(Class<? extends MessageUnion>
unionInterface, Class<? extends MessageUnion> unionCase) {
Map<Class<? extends MessageUnion>, Integer> map =
tagmap.get(unionInterface);
if (map == null)
Modified: gnunet-java/src/org/gnunet/core/Core.java
===================================================================
--- gnunet-java/src/org/gnunet/core/Core.java 2012-05-30 11:21:31 UTC (rev
21639)
+++ gnunet-java/src/org/gnunet/core/Core.java 2012-05-30 12:42:36 UTC (rev
21640)
@@ -113,6 +113,11 @@
sink.send(sm);
}
+
+ @Override
+ public void onCancel(boolean alreadyTransmitted) {
+ throw new AssertionError("TransmitRequest cannot be canceled");
+ }
}
/**
@@ -125,6 +130,7 @@
public PeerIdentity target;
public MessageTransmitter transmitter;
public int smrId;
+ public Cancelable transmitRequestCancel;
@Override
public AbsoluteTime getDeadline() {
@@ -142,6 +148,14 @@
sink.send(smr);
}
+
+ @Override
+ public void onCancel(boolean alreadyTransmitted) {
+ // only thing we have to do is cancel the following
transmitRequest, if any!
+ if (transmitRequestCancel != null) {
+ transmitRequestCancel.cancel();
+ }
+ }
}
private class InitRequest extends RequestQueue.Request {
@@ -163,6 +177,17 @@
sink.send(initMessage);
}
+
+ @Override
+ public void onCancel(boolean alreadyTransmitted) {
+ throw new AssertionError("init request can't be canceled");
+ }
+
+ @Override
+ public boolean onReconnect() {
+ // keep the init message on reconnect.
+ return true;
+ }
}
public class CoreReceiver extends RunaboutMessageReceiver {
@@ -243,7 +268,8 @@
}
});
- requestQueue.add(transmitRequest);
+ Cancelable c = requestQueue.add(transmitRequest);
+ req.transmitRequestCancel = c;
}
@Override
@@ -253,7 +279,12 @@
@Override
public void handleError() {
- throw new AssertionError("unexpected");
+ if (disconnectHandler != null) {
+ for (Map.Entry<PeerIdentity, ConnectedPeerInfo> e :
connectedPeers.entrySet()) {
+ disconnectHandler.onDisconnect(e.getKey());
+ }
+ }
+ connectedPeers.clear();
}
}
@@ -299,9 +330,6 @@
cpi.requestsToPeer.put(notifyRequest.smrId, notifyRequest);
- System.out.println("core notifyTransmitReady");
-
-
return requestQueue.add(notifyRequest);
}
@@ -344,10 +372,6 @@
}
messageHandler = runabout;
interested = RunaboutUtil.getRunaboutMessageTypes(runabout);
- System.out.println("message types:");
- for (int i : interested) {
- System.out.println(i);
- }System.out.println("message types end");
}
/**
Modified: gnunet-java/src/org/gnunet/util/Connection.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Connection.java 2012-05-30 11:21:31 UTC
(rev 21639)
+++ gnunet-java/src/org/gnunet/util/Connection.java 2012-05-30 12:42:36 UTC
(rev 21640)
@@ -21,6 +21,8 @@
package org.gnunet.util;
import org.gnunet.construct.Construct;
+import org.gnunet.construct.MessageLoader;
+import org.gnunet.construct.ProtocolViolation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,14 +147,26 @@
finished = true;
recvBuffer.flip();
- GnunetMessage msg = Construct.parseAs(recvBuffer,
GnunetMessage.class);
+ boolean found = true;
- receiver.process(msg.body);
+ try {
+ MessageLoader.getUnionClass(GnunetMessage.Body.class,
msgh.messageType);
+ } catch (ProtocolViolation e) {
+ found = false;
+ }
+
+ if (found) {
+ GnunetMessage msg = Construct.parseAs(recvBuffer,
GnunetMessage.class);
+ receiver.process(msg.body);
+ } else {
+ UnknownMessageBody b = new UnknownMessageBody();
+ b.id = msgh.messageType;
+ receiver.process(b);
+ }
}
@Override
public void run(Scheduler.RunContext ctx) {
- logger.debug("receiving in helper");
recvTask = null;
if (ctx.reasons.contains(Scheduler.Reason.TIMEOUT)) {
currentReceiveHelper = null;
@@ -168,9 +182,9 @@
receiver.handleError();
return;
}
- logger.debug(String.format("connectionChannel read %s
bytes", n));
+ logger.debug(String.format("read %s bytes", n));
} catch (IOException e) {
- logger.error("read failed with exception:", e);
+ logger.error("read failed:", e);
receiver.handleError();
return;
}
@@ -180,6 +194,8 @@
} else {
recvBuffer.rewind();
msgh = Construct.parseAs(recvBuffer,
GnunetMessage.Header.class);
+
+ logger.debug("expecting message of size {}, type {}",
msgh.messageSize, msgh.messageType);
if (msgh.messageSize > GnunetMessage.Header.SIZE) {
if (recvBuffer.capacity() < msgh.messageSize) {
ByteBuffer buf =
ByteBuffer.allocate(msgh.messageSize);
@@ -338,6 +354,7 @@
}
public Connection(SocketChannel sock) {
+ assert sock != null;
this.connectionChannel = sock;
}
@@ -453,8 +470,6 @@
throw new AssertionError("cannot receive if not connected");
}
- logger.debug("scheduling receive with timeout " + timeout);
-
recvBuffer.clear();
recvBuffer.limit(GnunetMessage.Header.SIZE);
final ReceiveHelper rh = new ReceiveHelper(receiver, timeout);
Modified: gnunet-java/src/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Scheduler.java 2012-05-30 11:21:31 UTC
(rev 21639)
+++ gnunet-java/src/org/gnunet/util/Scheduler.java 2012-05-30 12:42:36 UTC
(rev 21640)
@@ -222,7 +222,6 @@
}
private void deregister() {
- logger.debug("deregistering");
if (eventChannels == null) {
return;
}
Modified: gnunet-java/src/org/gnunet/util/Server.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Server.java 2012-05-30 11:21:31 UTC (rev
21639)
+++ gnunet-java/src/org/gnunet/util/Server.java 2012-05-30 12:42:36 UTC (rev
21640)
@@ -20,35 +20,43 @@
package org.gnunet.util;
-
import org.grothoff.Runabout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
public class Server {
+ private static final Logger logger = LoggerFactory
+ .getLogger(Server.class);
+
private final RelativeTime idleTimeout;
private final boolean requireFound;
private List<ServerSocketChannel> listenSockets;
- private List<ClientHandle> clients;
+ private List<ClientHandle> clients = new LinkedList<ClientHandle>();
- private HashMap<Class, MessageRunabout> handlers;
+ private MessageRunabout receivedMessagehandler;
private List<DisconnectHandler> disconnectHandlers = new
LinkedList<DisconnectHandler>();
+ private ArrayList<Class> expectedMessages;
+ public interface DisconnectHandler {
+ void onDisconnect(ClientHandle clientHandle);
+ }
+
public class ClientHandle {
private RelativeTime clientTimeout;
private Connection connection;
private int referenceCount = 0;
private Connection.ReceiveHandle currentReceive;
+ private boolean shutdownRequested;
private ClientHandle(SocketChannel accept) {
connection = new Connection(accept);
@@ -87,10 +95,17 @@
currentReceive = connection.receive(RelativeTime.FOREVER, new
MessageReceiver() {
@Override
public void process(GnunetMessage.Body msg) {
+ if (receivedMessagehandler == null) {
+ throw new AssertionError("received message, but no
handler installed");
+ }
+ receivedMessagehandler.setSender(ClientHandle.this);
+ receivedMessagehandler.visitAppropriate(msg);
}
@Override
public void handleError() {
+ logger.warn("error receiving from client");
+ disconnect();
}
});
} else {
@@ -118,10 +133,21 @@
public void disableReceiveDoneWarning() {
}
+
+ public void keep() {
+ referenceCount++;
+ }
+
+ public void drop() {
+ referenceCount--;
+ if (referenceCount == 0 && shutdownRequested) {
+ disconnect();
+ }
+ }
}
- abstract class MessageRunabout extends Runabout {
+ abstract static class MessageRunabout extends Runabout {
private ClientHandle currentSender;
/**
@@ -131,7 +157,7 @@
* @return handle of the client whose message is currently being
visited
*/
public ClientHandle getSender() {
- return null;
+ return currentSender;
}
private void setSender(ClientHandle clientHandle) {
@@ -140,24 +166,27 @@
}
- /**
- * @param srv ...
- */
private void doAccept(final ServerSocketChannel srv) {
Scheduler.TaskConfiguration b = new
Scheduler.TaskConfiguration(RelativeTime.FOREVER,
- new Scheduler.Task() {
- @Override
- public void run(Scheduler.RunContext ctx) {
- try {
- ClientHandle clientHandle = new ClientHandle(srv.accept());
- clients.add(clientHandle);
+ new Scheduler.Task() {
+ @Override
+ public void run(Scheduler.RunContext ctx) {
+ logger.debug("client connected");
+ try {
+ SocketChannel cli = srv.accept();
+ if (cli != null) {
- } catch (IOException e) {
- throw new RuntimeException("accept failed", e);
- }
- doAccept(srv);
- }
- });
+ cli.configureBlocking(false);
+ ClientHandle clientHandle = new
ClientHandle(cli);
+ clients.add(clientHandle);
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException("accept failed", e);
+ }
+ doAccept(srv);
+ }
+ });
b.selectAccept(srv);
b.schedule();
}
@@ -177,7 +206,9 @@
try {
for (SocketAddress addr : addresses) {
ServerSocketChannel socket = ServerSocketChannel.open();
+ socket.configureBlocking(false);
socket.socket().bind(addr);
+ logger.debug("socket listening on {}", addr.toString());
listenSockets.add(socket);
doAccept(socket);
}
@@ -192,26 +223,13 @@
* There can only be one runabout per message type.
* (Discrepancy with the C-API, could be changed in the future)
*
- * @param cb handler
+ * @param msgRunabout handler
*/
- public void addHandler(MessageRunabout cb) {
- Class rc = cb.getClass();
- for (Method m : rc.getMethods()) {
- if (!(m.getName().equals("visit") && m.getParameterTypes().length
== 1)) {
- continue;
- }
- Class msgClass = m.getParameterTypes()[0];
- if (handlers.containsKey(msgClass)) {
- throw new AssertionError("only one Runabout per message
allowed");
- }
- handlers.put(msgClass, cb);
- }
+ public void setHandler(MessageRunabout msgRunabout) {
+ receivedMessagehandler = msgRunabout;
+ expectedMessages = RunaboutUtil.getRunaboutVisitees(msgRunabout);
}
- public interface DisconnectHandler {
- void onDisconnect(ClientHandle clientHandle);
- }
-
public Cancelable notifyDisconnect(final DisconnectHandler
disconnectHandler) {
this.disconnectHandlers.add(disconnectHandler);
return new Cancelable() {
@@ -222,7 +240,12 @@
};
}
- public void destroy() {
+ /**
+ * Stop the listen socket and get ready to shutdown the server
+ * once only 'monitor' clients are left.
+ */
+ public void stopListening() {
}
+
}
Added: gnunet-java/src/org/gnunet/util/UnknownMessageBody.java
===================================================================
--- gnunet-java/src/org/gnunet/util/UnknownMessageBody.java
(rev 0)
+++ gnunet-java/src/org/gnunet/util/UnknownMessageBody.java 2012-05-30
12:42:36 UTC (rev 21640)
@@ -0,0 +1,35 @@
+/*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+ */
+
+package org.gnunet.util;
+
+/**
+ * Special GnunetMessage body, used to signal that the message containing the
body
+ * is not understood, and therefore no real message body could be constructed.
+ *
+ * Note that this class implements GnunetMessage.Body but does not have a
MessageID associated.
+ * This message should not be sent/received over the network as a message body.
+ *
+ * @author Florian Dold
+ */
+public class UnknownMessageBody implements GnunetMessage.Body {
+ public int id;
+ public byte[] data;
+}
Modified: gnunet-java/src/org/grothoff/Runabout.java
===================================================================
--- gnunet-java/src/org/grothoff/Runabout.java 2012-05-30 11:21:31 UTC (rev
21639)
+++ gnunet-java/src/org/grothoff/Runabout.java 2012-05-30 12:42:36 UTC (rev
21640)
@@ -265,7 +265,8 @@
} catch (IllegalAccessException e) {
throw new RunaboutException(e.toString());
} catch (InvocationTargetException e) {
- throw new RunaboutException(e.toString());
+ e.getCause().printStackTrace();
+ throw new
RunaboutException(e.getCause().toString());
}
}
};
Copied: gnunet-java/test/org/gnunet/util/ServerExample.java (from rev 21637,
gnunet-java/test/org/gnunet/util/ServerTest.java)
===================================================================
--- gnunet-java/test/org/gnunet/util/ServerExample.java
(rev 0)
+++ gnunet-java/test/org/gnunet/util/ServerExample.java 2012-05-30 12:42:36 UTC
(rev 21640)
@@ -0,0 +1,78 @@
+/*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+ */
+
+package org.gnunet.util;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import static org.gnunet.util.Server.*;
+
+/**
+ * Example server implementation.
+ *
+ * @author Florian Dold
+ */
+public class ServerExample {
+
+ public static void main(String[] args) {
+ // usually servers should run inside a service, this is just for
testing
+ new Program(args) {
+ @Override
+ public void run() {
+ Server s = new Server(new SocketAddress[]{new
InetSocketAddress("127.0.0.1", 3456)},
+ RelativeTime.MINUTE,
+ false);
+ s.setHandler(new Server.MessageRunabout() {
+ public void visit(TESTMessage tm) {
+ System.out.println("got TEST message");
+ final Server.ClientHandle sender = getSender();
+ sender.notifyTransmitReady(4, RelativeTime.FOREVER,
new MessageTransmitter() {
+ @Override
+ public void transmit(Connection.MessageSink sink) {
+ sink.send(new TESTMessage());
+ System.out.println("TEST message sent");
+ sender.receiveDone(true);
+ }
+
+ @Override
+ public void handleError() {
+ System.out.println("error talking to client!");
+ }
+ });
+ }
+
+ public void visit(UnknownMessageBody b) {
+ System.out.println("got message of unknown type " +
b.id);
+ }
+ });
+
+ s.notifyDisconnect(new DisconnectHandler() {
+ @Override
+ public void onDisconnect(Server.ClientHandle clientHandle)
{
+ System.out.println("client disconnected");
+
+ }
+ });
+
+ }
+ }.start();
+ }
+}
Deleted: gnunet-java/test/org/gnunet/util/ServerTest.java
===================================================================
--- gnunet-java/test/org/gnunet/util/ServerTest.java 2012-05-30 11:21:31 UTC
(rev 21639)
+++ gnunet-java/test/org/gnunet/util/ServerTest.java 2012-05-30 12:42:36 UTC
(rev 21640)
@@ -1,27 +0,0 @@
-/*
- This file is part of GNUnet.
- (C) 2011, 2012 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
- */
-
-package org.gnunet.util;
-
-/**
- * @author Florian Dold
- */
-public class ServerTest {
-}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r21640 - in gnunet-java: . src/org/gnunet/construct src/org/gnunet/core src/org/gnunet/util src/org/grothoff test/org/gnunet/util,
gnunet <=