gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r16991 - gnunet/src/dht


From: gnunet
Subject: [GNUnet-SVN] r16991 - gnunet/src/dht
Date: Wed, 21 Sep 2011 18:47:21 +0200

Author: grothoff
Date: 2011-09-21 18:47:21 +0200 (Wed, 21 Sep 2011)
New Revision: 16991

Modified:
   gnunet/src/dht/gnunet-service-dht-new.c
   gnunet/src/dht/gnunet-service-dht_neighbours.c
   gnunet/src/dht/gnunet-service-dht_neighbours.h
Log:
stuff

Modified: gnunet/src/dht/gnunet-service-dht-new.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht-new.c     2011-09-21 15:42:32 UTC (rev 16990)
+++ gnunet/src/dht/gnunet-service-dht-new.c     2011-09-21 16:47:21 UTC (rev 16991)
@@ -777,27 +777,6 @@
 
 
 /**
- * Find the optimal bucket for this key, regardless
- * of the current number of buckets in use.
- *
- * @param hc the hashcode to compare our identity to
- *
- * @return the proper bucket index, or GNUNET_SYSERR
- *         on error (same hashcode)
- */
-static int
-find_bucket (const GNUNET_HashCode * hc)
-{
-  unsigned int bits;
-
-  bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
-  if (bits == MAX_BUCKETS)
-    return GNUNET_SYSERR;
-  return MAX_BUCKETS - bits - 1;
-}
-
-
-/**
  * Find which k-bucket this peer should go into,
  * taking into account the size of the k-bucket
  * array.  This means that if more bits match than
@@ -917,128 +896,7 @@
                                           &update_core_preference_finish, 
peer);
 }
 
-
 /**
- * Given a peer and its corresponding bucket,
- * remove it from that bucket.  Does not free
- * the PeerInfo struct, nor cancel messages
- * or free messages waiting to be sent to this
- * peer!
- *
- * @param peer the peer to remove
- * @param bucket the bucket the peer belongs to
- */
-static void
-remove_peer (struct PeerInfo *peer, unsigned int bucket)
-{
-  GNUNET_assert (k_buckets[bucket].peers_size > 0);
-  GNUNET_CONTAINER_DLL_remove (k_buckets[bucket].head, k_buckets[bucket].tail,
-                               peer);
-  k_buckets[bucket].peers_size--;
-  if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) &&
-      (lowest_bucket < MAX_BUCKETS - 1))
-    lowest_bucket++;
-}
-
-/**
- * Removes peer from a bucket, then frees associated
- * resources and frees peer.
- *
- * @param peer peer to be removed and freed
- * @param bucket which bucket this peer belongs to
- */
-static void
-delete_peer (struct PeerInfo *peer, unsigned int bucket)
-{
-  struct P2PPendingMessage *pos;
-  struct P2PPendingMessage *next;
-
-  remove_peer (peer, bucket);   /* First remove the peer from its bucket */
-  if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (peer->send_task);
-  if ((peer->th != NULL) && (coreAPI != NULL))
-    GNUNET_CORE_notify_transmit_ready_cancel (peer->th);
-
-  pos = peer->head;
-  while (pos != NULL)           /* Remove any pending messages for this peer */
-  {
-    increment_stats
-        ("# dht pending messages discarded (due to disconnect/shutdown)");
-    next = pos->next;
-    GNUNET_free (pos);
-    pos = next;
-  }
-
-  GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
-                 (all_known_peers, &peer->id.hashPubKey));
-  GNUNET_assert (GNUNET_YES ==
-                 GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
-                                                       &peer->id.hashPubKey,
-                                                       peer));
-  GNUNET_free (peer);
-  decrement_stats (STAT_PEERS_KNOWN);
-}
-
-
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value PeerInfo of the peer to move to new lowest bucket
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-move_lowest_bucket (void *cls, const GNUNET_HashCode * key, void *value)
-{
-  struct PeerInfo *peer = value;
-  int new_bucket;
-
-  GNUNET_assert (lowest_bucket > 0);
-  new_bucket = lowest_bucket - 1;
-  remove_peer (peer, lowest_bucket);
-  GNUNET_CONTAINER_DLL_insert_after (k_buckets[new_bucket].head,
-                                     k_buckets[new_bucket].tail,
-                                     k_buckets[new_bucket].tail, peer);
-  k_buckets[new_bucket].peers_size++;
-  return GNUNET_YES;
-}
-
-
-/**
- * The current lowest bucket is full, so change the lowest
- * bucket to the next lower down, and move any appropriate
- * entries in the current lowest bucket to the new bucket.
- */
-static void
-enable_next_bucket ()
-{
-  struct GNUNET_CONTAINER_MultiHashMap *to_remove;
-  struct PeerInfo *pos;
-
-  GNUNET_assert (lowest_bucket > 0);
-  to_remove = GNUNET_CONTAINER_multihashmap_create (bucket_size);
-  pos = k_buckets[lowest_bucket].head;
-
-  /* Populate the array of peers which should be in the next lowest bucket */
-  while (pos != NULL)
-  {
-    if (find_bucket (&pos->id.hashPubKey) < lowest_bucket)
-      GNUNET_CONTAINER_multihashmap_put (to_remove, &pos->id.hashPubKey, pos,
-                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-    pos = pos->next;
-  }
-
-  /* Remove peers from lowest bucket, insert into next lowest bucket */
-  GNUNET_CONTAINER_multihashmap_iterate (to_remove, &move_lowest_bucket, NULL);
-  GNUNET_CONTAINER_multihashmap_destroy (to_remove);
-  lowest_bucket = lowest_bucket - 1;
-}
-
-
-/**
  * Find the closest peer in our routing table to the
  * given hashcode.
  *
@@ -2464,286 +2322,6 @@
 
 
 /**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
-{
-  struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
-
-  GNUNET_CONTAINER_bloomfilter_add (bloom, key);
-  return GNUNET_YES;
-}
-
-/**
- * Task to send a find peer message for our own peer identifier
- * so that we can find the closest peers in the network to ourselves
- * and attempt to connect to them.
- *
- * @param cls closure for this task
- * @param tc the context under which the task is running
- */
-static void
-send_find_peer_message (void *cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
-  struct DHT_MessageContext msg_ctx;
-  struct GNUNET_TIME_Relative next_send_time;
-  struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
-
-  if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
-    return;
-
-  if (newly_found_peers > bucket_size)        /* If we are finding peers already, no need to send out our request right now! */
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Have %d newly found peers since last find peer message sent!\n",
-                newly_found_peers);
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
-                                  &send_find_peer_message, NULL);
-    newly_found_peers = 0;
-    return;
-  }
-
-  increment_stats (STAT_FIND_PEER_START);
-#if FIND_PEER_WITH_HELLO
-  find_peer_msg =
-      GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) +
-                     GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *)
-                                        my_hello));
-  find_peer_msg->header.size =
-      htons (sizeof (struct GNUNET_DHT_FindPeerMessage) +
-             GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
-  memcpy (&find_peer_msg[1], my_hello,
-          GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
-#else
-  find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
-  find_peer_msg->header.size =
-      htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
-#endif
-  find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
-  temp_bloom =
-      GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
-  GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
-                                         temp_bloom);
-  GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
-                                                            find_peer_msg->
-                                                            bloomfilter,
-                                                            DHT_BLOOM_SIZE));
-  GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
-  memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
-  memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
-  msg_ctx.unique_id =
-      GNUNET_ntohll (GNUNET_CRYPTO_random_u64
-                     (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
-  msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
-  msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
-  msg_ctx.network_size = log_of_network_size_estimate;
-  msg_ctx.peer = my_identity;
-  msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
-  msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
-
-  demultiplex_message (&find_peer_msg->header, &msg_ctx);
-  GNUNET_free (find_peer_msg);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s:%s': Sent `%s' request to some (?) peers\n", my_short_id,
-              "DHT", "FIND PEER");
-  if (newly_found_peers < bucket_size)
-  {
-    next_send_time.rel_value =
-        (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
-        GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
-                                  DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
-  }
-  else
-  {
-    next_send_time.rel_value =
-        DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
-        GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
-                                  DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value -
-                                  DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
-  }
-
-  GNUNET_assert (next_send_time.rel_value != 0);
-  find_peer_context.count = 0;
-  newly_found_peers = 0;
-  find_peer_context.start = GNUNET_TIME_absolute_get ();
-  GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
-                               NULL);  
-}
-
-
-/**
- * Core handler for p2p route requests.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @param atsi performance data
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-handle_dht_p2p_route_request (void *cls, const struct GNUNET_PeerIdentity *peer,
-                              const struct GNUNET_MessageHeader *message,
-                              const struct GNUNET_TRANSPORT_ATS_Information
-                              *atsi)
-{
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s:%s': Received P2P request from peer %s\n", my_short_id,
-              "DHT", GNUNET_i2s (peer));
-#endif
-  struct GNUNET_DHT_P2PRouteMessage *incoming =
-      (struct GNUNET_DHT_P2PRouteMessage *) message;
-  struct GNUNET_MessageHeader *enc_msg =
-      (struct GNUNET_MessageHeader *) &incoming[1];
-  struct DHT_MessageContext *msg_ctx;
-  char *route_path;
-  int path_size;
-
-  if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_YES;
-  }
-
-  if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sending of previous replies took too long, backing off!\n");
-    increment_stats ("# route requests dropped due to high load");
-    decrease_max_send_delay (get_max_send_delay ());
-    return GNUNET_YES;
-  }
-  msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
-  msg_ctx->bloom =
-      GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
-                                         DHT_BLOOM_K);
-  GNUNET_assert (msg_ctx->bloom != NULL);
-  msg_ctx->hop_count = ntohl (incoming->hop_count);
-  memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
-  msg_ctx->replication = ntohl (incoming->desired_replication_level);
-  msg_ctx->msg_options = ntohl (incoming->options);
-  if (GNUNET_DHT_RO_RECORD_ROUTE ==
-      (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
-  {
-    path_size =
-        ntohl (incoming->outgoing_path_length) *
-        sizeof (struct GNUNET_PeerIdentity);
-    if (ntohs (message->size) !=
-        (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
-         path_size))
-    {
-      GNUNET_break_op (0);
-      GNUNET_free (msg_ctx);
-      return GNUNET_YES;
-    }
-    route_path = (char *) &incoming[1];
-    route_path = route_path + ntohs (enc_msg->size);
-    msg_ctx->path_history =
-        GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
-    memcpy (msg_ctx->path_history, route_path, path_size);
-    memcpy (&msg_ctx->path_history[path_size], &my_identity,
-            sizeof (struct GNUNET_PeerIdentity));
-    msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
-  }
-  msg_ctx->network_size = ntohl (incoming->network_size);
-  msg_ctx->peer = *peer;
-  msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
-  msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
-  demultiplex_message (enc_msg, msg_ctx);
-  if (msg_ctx->bloom != NULL)
-  {
-    GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
-    msg_ctx->bloom = NULL;
-  }
-  GNUNET_free (msg_ctx);
-  return GNUNET_YES;
-}
-
-
-/**
- * Core handler for p2p route results.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @param atsi performance data
- *
- */
-static int
-handle_dht_p2p_route_result (void *cls, const struct GNUNET_PeerIdentity *peer,
-                             const struct GNUNET_MessageHeader *message,
-                             const struct GNUNET_TRANSPORT_ATS_Information
-                             *atsi)
-{
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s:%s': Received request from peer %s\n", my_short_id, "DHT",
-              GNUNET_i2s (peer));
-#endif
-  const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
-      (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
-  struct GNUNET_MessageHeader *enc_msg =
-      (struct GNUNET_MessageHeader *) &incoming[1];
-  struct DHT_MessageContext msg_ctx;
-
-  if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_YES;
-  }
-
-  memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
-  memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
-  msg_ctx.msg_options = ntohl (incoming->options);
-  msg_ctx.hop_count = ntohl (incoming->hop_count);
-  msg_ctx.peer = *peer;
-  msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;  /* Make result routing a higher priority */
-  msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
-  if ((GNUNET_DHT_RO_RECORD_ROUTE ==
-       (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
-      (ntohl (incoming->outgoing_path_length) > 0))
-  {
-    if (ntohs (message->size) -
-        sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
-        ntohs (enc_msg->size) !=
-        ntohl (incoming->outgoing_path_length) *
-        sizeof (struct GNUNET_PeerIdentity))
-    {
-#if DEBUG_DHT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Return message indicated a path was included, but sizes are wrong: Total size %d, enc size %d, left %d, expected %d\n",
-                  ntohs (message->size), ntohs (enc_msg->size),
-                  ntohs (message->size) -
-                  sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
-                  ntohs (enc_msg->size),
-                  ntohl (incoming->outgoing_path_length) *
-                  sizeof (struct GNUNET_PeerIdentity));
-#endif
-      GNUNET_break_op (0);
-      return GNUNET_NO;
-    }
-    msg_ctx.path_history = (char *) &incoming[1];
-    msg_ctx.path_history += ntohs (enc_msg->size);
-    msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
-  }
-  route_result_message (enc_msg, &msg_ctx);
-  return GNUNET_YES;
-}
-
-
-/**
  * Receive the HELLO from transport service,
  * free current and replace if necessary.
  *
@@ -2757,7 +2335,6 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received our `%s' from transport service\n", "HELLO");
 #endif
-
   GNUNET_assert (message != NULL);
   GNUNET_free_non_null (my_hello);
   my_hello = GNUNET_malloc (ntohs (message->size));
@@ -2788,16 +2365,8 @@
     GNUNET_TRANSPORT_disconnect (transport_handle);
     transport_handle = NULL;
   }
+  GDS_NEIGHBOURS_done ();
   GDS_NSE_done ();
-  if (coreAPI != NULL)
-  {
-#if DEBUG_DHT
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Disconnecting core!\n",
-                my_short_id, "DHT");
-#endif
-    GNUNET_CORE_disconnect (coreAPI);
-    coreAPI = NULL;
-  }
   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
   {
     while (k_buckets[bucket_count].head != NULL)
@@ -2830,60 +2399,10 @@
     GNUNET_BLOCK_context_destroy (block_context);
     block_context = NULL;
   }
-  GNUNET_free_non_null (my_short_id);
-  my_short_id = NULL;
 }
 
 
-/**
- * To be called on core init/fail.
- *
- * @param cls service closure
- * @param server handle to the server for this service
- * @param identity the public identity of this peer
- * @param publicKey the public key of this peer
- */
-static void
-core_init (void *cls, struct GNUNET_CORE_Handle *server,
-           const struct GNUNET_PeerIdentity *identity,
-           const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
-{
 
-  if (server == NULL)
-  {
-#if DEBUG_DHT
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Connection to core FAILED!\n",
-                "dht", GNUNET_i2s (identity));
-#endif
-    GNUNET_SCHEDULER_cancel (cleanup_task);
-    GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
-    return;
-  }
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Core connection initialized, I am peer: %s\n", "dht",
-              GNUNET_i2s (identity));
-#endif
-
-  /* Copy our identity so we can use it */
-  memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
-  if (my_short_id != NULL)
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n",
-                "DHT SERVICE");
-  my_short_id = GNUNET_strdup (GNUNET_i2s (&my_identity));
-}
-
-
-static struct GNUNET_CORE_MessageHandler core_handlers[] = {
-  {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
-  {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
-  {NULL, 0, 0}
-};
-
-
-
-
 /**
  * Process dht requests.
  *

Modified: gnunet/src/dht/gnunet-service-dht_neighbours.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht_neighbours.c      2011-09-21 15:42:32 UTC (rev 16990)
+++ gnunet/src/dht/gnunet-service-dht_neighbours.c      2011-09-21 16:47:21 UTC (rev 16991)
@@ -49,8 +49,124 @@
  */
 #define DEFAULT_BUCKET_SIZE 4
 
+/**
+ * Size of the bloom filter the DHT uses to filter peers.
+ */
+#define DHT_BLOOM_SIZE 128
 
+
 /**
+ * P2P PUT message
+ */
+struct PeerPutMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Processing options
+   */
+  uint32_t options GNUNET_PACKED;
+
+  /**
+   * Content type.
+   */
+  uint32_t type GNUNET_PACKED;
+
+  /**
+   * Hop count
+   */
+  uint32_t hop_count GNUNET_PACKED;
+
+  /**
+   * Replication level for this message
+   */
+  uint32_t desired_replication_level GNUNET_PACKED;
+
+  /**
+   * Generic route path length for a message in the
+   * DHT that arrived at a peer and generated
+   * a reply. Copied to the end of this message.
+   */
+  uint32_t outgoing_path_length GNUNET_PACKED;
+
+  /**
+   * Bloomfilter (for peer identities) to stop circular routes
+   */
+  char bloomfilter[DHT_BLOOM_SIZE];
+
+  /**
+   * The key we are storing under.
+   */
+  GNUNET_HashCode key;
+
+  /* put path (if tracked) */
+
+  /* Payload */
+
+};
+
+
+/**
+ * P2P GET message
+ */
+struct PeerGetMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Processing options
+   */
+  uint32_t options GNUNET_PACKED;
+
+  /**
+   * Desired content type.
+   */
+  uint32_t type GNUNET_PACKED;
+
+  /**
+   * Hop count
+   */
+  uint32_t hop_count GNUNET_PACKED;
+
+  /**
+   * Desired replication level for this request.
+   */
+  uint32_t desired_replication_level GNUNET_PACKED;
+
+  /**
+   * Size of the extended query.
+   */
+  uint32_t xquery_size;
+
+  /**
+   * Bloomfilter mutator.
+   */
+  uint32_t bf_mutator;
+
+  /**
+   * Bloomfilter (for peer identities) to stop circular routes
+   */
+  char bloomfilter[DHT_BLOOM_SIZE];
+
+  /**
+   * The key we are looking for.
+   */
+  GNUNET_HashCode key;
+
+  /* xquery */
+
+  /* result bloomfilter */
+
+};
+
+
+/**
  * Linked list of messages to send to a particular other peer.
  */
 struct P2PPendingMessage
