gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r25845 - in gnunet-java/src/org/gnunet: mesh requests util


From: gnunet
Subject: [GNUnet-SVN] r25845 - in gnunet-java/src/org/gnunet: mesh requests util
Date: Mon, 21 Jan 2013 16:43:14 +0100

Author: dold
Date: 2013-01-21 16:43:13 +0100 (Mon, 21 Jan 2013)
New Revision: 25845

Added:
   gnunet-java/src/org/gnunet/requests/FixedMessageRequest.java
Modified:
   gnunet-java/src/org/gnunet/mesh/Mesh.java
   gnunet-java/src/org/gnunet/requests/Request.java
   gnunet-java/src/org/gnunet/util/Scheduler.java
Log:
fix


Modified: gnunet-java/src/org/gnunet/mesh/Mesh.java
===================================================================
--- gnunet-java/src/org/gnunet/mesh/Mesh.java   2013-01-21 15:36:00 UTC (rev 
25844)
+++ gnunet-java/src/org/gnunet/mesh/Mesh.java   2013-01-21 15:43:13 UTC (rev 
25845)
@@ -22,6 +22,7 @@
 
 import com.google.common.collect.Maps;
 import org.gnunet.construct.Construct;
+import org.gnunet.requests.FixedMessageRequest;
 import org.gnunet.requests.Request;
 import org.gnunet.requests.RequestQueue;
 import org.gnunet.util.*;
@@ -39,9 +40,17 @@
     private static final Logger logger = LoggerFactory
             .getLogger(Mesh.class);
 
+    /**
+     * How many messages can we send to the service until we have to wait for 
an ACK from it?
+     */
     private static final int INITIAL_WINDOW_SIZE = 8;
-
+    /**
+     * Requests queued to be sent to the mesh service.
+     */
     private RequestQueue requestQueue;
+    /**
+     * Called whenever a tunnel was destroyed.
+     */
     private TunnelEndHandler tunnelEndHandler;
     private MeshRunabout messageReceiver;
     private int[] applications;
@@ -62,52 +71,45 @@
         public DisconnectHandler disconnectHandler;
         public ConnectHandler connectHandler;
 
-        class ConnectByTypeRequest extends Request {
-            int appType;
-            @Override
-            public void transmit(Connection.MessageSink sink) {
-                ConnectPeerByTypeMessage m = new ConnectPeerByTypeMessage();
-                m.applicationType = appType;
-                m.tunnelId = tunnelId;
-                sink.send(m);
-            }
-        }
-
         public void addPeer(PeerIdentity peerIdentity) {
             throw new UnsupportedOperationException("not implemented");
         }
+
         /**
          * Request that the given peer isn't added to this tunnel in calls to
          * connect_by_* calls, (due to misbehaviour, bad performance, ...).
          *
          * @param peerIdentity peer identity of the peer which should be 
blacklisted
-         *                  for the tunnel.
+         *                     for the tunnel.
          */
         public void blacklist(PeerIdentity peerIdentity) {
             throw new UnsupportedOperationException("not implemented");
         }
+
         /**
          * Request that the given peer isn't blacklisted anymore from this 
tunnel,
          * and therefore can be added in future calls to connect*.
          * The peer must have been previously blacklisted for this tunnel.
          *
          * @param peerIdentity peer identity of the peer which shouldn't be 
blacklisted
-         *                  for the tunnel anymore.
+         *                     for the tunnel anymore.
          */
         public void unblacklist(PeerIdentity peerIdentity) {
             throw new UnsupportedOperationException("not implemented");
         }
+
         /**
          * Request that the mesh should try to connect to a peer supporting 
the given
          * message type.
          *
          * @param appType application type that must be supported by the peer
-         *                 (MESH should discover peer in proximity handling 
this type)
+         *                (MESH should discover peer in proximity handling 
this type)
          */
         public void requestConnectByType(int appType) {
-            ConnectByTypeRequest r = new ConnectByTypeRequest();
-            r.appType = appType;
-            requestQueue.add(r);
+            ConnectPeerByTypeMessage m = new ConnectPeerByTypeMessage();
+            m.applicationType = appType;
+            m.tunnelId = tunnelId;
+            requestQueue.add(new FixedMessageRequest(m));
         }
 
         /**
@@ -136,7 +138,7 @@
          *
          * @param peer peer to remove
          */
