gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r21640 - in gnunet-java: . src/org/gnunet/construct src/org


From: gnunet
Subject: [GNUnet-SVN] r21640 - in gnunet-java: . src/org/gnunet/construct src/org/gnunet/core src/org/gnunet/util src/org/grothoff test/org/gnunet/util
Date: Wed, 30 May 2012 14:42:36 +0200

Author: dold
Date: 2012-05-30 14:42:36 +0200 (Wed, 30 May 2012)
New Revision: 21640

Added:
   gnunet-java/src/org/gnunet/util/UnknownMessageBody.java
   gnunet-java/test/org/gnunet/util/ServerExample.java
Removed:
   gnunet-java/test/org/gnunet/util/ServerTest.java
Modified:
   gnunet-java/ISSUES
   gnunet-java/src/org/gnunet/construct/MessageLoader.java
   gnunet-java/src/org/gnunet/core/Core.java
   gnunet-java/src/org/gnunet/util/Connection.java
   gnunet-java/src/org/gnunet/util/Scheduler.java
   gnunet-java/src/org/gnunet/util/Server.java
   gnunet-java/src/org/grothoff/Runabout.java
Log:
most of the server lib implemented, some fixes

Modified: gnunet-java/ISSUES
===================================================================
--- gnunet-java/ISSUES  2012-05-30 11:21:31 UTC (rev 21639)
+++ gnunet-java/ISSUES  2012-05-30 12:42:36 UTC (rev 21640)
@@ -71,6 +71,9 @@
  * alternative 2: pass a special UnknownMessage to the message handler, filter 
it for higher-level
    APIs and signal an error.
 
