gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r164 - GNUnet/src/applications/dht/module


From: grothoff
Subject: [GNUnet-SVN] r164 - GNUnet/src/applications/dht/module
Date: Thu, 3 Feb 2005 02:03:17 -0800 (PST)

Author: grothoff
Date: 2005-02-03 02:03:16 -0800 (Thu, 03 Feb 2005)
New Revision: 164

Modified:
   GNUnet/src/applications/dht/module/dht.c
Log:
more dht hacking

Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c    2005-02-03 09:23:44 UTC (rev 163)
+++ GNUnet/src/applications/dht/module/dht.c    2005-02-03 10:03:16 UTC (rev 164)
@@ -252,6 +252,7 @@
    * such an operation was not performed).
    */
   struct DHT_GET_RECORD * async_handle;
+  Semaphore * async_handle_done;
 
   /**
    * ASYNC RPC handles.
@@ -275,7 +276,7 @@
  * @param identity the identity of the node that was found
  * @return OK to continue searching, SYSERR to abort early
  */
-typedef int (*NodeFoundCallback)(PeerIdentity * identity,
+typedef int (*NodeFoundCallback)(const PeerIdentity * identity,
                                 void * closure);
 
 /**
@@ -319,6 +320,7 @@
    * such an operation was not performed).
    */
   struct DHT_GET_RECORD * async_handle;
+  Semaphore * async_handle_done;
 
   /**
    * ASYNC RPC handles.
@@ -373,6 +375,8 @@
 
   void * resultClosure;
 
+  unsigned int resultsFound;
+
   /**
    * Context of findKNodes (async); NULL if the table was local.
    */
@@ -1588,19 +1592,20 @@
  * callback with the results found locally.
  * A DataProcessor.
  */
-static int getLocalResultCallback(const HashCode160 key,
-                                 const DataContainer val,
+static int getLocalResultCallback(const HashCode160 * key,
+                                 const DataContainer * val,
                                  DHT_GET_RECORD * rec) {
   int ret;
   if ( (equalsHashCode160(&rec->table,
                          &masterTableId)) &&
-       (results[j].dataLength % sizeof(PeerIdentity) != 0) )
+       ((ntohl(val->size) - sizeof(DataContainer)) % sizeof(PeerIdentity) != 0) )
     BREAK(); /* assertion failed: entry in master table malformed! */
   ret = OK;
   if (rec->resultCallback != NULL)
-    ret = rec->resultCallback(val,
+    ret = rec->resultCallback(key,
+                             val,
                              rec->resultClosure);
-  rec->resultsFound++;  
+  rec->resultsFound++;
   return ret;
 }
 
@@ -1638,7 +1643,7 @@
 
   ENTER();
   IFLOG(LOG_DEBUG,
-       hash2enc(keys[0],
+       hash2enc(&keys[0],
                 &enc));
   IFLOG(LOG_DEBUG,
        hash2enc(table,
@@ -1668,12 +1673,12 @@
   ret->table = *table;
   ret->resultCallback = resultCallback;
   ret->resultClosure = cls;
+  ret->resultsFound = 0;
   ret->callback = callback;
   ret->closure = closure;
   MUTEX_CREATE_RECURSIVE(&ret->lock);
   ret->rpc = NULL;
   ret->rpcRepliesExpected = 0;
-  ret->resultsFound = 0;
   ret->kfnc = NULL;
   MUTEX_LOCK(lock);
 
@@ -1717,6 +1722,7 @@
                             &hosts[i])) {
        res = ltd->store->get(ltd->store->closure,
                              type,
+                             0, /* FIXME: priority */
                              keyCount,
                              keys,
                              (DataProcessor)&getLocalResultCallback,
@@ -1771,7 +1777,7 @@
     */   
     ret->kfnc 
       = findKNodes_start(table,
-                        key,
+                        &keys[0],
                         timeout,
                         ALPHA,
                         (NodeFoundCallback) &send_dht_get_rpc,
@@ -1823,16 +1829,18 @@
  *  to the identities of peers that support the table that we're 
  *  looking for; pass those Helos to the core *and* try to ping them.
  */
-static void findnodes_dht_master_get_callback(const DataContainer * cont,
-                                             FindNodesContext * fnc) {
+static int 
+findnodes_dht_master_get_callback(const HashCode160 * key,
+                                 const DataContainer * cont,
+                                 FindNodesContext * fnc) {
   unsigned int dataLength;
-  PeerIdentity * id;
+  const PeerIdentity * id;
   int i;
 
   ENTER();
-  dataLength = cont->dataLength;
-
-  if (dataLength % sizeof(PeerIdentity) != 0) {
+  dataLength = ntohl(cont->size) - sizeof(DataContainer);
+  
+  if ( (dataLength % sizeof(PeerIdentity)) != 0) {
     LOG(LOG_DEBUG,
        "Response size was %d, expected multile of %d\n",
        dataLength, 
@@ -1840,17 +1848,23 @@
     LOG(LOG_WARNING,
        _("Invalid response to '%s'.\n"),
        "DHT_findValue");
-    return;
+    return SYSERR;
   }
-  id = (PeerIdentity*) cont->data;
+  id = (const PeerIdentity*) &cont[1];
   for (i=dataLength/sizeof(PeerIdentity)-1;i>=0;i--) {
     if (!hostIdentityEquals(&id[i],
                            coreAPI->myIdentity)) 
       request_DHT_ping(&id[i],
                       fnc);  
   }
+  return OK;
 }
 
+static void findnodes_dht_master_complete_callback(FindNodesContext * fnc) {
+  SEMAPHORE_UP(fnc->async_handle_done);
+}
+
+
 /**
  * In the induced sub-structure for the given 'table', find the ALPHA
  * nodes closest to the given key.  The code first tries to find ALPHA
@@ -1904,6 +1918,7 @@
   fnc->timeout = cronTime(NULL) + timeout;
   fnc->rpcRepliesExpected = 0;
   fnc->rpcRepliesReceived = 0;
+  fnc->async_handle = NULL;
   MUTEX_CREATE_RECURSIVE(&fnc->lock);
 
   /* find peers in local peer-list that participate in
@@ -1952,13 +1967,17 @@
 #endif
       /* try finding peers responsible for this table using
         the master table */
+      fnc->async_handle_done
+       = SEMAPHORE_NEW(0);
       fnc->async_handle
        = dht_get_async_start(&masterTableId,
-                             table,
+                             0, /* type */
+                             1, /* 1 key */
+                             table, /* key */
                              timeout,
-                             ALPHA - fnc->k, /* level of parallelism proportional to 
-                                                number of peers we're looking for */
-                             &findnodes_dht_master_get_callback,
+                             (DataProcessor) &findnodes_dht_master_get_callback,
+                             fnc,
+                             (DHT_OP_Complete) &findnodes_dht_master_complete_callback,
                              fnc);
     }
   }
@@ -1982,7 +2001,9 @@
   ENTER();
   /* stop async DHT get */
   if (fnc->async_handle != NULL) {
+    SEMAPHORE_DOWN(fnc->async_handle_done);
     dht_get_async_stop(fnc->async_handle);
+    SEMAPHORE_FREE(fnc->async_handle_done);
     fnc->async_handle = NULL;
   }
 
@@ -2011,18 +2032,19 @@
  *  looking for; pass those Helos to the core *and* to the callback
  *  as peers supporting the table.
  */
-static void find_k_nodes_dht_master_get_callback(const DataContainer * cont,
+static void find_k_nodes_dht_master_get_callback(const HashCode160 * key,
+                                                const DataContainer * cont,
                                                 FindKNodesContext * fnc) {
   unsigned int pos;
   unsigned int dataLength;
-  char * value;
+  const PeerIdentity * value;
 #if DEBUG_DHT
   EncName enc;
 #endif
 
   ENTER();
-  dataLength = cont->dataLength;
-  value = cont->data;
+  dataLength = ntohl(cont->size) - sizeof(DataContainer);
+  value = (const PeerIdentity*) &cont[1];
 
   /* parse value, try to DHT-ping the new peers
      (to add it to the table; if that succeeds
@@ -2034,10 +2056,10 @@
        "DHT_findValue");
     return;
   }
-  for (pos = 0;pos<dataLength;pos+=sizeof(PeerIdentity)) {
-    PeerIdentity * msg;
+  for (pos = 0;pos<dataLength/sizeof(PeerIdentity);pos++) {
+    const PeerIdentity * msg;
 
-    msg = (PeerIdentity*) &value[pos];
+    msg = &value[pos];
 #if DEBUG_DHT
     IFLOG(LOG_DEBUG,
          hash2enc(&msg->hashPubKey,
@@ -2059,6 +2081,9 @@
   }
 }
 
+static void find_k_nodes_dht_master_get_complete(FindKNodesContext * fnc) {
+  SEMAPHORE_UP(fnc->async_handle_done);
+}
 
 /**
  * In the induced sub-structure for the given 'table', find k nodes
@@ -2164,13 +2189,17 @@
 #endif
     /* try finding peers responsible for this table using
        the master table */
+    fnc->async_handle_done
+      = SEMAPHORE_NEW(0);
     fnc->async_handle
       = dht_get_async_start(&masterTableId,
-                           table,
-                           timeout,
-                           fnc->k, /* level of parallelism proportional to 
-                                      number of peers we're looking for */
-                           &find_k_nodes_dht_master_get_callback,
+                           0, /* type */
+                           1, /* key count */
+                           table, /* keys */
+                           timeout, 
+                           (DataProcessor)&find_k_nodes_dht_master_get_callback,
+                           fnc,
+                           (DHT_OP_Complete) &find_k_nodes_dht_master_get_complete,
                            fnc);
   }  
   return fnc;
@@ -2188,6 +2217,8 @@
   /* stop async DHT get */
   ENTER();
   if (fnc->async_handle != NULL) {
+    SEMAPHORE_DOWN(fnc->async_handle_done);
+    SEMAPHORE_FREE(fnc->async_handle_done);
     dht_get_async_stop(fnc->async_handle);
     fnc->async_handle = NULL;
   }
@@ -2259,8 +2290,7 @@
           record->confirmedReplicas+1);
       record->replicas[record->confirmedReplicas-1] = *peer;
       if (record->callback != NULL)
-       record->callback(peer,
-                        record->closure);
+       record->callback(record->closure);
     }
   }
   MUTEX_UNLOCK(&record->lock);





reply via email to

[Prev in Thread] Current Thread [Next in Thread]