gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r21245 - in gnunet-java/src/org/gnunet: construct construct/parsers core dht statistics util


From: gnunet
Subject: [GNUnet-SVN] r21245 - in gnunet-java/src/org/gnunet: construct construct/parsers core dht statistics util
Date: Thu, 3 May 2012 13:54:32 +0200

Author: dold
Date: 2012-05-03 13:54:32 +0200 (Thu, 03 May 2012)
New Revision: 21245

Added:
   gnunet-java/src/org/gnunet/dht/BlockType.java
   gnunet-java/src/org/gnunet/dht/ClientGetMessage.java
   gnunet-java/src/org/gnunet/dht/ClientGetStopMessage.java
   gnunet-java/src/org/gnunet/dht/ClientPutMessage.java
   gnunet-java/src/org/gnunet/dht/ClientResultMessage.java
   gnunet-java/src/org/gnunet/dht/MonitorGetMessage.java
   gnunet-java/src/org/gnunet/dht/MonitorGetRespMessage.java
   gnunet-java/src/org/gnunet/dht/MonitorPutMessage.java
   gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java
   gnunet-java/src/org/gnunet/dht/RouteOption.java
   gnunet-java/src/org/gnunet/statistics/Request.java
   gnunet-java/src/org/gnunet/statistics/RequestMessage.java
   gnunet-java/src/org/gnunet/statistics/RequestQueue.java
   gnunet-java/src/org/gnunet/statistics/ResponseEndMessage.java
   gnunet-java/src/org/gnunet/statistics/ResponseValueMessage.java
   gnunet-java/src/org/gnunet/statistics/SetMessage.java
Modified:
   gnunet-java/src/org/gnunet/construct/Construct.java
   gnunet-java/src/org/gnunet/construct/MessageLoader.java
   gnunet-java/src/org/gnunet/construct/MsgMap.txt
   gnunet-java/src/org/gnunet/construct/parsers/FillParser.java
   gnunet-java/src/org/gnunet/construct/parsers/FixedSizeArrayParser.java
   gnunet-java/src/org/gnunet/construct/parsers/UnionParser.java
   gnunet-java/src/org/gnunet/construct/parsers/VariableSizeArrayParser.java
   gnunet-java/src/org/gnunet/core/Core.java
   gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
   gnunet-java/src/org/gnunet/statistics/Statistics.java
   gnunet-java/src/org/gnunet/util/Client.java
   gnunet-java/src/org/gnunet/util/Connection.java
   gnunet-java/src/org/gnunet/util/Resolver.java
   gnunet-java/src/org/gnunet/util/Scheduler.java
   gnunet-java/src/org/gnunet/util/Server.java
Log:
refactored Scheduler, Statistics, fixes

Modified: gnunet-java/src/org/gnunet/construct/Construct.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/Construct.java 2012-05-03 11:36:52 UTC 
(rev 21244)
+++ gnunet-java/src/org/gnunet/construct/Construct.java 2012-05-03 11:54:32 UTC 
(rev 21245)
@@ -36,6 +36,7 @@
  * @author Christian Grothoff
  * @author Florian Dold
  */