+
+* the recvDone is kind of clunky (see test/org.gnunet.util.ServerExample)
+
 * finally core/statistics/dht/resolver/... have unit tests!
   (and coverage works again, but i can't access the cobertura via ssh yet)
  * currently most test rely on a running gnunet, and use the default 
configuration

Modified: gnunet-java/src/org/gnunet/construct/MessageLoader.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/MessageLoader.java     2012-05-30 
11:21:31 UTC (rev 21639)
+++ gnunet-java/src/org/gnunet/construct/MessageLoader.java     2012-05-30 
12:42:36 UTC (rev 21640)
@@ -43,6 +43,19 @@
     private static final Logger logger = LoggerFactory
             .getLogger(MessageLoader.class);
 
+
+
+
+    public static class UnknownUnionException extends RuntimeException {
+        public UnknownUnionException(String msg) {
+            super(msg);
+        }
+    }
+
+    public static class UnknownUnionIdException extends RuntimeException {
+
+    }
+
     /**
      * Maps a class and tag to the corresponding union case.
      * <p/>
@@ -146,7 +159,7 @@
     public static Class<? extends MessageUnion> getUnionClass(Class<? extends 
MessageUnion> unionInterface, int tag) {
         Map<Integer, Class<? extends MessageUnion>> map = 
unionmap.get(unionInterface);
         if (map == null) {
-            throw new AssertionError("don't know how to handle unions of type 
'" + unionInterface + "'");
+            throw new UnknownUnionException("don't know how to handle unions 
of type '" + unionInterface + "'");
         }
 
         Class<? extends MessageUnion> cls = map.get(tag);
@@ -157,6 +170,7 @@
         return cls;
     }
 
+
     public static int getUnionTag(Class<? extends MessageUnion> 
unionInterface, Class<? extends MessageUnion> unionCase) {
         Map<Class<? extends MessageUnion>, Integer> map = 
tagmap.get(unionInterface);
         if (map == null)

Modified: gnunet-java/src/org/gnunet/core/Core.java
===================================================================
--- gnunet-java/src/org/gnunet/core/Core.java   2012-05-30 11:21:31 UTC (rev 
21639)
+++ gnunet-java/src/org/gnunet/core/Core.java   2012-05-30 12:42:36 UTC (rev 
21640)
@@ -113,6 +113,11 @@
 
             sink.send(sm);
         }
+
+        @Override
+        public void onCancel(boolean alreadyTransmitted) {
+            throw new AssertionError("TransmitRequest cannot be canceled");
+        }
     }
 
     /**
@@ -125,6 +130,7 @@
         public PeerIdentity target;
         public MessageTransmitter transmitter;
         public int smrId;
+        public Cancelable transmitRequestCancel;
 
         @Override
         public AbsoluteTime getDeadline() {
@@ -142,6 +148,14 @@
 
             sink.send(smr);
         }
+
+        @Override
+        public void onCancel(boolean alreadyTransmitted) {
+            // only thing we have to do is cancel the following 
transmitRequest, if any!
+            if (transmitRequestCancel != null) {
+                transmitRequestCancel.cancel();
+            }
+        }
     }
 
     private class InitRequest extends RequestQueue.Request {
@@ -163,6 +177,17 @@
 
             sink.send(initMessage);
         }
+
+        @Override
+        public void onCancel(boolean alreadyTransmitted) {
+            throw new AssertionError("init request can't be canceled");
+        }
+
+        @Override
+        public boolean onReconnect() {
+            // keep the init message on reconnect.
+            return true;
+        }
     }
 
     public class CoreReceiver extends RunaboutMessageReceiver {
@@ -243,7 +268,8 @@
                 }
             });
 
-            requestQueue.add(transmitRequest);
+            Cancelable c = requestQueue.add(transmitRequest);
+            req.transmitRequestCancel = c;
         }
 
         @Override
@@ -253,7 +279,12 @@
 
         @Override
         public void handleError() {
-            throw new AssertionError("unexpected");
+            if (disconnectHandler != null) {
+                for (Map.Entry<PeerIdentity, ConnectedPeerInfo> e : 
connectedPeers.entrySet()) {
+                    disconnectHandler.onDisconnect(e.getKey());
+                }
+            }
+            connectedPeers.clear();
         }
     }
 
@@ -299,9 +330,6 @@
 
         cpi.requestsToPeer.put(notifyRequest.smrId, notifyRequest);
 
-        System.out.println("core notifyTransmitReady");
-
-
         return requestQueue.add(notifyRequest);
     }
 
@@ -344,10 +372,6 @@
         }
         messageHandler = runabout;
         interested = RunaboutUtil.getRunaboutMessageTypes(runabout);
-        System.out.println("message types:");
-        for (int i : interested) {
-            System.out.println(i);
-        }System.out.println("message types end");
     }
 
     /**

Modified: gnunet-java/src/org/gnunet/util/Connection.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Connection.java     2012-05-30 11:21:31 UTC 
(rev 21639)
+++ gnunet-java/src/org/gnunet/util/Connection.java     2012-05-30 12:42:36 UTC 
(rev 21640)
@@ -21,6 +21,8 @@
 package org.gnunet.util;
 
 import org.gnunet.construct.Construct;
+import org.gnunet.construct.MessageLoader;
+import org.gnunet.construct.ProtocolViolation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,14 +147,26 @@
             finished = true;
             recvBuffer.flip();
 
-            GnunetMessage msg = Construct.parseAs(recvBuffer, 
GnunetMessage.class);
+            boolean found = true;
 
-            receiver.process(msg.body);
+            try {
+                MessageLoader.getUnionClass(GnunetMessage.Body.class, 
msgh.messageType);
+            } catch (ProtocolViolation e) {
+                found = false;
+            }
+
+            if (found) {
+                GnunetMessage msg = Construct.parseAs(recvBuffer, 
GnunetMessage.class);
+                receiver.process(msg.body);
+            } else {
+                UnknownMessageBody b = new UnknownMessageBody();
+                b.id = msgh.messageType;
+                receiver.process(b);
+            }
         }
 
         @Override
         public void run(Scheduler.RunContext ctx) {
-            logger.debug("receiving in helper");
             recvTask = null;
             if (ctx.reasons.contains(Scheduler.Reason.TIMEOUT)) {
                 currentReceiveHelper = null;
@@ -168,9 +182,9 @@
                         receiver.handleError();
                         return;
                     }
-                    logger.debug(String.format("connectionChannel read %s 
bytes", n));
+                    logger.debug(String.format("read %s bytes", n));
                 } catch (IOException e) {
-                    logger.error("read failed with exception:", e);
+                    logger.error("read failed:", e);
                     receiver.handleError();
                     return;
                 }
@@ -180,6 +194,8 @@
                     } else {
                         recvBuffer.rewind();
                         msgh = Construct.parseAs(recvBuffer, 
GnunetMessage.Header.class);
+
+                        logger.debug("expecting message of size {}, type {}", 
msgh.messageSize, msgh.messageType);
                         if (msgh.messageSize > GnunetMessage.Header.SIZE) {
                             if (recvBuffer.capacity() < msgh.messageSize) {
                                 ByteBuffer buf = 
ByteBuffer.allocate(msgh.messageSize);
@@ -338,6 +354,7 @@
     }
 
     public Connection(SocketChannel sock) {
+        assert sock != null;
         this.connectionChannel = sock;
     }
 
@@ -453,8 +470,6 @@
             throw new AssertionError("cannot receive if not connected");
         }
 
-        logger.debug("scheduling receive with timeout " + timeout);
-
         recvBuffer.clear();
         recvBuffer.limit(GnunetMessage.Header.SIZE);
         final ReceiveHelper rh = new ReceiveHelper(receiver, timeout);

Modified: gnunet-java/src/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Scheduler.java      2012-05-30 11:21:31 UTC 
(rev 21639)
+++ gnunet-java/src/org/gnunet/util/Scheduler.java      2012-05-30 12:42:36 UTC 
(rev 21640)
@@ -222,7 +222,6 @@
         }
 
         private void deregister() {
-            logger.debug("deregistering");
             if (eventChannels == null) {
                 return;
             }

Modified: gnunet-java/src/org/gnunet/util/Server.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Server.java 2012-05-30 11:21:31 UTC (rev 
21639)
+++ gnunet-java/src/org/gnunet/util/Server.java 2012-05-30 12:42:36 UTC (rev 
21640)
@@ -20,35 +20,43 @@
 
 package org.gnunet.util;
 
-
 import org.grothoff.Runabout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.net.SocketAddress;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
 public class Server {
+    private static final Logger logger = LoggerFactory
+            .getLogger(Server.class);
+
     private final RelativeTime idleTimeout;
     private final boolean requireFound;
     private List<ServerSocketChannel> listenSockets;
-    private List<ClientHandle> clients;
+    private List<ClientHandle> clients = new LinkedList<ClientHandle>();
 
-    private HashMap<Class, MessageRunabout> handlers;
+    private MessageRunabout receivedMessagehandler;
     private List<DisconnectHandler> disconnectHandlers = new 
LinkedList<DisconnectHandler>();
+    private ArrayList<Class> expectedMessages;
 
+    public interface DisconnectHandler {
+        void onDisconnect(ClientHandle clientHandle);
+    }
 
+
     public class ClientHandle {
         private RelativeTime clientTimeout;
         private Connection connection;
 
         private int referenceCount = 0;
         private Connection.ReceiveHandle currentReceive;
+        private boolean shutdownRequested;
 
         private ClientHandle(SocketChannel accept) {
             connection = new Connection(accept);
@@ -87,10 +95,17 @@
                 currentReceive = connection.receive(RelativeTime.FOREVER, new 
MessageReceiver() {
                     @Override
                     public void process(GnunetMessage.Body msg) {
+                        if (receivedMessagehandler == null) {
+                            throw new AssertionError("received message, but no 
handler installed");
+                        }
+                        receivedMessagehandler.setSender(ClientHandle.this);
+                        receivedMessagehandler.visitAppropriate(msg);
                     }
 
                     @Override
                     public void handleError() {
+                        logger.warn("error receiving from client");
+                        disconnect();
                     }
                 });
             } else {
@@ -118,10 +133,21 @@
         public void disableReceiveDoneWarning() {
 
         }
+
+        public void keep() {
+            referenceCount++;
+        }
+
+        public void drop() {
+            referenceCount--;
+            if (referenceCount == 0 && shutdownRequested) {
+                disconnect();
+            }
+        }
     }
 
 
-    abstract class MessageRunabout extends Runabout {
+    abstract static class MessageRunabout extends Runabout {
         private ClientHandle currentSender;
 
         /**
@@ -131,7 +157,7 @@
          * @return handle of the client whose message is currently being 
visited
          */
         public ClientHandle getSender() {
-            return null;
+            return currentSender;
         }
 
         private void setSender(ClientHandle clientHandle) {
@@ -140,24 +166,27 @@
     }
 
 
-    /**
-     * @param srv ...
-     */
     private void doAccept(final ServerSocketChannel srv) {
         Scheduler.TaskConfiguration b = new 
Scheduler.TaskConfiguration(RelativeTime.FOREVER,
-        new Scheduler.Task() {
-            @Override
-            public void run(Scheduler.RunContext ctx) {
-                try {
-                    ClientHandle clientHandle = new ClientHandle(srv.accept());
-                    clients.add(clientHandle);
+                new Scheduler.Task() {
+                    @Override
+                    public void run(Scheduler.RunContext ctx) {
+                        logger.debug("client connected");
+                        try {
+                            SocketChannel cli = srv.accept();
+                            if (cli != null) {
 
-                } catch (IOException e) {
-                    throw new RuntimeException("accept failed", e);
-                }
-                doAccept(srv);
-            }
-        });
+                                cli.configureBlocking(false);
+                                ClientHandle clientHandle = new 
ClientHandle(cli);
+                                clients.add(clientHandle);
+                            }
+
+                        } catch (IOException e) {
+                            throw new RuntimeException("accept failed", e);
+                        }
+                        doAccept(srv);
+                    }
+                });
         b.selectAccept(srv);
         b.schedule();
     }
