gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3854 - GNUnet/src/applications/dht/module


From: grothoff
Subject: [GNUnet-SVN] r3854 - GNUnet/src/applications/dht/module
Date: Sat, 2 Dec 2006 19:21:39 -0800 (PST)

Author: grothoff
Date: 2006-12-02 19:21:36 -0800 (Sat, 02 Dec 2006)
New Revision: 3854

Modified:
   GNUnet/src/applications/dht/module/dstore.c
   GNUnet/src/applications/dht/module/dstore.h
   GNUnet/src/applications/dht/module/routing.c
Log:
more DHT code

Modified: GNUnet/src/applications/dht/module/dstore.c
===================================================================
--- GNUnet/src/applications/dht/module/dstore.c 2006-12-03 01:31:53 UTC (rev 
3853)
+++ GNUnet/src/applications/dht/module/dstore.c 2006-12-03 03:21:36 UTC (rev 
3854)
@@ -283,29 +283,40 @@
   MUTEX_UNLOCK(ds->lock);
   return SYSERR;
 }
+
+/**
+ * Lookup in the local datastore.
+ * @return total number of results found
+ */
+int dht_store_get(const HashCode512 * key,
+                 unsigned int type,
+                 ResultHandler handler,
+                 void * cls) {
+  return 0;
+}
+
+/**
+ * Store the given data in the local datastore.
+ */
+void dht_store_put(unsigned int type,
+                  const HashCode512 * key,
+                  cron_t discard_time,
+                  unsigned int size,
+                  const char * data) {
+  if (discard_time < get_time())
+    return;
   
+}
+  
 /**
  * Initialize dstore DHT component.
  *
  * @param capi the core API
  * @return OK on success
  */
