[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r1842 - in GNUnet/src/applications: fs/ecrs fs/lib fs/modul
From: |
grothoff |
Subject: |
[GNUnet-SVN] r1842 - in GNUnet/src/applications: fs/ecrs fs/lib fs/module gap |
Date: |
Sun, 21 Aug 2005 23:02:18 -0700 (PDT) |
Author: grothoff
Date: 2005-08-21 23:02:16 -0700 (Sun, 21 Aug 2005)
New Revision: 1842
Modified:
GNUnet/src/applications/fs/ecrs/download.c
GNUnet/src/applications/fs/lib/fslib.c
GNUnet/src/applications/fs/module/fs.c
GNUnet/src/applications/fs/module/querymanager.c
GNUnet/src/applications/gap/gap.c
Log:
fixing bug in delays and waits that resulted in downloads (esp. via loopback)
being much slower than necessary
Modified: GNUnet/src/applications/fs/ecrs/download.c
===================================================================
--- GNUnet/src/applications/fs/ecrs/download.c 2005-08-22 04:18:36 UTC (rev
1841)
+++ GNUnet/src/applications/fs/ecrs/download.c 2005-08-22 06:02:16 UTC (rev
1842)
@@ -392,6 +392,8 @@
struct FS_SEARCH_CONTEXT * sctx;
+ PTHREAD_T requestThread;
+
} RequestManager;
static int nodeReceive(const HashCode512 * query,
@@ -409,6 +411,7 @@
RequestManager * rm;
rm = MALLOC(sizeof(RequestManager));
+ PTHREAD_GET_SELF(&rm->requestThread);
rm->abortFlag
= NO;
rm->lastDET
@@ -460,7 +463,8 @@
rm->requestListSize,
0);
FS_SEARCH_destroyContext(rm->sctx);
- MUTEX_DESTROY(&rm->lock);
+ MUTEX_DESTROY(&rm->lock);
+ PTHREAD_REL_SELF(&rm->requestThread);
FREE(rm);
}
@@ -860,7 +864,7 @@
hash2enc(query,
&enc));
LOG(LOG_DEBUG,
- "Receiving reply to query %s\n",
+ "Receiving reply to query `%s'\n",
&enc);
#endif
@@ -923,6 +927,8 @@
requestManagerEndgame(node->ctx->rm);
}
}
+ PTHREAD_KILL(&node->ctx->rm->requestThread,
+ SIGALRM);
FREE(data);
FREE(node);
return OK;
@@ -1090,7 +1096,7 @@
TTL_DECREMENT = rm->requestList[0]->node->ctx->TTL_DECREMENT;
for (i=0;i<rm->requestListIndex;i++) {
- if (rm->requestList[i]->lastTimeout >= now + TTL_DECREMENT) {
+ if (rm->requestList[i]->lastTimeout >= now - TTL_DECREMENT) {
pending++;
} else if (rm->requestList[i]->searchHandle != NULL) {
FS_stop_search(rm->sctx,
@@ -1117,6 +1123,8 @@
(0 == randomi(rm->requestListIndex *
pOCWCubed)) ) {
delta = (rm->requestList[j]->lastTimeout - now) + 10 * cronMILLIS;
+ LOG(LOG_DEBUG,
+ "Requesting!\n");
issueRequest(rm, j);
pending++;
} else {
@@ -1159,10 +1167,12 @@
NodeClosure * top;
FileIdentifier fid;
+#if DEBUG_DOWNLOAD
LOG(LOG_DEBUG,
"`%s' running for file `%s'\n",
__FUNCTION__,
filename);
+#endif
GNUNET_ASSERT(filename != NULL);
fid = uri->data.chk;
if (! ECRS_isFileUri(uri)) {
Modified: GNUnet/src/applications/fs/lib/fslib.c
===================================================================
--- GNUnet/src/applications/fs/lib/fslib.c 2005-08-22 04:18:36 UTC (rev
1841)
+++ GNUnet/src/applications/fs/lib/fslib.c 2005-08-22 06:02:16 UTC (rev
1842)
@@ -55,6 +55,7 @@
static void * processReplies(SEARCH_CONTEXT * ctx) {
CS_MESSAGE_HEADER * hdr;
int i;
+ int matched;
CS_fs_reply_content_MESSAGE * rep;
HashCode512 query;
unsigned int size;
@@ -65,6 +66,10 @@
hdr = NULL;
if (OK == readFromSocket(ctx->sock,
&hdr)) {
+#if DEBUG_FSLIB
+ LOG(LOG_DEBUG,
+ "FSLIB: received message from gnunetd\n");
+#endif
delay = 100 * cronMILLIS;
/* verify hdr, if reply, process, otherwise
signal protocol problem; if ok, find
@@ -84,12 +89,14 @@
FREE(hdr);
continue;
}
+ matched = 0;
MUTEX_LOCK(ctx->lock);
for (i=ctx->handleCount-1;i>=0;i--) {
if (equalsHashCode512(&query,
&ctx->handles[i]->req->query[0])) {
Datastore_Value * value;
+ matched++;
if (ctx->handles[i]->callback != NULL) {
value = MALLOC(sizeof(Datastore_Value) + size);
value->size = htonl(size + sizeof(Datastore_Value));
@@ -111,7 +118,17 @@
}
}
MUTEX_UNLOCK(ctx->lock);
+#if DEBUG_FSLIB
+ if (matched == 0)
+ LOG(LOG_DEBUG,
+ "FSLIB: received content but have no pending request\n");
+#endif
} else {
+#if DEBUG_FSLIB
+ LOG(LOG_DEBUG,
+ "FSLIB: error communicating with gnunetd; sleeping for %ums\n",
+ delay);
+#endif
gnunet_util_sleep(delay);
delay *= 2;
if (delay > 5 * cronSECONDS)
@@ -181,6 +198,11 @@
#endif
ret = MALLOC(sizeof(SEARCH_HANDLE));
+#if DEBUG_FSLIB
+ LOG(LOG_DEBUG,
+ "FSLIB: start search (%p)\n",
+ ret);
+#endif
req = MALLOC(sizeof(CS_fs_request_search_MESSAGE) + (keyCount-1) *
sizeof(HashCode512));
req->header.size = htons(sizeof(CS_fs_request_search_MESSAGE) + (keyCount-1)
* sizeof(HashCode512));
req->header.type = htons(CS_PROTO_gap_QUERY_START);
@@ -217,6 +239,11 @@
ret);
return NULL;
}
+#if DEBUG_FSLIB
+ LOG(LOG_DEBUG,
+ "FSLIB: search started (%p)\n",
+ ret);
+#endif
return ret;
}
@@ -227,6 +254,11 @@
SEARCH_HANDLE * handle) {
int i;
+#if DEBUG_FSLIB
+ LOG(LOG_DEBUG,
+ "FSLIB: stop search (%p)\n",
+ handle);
+#endif
handle->req->header.type = htons(CS_PROTO_gap_QUERY_STOP);
writeToSocket(ctx->sock,
&handle->req->header);
@@ -238,6 +270,11 @@
}
MUTEX_UNLOCK(ctx->lock);
FREE(handle->req);
+#if DEBUG_FSLIB
+ LOG(LOG_DEBUG,
+ "FSLIB: search stopped (%p)\n",
+ handle);
+#endif
FREE(handle);
}
Modified: GNUnet/src/applications/fs/module/fs.c
===================================================================
--- GNUnet/src/applications/fs/module/fs.c 2005-08-22 04:18:36 UTC (rev
1841)
+++ GNUnet/src/applications/fs/module/fs.c 2005-08-22 06:02:16 UTC (rev
1842)
@@ -86,48 +86,23 @@
*/
static DHT_TableId dht_table;
-/**
- * Store an item in the datastore.
- *
- * @param query the unique identifier of the item
- * @param value the value to store
- * @param prio how much does our routing code value
- * this datum?
- * @return OK if the value could be stored,
- * NO if the value verifies but is not stored,
- * SYSERR if the value is malformed
- */
-static int gapPut(void * closure,
- const HashCode512 * query,
- const DataContainer * value,
- unsigned int prio) {
+static Datastore_Value *
+gapWrapperToDatastoreValue(const DataContainer * value,
+ int prio) {
Datastore_Value * dv;
GapWrapper * gw;
unsigned int size;
- int ret;
- HashCode512 hc;
cron_t et;
cron_t now;
-#if DEBUG_FS
- EncName enc;
-#endif
if (ntohl(value->size) < sizeof(GapWrapper)) {
BREAK();
- return SYSERR;
+ return NULL;
}
gw = (GapWrapper*) value;
size = ntohl(gw->dc.size)
- sizeof(GapWrapper)
+ sizeof(Datastore_Value);
- if ( (OK != getQueryFor(size - sizeof(Datastore_Value),
- (DBlock*)&gw[1],
- &hc)) ||
- (! equalsHashCode512(&hc, query)) ) {
- BREAK(); /* value failed verification! */
- return SYSERR;
- }
-
dv = MALLOC(size);
dv->size = htonl(size);
dv->type = htonl(getTypeOfBlock(size - sizeof(Datastore_Value),
@@ -146,6 +121,44 @@
memcpy(&dv[1],
&gw[1],
size - sizeof(Datastore_Value));
+ return dv;
+}
+
+/**
+ * Store an item in the datastore.
+ *
+ * @param query the unique identifier of the item
+ * @param value the value to store
+ * @param prio how much does our routing code value
+ * this datum?
+ * @return OK if the value could be stored,
+ * NO if the value verifies but is not stored,
+ * SYSERR if the value is malformed
+ */
+static int gapPut(void * closure,
+ const HashCode512 * query,
+ const DataContainer * value,
+ unsigned int prio) {
+ Datastore_Value * dv;
+ GapWrapper * gw;
+ unsigned int size;
+ int ret;
+ HashCode512 hc;
+#if DEBUG_FS
+ EncName enc;
+#endif
+
+ dv = gapWrapperToDatastoreValue(value, prio);
+ if (dv == NULL)
+ return SYSERR;
+ size = ntohl(gw->dc.size) - sizeof(GapWrapper);
+ if ( (OK != getQueryFor(size,
+ (DBlock*) &gw[1],
+ &hc)) ||
+ (! equalsHashCode512(&hc, query)) ) {
+ BREAK(); /* value failed verification! */
+ return SYSERR;
+ }
if (YES != isDatumApplicable(ntohl(dv->type),
ntohl(dv->size) - sizeof(Datastore_Value),
(DBlock*) &dv[1],
@@ -201,61 +214,6 @@
}
/**
- * Process a query from the client. Forwards to the network.
- *
- * @return SYSERR if the TCP connection should be closed, otherwise OK
- */
-static int csHandleRequestQueryStart(ClientHandle sock,
- const CS_MESSAGE_HEADER * req) {
- const CS_fs_request_search_MESSAGE * rs;
- unsigned int keyCount;
-#if DEBUG_FS
- EncName enc;
-#endif
-
- if (ntohs(req->size) < sizeof(CS_fs_request_search_MESSAGE)) {
- BREAK();
- return SYSERR;
- }
- rs = (const CS_fs_request_search_MESSAGE*) req;
-#if DEBUG_FS
- IFLOG(LOG_DEBUG,
- hash2enc(&rs->query[0],
- &enc));
- LOG(LOG_DEBUG,
- "FS received QUERY START (query: `%s')\n",
- &enc);
-#endif
- trackQuery(&rs->query[0],
- ntohl(rs->type),
- sock);
- keyCount = 1 + (ntohs(req->size) - sizeof(CS_fs_request_search_MESSAGE)) /
sizeof(HashCode512);
- gap->get_start(ntohl(rs->type),
- ntohl(rs->anonymityLevel),
- keyCount,
- &rs->query[0],
- ntohll(rs->expiration),
- ntohl(rs->prio));
- if ( (ntohl(rs->anonymityLevel) == 0) &&
- (dht != NULL) ) {
- DHT_GET_CLS * cls;
-
- cls = MALLOC(sizeof(DHT_GET_CLS));
- cls->prio = ntohl(rs->prio);
- cls->rec = dht->get_start(&dht_table,
- ntohl(rs->type),
- keyCount,
- &rs->query[0],
- ntohll(rs->expiration),
- (DataProcessor) &get_result_callback,
- cls,
- (DHT_OP_Complete) &get_complete_callback,
- cls);
- }
- return OK;
-}
-
-/**
* Stop processing a query.
*
* @return SYSERR if the TCP connection should be closed, otherwise OK
@@ -561,7 +519,7 @@
* Process a client request unindex content.
*/
static int csHandleCS_fs_request_unindex_MESSAGE(ClientHandle sock,
- const CS_MESSAGE_HEADER * req) {
+ const CS_MESSAGE_HEADER * req)
{
int ret;
CS_fs_request_unindex_MESSAGE * ru;
@@ -586,7 +544,7 @@
* data is indexed.
*/
static int csHandleCS_fs_request_test_index_MESSAGEed(ClientHandle sock,
- const CS_MESSAGE_HEADER * req) {
+ const CS_MESSAGE_HEADER *
req) {
int ret;
RequestTestindex * ru;
@@ -946,6 +904,24 @@
return ret;
}
+static int replyHashFunction(const DataContainer * content,
+ HashCode512 * id) {
+ const GapWrapper * gw;
+ unsigned int size;
+
+ size = ntohl(content->size);
+ if (size < sizeof(GapWrapper)) {
+ BREAK();
+ memset(id, 0, sizeof(HashCode512));
+ return SYSERR;
+ }
+ gw = (const GapWrapper*) content;
+ hash(&gw[1],
+ size - sizeof(GapWrapper),
+ id);
+ return OK;
+}
+
static int uniqueReplyIdentifier(const DataContainer * content,
unsigned int type,
const HashCode512 * primaryKey) {
@@ -978,21 +954,97 @@
return NO;
}
-static int replyHashFunction(const DataContainer * content,
- HashCode512 * id) {
- const GapWrapper * gw;
- unsigned int size;
+static int fastPathProcessor(const HashCode512 * query,
+ const DataContainer * value,
+ void * cls) {
+ int * done = cls;
+ Datastore_Value * dv;
- size = ntohl(content->size);
- if (size < sizeof(GapWrapper)) {
+ dv = gapWrapperToDatastoreValue(value, 0);
+ if (dv == NULL)
+ return SYSERR;
+ processResponse(query,
+ dv);
+ if (YES == uniqueReplyIdentifier(value,
+ ntohl(dv->type),
+ query))
+ *done = YES;
+ FREE(dv);
+ return OK;
+}
+
+/**
+ * Process a query from the client. Forwards to the network.
+ *
+ * @return SYSERR if the TCP connection should be closed, otherwise OK
+ */
+static int csHandleRequestQueryStart(ClientHandle sock,
+ const CS_MESSAGE_HEADER * req) {
+ const CS_fs_request_search_MESSAGE * rs;
+ unsigned int keyCount;
+#if DEBUG_FS
+ EncName enc;
+#endif
+ unsigned int type;
+ int done;
+
+ if (ntohs(req->size) < sizeof(CS_fs_request_search_MESSAGE)) {
BREAK();
- memset(id, 0, sizeof(HashCode512));
return SYSERR;
}
- gw = (const GapWrapper*) content;
- hash(&gw[1],
- size - sizeof(GapWrapper),
- id);
+ rs = (const CS_fs_request_search_MESSAGE*) req;
+#if DEBUG_FS
+ IFLOG(LOG_DEBUG,
+ hash2enc(&rs->query[0],
+ &enc));
+ LOG(LOG_DEBUG,
+ "FS received QUERY START (query: `%s')\n",
+ &enc);
+#endif
+ type = ntohl(rs->type);
+ trackQuery(&rs->query[0],
+ type,
+ sock);
+ keyCount = 1 + (ntohs(req->size) - sizeof(CS_fs_request_search_MESSAGE)) /
sizeof(HashCode512);
+
+ /* try a "fast path" avoiding gap/dht if unique reply is locally available */
+ done = NO;
+ gapGet(NULL,
+ type,
+ EXTREME_PRIORITY,
+ keyCount,
+ &rs->query[0],
+ &fastPathProcessor,
+ &done);
+ if (done == YES) {
+#if DEBUG_FS
+ LOG(LOG_DEBUG,
+ "FS successfully took GAP shortcut.\n");
+#endif
+ return OK;
+ }
+ gap->get_start(type,
+ ntohl(rs->anonymityLevel),
+ keyCount,
+ &rs->query[0],
+ ntohll(rs->expiration),
+ ntohl(rs->prio));
+ if ( (ntohl(rs->anonymityLevel) == 0) &&
+ (dht != NULL) ) {
+ DHT_GET_CLS * cls;
+
+ cls = MALLOC(sizeof(DHT_GET_CLS));
+ cls->prio = ntohl(rs->prio);
+ cls->rec = dht->get_start(&dht_table,
+ type,
+ keyCount,
+ &rs->query[0],
+ ntohll(rs->expiration),
+ (DataProcessor) &get_result_callback,
+ cls,
+ (DHT_OP_Complete) &get_complete_callback,
+ cls);
+ }
return OK;
}
Modified: GNUnet/src/applications/fs/module/querymanager.c
===================================================================
--- GNUnet/src/applications/fs/module/querymanager.c 2005-08-22 04:18:36 UTC
(rev 1841)
+++ GNUnet/src/applications/fs/module/querymanager.c 2005-08-22 06:02:16 UTC
(rev 1842)
@@ -30,7 +30,7 @@
#include "fs.h"
#include "querymanager.h"
-#define DEBUG_QUERYMANAGER NO
+#define DEBUG_QUERYMANAGER YES
typedef struct {
HashCode512 query;
Modified: GNUnet/src/applications/gap/gap.c
===================================================================
--- GNUnet/src/applications/gap/gap.c 2005-08-22 04:18:36 UTC (rev 1841)
+++ GNUnet/src/applications/gap/gap.c 2005-08-22 06:02:16 UTC (rev 1842)
@@ -44,7 +44,7 @@
#include "gnunet_traffic_service.h"
#include "gnunet_topology_service.h"
-#define DEBUG_GAP NO
+#define DEBUG_GAP YES
#define EXTRA_CHECKS YES
@@ -1055,9 +1055,17 @@
ite = &ROUTING_indTable_[computeRoutingIndex(primaryKey)];
if (! equalsHashCode512(&ite->primaryKey,
primaryKey) ) {
+#if DEBUG_GAP
+ LOG(LOG_DEBUG,
+ "GAP: Dropping reply, routing table has no query associated with it
(anymore)\n");
+#endif
return; /* we don't care for the reply (anymore) */
}
if (YES == ite->successful_local_lookup_in_delay_loop) {
+#if DEBUG_GAP
+ LOG(LOG_DEBUG,
+ "GAP: Dropping reply, found reply locally during delay\n");
+#endif
return; /* wow, really bad concurrent DB lookup and processing for
the same query. Well, at least we should not also
queue the delayed reply twice... */
@@ -1139,7 +1147,17 @@
const PeerIdentity * sender) {
unsigned int i;
cron_t now;
+#if DEBUG__GAP
+ EncName enc;
+ IFLOG(LOG_DEBUG,
+ hash2enc(query,
+ &enc));
+ LOG(LOG_DEBUG,
+ "GAP: Queueing query '%s' in slot %p\n",
+ &enc,
+ ite);
+#endif
GNUNET_ASSERT(sender != NULL); /* do NOT add to RT for local clients! */
cronTime(&now);
if (mode == ITE_REPLACE) {
@@ -1576,14 +1594,15 @@
ite = &ROUTING_indTable_[computeRoutingIndex(&query->queries[0])];
MUTEX_LOCK(&lookup_exclusion);
+ i = -1;
if (sender != NULL) {
if ((policy & QUERY_INDIRECT) > 0) {
- needsForwarding(&query->queries[0],
- ttl,
- prio,
- sender,
- &isRouted,
- &doForward);
+ i = needsForwarding(&query->queries[0],
+ ttl,
+ prio,
+ sender,
+ &isRouted,
+ &doForward);
} else {
isRouted = NO;
doForward = YES;
@@ -1602,10 +1621,11 @@
hash2enc(&query->queries[0],
&enc));
LOG(LOG_DEBUG,
- "GAP is executing request for `%s': %s %s\n",
+ "GAP is executing request for `%s':%s%s (%d)\n",
&enc,
- doForward ? "forwarding" : "",
- isRouted ? "routing" : "");
+ doForward ? " forwarding" : "",
+ isRouted ? " routing" : "",
+ i);
#endif
cls.values = NULL;
cls.valueCount = 0;
@@ -2069,8 +2089,14 @@
if ((policy & QUERY_DROPMASK) == 0) {
FREE(qmsg);
#if DEBUG_GAP
+ if (sender != NULL) {
+ IFLOG(LOG_DEBUG,
+ hash2enc(&sender->hashPubKey,
+ &enc));
+ }
LOG(LOG_DEBUG,
- "Dropping query, policy decided that this peer is too busy.\n");
+ "Dropping query from %s, policy decided that this peer is too busy.\n",
+ sender == NULL ? "localhost" : &enc);
#endif
return OK; /* straight drop. */
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r1842 - in GNUnet/src/applications: fs/ecrs fs/lib fs/module gap,
grothoff <=