[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r21245 - in gnunet-java/src/org/gnunet: construct construct
From: |
gnunet |
Subject: |
[GNUnet-SVN] r21245 - in gnunet-java/src/org/gnunet: construct construct/parsers core dht statistics util |
Date: |
Thu, 3 May 2012 13:54:32 +0200 |
Author: dold
Date: 2012-05-03 13:54:32 +0200 (Thu, 03 May 2012)
New Revision: 21245
Added:
gnunet-java/src/org/gnunet/dht/BlockType.java
gnunet-java/src/org/gnunet/dht/ClientGetMessage.java
gnunet-java/src/org/gnunet/dht/ClientGetStopMessage.java
gnunet-java/src/org/gnunet/dht/ClientPutMessage.java
gnunet-java/src/org/gnunet/dht/ClientResultMessage.java
gnunet-java/src/org/gnunet/dht/MonitorGetMessage.java
gnunet-java/src/org/gnunet/dht/MonitorGetRespMessage.java
gnunet-java/src/org/gnunet/dht/MonitorPutMessage.java
gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java
gnunet-java/src/org/gnunet/dht/RouteOption.java
gnunet-java/src/org/gnunet/statistics/Request.java
gnunet-java/src/org/gnunet/statistics/RequestMessage.java
gnunet-java/src/org/gnunet/statistics/RequestQueue.java
gnunet-java/src/org/gnunet/statistics/ResponseEndMessage.java
gnunet-java/src/org/gnunet/statistics/ResponseValueMessage.java
gnunet-java/src/org/gnunet/statistics/SetMessage.java
Modified:
gnunet-java/src/org/gnunet/construct/Construct.java
gnunet-java/src/org/gnunet/construct/MessageLoader.java
gnunet-java/src/org/gnunet/construct/MsgMap.txt
gnunet-java/src/org/gnunet/construct/parsers/FillParser.java
gnunet-java/src/org/gnunet/construct/parsers/FixedSizeArrayParser.java
gnunet-java/src/org/gnunet/construct/parsers/UnionParser.java
gnunet-java/src/org/gnunet/construct/parsers/VariableSizeArrayParser.java
gnunet-java/src/org/gnunet/core/Core.java
gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
gnunet-java/src/org/gnunet/statistics/Statistics.java
gnunet-java/src/org/gnunet/util/Client.java
gnunet-java/src/org/gnunet/util/Connection.java
gnunet-java/src/org/gnunet/util/Resolver.java
gnunet-java/src/org/gnunet/util/Scheduler.java
gnunet-java/src/org/gnunet/util/Server.java
Log:
refactored Scheduler, Statistics, fixes
Modified: gnunet-java/src/org/gnunet/construct/Construct.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/Construct.java 2012-05-03 11:36:52 UTC
(rev 21244)
+++ gnunet-java/src/org/gnunet/construct/Construct.java 2012-05-03 11:54:32 UTC
(rev 21245)
@@ -36,6 +36,7 @@
* @author Christian Grothoff
* @author Florian Dold
*/
address@hidden("unchecked")
public class Construct {
private static final Logger logger = LoggerFactory
.getLogger(Construct.class);
@@ -48,8 +49,9 @@
* Information the root of the parser, if the target is nested in another
message.
*/
private static class ParserContext {
+ List<Field> parserPath = new ArrayList<Field>();
+ // fully determined by parserPath
ArrayList<Field> frameSizePath = new ArrayList<Field>();
- List<Field> parserPath = new ArrayList<Field>();
@Override
public boolean equals(Object other) {
Modified: gnunet-java/src/org/gnunet/construct/MessageLoader.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/MessageLoader.java 2012-05-03
11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/MessageLoader.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -129,6 +129,7 @@
}
+ @SuppressWarnings("unchecked")
private static Class<? extends MessageUnion> loadClass(String className) {
ClassLoader cl = ClassLoader.getSystemClassLoader();
Class<MessageUnion> msgClass;
Modified: gnunet-java/src/org/gnunet/construct/MsgMap.txt
===================================================================
--- gnunet-java/src/org/gnunet/construct/MsgMap.txt 2012-05-03 11:36:52 UTC
(rev 21244)
+++ gnunet-java/src/org/gnunet/construct/MsgMap.txt 2012-05-03 11:54:32 UTC
(rev 21245)
@@ -3,25 +3,29 @@
org.gnunet.util.Resolver$Address|0=org.gnunet.util.Resolver$TextualAddress
org.gnunet.util.Resolver$Address|1=org.gnunet.util.Resolver$NumericAddress
org.gnunet.util.GnunetMessage$Body|68=org.gnunet.core.Core$DisconnectNotifyMessage
-org.gnunet.util.GnunetMessage$Body|171=org.gnunet.statistics.Statistics$ResponseEndMessage
-org.gnunet.util.GnunetMessage$Body|170=org.gnunet.statistics.Statistics$ResponseValueMessage
org.gnunet.util.GnunetMessage$Body|70=org.gnunet.core.Core$NotifyInboundTrafficMessage
-org.gnunet.util.GnunetMessage$Body|169=org.gnunet.statistics.Statistics$RequestMessage
org.gnunet.util.GnunetMessage$Body|71=org.gnunet.core.Core$NotifyOutboundTrafficMessage
-org.gnunet.util.GnunetMessage$Body|168=org.gnunet.statistics.Statistics$SetMessage
org.gnunet.util.GnunetMessage$Body|4=org.gnunet.util.Resolver$GetMessage
org.gnunet.util.GnunetMessage$Body|64=org.gnunet.core.Core$InitMessage
+org.gnunet.util.GnunetMessage$Body|5=org.gnunet.util.Resolver$ResolverResponse
org.gnunet.util.GnunetMessage$Body|65=org.gnunet.core.Core$InitReplyMessage
-org.gnunet.util.GnunetMessage$Body|5=org.gnunet.util.Resolver$ResolverResponse
-org.gnunet.util.GnunetMessage$Body|143=org.gnunet.dht.DistributedHashTable$DHTClientGetMessage
+org.gnunet.util.GnunetMessage$Body|143=org.gnunet.dht.ClientGetMessage
+org.gnunet.util.GnunetMessage$Body|142=org.gnunet.dht.ClientPutMessage
org.gnunet.util.GnunetMessage$Body|67=org.gnunet.core.Core$ConnectNotifyMessage
-org.gnunet.util.GnunetMessage$Body|142=org.gnunet.dht.DistributedHashTable$DHTClientPutMessage
org.gnunet.util.GnunetMessage$Body|76=org.gnunet.core.Core$SendMessage
org.gnunet.util.GnunetMessage$Body|74=org.gnunet.core.Core$SendMessageRequest
org.gnunet.util.GnunetMessage$Body|75=org.gnunet.core.Core$SendMessageReady
+org.gnunet.util.GnunetMessage$Body|153=org.gnunet.dht.MonitorStartMessage
org.gnunet.util.GnunetMessage$Body|42001=org.gnunet.core.Core$MyMessage
org.gnunet.util.GnunetMessage$Body|323=org.gnunet.nse.NetworkSizeEstimation$UpdateMessage
org.gnunet.util.GnunetMessage$Body|321=org.gnunet.nse.NetworkSizeEstimation$StartMessage
-org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.DistributedHashTable$DHTClientGetStopMessage
-org.gnunet.util.GnunetMessage$Body|145=org.gnunet.dht.DistributedHashTable$DHTClientResultMessage
-# generated 2012/04/19 13:45:01
+org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.ClientGetStopMessage
+org.gnunet.util.GnunetMessage$Body|145=org.gnunet.dht.ClientResultMessage
+org.gnunet.util.GnunetMessage$Body|149=org.gnunet.dht.MonitorGetMessage
+org.gnunet.util.GnunetMessage$Body|150=org.gnunet.dht.MonitorGetRespMessage
+org.gnunet.util.GnunetMessage$Body|151=org.gnunet.dht.MonitorPutMessage
+org.gnunet.util.GnunetMessage$Body|171=org.gnunet.statistics.ResponseEndMessage
+org.gnunet.util.GnunetMessage$Body|170=org.gnunet.statistics.ResponseValueMessage
+org.gnunet.util.GnunetMessage$Body|169=org.gnunet.statistics.RequestMessage
+org.gnunet.util.GnunetMessage$Body|168=org.gnunet.statistics.SetMessage
+# generated 2012/05/02 20:26:30
Modified: gnunet-java/src/org/gnunet/construct/parsers/FillParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/FillParser.java
2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/parsers/FillParser.java
2012-05-03 11:54:32 UTC (rev 21245)
@@ -54,6 +54,7 @@
ArrayList<Message> list = new ArrayList<Message>(10);
while (remaining > 0) {
+ @SuppressWarnings("unchecked")
Message next = ReflectUtil.justInstantiate((Class<Message>)
targetField.getType().getComponentType());
int s = elemParser.parse(srcBuf, frameOffset, frameObj, next);
size += s;
@@ -63,7 +64,7 @@
try {
targetField.set(dstObj, list.toArray());
} catch (IllegalAccessException e) {
- throw new AssertionError("cannot acces field");
+ throw new AssertionError("cannot access field");
}
return size;
Modified: gnunet-java/src/org/gnunet/construct/parsers/FixedSizeArrayParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/FixedSizeArrayParser.java
2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/parsers/FixedSizeArrayParser.java
2012-05-03 11:54:32 UTC (rev 21245)
@@ -46,6 +46,7 @@
ReflectUtil.justSet(dstObj, targetField, arr);
for (int i = 0; i < elemNumber; ++i) {
+ @SuppressWarnings("unchecked")
Message elemObj =
ReflectUtil.justInstantiate((Class<Message>)targetField.getType().getComponentType());
Array.set(arr, i, elemObj);
Modified: gnunet-java/src/org/gnunet/construct/parsers/UnionParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/UnionParser.java
2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/parsers/UnionParser.java
2012-05-03 11:54:32 UTC (rev 21245)
@@ -8,6 +8,9 @@
import java.util.List;
+
+// unchecked casts are necessary
address@hidden("unchecked")
public class UnionParser implements Parser {
private final List<Field> frameSizePath;
@@ -81,7 +84,8 @@
return p.write(dstBuf, (Message) ReflectUtil.justGet(src,
targetField));
}
-
+
+ @SuppressWarnings("unchecked")
public int getTag(Message m) {
return MessageLoader.getUnionTag(unionType, (Class<MessageUnion>)
ReflectUtil.justGet(m, targetField).getClass());
}
Modified:
gnunet-java/src/org/gnunet/construct/parsers/VariableSizeArrayParser.java
===================================================================
--- gnunet-java/src/org/gnunet/construct/parsers/VariableSizeArrayParser.java
2012-05-03 11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/construct/parsers/VariableSizeArrayParser.java
2012-05-03 11:54:32 UTC (rev 21245)
@@ -37,7 +37,8 @@
@Override
public int parse(final ByteBuffer srcBuf, int frameOffset, Message
frameObj, final Message dstObj) {
final int elemNumber = (int) sizeField.get(dstObj);
- final Class arrayElementType =
targetField.getType().getComponentType();
+ @SuppressWarnings("unchecked")
+ final Class<Message> arrayElementType = (Class<Message>)
targetField.getType().getComponentType();
int size = 0;
@@ -47,7 +48,7 @@
for (int i = 0; i < elemNumber; ++i) {
Message elemObj;
- elemObj = (Message) ReflectUtil.justInstantiate(arrayElementType);
+ elemObj = (Message)
ReflectUtil.<Message>justInstantiate(arrayElementType);
Array.set(arr, i, elemObj);
Modified: gnunet-java/src/org/gnunet/core/Core.java
===================================================================
--- gnunet-java/src/org/gnunet/core/Core.java 2012-05-03 11:36:52 UTC (rev
21244)
+++ gnunet-java/src/org/gnunet/core/Core.java 2012-05-03 11:54:32 UTC (rev
21245)
@@ -633,6 +633,7 @@
return ret;
}
+ @SuppressWarnings("unchecked")
private static int[] getRunaboutMessageTypes(Runabout r) {
ArrayList<Class> visitees = getRunaboutVisitees(r);
int[] msgtypes = new int[visitees.size()];
Added: gnunet-java/src/org/gnunet/dht/BlockType.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/BlockType.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/BlockType.java 2012-05-03 11:54:32 UTC
(rev 21245)
@@ -0,0 +1,61 @@
+package org.gnunet.dht;
+
+/**
+ * Information on how to interpret a block of data.
+ */
+public enum BlockType {
+ /**
+ * Any type of block, used as a wildcard when searching. Should
+ * never be attached to a specific block.
+ */
+ ANY(0),
+ /**
+ * Data block (leaf) in the CHK tree.
+ */
+ DBLOCK(1),
+ /**
+ * Inner block in the CHK tree.
+ */
+ IBLOCK(2),
+ /**
+ * Type of a block representing a keyword search result. Note that
+ * the values for KBLOCK, SBLOCK and NBLOCK must be consecutive.
+ */
+ KBLOCK(3),
+ /**
+ * Type of a block that is used to advertise content in a namespace.
+ */
+ SBLOCK(4),
+ /**
+ * Type of a block that is used to advertise a namespace.
+ */
+ NBLOCK(5),
+ /**
+ * Type of a block representing a block to be encoded on demand from disk.
+ * Should never appear on the network directly.
+ */
+ FS_ONDEMAND(6),
+ /**
+ * Type of a block that contains a HELLO for a peer (for
+ * DHT find-peer operations).
+ */
+ DHT_HELLO(7),
+ /**
+ * Block for testing.
+ */
+ TEST(8),
+ /**
+ * Block for storing .gnunet-domains
+ */
+ DNS(10),
+ /**
+ * Block for storing record data
+ */
+ NAMERECORD(11);
+
+ public final int val;
+
+ BlockType(int val) {
+ this.val = val;
+ }
+}
Added: gnunet-java/src/org/gnunet/dht/ClientGetMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientGetMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientGetMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,31 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.*;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:05 PM
+* To change this template use File | Settings | File Templates.
+*/
address@hidden(143)
+public class ClientGetMessage implements GnunetMessage.Body {
+ /**
+ * Combination of RouteOption.*
+ */
+ @UInt32
+ public int options;
+ @UInt32
+ public int desiredReplicationLevel;
+ @UInt32
+ public int type;
+ @NestedMessage
+ public HashCode key;
+ @UInt64
+ public long uniqueId;
+ @ByteFill
+ public byte[] xquery;
+}
Added: gnunet-java/src/org/gnunet/dht/ClientGetStopMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientGetStopMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientGetStopMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,25 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UInt64;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:05 PM
+* To change this template use File | Settings | File Templates.
+*/
address@hidden(144)
+public class ClientGetStopMessage implements GnunetMessage.Body {
+ @UInt32
+ public int reserved = 0;
+ @UInt64
+ public long unique_id;
+ @NestedMessage
+ public HashCode key;
+}
Added: gnunet-java/src/org/gnunet/dht/ClientPutMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientPutMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientPutMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,38 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.ByteFill;
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.AbsoluteTimeMessage;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:05 PM
+* To change this template use File | Settings | File Templates.
+*/
address@hidden(142)
+public class ClientPutMessage implements GnunetMessage.Body {
+ /**
+ * Type of data to insert, one of BlockType.*
+ */
+ @UInt32
+ public int type;
+ /**
+ * Combination of RouteOption.*
+ */
+ @UInt32
+ public int options;
+ @UInt32
+ public int desiredReplicationLevel;
+ @NestedMessage
+ public AbsoluteTimeMessage expiration;
+ @NestedMessage
+ public HashCode hash;
+ @ByteFill
+ public byte[] data;
+}
Added: gnunet-java/src/org/gnunet/dht/ClientResultMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientResultMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientResultMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,36 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.*;
+import org.gnunet.util.AbsoluteTimeMessage;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:06 PM
+* To change this template use File | Settings | File Templates.
+*/
address@hidden(145)
+public class ClientResultMessage implements GnunetMessage.Body {
+ @UInt32
+ public int type;
+ @UInt32
+ public int putPathLength;
+ @UInt32
+ public int getPathLength;
+ @UInt64
+ public long uid;
+ @NestedMessage
+ public AbsoluteTimeMessage expiration;
+ @NestedMessage
+ public HashCode key;
+ @VariableSizeArray(lengthField = "putPathLength")
+ public PeerIdentity[] putPath;
+ @VariableSizeArray(lengthField = "getPathLength")
+ public PeerIdentity[] getPath;
+ @ByteFill
+ public byte[] data;
+}
Modified: gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/DistributedHashTable.java 2012-05-03
11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/dht/DistributedHashTable.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -1,6 +1,5 @@
package org.gnunet.dht;
-import org.gnunet.construct.*;
import org.gnunet.util.*;
import org.gnunet.util.getopt.Option;
import org.gnunet.util.getopt.OptionAction;
@@ -23,366 +22,9 @@
private static final Logger logger = LoggerFactory
.getLogger(DistributedHashTable.class);
+ private Configuration cfg;
/**
- * Information on how to interpret a block of data.
- */
- public enum BlockType {
- /**
- * Any type of block, used as a wildcard when searching. Should
- * never be attached to a specific block.
- */
- ANY(0),
- /**
- * Data block (leaf) in the CHK tree.
- */
- DBLOCK(1),
- /**
- * Inner block in the CHK tree.
- */
- IBLOCK(2),
- /**
- * Type of a block representing a keyword search result. Note that
- * the values for KBLOCK, SBLOCK and NBLOCK must be consecutive.
- */
- KBLOCK(3),
- /**
- * Type of a block that is used to advertise content in a namespace.
- */
- SBLOCK(4),
- /**
- * Type of a block that is used to advertise a namespace.
- */
- NBLOCK(5),
- /**
- * Type of a block representing a block to be encoded on demand from
disk.
- * Should never appear on the network directly.
- */
- FS_ONDEMAND(6),
- /**
- * Type of a block that contains a HELLO for a peer (for
- * DHT find-peer operations).
- */
- DHT_HELLO(7),
- /**
- * Block for testing.
- */
- TEST(8),
- /**
- * Block for storing .gnunet-domains
- */
- DNS(10),
- /**
- * Block for storing record data
- */
- NAMERECORD(11);
-
- private int val;
-
- BlockType(int val) {
- this.val = val;
- }
- }
-
-
- /**
- * Options passed to the dht service for routing requests.
- */
- enum RouteOption {
- /**
- * Default. Do nothing special.
- */
- NONE(0),
- /**
- * Each peer along the way should look at 'enc' (otherwise
- * only the k-peers closest to the key should look at it).
- */
- DEMULTIPLEX_EVERYWHERE(1),
- /**
- * We should keep track of the route that the message
- * took in the P2P network.
- */
- RECORD_ROUTE(2),
- /**
- * This is a 'FIND-PEER' request, so approximate results are fine.
- */
- FIND_PEER(4),
- /**
- * Possible message option for query key randomization.
- */
- BART(8);
-
- private int val;
-
- RouteOption(int val) {
- this.val = val;
- }
- }
-
- @UnionCase(143)
- public static class DHTClientGetMessage implements GnunetMessage.Body {
- /**
- * Combination of RouteOption.*
- */
- @UInt32
- public int options;
- @UInt32
- public int desiredReplicationLevel;
- @UInt32
- public int type;
- @NestedMessage
- public HashCode key;
- @UInt64
- public long uniqueId;
- @ByteFill
- public byte[] xquery;
- }
-
- @UnionCase(142)
- public static class DHTClientPutMessage implements GnunetMessage.Body {
- /**
- * Type of data to insert, one of BlockType.*
- */
- @UInt32
- public int type;
- /**
- * Combination of RouteOption.*
- */
- @UInt32
- public int options;
- @UInt32
- public int desiredReplicationLevel;
- @NestedMessage
- public AbsoluteTimeMessage expiration;
- @NestedMessage
- public HashCode hash;
- @ByteFill
- public byte[] data;
- }
-
- @UnionCase(144)
- public static class DHTClientGetStopMessage implements GnunetMessage.Body {
- @UInt32
- public int reserved = 0;
- @UInt64
- public long unique_id;
- @NestedMessage
- public HashCode key;
- }
-
- @UnionCase(145)
- public static class DHTClientResultMessage implements GnunetMessage.Body {
- @UInt32
- public int type;
- @UInt32
- public int putPathLength;
- @UInt32
- public int getPathLength;
- @UInt64
- public long uid;
- @NestedMessage
- public AbsoluteTimeMessage expiration;
- @NestedMessage
- public HashCode key;
- @VariableSizeArray(lengthField = "putPathLength")
- public PeerIdentity[] putPath;
- @VariableSizeArray(lengthField = "getPathLength")
- public PeerIdentity[] getPath;
- @ByteFill
- public byte[] data;
- }
-
-
- /**
- * Message to request monitoring messages, clients --> DHT service.
- */
- @UnionCase(153)
- public static class DHTMonitorStartMessage implements GnunetMessage.Body {
- /**
- * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
- */
- @UInt32
- public int type;
-
- /**
- * Flag whether to notify about GET messages.
- */
- @UInt16
- public int get;
-
- /**
- * Flag whether to notify about GET_REPONSE messages.
- */
- @UInt16
- public int getResp;
-
- /**
- * Flag whether to notify about PUT messages.
- */
- @UInt16
- public int put;
-
- /**
- * Flag whether to use the provided key to filter messages.
- */
- @UInt16
- public int filter_key;
-
- /*
- The key to filter messages by.
- */
- @NestedMessage
- public HashCode key;
- }
-
- /**
- * Message to monitor put requests going through peer, DHT service -->
clients.
- */
- @UnionCase(151)
- public static class DHTMonitorPutMessage implements GnunetMessage.Body {
- /**
- * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
- */
- @UInt32
- public int options;
-
- /**
- * The type of data in the request.
- */
- @UInt32
- public int type;
-
- /**
- * Hop count so far.
- */
- @UInt32
- public int hop_count;
-
- /**
- * Replication level for this message
- */
- @UInt32
- public int desired_replication_level;
-
- /**
- * Number of peers recorded in the outgoing path from source to the
- * storage location of this message.
- */
- @UInt32
- public int put_path_length;
-
- /**
- * How long should this data persist?
- */
- @NestedMessage
- public AbsoluteTimeMessage expirationTime;
-
- /**
- * The key to store the value under.
- */
- @NestedMessage
- public HashCode key;
-
- @VariableSizeArray(lengthField = "put_path_length")
- public PeerIdentity[] putPath;
-
- @ByteFill
- public byte[] data;
- }
-
-
- /**
- * Message to monitor get requests going through peer, DHT service ->
clients.
- */
- @UnionCase(149)
- public class DHTMonitorGetMessage implements GnunetMessage.Body {
- /**
- * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
- */
- @UInt32
- public int options;
-
- /**
- * The type of data in the request.
- */
- @UInt32
- public int type;
-
- /**
- * Hop count
- */
- @UInt32
- public int hop_count;
-
- /**
- * Replication level for this message
- */
- @UInt32
- public int desired_replication_level;
-
- /**
- * Number of peers recorded in the outgoing path from source to the
- * storage location of this message.
- */
- @UInt32
- public int get_path_length;
-
- /**
- * The key to store the value under.
- */
- @NestedMessage
- public HashCode key;
-
- @VariableSizeArray(lengthField = "get_path_length")
- public PeerIdentity[] getPath;
- }
-
- /**
- * Message to monitor get results going through peer, DHT service -->
clients.
- */
- @UnionCase(150)
- public class MonitorGetRespMessage implements GnunetMessage.Body {
- /**
- * Content type.
- */
- @UInt32
- int type;
-
- /**
- * Length of the PUT path that follows (if tracked).
- */
- @UInt32
- int put_path_length;
-
- /**
- * Length of the GET path that follows (if tracked).
- */
- @UInt32
- int get_path_length;
-
- /**
- * When does the content expire?
- */
- @NestedMessage
- public AbsoluteTimeMessage expiration;
-
- /**
- * The key of the corresponding GET request.
- */
- @NestedMessage
- public HashCode key;
-
- @VariableSizeArray(lengthField = "put_path_length")
- public PeerIdentity[] putPath;
-
- @VariableSizeArray(lengthField = "get_path_length")
- public PeerIdentity[] getPath;
-
- @ByteFill
- public byte[] data;
- }
-
- /**
* Callback object for requests to the dht
*/
public interface ResultCallback {
@@ -417,7 +59,7 @@
* @param cfg the configuration to use
*/
public DistributedHashTable(Configuration cfg) {
- Configuration cfg1 = cfg;
+ this.cfg = cfg;
client = new Client("dht", cfg);
}
@@ -437,7 +79,7 @@
public void put(HashCode key, byte[] data, int replicationLevel,
Set<RouteOption> routeOptions,
BlockType type, AbsoluteTime expiration,
RelativeTime timeout, final Continuation cont) {
- final DHTClientPutMessage cpm = new DHTClientPutMessage();
+ final ClientPutMessage cpm = new ClientPutMessage();
cpm.data = data;
cpm.hash = key;
cpm.desiredReplicationLevel = replicationLevel;
@@ -488,7 +130,7 @@
client.notifyTransmitReady(RelativeTime.FOREVER, false, 0, new
MessageTransmitter() {
@Override
public void transmit(Connection.MessageSink sink) {
- final DHTClientGetStopMessage sm = new
DHTClientGetStopMessage();
+ final ClientGetStopMessage sm = new ClientGetStopMessage();
sm.key = key;
sm.unique_id = uid;
sink.send(sm);
@@ -511,7 +153,7 @@
public void process(GnunetMessage.Body msg) {
receiveHandle = null;
- DHTClientResultMessage rm = (DHTClientResultMessage) msg;
+ ClientResultMessage rm = (ClientResultMessage) msg;
GetRequest request = null;
for (GetRequest r : activeRequests) {
@@ -560,7 +202,7 @@
request.cb = cb;
request.deadline = timeout.toAbsolute();
- final DHTClientGetMessage getMessage = new DHTClientGetMessage();
+ final ClientGetMessage getMessage = new ClientGetMessage();
getMessage.desiredReplicationLevel = replication;
getMessage.key = key;
getMessage.options = 0;
Added: gnunet-java/src/org/gnunet/dht/MonitorGetMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorGetMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorGetMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,55 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.VariableSizeArray;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+/**
+ * Message to monitor get requests going through peer, DHT service -> clients.
+ */
address@hidden(149)
+public class MonitorGetMessage implements GnunetMessage.Body {
+ /**
+ * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
+ */
+ @UInt32
+ public int options;
+
+ /**
+ * The type of data in the request.
+ */
+ @UInt32
+ public int type;
+
+ /**
+ * Hop count
+ */
+ @UInt32
+ public int hop_count;
+
+ /**
+ * Replication level for this message
+ */
+ @UInt32
+ public int desired_replication_level;
+
+ /**
+ * Number of peers recorded in the outgoing path from source to the
+ * storage location of this message.
+ */
+ @UInt32
+ public int get_path_length;
+
+ /**
+ * The key to store the value under.
+ */
+ @NestedMessage
+ public HashCode key;
+
+ @VariableSizeArray(lengthField = "get_path_length")
+ public PeerIdentity[] getPath;
+}
Added: gnunet-java/src/org/gnunet/dht/MonitorGetRespMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorGetRespMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorGetRespMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,52 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.*;
+import org.gnunet.util.AbsoluteTimeMessage;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+/**
+ * Message to monitor get results going through peer, DHT service --> clients.
+ */
address@hidden(150)
+public class MonitorGetRespMessage implements GnunetMessage.Body {
+ /**
+ * Content type.
+ */
+ @UInt32
+ int type;
+
+ /**
+ * Length of the PUT path that follows (if tracked).
+ */
+ @UInt32
+ int put_path_length;
+
+ /**
+ * Length of the GET path that follows (if tracked).
+ */
+ @UInt32
+ int get_path_length;
+
+ /**
+ * When does the content expire?
+ */
+ @NestedMessage
+ public AbsoluteTimeMessage expiration;
+
+ /**
+ * The key of the corresponding GET request.
+ */
+ @NestedMessage
+ public HashCode key;
+
+ @VariableSizeArray(lengthField = "put_path_length")
+ public PeerIdentity[] putPath;
+
+ @VariableSizeArray(lengthField = "get_path_length")
+ public PeerIdentity[] getPath;
+
+ @ByteFill
+ public byte[] data;
+}
Added: gnunet-java/src/org/gnunet/dht/MonitorPutMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorPutMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorPutMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,62 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.*;
+import org.gnunet.util.AbsoluteTimeMessage;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+/**
+ * Message to monitor put requests going through peer, DHT service --> clients.
+ */
address@hidden(151)
+public class MonitorPutMessage implements GnunetMessage.Body {
+ /**
+ * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
+ */
+ @UInt32
+ public int options;
+
+ /**
+ * The type of data in the request.
+ */
+ @UInt32
+ public int type;
+
+ /**
+ * Hop count so far.
+ */
+ @UInt32
+ public int hop_count;
+
+ /**
+ * Replication level for this message
+ */
+ @UInt32
+ public int desired_replication_level;
+
+ /**
+ * Number of peers recorded in the outgoing path from source to the
+ * storage location of this message.
+ */
+ @UInt32
+ public int put_path_length;
+
+ /**
+ * How long should this data persist?
+ */
+ @NestedMessage
+ public AbsoluteTimeMessage expirationTime;
+
+ /**
+ * The key to store the value under.
+ */
+ @NestedMessage
+ public HashCode key;
+
+ @VariableSizeArray(lengthField = "put_path_length")
+ public PeerIdentity[] putPath;
+
+ @ByteFill
+ public byte[] data;
+}
Added: gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,50 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt16;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+ * Message to request monitoring messages, clients --> DHT service.
+ */
address@hidden(153)
+public class MonitorStartMessage implements GnunetMessage.Body {
+ /**
+ * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
+ */
+ @UInt32
+ public int type;
+
+ /**
+ * Flag whether to notify about GET messages.
+ */
+ @UInt16
+ public int get;
+
+ /**
+ * Flag whether to notify about GET_REPONSE messages.
+ */
+ @UInt16
+ public int getResp;
+
+ /**
+ * Flag whether to notify about PUT messages.
+ */
+ @UInt16
+ public int put;
+
+ /**
+ * Flag whether to use the provided key to filter messages.
+ */
+ @UInt16
+ public int filter_key;
+
+ /*
+ The key to filter messages by.
+ */
+ @NestedMessage
+ public HashCode key;
+}
Added: gnunet-java/src/org/gnunet/dht/RouteOption.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/RouteOption.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/RouteOption.java 2012-05-03 11:54:32 UTC
(rev 21245)
@@ -0,0 +1,35 @@
+package org.gnunet.dht;
+
+/**
+ * Options passed to the dht service for routing requests.
+ */
+enum RouteOption {
+ /**
+ * Default. Do nothing special.
+ */
+ NONE(0),
+ /**
+ * Each peer along the way should look at 'enc' (otherwise
+ * only the k-peers closest to the key should look at it).
+ */
+ DEMULTIPLEX_EVERYWHERE(1),
+ /**
+ * We should keep track of the route that the message
+ * took in the P2P network.
+ */
+ RECORD_ROUTE(2),
+ /**
+ * This is a 'FIND-PEER' request, so approximate results are fine.
+ */
+ FIND_PEER(4),
+ /**
+ * Possible message option for query key randomization.
+ */
+ BART(8);
+
+ private int val;
+
+ RouteOption(int val) {
+ this.val = val;
+ }
+}
Added: gnunet-java/src/org/gnunet/statistics/Request.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/Request.java
(rev 0)
+++ gnunet-java/src/org/gnunet/statistics/Request.java 2012-05-03 11:54:32 UTC
(rev 21245)
@@ -0,0 +1,29 @@
+package org.gnunet.statistics;
+
+import org.gnunet.util.AbsoluteTime;
+import org.gnunet.util.MessageTransmitter;
+
+public interface Request extends MessageTransmitter {
+ /**
+ * cancel action specific to a certain type of request
+ */
+ void cancel();
+
+ /**
+ * Deadline for transmitting the request.
+ *
+ */
+ AbsoluteTime getDeadline();
+
+ /**
+ *
+ * @return true if the request should be kept after the destroy request
+ */
+ boolean onDestroy();
+
+ /**
+ *
+ * @return true if the request should be kept after the reconnect
+ */
+ boolean onReconnect();
+}
Added: gnunet-java/src/org/gnunet/statistics/RequestMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/RequestMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/statistics/RequestMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,13 @@
+package org.gnunet.statistics;
+
+import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.ZeroTerminatedString;
+import org.gnunet.util.GnunetMessage;
+
address@hidden(169)
+public class RequestMessage implements GnunetMessage.Body {
+ @ZeroTerminatedString
+ public String subsystemName;
+ @ZeroTerminatedString
+ public String statisticsName;
+}
Added: gnunet-java/src/org/gnunet/statistics/RequestQueue.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/RequestQueue.java
(rev 0)
+++ gnunet-java/src/org/gnunet/statistics/RequestQueue.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,129 @@
+package org.gnunet.statistics;
+
+import org.gnunet.util.*;
+
+import java.util.LinkedList;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 8:03 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class RequestQueue {
+
+ /**
+ * Requests to be transmitted to the service.
+ */
+ private final LinkedList<Request> requests = new LinkedList<Request>();
+ /**
+ * The designated receiver for all messages.
+ */
+ private MessageReceiver receiver;
+
+ /**
+ * The active transmit request handle, if any.
+ */
+ private Cancelable currentTransmit;
+
+ /**
+ * Current receive handler.
+ */
+ private Cancelable currentReceive;
+
+ private boolean destroyRequested = false;
+ private final Client client;
+
+ public RequestQueue(Client client, MessageReceiver receiver) {
+ this.client = client;
+ this.receiver = receiver;
+ }
+
+ /**
+ * Handle next request.
+ */
+ private void handleNextRequest() {
+ if (currentTransmit != null) {
+ return;
+ }
+
+ final Request request = requests.poll();
+ if (request == null) {
+ if (destroyRequested) {
+ client.disconnect();
+ }
+ return;
+ }
+
+ currentTransmit =
client.notifyTransmitReady(request.getDeadline().getRemaining(), true, 0, new
MessageTransmitter() {
+ @Override
+ public void transmit(Connection.MessageSink sink) {
+ currentTransmit = null;
+
+ request.transmit(sink);
+
+ handleReceive();
+ handleNextRequest();
+ }
+
+ @Override
+ public void handleError() {
+ request.handleError();
+ }
+ });
+ }
+
+ private void handleReceive() {
+ if (currentReceive != null || destroyRequested) {
+ return;
+ }
+ currentReceive = client.receive(RelativeTime.FOREVER, new
MessageReceiver() {
+ @Override
+ public void process(GnunetMessage.Body msg) {
+ currentReceive = null;
+
+ receiver.process(msg);
+
+ handleNextRequest();
+ handleReceive();
+ }
+
+ @Override
+ public void handleError() {
+ receiver.handleError();
+ }
+ });
+ }
+
+ public Cancelable add(final Request request) {
+ requests.add(request);
+ handleNextRequest();
+
+ return new Cancelable() {
+ @Override
+ public void cancel() {
+ RequestQueue.this.requests.remove(request);
+ request.cancel();
+ }
+ };
+ }
+
+ public void destroy() {
+ destroyRequested = true;
+
+ final LinkedList<Request> remove = new LinkedList<Request>();
+
+ for (Request r : requests) {
+ boolean keep = r.onDestroy();
+ if (!keep) {
+ remove.add(r);
+ }
+ }
+ requests.removeAll(remove);
+
+ if (requests.isEmpty()) {
+ client.disconnect();
+ }
+ }
+}
Added: gnunet-java/src/org/gnunet/statistics/ResponseEndMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/ResponseEndMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/statistics/ResponseEndMessage.java
2012-05-03 11:54:32 UTC (rev 21245)
@@ -0,0 +1,16 @@
+package org.gnunet.statistics;
+
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:09 PM
+* To change this template use File | Settings | File Templates.
+*/
address@hidden(171)
+public class ResponseEndMessage implements GnunetMessage.Body {
+ // empty
+}
Added: gnunet-java/src/org/gnunet/statistics/ResponseValueMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/ResponseValueMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/statistics/ResponseValueMessage.java
2012-05-03 11:54:32 UTC (rev 21245)
@@ -0,0 +1,31 @@
+package org.gnunet.statistics;
+
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UInt64;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.ZeroTerminatedString;
+import org.gnunet.util.GnunetMessage;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:09 PM
+* To change this template use File | Settings | File Templates.
+*/
address@hidden(170)
+public class ResponseValueMessage implements GnunetMessage.Body {
+ /**
+ * Unique numerical identifier for the value (will
+ * not change during the same client-session). Highest
+ * bit will be set for persistent values.
+ */
+ @UInt32
+ public long uid;
+ @UInt64
+ public long value;
+ @ZeroTerminatedString
+ public String subsystemName;
+ @ZeroTerminatedString
+ public String statisticName;
+}
Added: gnunet-java/src/org/gnunet/statistics/SetMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/SetMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/statistics/SetMessage.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -0,0 +1,26 @@
+package org.gnunet.statistics;
+
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UInt64;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.ZeroTerminatedString;
+import org.gnunet.util.GnunetMessage;
+
+/**
+* Created with IntelliJ IDEA.
+* User: dold
+* Date: 5/2/12
+* Time: 7:10 PM
+* To change this template use File | Settings | File Templates.
+*/
address@hidden(168)
+public class SetMessage implements GnunetMessage.Body {
+ @UInt32
+ public int flags;
+ @UInt64
+ public long value;
+ @ZeroTerminatedString
+ public String subsystemName;
+ @ZeroTerminatedString
+ public String statisticName;
+}
Modified: gnunet-java/src/org/gnunet/statistics/Statistics.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/Statistics.java 2012-05-03
11:36:52 UTC (rev 21244)
+++ gnunet-java/src/org/gnunet/statistics/Statistics.java 2012-05-03
11:54:32 UTC (rev 21245)
@@ -6,152 +6,85 @@
package org.gnunet.statistics;
-import org.gnunet.construct.UInt32;
-import org.gnunet.construct.UInt64;
-import org.gnunet.construct.UnionCase;
-import org.gnunet.construct.ZeroTerminatedString;
import org.gnunet.util.*;
import org.gnunet.util.getopt.Option;
import org.gnunet.util.getopt.OptionAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
-import java.util.LinkedList;
-
/**
* API for the gnunet statistics service.
* <p/>
- * Write and read statistics values, represented as unsigned 64bit integer.
+ * Set, get and monitor statistics values, represented as unsigned 64bit
integer.
+ * Note that address@hidden long}, java's largest primitive type, can only
store signed 64bit integers.
+ * With absolute operation, its negative values are interpreted as large
numbers by the statistics api.
*/
public class Statistics {
private static final Logger logger = LoggerFactory
.getLogger(Statistics.class);
/**
- * Time after we give up setting values in statistics
+ * Time after we give up on setting values in statistics
*/
private static final RelativeTime SET_TIMEOUT =
RelativeTime.SECOND.multiply(5);
+ private final static int SETFLAG_ABSOLUTE = 0;
+ private final static int SETFLAG_RELATIVE = 1;
+ private final static int SETFLAG_PERSIST = 2;
- private interface Request {
- boolean dropOnShutdown();
- AbsoluteTime getDeadline();
- }
-
- /**
- * a generic implementation of a request queue
- */
- private class RequestQueue<R extends Request> {
- public Cancelable addRequest(R request) {
- return null;
- }
-
- }
-
-
private Client client;
- private LinkedList<StatisticsRequest> requests = new
LinkedList<StatisticsRequest>();
- private Cancelable currentTransmit;
- private boolean disconnectRequested = false;
- @UnionCase(169)
- public static class RequestMessage implements GnunetMessage.Body {
- @ZeroTerminatedString
- public String subsystemName;
- @ZeroTerminatedString
- public String statisticsName;
- }
+ private RequestQueue requestQueue;
- @UnionCase(170)
- public static class ResponseValueMessage implements GnunetMessage.Body {
- @UInt32
- public long uid;
- @UInt64
- public long value;
- @ZeroTerminatedString
- public String subsystemName;
- @ZeroTerminatedString
- public String statisticName;
- }
+ private StatisticsReceiver currentGetReceiver;
+ private Continuation currentGetContinuation;
- @UnionCase(171)
- public static class ResponseEndMessage implements GnunetMessage.Body {
- // empty
- }
-
- private final static int SETFLAG_PERSIST = 2;
-
- @UnionCase(168)
- public static class SetMessage implements GnunetMessage.Body {
- @UInt32
- public int flags;
- @UInt64
- public long value;
- @ZeroTerminatedString
- public String subsystemName;
- @ZeroTerminatedString
- public String statisticName;
- }
-
-
- private abstract class StatisticsRequest implements Cancelable,
MessageTransmitter {
+ /**
+ * A request to the statistics service.
+ */
+ private abstract class StatisticsRequest implements Request {
public String name;
public String subsystem;
public AbsoluteTime deadline;
}
-
- private class StatisticsGetRequest extends StatisticsRequest implements
Cancelable {
+ private class StatisticsGetRequest extends StatisticsRequest {
public StatisticsReceiver receiver;
public Cancelable receiveHandle;
- public class GetResponseHandler extends RunaboutMessageReceiver {
- public void visit(ResponseValueMessage m) {
- receiver.onReceive(m.subsystemName, m.statisticName, m.value);
- client.receive(deadline.getRemaining(), this);
- }
-
- public void visit(ResponseEndMessage m) {
- receiver.onDone();
- }
-
- @Override
- public void handleError() {
- logger.error("unable to read from statistics service");
- }
- }
-
@Override
public void cancel() {
- requests.remove(this);
- if (receiveHandle != null) {
- receiveHandle.cancel();
- }
+ currentGetReceiver = null;
}
@Override
public void transmit(Connection.MessageSink sink) {
- currentTransmit = null;
- if (sink == null) {
- logger.error("unable to connect to statistics service");
- return;
- }
RequestMessage rm = new RequestMessage();
rm.statisticsName = name;
rm.subsystemName = subsystem;
sink.send(rm);
-
- receiveHandle = client.receive(deadline.getRemaining(), new
GetResponseHandler());
-
- handleNextRequest();
}
@Override
public void handleError() {
throw new RuntimeException("unexpected transmit error");
}
+
+ @Override
+ public AbsoluteTime getDeadline() {
+ return deadline;
+ }
+
+ @Override
+ public boolean onDestroy() {
+ return false;
+ }
+
+ @Override
+ public boolean onReconnect() {
+ return true;
+ }
}
private class StatisticsPutRequest extends StatisticsRequest implements
Cancelable {
@@ -160,96 +93,105 @@
@Override
public void cancel() {
- requests.remove(this);
+ // do nothing
}
@Override
public void transmit(Connection.MessageSink sink) {
- currentTransmit = null;
-
SetMessage sm = new SetMessage();
sm.statisticName = name;
sm.subsystemName = subsystem;
sm.value = value;
sm.flags = flags;
sink.send(sm);
-
- handleNextRequest();
}
@Override
public void handleError() {
throw new RuntimeException("unexpected transmit error");
}
+
+ @Override
+ public AbsoluteTime getDeadline() {
+ return deadline;
+ }
+
+ @Override
+ public boolean onDestroy() {
+ return true;
+ }
+
+ @Override
+ public boolean onReconnect() {
+ return true;
+ }
}
- public Statistics(Configuration cfg) {
- client = new Client("statistics", cfg);
+
+ public class StatisticsMessageReceiver extends RunaboutMessageReceiver {
+ public void visit(ResponseValueMessage m) {
+ currentGetReceiver.onReceive(m.subsystemName, m.statisticName,
m.value);
+ }
+
+ public void visit(ResponseEndMessage m) {
+ if (currentGetContinuation != null) {
+ currentGetContinuation.onDone();
+ }
+ }
+
+ @Override
+ public void handleError() {
+ }
}
public interface StatisticsReceiver {
public void onReceive(String subsystem, String name, long value);
+ }
- public void onTimeout();
-
+ public interface Continuation {
public void onDone();
}
+ public Statistics(Configuration cfg) {
+ client = new Client("statistics", cfg);
+ requestQueue = new RequestQueue(this.client, new
StatisticsMessageReceiver());
+ }
+
/**
* Retrieve values from statistics.
*
+ *
* @param timeout time after we give up and call receiver.onTimeout
- * @param subsystem name of the subsystem the statistics value belongs to
+ * @param subsystem the subsystem of interest
* @param name name of the statistics value belongs to
* @param receiver callback
+ * @param continuation
* @return handle to cancel the request
*/
public Cancelable get(RelativeTime timeout, final String subsystem, final
String name,
- final StatisticsReceiver receiver) {
+ final StatisticsReceiver receiver, Continuation
continuation) {
+ if (currentGetReceiver != null) {
+ throw new AssertionError("only one Statistics get request can be
active at a time");
+ }
+ currentGetReceiver = receiver;
+ currentGetContinuation = continuation;
+
final StatisticsGetRequest getRequest = new StatisticsGetRequest();
getRequest.deadline = timeout.toAbsolute();
- getRequest.name = name;
- getRequest.subsystem = subsystem;
+ getRequest.name = name == null ? "" : name;
+ getRequest.subsystem = subsystem == null ? "" : name;
getRequest.receiver = receiver;
- requests.add(getRequest);
-
- handleNextRequest();
-
- return getRequest;
+ return requestQueue.add(getRequest);
}
- /**
- * todo: should this be abstracted into a common request queue?
- */
- private void handleNextRequest() {
- if (currentTransmit != null) {
- return;
- }
- StatisticsRequest request = requests.poll();
- if (request == null) {
- if (disconnectRequested) {
- client.disconnect();
- }
- return;
- }
- logger.debug("handling request");
- currentTransmit =
client.notifyTransmitReady(request.deadline.getRemaining(), true, 0, request);
- }
-
- public Cancelable get(RelativeTime timeout, StatisticsReceiver srh) {
- return get(timeout, "", "", srh);
- }
-
-
/**
* Sets a statistics value asynchronously.
*
- * @param subsystem subsystem of the entry
- * @param name name of the entry
- * @param value desired value
+ * @param name name of the entry
+ * @param value desired value
* @param persist keep value even if the statistics service restarts
* @return a handle to cancel the request
*/
@@ -261,34 +203,26 @@
putRequest.value = value;
putRequest.flags = persist ? SETFLAG_PERSIST : 0;
- requests.add(putRequest);
+ return requestQueue.add(putRequest);
+ }
- handleNextRequest();
-
- return putRequest;
+ public Cancelable watch(final String subsystem, final String name,
StatisticsReceiver receiver) {
+ return null;
}
/**
- * Destroy handle to the statistics service.
- *
- * @param putPending wait until all pending put requests have been
submitted or timed out
+ * Destroy handle to the statistics service. Always finishes writing
pending values.
*/
- public void destroy(boolean putPending) {
- if (putPending) {
- // throw away all get requests!
- Iterator<StatisticsRequest> it = requests.listIterator();
- while (it.hasNext()) {
- StatisticsRequest r = it.next();
- if (r instanceof StatisticsGetRequest) {
- it.remove();
- }
- }
- disconnectRequested = true;
- } else {
- client.disconnect();
- }
+ public void destroy() {
+ requestQueue.destroy();
}
+
+ /**
+ * Statistics utility entry point
+ *
+ * @param args command line arguments
+ */
public static void main(String[] args) {
new Program(args) {
@Option(
@@ -329,26 +263,22 @@
return;
}
statistics.set(subsystemName, statisticsName, value,
false);
- statistics.destroy(true);
+ statistics.destroy();
} else {
if (unprocessedArgs.length == 0) {
- statistics.get(RelativeTime.SECOND, subsystemName,
statisticsName, new StatisticsReceiver() {
- @Override
- public void onReceive(String subsystem, String
name, long value) {
- System.out.println(subsystem + "(" + name + ")
= " + value);
- }
-
- @Override
- public void onTimeout() {
- logger.error("timeout while getting
statistics");
- statistics.destroy(false);
- }
-
- @Override
- public void onDone() {
- statistics.destroy(false);
- }
- });
+ statistics.get(RelativeTime.SECOND, subsystemName,
statisticsName,
+ new StatisticsReceiver() {
+ @Override
+ public void onReceive(String subsystem,
String name, long value) {
+ System.out.println(subsystem + "(" +
name + ") = " + value);
+ }
+ },
+ new Continuation() {
+ @Override
+ public void onDone() {
+ statistics.destroy();
+ }
+ });
} else {
System.err.println("dumping statistics does not take
any positional parameters");
}
Modified: gnunet-java/src/org/gnunet/util/Client.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Client.java 2012-05-03 11:36:52 UTC (rev
21244)
+++ gnunet-java/src/org/gnunet/util/Client.java 2012-05-03 11:54:32 UTC (rev
21245)
@@ -46,7 +46,7 @@
* Initial value for connectBackoff.
*
*/
- private final RelativeTime INITAL_BACKOFF = RelativeTime.MILLISECOND;
+ private final RelativeTime INITAL_BACKOFF =
RelativeTime.MILLISECOND.multiply(20);
/**
* Maximum value for connectBackoff.
@@ -54,7 +54,6 @@
*/
private final RelativeTime MAX_BACKOFF = RelativeTime.SECOND.multiply(5);
-
/**
* The time to wait after an error occured while connecting.
* Every time an error occurs while connecting, this value is doubled
until its maximum
Modified: gnunet-java/src/org/gnunet/util/Connection.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Connection.java 2012-05-03 11:36:52 UTC
(rev 21244)
+++ gnunet-java/src/org/gnunet/util/Connection.java 2012-05-03 11:54:32 UTC
(rev 21245)
@@ -76,7 +76,7 @@
private class AddressProbe {
- Scheduler.TaskIdentifier connectTask;
+ Cancelable connectTask;
SocketChannel channel;
}
@@ -108,7 +108,7 @@
private MessageReceiver receiver;
private RelativeTime timeout;
private GnunetMessage.Header msgh = null;
- private Scheduler.TaskIdentifier recvTask = null;
+ private Scheduler.TaskConfiguration recvTask = null;
private boolean finished = false;
// is this receiver actively working? if not, the connection process
has to kick off the receiver
// (or select behaves badly)
@@ -208,22 +208,22 @@
private class TransmitHelper implements Scheduler.Task, MessageSink {
private final MessageTransmitter transmitter;
- private Scheduler.TaskIdentifier notifyTimeoutTask;
+ private Cancelable notifyTimeoutTask;
- private Scheduler.TaskIdentifier transmitTask = null;
+ private Cancelable transmitTask = null;
public TransmitHelper(final MessageTransmitter transmitter,
RelativeTime notifyTimeout) {
this.transmitter = transmitter;
- Scheduler.TaskBuilder b = new Scheduler.TaskBuilder();
- b.withTask(new Scheduler.Task() {
- @Override
- public void run(Scheduler.RunContext ctx) {
- transmitter.handleError();
- }
- });
- b.withTimeout(notifyTimeout);
- notifyTimeoutTask = Scheduler.add(b);
+ Scheduler.TaskConfiguration tc = new
Scheduler.TaskConfiguration(notifyTimeout,
+ new Scheduler.Task() {
+ @Override
+ public void run(Scheduler.RunContext ctx) {
+ transmitter.handleError();
+ }
+ });
+
+ notifyTimeoutTask = tc.schedule();
}
public boolean notifyDone() {
@@ -280,9 +280,9 @@
// timeout is forever, because there is no way to directly limit
the transmission time
// of a message, only the max. wait time before transmission.
// cancel must be called on the transmitTask if we disconnect
- Scheduler.TaskBuilder b = new Scheduler.TaskBuilder();
-
b.withTimeout(RelativeTime.FOREVER).withSelectWrite(connectionChannel).withTask(this);
- this.transmitTask = Scheduler.add(b);
+ Scheduler.TaskConfiguration tc = new
Scheduler.TaskConfiguration(RelativeTime.FOREVER, this);
+ tc.selectWrite(connectionChannel);
+ this.transmitTask = tc.schedule();
}
@Override
@@ -345,19 +345,20 @@
final AddressProbe addressProbe = new AddressProbe();
addressProbe.channel = channel;
- Scheduler.TaskBuilder builder = new Scheduler.TaskBuilder();
- builder.withSelectConnect(channel);
- builder.withTask(new Scheduler.Task() {
- @Override
- public void run(Scheduler.RunContext ctx) {
- addressProbe.connectTask = null;
- if (ctx.reasons.contains(Scheduler.Reason.SHUTDOWN)) {
- return;
- }
- Connection.this.finishConnect(channel);
- }
- });
- addressProbe.connectTask = Scheduler.add(builder);
+ Scheduler.TaskConfiguration tc = new
Scheduler.TaskConfiguration(RelativeTime.FOREVER,
+ new Scheduler.Task() {
+ @Override
+ public void run(Scheduler.RunContext ctx) {
+ addressProbe.connectTask = null;
+ if
(ctx.reasons.contains(Scheduler.Reason.SHUTDOWN)) {
+ return;
+ }
+ Connection.this.finishConnect(channel);
+ }
+ });
+ tc.selectConnect(channel);
+
+ addressProbe.connectTask = tc.schedule();
}
@Override
@@ -381,12 +382,12 @@
addressProbe.connectTask.cancel();
}
}
- logger.debug("client successfully connected");
} catch (IOException e) {
logger.debug("finishConnect() was not successful: {}", (Object) e);
return;
}
if (connected) {
+ logger.debug("client successfully connected to " +
channel.toString());
if (currentTransmitHelper != null) {
currentTransmitHelper.start();
}
@@ -501,6 +502,7 @@
* Disconnect. Cancel pending receive/transmit requests.
*/
public void disconnect() {
+ logger.debug(""+this+".disconnect()");
if (nextTransmitHelper != null) {
nextTransmitHelper.cancel();
nextTransmitHelper = null;
@@ -514,11 +516,19 @@
resolveHandle.cancel();
resolveHandle = null;
}
-
if (connectHandle != null) {
connectHandle.cancel();
connectHandle = null;
}
+ if (connectionChannel != null) {
+ try {
+ connectionChannel.socket().close();
+ } catch (IOException e) {
+ throw new IOError(e);
+ }
+ connectionChannel = null;
+ }
+
}
@Override
Modified: gnunet-java/src/org/gnunet/util/Resolver.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Resolver.java 2012-05-03 11:36:52 UTC
(rev 21244)
+++ gnunet-java/src/org/gnunet/util/Resolver.java 2012-05-03 11:54:32 UTC
(rev 21245)
@@ -1,6 +1,5 @@
package org.gnunet.util;
-
import org.gnunet.construct.*;
import org.gnunet.construct.ProtocolViolation;
import org.gnunet.util.getopt.Option;
@@ -14,7 +13,6 @@
import java.net.UnknownHostException;
import java.util.LinkedList;
-
/**
* Resolve hostnames asynchronously, using the gnunet resolver service if
necessary.
* <p/>
@@ -261,6 +259,7 @@
rh.cb = cb;
// try if hostname is numeric IP or loopback
if (hostname.equalsIgnoreCase("localhost")) {
+ logger.debug("resolving address locally");
Scheduler.add(new Scheduler.Task() {
@Override
public void run(Scheduler.RunContext ctx) {
Modified: gnunet-java/src/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Scheduler.java 2012-05-03 11:36:52 UTC
(rev 21244)
+++ gnunet-java/src/org/gnunet/util/Scheduler.java 2012-05-03 11:54:32 UTC
(rev 21245)
@@ -38,6 +38,42 @@
private static final Logger logger = LoggerFactory
.getLogger(Scheduler.class);
+ // only valid while a task is executing
+ private static TaskConfiguration activeTask = null;
+
+ // number of tasks in the ready queue
+ private static int readyCount = 0;
+
+ // for every priority, there is a list of tasks that is definitely ready
to run
+ final private static ArrayList<LinkedList<TaskConfiguration>> ready = new
ArrayList<LinkedList<TaskConfiguration>>
+ (Priority.size);
+
+ static {
+ for (int i = 0; i < Priority.size; ++i) {
+ ready.add(new LinkedList<TaskConfiguration>());
+ }
+ }
+
+ public static final int EVENT_READ = 0, EVENT_WRITE = 1, EVENT_ACCEPT = 2,
EVENT_CONNECT = 3;
+ public static final int[] eventToInterestOp = new
int[]{SelectionKey.OP_READ, SelectionKey.OP_WRITE,
+ SelectionKey.OP_ACCEPT, SelectionKey.OP_CONNECT};
+ public static final Reason[] eventToReason = new
Reason[]{Reason.READ_READY, Reason.WRITE_READY,
+ Reason.ACCEPT_READY, Reason.CONNECT_READY};
+
+
+ private static Selector selector = null;
+
+ static {
+ try {
+ selector = SelectorProvider.provider().openSelector();
+ } catch (final IOException e) {
+ // what to do here?
+ logger.error("fatal: cannot create selector");
+ System.exit(-1);
+ }
+ }
+
+
public enum Priority {
IDLE, BACKGROUND, DEFAULT, HIGH, UI, URGENT, SHUTDOWN;
private static final int size = Priority.values().length;
@@ -72,85 +108,39 @@
public void run(RunContext ctx);
}
- private static class Subscribers {
- TaskIdentifier readSubscriber;
- TaskIdentifier writeSubscriber;
- TaskIdentifier connectSubscriber;
- TaskIdentifier acceptSubscriber;
- }
-
/**
* A TaskIdentifier represents a Task that will execute or has already
been executed.
*/
- public static class TaskIdentifier implements Comparable<TaskIdentifier>,
Cancelable {
+ public static class TaskConfiguration implements Cancelable {
private final Task task;
private RunContext ctx = new RunContext();
- private boolean lifeness;
- private final Priority priority;
+ private boolean lifeness = true;
+ private Priority priority;
private final AbsoluteTime deadline;
- private ArrayList<SelectableChannel> rs = null;
- private ArrayList<SelectableChannel> ws = null;
- private ArrayList<SelectableChannel> cs = null;
- private ArrayList<SelectableChannel> as = null;
+ private ArrayList<SelectableChannel> eventChannels = null;
+ private ArrayList<Integer> eventTypes = null;
- TaskIdentifier(Task t, Priority priority,
- boolean liveness, RelativeTime timeout,
- Set<SelectableChannel> rs, Set<SelectableChannel> ws,
Set<SelectableChannel> cs) {
- this.task = t;
+ private boolean finished = false;
- if (priority == null) {
- this.priority = (activeTask == null) ? Priority.DEFAULT :
activeTask.priority;
- } else {
- this.priority = priority;
- }
-
- this.lifeness = liveness;
-
- if (timeout.getMilliseconds() < 0) {
- throw new AssertionError("timeout must be (>=0)");
- }
-
- this.deadline = timeout.toAbsolute();
-
- if (rs != null) {
- for (SelectableChannel sc : rs) {
- registerSelect(sc, SelectionKey.OP_READ);
- }
- }
- if (ws != null) {
- for (SelectableChannel sc : ws) {
- registerSelect(sc, SelectionKey.OP_WRITE);
- }
- }
- if (cs != null) {
- for (SelectableChannel sc : cs) {
- registerSelect(sc, SelectionKey.OP_CONNECT);
- }
- }
- if (as != null) {
- for (SelectableChannel sc : as) {
- registerSelect(sc, SelectionKey.OP_ACCEPT);
- }
- }
- this.ws = new ArrayList<SelectableChannel>(ws == null ?
Collections.EMPTY_SET : ws);
- this.rs = new ArrayList<SelectableChannel>(rs == null ?
Collections.EMPTY_SET : rs);
- this.cs = new ArrayList<SelectableChannel>(cs == null ?
Collections.EMPTY_SET : cs);
- this.as = new ArrayList<SelectableChannel>(as == null ?
Collections.EMPTY_SET : as);
-
- // todo: assertions for validity of this task
-
- pending.add(this);
+ /**
+ * Create a TaskIdentifier.
+ *
+ * @param task
+ */
+ TaskConfiguration(RelativeTime delay, Task task) {
+ this.task = task;
+ this.deadline = delay.toAbsolute();
}
/**
- * Create a light-weight task identifier that is not registerd as
pending in the Scheduler,
+ * Create a light-weight task identifier that is not registered as
pending in the Scheduler,
* used for continuations.
*
- * @param t task
+ * @param t task
* @param ctx the RunContext
*/
- TaskIdentifier(Task t, RunContext ctx) {
+ TaskConfiguration(Task t, RunContext ctx) {
this.task = t;
this.ctx = ctx;
this.deadline = AbsoluteTime.ZERO;
@@ -158,240 +148,112 @@
this.lifeness = true;
}
- private void run() {
- TaskIdentifier old = activeTask;
- activeTask = this;
- task.run(ctx);
- activeTask = old;
- }
+ private void addChannelEvent(SelectableChannel channel, int eventType)
{
+ if (eventChannels == null) {
+ eventChannels = new ArrayList<SelectableChannel>();
+ eventTypes = new ArrayList<Integer>();
+ }
+ eventChannels.add(channel);
+ eventTypes.add(eventType);
- public Task getTask() {
- return task;
- }
+ int interestOp = eventToInterestOp[eventType];
- public void cancel() {
- pending.remove(this);
- }
-
- public boolean getLifeness() {
- return lifeness;
- }
-
- @Override
- public int compareTo(TaskIdentifier other) {
- return this.deadline.compareTo(other.deadline);
- }
-
- private void registerSelect(SelectableChannel sc, int op) {
- SelectionKey key = sc.keyFor(selector);
- Subscribers subscribers;
-
+ SelectionKey key = channel.keyFor(selector);
if (key == null || !key.isValid()) {
- subscribers = new Subscribers();
try {
- sc.register(selector, op, subscribers);
+ key = channel.register(selector, interestOp, new
TaskConfiguration[4]);
} catch (ClosedChannelException e) {
throw new IOError(e);
}
} else {
- subscribers = (Subscribers) key.attachment();
- key.interestOps(key.interestOps() | op);
- }
- if ((op & SelectionKey.OP_READ) != 0) {
- if (subscribers.readSubscriber != null) {
- throw new AssertionError("only one task can wait for a
specific event at the same time");
+ if ((key.interestOps() & interestOp) != 0) {
+ throw new AssertionError("interest op registered twice");
}
- subscribers.readSubscriber = this;
+ key.interestOps(key.interestOps() | interestOp);
}
- if ((op & SelectionKey.OP_WRITE) != 0) {
- if (subscribers.writeSubscriber != null) {
- throw new AssertionError("only one task can wait for a
specific event at the same time");
- }
- subscribers.writeSubscriber = this;
+
+ TaskConfiguration[] subscribers = (TaskConfiguration[])
key.attachment();
+ if (subscribers[eventType] != null) {
+ throw new AssertionError("subscriber registered twice");
}
- if ((op & SelectionKey.OP_CONNECT) != 0) {
- if (subscribers.connectSubscriber != null) {
- throw new AssertionError("only one task can wait for a
specific event at the same time");
- }
- subscribers.connectSubscriber = this;
- }
- if ((op & SelectionKey.OP_ACCEPT) != 0) {
- if (subscribers.acceptSubscriber != null) {
- throw new AssertionError("only one task can wait for a
specific event at the same time");
- }
- subscribers.acceptSubscriber = this;
- }
+ subscribers[eventType] = this;
- if (subscribers.connectSubscriber != null &&
subscribers.readSubscriber != null) {
+ if (subscribers[EVENT_CONNECT] != null && subscribers[EVENT_READ]
!= null) {
throw new AssertionError("OP_CONNECT and OP_READ are
incompatible in java");
}
}
- private void deregisterOne(SelectableChannel sc, int op) {
- SelectionKey key = sc.keyFor(selector);
- if (key == null) {
- throw new AssertionError();
+ private void run() {
+ if (finished) {
+ throw new AssertionError("same task ran twice");
}
- Subscribers subscribers = (Subscribers) key.attachment();
- if (subscribers == null) {
- throw new AssertionError();
+ TaskConfiguration old = activeTask;
+ activeTask = this;
+ task.run(ctx);
+ finished = true;
+ activeTask = old;
+ }
+
+ public void cancel() {
+ if (!pending.contains(this)) {
+ throw new AssertionError("canceling task that is not
scheduled");
}
- if ((op & SelectionKey.OP_READ) != 0) {
- if (subscribers.readSubscriber == null) {
- throw new AssertionError();
+ pending.remove(this);
+ logger.debug(""+ pending.size() + "tasks remaining");
+ }
+
+ public Cancelable schedule() {
+ if (priority == null) {
+ if (activeTask != null) {
+ priority = activeTask.priority;
+ } else {
+ priority = Priority.DEFAULT;
}
- key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
- subscribers.readSubscriber = null;
}
- if ((op & SelectionKey.OP_WRITE) != 0) {
- if (subscribers.writeSubscriber == null) {
- throw new AssertionError();
- }
- subscribers.writeSubscriber = null;
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- }
- if ((op & SelectionKey.OP_CONNECT) != 0) {
- if (subscribers.connectSubscriber == null) {
- throw new AssertionError();
- }
- subscribers.connectSubscriber = null;
- key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
- }
- if ((op & SelectionKey.OP_ACCEPT) != 0) {
- if (subscribers.acceptSubscriber == null) {
- throw new AssertionError();
- }
- subscribers.acceptSubscriber = null;
- key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT);
- }
+ pending.add(this);
+ return this;
}
private void deregister() {
- if (this.rs != null) {
- for (SelectableChannel sc : this.rs) {
- deregisterOne(sc, SelectionKey.OP_READ);
- }
+ if (eventChannels == null) {
+ return;
}
- if (this.ws != null) {
- for (SelectableChannel sc : this.ws) {
- deregisterOne(sc, SelectionKey.OP_WRITE);
+ for (int i = 0; i < eventChannels.size(); ++i) {
+ SelectionKey key = eventChannels.get(i).keyFor(selector);
+ TaskConfiguration[] subscribers = (TaskConfiguration[])
key.attachment();
+ int interestOp = eventToInterestOp[eventTypes.get(i)];
+ if (subscribers[eventTypes.get(i)] == null ||
(key.interestOps() | interestOp) == 0) {
+ throw new AssertionError("deregistering event that has not
been registered");
}
+ subscribers[eventTypes.get(i)] = null;
+ key.interestOps(key.interestOps() & (~interestOp));
}
- if (this.cs != null) {
- for (SelectableChannel sc : this.cs) {
- deregisterOne(sc, SelectionKey.OP_CONNECT);
- }
- }
- if (this.as != null) {
- for (SelectableChannel sc : this.as) {
- deregisterOne(sc, SelectionKey.OP_ACCEPT);
- }
- }
}
- }
-
- public static class TaskBuilder {
- private Task task = null;
- private boolean lifeness = true;
- private Priority prio = null;
- private RelativeTime timeout = RelativeTime.ZERO;
- private Set<SelectableChannel> rs = null, ws = null, cs = null, as =
null;
-
- public TaskBuilder withLifeness(boolean lifeness) {
- this.lifeness = lifeness;
- return this;
+ public void selectRead(SocketChannel channel) {
+ addChannelEvent(channel, EVENT_READ);
}
-
- public TaskBuilder withPriority(Priority prio) {
- this.prio = prio;
- return this;
+ public void selectWrite(SocketChannel channel) {
+ addChannelEvent(channel, EVENT_WRITE);
}
-
- public TaskBuilder withTask(Task task) {
- this.task = task;
- return this;
+ public void selectConnect(SocketChannel channel) {
+ addChannelEvent(channel, EVENT_CONNECT);
}
-
- public TaskBuilder withTimeout(RelativeTime timeout) {
- this.timeout = timeout;
- return this;
+ public void selectAccept(SocketChannel channel) {
+ addChannelEvent(channel, EVENT_ACCEPT);
}
-
- public TaskBuilder withSelectRead(SelectableChannel c) {
- this.rs = Collections.singleton(c);
- return this;
- }
-
-
- public TaskBuilder withSelectReadSet(Set<SelectableChannel> cs) {
- this.rs = cs;
- return this;
- }
-
- public TaskBuilder withSelectWrite(SelectableChannel c) {
- this.ws = Collections.singleton(c);
- return this;
- }
-
- public TaskBuilder withSelectWriteSet(Set<SelectableChannel> cs) {
- this.ws = cs;
- return this;
- }
-
- public TaskBuilder withSelectConnect(SelectableChannel c) {
- this.cs = Collections.singleton(c);
- return this;
- }
-
- public TaskBuilder withSelectAccept(SelectableChannel c) {
- this.as = Collections.singleton(c);
- return this;
- }
-
- public TaskBuilder withSelectConnectSet(Set<SelectableChannel> cs) {
- this.cs = cs;
- return this;
- }
-
- private TaskIdentifier build() {
- return new TaskIdentifier(task, prio, lifeness, timeout, rs, ws,
cs);
- }
}
-
- // tasks that are waiting for an event, which are executed anyway after
the deadline has occured
- final private static Queue<TaskIdentifier> pending = new
PriorityQueue<TaskIdentifier>();
-
- // only valid while a task is executing
- private static TaskIdentifier activeTask = null;
-
- // number of tasks in the ready queue
- private static int readyCount = 0;
-
- // for every priority, there is a list of tasks that is definitely ready
to run
- final private static ArrayList<LinkedList<TaskIdentifier>> ready = new
ArrayList<LinkedList<TaskIdentifier>>
- (Priority.size);
-
- static {
- for (int i = 0; i < Priority.size; ++i) {
- ready.add(new LinkedList<TaskIdentifier>());
+ // tasks that are waiting for an event, which are executed anyway after
the deadline has occurred
+ final private static Queue<TaskConfiguration> pending = new
PriorityQueue<TaskConfiguration>(5, new Comparator
+ <TaskConfiguration>() {
+ @Override
+ public int compare(TaskConfiguration a, TaskConfiguration b) {
+ return a.deadline.compareTo(b.deadline);
}
- }
+ });
- private static Selector selector = null;
- static {
- try {
- selector = SelectorProvider.provider().openSelector();
- } catch (final IOException e) {
- // what to do here?
- logger.error("fatal: cannot create selector");
- System.exit(-1);
- }
- }
-
-
public static boolean getCurrentLifeness() {
return (activeTask == null) || activeTask.lifeness;
}
@@ -404,15 +266,9 @@
EnumSet<Reason> reasons) {
RunContext ctx = new RunContext();
ctx.reasons = reasons;
- queueReady(new TaskIdentifier(task, ctx));
+ queueReady(new TaskConfiguration(task, ctx));
}
-
- public static TaskIdentifier add(TaskBuilder builder) {
- return builder.build();
- }
-
-
/**
* Schedule a new task to be run as soon as possible. The task will be run
* with the priority of the calling task.
@@ -421,9 +277,8 @@
* @return unique task identifier for the job only valid until "task" is
* started!
*/
- public static TaskIdentifier add(Task task) {
- return addSelect(Priority.KEEP, RelativeTime.ZERO, null, null,
- task);
+ public static Cancelable add(Task task) {
+ return addDelayed(RelativeTime.ZERO, task);
}
@@ -431,58 +286,31 @@
* Add a task to run after the specified delay.
*
* @param delay time to wait until running the task
- * @param task the task to run after delay
+ * @param task the task to run after delay
* @return the TaskIdentifier, can be used to cancel the task until it has
been executed.
*/
- public static TaskIdentifier addDelayed(RelativeTime delay, Task task) {
- return addSelect(Priority.KEEP, delay, null, null, task);
+ public static TaskConfiguration addDelayed(RelativeTime delay, Task task) {
+ TaskConfiguration tid = new TaskConfiguration(delay, task);
+ tid.schedule();
+ return tid;
}
-
- /**
- * Schedule a new task to be run with a specified delay or when any of
- * the specified file descriptor sets is ready. The delay can be used
- * as a deadline on the socket(s) being ready. The task will be
- * scheduled for execution once either the delay has expired or any of
- * the socket operations is ready. This is the most general
- * function of the "add" family. Note that the "prerequisite_task"
- * must be satisfied in addition to any of the other conditions.
- *
- * @param p how important is this task?
- * @param delay how long should we wait? Use
GNUNET_TIME_UNIT_FOREVER_REL for "forever",
- * which means that the task will only be run
after we receive SIGTERM
- * @param rs set of file descriptors we want to read (can
be NULL)
- * @param ws set of file descriptors we want to write (can
be NULL)
- * @param t The Task to run
- * @return unique task identifier for the job
- * only valid until "task" is started!
- */
- public static TaskIdentifier addSelect(Priority p, RelativeTime delay,
- Set<SelectableChannel> rs,
Set<SelectableChannel> ws,
- Task t) {
-
- return new TaskIdentifier(t, p, getCurrentLifeness(), delay, rs, ws,
null);
+ public static TaskConfiguration addRead(RelativeTime timeout,
+ SelectableChannel chan, Task task) {
+ TaskConfiguration tid = new TaskConfiguration(timeout, task);
+ tid.addChannelEvent(chan, EVENT_READ);
+ tid.schedule();
+ return tid;
}
- public static TaskIdentifier addRead(RelativeTime timeout,
- SelectableChannel chan, Task t) {
- return Scheduler.addSelect(Priority.KEEP, timeout,
- Collections.singleton(chan), null, t);
+ public static TaskConfiguration addWrite(RelativeTime timeout,
+ SelectableChannel chan, Task task) {
+ TaskConfiguration tid = new TaskConfiguration(timeout, task);
+ tid.addChannelEvent(chan, EVENT_WRITE);
+ tid.schedule();
+ return tid;
}
- public static TaskIdentifier addWrite(RelativeTime timeout,
- SelectableChannel chan, Task t) {
- logger.debug("scheduling write");
- return Scheduler.addSelect(Priority.KEEP, timeout, null,
- Collections.singleton(chan), t);
- }
-
- public static TaskIdentifier addWithPriority(Priority prio,
- Task t) {
- return addSelect(prio, RelativeTime.ZERO, null, null, t);
- }
-
-
/**
* Check if the system is still life. Trigger disconnect if we have tasks,
but
* none of them give us lifeness.
@@ -493,7 +321,7 @@
if (readyCount > 0) {
return true;
}
- for (TaskIdentifier t : pending) {
+ for (TaskConfiguration t : pending) {
if (t.lifeness) {
return true;
}
@@ -514,7 +342,7 @@
*
* @param tid TaskIdentifier of the ready task
*/
- private static void queueReady(TaskIdentifier tid) {
+ private static void queueReady(TaskConfiguration tid) {
int idx = tid.priority.ordinal();
ready.get(idx).add(tid);
readyCount++;
@@ -533,7 +361,7 @@
// check if any timeouts occured
while (true) {
- TaskIdentifier t = pending.peek();
+ TaskConfiguration t = pending.peek();
if (t == null) {
break;
}
@@ -551,60 +379,40 @@
return timeout;
}
+ private static void addSubscriberTask(Collection<TaskConfiguration>
executableTasks,
+ TaskConfiguration[] subscribers, int
eventType) {
+ if (subscribers[eventType] == null)
+ return;
+ executableTasks.add(subscribers[eventType]);
+ subscribers[eventType].ctx.reasons.add(eventToReason[eventType]);
+ }
+
private static void handleSelect(RelativeTime timeout) {
try {
- // selector.select(0) would block indefinitely (conterintuitive,
java's fault)
+ // selector.select(0) would block indefinitely (counter-intuitive,
java's fault)
if (timeout.getMilliseconds() == 0) {
selector.selectNow();
} else if (timeout.isForever()) {
selector.select(0);
} else {
- //logger.debug("starting to select with deadline");
selector.select(timeout.getMilliseconds());
- //logger.debug("select with deadline ended");
}
} catch (IOException e) {
throw new IOError(e);
}
// we have to do this so we don't execute any task twice
- // todo: alternative: mark TaskIdentifier as executed
- Collection<TaskIdentifier> executableTasks = new
TreeSet<TaskIdentifier>();
+ Collection<TaskConfiguration> executableTasks = new
HashSet<TaskConfiguration>();
for (SelectionKey sk : selector.selectedKeys()) {
- Subscribers ss = (Subscribers) sk.attachment();
+ TaskConfiguration[] subscribers = (TaskConfiguration[])
sk.attachment();
- Channel c = sk.channel();
+ if (sk.isReadable()) addSubscriberTask(executableTasks,
subscribers, EVENT_READ);
+ if (sk.isWritable()) addSubscriberTask(executableTasks,
subscribers, EVENT_WRITE);
+ if (sk.isAcceptable()) addSubscriberTask(executableTasks,
subscribers, EVENT_ACCEPT);
+ if (sk.isConnectable()) addSubscriberTask(executableTasks,
subscribers, EVENT_CONNECT);
- if (sk.isReadable()) {
- if (ss.readSubscriber == null) {
- throw new AssertionError("event fired, but not registered
before");
- }
- executableTasks.add(ss.readSubscriber);
- ss.readSubscriber.ctx.reasons.add(Reason.READ_READY);
- }
- if (sk.isWritable()) {
- if (ss.writeSubscriber == null) {
- throw new AssertionError("event fired, but not registered
before");
- }
- executableTasks.add(ss.writeSubscriber);
- ss.writeSubscriber.ctx.reasons.add(Reason.WRITE_READY);
- }
- if (sk.isConnectable()) {
- if (ss.connectSubscriber == null) {
- throw new AssertionError("event fired, but not registered
before");
- }
- executableTasks.add(ss.connectSubscriber);
- ss.connectSubscriber.ctx.reasons.add(Reason.CONNECT_READY);
- }
- if (sk.isAcceptable()) {
- if (ss.acceptSubscriber == null) {
- throw new AssertionError("event fired, but not registered
before");
- }
- executableTasks.add(ss.acceptSubscriber);
- ss.acceptSubscriber.ctx.reasons.add(Reason.ACCEPT_READY);
- }
}
- for (TaskIdentifier tt : executableTasks) {
+ for (TaskConfiguration tt : executableTasks) {
// cancel subscriptions to other events, we can execute now!
tt.deregister();
queueReady(tt);
@@ -614,26 +422,23 @@
/**
* Initialize and run scheduler. This function will return when all tasks
- * have completed. On systems with signals, receiving a SIGTERM (and other
- * similar signals) will cause "GNUNET_SCHEDULER_shutdown" to be run after
- * the active task is complete. As a result, SIGTERM causes all active
tasks
- * to be scheduled with reason "GNUNET_SCHEDULER_REASON_SHUTDOWN".
(However,
- * tasks added afterwards will execute normally!).
+ * have completed.
*
- * @param task task to run immediately
+ * @param initialTask the initial task to run immediately
*/
- public static void run(Task task) {
- addContinuation(task, EnumSet.of(Reason.STARTUP));
+ public static void run(Task initialTask) {
+ addContinuation(initialTask, EnumSet.of(Reason.STARTUP));
+ // the gnunet main loop
while (checkLiveness()) {
-
RelativeTime nextTimeout = handleTimeouts();
- // don't select if there are no tasks
+ // don't select if there are no tasks; we are done!
if (readyCount == 0 && pending.isEmpty()) {
return;
}
+ // don't block in select if we have tasks ready to run!
if (readyCount > 0) {
handleSelect(RelativeTime.ZERO);
} else {
@@ -660,9 +465,9 @@
// start executing from the highest priority down to 0
for (int p = Priority.size - 1; p >= 0; p--) {
// execute all tasks with priority p
- LinkedList<TaskIdentifier> queue = ready.get(p);
+ LinkedList<TaskConfiguration> queue = ready.get(p);
while (!queue.isEmpty()) {
- TaskIdentifier tid = queue.removeFirst();
+ TaskConfiguration tid = queue.removeFirst();
readyCount--;
tid.run();
}
@@ -679,7 +484,7 @@
*/
public static void shutdown() {
// queueReady() while iterating would yield concurrent modification
exn otherwise
- for (TaskIdentifier tid : new ArrayList<TaskIdentifier>(pending)) {
+ for (TaskConfiguration tid : new
ArrayList<TaskConfiguration>(pending)) {
tid.ctx.reasons.add(Reason.SHUTDOWN);
queueReady(tid);
}
Modified: gnunet-java/src/org/gnunet/util/Server.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Server.java 2012-05-03 11:36:52 UTC (rev
21244)
+++ gnunet-java/src/org/gnunet/util/Server.java 2012-05-03 11:54:32 UTC (rev
21245)
@@ -124,6 +124,7 @@
* @param srv ...
*/
private void doAccept(final ServerSocketChannel srv) {
+ /*
Scheduler.TaskBuilder b = new Scheduler.TaskBuilder();
b.withTask(new Scheduler.Task() {
@Override
@@ -140,6 +141,7 @@
});
b.withSelectAccept(srv);
Scheduler.add(b);
+ */
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r21245 - in gnunet-java/src/org/gnunet: construct construct/parsers core dht statistics util,
gnunet <=