gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r4128 - in GNUnet: . src/applications/dht/module


From: grothoff
Subject: [GNUnet-SVN] r4128 - in GNUnet: . src/applications/dht/module
Date: Sat, 30 Dec 2006 07:10:20 -0800 (PST)

Author: grothoff
Date: 2006-12-30 07:10:17 -0800 (Sat, 30 Dec 2006)
New Revision: 4128

Modified:
   GNUnet/src/applications/dht/module/routing.c
   GNUnet/todo
Log:
addressing routing issues

Modified: GNUnet/src/applications/dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dht/module/routing.c        2006-12-30 14:51:14 UTC (rev 4127)
+++ GNUnet/src/applications/dht/module/routing.c        2006-12-30 15:10:17 UTC (rev 4128)
@@ -23,16 +23,11 @@
  * @brief state for active DHT routing operations
  * @author Christian Grothoff
  *
- * RC:
- * - add support for GET retry (or delayed initial GET ops)
- * - fix problem with possible GET/PUT routing loops!
- *   (not convinced that current design even probabilistically
- *    prevents loops; also check for routing loops by
- *    checking pending queries)
- *
  * LATER:
  * - prioritization
  * - delay selection
+ * - implement extra_get_callback
+ * - add similar callback for discovery in table.c
  */
 
 #include "platform.h"
@@ -47,9 +42,14 @@
 /**
  * @brief record used for sending response back
  */