@@ -183,14 +299,19 @@
 
 
 /**
- * The lowest currently used bucket.
+ * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
  */
-static unsigned int lowest_bucket;      /* Initially equal to MAX_BUCKETS - 1 */
+static unsigned int closest_bucket;
 
 /**
- * The buckets (Kademlia routing table, complete with growth).
- * Array of size MAX_BUCKET_SIZE.
+ * How many peers have we added since we sent out our last
+ * find peer request?
  */
+static unsigned int newly_found_peers;
+
+/**
+ * The buckets.  Array of size MAX_BUCKETS.  Offset 0 means 0 bits matching.
+ */
 static struct PeerBucket k_buckets[MAX_BUCKETS];
 
 /**
@@ -203,9 +324,36 @@
  */
 static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
 
+/**
+ * Task that sends FIND PEER requests.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
 
 
 /**
+ * Find the optimal bucket for this key.
+ *
+ * @param hc the hashcode to compare our identity to
+ * @return the proper bucket index, or GNUNET_SYSERR
+ *         on error (same hashcode)
+ */
+static int
+find_bucket (const GNUNET_HashCode * hc)
+{
+  unsigned int bits;
+
+  bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
+  if (bits == MAX_BUCKETS)
+    {
+      /* How can all bits match? Got my own ID? */
+      GNUNET_break (0);
+      return GNUNET_SYSERR; 
+    }
+  return MAX_BUCKETS - bits - 1;
+}
+
+
+/**
  * Method called whenever a peer connects.
  *
  * @param cls closure
@@ -222,29 +370,15 @@
   /* Check for connect to self message */
   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
     return;