-        public void requestConnectDel (PeerIdentity peer) {
+        public void requestConnectDel(PeerIdentity peer) {
             throw new UnsupportedOperationException("not implemented");
         }
 
@@ -151,6 +153,7 @@
 
         static class Sink implements Connection.MessageSink {
             byte[] payload;
+
             @Override
             public void send(GnunetMessage.Body m) {
                 if (payload != null) {
@@ -217,14 +220,14 @@
          * Only one call can be active at any time, to issue another request,
          * wait for the callback or cancel the current request.
          *
-         * @param maxdelay how long can the message wait?
-         * @param target destination for the message
-         *               NULL for multicast to all tunnel targets
+         * @param maxdelay    how long can the message wait?
+         * @param target      destination for the message
+         *                    NULL for multicast to all tunnel targets
          * @param notify_size how many bytes of buffer space does notify want?
          * @param transmitter handler to call when buffer space is available;
-         *        will be called with NULL on timeout or if the overall queue
-         *        for this peer is larger than queue_size and this is currently
-         *        the message with the lowest priority
+         *                    will be called with NULL on timeout or if the 
overall queue
+         *                    for this peer is larger than queue_size and this 
is currently
+         *                    the message with the lowest priority
          * @return non-NULL if the notify callback was queued,
          *         NULL if we can not even queue the request (insufficient
          *         memory); if NULL is returned, "notify" will NOT be called.
@@ -395,11 +398,11 @@
     /**
      * Connect to the mesh service.
      *
-     * @param cfg configuration to use
+     * @param cfg                  configuration to use
      * @param inboundTunnelHandler function called when an *inbound* tunnel is 
created
-     * @param tunnelEndHandler function called when an *inbound* tunnel is 
destroyed by the
-     *                remote peer, it is *not* called if Tunnel.destroy
-     *                is called on the tunnel
+     * @param tunnelEndHandler     function called when an *inbound* tunnel is 
destroyed by the
+     *                             remote peer, it is *not* called if 
Tunnel.destroy
+     *                             is called on the tunnel
      */
     public Mesh(Configuration cfg, InboundTunnelHandler inboundTunnelHandler,
                 TunnelEndHandler tunnelEndHandler, MeshRunabout 
messageReceiver, int... applications) {
@@ -418,8 +421,8 @@
      * Create a new tunnel (we're initiator and will be allowed to add/remove 
peers
      * and to broadcast).
      *
-     * @param connectHandler callback for when a new peer connects to the 
tunnel, either because the origin added him,
-     *                       or the client joined the tunnel
+     * @param connectHandler    callback for when a new peer connects to the 
tunnel, either because the origin added him,
+     *                          or the client joined the tunnel
      * @param disconnectHandler callback for when when a peer is disconnected
      */
     public OriginTunnel createTunnel(ConnectHandler connectHandler, 
DisconnectHandler disconnectHandler) {
@@ -436,7 +439,7 @@
     /**
      * Announce to ther peer the availability of services described by the 
regex,
      * in order to be reachable to other peers via connect_by_string.
-     *
+     * <p/>
      * Note that the first 8 characters are considered to be part of a prefix,
      * (for instance 'gnunet://'). If you put a variable part in there (*, +. 
()),
      * all matching strings will be stored in the DHT.

Added: gnunet-java/src/org/gnunet/requests/FixedMessageRequest.java
===================================================================
--- gnunet-java/src/org/gnunet/requests/FixedMessageRequest.java                
                (rev 0)
+++ gnunet-java/src/org/gnunet/requests/FixedMessageRequest.java        
2013-01-21 15:43:13 UTC (rev 25845)
@@ -0,0 +1,23 @@
+package org.gnunet.requests;
+
+import org.gnunet.util.Connection;
+import org.gnunet.util.GnunetMessage;
+
+/**
+ * A request that sends a message, pre-determined at construction of the 
FixedMessageRequest.
+ *
+ * @author Florian Dold
+ */
+public class FixedMessageRequest extends Request {
+    private final GnunetMessage.Body msg;
+
+    public FixedMessageRequest(GnunetMessage.Body msg) {
+        this.msg = msg;
+    }
+
+    final
+    @Override
+    public void transmit(Connection.MessageSink sink) {
+        sink.send(msg);
+    }
+}

Modified: gnunet-java/src/org/gnunet/requests/Request.java
===================================================================
--- gnunet-java/src/org/gnunet/requests/Request.java    2013-01-21 15:36:00 UTC 
(rev 25844)
+++ gnunet-java/src/org/gnunet/requests/Request.java    2013-01-21 15:43:13 UTC 
(rev 25845)
@@ -5,9 +5,26 @@
 import org.gnunet.util.RelativeTime;
 
 /**
-* ...
-*
-* @author Florian Dold
+ * Abstract base class for a Request.
+ *
+ * Every request defines what happens when one of the following happens:
+ * <ul>
+ *     <li>
+ *         The request is canceled. There may be some cleanup necessary, 
depending on whether the request has already been
+ *         sent to the service or not.
+ *     </li>
+ *     <li>
+ *         On timeout.
+ *     </li>
+ *     <li>
+ *         On reconnect. In particular, every Request has to decide whether it 
will be kept after a reconnect to the service.
+ *     </li>
+ *     <li>
+ *         On destruction of the request queue. Some request may be important 
enough to delay the destruction until they have been sent.
+ *     </li>
+ * </ul>
+ *
+ * @author Florian Dold
 */
 public abstract class Request {
     protected AbsoluteTime deadline = AbsoluteTime.FOREVER;

Modified: gnunet-java/src/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Scheduler.java      2013-01-21 15:36:00 UTC 
(rev 25844)
+++ gnunet-java/src/org/gnunet/util/Scheduler.java      2013-01-21 15:43:13 UTC 
(rev 25845)
@@ -45,12 +45,12 @@
     private static int readyCount = 0;
 
     // for every priority, there is a list of tasks that is definitely ready 
to run
-    final private static ArrayList<LinkedList<TaskConfiguration>> ready = new 
ArrayList<LinkedList<TaskConfiguration>>
-            (Priority.size);
+    @SuppressWarnings("unchecked")
+    final private static LinkedList<TaskConfiguration>[] readyLists = new 
LinkedList[Priority.numberOfPriorities];
 
     static {
-        for (int i = 0; i < Priority.size; ++i) {
-            ready.add(new LinkedList<TaskConfiguration>());
+        for (int i = 0; i < Priority.numberOfPriorities; ++i) {
+            readyLists[i] = new LinkedList<TaskConfiguration>();
         }
     }
 
@@ -61,11 +61,11 @@
             Reason.ACCEPT_READY, Reason.CONNECT_READY};
 
 
+    /**
+     * Selector, used to check file descriptors for readiness.
+     */
     private static Selector selector = null;
 
-    private static boolean scheduler_running = false;
-
-
     static {
         try {
             selector = SelectorProvider.provider().openSelector();
@@ -76,13 +76,36 @@
         }
     }
 
+    /**
+     * true iff the scheduler is currently running.
+     */
+    private static boolean scheduler_running = false;
 
+
+
+    // tasks that are waiting for an event, which are executed anyway after 
the deadline has occurred
+    final private static Queue<TaskConfiguration> pending = new 
PriorityQueue<TaskConfiguration>(5, new Comparator
+            <TaskConfiguration>() {
+        @Override
+        public int compare(TaskConfiguration a, TaskConfiguration b) {
+            return a.deadline.compareTo(b.deadline);
+        }
+    });
+
+
+    /**
+     * Priority for Tasks.
+     */
     public enum Priority {
         IDLE, BACKGROUND, DEFAULT, HIGH, UI, URGENT, SHUTDOWN;
-        private static final int size = Priority.values().length;
-        public static final Priority KEEP = null;
+
+        // how many different priorities do we have?
+        private static final int numberOfPriorities = Priority.values().length;
     }
 
+    /**
+     * Reasons for executing a task.
+     */
     public enum Reason {
         STARTUP, SHUTDOWN, TIMEOUT, READ_READY, WRITE_READY, ACCEPT_READY, 
CONNECT_READY
     }
@@ -98,10 +121,6 @@
 
         public RunContext() {
         }
-
-        public RunContext(Set<Reason> reasons) {
-            this.reasons = reasons;
-        }
     }
 
     /**
@@ -130,7 +149,7 @@
         /**
          * Create a TaskIdentifier.
          *
-         * @param task
+         * @param task task to run with this TaskIdentifier
          */
         TaskConfiguration(RelativeTime delay, Task task) {
             this.task = task;
@@ -257,20 +276,6 @@
         }
     }
 
-    // tasks that are waiting for an event, which are executed anyway after 
the deadline has occurred
-    final private static Queue<TaskConfiguration> pending = new 
PriorityQueue<TaskConfiguration>(5, new Comparator
-            <TaskConfiguration>() {
-        @Override
-        public int compare(TaskConfiguration a, TaskConfiguration b) {
-            return a.deadline.compareTo(b.deadline);
-        }
-    });
-
-
-    public static boolean getCurrentLifeness() {
-        return (activeTask == null) || activeTask.lifeness;
-    }
-
     /**
      * Run the task regardless of any prerequisites, before any other task of
      * the same priority.
@@ -357,7 +362,7 @@
      */
     private static void queueReady(TaskConfiguration tid) {
         int idx = tid.priority.ordinal();
-        ready.get(idx).add(tid);
+        readyLists[idx].add(tid);
         readyCount++;
 
         pending.remove(tid);
@@ -458,9 +463,8 @@
      */
     public static void run(Task initialTask) {
         if (scheduler_running) {
-            throw new AssertionError("can't call run recursively");
+            throw new AssertionError("Scheduler already running");
         }
-
         scheduler_running = true;
 
         if (initialTask != null) {
@@ -488,6 +492,27 @@
 
             runReady();
         }
+
+        if (readyCount != 0) {
+            throw new AssertionError("tasks ready after scheduler ran 
(count)");
+        }
+
+        for (List readyList : Scheduler.readyLists) {
+            if (!readyList.isEmpty()) {
+                throw new AssertionError("tasks ready after scheduler ran 
(list)");
+            }
+        }
+
+        if (pending.size() != 0) {
+            throw new AssertionError("pending tasks after scheduler ran");
+        }
+
+        if (activeTask != null) {
+            throw new AssertionError("active task after scheduler ran");
+        }
+
+
+        scheduler_running = false;
     }
 
 
@@ -504,9 +529,9 @@
                 return;
             }
             // start executing from the highest priority down to 0
-            for (int p = Priority.size - 1; p >= 0; p--) {
+            for (int p = Priority.numberOfPriorities - 1; p >= 0; p--) {
                 // execute all tasks with priority p
-                LinkedList<TaskConfiguration> queue = ready.get(p);
+                LinkedList<TaskConfiguration> queue = readyLists[p];
                 while (!queue.isEmpty()) {
                     TaskConfiguration tid = queue.removeFirst();
                     readyCount--;
@@ -518,7 +543,7 @@
     }
 
     /**
-     * Request the disconnect of a scheduler. Marks all currently pending 
tasks as
+     * Request the shutdown of a scheduler. Marks all currently pending tasks 
as
      * ready because of disconnect. This will cause all tasks to run (as soon 
as
      * possible, respecting priorities and prerequisite tasks). Note that tasks
      * scheduled AFTER this call may still be delayed arbitrarily.




reply via email to

[Prev in Thread] Current Thread [Next in Thread]