-typedef struct {
+typedef struct DHT_Source_Route {
 
   /**
+   * This is a linked list.
+   */
+  struct DHT_Source_Route * next;
+
+  /**
    * Source of the request.  Replies should be forwarded to
    * this peer.
    */
@@ -73,19 +73,19 @@
   MESSAGE_HEADER header;
 
   /**
-   * Type of the requested content
+   * Type of the requested content (NBO)
    */
   unsigned int type;
 
   /**
-   * Priority of requested content
+   * Priority of requested content (NBO)
    */
   unsigned int prio;
 
   /**
-   * Reserved (for now, always zero)
+   * Relative time to live in cronMILLIS (NBO)
    */
-  unsigned int reserved;
+  int ttl;
 
   /**
    * Search key.
@@ -104,12 +104,12 @@
   MESSAGE_HEADER header;
 
   /**
-   * Type of the content
+   * Type of the content (NBO)
    */
   unsigned int type;
 
   /**
-   * When to discard the content (relative time)
+   * When to discard the content (relative time, NBO)
    */
   cron_t timeout;
 
@@ -130,7 +130,7 @@
   MESSAGE_HEADER header;
 
   /**
-   * Type of the content
+   * Type of the content (NBO)
    */
   unsigned int type;
 
@@ -144,12 +144,17 @@
 /**
  * Entry in the DHT routing table.
  */
-typedef struct {
+typedef struct DHTQueryRecord {
 
   /**
+   * When do we stop forwarding this request?
+   */
+  cron_t expires;
+
+  /**
    * Information about where to send the results back to.
    */
-  DHT_Source_Route source;
+  DHT_Source_Route * sources;
 
   /**
    * GET message of this record.
@@ -166,19 +171,26 @@
    * Number of entries in results.
    */
   unsigned int result_count;
-
 } DHTQueryRecord;
 
-static unsigned int rt_size;
+/**
+ * How far into the future can requests continue?
+ * Note that this also caps the frequency of how
+ * often peers will re-issue requests.
+ */
+#define MAX_TTL (5 * cronMINUTES)
 
-static unsigned int rt_pos;
-
 /**
- * rt_size records of active queries
+ * Linked list of active records.
  */
 static DHTQueryRecord ** records;
 
 /**
+ * Size of records
+ */
+static unsigned int rt_size;
+
+/**
  * Statistics service.
  */
 static Stats_ServiceAPI * stats;
@@ -214,6 +226,7 @@
   DHT_RESULT_MESSAGE * result;
   unsigned int routed;
   unsigned int tracked;
+  DHT_Source_Route * pos;
 
   if (cls != NULL) {
     result = cls;
@@ -258,33 +271,37 @@
         q->result_count + 1);
     routed++;
     q->results[q->result_count-1] = hc;
-    if (0 != memcmp(&q->source.source,
-                   coreAPI->myIdentity,
-                   sizeof(PeerIdentity))) {
+    pos = q->sources;
+    while (pos != NULL) {
+      if (0 != memcmp(&pos->source,
+                     coreAPI->myIdentity,
+                     sizeof(PeerIdentity))) {
 #if DEBUG_ROUTING
-      GE_LOG(coreAPI->ectx,
-            GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
-            "Routing result to other peer\n");
+       GE_LOG(coreAPI->ectx,
+              GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
+              "Routing result to other peer\n");
 #endif
-      coreAPI->unicast(&q->source.source,
-                      &result->header,
-                      0, /* FIXME: priority */
-                      5 * cronSECONDS); /* FIXME */
-      if (stats != NULL)
-       stats->change(stat_replies_routed, 1);
-    } else if (q->source.receiver != NULL) {
+       coreAPI->unicast(&pos->source,
+                        &result->header,
+                        0, /* FIXME: priority */
+                        5 * cronSECONDS); /* FIXME */
+       if (stats != NULL)
+         stats->change(stat_replies_routed, 1);
+      } else if (pos->receiver != NULL) {
 #if DEBUG_ROUTING
-      GE_LOG(coreAPI->ectx,
-            GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
-            "Routing result to local client\n");
+       GE_LOG(coreAPI->ectx,
+              GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
+              "Routing result to local client\n");
 #endif
-      q->source.receiver(key,
-                        type,
-                        size,
-                        data,
-                        q->source.receiver_closure);
-      if (stats != NULL)
-       stats->change(stat_replies_routed, 1);
+       pos->receiver(key,
+                     type,
+                     size,
+                     data,
+                     pos->receiver_closure);
+       if (stats != NULL)
+         stats->change(stat_replies_routed, 1);
+      }
+      pos = pos->next;
     }
   }
   MUTEX_UNLOCK(lock); 
@@ -299,35 +316,84 @@
     FREE(result);
 }
 
-static void addRoute(const PeerIdentity * sender,
-                    ResultHandler handler,
-                    void * cls,
-                    const DHT_GET_MESSAGE * get) {
+/**
+ * @return OK if route was added, SYSERR if not
+ */
+static int addRoute(const PeerIdentity * sender,
+                   ResultHandler handler,
+                   void * cls,
+                   const DHT_GET_MESSAGE * get) {
   DHTQueryRecord * q;
+  unsigned int i;
+  unsigned int rt_pos;
+  cron_t expire;
+  cron_t now;
+  int ttl;
+  struct DHT_Source_Route * pos;
 
+  ttl = ntohl(get->ttl);
+  if (ttl > MAX_TTL)
+    ttl = 0; /* implausibly high */
+  now = get_time();
+  expire = now + ttl;
   MUTEX_LOCK(lock);
-  if (records[rt_pos] != NULL) {
+  rt_pos = rt_size;
+  for (i=0;i<rt_size;i++) {
+    if ( (sender != NULL) &&
+        (records[i] != NULL) &&
+        (0 == memcmp(&records[i]->get->key,
+                     &get->key,
+                     sizeof(HashCode512))) &&
+        (records[i]->get->type == get->type) &&
+        (records[i]->expires > now - MAX_TTL) ) {
+      /* do not route, same request already (recently) 
+        active (possibly from other initiator) */
+      /* FIXME: support sending replies back to
+        multiple peers!? */
+      MUTEX_UNLOCK(lock);
+      return SYSERR;
+    }
+    if (records[i] == NULL) {
+      records[i] = MALLOC(sizeof(DHTQueryRecord));
+      records[i]->get = NULL;
+      rt_pos = i;
+      expire = 0;
+    } else if (records[i]->expires < expire) {
+      expire = records[i]->expires;
+      rt_pos = i;
+    } 
+  }
+  if (rt_pos == rt_size) {
+    /* do not route, expiration time too high */
+    MUTEX_UNLOCK(lock);
+    return SYSERR;
+  }
+  if (records[rt_pos]->get != NULL) {
     FREE(records[rt_pos]->get);
-    GROW(records[rt_pos]->results,
-        records[rt_pos]->result_count,
-        0);
-  } else {
-    records[rt_pos] = MALLOC(sizeof(DHTQueryRecord));
+    while (records[rt_pos]->sources != NULL) {
+      pos = records[rt_pos]->sources;
+      records[rt_pos]->sources = pos->next;
+      FREE(pos);
+    }
   }
   q = records[rt_pos];
   memset(q,
         0,
         sizeof(DHTQueryRecord));
+  q->expires = now + ttl;
   q->get = MALLOC(ntohs(get->header.size));
   memcpy(q->get,
         get,
         ntohs(get->header.size));
+  pos = MALLOC(sizeof(DHT_Source_Route));
+  pos->next = q->sources;
+  q->sources = pos;
   if (sender != NULL)
-    q->source.source = *sender;
+    pos->source = *sender;
   else
-    q->source.source = *coreAPI->myIdentity;
-  q->source.receiver = handler;
-  q->source.receiver_closure = cls;
+    pos->source = *coreAPI->myIdentity;
+  pos->receiver = handler;
+  pos->receiver_closure = cls;
 #if DEBUG_ROUTING
   GE_LOG(coreAPI->ectx,
         GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
@@ -338,6 +404,7 @@
   MUTEX_UNLOCK(lock);
   if (stats != NULL)
     stats->change(stat_requests_routed, 1);
+  return OK;
 }
 
 /**
@@ -354,7 +421,9 @@
                     const MESSAGE_HEADER * msg) {
   PeerIdentity next[GET_TRIES];
   const DHT_GET_MESSAGE * get;
+  DHT_GET_MESSAGE aget;
   int total;
+  int ttl;
   int i;
 #if DEBUG_ROUTING
   EncName enc;
@@ -374,11 +443,12 @@
 #endif
   if (stats != NULL)
     stats->change(stat_get_requests_received, 1);
-  if (sender != NULL)
-    addRoute(sender,
-            NULL,
-            NULL,
-            get);  
+  if ( (sender != NULL) &&
+       (OK != addRoute(sender,
+                      NULL,
+                      NULL,
+                      get)) )
+    return OK; /* could not route */
   total = dht_store_get(&get->key,
                        ntohl(get->type),
                        &routeResult,
@@ -393,6 +463,7 @@
 #endif
     return OK;
   }
+  total = 0;
   for (i=0;i<GET_TRIES;i++) {
     if (OK != select_dht_peer(&next[i],
                              &get->key,
@@ -402,6 +473,15 @@
     if (-1 == hashCodeCompareDistance(&next[i].hashPubKey,
                                      &coreAPI->myIdentity->hashPubKey,
                                      &get->key)) {
+      if (total == 0) {
+       aget = *get;
+       ttl = ntohl(get->ttl);
+       if (ttl > MAX_TTL)
+         ttl = MAX_TTL;
+       ttl -= 5 * cronSECONDS;
+       aget.ttl = htonl(ttl);
+       total = 1;
+      }
       coreAPI->unicast(&next[i],
                       msg,
                       0, /* FIXME: priority */
@@ -441,7 +521,8 @@
     stats->change(stat_put_requests_received, 1);
   put = (const DHT_PUT_MESSAGE*) msg;
 #if DEBUG_ROUTING
-  hash2enc(&put->key, &enc);
+  hash2enc(&put->key, 
+          &enc);
   GE_LOG(coreAPI->ectx,
         GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
         "Received DHT PUT for key `%s'.\n",
@@ -539,14 +620,14 @@
   get.header.type = htons(P2P_PROTO_DHT_GET);
   get.type = htonl(type);
   get.prio = htonl(0); /* FIXME */
-  get.reserved = htonl(0);
+  get.ttl = htonl(MAX_TTL); /* FIXME? */
   get.key = *key;
-  addRoute(NULL,
-          handler,
-          cls,
-          &get);             
-  handleGet(NULL,
-           &get.header);
+  if (OK == addRoute(NULL,
+                    handler,
+                    cls,
+                    &get))     
+    handleGet(NULL,
+             &get.header);
 }
 
 /**
@@ -558,21 +639,39 @@
                  ResultHandler handler,
                  void * cls) {
   int i;
+  struct DHT_Source_Route * pos;
+  struct DHT_Source_Route * prev;
+  int done;
 
+  done = NO;
   MUTEX_LOCK(lock);
   for (i=0;i<rt_size;i++) {
     if (records[i] == NULL)
       continue;
-    if ( (records[i]->source.receiver == handler) &&
-        (records[i]->source.receiver_closure == cls) &&
-        (0 == memcmp(key,
-                     &records[i]->get->key,
-                     sizeof(HashCode512))) ) {
+    prev = NULL;
+    pos = records[i]->sources;
+    while (pos != NULL) {
+      if ( (pos->receiver == handler) &&
+          (pos->receiver_closure == cls) &&
+          (0 == memcmp(key,
+                       &records[i]->get->key,
+                       sizeof(HashCode512))) ) {
+       if (prev == NULL)
+         records[i]->sources = pos->next;
+       else
+         prev->next = pos->next;
+       FREE(pos);
+       done = YES;
+       break;
+      }
+    }
+    if (records[i]->sources == NULL) {
       FREE(records[i]->get);
       FREE(records[i]);
       records[i] = NULL;
+    }
+    if (done == YES)
       break;
-    }
   }
   MUTEX_UNLOCK(lock);
 }
@@ -608,6 +707,21 @@
 }
 
 /**
+ * We have additional "free" bandwidth available.
+ * Possibly find a good query to add to the message
+ * to the given receiver.
+ *
+ * @param padding maximum number of bytes available
+ * @return number of bytes added at position
+ */
+static unsigned int 
+extra_get_callback(const PeerIdentity * receiver,
+                  void * position,
+                  unsigned int padding) {
+  return 0;
+}
+
+/**
  * Initialize routing DHT component.
  *
  * @param capi the core API
@@ -650,6 +764,8 @@
                           &handlePut);
   coreAPI->registerHandler(P2P_PROTO_DHT_RESULT,
                           &handleResult);
+  coreAPI->registerSendCallback(sizeof(DHT_GET_MESSAGE),
+                               &extra_get_callback);
   return OK;
 }
 
@@ -661,6 +777,8 @@
 int done_dht_routing() {
   unsigned int i;
 
+  coreAPI->unregisterSendCallback(sizeof(DHT_GET_MESSAGE),
+                                 &extra_get_callback);
   coreAPI->unregisterHandler(P2P_PROTO_DHT_GET,
                             &handleGet);
   coreAPI->unregisterHandler(P2P_PROTO_DHT_PUT,

Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-12-30 14:51:14 UTC (rev 4127)
+++ GNUnet/todo 2006-12-30 15:10:17 UTC (rev 4128)
@@ -21,12 +21,9 @@
 0.7.2 [3'07]:
 - new features:
   * XFS / support for location URIs [CG] 
-    + dht/routing: handle routing loops [RC]
     + ECRS-URI: toString/fromString for loc URIs [RC]
-    + dht/gap integration [RC]
-    + fsui/location URI support [RC]
-    + dht/routing: GET retries (optimization)
-    + dstore bloomfilter (optimization)
+    + dht/gap integration (search routing) [RC]
+    + fsui/fs/location URI support (download routing) [RC]
   * HTTP transport (libcurl, libmicrohttpd)
 - minor improvements:
   * directories can be compacted -- add heuristic to determine
@@ -47,11 +44,11 @@
 - insert meta-data under hash (md5? sha1? sha-512? GNUnet-URI?)
   as keyword (to allow getting meta-data from URI only)
 - Chat support basics [RC]
-- better NAT traversal:
-  * NAT-PMP (in addition to UPnP)
 - old/new features:
   * SMTP transport (libesmtp)
   * SMTP logger
+  * support NAT-PMP (in addition to UPnP)?
+  * add bloomfilter to dstore?
 - Documentation:
   * LJ article
 - Testcases:





reply via email to

[Prev in Thread] Current Thread [Next in Thread]