gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r30019 - in gnunet-java/src: main/java/org/gnunet/util test/java/org/gnunet/util


From: gnunet
Subject: [GNUnet-SVN] r30019 - in gnunet-java/src: main/java/org/gnunet/util test/java/org/gnunet/util
Date: Wed, 9 Oct 2013 02:39:02 +0200

Author: dold
Date: 2013-10-09 02:39:02 +0200 (Wed, 09 Oct 2013)
New Revision: 30019

Modified:
   gnunet-java/src/main/java/org/gnunet/util/Client.java
   gnunet-java/src/main/java/org/gnunet/util/Connection.java
   gnunet-java/src/main/java/org/gnunet/util/Helper.java
   gnunet-java/src/main/java/org/gnunet/util/MessageStreamTokenizer.java
   gnunet-java/src/main/java/org/gnunet/util/Resolver.java
   gnunet-java/src/main/java/org/gnunet/util/Scheduler.java
   gnunet-java/src/main/java/org/gnunet/util/Server.java
   gnunet-java/src/test/java/org/gnunet/util/ResolverTest.java
Log:
- use MST in Connection instead of custom message tokenization
- remove ReceiveHandle, be closer to the C API
- add oneShot option to MST
- fix problems in MST


Modified: gnunet-java/src/main/java/org/gnunet/util/Client.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Client.java       2013-10-08 23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Client.java       2013-10-09 00:39:02 UTC (rev 30019)
@@ -146,8 +146,8 @@
      * @param timeout  deadline after which MessageReceiver.deadline will be 
called
      * @param receiver MessageReceiver that is responsible for the received 
message
      */