-
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s:%s Receives core connect message for peer %s distance %d!\n",
-              my_short_id, "dht", GNUNET_i2s (peer), distance);
-#endif
-
   if (GNUNET_YES ==
       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
                                               &peer->hashPubKey))
   {
-#if DEBUG_DHT
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%s:%s Received %s message for peer %s, but already have peer in RT!",
-                my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s (peer));
-#endif
     GNUNET_break (0);
     return;
   }
-
-  peer_bucket = find_current_bucket (&peer->hashPubKey);
-  GNUNET_assert (peer_bucket >= lowest_bucket);
-  GNUNET_assert (peer_bucket < MAX_BUCKETS);
+  peer_bucket = find_bucket (&peer->hashPubKey);
+  GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
   ret = GNUNET_malloc (sizeof (struct PeerInfo));
 #if 0
   ret->latency = latency;
@@ -255,23 +389,17 @@
                                      k_buckets[peer_bucket].tail,
                                      k_buckets[peer_bucket].tail, ret);
   k_buckets[peer_bucket].peers_size++;
-  if ((GNUNET_CRYPTO_hash_matching_bits
-       (&my_identity.hashPubKey, &peer->hashPubKey) > 0) &&
-      (k_buckets[peer_bucket].peers_size <= bucket_size))
-    ret->preference_task =
-        GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
-  if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
-    enable_next_bucket ();
+  closest_bucket = GNUNET_MAX (closest_bucket,
+                              peer_bucket);
+  if ( (peer_bucket > 0) &&
+       (k_buckets[peer_bucket].peers_size <= bucket_size) )
+    ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
   newly_found_peers++;
