[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r164 - GNUnet/src/applications/dht/module
From: |
grothoff |
Subject: |
[GNUnet-SVN] r164 - GNUnet/src/applications/dht/module |
Date: |
Thu, 3 Feb 2005 02:03:17 -0800 (PST) |
Author: grothoff
Date: 2005-02-03 02:03:16 -0800 (Thu, 03 Feb 2005)
New Revision: 164
Modified:
GNUnet/src/applications/dht/module/dht.c
Log:
more dht hacking
Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c 2005-02-03 09:23:44 UTC (rev
163)
+++ GNUnet/src/applications/dht/module/dht.c 2005-02-03 10:03:16 UTC (rev
164)
@@ -252,6 +252,7 @@
* such an operation was not performed).
*/
struct DHT_GET_RECORD * async_handle;
+ Semaphore * async_handle_done;
/**
* ASYNC RPC handles.
@@ -275,7 +276,7 @@
* @param identity the identity of the node that was found
* @return OK to continue searching, SYSERR to abort early
*/
-typedef int (*NodeFoundCallback)(PeerIdentity * identity,
+typedef int (*NodeFoundCallback)(const PeerIdentity * identity,
void * closure);
/**
@@ -319,6 +320,7 @@
* such an operation was not performed).
*/
struct DHT_GET_RECORD * async_handle;
+ Semaphore * async_handle_done;
/**
* ASYNC RPC handles.
@@ -373,6 +375,8 @@
void * resultClosure;
+ unsigned int resultsFound;
+
/**
* Context of findKNodes (async); NULL if the table was local.
*/
@@ -1588,19 +1592,20 @@
* callback with the results found locally.
* A DataProcessor.
*/
-static int getLocalResultCallback(const HashCode160 key,
- const DataContainer val,
+static int getLocalResultCallback(const HashCode160 * key,
+ const DataContainer * val,
DHT_GET_RECORD * rec) {
int ret;
if ( (equalsHashCode160(&rec->table,
&masterTableId)) &&
- (results[j].dataLength % sizeof(PeerIdentity) != 0) )
+ ((ntohl(val->size) - sizeof(DataContainer)) % sizeof(PeerIdentity) !=
0) )
BREAK(); /* assertion failed: entry in master table malformed! */
ret = OK;
if (rec->resultCallback != NULL)
- ret = rec->resultCallback(val,
+ ret = rec->resultCallback(key,
+ val,
rec->resultClosure);
- rec->resultsFound++;
+ rec->resultsFound++;
return ret;
}
@@ -1638,7 +1643,7 @@
ENTER();
IFLOG(LOG_DEBUG,
- hash2enc(keys[0],
+ hash2enc(&keys[0],
&enc));
IFLOG(LOG_DEBUG,
hash2enc(table,
@@ -1668,12 +1673,12 @@
ret->table = *table;
ret->resultCallback = resultCallback;
ret->resultClosure = cls;
+ ret->resultsFound = 0;
ret->callback = callback;
ret->closure = closure;
MUTEX_CREATE_RECURSIVE(&ret->lock);
ret->rpc = NULL;
ret->rpcRepliesExpected = 0;
- ret->resultsFound = 0;
ret->kfnc = NULL;
MUTEX_LOCK(lock);
@@ -1717,6 +1722,7 @@
&hosts[i])) {
res = ltd->store->get(ltd->store->closure,
type,
+ 0, /* FIXME: priority */
keyCount,
keys,
(DataProcessor)&getLocalResultCallback,
@@ -1771,7 +1777,7 @@
*/
ret->kfnc
= findKNodes_start(table,
- key,
+ &keys[0],
timeout,
ALPHA,
(NodeFoundCallback) &send_dht_get_rpc,
@@ -1823,16 +1829,18 @@
* to the identities of peers that support the table that we're
* looking for; pass those Helos to the core *and* try to ping them.
*/
-static void findnodes_dht_master_get_callback(const DataContainer * cont,
- FindNodesContext * fnc) {
+static int
+findnodes_dht_master_get_callback(const HashCode160 * key,
+ const DataContainer * cont,
+ FindNodesContext * fnc) {
unsigned int dataLength;
- PeerIdentity * id;
+ const PeerIdentity * id;
int i;
ENTER();
- dataLength = cont->dataLength;
-
- if (dataLength % sizeof(PeerIdentity) != 0) {
+ dataLength = ntohl(cont->size) - sizeof(DataContainer);
+
+ if ( (dataLength % sizeof(PeerIdentity)) != 0) {
LOG(LOG_DEBUG,
"Response size was %d, expected multile of %d\n",
dataLength,
@@ -1840,17 +1848,23 @@
LOG(LOG_WARNING,
_("Invalid response to '%s'.\n"),
"DHT_findValue");
- return;
+ return SYSERR;
}
- id = (PeerIdentity*) cont->data;
+ id = (const PeerIdentity*) &cont[1];
for (i=dataLength/sizeof(PeerIdentity)-1;i>=0;i--) {
if (!hostIdentityEquals(&id[i],
coreAPI->myIdentity))
request_DHT_ping(&id[i],
fnc);
}
+ return OK;
}
+static void findnodes_dht_master_complete_callback(FindNodesContext * fnc) {
+ SEMAPHORE_UP(fnc->async_handle_done);
+}
+
+
/**
* In the induced sub-structure for the given 'table', find the ALPHA
* nodes closest to the given key. The code first tries to find ALPHA
@@ -1904,6 +1918,7 @@
fnc->timeout = cronTime(NULL) + timeout;
fnc->rpcRepliesExpected = 0;
fnc->rpcRepliesReceived = 0;
+ fnc->async_handle = NULL;
MUTEX_CREATE_RECURSIVE(&fnc->lock);
/* find peers in local peer-list that participate in
@@ -1952,13 +1967,17 @@
#endif
/* try finding peers responsible for this table using
the master table */
+ fnc->async_handle_done
+ = SEMAPHORE_NEW(0);
fnc->async_handle
= dht_get_async_start(&masterTableId,
- table,
+ 0, /* type */
+ 1, /* 1 key */
+ table, /* key */
timeout,
- ALPHA - fnc->k, /* level of parallelism
proportional to
- number of peers we're looking
for */
- &findnodes_dht_master_get_callback,
+ (DataProcessor)
&findnodes_dht_master_get_callback,
+ fnc,
+ (DHT_OP_Complete)
&findnodes_dht_master_complete_callback,
fnc);
}
}
@@ -1982,7 +2001,9 @@
ENTER();
/* stop async DHT get */
if (fnc->async_handle != NULL) {
+ SEMAPHORE_DOWN(fnc->async_handle_done);
dht_get_async_stop(fnc->async_handle);
+ SEMAPHORE_FREE(fnc->async_handle_done);
fnc->async_handle = NULL;
}
@@ -2011,18 +2032,19 @@
* looking for; pass those Helos to the core *and* to the callback
* as peers supporting the table.
*/
-static void find_k_nodes_dht_master_get_callback(const DataContainer * cont,
+static void find_k_nodes_dht_master_get_callback(const HashCode160 * key,
+ const DataContainer * cont,
FindKNodesContext * fnc) {
unsigned int pos;
unsigned int dataLength;
- char * value;
+ const PeerIdentity * value;
#if DEBUG_DHT
EncName enc;
#endif
ENTER();
- dataLength = cont->dataLength;
- value = cont->data;
+ dataLength = ntohl(cont->size) - sizeof(DataContainer);
+ value = (const PeerIdentity*) &cont[1];
/* parse value, try to DHT-ping the new peers
(to add it to the table; if that succeeds
@@ -2034,10 +2056,10 @@
"DHT_findValue");
return;
}
- for (pos = 0;pos<dataLength;pos+=sizeof(PeerIdentity)) {
- PeerIdentity * msg;
+ for (pos = 0;pos<dataLength/sizeof(PeerIdentity);pos++) {
+ const PeerIdentity * msg;
- msg = (PeerIdentity*) &value[pos];
+ msg = &value[pos];
#if DEBUG_DHT
IFLOG(LOG_DEBUG,
hash2enc(&msg->hashPubKey,
@@ -2059,6 +2081,9 @@
}
}
+static void find_k_nodes_dht_master_get_complete(FindKNodesContext * fnc) {
+ SEMAPHORE_UP(fnc->async_handle_done);
+}
/**
* In the induced sub-structure for the given 'table', find k nodes
@@ -2164,13 +2189,17 @@
#endif
/* try finding peers responsible for this table using
the master table */
+ fnc->async_handle_done
+ = SEMAPHORE_NEW(0);
fnc->async_handle
= dht_get_async_start(&masterTableId,
- table,
- timeout,
- fnc->k, /* level of parallelism proportional to
- number of peers we're looking for */
- &find_k_nodes_dht_master_get_callback,
+ 0, /* type */
+ 1, /* key count */
+ table, /* keys */
+ timeout,
+
(DataProcessor)&find_k_nodes_dht_master_get_callback,
+ fnc,
+ (DHT_OP_Complete)
&find_k_nodes_dht_master_get_complete,
fnc);
}
return fnc;
@@ -2188,6 +2217,8 @@
/* stop async DHT get */
ENTER();
if (fnc->async_handle != NULL) {
+ SEMAPHORE_DOWN(fnc->async_handle_done);
+ SEMAPHORE_FREE(fnc->async_handle_done);
dht_get_async_stop(fnc->async_handle);
fnc->async_handle = NULL;
}
@@ -2259,8 +2290,7 @@
record->confirmedReplicas+1);
record->replicas[record->confirmedReplicas-1] = *peer;
if (record->callback != NULL)
- record->callback(peer,
- record->closure);
+ record->callback(record->closure);
}
}
MUTEX_UNLOCK(&record->lock);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r164 - GNUnet/src/applications/dht/module,
grothoff <=