@@ -177,7 +206,9 @@
         try {
             for (SocketAddress addr : addresses) {
                 ServerSocketChannel socket = ServerSocketChannel.open();
+                socket.configureBlocking(false);
                 socket.socket().bind(addr);
+                logger.debug("socket listening on {}", addr.toString());
                 listenSockets.add(socket);
                 doAccept(socket);
             }
@@ -192,26 +223,13 @@
      * There can only be one runabout per message type.
      * (Discrepancy with the C-API, could be changed in the future)
      *
-     * @param cb handler
+     * @param msgRunabout handler
      */
-    public void addHandler(MessageRunabout cb) {
-        Class rc = cb.getClass();
-        for (Method m : rc.getMethods()) {
-            if (!(m.getName().equals("visit") && m.getParameterTypes().length 
== 1)) {
-                continue;
-            }
-            Class msgClass = m.getParameterTypes()[0];
-            if (handlers.containsKey(msgClass)) {
-                throw new AssertionError("only one Runabout per message 
allowed");
-            }
-            handlers.put(msgClass, cb);
-        }
+    public void setHandler(MessageRunabout msgRunabout) {
+        receivedMessagehandler = msgRunabout;
+        expectedMessages = RunaboutUtil.getRunaboutVisitees(msgRunabout);
     }
 
-    public interface DisconnectHandler {
-        void onDisconnect(ClientHandle clientHandle);
-    }
-
     public Cancelable notifyDisconnect(final DisconnectHandler 
disconnectHandler) {
         this.disconnectHandlers.add(disconnectHandler);
         return new Cancelable() {
@@ -222,7 +240,12 @@
         };
     }
 
-    public void destroy() {
+    /**
+     * Stop the listen socket and get ready to shutdown the server
+     * once only 'monitor' clients are left.
+     */
+    public void stopListening() {
 
     }
+
 }

Added: gnunet-java/src/org/gnunet/util/UnknownMessageBody.java
===================================================================
--- gnunet-java/src/org/gnunet/util/UnknownMessageBody.java                     
        (rev 0)
+++ gnunet-java/src/org/gnunet/util/UnknownMessageBody.java     2012-05-30 
12:42:36 UTC (rev 21640)
@@ -0,0 +1,35 @@
+/*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING.  If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+ */
+
+package org.gnunet.util;
+
+/**
+ * Special GnunetMessage body, used to signal that the message containing the 
body
+ * is not understood, and therefore no real message body could be constructed.
+ *
+ * Note that this class implements GnunetMessage.Body but does not have a 
MessageID associated.
+ * This message should not be sent/received over the network as a message body.
+ *
+ * @author Florian Dold
+ */
+public class UnknownMessageBody implements GnunetMessage.Body {
+    public int id;
+    public byte[] data;
+}

Modified: gnunet-java/src/org/grothoff/Runabout.java
===================================================================
--- gnunet-java/src/org/grothoff/Runabout.java  2012-05-30 11:21:31 UTC (rev 
21639)
+++ gnunet-java/src/org/grothoff/Runabout.java  2012-05-30 12:42:36 UTC (rev 
21640)
@@ -265,7 +265,8 @@
                                 } catch (IllegalAccessException e) {
                                     throw new RunaboutException(e.toString());
                                 } catch (InvocationTargetException e) {
-                                    throw new RunaboutException(e.toString());
+                                    e.getCause().printStackTrace();
+                                    throw new 
RunaboutException(e.getCause().toString());
                                 }
                             }
                         };