-  GNUNET_CONTAINER_multihashmap_put (all_known_peers, &peer->hashPubKey, ret,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_put (all_known_peers, 
+                                                   &peer->hashPubKey, ret,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   increment_stats (STAT_PEERS_KNOWN);
-
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT",
-              ret == NULL ? "NOT ADDED" : "PEER ADDED");
-#endif
 }
 
 
@@ -286,68 +414,547 @@
 {
   struct PeerInfo *to_remove;
   int current_bucket;
+  struct P2PPendingMessage *pos;
+  struct P2PPendingMessage *next;
 
   /* Check for disconnect from self message */
   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
     return;
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s:%s: Received peer disconnect message for peer `%s' from 
%s\n",
-              my_short_id, "DHT", GNUNET_i2s (peer), "CORE");
-#endif
-
-  if (GNUNET_YES !=
-      GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
-                                              &peer->hashPubKey))
-  {
-    GNUNET_break (0);
-#if DEBUG_DHT
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%s:%s: do not have peer `%s' in RT, can't disconnect!\n",
-                my_short_id, "DHT", GNUNET_i2s (peer));
-#endif
-    return;
-  }
-  increment_stats (STAT_DISCONNECTS);
-  GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
-                 (all_known_peers, &peer->hashPubKey));
   to_remove =
       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