-    public Cancelable receiveOne(RelativeTime timeout, MessageReceiver 
receiver) {
-        return connection.receive(timeout, receiver);
+    public void receiveOne(RelativeTime timeout, MessageReceiver receiver) {
+        connection.receive(timeout, receiver);
     }
 
     /**

Modified: gnunet-java/src/main/java/org/gnunet/util/Connection.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Connection.java   2013-10-08 23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Connection.java   2013-10-09 00:39:02 UTC (rev 30019)
@@ -21,8 +21,6 @@
 package org.gnunet.util;
 
 import org.gnunet.construct.Construct;
-import org.gnunet.construct.MessageLoader;
-import org.gnunet.construct.ProtocolViolationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,18 +64,15 @@
     private Cancelable connectHandle = null;
 
     /**
-     * The ReceiveHelper responsible for receiving a whole message from the 
service
+     * The ReceiveTask responsible for receiving a whole message from the 
service
      * and calling the respective MessageReceiver.
      */
-    private ReceiveHelper currentReceiveHelper = null;
+    private Scheduler.TaskIdentifier receiveTaskId;
 
-    /**
-     * The buffer with the (partial) message received from the service.
-     * Initially, this buffer has the size of the smallest possible messages, 
but grows when
-     * receiving larger messages.
-     */
-    private ByteBuffer recvBuffer = 
ByteBuffer.allocate(GnunetMessage.Header.SIZE);
+    private MessageReceiver currentReceiver;
 
+    private AbsoluteTime receiveDeadline;
+
     /**
      * The handle for the current transmission. Writes data to the socket.
      */
@@ -91,12 +86,21 @@
     private TransmitHelper nextTransmitHelper = null;
 
     /**
+     * Has the last call to the mst produced a message?
+     */
+    private boolean processedMessage;
+
+    /**
      * The transmitters passed to transmitReadyNotify(...) write to this 
buffer by calling
      * methods on the MessageSink passed to the 
Transmitter.transmit(MessageSink s) method.
      * Initially, this buffer has the size of the smallest possible messages, 
but grows when
      * transmitting larger messages.
      */
     private ByteBuffer transmitBuffer = 
ByteBuffer.allocate(GnunetMessage.Header.SIZE);
+
+    /**
+     * Has the connection been disconnected?
+     */
     private boolean disconnected = false;
 
     /**
@@ -105,10 +109,14 @@
     private Scheduler.TaskIdentifier notifyConnectedTimeout;
 
     /**
-     * Continuation to call when connected
+     * Continuation to call when connected.
      */
     private Continuation notifyConnectedContinuation;
 
+    /**
+     * Message stream tokenizer for messages received from the connection.
+     */
+    private MessageStreamTokenizer mst;
 
     /**
      * An address probe is a connection to a socket that may succeed or not.
@@ -152,137 +160,60 @@
     }
 
     /**
-     * The ReceiveHelper is responsible for receiving a whole
+     * The ReceiveTask is responsible for receiving a whole
      * GnunetMessage and call the respective MessageReceiver with the message 
on success,
      * and null on failure or timeout.
      */
-    private class ReceiveHelper implements Scheduler.Task {
-        private MessageReceiver receiver;
-        private RelativeTime timeout;
-        private GnunetMessage.Header msgh = null;
-        private Scheduler.TaskIdentifier recvTask = null;
-        private boolean finished = false;
-        // is this receiver actively working? if not, the connection process 
has to kick off the receiver
-        // (or select behaves badly)
-        private boolean working = false;
+    private class ReceiveTask implements Scheduler.Task {
+        /**
+         * The task object's work is over, either because it succeeded at its 
job,
+         * or it has been canceled.
+         */
+        public boolean done;
 
-        public ReceiveHelper(MessageReceiver receiver, RelativeTime timeout) {
-            this.receiver = receiver;
-            this.timeout = timeout;
+        private void error() {
+            currentReceiver.handleError();
+            done = true;
         }
 
-        public void dispatchMessage() {
-            assert msgh != null;
-            currentReceiveHelper = null;
-            finished = true;
-            recvBuffer.flip();
-
-            boolean found = true;
-            Class unionClass = null;
-
-            try {
-                unionClass = 
MessageLoader.getUnionClass(GnunetMessage.Body.class, msgh.messageType);
-            } catch (ProtocolViolationException e) {
-                found = false;
-            }
-
-            logger.debug("dispatching received message");
-            if (found) {
-                GnunetMessage msg;
-                try {
-                    msg = Construct.parseAs(recvBuffer, GnunetMessage.class);
-                } catch (OutOfMemoryError e) {
-                    throw new OutOfMemoryError("oom while parsing " + 
unionClass);
-                }
-                receiver.process(msg.body);
-            } else {
-                UnknownMessageBody b = new UnknownMessageBody();
-                b.id = msgh.messageType;
-
-                // may throw exception, doesn't matter as it's the last call
-                receiver.process(b);
-            }
-        }
-
         @Override
         public void run(Scheduler.RunContext ctx) {
-            recvTask = null;
+            if (currentReceiver == null) {
+                throw new AssertionError();
+            }
+            receiveTaskId = null;
             if (ctx.reasons.contains(Scheduler.Reason.TIMEOUT)) {
-                currentReceiveHelper = null;
-                receiver.handleError();
+                error();
             } else if (ctx.reasons.contains(Scheduler.Reason.READ_READY)) {
+                logger.debug("ready to receive");
                 try {
-                    int n = connectionChannel.read(recvBuffer);
-                    if (n == -1) {
-                        currentReceiveHelper = null;
-                        logger.warn("lost connection to service, {}", 
connectionChannel.socket().toString());
-                        connectionChannel.close();
-                        connectionChannel = null;
-                        if (Connection.this.currentTransmitHelper != null) {
-                            Connection.this.currentTransmitHelper.cancel();
-                            Connection.this.currentTransmitHelper = null;
-                        }
-                        try {
-                            receiver.handleError();
-                        } finally {
-                            return;
-                        }
+                    processedMessage = false;
+                    int n = mst.readFrom(connectionChannel, true);
+                    logger.debug("read {} bytes into mst", n);
+                    if (processedMessage) {
+                        done = true;
+                        return;
                     }
-                    logger.debug(String.format("read %s bytes from %s", n, 
connectionChannel.socket().toString()));
-                } catch (IOException e) {
-                    logger.error("read failed:", e);
-                    try {
-                        receiver.handleError();
-                    } finally {
+                    if (-1 == n) {
+                        error();
                         return;
                     }
+                } catch (IOException e) {
+                    error();
+                    return;
                 }
-                if (recvBuffer.remaining() == 0) {
-                    if (msgh != null) {
-                        dispatchMessage();
-                    } else {
-                        recvBuffer.rewind();
-                        msgh = Construct.parseAs(recvBuffer, 
GnunetMessage.Header.class);
-
-                        logger.debug("expecting message of size {}, type {}", 
msgh.messageSize, msgh.messageType);
-                        if (msgh.messageSize > GnunetMessage.Header.SIZE) {
-                            if (recvBuffer.capacity() < msgh.messageSize) {
-                                ByteBuffer buf = 
ByteBuffer.allocate(msgh.messageSize);
-                                recvBuffer.flip();
-                                buf.put(recvBuffer);
-                                recvBuffer = buf;
-                            }
-                            recvBuffer.limit(msgh.messageSize);
-                            schedule();
-                        } else {
-                            dispatchMessage();
-                        }
-                    }
-                } else {
-                    schedule();
+                if (receiveDeadline.isDue()) {
+                    error();
+                    return;
                 }
+                receiveTaskId = 
Scheduler.addRead(receiveDeadline.getRemaining(), connectionChannel, this);
             } else if (ctx.reasons.contains(Scheduler.Reason.SHUTDOWN)) {
-                // nothing to do!
+                done = true;
             } else {
                 // XXX: what to do here?
                 throw new RuntimeException("receive failed");
             }
         }
-
-        private void schedule() {
-            working = true;
-            recvTask = Scheduler.addRead(timeout, connectionChannel, this);
-        }
-
-        public void cancel() {
-            if (finished) {
-                throw new AssertionError("canceling finished receive");
-            }
-            if (recvTask != null) {
-                recvTask.cancel();
-                recvTask = null;
-            }
-        }
     }
 
 
@@ -397,6 +328,28 @@
         }
     }
 
+    private class ConnectionMstCallback implements MstCalllback {
+        private void dispatch(GnunetMessage.Body mb) {
+            if (processedMessage) {
+                throw new AssertionError();
+            }
+            if (null == currentReceiver) {
+                throw new AssertionError();
+            }
+            currentReceiver.process(mb);
+            processedMessage = true;
+        }
+        @Override
+        public void onUnknownMessage(UnknownMessageBody b) {
+            dispatch(b);
+        }
+
+        @Override
+        public void onKnownMessage(GnunetMessage msg) {
+            dispatch(msg.body);
+        }
+    }
+
     /**
      * Create a connection to the given hostname/port.
      *
@@ -404,12 +357,14 @@
      * @param port     port of the host to connect to
      */
     public Connection(String hostname, int port) {
+        mst = new MessageStreamTokenizer(new ConnectionMstCallback());
         addressProbes = new LinkedList<AddressProbe>();
         ConnectionResolveHandler addressHandler = new 
ConnectionResolveHandler(port);
         resolveHandle = Resolver.getInstance().resolveHostname(hostname, 
RelativeTime.FOREVER, addressHandler);
     }
 
     public Connection(SocketChannel sock) {
+        mst = new MessageStreamTokenizer(new ConnectionMstCallback());
         assert sock != null;
         this.connectionChannel = sock;
     }
@@ -469,6 +424,10 @@
     }
 
 
+    /**
+     * Actually connect the socket that select reported as ready to connect.
+     * Discards all remaining address probes.
+     */
     private void finishConnect(AddressProbe probe) {
         // can happen if the addres probe task was already scheduled
         if (connectionChannel != null) {
@@ -512,9 +471,6 @@
         if (currentTransmitHelper != null) {
             currentTransmitHelper.start();
         }
-        if (currentReceiveHelper != null && !currentReceiveHelper.working) {
-            currentReceiveHelper.schedule();
-        }
         Continuation c = notifyConnectedContinuation;
         notifyConnectedContinuation = null;
         if (notifyConnectedTimeout != null) {
@@ -544,42 +500,45 @@
         return connectionChannel != null && connectionChannel.isConnected();
     }
 
-
-    public interface ReceiveHandle extends Cancelable {
-    }
-
     /**
      * Receive one message from the network.
      *
      * @param timeout  deadline after which receiver.onError() will be called
      * @param receiver MessageReceiver that is responsible for the received 
message
      */
-    public ReceiveHandle receive(RelativeTime timeout, final MessageReceiver 
receiver) {
-        if (currentReceiveHelper != null) {
-            throw new AssertionError("receive must not be called while 
receiving");
-        }
-
+    public void receive(final RelativeTime timeout, final MessageReceiver 
receiver) {
+        if (receiveTaskId != null)
+            throw new AssertionError("already receiving");
         if (!isConnected()) {
             throw new AssertionError("cannot receive if not connected");
         }
 
-        recvBuffer.clear();
-        recvBuffer.limit(GnunetMessage.Header.SIZE);
-        final ReceiveHelper rh = new ReceiveHelper(receiver, timeout);
-        currentReceiveHelper = rh;
+        currentReceiver = receiver;
+        receiveDeadline = timeout.toAbsolute();
 
-        // we can only schedule the receive helper if we are sure the 
connection is made, otherwise
-        // select will misbehave!
-        if (connectionChannel.isConnected()) {
-            currentReceiveHelper.schedule();
-        }
+        // make sure that the receiver is never called directly
+        Scheduler.add(new Scheduler.Task() {
+            @Override
+            public void run(Scheduler.RunContext ctx) {
+                // full message still in buffer?
+                processedMessage = false;
+                if (mst.extractOne()) {
+                    logger.debug("full message was in buffer, not reading from 
socket");
+                    if (!processedMessage) {
+                        throw new AssertionError();
+                    }
+                    return;
+                }
 
-        return new ReceiveHandle() {
-            @Override
-            public void cancel() {
-                rh.cancel();
+                // did we get disconnected in the mean time?
+                if (connectionChannel == null) {
+                    return;
+                }
+
+                final ReceiveTask task = new ReceiveTask();
+                receiveTaskId = Scheduler.addRead(timeout, connectionChannel, 
task);
             }
-        };
+        });
     }
 
     /**
@@ -626,7 +585,6 @@
         };
     }
 
-
     /**
      * Call cont after establishing the connection or when the timeout has 
occured.
      *
@@ -662,22 +620,18 @@
             logger.error("disconnect called twice");
         }
         disconnected = true;
-
+        if (receiveTaskId != null) {
+            receiveTaskId.cancel();
+            receiveTaskId = null;
+        }
         if (currentTransmitHelper != null) {
             currentTransmitHelper.cancel();
             currentTransmitHelper = null;
         }
-
         if (nextTransmitHelper != null) {
             nextTransmitHelper.cancel();
             nextTransmitHelper = null;
         }
-
-        if (currentReceiveHelper != null) {
-            currentReceiveHelper.cancel();
-            currentReceiveHelper = null;
-        }
-
         if (resolveHandle != null) {
             resolveHandle.cancel();
             resolveHandle = null;
@@ -694,5 +648,11 @@
             }
             connectionChannel = null;
         }
+
+        if (addressProbes != null) {
+            for (AddressProbe ap : addressProbes) {
+                ap.cancel();
+            }
+        }
     }
 }

Modified: gnunet-java/src/main/java/org/gnunet/util/Helper.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Helper.java       2013-10-08 23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Helper.java       2013-10-09 00:39:02 UTC (rev 30019)
@@ -161,7 +161,13 @@
         @Override
         public void run(Scheduler.RunContext ctx) {
             readTaskId = null;
-            int n = mst.readFrom(readThread.pipe.source());
+            int n = 0;
+            try {
+                n = mst.readFrom(readThread.pipe.source(), false);
+            } catch (IOException e) {
+                logger.warn("helper reader got io exception: {}", e);
+                return;
+            }
             if (n != -1 && readThread.pipe.source().isOpen()) {
                 readTaskId = readTaskConfig.schedule();
             }

Modified: gnunet-java/src/main/java/org/gnunet/util/MessageStreamTokenizer.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/MessageStreamTokenizer.java       2013-10-08 23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/MessageStreamTokenizer.java       2013-10-09 00:39:02 UTC (rev 30019)
@@ -39,7 +39,7 @@
  */
 public class MessageStreamTokenizer {
     private static final Logger logger = LoggerFactory
-            .getLogger(Service.class);
+            .getLogger(MessageStreamTokenizer.class);
     private MstCalllback mstCalllback;
     private ByteBuffer buffer;
     GnunetMessage.Header msgh;
@@ -49,16 +49,6 @@
         this.buffer = ByteBuffer.allocate(4);
     }
 
-    public void ensureBufferSize() {
-        if (buffer.capacity() < msgh.messageSize) {
-            ByteBuffer buf = ByteBuffer.allocate(msgh.messageSize);
-            buffer.flip();
-            buf.put(buffer);
-            buf.flip();
-            buffer = buf;
-        }
-    }
-
     public void readAndDispatch() {
         Class unionClass = null;
         boolean found = true;
@@ -85,38 +75,59 @@
         }
     }
 
+
     /**
-     * Read from a channel into the mst.
+     * Try to extract one message from the MST, call appropriate callbacks.
      *
-     * @param source channel to read from
-     * @return -1 on end of stream, number of bytes read otherwise
+     * @return true if message could be extracted, false if not enough data is 
available
      */
-    public int readFrom(ReadableByteChannel source) {
-        int n;
-        try {
-            n = source.read(buffer);
-        } catch (ClosedChannelException e) {
-            return -1;
-        }catch (IOException e) {
-            throw new IOError(e);
-        }
+    public boolean extractOne() {
+        System.out.println("trying to extract message from buffer");
         if (msgh == null && buffer.position() >= 4) {
-            logger.debug("got header in mst");
-            // remember write position and prepare for reading
-            int writePos = buffer.position();
+            // prepare for reading
             buffer.flip();
             msgh = Construct.parseAs(buffer, GnunetMessage.Header.class);
-            ensureBufferSize();
-            // prepare for writing again, and restore write position
+            // undo read
             buffer.position(0);
-            buffer.compact();
+            logger.debug("got header in mst, (" + buffer.limit() + "/" + 
msgh.messageSize + " read)");
+            if (buffer.capacity() < msgh.messageSize) {
+                ByteBuffer newBuf = ByteBuffer.allocate(msgh.messageSize);
+                newBuf.put(buffer);
+                buffer = newBuf;
+            } else {
+                // set pos to limit and limit to capacity and
+                buffer.compact();
+            }
+            logger.debug("buffer pos is now " + buffer.position());
         }
         if (msgh != null && buffer.position() >= msgh.messageSize) {
             buffer.flip();
             readAndDispatch();
             msgh = null;
             buffer.compact();
+            return true;
         }
+        return false;
+    }
+
+
+    /**
+     * Read from a channel into the mst. Does not call any callbacks.
+     *
+     * @param source channel to read from
+     * @return -1 on end of stream, number of bytes read otherwise
+     */
+    public int readFrom(ReadableByteChannel source, boolean oneShot) throws 
IOException {
+        int n;
+        n = source.read(buffer);
+        if (oneShot) {
+            extractOne();
+        }
+        else {
+            while (extractOne()) {
+                // loop
+            }
+        }
         return n;
     }
 }

Modified: gnunet-java/src/main/java/org/gnunet/util/Resolver.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Resolver.java     2013-10-08 23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Resolver.java     2013-10-09 00:39:02 UTC (rev 30019)
@@ -167,7 +167,6 @@
         private boolean finished = false;
         private boolean canceled = false;
         private Cancelable transmitTask = null;
-        private Cancelable receiveTask = null;
 
         public void cancel() {
             if (finished) {
@@ -179,9 +178,6 @@
             if (queuedRequests.contains(this)) {
                 queuedRequests.remove(this);
             } else {
-                if (receiveTask != null) {
-                    receiveTask.cancel();
-                }
                 if (transmitTask != null) {
                     transmitTask.cancel();
                 }
@@ -278,10 +274,9 @@
                 rh.transmitTask = null;
 
                 logger.debug("recv in notifyTransmitReady cb");
-                rh.receiveTask = client.receiveOne(deadline.getRemaining(), 
new MessageReceiver() {
+                client.receiveOne(deadline.getRemaining(), new 
MessageReceiver() {
                     @Override
                     public void process(GnunetMessage.Body msg) {
-                        rh.receiveTask = null;
                         ResolverResponse gmsg = (ResolverResponse) msg;
                         if (gmsg.responseBody != null) {
                             try {
@@ -294,7 +289,7 @@
                                 }
 
                                 rh.cb.onAddress(in_addr);
-                                rh.receiveTask = 
client.receiveOne(deadline.getRemaining(), this);
+                                client.receiveOne(deadline.getRemaining(), 
this);
                             } catch (UnknownHostException e) {
                                 throw new 
ProtocolViolationException("malformed address");
                             }

Modified: gnunet-java/src/main/java/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Scheduler.java    2013-10-08 23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Scheduler.java    2013-10-09 00:39:02 UTC (rev 30019)
@@ -328,6 +328,9 @@
         }
 
         public void addSelectEvent(SelectableChannel channel, int event) {
+            if (channel == null) {
+                throw new AssertionError("channel may not be null");
+            }
             if (subscriptions == null)
                 subscriptions = new Subscriptions();
             subscriptions.add(channel, event);

Modified: gnunet-java/src/main/java/org/gnunet/util/Server.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Server.java       2013-10-08 23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Server.java       2013-10-09 00:39:02 UTC (rev 30019)
@@ -131,11 +131,6 @@
         private int referenceCount = 0;
 
         /**
-         * Handle for canceling the receive process of this client, null if no 
receive is currently going on.
-         */
-        private Cancelable currentReceive;
-
-        /**
          * Set to true if the connection to this client should not prevent the 
server from shutting down.
          */
         private boolean isMonitor;
@@ -209,14 +204,10 @@
          * @param stayConnected false if connection to the client should be 
closed
          */
         public void receiveDone(boolean stayConnected) {
-            if (currentReceive != null) {
-                throw new AssertionError("receiveDone() called, but still 
waiting for message");
-            }
             if (stayConnected) {
-                currentReceive = connection.receive(RelativeTime.FOREVER, new 
MessageReceiver() {
+                connection.receive(RelativeTime.FOREVER, new MessageReceiver() 
{
                     @Override
                     public void process(GnunetMessage.Body msg) {
-                        currentReceive = null;
                         if ((msg instanceof UnknownMessageBody) || 
!expectedMessages.contains(msg.getClass())) {
                             if (requireFound) {
                                 logger.info("disconnecting client sending 
unknown message");

Modified: gnunet-java/src/test/java/org/gnunet/util/ResolverTest.java
===================================================================
--- gnunet-java/src/test/java/org/gnunet/util/ResolverTest.java 2013-10-08 23:15:28 UTC (rev 30018)
+++ gnunet-java/src/test/java/org/gnunet/util/ResolverTest.java 2013-10-09 00:39:02 UTC (rev 30019)
@@ -40,6 +40,7 @@
             .getLogger(ResolverTest.class);
     @Test
     public void test_resolver() {
+        Program.configureLogging("DEBUG");
         final Wrapper<Boolean> finished1 = new Wrapper<Boolean>(true);
         final Wrapper<Boolean> finished2 = new Wrapper<Boolean>(true);
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]