Copied: gnunet-java/test/org/gnunet/util/ServerExample.java (from rev 21637, 
gnunet-java/test/org/gnunet/util/ServerTest.java)
===================================================================
--- gnunet-java/test/org/gnunet/util/ServerExample.java                         
(rev 0)
+++ gnunet-java/test/org/gnunet/util/ServerExample.java 2012-05-30 12:42:36 UTC 
(rev 21640)
@@ -0,0 +1,78 @@
+/*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING.  If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+ */
+
+package org.gnunet.util;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import static org.gnunet.util.Server.*;
+
+/**
+ * Example server implementation.
+ *
+ * @author Florian Dold
+ */
+public class ServerExample {
+
+    public static void main(String[] args) {
+        // usually servers should run inside a service, this is just for 
testing
+        new Program(args) {
+            @Override
+            public void run() {
+                Server s = new Server(new SocketAddress[]{new 
InetSocketAddress("127.0.0.1", 3456)},
+                        RelativeTime.MINUTE,
+                        false);
+                s.setHandler(new Server.MessageRunabout() {
+                    public void visit(TESTMessage tm) {
+                        System.out.println("got TEST message");
+                        final Server.ClientHandle sender = getSender();
+                        sender.notifyTransmitReady(4, RelativeTime.FOREVER, 
new MessageTransmitter() {
+                            @Override
+                            public void transmit(Connection.MessageSink sink) {
+                                sink.send(new TESTMessage());
+                                System.out.println("TEST message sent");
+                                sender.receiveDone(true);
+                            }
+
+                            @Override
+                            public void handleError() {
+                                System.out.println("error talking to client!");
+                            }
+                        });
+                    }
+
+                    public void visit(UnknownMessageBody b) {
+                        System.out.println("got message of unknown type " + 
b.id);
+                    }
+                });
+
+                s.notifyDisconnect(new DisconnectHandler() {
+                    @Override
+                    public void onDisconnect(Server.ClientHandle clientHandle) 
{
+                        System.out.println("client disconnected");
+
+                    }
+                });
+
+            }
+        }.start();
+    }
+}

Deleted: gnunet-java/test/org/gnunet/util/ServerTest.java
===================================================================
--- gnunet-java/test/org/gnunet/util/ServerTest.java    2012-05-30 11:21:31 UTC 
(rev 21639)
+++ gnunet-java/test/org/gnunet/util/ServerTest.java    2012-05-30 12:42:36 UTC 
(rev 21640)
@@ -1,27 +0,0 @@
-/*
- This file is part of GNUnet.
- (C) 2011, 2012 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING.  If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
- */
-
-package org.gnunet.util;
-
-/**
- * @author Florian Dold
- */
-public class ServerTest {
-}




reply via email to

[Prev in Thread] Current Thread [Next in Thread]