-  GNUNET_assert (to_remove != NULL);
+  if (NULL == to_remove)
+    {
+      GNUNET_break (0);
+      return;
+    }
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
+                                                       &peer->hashPubKey,
+                                                       to_remove));
   if (NULL != to_remove->info_ctx)
   {
     GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
     to_remove->info_ctx = NULL;
   }
-  GNUNET_assert (0 ==
-                 memcmp (peer, &to_remove->id,
-                         sizeof (struct GNUNET_PeerIdentity)));
   current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
-  delete_peer (to_remove, current_bucket);
+  GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
+                              k_buckets[current_bucket].tail,
+                               to_remove);
+  GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
+  k_buckets[current_bucket].peers_size--;
+  while ( (lowest_bucket > 0) &&
+         (k_buckets[lowest_bucket].peers_size == 0) )
+    lowest_bucket--;
+
+  if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (peer->send_task);
+    peer->send_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  if (to_remove->th != NULL) 
+  {
+    GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
+    to_remove->th = NULL;
+  }
+  while (NULL != (pos = to_remove->head))
+  {
+    GNUNET_CONTAINER_DLL_remove (to_remove->head,
+                                to_remove->tail,
+                                pos);
+    GNUNET_free (pos);
+  }
 }
 
 
+/**
+ * Perform a PUT operation.  // FIXME: document if this is only
+ * routing or also storage and/or even local client notification!
+ *
+ * @param type type of the block
+ * @param options routing options
+ * @param desired_replication_level desired replication count
+ * @param expiration_time when does the content expire
+ * @param key key for the content
+ * @param put_path_length number of entries in put_path
+ * @param put_path peers this request has traversed so far (if tracked)
+ * @param data payload to store
+ * @param data_size number of bytes in data
+ */
+void
+GST_NEIGHBOURS_handle_put (uint32_t type,
+                          uint32_t options,
+                          uint32_t desired_replication_level,
+                          GNUNET_TIME_Absolute expiration_time,
+                          const GNUNET_HashCode *key,
+                          unsigned int put_path_length,
+                          struct GNUNET_PeerIdentity *put_path,
+                          const void *data,
+                          size_t data_size)
+{
+  // FIXME
+}
 
