[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r9671 - GNUnet/src/applications/fs/gap
From: gnunet
Subject: [GNUnet-SVN] r9671 - GNUnet/src/applications/fs/gap
Date: Wed, 2 Dec 2009 19:20:14 +0100
Author: nevans
Date: 2009-12-02 19:20:14 +0100 (Wed, 02 Dec 2009)
New Revision: 9671
Removed:
GNUnet/src/applications/fs/gap/dv_fs.c
GNUnet/src/applications/fs/gap/dv_querymanager.c
GNUnet/src/applications/fs/gap/dv_querymanager.h
GNUnet/src/applications/fs/gap/fs_dv_dht.c
GNUnet/src/applications/fs/gap/fs_dv_dht.h
Log:
removing old files
Deleted: GNUnet/src/applications/fs/gap/dv_fs.c
===================================================================
--- GNUnet/src/applications/fs/gap/dv_fs.c 2009-12-02 17:52:51 UTC (rev 9670)
+++ GNUnet/src/applications/fs/gap/dv_fs.c 2009-12-02 18:20:14 UTC (rev 9671)
@@ -1,1176 +0,0 @@
-/*
- This file is part of GNUnet.
- (C) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
-*/
-
-/**
- * @file applications/fs/gap/dv_fs.c
- * @brief functions for handling CS and P2P file-sharing requests
- * @author Christian Grothoff, Nathan Evans
- *
- * This file contains all of the entry points to the file-sharing
- * module.
- *
- * TODO:
- * - integrate with migration submodule
- * - make sure we do an immediate PUSH for DHT stuff
- * given to us with anonymity_level zero.
- */
-
-#include "platform.h"
-#include "gnunet_util.h"
-#include "gnunet_directories.h"
-#include "gnunet_protocols.h"
-#include "gnunet_datastore_service.h"
-#include "gnunet_dht_service.h"
-#include "gnunet_identity_service.h"
-#include "gnunet_stats_service.h"
-#include "gnunet_traffic_service.h"
-#include "ecrs_core.h"
-#include "anonymity.h"
-#include "fs.h"
-#include "fs_dv_dht.h"
-#include "gap.h"
-#include "migration.h"
-#include "dv_querymanager.h"
-#include "ondemand.h"
-#include "plan.h"
-#include "pid_table.h"
-#include "shared.h"
-#include "gnunet_dv_service.h"
-
-
-#define DEBUG_FS GNUNET_NO
-
-/**
- * Lock shared between all C files in this
- * directory.
- */
-struct GNUNET_Mutex *GNUNET_FS_lock;
-
-static struct GNUNET_GE_Context *ectx;
-
-static GNUNET_CoreAPIForPlugins *coreAPI;
-
-static GNUNET_Identity_ServiceAPI *identity;
-
-static GNUNET_Stats_ServiceAPI *stats;
-
-static GNUNET_DV_ServiceAPI *dv_api;
-
-static GNUNET_Datastore_ServiceAPI *datastore;
-
-static int active_migration;
-
-static int stat_gap_query_received;
-
-static int stat_gap_query_drop_busy;
-
-static int stat_gap_content_received;
-
-static int stat_gap_trust_awarded;
-
-static int stat_dv_replies_sent;
-
-/**
- * Hard CPU limit
- */
-static unsigned long long hardCPULimit;
-
-/**
- * Hard network upload limit.
- */
-static unsigned long long hardUpLimit;
-
-
-struct DV_send_closure
-{
- struct RequestList *request;
- const P2P_gap_query_MESSAGE *message;
-};
-
-/* ********************* CS handlers ********************** */
-
-/**
- * Process a request to insert content from the client.
- *
- * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise GNUNET_OK
- */
-static int
-handle_cs_insert_request (struct GNUNET_ClientHandle *sock,
- const GNUNET_MessageHeader * req)
-{
- const CS_fs_request_insert_MESSAGE *ri;
- GNUNET_DatastoreValue *datum;
- struct GNUNET_GE_Context *cectx;
- GNUNET_HashCode query;
- int ret;
-#if DEBUG_FS
- GNUNET_EncName enc;
-#endif
-
- ri = (const CS_fs_request_insert_MESSAGE *) req;
- if ((ntohs (req->size) < sizeof (CS_fs_request_insert_MESSAGE)) ||
- (GNUNET_OK !=
- GNUNET_EC_file_block_check_and_get_query (ntohs (ri->header.size) -
- sizeof
- (CS_fs_request_insert_MESSAGE),
- (const GNUNET_EC_DBlock *)
- &ri[1], GNUNET_YES, &query)))
- {
- GNUNET_GE_BREAK (ectx, 0);
- return GNUNET_SYSERR;
- }
- datum = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) +
- ntohs (req->size) -
- sizeof (CS_fs_request_insert_MESSAGE));
- datum->size =
- htonl (sizeof (GNUNET_DatastoreValue) + ntohs (req->size) -
- sizeof (CS_fs_request_insert_MESSAGE));
- datum->expiration_time = ri->expiration;
- datum->priority = ri->priority;
- datum->anonymity_level = ri->anonymity_level;
- datum->type =
- htonl (GNUNET_EC_file_block_get_type
- (ntohs (ri->header.size) - sizeof (CS_fs_request_insert_MESSAGE),
- (const GNUNET_EC_DBlock *) &ri[1]));
-#if DEBUG_FS
- IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- GNUNET_hash_to_enc (&query, &enc));
- GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "FS received REQUEST INSERT (query: `%s', type: %u, priority %u)\n",
- &enc, ntohl (datum->type), ntohl (ri->priority));
-#endif
- memcpy (&datum[1],
- &ri[1], ntohs (req->size) - sizeof (CS_fs_request_insert_MESSAGE));
- ret = datastore->putUpdate (&query, datum);
- if (ret == GNUNET_NO)
- {
- cectx = coreAPI->cs_log_context_create (sock);
- GNUNET_GE_LOG (cectx,
- GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
- _("Datastore full.\n"));
- GNUNET_GE_free_context (cectx);
- }
- GNUNET_free (datum);
- return coreAPI->cs_send_value (sock, ret);
-}
-
-/**
- * Process a request to symlink a file
- */
-static int
-handle_cs_init_index_request (struct GNUNET_ClientHandle *sock,
- const GNUNET_MessageHeader * req)
-{
- const CS_fs_request_init_index_MESSAGE *ri;
- struct GNUNET_GE_Context *cectx;
- int fnLen;
- int ret;
- char *fn;
-
- fnLen = ntohs (req->size) - sizeof (CS_fs_request_init_index_MESSAGE);
- if ((ntohs (req->size) < sizeof (CS_fs_request_init_index_MESSAGE))
-#if WINDOWS
- || (fnLen > _MAX_PATH)
-#endif
- )
- {
- GNUNET_GE_BREAK (ectx, 0);
- return GNUNET_SYSERR;
- }
- ri = (const CS_fs_request_init_index_MESSAGE *) req;
- fn = GNUNET_malloc (fnLen + 1);
- strncpy (fn, (const char *) &ri[1], fnLen + 1);
- fn[fnLen] = 0;
- cectx = coreAPI->cs_log_context_create (sock);
- ret =
- GNUNET_FS_ONDEMAND_index_prepare_with_symlink (cectx, &ri->fileId, fn);
- GNUNET_GE_free_context (cectx);
- GNUNET_free (fn);
-#if DEBUG_FS
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "Sending confirmation (%s) of index initialization request to client\n",
- ret == GNUNET_OK ? "success" : "failure");
-#endif
- return coreAPI->cs_send_value (sock, ret);
-}
-
-/**
- * Process a request to index content from the client.
- *
- * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise GNUNET_OK
- */
-static int
-handle_cs_index_request (struct GNUNET_ClientHandle *sock,
- const GNUNET_MessageHeader * req)
-{
- int ret;
- const CS_fs_request_index_MESSAGE *ri;
- struct GNUNET_GE_Context *cectx;
-#if DEBUG_FS
- GNUNET_HashCode hc;
- GNUNET_EncName enc;
-#endif
-
- if (ntohs (req->size) < sizeof (CS_fs_request_index_MESSAGE))
- {
- GNUNET_GE_BREAK (ectx, 0);
- return GNUNET_SYSERR;
- }
- cectx = coreAPI->cs_log_context_create (sock);
- ri = (const CS_fs_request_index_MESSAGE *) req;
-#if DEBUG_FS
- GNUNET_EC_file_block_get_query ((const GNUNET_EC_DBlock *) &ri[1],
- ntohs (ri->header.size) -
- sizeof (CS_fs_request_index_MESSAGE), &hc);
- IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- GNUNET_hash_to_enc (&hc, &enc));
- GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "FS received REQUEST INDEX (query: `%s', priority %u)\n",
- &enc, ntohl (ri->priority));
-#endif
- ret = GNUNET_FS_ONDEMAND_add_indexed_content (cectx,
- datastore,
- ntohl (ri->priority),
- GNUNET_ntohll
- (ri->expiration),
- GNUNET_ntohll
- (ri->fileOffset),
- ntohl (ri->anonymity_level),
- &ri->fileId,
- ntohs (ri->header.size) -
- sizeof
- (CS_fs_request_index_MESSAGE),
- (const GNUNET_EC_DBlock *)
- &ri[1]);
- GNUNET_GE_free_context (cectx);
-#if DEBUG_FS
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "Sending confirmation (%s) of index request to client\n",
- ret == GNUNET_OK ? "success" : "failure");
-#endif
- return coreAPI->cs_send_value (sock, ret);
-}
-
-/**
- * Process a query to delete content.
- *
- * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise GNUNET_OK
- */
-static int
-handle_cs_delete_request (struct GNUNET_ClientHandle *sock,
- const GNUNET_MessageHeader * req)
-{
- int ret;
- const CS_fs_request_delete_MESSAGE *rd;
- GNUNET_DatastoreValue *value;
- GNUNET_HashCode query;
- unsigned int type;
-#if DEBUG_FS
- GNUNET_EncName enc;
-#endif
-
- if (ntohs (req->size) < sizeof (CS_fs_request_delete_MESSAGE))
- {
- GNUNET_GE_BREAK (ectx, 0);
- return GNUNET_SYSERR;
- }
- rd = (const CS_fs_request_delete_MESSAGE *) req;
- value = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) +
- ntohs (req->size) -
- sizeof (CS_fs_request_delete_MESSAGE));
- value->size =
- ntohl (sizeof (GNUNET_DatastoreValue) + ntohs (req->size) -
- sizeof (CS_fs_request_delete_MESSAGE));
- type =
- GNUNET_EC_file_block_get_type (ntohs (rd->header.size) -
- sizeof (CS_fs_request_delete_MESSAGE),
- (const GNUNET_EC_DBlock *) &rd[1]);
- value->type = htonl (type);
- memcpy (&value[1],
- &rd[1], ntohs (req->size) - sizeof (CS_fs_request_delete_MESSAGE));
- if (GNUNET_OK !=
- GNUNET_EC_file_block_check_and_get_query (ntohs (rd->header.size) -
- sizeof
- (CS_fs_request_delete_MESSAGE),
- (const GNUNET_EC_DBlock *)
- &rd[1], GNUNET_NO, &query))
- {
- GNUNET_free (value);
- GNUNET_GE_BREAK (ectx, 0);
- return GNUNET_SYSERR;
- }
-#if DEBUG_FS
- IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- GNUNET_hash_to_enc (&query, &enc));
- GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "FS received REQUEST DELETE (query: `%s', type: %u)\n", &enc,
- type);
-#endif
- GNUNET_mutex_lock (GNUNET_FS_lock);
- value->type = htonl (GNUNET_ECRS_BLOCKTYPE_ANY);
- ret = datastore->get (&query, type,
- &GNUNET_FS_HELPER_complete_value_from_database_callback,
- value);
- if ((0 < ret) && (value->type != htonl (GNUNET_ECRS_BLOCKTYPE_ANY)))
- {
- ret = datastore->del (&query, value);
- }
- else
- { /* not found */
- ret = GNUNET_SYSERR;
- }
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- GNUNET_free (value);
-#if DEBUG_FS
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "Sending confirmation (%s) of delete request to client\n",
- ret != GNUNET_SYSERR ? "success" : "failure");
-#endif
- return coreAPI->cs_send_value (sock, ret);
-}
-
-/**
- * Process a client request unindex content.
- */
-static int
-handle_cs_unindex_request (struct GNUNET_ClientHandle *sock,
- const GNUNET_MessageHeader * req)
-{
- int ret;
- const CS_fs_request_unindex_MESSAGE *ru;
- struct GNUNET_GE_Context *cectx;
-
- cectx = coreAPI->cs_log_context_create (sock);
- if (ntohs (req->size) != sizeof (CS_fs_request_unindex_MESSAGE))
- {
- GNUNET_GE_BREAK (ectx, 0);
- GNUNET_GE_BREAK (cectx, 0);
- GNUNET_GE_free_context (cectx);
- return GNUNET_SYSERR;
- }
- ru = (const CS_fs_request_unindex_MESSAGE *) req;
-#if DEBUG_FS
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "FS received REQUEST UNINDEX\n");
-#endif
- ret = GNUNET_FS_ONDEMAND_delete_indexed_content (cectx,
- datastore,
- ntohl (ru->blocksize),
- &ru->fileId);
- GNUNET_GE_free_context (cectx);
- return coreAPI->cs_send_value (sock, ret);
-}
-
-/**
- * Process a client request to test if certain
- * data is indexed.
- */
-static int
-handle_cs_test_indexed_request (struct GNUNET_ClientHandle *sock,
- const GNUNET_MessageHeader * req)
-{
- int ret;
- const CS_fs_request_test_index_MESSAGE *ru;
-
- if (ntohs (req->size) != sizeof (CS_fs_request_test_index_MESSAGE))
- {
- GNUNET_GE_BREAK (ectx, 0);
- return GNUNET_SYSERR;
- }
- ru = (const CS_fs_request_test_index_MESSAGE *) req;
-#if DEBUG_FS
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "FS received REQUEST TESTINDEXED\n");
-#endif
- ret = GNUNET_FS_ONDEMAND_test_indexed_file (datastore, &ru->fileId);
- return coreAPI->cs_send_value (sock, ret);
-}
-
-struct FPPClosure
-{
- struct GNUNET_ClientHandle *sock;
- struct GNUNET_MultiHashMap *seen;
- unsigned int processed;
- int have_more;
-};
-
-/**
- * Any response that we get should be passed
- * back to the client. If the response is unique,
- * we should abort the iteration (return GNUNET_SYSERR).
- */
-static int
-fast_path_processor (const GNUNET_HashCode * key,
- const GNUNET_DatastoreValue *
- value, void *closure, unsigned long long uid)
-{
- struct FPPClosure *cls = closure;
- GNUNET_HashCode hc;
- unsigned int type;
- int ret;
-
- if (cls->processed > GNUNET_GAP_MAX_SYNC_PROCESSED)
- {
- cls->have_more = GNUNET_YES;
- return GNUNET_SYSERR;
- }
- type = ntohl (((const GNUNET_EC_DBlock *) &value[1])->type);
- ret = GNUNET_FS_HELPER_send_to_client (coreAPI,
- key, value, cls->sock, NULL, &hc);
- if (ret == GNUNET_NO)
- return GNUNET_NO; /* delete + continue */
- cls->processed++;
- if (ret != GNUNET_OK)
- cls->have_more = GNUNET_YES; /* switch to async processing */
- if ((type == GNUNET_ECRS_BLOCKTYPE_DATA) || (ret != GNUNET_OK))
- return GNUNET_SYSERR; /* unique response or client can take no more */
- if (cls->seen == NULL)
- cls->seen = GNUNET_multi_hash_map_create (8);
- GNUNET_multi_hash_map_put (cls->seen,
- &hc,
- NULL, GNUNET_MultiHashMapOption_UNIQUE_FAST);
- return GNUNET_OK;
-}
-
-
-/**
- * Process a query from the client. Forwards to the network.
- *
- * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise GNUNET_OK
- */
-static int
-handle_cs_query_start_request (struct GNUNET_ClientHandle *sock,
- const GNUNET_MessageHeader * req)
-{
- static GNUNET_PeerIdentity all_zeros;
- struct FPPClosure fpp;
- const CS_fs_request_search_MESSAGE *rs;
- unsigned int keyCount;
- unsigned int type;
- unsigned int anonymityLevel;
- int have_target;
-#if DEBUG_FS
- GNUNET_EncName enc;
-#endif
-
- if (ntohs (req->size) < sizeof (CS_fs_request_search_MESSAGE))
- {
- GNUNET_GE_BREAK (ectx, 0);
- return GNUNET_SYSERR;
- }
- rs = (const CS_fs_request_search_MESSAGE *) req;
- type = ntohl (rs->type);
- /* try "fast path" avoiding gap/dht if unique reply is locally available */
-#if DEBUG_FS
- IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- GNUNET_hash_to_enc (&rs->query[0], &enc));
- GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "FS received QUERY (query: `%s', type: %u)\n", &enc, type);
-#endif
- fpp.sock = sock;
- fpp.seen = NULL;
- fpp.have_more = GNUNET_NO;
- fpp.processed = 0;
- if (GNUNET_OK ==
- coreAPI->cs_send_message_now_test (sock,
- GNUNET_GAP_ESTIMATED_DATA_SIZE,
- GNUNET_NO))
- {
- if (type == GNUNET_ECRS_BLOCKTYPE_DATA)
- {
- if (((1 == datastore->get (&rs->query[0],
- type, &fast_path_processor, &fpp)) ||
- (1 == datastore->get (&rs->query[0],
- GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
- &fast_path_processor, &fpp))) &&
- (fpp.have_more == GNUNET_NO))
- goto CLEANUP;
- }
- else
- datastore->get (&rs->query[0], type, &fast_path_processor, &fpp);
- }
- else
- fpp.have_more = GNUNET_YES;
- anonymityLevel = ntohl (rs->anonymity_level);
- keyCount =
- 1 + (ntohs (req->size) -
- sizeof (CS_fs_request_search_MESSAGE)) / sizeof (GNUNET_HashCode);
- have_target =
- memcmp (&all_zeros, &rs->target, sizeof (GNUNET_PeerIdentity)) != 0;
- GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "in dv_fs, have_target is %d", have_target);
-
- GNUNET_DV_FS_QUERYMANAGER_start_query (&rs->query[0], keyCount,
- anonymityLevel, type, sock,
- have_target ? &rs->target : NULL,
- fpp.seen, fpp.have_more);
-CLEANUP:
- if (fpp.seen != NULL)
- GNUNET_multi_hash_map_destroy (fpp.seen);
- return GNUNET_OK;
-}
-
-/**
- * Process a stop request from the client.
- *
- * @return GNUNET_SYSERR if the TCP connection should be closed, otherwise GNUNET_OK
- */
-static int
-handle_cs_query_stop_request (struct GNUNET_ClientHandle *sock,
- const GNUNET_MessageHeader * req)
-{
- const CS_fs_request_search_MESSAGE *rs;
- unsigned int keyCount;
- unsigned int type;
- unsigned int anonymityLevel;
-
- if (ntohs (req->size) < sizeof (CS_fs_request_search_MESSAGE))
- {
- GNUNET_GE_BREAK (ectx, 0);
- return GNUNET_SYSERR;
- }
- rs = (const CS_fs_request_search_MESSAGE *) req;
- type = ntohl (rs->type);
- anonymityLevel = ntohl (rs->anonymity_level);
- keyCount =
- 1 + (ntohs (req->size) -
- sizeof (CS_fs_request_search_MESSAGE)) / sizeof (GNUNET_HashCode);
- GNUNET_DV_FS_QUERYMANAGER_stop_query (&rs->query[0], keyCount,
- anonymityLevel, type, sock);
- return GNUNET_OK;
-}
-
-
-/**
- * Return 1 if the current network (upstream) or CPU load is
- * (far) too high, 0 if the load is ok.
- */
-static int
-test_load_too_high ()
-{
- return ((hardCPULimit > 0) &&
- (GNUNET_cpu_get_load (ectx,
- coreAPI->cfg) >= hardCPULimit)) ||
- ((hardUpLimit > 0) &&
- (GNUNET_network_monitor_get_load (coreAPI->load_monitor,
- GNUNET_ND_UPLOAD) >= hardUpLimit));
-}
-
-static int
-send_results_dv (const GNUNET_HashCode * key,
- const GNUNET_DatastoreValue * value, void *closure,
- unsigned long long uid)
-{
- struct DV_send_closure *dvcls = (struct DV_send_closure *) closure;
- const P2P_gap_query_MESSAGE *original_msg = dvcls->message;
- P2P_gap_reply_MESSAGE *msg;
- GNUNET_DatastoreValue *enc;
- unsigned int size;
- unsigned long long et;
- GNUNET_CronTime now;
- int ret;
- int want_more;
-
- want_more = GNUNET_OK;
-
- enc = NULL;
- if (ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_ONDEMAND)
- {
- if (GNUNET_OK !=
- GNUNET_FS_ONDEMAND_get_indexed_content (value, key, &enc))
- return GNUNET_NO;
- value = enc;
- }
-
- et = GNUNET_ntohll (value->expiration_time);
- now = GNUNET_get_time ();
- /* convert to relative expiration time */
- if (now < et)
- {
- et -= now;
- if (ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD)
- et %= GNUNET_GAP_MAX_MIGRATION_EXP_KSK;
- else
- et %= GNUNET_GAP_MAX_MIGRATION_EXP;
- }
- else
- {
- if (ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD)
- return want_more; /* expired KSK -- ignore! */
- /* indicate entry has expired */
- et = -1;
- }
- size =
- sizeof (P2P_gap_reply_MESSAGE) + ntohl (value->size) -
- sizeof (GNUNET_DatastoreValue);
- msg = GNUNET_malloc (size);
- msg->header.type = htons (GNUNET_P2P_PROTO_GAP_RESULT);
- msg->header.size = htons (size);
- msg->reserved = htonl (0);
- msg->expiration = GNUNET_htonll (et);
- memcpy (&msg[1], &value[1], size - sizeof (P2P_gap_reply_MESSAGE));
-
- ret =
- dv_api->dv_send (&original_msg->returnTo, &msg->header,
- htonl (original_msg->priority) * 2, et);
- if (stats != NULL)
- {
- stats->change (stat_dv_replies_sent, 1);
- }
-
- GNUNET_free_non_null (enc);
- return ret;
-
-}
-
-/**
- * Handle P2P query for content.
- */
-static int
-handle_p2p_query (const GNUNET_PeerIdentity * sender,
- const GNUNET_MessageHeader * msg)
-{
- const P2P_gap_query_MESSAGE *req;
- unsigned int query_count;
- unsigned short size;
- unsigned int bloomfilter_size;
- int ttl;
- unsigned int prio;
- unsigned int type;
- unsigned int netLoad;
- int have_peer;
- int have_data;
- GNUNET_EncName enc;
- enum GNUNET_FS_RoutingPolicy policy;
- double preference;
- struct DV_send_closure *dv_cls;
- int result_count;
-
- if (stats != NULL)
- stats->change (stat_gap_query_received, 1);
- if (test_load_too_high ())
- {
-#if DEBUG_GAP
- if (sender != NULL)
- {
- IF_GELOG (ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- GNUNET_hash_to_enc (&sender->hashPubKey, &enc));
- }
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "Dropping query from %s, this peer is too busy.\n",
- sender == NULL ? "localhost" : (char *) &enc);
-#endif
- if (stats != NULL)
- stats->change (stat_gap_query_drop_busy, 1);
- return GNUNET_OK;
- }
- size = ntohs (msg->size);
- if (size < sizeof (P2P_gap_query_MESSAGE))
- {
- GNUNET_GE_BREAK_OP (ectx, 0);
- return GNUNET_SYSERR; /* malformed query */
- }
- req = (const P2P_gap_query_MESSAGE *) msg;
- query_count = ntohl (req->number_of_queries);
- if ((query_count == 0) ||
- (query_count > GNUNET_MAX_BUFFER_SIZE / sizeof (GNUNET_HashCode)) ||
- (size <
- sizeof (P2P_gap_query_MESSAGE) + (query_count -
- 1) * sizeof (GNUNET_HashCode))
- || (0 ==
- memcmp (&req->returnTo, coreAPI->my_identity,
- sizeof (GNUNET_PeerIdentity))))
- {
- GNUNET_GE_BREAK_OP (ectx, 0);
- return GNUNET_SYSERR; /* malformed query */
- }
- bloomfilter_size =
- size - (sizeof (P2P_gap_query_MESSAGE) +
- (query_count - 1) * sizeof (GNUNET_HashCode));
- GNUNET_GE_ASSERT (NULL, bloomfilter_size < size);
- prio = ntohl (req->priority);
- netLoad =
- GNUNET_network_monitor_get_load (coreAPI->load_monitor, GNUNET_ND_UPLOAD);
- if ((netLoad == (unsigned int) -1)
- || (netLoad < GNUNET_GAP_IDLE_LOAD_THRESHOLD))
- {
- prio = 0; /* minimum priority, no charge! */
- policy = GNUNET_FS_RoutingPolicy_ALL;
- }
- else
- {
- prio = -identity->changeHostTrust (sender, -prio);
- if (netLoad < GNUNET_GAP_IDLE_LOAD_THRESHOLD + prio)
- {
- policy = GNUNET_FS_RoutingPolicy_ALL;
- }
- else if (netLoad < 90 + 10 * prio)
- {
- policy =
- GNUNET_FS_RoutingPolicy_ANSWER | GNUNET_FS_RoutingPolicy_FORWARD;
- }
- else if (netLoad < 100)
- {
- policy = GNUNET_FS_RoutingPolicy_ANSWER;
- }
- else
- {
- if (stats != NULL)
- stats->change (stat_gap_query_drop_busy, 1);
- return GNUNET_OK; /* drop */
- }
- }
- if ((policy & GNUNET_FS_RoutingPolicy_INDIRECT) == 0)
- /* kill the priority (since we cannot benefit) */
- prio = 0;
- ttl = GNUNET_FS_HELPER_bound_ttl (ntohl (req->ttl), prio);
- type = ntohl (req->type);
- /* decrement ttl (always) */
- if (ttl < 0)
- {
- ttl -= 2 * GNUNET_GAP_TTL_DECREMENT +
- GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
- GNUNET_GAP_TTL_DECREMENT);
- if (ttl > 0)
- /* integer underflow => drop (should be very rare)! */
- return GNUNET_OK;
- }
- else
- {
- ttl -= 2 * GNUNET_GAP_TTL_DECREMENT +
- GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
- GNUNET_GAP_TTL_DECREMENT);
- }
- preference = (double) prio;
- if (preference < GNUNET_GAP_QUERY_BANDWIDTH_VALUE)
- preference = GNUNET_GAP_QUERY_BANDWIDTH_VALUE;
- coreAPI->p2p_connection_preference_increase (sender, preference);
-
- GNUNET_mutex_lock (GNUNET_FS_lock);
- have_peer = dv_api->have_peer (sender);
- have_data = datastore->get (&req->queries[0], type, NULL, NULL);
- GNUNET_hash_to_enc (&req->queries[0], &enc);
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "have_peer returned %d, get (%s) returned %d results for query type %d",
- have_peer, (char *) &enc, have_data, type);
- if ((dv_api->have_peer (sender) > 0)
- && (datastore->get (&req->queries[0], type, NULL, NULL) > 0))
- {
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "We have the data, we know the return peer intimately (DV), so we will try and send results thataway!\n");
-
- dv_cls = GNUNET_malloc (sizeof (struct DV_send_closure));
- dv_cls->message = (const P2P_gap_query_MESSAGE *) msg;
- dv_cls->request = NULL; /* Not used for now... */
- result_count = datastore->get (&req->queries[0], type, &send_results_dv, dv_cls);
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "Found %d results (in handle_p2p_query)\n", result_count);
- GNUNET_free (dv_cls);
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- return GNUNET_OK;
- }
- else if ((dv_api->have_peer (sender) > 0)
- && (datastore->
- get (&req->queries[0], GNUNET_ECRS_BLOCKTYPE_DATA, NULL,
- NULL) > 0))
- {
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "We have the data (blocktype_data), we know the return peer intimately (DV), so we will try and send results thataway!\n");
-
- dv_cls = GNUNET_malloc (sizeof (struct DV_send_closure));
- dv_cls->message = (const P2P_gap_query_MESSAGE *) msg;
- dv_cls->request = NULL; /* Not used for now... */
- result_count = datastore->get (&req->queries[0], GNUNET_ECRS_BLOCKTYPE_DATA,
- &send_results_dv, dv_cls);
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "Found %d results (in handle_p2p_query)\n", result_count);
-
- GNUNET_free (dv_cls);
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- return GNUNET_OK;
- }
- else if ((dv_api->have_peer (sender) > 0)
- && (datastore->
- get (&req->queries[0], GNUNET_ECRS_BLOCKTYPE_ANY, NULL,
- NULL) > 0))
- {
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "We have the data (blocktype_any), we know the return peer intimately (DV), so we will try and send results thataway!\n");
-
- dv_cls = GNUNET_malloc (sizeof (struct DV_send_closure));
- dv_cls->message = (const P2P_gap_query_MESSAGE *) msg;
- dv_cls->request = NULL; /* Not used for now... */
- result_count = datastore->get (&req->queries[0], GNUNET_ECRS_BLOCKTYPE_ANY,
- &send_results_dv, dv_cls);
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "Found %d results (in handle_p2p_query)\n", result_count);
- GNUNET_free (dv_cls);
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- return GNUNET_OK;
- }
- GNUNET_mutex_unlock (GNUNET_FS_lock);
-
- GNUNET_FS_GAP_execute_query (sender,
- prio,
- ntohl (req->priority),
- policy,
- ttl,
- type,
- query_count,
- &req->queries[0],
- ntohl (req->filter_mutator),
- bloomfilter_size, &req->queries[query_count]);
- return GNUNET_OK;
-}
-
-
-/**
- * Use content (forward to whoever sent the query).
- * @param hostId the peer from where the content came,
- * NULL for the local peer
- */
-static int
-handle_p2p_content (const GNUNET_PeerIdentity * sender,
- const GNUNET_MessageHeader * pmsg)
-{
- const P2P_gap_reply_MESSAGE *msg;
- const GNUNET_EC_DBlock *dblock;
- GNUNET_DatastoreValue *value;
- GNUNET_HashCode query;
- unsigned short size;
- unsigned int data_size;
- unsigned int prio;
- unsigned long long expiration;
- double preference;
- GNUNET_CronTime now;
-
- size = ntohs (pmsg->size);
- if (size < sizeof (P2P_gap_reply_MESSAGE))
- {
- GNUNET_GE_BREAK_OP (ectx, 0);
- return GNUNET_SYSERR; /* invalid! */
- }
- msg = (const P2P_gap_reply_MESSAGE *) pmsg;
- data_size = size - sizeof (P2P_gap_reply_MESSAGE);
- dblock = (const GNUNET_EC_DBlock *) &msg[1];
-
- expiration = GNUNET_ntohll (msg->expiration);
- if ((expiration > GNUNET_GAP_MAX_MIGRATION_EXP_KSK) &&
- (ntohl (dblock->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD))
- return GNUNET_OK; /* expired KSK block -- ignore! */
- if (GNUNET_OK !=
- GNUNET_EC_file_block_check_and_get_query (data_size,
- dblock, GNUNET_YES, &query))
- {
- GNUNET_GE_BREAK_OP (ectx, 0);
- return GNUNET_SYSERR; /* invalid! */
- }
- if ((stats != NULL) && (sender != NULL))
- stats->change (stat_gap_content_received, 1);
- /* forward to other peers */
- prio = GNUNET_FS_GAP_handle_response (sender,
- &query,
- expiration, data_size, dblock);
- /* convert expiration to absolute time and bound properly for
- storage in local datastore */
- now = GNUNET_get_time ();
- if (expiration > GNUNET_GAP_MAX_MIGRATION_EXP)
- {
- /* expired, sometime in the past */
- expiration = now - 1;
- }
- else
- {
- /* expires in future, apply bounding! */
- if (ntohl (dblock->type) == GNUNET_ECRS_BLOCKTYPE_KEYWORD)
- expiration %= GNUNET_GAP_MAX_MIGRATION_EXP_KSK;
- else
- expiration %= GNUNET_GAP_MAX_MIGRATION_EXP;
- expiration += now;
- }
- /* forward to local clients */
- prio += GNUNET_DV_FS_QUERYMANAGER_handle_response (sender,
- &query,
- expiration,
- data_size, dblock);
- if ((sender != NULL) &&
- (active_migration == GNUNET_YES) &&
- ((prio > 0) || (!test_load_too_high ())))
- {
- /* consider storing in local datastore */
- value = GNUNET_malloc (data_size + sizeof (GNUNET_DatastoreValue));
- value->size = htonl (data_size + sizeof (GNUNET_DatastoreValue));
- value->type = dblock->type;
- value->priority = htonl (prio);
- value->anonymity_level = htonl (1);
- value->expiration_time = GNUNET_htonll (expiration);
- memcpy (&value[1], dblock, data_size);
- datastore->putUpdate (&query, value);
- GNUNET_free (value);
- }
- if (sender != NULL)
- { /* if we are the sender, sender will be NULL */
- identity->changeHostTrust (sender, prio);
- if (stats != NULL)
- stats->change (stat_gap_trust_awarded, prio);
- preference = (double) prio;
- if (preference < GNUNET_GAP_CONTENT_BANDWIDTH_VALUE)
- preference = GNUNET_GAP_CONTENT_BANDWIDTH_VALUE;
- coreAPI->p2p_connection_preference_increase (sender, preference);
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Initialize the FS module. This method name must match
- * the library name (libgnunet_XXX => initialize_XXX).
- *
- * @return GNUNET_SYSERR on errors
- */
-int
-initialize_module_dv_fs (GNUNET_CoreAPIForPlugins * capi)
-{
- ectx = capi->ectx;
- coreAPI = capi;
- GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_ContentHashKey) == 128);
- GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_DBlock) == 4);
- GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_IBlock) == 132);
- GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_KBlock) == 524);
- GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_SBlock) == 588);
- GNUNET_GE_ASSERT (ectx, sizeof (GNUNET_EC_KSBlock) == 1116);
-
- if ((-1 == GNUNET_GC_get_configuration_value_number (coreAPI->cfg, "LOAD", "HARDCPULIMIT", 0, 100000, /* 1000 CPUs!? */
- 0, /* 0 == no limit */
- &hardCPULimit)) || (-1 == GNUNET_GC_get_configuration_value_number (coreAPI->cfg, "LOAD", "HARDUPLIMIT", 0, 999999999, 0, /* 0 == no limit */
- &hardUpLimit)))
- return GNUNET_SYSERR;
- active_migration
- = GNUNET_GC_get_configuration_value_yesno (coreAPI->cfg,
- "DV_FS",
- "ACTIVEMIGRATION", GNUNET_NO);
- stats = coreAPI->service_request ("stats");
- if (stats != NULL)
- {
- stat_gap_query_received =
- stats->create (gettext_noop ("# gap requests total received"));
- stat_gap_query_drop_busy =
- stats->create (gettext_noop ("# gap requests dropped due to load"));
- stat_gap_content_received =
- stats->create (gettext_noop ("# gap content total received"));
- stat_gap_trust_awarded =
- stats->create (gettext_noop ("# gap total trust awarded"));
- stat_dv_replies_sent =
- stats->create (gettext_noop ("# gap replies sent via dv"));
- }
- identity = coreAPI->service_request ("identity");
- if (identity == NULL)
- {
- GNUNET_GE_BREAK (ectx, 0);
- coreAPI->service_release (stats);
- return GNUNET_SYSERR;
- }
- datastore = coreAPI->service_request ("datastore");
- dv_api = coreAPI->service_request ("dv");
- if (datastore == NULL)
- {
- coreAPI->service_release (identity);
- coreAPI->service_release (stats);
- GNUNET_GE_BREAK (ectx, 0);
- return GNUNET_SYSERR;
- }
- GNUNET_FS_lock = coreAPI->global_lock_get (); // GNUNET_mutex_create (GNUNET_YES);
- GNUNET_FS_ANONYMITY_init (coreAPI);
- GNUNET_FS_PLAN_init (coreAPI);
- GNUNET_FS_ONDEMAND_init (coreAPI);
- GNUNET_FS_PT_init (ectx, stats);
- GNUNET_DV_FS_QUERYMANAGER_init (coreAPI);
- GNUNET_FS_DV_DHT_init (coreAPI);
- GNUNET_FS_GAP_init (coreAPI);
- GNUNET_FS_MIGRATION_init (coreAPI);
- GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- _
- ("`%s' registering client handlers %d %d %d %d %d %d %d %d and P2P handlers %d %d\n"),
- "fs", GNUNET_CS_PROTO_GAP_QUERY_START,
- GNUNET_CS_PROTO_GAP_QUERY_STOP,
- GNUNET_CS_PROTO_GAP_INSERT,
- GNUNET_CS_PROTO_GAP_INDEX, GNUNET_CS_PROTO_GAP_DELETE,
- GNUNET_CS_PROTO_GAP_UNINDEX, GNUNET_CS_PROTO_GAP_TESTINDEX,
- GNUNET_CS_PROTO_GAP_INIT_INDEX,
- GNUNET_P2P_PROTO_GAP_QUERY, GNUNET_P2P_PROTO_GAP_RESULT);
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->p2p_ciphertext_handler_register
- (GNUNET_P2P_PROTO_GAP_QUERY, &handle_p2p_query));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->p2p_ciphertext_handler_register
- (GNUNET_P2P_PROTO_GAP_RESULT, &handle_p2p_content));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_register
- (GNUNET_CS_PROTO_GAP_QUERY_START,
- &handle_cs_query_start_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_register
- (GNUNET_CS_PROTO_GAP_QUERY_STOP,
- &handle_cs_query_stop_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_register (GNUNET_CS_PROTO_GAP_INSERT,
- &handle_cs_insert_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_register (GNUNET_CS_PROTO_GAP_INDEX,
- &handle_cs_index_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->
- cs_handler_register (GNUNET_CS_PROTO_GAP_INIT_INDEX,
- &handle_cs_init_index_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_register (GNUNET_CS_PROTO_GAP_DELETE,
- &handle_cs_delete_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_register (GNUNET_CS_PROTO_GAP_UNINDEX,
- &handle_cs_unindex_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->
- cs_handler_register (GNUNET_CS_PROTO_GAP_TESTINDEX,
- &handle_cs_test_indexed_request));
- GNUNET_GE_ASSERT (coreAPI->ectx,
- 0 ==
- GNUNET_GC_set_configuration_value_string (coreAPI->cfg,
- coreAPI->ectx,
- "ABOUT",
- "dv_fs",
- gettext_noop
- ("enables (anonymous) file-sharing")));
- return GNUNET_OK;
-}
-
-void
-done_module_dv_fs ()
-{
- GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "dv_fs shutdown\n");
-
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->p2p_ciphertext_handler_unregister
- (GNUNET_P2P_PROTO_GAP_QUERY, &handle_p2p_query));
-
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->p2p_ciphertext_handler_unregister
- (GNUNET_P2P_PROTO_GAP_RESULT, &handle_p2p_content));
-
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_unregister
- (GNUNET_CS_PROTO_GAP_QUERY_START,
- &handle_cs_query_start_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_unregister
- (GNUNET_CS_PROTO_GAP_INSERT, &handle_cs_insert_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_unregister (GNUNET_CS_PROTO_GAP_INDEX,
- &handle_cs_index_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_unregister
- (GNUNET_CS_PROTO_GAP_INIT_INDEX,
- &handle_cs_init_index_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_unregister
- (GNUNET_CS_PROTO_GAP_DELETE, &handle_cs_delete_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_unregister
- (GNUNET_CS_PROTO_GAP_UNINDEX,
- &handle_cs_unindex_request));
- GNUNET_GE_ASSERT (ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_handler_unregister
- (GNUNET_CS_PROTO_GAP_TESTINDEX,
- &handle_cs_test_indexed_request));
- GNUNET_FS_MIGRATION_done ();
- GNUNET_FS_GAP_done ();
- GNUNET_FS_DV_DHT_done ();
- GNUNET_DV_FS_QUERYMANAGER_done ();
- GNUNET_FS_ONDEMAND_done ();
- GNUNET_FS_PLAN_done ();
- GNUNET_FS_ANONYMITY_done ();
- GNUNET_FS_PT_done ();
- if (stats != NULL)
- {
- coreAPI->service_release (stats);
- stats = NULL;
- }
- if (dv_api != NULL)
- coreAPI->service_release (dv_api);
-
- coreAPI->service_release (datastore);
- datastore = NULL;
- coreAPI->service_release (identity);
- identity = NULL;
- GNUNET_FS_lock = NULL;
-}
-
-
-/**
- * Update FS.
- */
-void
-update_module_dv_fs (GNUNET_UpdateAPI * uapi)
-{
- uapi->service_update ("datastore");
-}
-
-
-/* end of dv_fs.c */
Deleted: GNUnet/src/applications/fs/gap/dv_querymanager.c
===================================================================
--- GNUnet/src/applications/fs/gap/dv_querymanager.c 2009-12-02 17:52:51 UTC (rev 9670)
+++ GNUnet/src/applications/fs/gap/dv_querymanager.c 2009-12-02 18:20:14 UTC (rev 9671)
@@ -1,849 +0,0 @@
-/*
- This file is part of GNUnet
- (C) 2001 - 2009 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
- */
-
-/**
- * @file fs/gap/dv_querymanager.c
- * @brief management of queries from our clients
- * @author Christian Grothoff
- * @author Nathan Evans
- *
- * This code forwards queries (using GAP and DHT) to other peers and
- * passes replies (from GAP or DHT) back to clients.
- */
-
-#include "platform.h"
-#include "gnunet_protocols.h"
-#include "gnunet_stats_service.h"
-#include "querymanager.h"
-#include "fs.h"
-#include "fs_dv_dht.h"
-#include "gap.h"
-#include "plan.h"
-#include "pid_table.h"
-#include "shared.h"
-#include "gnunet_dv_service.h"
-
-#define CHECK_REPEAT_FREQUENCY (150 * GNUNET_CRON_MILLISECONDS)
-
-/**
- * Linked list with information for each client.
- */
-struct ClientDataList
-{
-
- /**
- * This is a linked list.
- */
- struct ClientDataList *next;
-
- /**
- * For which client is this data kept?
- */
- struct GNUNET_ClientHandle *client;
-
- /**
- * List of active requests for the client.
- */
- struct RequestList *requests;
-
- /**
- * Tail of the requests list.
- */
- struct RequestList *request_tail;
-
-};
-
-/**
- * List of all clients, their active requests and other
- * per-client information.
- */
-static struct ClientDataList *clients;
-
-static struct ClientDataList *clients_tail;
-
-static GNUNET_CoreAPIForPlugins *coreAPI;
-
-static GNUNET_DV_ServiceAPI *dv_api;
-
-static GNUNET_Stats_ServiceAPI *stats;
-
-static GNUNET_Datastore_ServiceAPI *datastore;
-
-static int stat_gap_client_query_received;
-
-static int stat_gap_client_response_sent;
-
-static int stat_gap_client_query_tracked;
-
-static int stat_gap_client_query_injected;
-
-static int stat_gap_client_bf_updates;
-
-static int stat_gap_dv_sends;
-
-/**
- * How many bytes should a bloomfilter be if
- * we have already seen entry_count responses?
- * Note that GNUNET_GAP_BLOOMFILTER_K gives us the
- * number of bits set per entry. Furthermore,
- * we should not re-size the filter too often
- * (to keep it cheap).
- *
- * Since other peers will also add entries but
- * not resize the filter, we should generally
- * pick a slightly larger size than what the
- * strict math would suggest.
- *
- * @return must be a power of two and smaller
- * or equal to 2^15.
- */
-static unsigned int
-compute_bloomfilter_size (unsigned int entry_count)
-{
- unsigned short size;
- unsigned short max = 1 << 15;
- unsigned int ideal = (entry_count * GNUNET_GAP_BLOOMFILTER_K) / 4;
-
- if (entry_count > max)
- return max;
- size = 8;
- while ((size < max) && (size < ideal))
- size *= 2;
- return size;
-}
-
-static int
-mark_response_seen (const GNUNET_HashCode * key, void *value, void *cls)
-{
- GNUNET_FS_SHARED_mark_response_seen (key, cls);
- return GNUNET_OK;
-}
-
-static int
-send_dv_query (struct RequestList *request, const GNUNET_PeerIdentity * peer)
-{
- P2P_gap_query_MESSAGE *msg;
- unsigned int size;
- GNUNET_CronTime now;
- int ret;
- ret = GNUNET_SYSERR;
- int ttl;
- int prio = GNUNET_FS_GAP_get_average_priority ();
-
- GNUNET_GE_ASSERT (NULL, request->key_count > 0);
- size = sizeof (P2P_gap_query_MESSAGE)
- + request->bloomfilter_size + (request->key_count -
- 1) * sizeof (GNUNET_HashCode);
- msg = (P2P_gap_query_MESSAGE *) GNUNET_malloc (size);
- if ((prio > request->remaining_value) && (request->response_client == NULL))
- prio = request->remaining_value;
- now = GNUNET_get_time ();
- ttl = GNUNET_FS_HELPER_bound_ttl (now + 60 * GNUNET_CRON_SECONDS, prio);
- msg->header.size = htons (size);
- msg->header.type = htons (GNUNET_P2P_PROTO_GAP_QUERY);
- msg->type = htonl (request->type);
- msg->priority = htonl (prio);
- msg->ttl = htonl (ttl);
- msg->filter_mutator = htonl (request->bloomfilter_mutator);
- msg->number_of_queries = htonl (request->key_count);
- if (0 != (request->policy & GNUNET_FS_RoutingPolicy_INDIRECT))
- msg->returnTo = *coreAPI->my_identity;
- else
- GNUNET_FS_PT_resolve (request->response_target, &msg->returnTo);
- memcpy (&msg->queries[0],
- &request->queries[0],
- request->key_count * sizeof (GNUNET_HashCode));
- if (request->bloomfilter != NULL)
- GNUNET_bloomfilter_get_raw_data (request->bloomfilter,
- (char *) &msg->queries[request->
- key_count],
- request->bloomfilter_size);
- now = GNUNET_get_time ();
- if (now + ttl > request->last_request_time + request->last_ttl_used)
- {
- request->last_request_time = now;
- request->last_prio_used = prio;
- request->last_ttl_used = ttl;
- }
- request->remaining_value -= prio;
-
- ret = dv_api->dv_send (peer, &msg->header, prio * 2, ttl);
- if ((stats != NULL) && (ret > 0))
- {
- stats->change (stat_gap_dv_sends, 1);
- }
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "Sending message via DV returned %d, type of request %d (htonl %d)\n",
- ret, request->type, htonl (request->type));
- return ret;
-}
-
-/**
- * A client is asking us to run a query. The query should be issued
- * until either a unique response has been obtained or until the
- * client disconnects.
- *
- * @param target peer known to have the content, maybe NULL.
- */
-void
-GNUNET_DV_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
- unsigned int key_count,
- unsigned int anonymityLevel,
- unsigned int type,
- struct GNUNET_ClientHandle *client,
- const GNUNET_PeerIdentity * target,
- const struct GNUNET_MultiHashMap *seen,
- int have_more)
-{
- struct ClientDataList *cl;
- struct RequestList *request;
-
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "entered GNUNET_DV_FS_QUERYMANAGER_start_query\n");
- if (target == NULL)
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "target is null!\n");
- GNUNET_GE_ASSERT (NULL, key_count > 0);
- if (stats != NULL)
- {
- stats->change (stat_gap_client_query_tracked, 1);
- stats->change (stat_gap_client_query_received, 1);
- }
- request =
- GNUNET_malloc (sizeof (struct RequestList) +
- (key_count - 1) * sizeof (GNUNET_HashCode));
- memset (request, 0, sizeof (struct RequestList));
- request->anonymityLevel = anonymityLevel;
- request->key_count = key_count;
- request->type = type;
- request->primary_target = GNUNET_FS_PT_intern (target);
- request->response_client = client;
- request->policy = GNUNET_FS_RoutingPolicy_ALL;
- if (have_more != GNUNET_NO)
- request->have_more = GNUNET_GAP_HAVE_MORE_INCREMENT;
- memcpy (&request->queries[0], query, sizeof (GNUNET_HashCode) * key_count);
- if (seen != NULL)
- {
- request->bloomfilter_entry_count = GNUNET_multi_hash_map_size (seen);
- request->bloomfilter_size =
- compute_bloomfilter_size (request->bloomfilter_entry_count);
- request->bloomfilter_mutator =
- GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
- request->bloomfilter =
- GNUNET_bloomfilter_init (NULL, NULL, request->bloomfilter_size,
- GNUNET_GAP_BLOOMFILTER_K);
- if (stats != NULL)
- stats->change (stat_gap_client_bf_updates, 1);
-
- GNUNET_multi_hash_map_iterate (seen, &mark_response_seen, request);
- }
- GNUNET_mutex_lock (GNUNET_FS_lock);
- cl = clients;
- while ((cl != NULL) && (cl->client != client))
- cl = cl->next;
- if (cl == NULL)
- {
- cl = GNUNET_malloc (sizeof (struct ClientDataList));
- memset (cl, 0, sizeof (struct ClientDataList));
- cl->client = client;
- cl->next = clients;
- clients = cl;
- if (clients_tail == NULL)
- clients_tail = cl;
- }
- request->next = cl->requests;
- cl->requests = request;
- if (cl->request_tail == NULL)
- cl->request_tail = request;
-
- if ((anonymityLevel == 0) && (target != NULL)
- && (dv_api->have_peer (target)))
- {
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "anonymity is zero, target non-null, and we know this peer. Will attempt to send requests out over DV\n");
- if (send_dv_query (request, target) > 0)
- {
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- request->last_dht_get = GNUNET_get_time ();
- request->dht_back_off = GNUNET_GAP_MAX_DHT_DELAY;
- return;
- }
- }
- else if ((anonymityLevel == 0) && (target != NULL))
- {
- GNUNET_GE_LOG (coreAPI->ectx,
- GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK,
- "anonymity is zero, target non-null, but we don't know this peer\n");
- }
- if ((GNUNET_YES == GNUNET_FS_PLAN_request (client, 0, request)) &&
- (stats != NULL))
- stats->change (stat_gap_client_query_injected, 1);
- if (anonymityLevel == 0)
- {
- request->last_dht_get = GNUNET_get_time ();
- request->dht_back_off = GNUNET_GAP_MAX_DHT_DELAY;
- }
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- if ((anonymityLevel == 0) && (type == 0)) /* Cannot search the dht with type 0 */
- GNUNET_FS_DV_DHT_execute_query (GNUNET_ECRS_BLOCKTYPE_KEYWORD, query);
-}
-
-/**
- * A client is asking us to stop running a query (without disconnect).
- */
-int
-GNUNET_DV_FS_QUERYMANAGER_stop_query (const GNUNET_HashCode * query,
- unsigned int key_count,
- unsigned int anonymityLevel,
- unsigned int type,
- struct GNUNET_ClientHandle *client)
-{
- struct ClientDataList *cl;
- struct ClientDataList *cprev;
- struct RequestList *pos;
- struct RequestList *rprev;
-
- GNUNET_mutex_lock (GNUNET_FS_lock);
- cl = clients;
- cprev = NULL;
- while ((cl != NULL) && (cl->client != client))
- {
- cprev = cl;
- cl = cl->next;
- }
- if (cl == NULL)
- {
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- return GNUNET_SYSERR;
- }
- rprev = NULL;
- pos = cl->requests;
- while (pos != NULL)
- {
- if ((pos->type == type) &&
- (pos->key_count == key_count) &&
- (0 == memcmp (query,
- &pos->queries[0],
- sizeof (GNUNET_HashCode) * key_count)) &&
- (pos->anonymityLevel == anonymityLevel))
- break;
- rprev = pos;
- pos = pos->next;
- }
- if (pos == NULL)
- {
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- return GNUNET_SYSERR;
- }
- if (cl->request_tail == pos)
- cl->request_tail = rprev;
- if (rprev == NULL)
- cl->requests = pos->next;
- else
- rprev->next = pos->next;
- GNUNET_FS_SHARED_free_request_list (pos);
- if (stats != NULL)
- stats->change (stat_gap_client_query_tracked, -1);
- if (cl->requests == NULL)
- {
- if (cl == clients_tail)
- clients_tail = cprev;
- if (cprev == NULL)
- clients = cl->next;
- else
- cprev->next = cl->next;
- GNUNET_free (cl);
- }
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- return GNUNET_OK;
-}
-
-struct IteratorClosure
-{
- struct GNUNET_BloomFilter *filter;
- int mingle_number;
-};
-
-/**
- * Iterator over Map hash codes.
- *
- * @param arg pointer to a location where we
- * have our current index into the linked list.
- * @return GNUNET_YES if we have more,
- * GNUNET_NO if this is the last entry
- */
-static int
-response_bf_iterator (const GNUNET_HashCode * key, void *value, void *arg)
-{
- struct IteratorClosure *cls = arg;
- GNUNET_HashCode n;
-
- GNUNET_FS_HELPER_mingle_hash (key, cls->mingle_number, &n);
- GNUNET_bloomfilter_add (cls->filter, &n);
- return GNUNET_YES;
-}
-
-/**
- * We got a response for a client request.
- * Check if we have seen this response already.
- * If not, check if it truly matches (namespace!).
- * If so, transmit to client and update response
- * lists and bloomfilter accordingly.
- *
- * @param value how much is this response worth to us?
- * the function should increment value accordingly
- * @return GNUNET_OK if this was the last response
- * and we should remove the request entry.
- * GNUNET_NO if we should continue looking
- * GNUNET_SYSERR on serious errors
- */
-static int
-handle_response (PID_INDEX sender,
- struct GNUNET_ClientHandle *client,
- struct RequestList *rl,
- const GNUNET_HashCode * primary_key,
- GNUNET_CronTime expirationTime,
- unsigned int size, const GNUNET_EC_DBlock * data,
- unsigned int *value)
-{
- struct IteratorClosure ic;
- CS_fs_reply_content_MESSAGE *msg;
- GNUNET_HashCode hc;
- int ret;
- unsigned int bf_size;
-
- /* check that content matches query */
- ret = GNUNET_FS_SHARED_test_valid_new_response (rl,
- primary_key,
- size, data, &hc);
- if (ret != GNUNET_OK)
- return ret;
- if (sender == 0) /* dht produced response */
- rl->dht_back_off = GNUNET_GAP_MAX_DHT_DELAY; /* go back! */
- /* send to client */
- msg = GNUNET_malloc (sizeof (CS_fs_reply_content_MESSAGE) + size);
- msg->header.size = htons (sizeof (CS_fs_reply_content_MESSAGE) + size);
- msg->header.type = htons (GNUNET_CS_PROTO_GAP_RESULT);
- msg->anonymity_level = htonl (0); /* unknown */
- msg->expiration_time = GNUNET_htonll (expirationTime);
- memcpy (&msg[1], data, size);
- ret = coreAPI->cs_send_message (client,
- &msg->header,
- (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA)
- ? GNUNET_NO : GNUNET_YES);
- GNUNET_free (msg);
- if (ret != GNUNET_OK)
- return GNUNET_NO;
- if (stats != NULL)
- stats->change (stat_gap_client_response_sent, 1);
-
- /* update *value */
- *value += 1 + rl->value;
- GNUNET_FS_PLAN_success (sender, client, 0, rl);
-
- if (rl->type == GNUNET_ECRS_BLOCKTYPE_DATA)
- return GNUNET_OK; /* the end */
-
- /* update bloom filter */
- rl->bloomfilter_entry_count++;
- bf_size = compute_bloomfilter_size (rl->bloomfilter_entry_count);
- if (rl->bloomfilter == NULL)
- {
- rl->bloomfilter_mutator
- = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
- rl->bloomfilter_size = bf_size;
- rl->bloomfilter = GNUNET_bloomfilter_init (NULL,
- NULL,
- rl->bloomfilter_size,
- GNUNET_GAP_BLOOMFILTER_K);
- if (stats != NULL)
- stats->change (stat_gap_client_bf_updates, 1);
- }
- else if (rl->bloomfilter_size != bf_size)
- {
- rl->bloomfilter_mutator
- = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, -1);
- GNUNET_bloomfilter_free (rl->bloomfilter);
- rl->bloomfilter =
- GNUNET_bloomfilter_init (NULL,
- NULL, bf_size, GNUNET_GAP_BLOOMFILTER_K);
- ic.filter = rl->bloomfilter;
- ic.mingle_number = rl->bloomfilter_mutator;
- if (rl->responses != NULL)
- GNUNET_multi_hash_map_iterate (rl->responses,
- &response_bf_iterator, &ic);
- if (stats != NULL)
- stats->change (stat_gap_client_bf_updates, 1);
- }
- GNUNET_FS_SHARED_mark_response_seen (&hc, rl);
-
- /* we want more */
- return GNUNET_NO;
-}
-
-/**
- * Handle the given response (by forwarding it to
- * other peers as necessary).
- *
- * @param sender who sent the response (good too know
- * for future routing decisions)
- * @param primary_query hash code used for lookup
- * (note that namespace membership may
- * require additional verification that has
- * not yet been performed; checking the
- * signature has already been done)
- * @param size size of the data
- * @param data the data itself (a GNUNET_EC_DBlock)
- * @return how much was this content worth to us?
- */
-unsigned int
-GNUNET_DV_FS_QUERYMANAGER_handle_response (const GNUNET_PeerIdentity * sender,
- const GNUNET_HashCode *
- primary_query,
- GNUNET_CronTime expirationTime,
- unsigned int size,
- const GNUNET_EC_DBlock * data)
-{
- struct ClientDataList *cl;
- struct RequestList *rl;
- struct RequestList *prev;
- unsigned int value;
- PID_INDEX rid;
-
- rid = GNUNET_FS_PT_intern (sender);
- GNUNET_mutex_lock (GNUNET_FS_lock);
- value = 0;
- cl = clients;
- while (cl != NULL)
- {
- rl = cl->requests;
- prev = NULL;
- while (rl != NULL)
- {
- if (GNUNET_OK ==
- handle_response (rid,
- cl->client,
- rl,
- primary_query,
- expirationTime, size, data, &value))
- {
- if (prev != NULL)
- prev->next = rl->next;
- else
- cl->requests = rl->next;
- if (rl == cl->request_tail)
- cl->request_tail = prev;
- GNUNET_FS_SHARED_free_request_list (rl);
- if (stats != NULL)
- stats->change (stat_gap_client_query_tracked, -1);
- if (prev == NULL)
- rl = cl->requests;
- else
- rl = prev->next;
- }
- else
- {
- prev = rl;
- rl = rl->next;
- }
- }
- cl = cl->next;
- }
-
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- GNUNET_FS_PT_change_rc (rid, -1);
- return value;
-}
-
-/**
- * Method called whenever a given client disconnects.
- * Frees all of the associated data structures.
- */
-static void
-handle_client_exit (struct GNUNET_ClientHandle *client)
-{
- struct ClientDataList *cl;
- struct ClientDataList *prev;
- struct RequestList *rl;
-
- GNUNET_mutex_lock (GNUNET_FS_lock);
- cl = clients;
- prev = NULL;
- while ((cl != NULL) && (cl->client != client))
- {
- prev = cl;
- cl = cl->next;
- }
- if (cl == clients_tail)
- clients_tail = prev;
- if (cl != NULL)
- {
- while (cl->requests != NULL)
- {
- rl = cl->requests;
- cl->requests = rl->next;
- GNUNET_FS_SHARED_free_request_list (rl);
- if (stats != NULL)
- stats->change (stat_gap_client_query_tracked, -1);
- }
- if (prev == NULL)
- clients = cl->next;
- else
- prev->next = cl->next;
- GNUNET_free (cl);
- }
- GNUNET_mutex_unlock (GNUNET_FS_lock);
-}
-
-
-struct HMClosure
-{
- struct RequestList *request;
- unsigned int processed;
- int have_more;
-};
-
-/**
- * Any response that we get should be passed
- * back to the client. If the response is unique,
- * we should about the iteration (return GNUNET_SYSERR).
- */
-static int
-have_more_processor (const GNUNET_HashCode * key,
- const GNUNET_DatastoreValue *
- value, void *closure, unsigned long long uid)
-{
- struct HMClosure *cls = closure;
- GNUNET_HashCode hc;
- int ret;
-
- ret = GNUNET_FS_HELPER_send_to_client (coreAPI,
- key, value,
- cls->request->response_client,
- cls->request, &hc);
- if (ret != GNUNET_OK)
- {
- /* client can take no more right now */
- cls->have_more = GNUNET_YES;
- return ret; /* NO: delete, SYSERR: abort */
- }
- GNUNET_FS_SHARED_mark_response_seen (&hc, cls->request);
- cls->processed++;
- if (cls->processed > GNUNET_GAP_MAX_ASYNC_PROCESSED)
- {
- cls->have_more = GNUNET_YES;
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Cron-job to periodically check if we should
- * repeat requests.
- */
-static void
-repeat_requests_job (void *unused)
-{
- struct HMClosure hmc;
- struct ClientDataList *client;
- struct RequestList *request;
- struct RequestList *prev;
- GNUNET_CronTime now;
-
- GNUNET_mutex_lock (GNUNET_FS_lock);
- if (clients == NULL)
- {
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- return;
- }
- now = GNUNET_get_time ();
- client = clients;
- if (clients_tail != client)
- {
- /* move client to tail of list */
- GNUNET_GE_ASSERT (NULL, clients_tail->next == NULL);
- clients = clients->next;
- clients_tail->next = client;
- clients_tail = client;
- client->next = NULL;
- }
- request = client->requests;
- if (request == NULL)
- {
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- return;
- }
- if (client->request_tail != request)
- {
- /* move request to tail of list */
- GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
- client->requests = request->next;
- client->request_tail->next = request;
- prev = client->request_tail;
- client->request_tail = request;
- request->next = NULL;
- }
- else
- {
- prev = NULL;
- }
- GNUNET_GE_ASSERT (NULL, request->next == NULL);
- GNUNET_GE_ASSERT (NULL, client->request_tail->next == NULL);
- if ((client->client != NULL) &&
- (GNUNET_OK !=
- coreAPI->cs_send_message_now_test (client->client,
- GNUNET_GAP_ESTIMATED_DATA_SIZE,
- GNUNET_NO)))
- {
- GNUNET_mutex_unlock (GNUNET_FS_lock);
- return;
- }
- if (request->have_more > 0)
- {
- request->have_more--;
- hmc.request = request;
- hmc.processed = 0;
- hmc.have_more = GNUNET_NO;
-
- if (request->type == GNUNET_ECRS_BLOCKTYPE_DATA)
- {
- if (((1 == datastore->get (&request->queries[0], request->type,
- &have_more_processor, &hmc)) ||
- (1 == datastore->get (&request->queries[0],
- GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
- &have_more_processor, &hmc))) &&
- (hmc.have_more == GNUNET_NO))
- {
- if (prev == NULL)
- {
- client->request_tail = NULL;
- client->requests = NULL;
- }
- else
- {
- prev->next = NULL;
- if (client->request_tail == request)
- client->request_tail = prev;
- }
- GNUNET_FS_SHARED_free_request_list (request);
- if (stats != NULL)
- stats->change (stat_gap_client_query_tracked, -1);
- }
- }
- else
- {
- datastore->get (&request->queries[0], request->type,
- &have_more_processor, &hmc);
- }
- if (hmc.have_more)
- request->have_more += GNUNET_GAP_HAVE_MORE_INCREMENT;
- }
- else
- {
- if ((NULL == request->plan_entries) &&
- ((client->client != NULL) ||
- (request->expiration > now)) &&
- (request->last_ttl_used * GNUNET_CRON_SECONDS +
- request->last_request_time < now))
- {
- /*if ((GNUNET_OK ==
- GNUNET_FS_PLAN_request (client->client, 0, request))
- && (stats != NULL))
- stats->change (stat_gap_client_query_injected, 1); */
- }
-
- if ((request->anonymityLevel == 0) &&
- (request->last_dht_get + request->dht_back_off < now))
- {
- if (request->dht_back_off * 2 > request->dht_back_off)
- request->dht_back_off *= 2;
- request->last_dht_get = now;
- /*GNUNET_FS_DV_DHT_execute_query (request->type, &request->queries[0]); */
- }
- }
- GNUNET_mutex_unlock (GNUNET_FS_lock);
-}
-
-int
-GNUNET_DV_FS_QUERYMANAGER_init (GNUNET_CoreAPIForPlugins * capi)
-{
- coreAPI = capi;
- GNUNET_GE_ASSERT (coreAPI->ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_disconnect_handler_register
- (&handle_client_exit));
- datastore = coreAPI->service_request ("datastore");
- stats = coreAPI->service_request ("stats");
- dv_api = coreAPI->service_request ("dv");
- if (stats != NULL)
- {
- stat_gap_client_query_received =
- stats->create (gettext_noop ("# gap client queries received"));
- stat_gap_client_response_sent =
- stats->create (gettext_noop ("# gap replies sent to clients"));
- stat_gap_client_query_tracked =
- stats->create (gettext_noop ("# gap client requests tracked"));
- stat_gap_client_query_injected =
- stats->create (gettext_noop ("# gap client requests injected"));
- stat_gap_client_bf_updates =
- stats->create (gettext_noop
- ("# gap query bloomfilter resizing updates"));
- stat_gap_dv_sends =
- stats->create (gettext_noop ("# dv gap requests sent"));
- }
- GNUNET_cron_add_job (coreAPI->cron,
- &repeat_requests_job,
- CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL);
- return 0;
-}
-
-int
-GNUNET_DV_FS_QUERYMANAGER_done ()
-{
- GNUNET_cron_del_job (coreAPI->cron,
- &repeat_requests_job, CHECK_REPEAT_FREQUENCY, NULL);
- GNUNET_GE_ASSERT (coreAPI->ectx,
- GNUNET_SYSERR !=
- coreAPI->cs_disconnect_handler_unregister
- (&handle_client_exit));
- while (clients != NULL)
- handle_client_exit (clients->client);
- coreAPI->service_release (datastore);
- datastore = NULL;
- coreAPI->service_release (dv_api);
- dv_api = NULL;
- if (stats != NULL)
- {
- coreAPI->service_release (stats);
- stats = NULL;
- }
- return 0;
-}
-
-/* end of dv_querymanager.c */
Deleted: GNUnet/src/applications/fs/gap/dv_querymanager.h
===================================================================
--- GNUnet/src/applications/fs/gap/dv_querymanager.h 2009-12-02 17:52:51 UTC (rev 9670)
+++ GNUnet/src/applications/fs/gap/dv_querymanager.h 2009-12-02 18:20:14 UTC (rev 9671)
@@ -1,91 +0,0 @@
-/*
- This file is part of GNUnet
- (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
- */
-
-/**
- * @file fs/gap/dv_querymanager.h
- * @brief management of queries from our clients
- * @author Christian Grothoff, Nathan Evans
- */
-#ifndef DV_QUERYMANAGER_H
-#define DV_QUERYMANAGER_H
-
-#include "gnunet_util.h"
-#include "gnunet_core.h"
-#include "ecrs_core.h"
-#include "shared.h"
-
-int GNUNET_DV_FS_QUERYMANAGER_init (GNUNET_CoreAPIForPlugins * capi);
-
-int GNUNET_DV_FS_QUERYMANAGER_done (void);
-
-
-/**
- * A client is asking us to run a query. The query should be issued
- * until either a unique response has been obtained, the client
- * requests us to stop or until the client disconnects.
- *
- * @param target peer known to have the content, maybe NULL.
- * @param have_more do we have more results in our local datastore?
- */
-void
-GNUNET_DV_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
- unsigned int key_count,
- unsigned int anonymityLevel,
- unsigned int type,
- struct GNUNET_ClientHandle *client,
- const GNUNET_PeerIdentity * target,
- const struct GNUNET_MultiHashMap *seen,
- int have_more);
-
-/**
- * A client is asking us to stop running a query (without disconnect).
- */
-int
-GNUNET_DV_FS_QUERYMANAGER_stop_query (const GNUNET_HashCode * query,
- unsigned int key_count,
- unsigned int anonymityLevel,
- unsigned int type,
- struct GNUNET_ClientHandle *client);
-
-/**
- * Handle the given response (by forwarding it to
- * other peers as necessary).
- *
- * @param sender who send the response (good too know
- * for future routing decisions)
- * @param primary_query hash code used for lookup
- * (note that namespace membership may
- * require additional verification that has
- * not yet been performed; checking the
- * signature has already been done)
- * @param size size of the data
- * @param data the data itself (a GNUNET_EC_DBlock)
- * @return how much was this content worth to us?
- */
-unsigned int
-GNUNET_DV_FS_QUERYMANAGER_handle_response (const GNUNET_PeerIdentity * sender,
- const GNUNET_HashCode *
- primary_query,
- GNUNET_CronTime expirationTime,
- unsigned int size,
- const GNUNET_EC_DBlock * data);
-
-
-#endif
Deleted: GNUnet/src/applications/fs/gap/fs_dv_dht.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dv_dht.c 2009-12-02 17:52:51 UTC (rev 9670)
+++ GNUnet/src/applications/fs/gap/fs_dv_dht.c 2009-12-02 18:20:14 UTC (rev 9671)
@@ -1,295 +0,0 @@
-/*
- This file is part of GNUnet
- (C) 2001 - 2009 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
- */
-
-/**
- * @file fs/gap/fs_dv_dht.c
- * @brief integration of file-sharing with the DHT
- * infrastructure
- * @author Christian Grothoff, Nathan Evans
- */
-
-#include "platform.h"
-#include "gnunet_dv_dht_service.h"
-#include "gnunet_sqstore_service.h"
-#include "gnunet_stats_service.h"
-#include "gnunet_protocols.h"
-#include "ecrs_core.h"
-#include "fs.h"
-#include "shared.h"
-#include "fs_dv_dht.h"
-#include "dv_querymanager.h"
-
-/**
- * Linked list containing the DHT get handles
- * of our active requests.
- */
-struct ActiveRequestRecords
-{
-
- struct ActiveRequestRecords *next;
-
- struct GNUNET_DV_DHT_GetHandle *handle;
-
- GNUNET_CronTime end_time;
-
- unsigned int type;
-
-};
-
-static GNUNET_DV_DHT_ServiceAPI *dv_dht;
-
-static GNUNET_SQstore_ServiceAPI *sqstore;
-
-static GNUNET_CoreAPIForPlugins *coreAPI;
-
-static GNUNET_Stats_ServiceAPI *stats;
-
-static int stat_push_count;
-
-static struct ActiveRequestRecords *records;
-
-/**
- * Thread that does the pushing.
- */
-static struct GNUNET_ThreadHandle *thread;
-
-/**
- * Should the thread terminate?
- */
-static int shutdown_requested;
-
-/**
- * Total number of entries with anonymity 0.
- * Used to calculate how long we should wait
- * between iterations.
- */
-static unsigned int total;
-
-
-/**
- * Cancel all requests with the DHT that
- * are older than a certain time limit.
- */
-static void
-purge_old_records (GNUNET_CronTime limit)
-{
- struct ActiveRequestRecords *pos;
- struct ActiveRequestRecords *prev;
-
- prev = NULL;
- pos = records;
- while (pos != NULL)
- {
- if (pos->end_time < limit)
- {
- if (prev == NULL)
- records = pos->next;
- else
- prev->next = pos->next;
- dv_dht->get_stop (pos->handle);
- GNUNET_free (pos);
- if (prev == NULL)
- pos = records;
- else
- pos = prev->next;
- }
- else
- {
- prev = pos;
- pos = pos->next;
- }
- }
-}
-
-
-/**
- * We got a result from the DHT. Check that it is valid
- * and pass to our clients.
- *
- * @param key the current key
- * @param value the current value
- * @param cls argument passed for context (closure)
- * @return GNUNET_OK to continue with iteration, GNUNET_SYSERR to abort
- */
-static int
-response_callback (const GNUNET_HashCode * key,
- unsigned int type,
- unsigned int size, const char *value, void *cls)
-{
- struct ActiveRequestRecords *record = cls;
- const GNUNET_EC_DBlock *dblock;
- GNUNET_HashCode hc;
-
- dblock = (const GNUNET_EC_DBlock *) value;
- if ((GNUNET_SYSERR ==
- GNUNET_EC_file_block_check_and_get_query (size,
- dblock,
- GNUNET_YES,
- &hc)) ||
- (0 != memcmp (key, &hc, sizeof (GNUNET_HashCode))))
- {
- GNUNET_GE_BREAK_OP (NULL, 0);
- return GNUNET_OK;
- }
- GNUNET_DV_FS_QUERYMANAGER_handle_response (NULL, &hc, 0, size, dblock);
- if (record->type == GNUNET_ECRS_BLOCKTYPE_DATA)
- {
- record->end_time = 0; /* delete ASAP */
- return GNUNET_SYSERR; /* no more! */
- }
- return GNUNET_OK;
-}
-
-/**
- * Execute a GAP query. Determines where to forward
- * the query and when (and captures state for the response).
- * May also have to check the local datastore.
- *
- * @param type type of content requested
- * @param query hash code of the query
- */
-void
-GNUNET_FS_DV_DHT_execute_query (unsigned int type,
- const GNUNET_HashCode * query)
-{
- struct ActiveRequestRecords *record;
- GNUNET_CronTime now;
-
- if (dv_dht == NULL)
- return;
-
- now = GNUNET_get_time ();
- record = GNUNET_malloc (sizeof (struct ActiveRequestRecords));
- record->end_time = now + GNUNET_GAP_MAX_DHT_DELAY;
- record->type = type;
- /*record->type = GNUNET_ECRS_BLOCKTYPE_KEYWORD; *//* Anonymous query should only get this type, right? */
- record->handle =
- dv_dht->get_start (record->type, query, &response_callback, record);
- if (record->handle == NULL)
- {
- GNUNET_free (record);
- return; /* failed in DHT */
- }
- GNUNET_mutex_lock (GNUNET_FS_lock);
- record->next = records;
- records = record;
- purge_old_records (now);
- GNUNET_mutex_unlock (GNUNET_FS_lock);
-}
-
-/**
- * Callback invoked on zero-anonymity content
- * (used to push that content into the DHT).
- */
-static int
-push_callback (const GNUNET_HashCode * key,
- const GNUNET_DatastoreValue * value, void *closure,
- unsigned long long uid)
-{
- GNUNET_CronTime delay;
-
- if (GNUNET_YES == shutdown_requested)
- return GNUNET_SYSERR;
- /* try pushing out everything every 6h,
- but do not push more often than every 5s */
- delay = 6 * GNUNET_CRON_HOURS / total;
- if (delay < 5 * GNUNET_CRON_SECONDS)
- delay = 5 * GNUNET_CRON_SECONDS;
- if (delay > 60 * GNUNET_CRON_SECONDS)
- delay = 60 * GNUNET_CRON_SECONDS;
- GNUNET_thread_sleep (delay);
- if (GNUNET_YES == shutdown_requested)
- return GNUNET_SYSERR;
-
- dv_dht->put (key,
- ntohl (value->type),
- ntohl (value->size) - sizeof (GNUNET_DatastoreValue),
- (const char *) &value[1]);
- if (stats != NULL)
- stats->change (stat_push_count, 1);
- if (GNUNET_YES == shutdown_requested)
- return GNUNET_SYSERR;
- return GNUNET_OK;
-}
-
-/**
- * Main method of the thread responsible for pushing
- * out the content.
- */
-static void *
-push_thread (void *cls)
-{
- while ((shutdown_requested == GNUNET_NO) &&
- (dv_dht != NULL) && (sqstore != NULL))
- {
- if (total == 0)
- total = 1;
- total = sqstore->iterateNonAnonymous (0, &push_callback, NULL);
- if ((shutdown_requested == GNUNET_NO) && (total == 0))
- GNUNET_thread_sleep (5 * GNUNET_CRON_MINUTES);
- }
- return NULL;
-}
-
-
-int
-GNUNET_FS_DV_DHT_init (GNUNET_CoreAPIForPlugins * capi)
-{
- coreAPI = capi;
- dv_dht = capi->service_request ("dv_dht");
- sqstore = capi->service_request ("sqstore");
- stats = capi->service_request ("stats");
- if (stats != NULL)
- stat_push_count
- = stats->create (gettext_noop ("# blocks pushed into DHT"));
- if ((dv_dht != NULL) && (sqstore != NULL))
- {
- shutdown_requested = GNUNET_NO;
- thread = GNUNET_thread_create (&push_thread, NULL, 1024 * 128);
- }
- return 0;
-}
-
-int
-GNUNET_FS_DV_DHT_done ()
-{
- void *unused;
-
- purge_old_records (-1);
- if (thread != NULL)
- {
- shutdown_requested = GNUNET_YES;
- GNUNET_thread_stop_sleep (thread);
- GNUNET_thread_join (thread, &unused);
- }
- if (stats != NULL)
- {
- coreAPI->service_release (stats);
- stats = NULL;
- }
- if (dv_dht != NULL)
- coreAPI->service_release (dv_dht);
- dv_dht = NULL;
- if (sqstore != NULL)
- coreAPI->service_release (sqstore);
- sqstore = NULL;
- coreAPI = NULL;
- return 0;
-}
Deleted: GNUnet/src/applications/fs/gap/fs_dv_dht.h
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dv_dht.h 2009-12-02 17:52:51 UTC (rev 9670)
+++ GNUnet/src/applications/fs/gap/fs_dv_dht.h 2009-12-02 18:20:14 UTC (rev 9671)
@@ -1,48 +0,0 @@
-/*
- This file is part of GNUnet
- (C) 2001 - 2009 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
- */
-
-/**
- * @file fs/gap/fs_dv_dht.h
- * @brief integration of file-sharing with the DV_DHT
- * infrastructure
- * @author Christian Grothoff, Nathan Evans
- */
-#ifndef FS_DV_DHT_H
-#define FS_DV_DHT_H
-
-#include "gnunet_util.h"
-
-int GNUNET_FS_DV_DHT_init (GNUNET_CoreAPIForPlugins * capi);
-
-int GNUNET_FS_DV_DHT_done (void);
-
-/**
- * Execute a GAP query. Determines where to forward
- * the query and when (and captures state for the response).
- * May also have to check the local datastore.
- *
- * @param type type of content requested
- * @param query hash code of the query
- */
-void
-GNUNET_FS_DV_DHT_execute_query (unsigned int type,
- const GNUNET_HashCode * query);
-
-#endif
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r9671 - GNUnet/src/applications/fs/gap,
gnunet <=