+@SuppressWarnings("unchecked")
 public class Construct {
     private static final Logger logger = LoggerFactory
             .getLogger(Construct.class);
@@ -48,8 +49,9 @@
      * Information the root of the parser, if the target is nested in another 
message.
      */
     private static class ParserContext {
+        List<Field> parserPath = new ArrayList<Field>();
+        // fully determined by parserPath
         ArrayList<Field> frameSizePath = new ArrayList<Field>();
-        List<Field> parserPath = new ArrayList<Field>();
 
         @Override
         public boolean equals(Object other) {

Modified: gnunet-java/src/org/gnunet/construct/MessageLoader.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/MessageLoader.java     2012-05-03 
11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/MessageLoader.java     2012-05-03 
11:54:32 UTC (rev 21245)
@@ -129,6 +129,7 @@
     }
 
 
+    @SuppressWarnings("unchecked")
     private static Class<? extends MessageUnion> loadClass(String className) {
         ClassLoader cl = ClassLoader.getSystemClassLoader();
         Class<MessageUnion> msgClass;

Modified: gnunet-java/src/org/gnunet/construct/MsgMap.txt
===================================================================
--- gnunet-java/src/org/gnunet/construct/MsgMap.txt     2012-05-03 11:36:52 UTC 
(rev 21244)
+++ gnunet-java/src/org/gnunet/construct/MsgMap.txt     2012-05-03 11:54:32 UTC 
(rev 21245)
@@ -3,25 +3,29 @@
 org.gnunet.util.Resolver$Address|0=org.gnunet.util.Resolver$TextualAddress
 org.gnunet.util.Resolver$Address|1=org.gnunet.util.Resolver$NumericAddress
 
org.gnunet.util.GnunetMessage$Body|68=org.gnunet.core.Core$DisconnectNotifyMessage
-org.gnunet.util.GnunetMessage$Body|171=org.gnunet.statistics.Statistics$ResponseEndMessage
-org.gnunet.util.GnunetMessage$Body|170=org.gnunet.statistics.Statistics$ResponseValueMessage
 
org.gnunet.util.GnunetMessage$Body|70=org.gnunet.core.Core$NotifyInboundTrafficMessage
-org.gnunet.util.GnunetMessage$Body|169=org.gnunet.statistics.Statistics$RequestMessage
 
org.gnunet.util.GnunetMessage$Body|71=org.gnunet.core.Core$NotifyOutboundTrafficMessage
-org.gnunet.util.GnunetMessage$Body|168=org.gnunet.statistics.Statistics$SetMessage
 org.gnunet.util.GnunetMessage$Body|4=org.gnunet.util.Resolver$GetMessage
 org.gnunet.util.GnunetMessage$Body|64=org.gnunet.core.Core$InitMessage
+org.gnunet.util.GnunetMessage$Body|5=org.gnunet.util.Resolver$ResolverResponse
 org.gnunet.util.GnunetMessage$Body|65=org.gnunet.core.Core$InitReplyMessage
-org.gnunet.util.GnunetMessage$Body|5=org.gnunet.util.Resolver$ResolverResponse
-org.gnunet.util.GnunetMessage$Body|143=org.gnunet.dht.DistributedHashTable$DHTClientGetMessage
+org.gnunet.util.GnunetMessage$Body|143=org.gnunet.dht.ClientGetMessage
+org.gnunet.util.GnunetMessage$Body|142=org.gnunet.dht.ClientPutMessage
 org.gnunet.util.GnunetMessage$Body|67=org.gnunet.core.Core$ConnectNotifyMessage
-org.gnunet.util.GnunetMessage$Body|142=org.gnunet.dht.DistributedHashTable$DHTClientPutMessage
 org.gnunet.util.GnunetMessage$Body|76=org.gnunet.core.Core$SendMessage
 org.gnunet.util.GnunetMessage$Body|74=org.gnunet.core.Core$SendMessageRequest
 org.gnunet.util.GnunetMessage$Body|75=org.gnunet.core.Core$SendMessageReady
+org.gnunet.util.GnunetMessage$Body|153=org.gnunet.dht.MonitorStartMessage
 org.gnunet.util.GnunetMessage$Body|42001=org.gnunet.core.Core$MyMessage
 
org.gnunet.util.GnunetMessage$Body|323=org.gnunet.nse.NetworkSizeEstimation$UpdateMessage
 
org.gnunet.util.GnunetMessage$Body|321=org.gnunet.nse.NetworkSizeEstimation$StartMessage
-org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.DistributedHashTable$DHTClientGetStopMessage
-org.gnunet.util.GnunetMessage$Body|145=org.gnunet.dht.DistributedHashTable$DHTClientResultMessage
-# generated 2012/04/19 13:45:01
+org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.ClientGetStopMessage
+org.gnunet.util.GnunetMessage$Body|145=org.gnunet.dht.ClientResultMessage
+org.gnunet.util.GnunetMessage$Body|149=org.gnunet.dht.MonitorGetMessage
+org.gnunet.util.GnunetMessage$Body|150=org.gnunet.dht.MonitorGetRespMessage
+org.gnunet.util.GnunetMessage$Body|151=org.gnunet.dht.MonitorPutMessage
+org.gnunet.util.GnunetMessage$Body|171=org.gnunet.statistics.ResponseEndMessage
+org.gnunet.util.GnunetMessage$Body|170=org.gnunet.statistics.ResponseValueMessage
+org.gnunet.util.GnunetMessage$Body|169=org.gnunet.statistics.RequestMessage
+org.gnunet.util.GnunetMessage$Body|168=org.gnunet.statistics.SetMessage
+# generated 2012/05/02 20:26:30

Modified: gnunet-java/src/org/gnunet/construct/parsers/FillParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/FillParser.java        
2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/parsers/FillParser.java        
2012-05-03 11:54:32 UTC (rev 21245)
@@ -54,6 +54,7 @@
         ArrayList<Message> list = new ArrayList<Message>(10);
 
         while (remaining > 0) {
+            @SuppressWarnings("unchecked")
             Message next = ReflectUtil.justInstantiate((Class<Message>) 
targetField.getType().getComponentType());
             int s = elemParser.parse(srcBuf, frameOffset, frameObj, next);
             size += s;
@@ -63,7 +64,7 @@
         try {
             targetField.set(dstObj, list.toArray());
         } catch (IllegalAccessException e) {
-            throw new AssertionError("cannot acces field");
+            throw new AssertionError("cannot access field");
         }
 
         return size;

Modified: gnunet-java/src/org/gnunet/construct/parsers/FixedSizeArrayParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/FixedSizeArrayParser.java      
2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/parsers/FixedSizeArrayParser.java      
2012-05-03 11:54:32 UTC (rev 21245)
@@ -46,6 +46,7 @@
         ReflectUtil.justSet(dstObj, targetField, arr);
 
         for (int i = 0; i < elemNumber; ++i) {
+            @SuppressWarnings("unchecked")
             Message elemObj = 
ReflectUtil.justInstantiate((Class<Message>)targetField.getType().getComponentType());
             Array.set(arr, i, elemObj);
 

Modified: gnunet-java/src/org/gnunet/construct/parsers/UnionParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/UnionParser.java       
2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/parsers/UnionParser.java       
2012-05-03 11:54:32 UTC (rev 21245)
@@ -8,6 +8,9 @@
 import java.util.List;
 
 
+
+// unchecked casts are necessary
+@SuppressWarnings("unchecked")
 public class UnionParser implements Parser {
     private final List<Field> frameSizePath;
 
@@ -81,7 +84,8 @@
 
         return p.write(dstBuf, (Message) ReflectUtil.justGet(src, 
targetField));
     }
-    
+
+    @SuppressWarnings("unchecked")
     public int getTag(Message m) {
         return MessageLoader.getUnionTag(unionType, (Class<MessageUnion>) 
ReflectUtil.justGet(m, targetField).getClass());
     }

Modified: 
gnunet-java/src/org/gnunet/construct/parsers/VariableSizeArrayParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/VariableSizeArrayParser.java   
2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/parsers/VariableSizeArrayParser.java   
2012-05-03 11:54:32 UTC (rev 21245)
@@ -37,7 +37,8 @@
     @Override
     public int parse(final ByteBuffer srcBuf, int frameOffset, Message 
frameObj, final Message dstObj) {
         final int elemNumber = (int) sizeField.get(dstObj);
-        final Class arrayElementType = 
targetField.getType().getComponentType();
+        @SuppressWarnings("unchecked")
+        final Class<Message> arrayElementType = (Class<Message>) 
targetField.getType().getComponentType();
 
         int size = 0;
 
@@ -47,7 +48,7 @@
         for (int i = 0; i < elemNumber; ++i) {
             Message elemObj;
 
-            elemObj = (Message) ReflectUtil.justInstantiate(arrayElementType);
+            elemObj = (Message) 
ReflectUtil.<Message>justInstantiate(arrayElementType);
 
 
             Array.set(arr, i, elemObj);

Modified: gnunet-java/src/org/gnunet/core/Core.java
===================================================================
--- gnunet-java/src/org/gnunet/core/Core.java   2012-05-03 11:36:52 UTC (rev 
21244)
+++ gnunet-java/src/org/gnunet/core/Core.java   2012-05-03 11:54:32 UTC (rev 
21245)
@@ -633,6 +633,7 @@
         return ret;
     }
 
+    @SuppressWarnings("unchecked")
     private static int[] getRunaboutMessageTypes(Runabout r) {
         ArrayList<Class> visitees = getRunaboutVisitees(r);
         int[] msgtypes = new int[visitees.size()];

Added: gnunet-java/src/org/gnunet/dht/BlockType.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/BlockType.java                               
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/BlockType.java       2012-05-03 11:54:32 UTC 
(rev 21245)
@@ -0,0 +1,61 @@
+package org.gnunet.dht;
+
+/**
+ * Information on how to interpret a block of data.
+ */
+public enum BlockType {
+    /**
+     * Any type of block, used as a wildcard when searching.  Should
+     * never be attached to a specific block.
+     */
+    ANY(0),
+    /**
+     * Data block (leaf) in the CHK tree.
+     */
+    DBLOCK(1),
+    /**
+     * Inner block in the CHK tree.
+     */
+    IBLOCK(2),
+    /**
+     * Type of a block representing a keyword search result.  Note that
+     * the values for KBLOCK, SBLOCK and NBLOCK must be consecutive.
+     */
+    KBLOCK(3),
+    /**
+     * Type of a block that is used to advertise content in a namespace.
+     */
+    SBLOCK(4),
+    /**
+     * Type of a block that is used to advertise a namespace.
+     */
+    NBLOCK(5),
+    /**
+     * Type of a block representing a block to be encoded on demand from disk.
+     * Should never appear on the network directly.
+     */
+    FS_ONDEMAND(6),
+    /**
+     * Type of a block that contains a HELLO for a peer (for
+     * DHT find-peer operations).
+     */
+    DHT_HELLO(7),
+    /**
+     * Block for testing.
+     */
+    TEST(8),
+    /**
+     * Block for storing .gnunet-domains
+     */
+    DNS(10),
+    /**
+     * Block for storing record data
+     */
+    NAMERECORD(11);
+
+    public final int val;
+
+    BlockType(int val) {
+        this.val = val;
+    }
+}

Added: gnunet-java/src/org/gnunet/dht/ClientGetMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientGetMessage.java                        
        (rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientGetMessage.java        2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,31 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.*;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:05 PM
+* To change this template use File | Settings | File Templates.
+*/
+@UnionCase(143)
+public class ClientGetMessage implements GnunetMessage.Body {
+    /**
+     * Combination of RouteOption.*
+     */
+    @UInt32
+    public int options;
+    @UInt32
+    public int desiredReplicationLevel;
+    @UInt32
+    public int type;
+    @NestedMessage
+    public HashCode key;
+    @UInt64
+    public long uniqueId;
+    @ByteFill
+    public byte[] xquery;
+}

Added: gnunet-java/src/org/gnunet/dht/ClientGetStopMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientGetStopMessage.java                    
        (rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientGetStopMessage.java    2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,25 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UInt64;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:05 PM
+* To change this template use File | Settings | File Templates.
+*/
+@UnionCase(144)
+public class ClientGetStopMessage implements GnunetMessage.Body {
+    @UInt32
+    public int reserved = 0;
+    @UInt64
+    public long unique_id;
+    @NestedMessage
+    public HashCode key;
+}

Added: gnunet-java/src/org/gnunet/dht/ClientPutMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientPutMessage.java                        
        (rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientPutMessage.java        2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,38 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.ByteFill;
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.AbsoluteTimeMessage;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:05 PM
+* To change this template use File | Settings | File Templates.
+*/
+@UnionCase(142)
+public class ClientPutMessage implements GnunetMessage.Body {
+    /**
+     * Type of data to insert, one of BlockType.*
+     */
+    @UInt32
+    public int type;
+    /**
+     * Combination of RouteOption.*
+     */
+    @UInt32
+    public int options;
+    @UInt32
+    public int desiredReplicationLevel;
+    @NestedMessage
+    public AbsoluteTimeMessage expiration;
+    @NestedMessage
+    public HashCode hash;
+    @ByteFill
+    public byte[] data;
+}

Added: gnunet-java/src/org/gnunet/dht/ClientResultMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientResultMessage.java                     
        (rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientResultMessage.java     2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,36 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.*;
+import org.gnunet.util.AbsoluteTimeMessage;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:06 PM
+* To change this template use File | Settings | File Templates.
+*/
+@UnionCase(145)
+public class ClientResultMessage implements GnunetMessage.Body {
+    @UInt32
+    public int type;
+    @UInt32
+    public int putPathLength;
+    @UInt32
+    public int getPathLength;
+    @UInt64
+    public long uid;
+    @NestedMessage
+    public AbsoluteTimeMessage expiration;
+    @NestedMessage
+    public HashCode key;
+    @VariableSizeArray(lengthField = "putPathLength")
+    public PeerIdentity[] putPath;
+    @VariableSizeArray(lengthField = "getPathLength")
+    public PeerIdentity[] getPath;
+    @ByteFill
+    public byte[] data;
+}

Modified: gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/DistributedHashTable.java    2012-05-03 
11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/dht/DistributedHashTable.java    2012-05-03 
11:54:32 UTC (rev 21245)
@@ -1,6 +1,5 @@
 package org.gnunet.dht;
 
-import org.gnunet.construct.*;
 import org.gnunet.util.*;
 import org.gnunet.util.getopt.Option;
 import org.gnunet.util.getopt.OptionAction;
@@ -23,366 +22,9 @@
     private static final Logger logger = LoggerFactory
             .getLogger(DistributedHashTable.class);
 
+    private Configuration cfg;
 
     /**
-     * Information on how to interpret a block of data.
-     */
-    public enum BlockType {
-        /**
-         * Any type of block, used as a wildcard when searching.  Should
-         * never be attached to a specific block.
-         */
-        ANY(0),
-        /**
-         * Data block (leaf) in the CHK tree.
-         */
-        DBLOCK(1),
-        /**
-         * Inner block in the CHK tree.
-         */
-        IBLOCK(2),
-        /**
-         * Type of a block representing a keyword search result.  Note that
-         * the values for KBLOCK, SBLOCK and NBLOCK must be consecutive.
-         */
-        KBLOCK(3),
-        /**
-         * Type of a block that is used to advertise content in a namespace.
-         */
-        SBLOCK(4),
-        /**
-         * Type of a block that is used to advertise a namespace.
-         */
-        NBLOCK(5),
-        /**
-         * Type of a block representing a block to be encoded on demand from 
disk.
-         * Should never appear on the network directly.
-         */
-        FS_ONDEMAND(6),
-        /**
-         * Type of a block that contains a HELLO for a peer (for
-         * DHT find-peer operations).
-         */
-        DHT_HELLO(7),
-        /**
-         * Block for testing.
-         */
-        TEST(8),
-        /**
-         * Block for storing .gnunet-domains
-         */
-        DNS(10),
-        /**
-         * Block for storing record data
-         */
-        NAMERECORD(11);
-
-        private int val;
-
-        BlockType(int val) {
-            this.val = val;
-        }
-    }
-
-
-    /**
-     * Options passed to the dht service for routing requests.
-     */
-    enum RouteOption {
-        /**
-         * Default.  Do nothing special.
-         */
-        NONE(0),
-        /**
-         * Each peer along the way should look at 'enc' (otherwise
-         * only the k-peers closest to the key should look at it).
-         */
-        DEMULTIPLEX_EVERYWHERE(1),
-        /**
-         * We should keep track of the route that the message
-         * took in the P2P network.
-         */
-        RECORD_ROUTE(2),
-        /**
-         * This is a 'FIND-PEER' request, so approximate results are fine.
-         */
-        FIND_PEER(4),
-        /**
-         * Possible message option for query key randomization.
-         */
-        BART(8);
-
-        private int val;
-
-        RouteOption(int val) {
-            this.val = val;
-        }
-    }
-
-    @UnionCase(143)
-    public static class DHTClientGetMessage implements GnunetMessage.Body {
-        /**
-         * Combination of RouteOption.*
-         */
-        @UInt32
-        public int options;
-        @UInt32
-        public int desiredReplicationLevel;
-        @UInt32
-        public int type;
-        @NestedMessage
-        public HashCode key;
-        @UInt64
-        public long uniqueId;
-        @ByteFill
-        public byte[] xquery;
-    }
-
-    @UnionCase(142)
-    public static class DHTClientPutMessage implements GnunetMessage.Body {
-        /**
-         * Type of data to insert, one of BlockType.*
-         */
-        @UInt32
-        public int type;
-        /**
-         * Combination of RouteOption.*
-         */
-        @UInt32
-        public int options;
-        @UInt32
-        public int desiredReplicationLevel;
-        @NestedMessage
-        public AbsoluteTimeMessage expiration;
-        @NestedMessage
-        public HashCode hash;
-        @ByteFill
-        public byte[] data;
-    }
-
-    @UnionCase(144)
-    public static class DHTClientGetStopMessage implements GnunetMessage.Body {
-        @UInt32
-        public int reserved = 0;
-        @UInt64
-        public long unique_id;
-        @NestedMessage
-        public HashCode key;
-    }
-
-    @UnionCase(145)
-    public static class DHTClientResultMessage implements GnunetMessage.Body {
-        @UInt32
-        public int type;
-        @UInt32
-        public int putPathLength;
-        @UInt32
-        public int getPathLength;
-        @UInt64
-        public long uid;
-        @NestedMessage
-        public AbsoluteTimeMessage expiration;
-        @NestedMessage
-        public HashCode key;
-        @VariableSizeArray(lengthField = "putPathLength")
-        public PeerIdentity[] putPath;
-        @VariableSizeArray(lengthField = "getPathLength")
-        public PeerIdentity[] getPath;
-        @ByteFill
-        public byte[] data;
-    }
-
-
-    /**
-     * Message to request monitoring messages, clients --> DHT service.
-     */
-    @UnionCase(153)
-    public static class DHTMonitorStartMessage implements GnunetMessage.Body {
-        /**
-         * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
-         */
-        @UInt32
-        public int type;
-
-        /**
-         * Flag whether to notify about GET messages.
-         */
-        @UInt16
-        public int get;
-
-        /**
-         * Flag whether to notify about GET_REPONSE messages.
-         */
-        @UInt16
-        public int getResp;
-
-        /**
-         * Flag whether to notify about PUT messages.
-         */
-        @UInt16
-        public int put;
-
-        /**
-         * Flag whether to use the provided key to filter messages.
-         */
-        @UInt16
-        public int filter_key;
-
-        /*
-        The key to filter messages by.
-         */
-        @NestedMessage
-        public HashCode key;
-    }
-
-    /**
-     * Message to monitor put requests going through peer, DHT service --> 
clients.
-     */
-    @UnionCase(151)
-    public static class DHTMonitorPutMessage implements GnunetMessage.Body {
-        /**
-         * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
-         */
-        @UInt32
-        public int options;
-
-        /**
-         * The type of data in the request.
-         */
-        @UInt32
-        public int type;
-
-        /**
-         * Hop count so far.
-         */
-        @UInt32
-        public int hop_count;
-
-        /**
-         * Replication level for this message
-         */
-        @UInt32
-        public int desired_replication_level;
-
-        /**
-         * Number of peers recorded in the outgoing path from source to the
-         * storage location of this message.
-         */
-        @UInt32
-        public int put_path_length;
-
-        /**
-         * How long should this data persist?
-         */
-        @NestedMessage
-        public AbsoluteTimeMessage expirationTime;
-
-        /**
-         * The key to store the value under.
-         */
-        @NestedMessage
-        public HashCode key;
-
-        @VariableSizeArray(lengthField = "put_path_length")
-        public PeerIdentity[] putPath;
-
-        @ByteFill
-        public byte[] data;
-    }
-
-
-    /**
-     * Message to monitor get requests going through peer, DHT service -> 
clients.
-     */
-    @UnionCase(149)
-    public class DHTMonitorGetMessage implements GnunetMessage.Body {
-        /**
-         * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
-         */
-        @UInt32
-        public int options;
-
-        /**
-         * The type of data in the request.
-         */
-        @UInt32
-        public int type;
-
-        /**
-         * Hop count
-         */
-        @UInt32
-        public int hop_count;
-
-        /**
-         * Replication level for this message
-         */
-        @UInt32
-        public int desired_replication_level;
-
-        /**
-         * Number of peers recorded in the outgoing path from source to the
-         * storage location of this message.
-         */
-        @UInt32
-        public int get_path_length;
-
-        /**
-         * The key to store the value under.
-         */
-        @NestedMessage
-        public HashCode key;
-
-        @VariableSizeArray(lengthField = "get_path_length")
-        public PeerIdentity[] getPath;
-    }
-
-    /**
-     * Message to monitor get results going through peer, DHT service --> 
clients.
-     */
-    @UnionCase(150)
-    public class MonitorGetRespMessage implements GnunetMessage.Body {
-        /**
-         * Content type.
-         */
-        @UInt32
-        int type;
-
-        /**
-         * Length of the PUT path that follows (if tracked).
-         */
-        @UInt32
-        int put_path_length;
-
-        /**
-         * Length of the GET path that follows (if tracked).
-         */
-        @UInt32
-        int get_path_length;
-
-        /**
-         * When does the content expire?
-         */
-        @NestedMessage
-        public AbsoluteTimeMessage expiration;
-
-        /**
-         * The key of the corresponding GET request.
-         */
-        @NestedMessage
-        public HashCode key;
-
-        @VariableSizeArray(lengthField = "put_path_length")
-        public PeerIdentity[] putPath;
-
-        @VariableSizeArray(lengthField = "get_path_length")
-        public PeerIdentity[] getPath;
-
-        @ByteFill
-        public byte[] data;
-    }
-
-    /**
      * Callback object for requests to the dht
      */
     public interface ResultCallback {
@@ -417,7 +59,7 @@
      * @param cfg the configuration to use
      */
     public DistributedHashTable(Configuration cfg) {
-        Configuration cfg1 = cfg;
+        this.cfg = cfg;
         client = new Client("dht", cfg);
     }
 
@@ -437,7 +79,7 @@
     public void put(HashCode key, byte[] data, int replicationLevel, 
Set<RouteOption> routeOptions,
                     BlockType type, AbsoluteTime expiration,
                     RelativeTime timeout, final Continuation cont) {
-        final DHTClientPutMessage cpm = new DHTClientPutMessage();
+        final ClientPutMessage cpm = new ClientPutMessage();
         cpm.data = data;
         cpm.hash = key;
         cpm.desiredReplicationLevel = replicationLevel;
@@ -488,7 +130,7 @@
             client.notifyTransmitReady(RelativeTime.FOREVER, false, 0, new 
MessageTransmitter() {
                 @Override
                 public void transmit(Connection.MessageSink sink) {
-                    final DHTClientGetStopMessage sm = new 
DHTClientGetStopMessage();
+                    final ClientGetStopMessage sm = new ClientGetStopMessage();
                     sm.key = key;
                     sm.unique_id = uid;
                     sink.send(sm);
@@ -511,7 +153,7 @@
         public void process(GnunetMessage.Body msg) {
             receiveHandle = null;
 
-            DHTClientResultMessage rm = (DHTClientResultMessage) msg;
+            ClientResultMessage rm = (ClientResultMessage) msg;
             GetRequest request = null;
 
             for (GetRequest r : activeRequests) {
@@ -560,7 +202,7 @@
         request.cb = cb;
         request.deadline = timeout.toAbsolute();
 
-        final DHTClientGetMessage getMessage = new DHTClientGetMessage();
+        final ClientGetMessage getMessage = new ClientGetMessage();
         getMessage.desiredReplicationLevel = replication;
         getMessage.key = key;
         getMessage.options = 0;

Added: gnunet-java/src/org/gnunet/dht/MonitorGetMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorGetMessage.java                       
        (rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorGetMessage.java       2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,55 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.VariableSizeArray;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+/**
+ * Message to monitor get requests going through peer, DHT service -> clients.
+ */
+@UnionCase(149)
+public class MonitorGetMessage implements GnunetMessage.Body {
+    /**
+     * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
+     */
+    @UInt32
+    public int options;
+
+    /**
+     * The type of data in the request.
+     */
+    @UInt32
+    public int type;
+
+    /**
+     * Hop count
+     */
+    @UInt32
+    public int hop_count;
+
+    /**
+     * Replication level for this message
+     */
+    @UInt32
+    public int desired_replication_level;
+
+    /**
+     * Number of peers recorded in the outgoing path from source to the
+     * storage location of this message.
+     */
+    @UInt32
+    public int get_path_length;
+
+    /**
+     * The key to store the value under.
+     */
+    @NestedMessage
+    public HashCode key;
+
+    @VariableSizeArray(lengthField = "get_path_length")
+    public PeerIdentity[] getPath;
+}

Added: gnunet-java/src/org/gnunet/dht/MonitorGetRespMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorGetRespMessage.java                   
        (rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorGetRespMessage.java   2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,52 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.*;
+import org.gnunet.util.AbsoluteTimeMessage;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+/**
+ * Message to monitor get results going through peer, DHT service --> clients.
+ */
+@UnionCase(150)
+public class MonitorGetRespMessage implements GnunetMessage.Body {
+    /**
+     * Content type.
+     */
+    @UInt32
+    int type;
+
+    /**
+     * Length of the PUT path that follows (if tracked).
+     */
+    @UInt32
+    int put_path_length;
+
+    /**
+     * Length of the GET path that follows (if tracked).
+     */
+    @UInt32
+    int get_path_length;
+
+    /**
+     * When does the content expire?
+     */
+    @NestedMessage
+    public AbsoluteTimeMessage expiration;
+
+    /**
+     * The key of the corresponding GET request.
+     */
+    @NestedMessage
+    public HashCode key;
+
+    @VariableSizeArray(lengthField = "put_path_length")
+    public PeerIdentity[] putPath;
+
+    @VariableSizeArray(lengthField = "get_path_length")
+    public PeerIdentity[] getPath;
+
+    @ByteFill
+    public byte[] data;
+}

Added: gnunet-java/src/org/gnunet/dht/MonitorPutMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorPutMessage.java                       
        (rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorPutMessage.java       2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,62 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.*;
+import org.gnunet.util.AbsoluteTimeMessage;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+/**
+ * Message to monitor put requests going through peer, DHT service --> clients.
+ */
+@UnionCase(151)
+public class MonitorPutMessage implements GnunetMessage.Body {
+    /**
+     * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
+     */
+    @UInt32
+    public int options;
+
+    /**
+     * The type of data in the request.
+     */
+    @UInt32
+    public int type;
+
+    /**
+     * Hop count so far.
+     */
+    @UInt32
+    public int hop_count;
+
+    /**
+     * Replication level for this message
+     */
+    @UInt32
+    public int desired_replication_level;
+
+    /**
+     * Number of peers recorded in the outgoing path from source to the
+     * storage location of this message.
+     */
+    @UInt32
+    public int put_path_length;
+
+    /**
+     * How long should this data persist?
+     */
+    @NestedMessage
+    public AbsoluteTimeMessage expirationTime;
+
+    /**
+     * The key to store the value under.
+     */
+    @NestedMessage
+    public HashCode key;
+
+    @VariableSizeArray(lengthField = "put_path_length")
+    public PeerIdentity[] putPath;
+
+    @ByteFill
+    public byte[] data;
+}

Added: gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java                     
        (rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java     2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,50 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt16;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+ * Message to request monitoring messages, clients --> DHT service.
+ */
+@UnionCase(153)
+public class MonitorStartMessage implements GnunetMessage.Body {
+    /**
+     * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
+     */
+    @UInt32
+    public int type;
+
+    /**
+     * Flag whether to notify about GET messages.
+     */
+    @UInt16
+    public int get;
+
+    /**
+     * Flag whether to notify about GET_REPONSE messages.
+     */
+    @UInt16
+    public int getResp;
+
+    /**
+     * Flag whether to notify about PUT messages.
+     */
+    @UInt16
+    public int put;
+
+    /**
+     * Flag whether to use the provided key to filter messages.
+     */
+    @UInt16
+    public int filter_key;
+
+    /*
+    The key to filter messages by.
+     */
+    @NestedMessage
+    public HashCode key;
+}

Added: gnunet-java/src/org/gnunet/dht/RouteOption.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/RouteOption.java                             
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/RouteOption.java     2012-05-03 11:54:32 UTC 
(rev 21245)
@@ -0,0 +1,35 @@
+package org.gnunet.dht;
+
+/**
+ * Options passed to the dht service for routing requests.
+ */
+enum RouteOption {
+    /**
+     * Default.  Do nothing special.
+     */
+    NONE(0),
+    /**
+     * Each peer along the way should look at 'enc' (otherwise
+     * only the k-peers closest to the key should look at it).
+     */
+    DEMULTIPLEX_EVERYWHERE(1),
+    /**
+     * We should keep track of the route that the message
+     * took in the P2P network.
+     */
+    RECORD_ROUTE(2),
+    /**
+     * This is a 'FIND-PEER' request, so approximate results are fine.
+     */
+    FIND_PEER(4),
+    /**
+     * Possible message option for query key randomization.
+     */
+    BART(8);
+
+    private int val;
+
+    RouteOption(int val) {
+        this.val = val;
+    }
+}

Added: gnunet-java/src/org/gnunet/statistics/Request.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/Request.java                          
(rev 0)
+++ gnunet-java/src/org/gnunet/statistics/Request.java  2012-05-03 11:54:32 UTC 
(rev 21245)
@@ -0,0 +1,29 @@
+package org.gnunet.statistics;
+
+import org.gnunet.util.AbsoluteTime;
+import org.gnunet.util.MessageTransmitter;
+
+public interface Request extends MessageTransmitter {
+    /**
+     * cancel action specific to a certain type of request
+     */
+    void cancel();
+
+    /**
+     * Deadline for transmitting the request.
+     *
+     */
+    AbsoluteTime getDeadline();
+
+    /**
+     *
+     * @return true if the request should be kept after the destroy request
+     */
+    boolean onDestroy();
+
+    /**
+     *
+     * @return true if the request should be kept after the reconnect
+     */
+    boolean onReconnect();
+}

Added: gnunet-java/src/org/gnunet/statistics/RequestMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/RequestMessage.java                   
        (rev 0)
+++ gnunet-java/src/org/gnunet/statistics/RequestMessage.java   2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,13 @@
+package org.gnunet.statistics;
+
+import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.ZeroTerminatedString;
+import org.gnunet.util.GnunetMessage;
+
+@UnionCase(169)
+public class RequestMessage implements GnunetMessage.Body {
+    @ZeroTerminatedString
+    public String subsystemName;
+    @ZeroTerminatedString
+    public String statisticsName;
+}

Added: gnunet-java/src/org/gnunet/statistics/RequestQueue.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/RequestQueue.java                     
        (rev 0)
+++ gnunet-java/src/org/gnunet/statistics/RequestQueue.java     2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,129 @@
+package org.gnunet.statistics;
+
+import org.gnunet.util.*;
+
+import java.util.LinkedList;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 8:03 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class RequestQueue {
+
+    /**
+     * Requests to be transmitted to the service.
+     */
+    private final LinkedList<Request> requests = new LinkedList<Request>();
+    /**
+     * The designated receiver for all messages.
+     */
+    private MessageReceiver receiver;
+
+    /**
+     * The active transmit request handle, if any.
+     */
+    private Cancelable currentTransmit;
+
+    /**
+     * Current receive handler.
+     */
+    private Cancelable currentReceive;
+
+    private boolean destroyRequested = false;
+    private final Client client;
+
+    public RequestQueue(Client client, MessageReceiver receiver) {
+        this.client = client;
+        this.receiver = receiver;
+    }
+
+    /**
+     * Handle next request.
+     */
+    private void handleNextRequest() {
+        if (currentTransmit != null) {
+            return;
+        }
+
+        final Request request = requests.poll();
+        if (request == null) {
+            if (destroyRequested) {
+                client.disconnect();
+            }
+            return;
+        }
+
+        currentTransmit = 
client.notifyTransmitReady(request.getDeadline().getRemaining(), true, 0, new 
MessageTransmitter() {
+            @Override
+            public void transmit(Connection.MessageSink sink) {
+                currentTransmit = null;
+
+                request.transmit(sink);
+
+                handleReceive();
+                handleNextRequest();
+            }
+
+            @Override
+            public void handleError() {
+                request.handleError();
+            }
+        });
+    }
+
+    private void handleReceive() {
+        if (currentReceive != null || destroyRequested) {
+            return;
+        }
+        currentReceive = client.receive(RelativeTime.FOREVER, new 
MessageReceiver() {
+            @Override
+            public void process(GnunetMessage.Body msg) {
+                currentReceive = null;
+
+                receiver.process(msg);
+
+                handleNextRequest();
+                handleReceive();
+            }
+
+            @Override
+            public void handleError() {
+                receiver.handleError();
+            }
+        });
+    }
+
+    public Cancelable add(final Request request) {
+        requests.add(request);
+        handleNextRequest();
+
+        return new Cancelable() {
+            @Override
+            public void cancel() {
+                RequestQueue.this.requests.remove(request);
+                request.cancel();
+            }
+        };
+    }
+
+    public void destroy() {
+        destroyRequested = true;
+
+        final LinkedList<Request> remove = new LinkedList<Request>();
+
+        for (Request r : requests) {
+            boolean keep = r.onDestroy();
+            if (!keep) {
+                remove.add(r);
+            }
+        }
+        requests.removeAll(remove);
+
+        if (requests.isEmpty()) {
+            client.disconnect();
+        }
+    }
+}

Added: gnunet-java/src/org/gnunet/statistics/ResponseEndMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/ResponseEndMessage.java               
                (rev 0)
+++ gnunet-java/src/org/gnunet/statistics/ResponseEndMessage.java       
2012-05-03 11:54:32 UTC (rev 21245)
@@ -0,0 +1,16 @@
+package org.gnunet.statistics;
+
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:09 PM
+* To change this template use File | Settings | File Templates.
+*/
+@UnionCase(171)
+public class ResponseEndMessage implements GnunetMessage.Body {
+    // empty
+}

Added: gnunet-java/src/org/gnunet/statistics/ResponseValueMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/ResponseValueMessage.java             
                (rev 0)
+++ gnunet-java/src/org/gnunet/statistics/ResponseValueMessage.java     
2012-05-03 11:54:32 UTC (rev 21245)
@@ -0,0 +1,31 @@
+package org.gnunet.statistics;
+
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UInt64;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.ZeroTerminatedString;
+import org.gnunet.util.GnunetMessage;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:09 PM
+* To change this template use File | Settings | File Templates.
+*/
+@UnionCase(170)
+public class ResponseValueMessage implements GnunetMessage.Body {
+    /**
+     * Unique numerical identifier for the value (will
+     * not change during the same client-session).  Highest
+     * bit will be set for persistent values.
+     */
+    @UInt32
+    public long uid;
+    @UInt64
+    public long value;
+    @ZeroTerminatedString
+    public String subsystemName;
+    @ZeroTerminatedString
+    public String statisticName;
+}

Added: gnunet-java/src/org/gnunet/statistics/SetMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/SetMessage.java                       
        (rev 0)
+++ gnunet-java/src/org/gnunet/statistics/SetMessage.java       2012-05-03 
11:54:32 UTC (rev 21245)
@@ -0,0 +1,26 @@
+package org.gnunet.statistics;
+
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UInt64;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.ZeroTerminatedString;
+import org.gnunet.util.GnunetMessage;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:10 PM
+* To change this template use File | Settings | File Templates.
+*/
+@UnionCase(168)
+public class SetMessage implements GnunetMessage.Body {
+    @UInt32
+    public int flags;
+    @UInt64
+    public long value;
+    @ZeroTerminatedString
+    public String subsystemName;
+    @ZeroTerminatedString
+    public String statisticName;
+}

Modified: gnunet-java/src/org/gnunet/statistics/Statistics.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/Statistics.java       2012-05-03 
11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/statistics/Statistics.java       2012-05-03 
11:54:32 UTC (rev 21245)
@@ -6,152 +6,85 @@
 
 package org.gnunet.statistics;
 
-import org.gnunet.construct.UInt32;
-import org.gnunet.construct.UInt64;
-import org.gnunet.construct.UnionCase;
-import org.gnunet.construct.ZeroTerminatedString;
 import org.gnunet.util.*;
 import org.gnunet.util.getopt.Option;
 import org.gnunet.util.getopt.OptionAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.LinkedList;
-
 /**
  * API for the gnunet statistics service.
  * <p/>
- * Write and read statistics values, represented as unsigned 64bit integer.
+ * Set, get and monitor statistics values, represented as unsigned 64bit 
integer.
+ * Note that {@code long}, java's largest primitive type, can only 
store signed 64bit integers.
+ * With absolute operation, its negative values are interpreted as large 
numbers by the statistics api.
  */
 public class Statistics {
     private static final Logger logger = LoggerFactory
             .getLogger(Statistics.class);
 
     /**
-     * Time after we give up setting values in statistics
+     * Time after we give up on setting values in statistics
      */
     private static final RelativeTime SET_TIMEOUT = 
RelativeTime.SECOND.multiply(5);
 
+    private final static int SETFLAG_ABSOLUTE = 0;
+    private final static int SETFLAG_RELATIVE = 1;
+    private final static int SETFLAG_PERSIST = 2;
 
-    private interface Request {
-        boolean dropOnShutdown();
-        AbsoluteTime getDeadline();
-    }
-
-    /**
-     * a generic implementation of a request queue
-     */
-    private class RequestQueue<R extends Request> {
-        public Cancelable addRequest(R request) {
-            return null;
-        }
-
-    }
-
-
     private Client client;
-    private LinkedList<StatisticsRequest> requests = new 
LinkedList<StatisticsRequest>();
-    private Cancelable currentTransmit;
-    private boolean disconnectRequested = false;
 
-    @UnionCase(169)
-    public static class RequestMessage implements GnunetMessage.Body {
-        @ZeroTerminatedString
-        public String subsystemName;
-        @ZeroTerminatedString
-        public String statisticsName;
-    }
+    private RequestQueue requestQueue;
 
-    @UnionCase(170)
-    public static class ResponseValueMessage implements GnunetMessage.Body {
-        @UInt32
-        public long uid;
-        @UInt64
-        public long value;
-        @ZeroTerminatedString
-        public String subsystemName;
-        @ZeroTerminatedString
-        public String statisticName;
-    }
+    private StatisticsReceiver currentGetReceiver;
+    private Continuation currentGetContinuation;
 
-    @UnionCase(171)
-    public static class ResponseEndMessage implements GnunetMessage.Body {
-        // empty
-    }
-
-    private final static int SETFLAG_PERSIST = 2;
-
-    @UnionCase(168)
-    public static class SetMessage implements GnunetMessage.Body {
-        @UInt32
-        public int flags;
-        @UInt64
-        public long value;
-        @ZeroTerminatedString
-        public String subsystemName;
-        @ZeroTerminatedString
-        public String statisticName;
-    }
-
-
-    private abstract class StatisticsRequest implements Cancelable, 
MessageTransmitter {
+    /**
+     * A request to the statistics service.
+     */
+    private abstract class StatisticsRequest implements Request {
         public String name;
         public String subsystem;
         public AbsoluteTime deadline;
     }
 
-
-    private class StatisticsGetRequest extends StatisticsRequest implements 
Cancelable {
+    private class StatisticsGetRequest extends StatisticsRequest {
         public StatisticsReceiver receiver;
         public Cancelable receiveHandle;
 
-        public class GetResponseHandler extends RunaboutMessageReceiver {
-            public void visit(ResponseValueMessage m) {
-                receiver.onReceive(m.subsystemName, m.statisticName, m.value);
-                client.receive(deadline.getRemaining(), this);
-            }
-
-            public void visit(ResponseEndMessage m) {
-                receiver.onDone();
-            }
-
-            @Override
-            public void handleError() {
-                logger.error("unable to read from statistics service");
-            }
-        }
-
         @Override
         public void cancel() {
-            requests.remove(this);
-            if (receiveHandle != null) {
-                receiveHandle.cancel();
-            }
+            currentGetReceiver = null;
         }
 
         @Override
         public void transmit(Connection.MessageSink sink) {
-            currentTransmit = null;
-            if (sink == null) {
-                logger.error("unable to connect to statistics service");
-                return;
-            }
             RequestMessage rm = new RequestMessage();
             rm.statisticsName = name;
             rm.subsystemName = subsystem;
 
             sink.send(rm);
-
-            receiveHandle = client.receive(deadline.getRemaining(), new 
GetResponseHandler());
-
-            handleNextRequest();
         }
 
         @Override
         public void handleError() {
             throw new RuntimeException("unexpected transmit error");
         }
+
+        @Override
+        public AbsoluteTime getDeadline() {
+            return deadline;
+        }
+
+        @Override
+        public boolean onDestroy() {
+            return false;
+        }
+
+        @Override
+        public boolean onReconnect() {
+            return true;
+        }
     }
 
     private class StatisticsPutRequest extends StatisticsRequest implements 
Cancelable {
@@ -160,96 +93,105 @@
 
         @Override
         public void cancel() {
-            requests.remove(this);
+            // do nothing
         }
 
         @Override
         public void transmit(Connection.MessageSink sink) {
-            currentTransmit = null;
-
             SetMessage sm = new SetMessage();
             sm.statisticName = name;
             sm.subsystemName = subsystem;
             sm.value = value;
             sm.flags = flags;
             sink.send(sm);
-
-            handleNextRequest();
         }
 
         @Override
         public void handleError() {
             throw new RuntimeException("unexpected transmit error");
         }
+
+        @Override
+        public AbsoluteTime getDeadline() {
+            return deadline;
+        }
+
+        @Override
+        public boolean onDestroy() {
+            return true;
+        }
+
+        @Override
+        public boolean onReconnect() {
+            return true;
+        }
     }
 
-    public Statistics(Configuration cfg) {
-        client = new Client("statistics", cfg);
+
+    public class StatisticsMessageReceiver extends RunaboutMessageReceiver {
+        public void visit(ResponseValueMessage m) {
+            currentGetReceiver.onReceive(m.subsystemName, m.statisticName, 
m.value);
+        }
+
+        public void visit(ResponseEndMessage m) {
+            if (currentGetContinuation != null) {
+                currentGetContinuation.onDone();
+            }
+        }
+
+        @Override
+        public void handleError() {
+        }
     }
 
     public interface StatisticsReceiver {
         public void onReceive(String subsystem, String name, long value);
+    }
 
-        public void onTimeout();
-
+    public interface Continuation {
         public void onDone();
     }
 
+    public Statistics(Configuration cfg) {
+        client = new Client("statistics", cfg);
+        requestQueue = new RequestQueue(this.client, new 
StatisticsMessageReceiver());
+    }
+
     /**
      * Retrieve values from statistics.
      *
+     *
      * @param timeout   time after we give up and call receiver.onTimeout
-     * @param subsystem name of the subsystem the statistics value belongs to
+     * @param subsystem the subsystem of interest
      * @param name      name of the statistics value belongs to
      * @param receiver  callback
+     * @param continuation
      * @return handle to cancel the request
      */
     public Cancelable get(RelativeTime timeout, final String subsystem, final 
String name,
-                          final StatisticsReceiver receiver) {
+                          final StatisticsReceiver receiver, Continuation 
continuation) {
 
+        if (currentGetReceiver != null) {
+            throw new AssertionError("only one Statistics get request can be 
active at a time");
+        }
+        currentGetReceiver = receiver;
+        currentGetContinuation = continuation;
+
         final StatisticsGetRequest getRequest = new StatisticsGetRequest();
         getRequest.deadline = timeout.toAbsolute();
-        getRequest.name = name;
-        getRequest.subsystem = subsystem;
+        getRequest.name = name == null ? "" : name;
+        getRequest.subsystem = subsystem == null ? "" : name;
         getRequest.receiver = receiver;
 
-        requests.add(getRequest);
-
-        handleNextRequest();
-
-        return getRequest;
+        return requestQueue.add(getRequest);
     }
 
-    /**
-     * todo: should this be abstracted into a common request queue?
-     */
-    private void handleNextRequest() {
-        if (currentTransmit != null) {
-            return;
-        }
-        StatisticsRequest request = requests.poll();
-        if (request == null) {
-            if (disconnectRequested) {
-                client.disconnect();
-            }
-            return;
-        }
 
-        logger.debug("handling request");
-        currentTransmit = 
client.notifyTransmitReady(request.deadline.getRemaining(), true, 0, request);
-    }
-
-    public Cancelable get(RelativeTime timeout, StatisticsReceiver srh) {
-        return get(timeout, "", "", srh);
-    }
-
-
     /**
      * Sets a statistics value asynchronously.
      *
-     * @param subsystem subsystem of the entry
-     * @param name name of the entry
-     * @param value desired value
+     * @param name    name of the entry
+     * @param value   desired value
      * @param persist keep value even if the statistics service restarts
      * @return a handle to cancel the request
      */
@@ -261,34 +203,26 @@
         putRequest.value = value;
         putRequest.flags = persist ? SETFLAG_PERSIST : 0;
 
-        requests.add(putRequest);
+        return requestQueue.add(putRequest);
+    }
 
-        handleNextRequest();
-
-        return putRequest;
+    public Cancelable watch(final String subsystem, final String name, 
StatisticsReceiver receiver) {
+        return null;
     }
 
     /**
-     * Destroy handle to the statistics service.
-     *
-     * @param putPending wait until all pending put requests have been 
submitted or timed out
+     * Destroy handle to the statistics service. Always finishes writing 
pending values.
      */
-    public void destroy(boolean putPending) {
-        if (putPending) {
-            // throw away all get requests!
-            Iterator<StatisticsRequest> it = requests.listIterator();
-            while (it.hasNext()) {
-                StatisticsRequest r = it.next();
-                if (r instanceof StatisticsGetRequest) {
-                    it.remove();
-                }
-            }
-            disconnectRequested = true;
-        } else {
-            client.disconnect();
-        }
+    public void destroy() {
+        requestQueue.destroy();
     }
 
+
+    /**
+     * Statistics utility entry point
+     *
+     * @param args command line arguments
+     */
     public static void main(String[] args) {
         new Program(args) {
             @Option(
@@ -329,26 +263,22 @@
                         return;
                     }
                     statistics.set(subsystemName, statisticsName, value, 
false);
-                    statistics.destroy(true);
+                    statistics.destroy();
                 } else {
                     if (unprocessedArgs.length == 0) {
-                        statistics.get(RelativeTime.SECOND, subsystemName, 
statisticsName, new StatisticsReceiver() {
-                            @Override
-                            public void onReceive(String subsystem, String 
name, long value) {
-                                System.out.println(subsystem + "(" + name + ") 
= " + value);
-                            }
-
-                            @Override
-                            public void onTimeout() {
-                                logger.error("timeout while getting 
statistics");
-                                statistics.destroy(false);
-                            }
-
-                            @Override
-                            public void onDone() {
-                                statistics.destroy(false);
-                            }
-                        });
+                        statistics.get(RelativeTime.SECOND, subsystemName, 
statisticsName,
+                                new StatisticsReceiver() {
+                                    @Override
+                                    public void onReceive(String subsystem, 
String name, long value) {
+                                        System.out.println(subsystem + "(" + 
name + ") = " + value);
+                                    }
+                                },
+                                new Continuation() {
+                                    @Override
+                                    public void onDone() {
+                                        statistics.destroy();
+                                    }
+                                });
                     } else {
                         System.err.println("dumping statistics does not take 
any positional parameters");
                     }

Modified: gnunet-java/src/org/gnunet/util/Client.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Client.java 2012-05-03 11:36:52 UTC (rev 
21244)
+++ gnunet-java/src/org/gnunet/util/Client.java 2012-05-03 11:54:32 UTC (rev 
21245)
@@ -46,7 +46,7 @@
      * Initial value for connectBackoff.
      *
      */
-    private final RelativeTime INITAL_BACKOFF = RelativeTime.MILLISECOND;
+    private final RelativeTime INITAL_BACKOFF = 
RelativeTime.MILLISECOND.multiply(20);
 
     /**
      * Maximum value for connectBackoff.
@@ -54,7 +54,6 @@
      */
     private final RelativeTime MAX_BACKOFF = RelativeTime.SECOND.multiply(5);
 
-
     /**
      * The time to wait after an error occured while connecting.
      * Every time an error occurs while connecting, this value is doubled 
until its maximum

Modified: gnunet-java/src/org/gnunet/util/Connection.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Connection.java     2012-05-03 11:36:52 UTC 
(rev 21244)
+++ gnunet-java/src/org/gnunet/util/Connection.java     2012-05-03 11:54:32 UTC 
(rev 21245)
@@ -76,7 +76,7 @@
 
 
     private class AddressProbe {
-        Scheduler.TaskIdentifier connectTask;
+        Cancelable connectTask;
         SocketChannel channel;
     }
 
@@ -108,7 +108,7 @@
         private MessageReceiver receiver;
         private RelativeTime timeout;
         private GnunetMessage.Header msgh = null;
-        private Scheduler.TaskIdentifier recvTask = null;
+        private Scheduler.TaskConfiguration recvTask = null;
         private boolean finished = false;
         // is this receiver actively working? if not, the connection process 
has to kick off the receiver
         // (or select behaves badly)
@@ -208,22 +208,22 @@
     private class TransmitHelper implements Scheduler.Task, MessageSink {
         private final MessageTransmitter transmitter;
 
-        private Scheduler.TaskIdentifier notifyTimeoutTask;
+        private Cancelable notifyTimeoutTask;
 
-        private Scheduler.TaskIdentifier transmitTask = null;
+        private Cancelable transmitTask = null;
 
         public TransmitHelper(final MessageTransmitter transmitter, 
RelativeTime notifyTimeout) {
             this.transmitter = transmitter;
 
-            Scheduler.TaskBuilder b = new Scheduler.TaskBuilder();
-            b.withTask(new Scheduler.Task() {
-                @Override
-                public void run(Scheduler.RunContext ctx) {
-                    transmitter.handleError();
-                }
-            });
-            b.withTimeout(notifyTimeout);
-            notifyTimeoutTask = Scheduler.add(b);
+            Scheduler.TaskConfiguration tc = new 
Scheduler.TaskConfiguration(notifyTimeout,
+                    new Scheduler.Task() {
+                        @Override
+                        public void run(Scheduler.RunContext ctx) {
+                            transmitter.handleError();
+                        }
+                    });
+
+            notifyTimeoutTask = tc.schedule();
         }
 
         public boolean notifyDone() {
@@ -280,9 +280,9 @@
             // timeout is forever, because there is no way to directly limit 
the transmission time
             // of a message, only the max. wait time before transmission.
             // cancel must be called on the transmitTask if we disconnect
-            Scheduler.TaskBuilder b = new Scheduler.TaskBuilder();
-            
b.withTimeout(RelativeTime.FOREVER).withSelectWrite(connectionChannel).withTask(this);
-            this.transmitTask = Scheduler.add(b);
+            Scheduler.TaskConfiguration tc = new 
Scheduler.TaskConfiguration(RelativeTime.FOREVER, this);
+            tc.selectWrite(connectionChannel);
+            this.transmitTask = tc.schedule();
         }
 
         @Override
@@ -345,19 +345,20 @@
 
             final AddressProbe addressProbe = new AddressProbe();
             addressProbe.channel = channel;
-            Scheduler.TaskBuilder builder = new Scheduler.TaskBuilder();
-            builder.withSelectConnect(channel);
-            builder.withTask(new Scheduler.Task() {
-                @Override
-                public void run(Scheduler.RunContext ctx) {
-                    addressProbe.connectTask = null;
-                    if (ctx.reasons.contains(Scheduler.Reason.SHUTDOWN)) {
-                        return;
-                    }
-                    Connection.this.finishConnect(channel);
-                }
-            });
-            addressProbe.connectTask = Scheduler.add(builder);
+            Scheduler.TaskConfiguration tc = new Scheduler.TaskConfiguration(RelativeTime.FOREVER,
+                    new Scheduler.Task() {
+                        @Override
+                        public void run(Scheduler.RunContext ctx) {
+                            addressProbe.connectTask = null;
+                            if (ctx.reasons.contains(Scheduler.Reason.SHUTDOWN)) {
+                                return;
+                            }
+                            Connection.this.finishConnect(channel);
+                        }
+                    });
+            tc.selectConnect(channel);
+
+            addressProbe.connectTask = tc.schedule();
         }
 
         @Override
@@ -381,12 +382,12 @@
                     addressProbe.connectTask.cancel();
                 }
             }
-            logger.debug("client successfully connected");
         } catch (IOException e) {
             logger.debug("finishConnect() was not successful: {}", (Object) e);
             return;
         }
         if (connected) {
+            logger.debug("client successfully connected to " + channel.toString());
             if (currentTransmitHelper != null) {
                 currentTransmitHelper.start();
             }
@@ -501,6 +502,7 @@
      * Disconnect. Cancel pending receive/transmit requests.
      */
     public void disconnect() {
+        logger.debug(""+this+".disconnect()");
         if (nextTransmitHelper != null) {
             nextTransmitHelper.cancel();
             nextTransmitHelper = null;
@@ -514,11 +516,19 @@
             resolveHandle.cancel();
             resolveHandle = null;
         }
-
         if (connectHandle != null) {
             connectHandle.cancel();
             connectHandle = null;
         }
+        if (connectionChannel != null) {
+            try {
+                connectionChannel.socket().close();
+            } catch (IOException e) {
+                throw new IOError(e);
+            }
+            connectionChannel = null;
+        }
+
     }
 
     @Override

Modified: gnunet-java/src/org/gnunet/util/Resolver.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Resolver.java       2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/util/Resolver.java       2012-05-03 11:54:32 UTC (rev 21245)
@@ -1,6 +1,5 @@
 package org.gnunet.util;
 
-
 import org.gnunet.construct.*;
 import org.gnunet.construct.ProtocolViolation;
 import org.gnunet.util.getopt.Option;
@@ -14,7 +13,6 @@
 import java.net.UnknownHostException;
 import java.util.LinkedList;
 
-
 /**
  * Resolve hostnames asynchronously, using the gnunet resolver service if necessary.
  * <p/>
@@ -261,6 +259,7 @@
         rh.cb = cb;
         // try if hostname is numeric IP or loopback
         if (hostname.equalsIgnoreCase("localhost")) {
+            logger.debug("resolving address locally");
             Scheduler.add(new Scheduler.Task() {
                 @Override
                 public void run(Scheduler.RunContext ctx) {

Modified: gnunet-java/src/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Scheduler.java      2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/util/Scheduler.java      2012-05-03 11:54:32 UTC (rev 21245)
@@ -38,6 +38,42 @@
     private static final Logger logger = LoggerFactory
             .getLogger(Scheduler.class);
 
+    // only valid while a task is executing
+    private static TaskConfiguration activeTask = null;
+
+    // number of tasks in the ready queue
+    private static int readyCount = 0;
+
+    // for every priority, there is a list of tasks that is definitely ready to run
+    final private static ArrayList<LinkedList<TaskConfiguration>> ready = new ArrayList<LinkedList<TaskConfiguration>>
+            (Priority.size);
+
+    static {
+        for (int i = 0; i < Priority.size; ++i) {
+            ready.add(new LinkedList<TaskConfiguration>());
+        }
+    }
+
+    public static final int EVENT_READ = 0, EVENT_WRITE = 1, EVENT_ACCEPT = 2, EVENT_CONNECT = 3;
+    public static final int[] eventToInterestOp = new int[]{SelectionKey.OP_READ, SelectionKey.OP_WRITE,
+            SelectionKey.OP_ACCEPT, SelectionKey.OP_CONNECT};
+    public static final Reason[] eventToReason = new Reason[]{Reason.READ_READY, Reason.WRITE_READY,
+            Reason.ACCEPT_READY, Reason.CONNECT_READY};
+
+
+    private static Selector selector = null;
+
+    static {
+        try {
+            selector = SelectorProvider.provider().openSelector();
+        } catch (final IOException e) {
+            // what to do here?
+            logger.error("fatal: cannot create selector");
+            System.exit(-1);
+        }
+    }
+
+
     public enum Priority {
         IDLE, BACKGROUND, DEFAULT, HIGH, UI, URGENT, SHUTDOWN;
         private static final int size = Priority.values().length;
@@ -72,85 +108,39 @@
         public void run(RunContext ctx);
     }
 
-    private static class Subscribers {
-        TaskIdentifier readSubscriber;
-        TaskIdentifier writeSubscriber;
-        TaskIdentifier connectSubscriber;
-        TaskIdentifier acceptSubscriber;
-    }
-
     /**
      * A TaskIdentifier represents a Task that will execute or has already been executed.
      */
-    public static class TaskIdentifier implements Comparable<TaskIdentifier>, Cancelable {
+    public static class TaskConfiguration implements Cancelable {
         private final Task task;
         private RunContext ctx = new RunContext();
-        private boolean lifeness;
-        private final Priority priority;
+        private boolean lifeness = true;
+        private Priority priority;
         private final AbsoluteTime deadline;
-        private ArrayList<SelectableChannel> rs = null;
-        private ArrayList<SelectableChannel> ws = null;
-        private ArrayList<SelectableChannel> cs = null;
-        private ArrayList<SelectableChannel> as = null;
 
+        private ArrayList<SelectableChannel> eventChannels = null;
+        private ArrayList<Integer> eventTypes = null;
 
-        TaskIdentifier(Task t, Priority priority,
-                       boolean liveness, RelativeTime timeout,
-                       Set<SelectableChannel> rs, Set<SelectableChannel> ws, Set<SelectableChannel> cs) {
-            this.task = t;
+        private boolean finished = false;
 
-            if (priority == null) {
-                this.priority = (activeTask == null) ? Priority.DEFAULT : activeTask.priority;
-            } else {
-                this.priority = priority;
-            }
-
-            this.lifeness = liveness;
-
-            if (timeout.getMilliseconds() < 0) {
-                throw new AssertionError("timeout must be (>=0)");
-            }
-
-            this.deadline = timeout.toAbsolute();
-
-            if (rs != null) {
-                for (SelectableChannel sc : rs) {
-                    registerSelect(sc, SelectionKey.OP_READ);
-                }
-            }
-            if (ws != null) {
-                for (SelectableChannel sc : ws) {
-                    registerSelect(sc, SelectionKey.OP_WRITE);
-                }
-            }
-            if (cs != null) {
-                for (SelectableChannel sc : cs) {
-                    registerSelect(sc, SelectionKey.OP_CONNECT);
-                }
-            }
-            if (as != null) {
-                for (SelectableChannel sc : as) {
-                    registerSelect(sc, SelectionKey.OP_ACCEPT);
-                }
-            }
-            this.ws = new ArrayList<SelectableChannel>(ws == null ? Collections.EMPTY_SET : ws);
-            this.rs = new ArrayList<SelectableChannel>(rs == null ? Collections.EMPTY_SET : rs);
-            this.cs = new ArrayList<SelectableChannel>(cs == null ? Collections.EMPTY_SET : cs);
-            this.as = new ArrayList<SelectableChannel>(as == null ? Collections.EMPTY_SET : as);
-
-            // todo: assertions for validity of this task
-
-            pending.add(this);
+        /**
+         * Create a TaskIdentifier.
+         *
+         * @param task
+         */
+        TaskConfiguration(RelativeTime delay, Task task) {
+            this.task = task;
+            this.deadline = delay.toAbsolute();
         }
 
         /**
-         * Create a light-weight task identifier that is not registerd as pending in the Scheduler,
+         * Create a light-weight task identifier that is not registered as pending in the Scheduler,
          * used for continuations.
          *
-         * @param t task
+         * @param t   task
          * @param ctx the RunContext
          */
-        TaskIdentifier(Task t, RunContext ctx) {
+        TaskConfiguration(Task t, RunContext ctx) {
             this.task = t;
             this.ctx = ctx;
             this.deadline = AbsoluteTime.ZERO;
@@ -158,240 +148,112 @@
             this.lifeness = true;
         }
 
-        private void run() {
-            TaskIdentifier old = activeTask;
-            activeTask = this;
-            task.run(ctx);
-            activeTask = old;
-        }
+        private void addChannelEvent(SelectableChannel channel, int eventType) {
+            if (eventChannels == null) {
+                eventChannels = new ArrayList<SelectableChannel>();
+                eventTypes = new ArrayList<Integer>();
+            }
+            eventChannels.add(channel);
+            eventTypes.add(eventType);
 
-        public Task getTask() {
-            return task;
-        }
+            int interestOp = eventToInterestOp[eventType];
 
-        public void cancel() {
-            pending.remove(this);
-        }
-
-        public boolean getLifeness() {
-            return lifeness;
-        }
-
-        @Override
-        public int compareTo(TaskIdentifier other) {
-            return this.deadline.compareTo(other.deadline);
-        }
-
-        private void registerSelect(SelectableChannel sc, int op) {
-            SelectionKey key = sc.keyFor(selector);
-            Subscribers subscribers;
-
+            SelectionKey key = channel.keyFor(selector);
             if (key == null || !key.isValid()) {
-                subscribers = new Subscribers();
                 try {
-                    sc.register(selector, op, subscribers);
+                    key = channel.register(selector, interestOp, new TaskConfiguration[4]);
                 } catch (ClosedChannelException e) {
                     throw new IOError(e);
                 }
             } else {
-                subscribers = (Subscribers) key.attachment();
-                key.interestOps(key.interestOps() | op);
-            }
-            if ((op & SelectionKey.OP_READ) != 0) {
-                if (subscribers.readSubscriber != null) {
-                    throw new AssertionError("only one task can wait for a specific event at the same time");
+                if ((key.interestOps() & interestOp) != 0) {
+                    throw new AssertionError("interest op registered twice");
                 }
-                subscribers.readSubscriber = this;
+                key.interestOps(key.interestOps() | interestOp);
             }
-            if ((op & SelectionKey.OP_WRITE) != 0) {
-                if (subscribers.writeSubscriber != null) {
-                    throw new AssertionError("only one task can wait for a specific event at the same time");
-                }
-                subscribers.writeSubscriber = this;
+
+            TaskConfiguration[] subscribers = (TaskConfiguration[]) key.attachment();
+            if (subscribers[eventType] != null) {
+                throw new AssertionError("subscriber registered twice");
             }
-            if ((op & SelectionKey.OP_CONNECT) != 0) {
-                if (subscribers.connectSubscriber != null) {
-                    throw new AssertionError("only one task can wait for a specific event at the same time");
-                }
-                subscribers.connectSubscriber = this;
-            }
-            if ((op & SelectionKey.OP_ACCEPT) != 0) {
-                if (subscribers.acceptSubscriber != null) {
-                    throw new AssertionError("only one task can wait for a specific event at the same time");
-                }
-                subscribers.acceptSubscriber = this;
-            }
+            subscribers[eventType] = this;
 
-            if (subscribers.connectSubscriber != null && subscribers.readSubscriber != null) {
+            if (subscribers[EVENT_CONNECT] != null && subscribers[EVENT_READ] != null) {
                 throw new AssertionError("OP_CONNECT and OP_READ are incompatible in java");
             }
         }
 
-        private void deregisterOne(SelectableChannel sc, int op) {
-            SelectionKey key = sc.keyFor(selector);
-            if (key == null) {
-                throw new AssertionError();
+        private void run() {
+            if (finished) {
+                throw new AssertionError("same task ran twice");
             }
-            Subscribers subscribers = (Subscribers) key.attachment();
-            if (subscribers == null) {
-                throw new AssertionError();
+            TaskConfiguration old = activeTask;
+            activeTask = this;
+            task.run(ctx);
+            finished = true;
+            activeTask = old;
+        }
+
+        public void cancel() {
+            if (!pending.contains(this)) {
+                throw new AssertionError("canceling task that is not scheduled");
             }
-            if ((op & SelectionKey.OP_READ) != 0) {
-                if (subscribers.readSubscriber == null) {
-                    throw new AssertionError();
+            pending.remove(this);
+            logger.debug(""+ pending.size() + "tasks remaining");
+        }
+
+        public Cancelable schedule() {
+            if (priority == null) {
+                if (activeTask != null) {
+                    priority = activeTask.priority;
+                } else {
+                    priority = Priority.DEFAULT;
                 }
-                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
-                subscribers.readSubscriber = null;
             }
-            if ((op & SelectionKey.OP_WRITE) != 0) {
-                if (subscribers.writeSubscriber == null) {
-                    throw new AssertionError();
-                }
-                subscribers.writeSubscriber = null;
-                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-            }
-            if ((op & SelectionKey.OP_CONNECT) != 0) {
-                if (subscribers.connectSubscriber == null) {
-                    throw new AssertionError();
-                }
-                subscribers.connectSubscriber = null;
-                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
-            }
-            if ((op & SelectionKey.OP_ACCEPT) != 0) {
-                if (subscribers.acceptSubscriber == null) {
-                    throw new AssertionError();
-                }
-                subscribers.acceptSubscriber = null;
-                key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT);
-            }
+            pending.add(this);
+            return this;
         }
 
         private void deregister() {
-            if (this.rs != null) {
-                for (SelectableChannel sc : this.rs) {
-                    deregisterOne(sc, SelectionKey.OP_READ);
-                }
+            if (eventChannels == null) {
+                return;
             }
-            if (this.ws != null) {
-                for (SelectableChannel sc : this.ws) {
-                    deregisterOne(sc, SelectionKey.OP_WRITE);
+            for (int i = 0; i < eventChannels.size(); ++i) {
+                SelectionKey key = eventChannels.get(i).keyFor(selector);
+                TaskConfiguration[] subscribers = (TaskConfiguration[]) key.attachment();
+                int interestOp = eventToInterestOp[eventTypes.get(i)];
+                if (subscribers[eventTypes.get(i)] == null || (key.interestOps() | interestOp) == 0) {
+                    throw new AssertionError("deregistering event that has not been registered");
                 }
+                subscribers[eventTypes.get(i)] = null;
+                key.interestOps(key.interestOps() & (~interestOp));
             }
-            if (this.cs != null) {
-                for (SelectableChannel sc : this.cs) {
-                    deregisterOne(sc, SelectionKey.OP_CONNECT);
-                }
-            }
-            if (this.as != null) {
-                for (SelectableChannel sc : this.as) {
-                    deregisterOne(sc, SelectionKey.OP_ACCEPT);
-                }
-            }
         }
-    }
 
-
-    public static class TaskBuilder {
-        private Task task = null;
-        private boolean lifeness = true;
-        private Priority prio = null;
-        private RelativeTime timeout = RelativeTime.ZERO;
-        private Set<SelectableChannel> rs = null, ws = null, cs = null, as = null;
-
-        public TaskBuilder withLifeness(boolean lifeness) {
-            this.lifeness = lifeness;
-            return this;
+        public void selectRead(SocketChannel channel) {
+            addChannelEvent(channel, EVENT_READ);
         }
-
-        public TaskBuilder withPriority(Priority prio) {
-            this.prio = prio;
-            return this;
+        public void selectWrite(SocketChannel channel) {
+            addChannelEvent(channel, EVENT_WRITE);
         }
-
-        public TaskBuilder withTask(Task task) {
-            this.task = task;
-            return this;
+        public void selectConnect(SocketChannel channel) {
+            addChannelEvent(channel, EVENT_CONNECT);
         }
-
-        public TaskBuilder withTimeout(RelativeTime timeout) {
-            this.timeout = timeout;
-            return this;
+        public void selectAccept(SocketChannel channel) {
+            addChannelEvent(channel, EVENT_ACCEPT);
         }
-
-        public TaskBuilder withSelectRead(SelectableChannel c) {
-            this.rs = Collections.singleton(c);
-            return this;
-        }
-
-
-        public TaskBuilder withSelectReadSet(Set<SelectableChannel> cs) {
-            this.rs = cs;
-            return this;
-        }
-
-        public TaskBuilder withSelectWrite(SelectableChannel c) {
-            this.ws = Collections.singleton(c);
-            return this;
-        }
-
-        public TaskBuilder withSelectWriteSet(Set<SelectableChannel> cs) {
-            this.ws = cs;
-            return this;
-        }
-
-        public TaskBuilder withSelectConnect(SelectableChannel c) {
-            this.cs = Collections.singleton(c);
-            return this;
-        }
-
-        public TaskBuilder withSelectAccept(SelectableChannel c) {
-            this.as = Collections.singleton(c);
-            return this;
-        }
-
-        public TaskBuilder withSelectConnectSet(Set<SelectableChannel> cs) {
-            this.cs = cs;
-            return this;
-        }
-
-        private TaskIdentifier build() {
-            return new TaskIdentifier(task, prio, lifeness, timeout, rs, ws, cs);
-        }
     }
 
-
-    // tasks that are waiting for an event, which are executed anyway after the deadline has occured
-    final private static Queue<TaskIdentifier> pending = new PriorityQueue<TaskIdentifier>();
-
-    // only valid while a task is executing
-    private static TaskIdentifier activeTask = null;
-
-    // number of tasks in the ready queue
-    private static int readyCount = 0;
-
-    // for every priority, there is a list of tasks that is definitely ready to run
-    final private static ArrayList<LinkedList<TaskIdentifier>> ready = new ArrayList<LinkedList<TaskIdentifier>>
-            (Priority.size);
-
-    static {
-        for (int i = 0; i < Priority.size; ++i) {
-            ready.add(new LinkedList<TaskIdentifier>());
+    // tasks that are waiting for an event, which are executed anyway after the deadline has occurred
+    final private static Queue<TaskConfiguration> pending = new PriorityQueue<TaskConfiguration>(5, new Comparator
+            <TaskConfiguration>() {
+        @Override
+        public int compare(TaskConfiguration a, TaskConfiguration b) {
+            return a.deadline.compareTo(b.deadline);
         }
-    }
+    });
 
-    private static Selector selector = null;
 
-    static {
-        try {
-            selector = SelectorProvider.provider().openSelector();
-        } catch (final IOException e) {
-            // what to do here?
-            logger.error("fatal: cannot create selector");
-            System.exit(-1);
-        }
-    }
-
-
     public static boolean getCurrentLifeness() {
         return (activeTask == null) || activeTask.lifeness;
     }
@@ -404,15 +266,9 @@
                                        EnumSet<Reason> reasons) {
         RunContext ctx = new RunContext();
         ctx.reasons = reasons;
-        queueReady(new TaskIdentifier(task, ctx));
+        queueReady(new TaskConfiguration(task, ctx));
     }
 
-
-    public static TaskIdentifier add(TaskBuilder builder) {
-        return builder.build();
-    }
-
-
     /**
      * Schedule a new task to be run as soon as possible. The task will be run
      * with the priority of the calling task.
@@ -421,9 +277,8 @@
      * @return unique task identifier for the job only valid until "task" is
      *         started!
      */
-    public static TaskIdentifier add(Task task) {
-        return addSelect(Priority.KEEP, RelativeTime.ZERO, null, null,
-                task);
+    public static Cancelable add(Task task) {
+        return addDelayed(RelativeTime.ZERO, task);
     }
 
 
@@ -431,58 +286,31 @@
      * Add a task to run after the specified delay.
      *
      * @param delay time to wait until running the task
-     * @param task the task to run after delay
+     * @param task  the task to run after delay
      * @return the TaskIdentifier, can be used to cancel the task until it has been executed.
      */
-    public static TaskIdentifier addDelayed(RelativeTime delay, Task task) {
-        return addSelect(Priority.KEEP, delay, null, null, task);
+    public static TaskConfiguration addDelayed(RelativeTime delay, Task task) {
+        TaskConfiguration tid = new TaskConfiguration(delay, task);
+        tid.schedule();
+        return tid;
     }
 
-
-    /**
-     * Schedule a new task to be run with a specified delay or when any of
-     * the specified file descriptor sets is ready.  The delay can be used
-     * as a deadline on the socket(s) being ready.  The task will be
-     * scheduled for execution once either the delay has expired or any of
-     * the socket operations is ready.  This is the most general
-     * function of the "add" family.  Note that the "prerequisite_task"
-     * must be satisfied in addition to any of the other conditions.
-     *
-     * @param p                 how important is this task?
-     * @param delay             how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever",
-     *                          which means that the task will only be run after we receive SIGTERM
-     * @param rs                set of file descriptors we want to read (can be NULL)
-     * @param ws                set of file descriptors we want to write (can be NULL)
-     * @param t                 The Task to run
-     * @return unique task identifier for the job
-     *         only valid until "task" is started!
-     */
-    public static TaskIdentifier addSelect(Priority p, RelativeTime delay,
-                                           Set<SelectableChannel> rs, Set<SelectableChannel> ws,
-                                           Task t) {
-
-        return new TaskIdentifier(t, p, getCurrentLifeness(), delay, rs, ws, null);
+    public static TaskConfiguration addRead(RelativeTime timeout,
+                                         SelectableChannel chan, Task task) {
+        TaskConfiguration tid = new TaskConfiguration(timeout, task);
+        tid.addChannelEvent(chan, EVENT_READ);
+        tid.schedule();
+        return tid;
     }
 
-    public static TaskIdentifier addRead(RelativeTime timeout,
-                                         SelectableChannel chan, Task t) {
-        return Scheduler.addSelect(Priority.KEEP, timeout,
-                Collections.singleton(chan), null, t);
+    public static TaskConfiguration addWrite(RelativeTime timeout,
+                                          SelectableChannel chan, Task task) {
+        TaskConfiguration tid = new TaskConfiguration(timeout, task);
+        tid.addChannelEvent(chan, EVENT_WRITE);
+        tid.schedule();
+        return tid;
     }
 
-    public static TaskIdentifier addWrite(RelativeTime timeout,
-                                          SelectableChannel chan, Task t) {
-        logger.debug("scheduling write");
-        return Scheduler.addSelect(Priority.KEEP, timeout, null,
-                Collections.singleton(chan), t);
-    }
-
-    public static TaskIdentifier addWithPriority(Priority prio,
-                                                 Task t) {
-        return addSelect(prio, RelativeTime.ZERO, null, null, t);
-    }
-
-
     /**
      * Check if the system is still life. Trigger disconnect if we have tasks, but
      * none of them give us lifeness.
@@ -493,7 +321,7 @@
         if (readyCount > 0) {
             return true;
         }
-        for (TaskIdentifier t : pending) {
+        for (TaskConfiguration t : pending) {
             if (t.lifeness) {
                 return true;
             }
@@ -514,7 +342,7 @@
      *
      * @param tid TaskIdentifier of the ready task
      */
-    private static void queueReady(TaskIdentifier tid) {
+    private static void queueReady(TaskConfiguration tid) {
         int idx = tid.priority.ordinal();
         ready.get(idx).add(tid);
         readyCount++;
@@ -533,7 +361,7 @@
 
         // check if any timeouts occured
         while (true) {
-            TaskIdentifier t = pending.peek();
+            TaskConfiguration t = pending.peek();
             if (t == null) {
                 break;
             }
@@ -551,60 +379,40 @@
         return timeout;
     }
 
+    private static void addSubscriberTask(Collection<TaskConfiguration> executableTasks,
+                                          TaskConfiguration[] subscribers, int eventType) {
+        if (subscribers[eventType] == null)
+            return;
+        executableTasks.add(subscribers[eventType]);
+        subscribers[eventType].ctx.reasons.add(eventToReason[eventType]);
+    }
+
     private static void handleSelect(RelativeTime timeout) {
         try {
-            // selector.select(0) would block indefinitely (conterintuitive, java's fault)
+            // selector.select(0) would block indefinitely (counter-intuitive, java's fault)
             if (timeout.getMilliseconds() == 0) {
                 selector.selectNow();
             } else if (timeout.isForever()) {
                 selector.select(0);
             } else {
-                //logger.debug("starting to select with deadline");
                 selector.select(timeout.getMilliseconds());
-                //logger.debug("select with deadline ended");
             }
         } catch (IOException e) {
             throw new IOError(e);
         }
 
         // we have to do this so we don't execute any task twice
-        // todo: alternative: mark TaskIdentifier as executed
-        Collection<TaskIdentifier> executableTasks = new TreeSet<TaskIdentifier>();
+        Collection<TaskConfiguration> executableTasks = new HashSet<TaskConfiguration>();
         for (SelectionKey sk : selector.selectedKeys()) {
-            Subscribers ss = (Subscribers) sk.attachment();
+            TaskConfiguration[] subscribers = (TaskConfiguration[]) sk.attachment();
 
-            Channel c = sk.channel();
+            if (sk.isReadable()) addSubscriberTask(executableTasks, subscribers, EVENT_READ);
+            if (sk.isWritable()) addSubscriberTask(executableTasks, subscribers, EVENT_WRITE);
+            if (sk.isAcceptable()) addSubscriberTask(executableTasks, subscribers, EVENT_ACCEPT);
+            if (sk.isConnectable()) addSubscriberTask(executableTasks, subscribers, EVENT_CONNECT);
 
-            if (sk.isReadable()) {
-                if (ss.readSubscriber == null) {
-                    throw new AssertionError("event fired, but not registered before");
-                }
-                executableTasks.add(ss.readSubscriber);
-                ss.readSubscriber.ctx.reasons.add(Reason.READ_READY);
-            }
-            if (sk.isWritable()) {
-                if (ss.writeSubscriber == null) {
-                    throw new AssertionError("event fired, but not registered before");
-                }
-                executableTasks.add(ss.writeSubscriber);
-                ss.writeSubscriber.ctx.reasons.add(Reason.WRITE_READY);
-            }
-            if (sk.isConnectable()) {
-                if (ss.connectSubscriber == null) {
-                    throw new AssertionError("event fired, but not registered before");
-                }
-                executableTasks.add(ss.connectSubscriber);
-                ss.connectSubscriber.ctx.reasons.add(Reason.CONNECT_READY);
-            }
-            if (sk.isAcceptable()) {
-                if (ss.acceptSubscriber == null) {
-                    throw new AssertionError("event fired, but not registered before");
-                }
-                executableTasks.add(ss.acceptSubscriber);
-                ss.acceptSubscriber.ctx.reasons.add(Reason.ACCEPT_READY);
-            }
         }
-        for (TaskIdentifier tt : executableTasks) {
+        for (TaskConfiguration tt : executableTasks) {
             // cancel subscriptions to other events, we can execute now!
             tt.deregister();
             queueReady(tt);
@@ -614,26 +422,23 @@
 
     /**
      * Initialize and run scheduler. This function will return when all tasks
-     * have completed. On systems with signals, receiving a SIGTERM (and other
-     * similar signals) will cause "GNUNET_SCHEDULER_shutdown" to be run after
-     * the active task is complete. As a result, SIGTERM causes all active tasks
-     * to be scheduled with reason "GNUNET_SCHEDULER_REASON_SHUTDOWN". (However,
-     * tasks added afterwards will execute normally!).
+     * have completed.
      *
-     * @param task task to run immediately
+     * @param initialTask the initial task to run immediately
      */
-    public static void run(Task task) {
-        addContinuation(task, EnumSet.of(Reason.STARTUP));
+    public static void run(Task initialTask) {
+        addContinuation(initialTask, EnumSet.of(Reason.STARTUP));
 
+        // the gnunet main loop
         while (checkLiveness()) {
-
             RelativeTime nextTimeout = handleTimeouts();
 
-            // don't select if there are no tasks
+            // don't select if there are no tasks; we are done!
             if (readyCount == 0 && pending.isEmpty()) {
                 return;
             }
 
+            // don't block in select if we have tasks ready to run!
             if (readyCount > 0) {
                 handleSelect(RelativeTime.ZERO);
             } else {
@@ -660,9 +465,9 @@
             // start executing from the highest priority down to 0
             for (int p = Priority.size - 1; p >= 0; p--) {
                 // execute all tasks with priority p
-                LinkedList<TaskIdentifier> queue = ready.get(p);
+                LinkedList<TaskConfiguration> queue = ready.get(p);
                 while (!queue.isEmpty()) {
-                    TaskIdentifier tid = queue.removeFirst();
+                    TaskConfiguration tid = queue.removeFirst();
                     readyCount--;
                     tid.run();
                 }
@@ -679,7 +484,7 @@
      */
     public static void shutdown() {
         // queueReady() while iterating would yield concurrent modification exn otherwise
-        for (TaskIdentifier tid : new ArrayList<TaskIdentifier>(pending)) {
+        for (TaskConfiguration tid : new ArrayList<TaskConfiguration>(pending)) {
             tid.ctx.reasons.add(Reason.SHUTDOWN);
             queueReady(tid);
         }

Modified: gnunet-java/src/org/gnunet/util/Server.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Server.java 2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/util/Server.java 2012-05-03 11:54:32 UTC (rev 21245)
@@ -124,6 +124,7 @@
      * @param srv ...
      */
     private void doAccept(final ServerSocketChannel srv) {
+        /*
         Scheduler.TaskBuilder b = new Scheduler.TaskBuilder();
         b.withTask(new Scheduler.Task() {
             @Override
@@ -140,6 +141,7 @@
         });
         b.withSelectAccept(srv);
         Scheduler.add(b);
+        */
     }
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]