+
 /**
- * Initialize neighbours subsystem.
+ * Perform a GET operation.  // FIXME: document if this is only
+ * routing or also state-tracking and/or even local lookup!
+ *
+ * @param type type of the block
+ * @param options routing options
+ * @param desired_replication_level desired replication count
+ * @param key key for the content
+ * @param xquery extended query
+ * @param xquery_size number of bytes in xquery
+ * @param reply_bf bloomfilter to filter duplicates
+ * @param reply_bf_mutator mutator for reply_bf
+ * @param peer_bf filter for peers not to select (again)
  */
 void
-GST_NEIGHBOURS_init ()
+GST_NEIGHBOURS_handle_get (uint32_t type,
+                          uint32_t options,
+                          uint32_t desired_replication_level,
+                          const GNUNET_HashCode *key,
+                          const void *xquery,
+                          size_t xquery_size,
+                          const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
+                          uint32_t reply_bf_mutator,
+                          const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
 {
+  // FIXME
 }
 
 
 /**
- * Shutdown neighbours subsystem.
+ * Handle a reply (route to origin).  FIXME: should this be here?
+ * (reply-routing table might be better done elsewhere).
+ *
+ * @param type type of the block
+ * @param options routing options
+ * @param expiration_time when does the content expire
+ * @param key key for the content
+ * @param put_path_length number of entries in put_path
+ * @param put_path peers the original PUT traversed (if tracked)
+ * @param get_path_length number of entries in get_path
+ * @param get_path peers this reply has traversed so far (if tracked)
+ * @param data payload of the reply
+ * @param data_size number of bytes in data
  */
 void
-GST_NEIGHBOURS_done ()
+GST_NEIGHBOURS_handle_reply (uint32_t type,
+                            uint32_t options,
+                            GNUNET_TIME_Absolute expiration_time,
+                            const GNUNET_HashCode *key,
+                            unsigned int put_path_length,
+                            struct GNUNET_PeerIdentity *put_path,
+                            unsigned int get_path_length,
+                            struct GNUNET_PeerIdentity *get_path,
+                            const void *data,
+                            size_t data_size)
 {
+  // FIXME
 }
 
 
+/**
+ * Add each of the peers we already know to the bloom filter of
+ * the request so that we don't get duplicate HELLOs.
+ *
+ * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building
+ * @param key peer identity to add to the bloom filter
+ * @param value the peer information (unused)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
+{
+  struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
 
+  GNUNET_CONTAINER_bloomfilter_add (bloom, key);
+  return GNUNET_YES;
+}
 
 
+/**
+ * Task to send a find peer message for our own peer identifier
+ * so that we can find the closest peers in the network to ourselves
+ * and attempt to connect to them.
+ *
+ * @param cls closure for this task
+ * @param tc the context under which the task is running
+ */
+static void
+send_find_peer_message (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
+  struct DHT_MessageContext msg_ctx;
+  struct GNUNET_TIME_Relative next_send_time;
+  struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
 
+  find_peer_task = GNUNET_SCHEDULER_NO_TASK;
+  if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+    return;
+  if (newly_found_peers > bucket_size) 
+  {
+    /* If we are finding many peers already, no need to send out our request 
right now! */
+    find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+                                                  &send_find_peer_message, 
NULL);
+    newly_found_peers = 0;
+    return;
+  }
+
+  // FIXME: build message...
+  find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
+  find_peer_msg->header.size =
+      htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
+  find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+  temp_bloom =
+      GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
+  GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
+                                         temp_bloom);
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
+                                                            find_peer_msg->
+                                                            bloomfilter,
+                                                            DHT_BLOOM_SIZE));
+  GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
+
+  memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
+  memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
+  msg_ctx.unique_id =
+      GNUNET_ntohll (GNUNET_CRYPTO_random_u64
+                     (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
+  msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
+  msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
+  msg_ctx.network_size = log_of_network_size_estimate;
+  msg_ctx.peer = my_identity;
+  msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
+  msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
+  // FIXME: transmit message...
+  demultiplex_message (&find_peer_msg->header, &msg_ctx);
+  GNUNET_free (find_peer_msg);
+
+  /* schedule next round */
+  newly_found_peers = 0;
+  next_send_time.rel_value =
+    (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
+    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
+                             DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
+  find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, 
+                                                &send_find_peer_message,
+                                                NULL);  
+}
+
+
+/**
+ * To be called on core init/fail.
+ *
+ * @param cls service closure
+ * @param server handle to the server for this service
+ * @param identity the public identity of this peer
+ * @param publicKey the public key of this peer
+ */
+static void
+core_init (void *cls, struct GNUNET_CORE_Handle *server,
+           const struct GNUNET_PeerIdentity *identity,
+           const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
+{
+  GNUNET_assert (server != NULL);
+  my_identity = *identity;
+  next_send_time.rel_value =
+    DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
+    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
+                             (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
+                              2) -
+                             DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
+  find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
+                                                &send_find_peer_message,
+                                                NULL);
+}
+
+
+/**
+ * Core handler for p2p get requests.
+ *
+ * @param cls closure
+ * @param message message
+ * @param peer peer identity this notification is about
+ * @param atsi performance data
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
+                   const struct GNUNET_MessageHeader *message,
+                   const struct GNUNET_TRANSPORT_ATS_Information
+                   *atsi)
+{
+  struct GNUNET_DHT_P2PRouteMessage *incoming =
+      (struct GNUNET_DHT_P2PRouteMessage *) message;
+  struct GNUNET_MessageHeader *enc_msg =
+      (struct GNUNET_MessageHeader *) &incoming[1];
+  struct DHT_MessageContext *msg_ctx;
+  char *route_path;
+  int path_size;
+
+  if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_YES;
+  }
+
+  if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sending of previous replies took too long, backing off!\n");
+    increment_stats ("# route requests dropped due to high load");
+    decrease_max_send_delay (get_max_send_delay ());
+    return GNUNET_YES;
+  }
+  msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
+  msg_ctx->bloom =
+      GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
+                                         DHT_BLOOM_K);
+  GNUNET_assert (msg_ctx->bloom != NULL);
+  msg_ctx->hop_count = ntohl (incoming->hop_count);
+  memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
+  msg_ctx->replication = ntohl (incoming->desired_replication_level);
+  msg_ctx->msg_options = ntohl (incoming->options);
+  if (GNUNET_DHT_RO_RECORD_ROUTE ==
+      (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
+  {
+    path_size =
+        ntohl (incoming->outgoing_path_length) *
+        sizeof (struct GNUNET_PeerIdentity);
+    if (ntohs (message->size) !=
+        (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
+         path_size))
+    {
+      GNUNET_break_op (0);
+      GNUNET_free (msg_ctx);
+      return GNUNET_YES;
+    }
+    route_path = (char *) &incoming[1];
+    route_path = route_path + ntohs (enc_msg->size);
+    msg_ctx->path_history =
+        GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
+    memcpy (msg_ctx->path_history, route_path, path_size);
+    memcpy (&msg_ctx->path_history[path_size], &my_identity,
+            sizeof (struct GNUNET_PeerIdentity));
+    msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
+  }
+  msg_ctx->network_size = ntohl (incoming->network_size);
+  msg_ctx->peer = *peer;
+  msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
+  msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
+  demultiplex_message (enc_msg, msg_ctx);
+  if (msg_ctx->bloom != NULL)
+  {
+    GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
+    msg_ctx->bloom = NULL;
+  }
+  GNUNET_free (msg_ctx);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Core handler for p2p put requests.
+ *
+ * @param cls closure
+ * @param message message
+ * @param peer peer identity this notification is about
+ * @param atsi performance data
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
+                   const struct GNUNET_MessageHeader *message,
+                   const struct GNUNET_TRANSPORT_ATS_Information
+                   *atsi)
+{
+  struct GNUNET_DHT_P2PRouteMessage *incoming =
+      (struct GNUNET_DHT_P2PRouteMessage *) message;
+  struct GNUNET_MessageHeader *enc_msg =
+      (struct GNUNET_MessageHeader *) &incoming[1];
+  struct DHT_MessageContext *msg_ctx;
+  char *route_path;
+  int path_size;
+
+  if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_YES;
+  }
+
+  if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sending of previous replies took too long, backing off!\n");
+    increment_stats ("# route requests dropped due to high load");
+    decrease_max_send_delay (get_max_send_delay ());
+    return GNUNET_YES;
+  }
+  msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
+  msg_ctx->bloom =
+      GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
+                                         DHT_BLOOM_K);
+  GNUNET_assert (msg_ctx->bloom != NULL);
+  msg_ctx->hop_count = ntohl (incoming->hop_count);
+  memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
+  msg_ctx->replication = ntohl (incoming->desired_replication_level);
+  msg_ctx->msg_options = ntohl (incoming->options);
+  if (GNUNET_DHT_RO_RECORD_ROUTE ==
+      (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
+  {
+    path_size =
+        ntohl (incoming->outgoing_path_length) *
+        sizeof (struct GNUNET_PeerIdentity);
+    if (ntohs (message->size) !=
+        (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
+         path_size))
+    {
+      GNUNET_break_op (0);
+      GNUNET_free (msg_ctx);
+      return GNUNET_YES;
+    }
+    route_path = (char *) &incoming[1];
+    route_path = route_path + ntohs (enc_msg->size);
+    msg_ctx->path_history =
+        GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
+    memcpy (msg_ctx->path_history, route_path, path_size);
+    memcpy (&msg_ctx->path_history[path_size], &my_identity,
+            sizeof (struct GNUNET_PeerIdentity));
+    msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
+  }
+  msg_ctx->network_size = ntohl (incoming->network_size);
+  msg_ctx->peer = *peer;
+  msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
+  msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
+  demultiplex_message (enc_msg, msg_ctx);
+  if (msg_ctx->bloom != NULL)
+  {
+    GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
+    msg_ctx->bloom = NULL;
+  }
+  GNUNET_free (msg_ctx);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Core handler for p2p route results.
+ *
+ * @param cls closure
+ * @param message message
+ * @param peer peer identity this notification is about
+ * @param atsi performance data
+ * @return GNUNET_YES normally, GNUNET_NO if the recorded path size is inconsistent
+ */
+static int
+handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
+                      const struct GNUNET_MessageHeader *message,
+                      const struct GNUNET_TRANSPORT_ATS_Information
+                      *atsi)
+{
+  const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
+      (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
+  struct GNUNET_MessageHeader *enc_msg =
+      (struct GNUNET_MessageHeader *) &incoming[1];
+  struct DHT_MessageContext msg_ctx;
+
+  if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_YES;
+  }
+
+  memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
+  memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
+  msg_ctx.msg_options = ntohl (incoming->options);
+  msg_ctx.hop_count = ntohl (incoming->hop_count);
+  msg_ctx.peer = *peer;
+  msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;  /* Make result routing 
a higher priority */
+  msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
+  if ((GNUNET_DHT_RO_RECORD_ROUTE ==
+       (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
+      (ntohl (incoming->outgoing_path_length) > 0))
+  {
+    if (ntohs (message->size) -
+        sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
+        ntohs (enc_msg->size) !=
+        ntohl (incoming->outgoing_path_length) *
+        sizeof (struct GNUNET_PeerIdentity))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_NO;
+    }
+    msg_ctx.path_history = (char *) &incoming[1];
+    msg_ctx.path_history += ntohs (enc_msg->size);
+    msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
+  }
+  route_result_message (enc_msg, &msg_ctx);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Initialize neighbours subsystem.
+ */
+int
+GST_NEIGHBOURS_init ()
+{
+  static struct GNUNET_CORE_MessageHandler core_handlers[] = {
+    {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
+    {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
+    {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
+    {NULL, 0, 0}
+  };
+  unsigned long long temp_config_num;
+  struct GNUNET_TIME_Relative next_send_time;
+ 
+  if (GNUNET_OK ==
+      GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
+                                             &temp_config_num))
+    bucket_size = (unsigned int) temp_config_num;  
+  coreAPI = GNUNET_CORE_connect (GDS_cfg,   /* Main configuration */
+                                 DEFAULT_CORE_QUEUE_SIZE,       /* queue size 
*/
+                                 NULL,  /* Closure passed to DHT functions */
+                                 &core_init,    /* Call core_init once 
connected */
+                                 &handle_core_connect,  /* Handle connects */
+                                 &handle_core_disconnect,       /* remove 
peers on disconnects */
+                                 NULL,  /* Do we care about "status" updates? 
*/
+                                 NULL,  /* Don't want to be notified about all 
incoming messages */
+                                 GNUNET_NO,     /* For header only inbound 
notification */
+                                 NULL,  /* Don't want to be notified about all 
outbound messages */
+                                 GNUNET_NO,     /* For header only outbound 
notification */
+                                 core_handlers);        /* Register these 
handlers */  
+  if (coreAPI == NULL)
+    return GNUNET_SYSERR;
+  all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Shutdown neighbours subsystem.
+ */
+void
+GST_NEIGHBOURS_done ()
+{
+  GNUNET_assert (coreAPI != NULL);
+  GNUNET_CORE_disconnect (coreAPI);
+  coreAPI = NULL;    
+  GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size 
(all_known_peers));
+  GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
+  all_known_peers = NULL;
+  if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
+  {
+    GNUNET_SCHEDULER_cancel (find_peer_task);
+    find_peer_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+}
+
+
 /* end of gnunet-service-dht_neighbours.c */

Modified: gnunet/src/dht/gnunet-service-dht_neighbours.h
===================================================================
--- gnunet/src/dht/gnunet-service-dht_neighbours.h      2011-09-21 15:42:32 UTC 
(rev 16990)
+++ gnunet/src/dht/gnunet-service-dht_neighbours.h      2011-09-21 16:47:21 UTC 
(rev 16991)
@@ -56,7 +56,6 @@
 /**
  * Perform a GET operation.
  *
- *
  * @param type type of the block
  * @param options routing options
  * @param desired_replication_level desired replication count




reply via email to

[Prev in Thread] Current Thread [Next in Thread]