-Blockstore * init_dht_store(size_t max_size,
-                           CoreAPIForApplication * capi) {
-  Blockstore * res;
-  MemoryDatastore * md;
-
-  md = MALLOC(sizeof(MemoryDatastore));
-  md->max_memory = max_size;
-  md->first = NULL;
-  md->lock = MUTEX_CREATE(YES);
-  res = MALLOC(sizeof(Blockstore));
-  res->get = &ds_lookup;
-  res->put = &ds_store;
-  res->del = &ds_remove;
-  res->iterate = &ds_iterate;
-  res->closure = md;
-  return res;
+int init_dht_store(size_t max_size,
+                  CoreAPIForApplication * capi) {
+  return OK;
 }
 
 /**
@@ -313,25 +324,7 @@
  *
  * @return OK on success
  */
-int done_dht_store(Blockstore * ds) {
-  MemoryDatastore * md;
-  HT_Entry * pos;
-  HT_Entry * next;
-  unsigned int i;
-
-  md  = ds->closure;
-  pos = md->first;
-  while (pos != NULL) {
-    next = pos->next;
-    for (i=0;i<pos->count;i++)
-      FREENONNULL(pos->values[i]);
-    FREE(pos->values);
-    FREE(pos);
-    pos = next;
-  }
-  MUTEX_DESTROY(md->lock);
-  FREE(md);
-  FREE(ds);
+int done_dht_store() {
   return OK;
 }
 

Modified: GNUnet/src/applications/dht/module/dstore.h
===================================================================
--- GNUnet/src/applications/dht/module/dstore.h 2006-12-03 01:31:53 UTC (rev 
3853)
+++ GNUnet/src/applications/dht/module/dstore.h 2006-12-03 03:21:36 UTC (rev 
3854)
@@ -31,20 +31,44 @@
 #include "gnunet_core.h"
 #include "gnunet_blockstore.h"
 
+typedef void (*ResultHandler)(const HashCode512 * key,
+                             unsigned int type,
+                             unsigned int size,
+                             const char * data,
+                             void * cls);
+
 /**
+ * Lookup in the local datastore.
+ * @return total number of results found
+ */
+int dht_store_get(const HashCode512 * key,
+                 unsigned int type,
+                 ResultHandler handler,
+                 void * cls);
+
+/**
+ * Store the given data in the local datastore.
+ */
+void dht_store_put(unsigned int type,
+                  const HashCode512 * key,
+                  cron_t discard_time,
+                  unsigned int size,
+                  const char * data);
+
+/**
  * Initialize dstore DHT component.
  *
  * @param capi the core API
- * @return NULL on error
+ * @return OK on success
  */
-Blockstore * init_dht_store(size_t max_size,
-                           CoreAPIForApplication * capi);
+int init_dht_store(size_t max_size,
+                  CoreAPIForApplication * capi);
 
 /**
  * Shutdown dstore DHT component.
  *
  * @return OK on success
  */
-int done_dht_store(Blockstore * store);
+int done_dht_store(void);
 
 #endif

Modified: GNUnet/src/applications/dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dht/module/routing.c        2006-12-03 01:31:53 UTC 
(rev 3853)
+++ GNUnet/src/applications/dht/module/routing.c        2006-12-03 03:21:36 UTC 
(rev 3854)
@@ -27,20 +27,294 @@
  * - tracking of get/put opertations
  * - retry
  * - reply handling
- * - register handlers (get, put, result)
+ * - prioritization
+ * - stats
  */
 
 #include "platform.h"
 #include "routing.h"
 #include "table.h"
+#include "dstore.h"
+#include "gnunet_protocols.h"
+#include "gnunet_stats_service.h"
 
 /**
+ * @brief record used for sending response back
+ */
+typedef struct {
+
+  /**
+   * Source of the request.  Replies should be forwarded to
+   * this peer.
+   */
+  PeerIdentity source;
+
+  /**
+   * If local peer is NOT interested in results, this callback
+   * will be NULL. 
+   */
+  ResultHandler receiver;
+
+  void * receiver_closure;
+
+} DHT_Source_Route;
+
+/**
+ * @brief message send for DHT lookup
+ */
+typedef struct {
+
+  MESSAGE_HEADER header;
+
+  /**
+   * Type of the requested content
+   */
+  unsigned int type;
+
+  /**
+   * Priority of requested content
+   */ 
+  unsigned int prio;
+
+  /**
+   * Reserved (for now, always zero)
+   */
+  unsigned int reserved;
+
+  /**
+   * Search key.
+   */
+  HashCode512 key;
+
+} DHT_GET_MESSAGE;
+
+/**
+ * @brief message send for DHT put
+ *
+ * Message is followed by the data.
+ */
+typedef struct {
+
+  MESSAGE_HEADER header;
+
+  /**
+   * Type of the content
+   */
+  unsigned int type;
+
+  /**
+   * When to discard the content (relative time)
+   */
+  cron_t timeout;
+
+  /**
+   * Key for the content.
+   */ 
+  HashCode512 key;
+
+} DHT_PUT_MESSAGE;
+
+/**
+ * @brief message send for DHT put
+ *
+ * Message is followed by the data.
+ */
+typedef struct {
+
+  MESSAGE_HEADER header;
+
+  /**
+   * Type of the content
+   */
+  unsigned int type;
+
+  /**
+   * Key for the content.
+   */ 
+  HashCode512 key;
+
+} DHT_RESULT_MESSAGE;
+
+/**
+ * Entry in the DHT routing table.
+ */
+typedef struct {
+
+  DHT_Source_Route source;
+
+  DHT_GET_MESSAGE * get;
+
+} DHTQueryRecord;
+
+static unsigned int rt_size;
+
+static unsigned int rt_pos;
+
+static DHTQueryRecord ** records;
+
+/**
+ * Statistics service.
+ */
+static Stats_ServiceAPI * stats;
+
+static struct MUTEX * lock;
+
+static CoreAPIForApplication * coreAPI;
+
+static unsigned int stat_replies_routed;
+
+static unsigned int stat_requests_routed;
+
+/**
+ * Given a result, lookup in the routing table
+ * where to send it next.
+ */
+static void routeResult(const HashCode512 * key,
+                       unsigned int type,
+                       unsigned int size,
+                       const char * data,
+                       void * cls) {
+}
+
+/**
+ * Larger factors will result in more aggressive routing of GET
+ * operations (each peer will either forward to GET_TRIES peers that
+ * are closer to the key).
+ */ 
+#define GET_TRIES 4
+
+/**
+ * Handle GET message.
+ */
+static int handleGet(const PeerIdentity * sender,
+                    const MESSAGE_HEADER * msg) {
+  PeerIdentity next[GET_TRIES];
+  const DHT_GET_MESSAGE * get;
+  int total;
+  int i;
+
+  if (ntohs(msg->size) != sizeof(DHT_GET_MESSAGE)) {
+    GE_BREAK(NULL, 0);
+    return SYSERR;
+  }
+  get = (const DHT_GET_MESSAGE*) msg;
+  total = dht_store_get(&get->key,
+                       ntohl(get->type),
+                       &routeResult,
+                       NULL);
+  if (total > 0)
+    return OK;
+  for (i=0;i<GET_TRIES;i++) {
+    if (OK != select_dht_peer(&next[i],
+                             &get->key,
+                             &next[0],
+                             i)) 
+      break;
+    if (-1 == hashCodeCompareDistance(&next[i].hashPubKey,
+                                     &coreAPI->myIdentity->hashPubKey,
+                                     &get->key))
+      coreAPI->unicast(&next[i],
+                      msg,
+                      0, /* FIXME: priority */
+                      5 * cronSECONDS); /* FIXME */
+  }
+  return OK;
+}
+
+/**
+ * Larger factors will result in more replication and
+ * more aggressive routing of PUT operations (each 
+ * peer will either forward to PUT_TRIES peers that
+ * are closer to the key, or replicate the content).
+ */ 
+#define PUT_TRIES 2
+
+/**
+ * Handle PUT message.
+ */
+static int handlePut(const PeerIdentity * sender,
+                    const MESSAGE_HEADER * msg) {
+  PeerIdentity next[PUT_TRIES];
+  const DHT_PUT_MESSAGE * put;
+  int store;
+  int i;
+
+  if (ntohs(msg->size) < sizeof(DHT_PUT_MESSAGE)) {
+    GE_BREAK(NULL, 0);
+    return SYSERR;
+  }
+  put = (const DHT_PUT_MESSAGE*) msg;
+  store = 0;
+  for (i=0;i<PUT_TRIES;i++) {
+    if (OK != select_dht_peer(&next[i],
+                             &put->key,
+                             &next[0],
+                             i)) {
+      store = 1;
+      break;
+    }
+    if (1 == hashCodeCompareDistance(&next[i].hashPubKey,
+                                    &coreAPI->myIdentity->hashPubKey,
+                                    &put->key))
+      store = 1; /* we're closer than the selected target */
+    else 
+      coreAPI->unicast(&next[i],
+                      msg,
+                      0, /* FIXME: priority */
+                      5 * cronSECONDS); /* FIXME */
+  }
+  if (store != 0)
+    dht_store_put(ntohl(put->type),
+                 &put->key,
+                 ntohll(put->timeout) + get_time(),
+                 ntohs(put->header.size) - sizeof(DHT_PUT_MESSAGE),
+                 (const char*) &put[1]);
+  return OK;
+}
+
+/**
+ * Handle RESULT message.
+ */
+static int handleResult(const PeerIdentity * sender,
+                       const MESSAGE_HEADER * msg) {
+  const DHT_RESULT_MESSAGE * result;
+
+  if (ntohs(msg->size) < sizeof(DHT_RESULT_MESSAGE)) {
+    GE_BREAK(NULL, 0);
+    return SYSERR;
+  }
+  result = (const DHT_RESULT_MESSAGE*) msg;
+  routeResult(&result->key,
+             ntohl(result->type),
+             ntohs(result->header.size) - sizeof(DHT_RESULT_MESSAGE),
+             (const char*) &result[1],
+             NULL);
+  return OK;
+}
+
+/**
  * Initialize routing DHT component.
  *
  * @param capi the core API
  * @return OK on success
  */
 int init_dht_routing(CoreAPIForApplication * capi) {
+  coreAPI = capi;
+  GROW(records,
+       rt_size,
+       512);
+  lock = MUTEX_CREATE(NO);
+  stats = capi->requestService("stats");
+  if (stats != NULL) {
+    stat_replies_routed = stats->create(gettext_noop("# dht replies routed"));
+    stat_requests_routed = stats->create(gettext_noop("# dht requests 
routed"));
+  }
+  coreAPI->registerHandler(P2P_PROTO_DHT_GET,
+                          &handleGet);
+  coreAPI->registerHandler(P2P_PROTO_DHT_PUT,
+                          &handlePut);
+  coreAPI->registerHandler(P2P_PROTO_DHT_RESULT,
+                          &handleResult);
   return OK;
 }
 
@@ -50,7 +324,29 @@
  * @return OK on success
  */
 int done_dht_routing() {
+  unsigned int i;
+
+  coreAPI->unregisterHandler(P2P_PROTO_DHT_GET,
+                            &handleGet);
+  coreAPI->unregisterHandler(P2P_PROTO_DHT_PUT,
+                            &handlePut);
+  coreAPI->unregisterHandler(P2P_PROTO_DHT_RESULT,
+                            &handleResult);
+  if (stats != NULL) {
+    coreAPI->releaseService(stats);
+    stats = NULL;
+  }
+  MUTEX_DESTROY(lock);
+  for (i=0;i<rt_size;i++) {
+    if (records[i] != NULL) {
+      FREE(records[i]->get);
+      FREE(records[i]);
+    }
+  }
+  GROW(records,
+       rt_size,
+       0);
   return OK;
 }
 
-
+/* end of routing.c */





reply via email to

[Prev in Thread] Current Thread [Next in Thread]