gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r166 - GNUnet/src/applications/dht/module


From: grothoff
Subject: [GNUnet-SVN] r166 - GNUnet/src/applications/dht/module
Date: Thu, 3 Feb 2005 02:56:28 -0800 (PST)

Author: grothoff
Date: 2005-02-03 02:56:27 -0800 (Thu, 03 Feb 2005)
New Revision: 166

Modified:
   GNUnet/src/applications/dht/module/dht.c
Log:
dht compiles -- finally.  Not that it works yet.

Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c    2005-02-03 10:28:29 UTC (rev 165)
+++ GNUnet/src/applications/dht/module/dht.c    2005-02-03 10:56:27 UTC (rev 166)
@@ -564,15 +564,6 @@
 
 typedef struct {
   /**
-   * Number of results currently received (size of the
-   * results-array).
-   */
-  unsigned int count;
-  /**
-   * The peers that confirmed storing the record so far.
-   */
-  PeerIdentity * peers;
-  /**
    * RPC callback to call with the final result set.
    */
   Async_RPC_Complete_Callback callback;
@@ -598,10 +589,6 @@
 
 typedef struct {
   /**
-   * Number of results currently received.
-   */
-  unsigned int count;
-  /**
    * RPC callback to call with the final result set.
    */
   Async_RPC_Complete_Callback callback;
@@ -2323,13 +2310,14 @@
  * @param closure extra argument to callback
  * @return handle to stop the async put
  */
-static struct DHT_PUT_RECORD * dht_put_async_start(const DHT_TableId * table,
-                                                  const HashCode160 * key,
-                                                  unsigned int type, /* REMOVE! */
-                                                  cron_t timeout,
-                                                  const DataContainer * value,
-                                                  DHT_OP_Complete callback,
-                                                  void * closure) {
+static struct DHT_PUT_RECORD * 
+dht_put_async_start(const DHT_TableId * table,
+                   const HashCode160 * key,
+                   unsigned int type, /* REMOVE! */
+                   cron_t timeout,
+                   const DataContainer * value,
+                   DHT_OP_Complete callback,
+                   void * closure) {
   int i;
   LocalTableData * ltd;
   DHT_PUT_RECORD * ret;
@@ -2744,8 +2732,7 @@
  * @return SYSERR on error, OK on success
  */
 static int dht_join(Blockstore * datastore,
-                   const DHT_TableId * table,
-                   cron_t timeout) {
+                   const DHT_TableId * table) {
   int i;
 
   ENTER();
@@ -2985,8 +2972,6 @@
  */
 static void rpc_DHT_findValue_abort(RPC_DHT_FindValue_Context * fw) {
   RPC_Param * results;
-  int errorCode;
-  int i;
 
   ENTER();
   delAbortJob((CronJob) &rpc_DHT_findValue_abort,
@@ -3000,28 +2985,14 @@
   fw->get_record = NULL;
 
   /* build RPC reply, call RPC callback */ 
-  results = RPC_paramNew();
-  if (fw->count > 0) {
-    errorCode = RPC_ERROR_OK;
-    for (i=fw->count-1;i>=0;i--) {
-      RPC_paramAdd(results,
-                  "data",
-                  ntohl(fw->results[i]->size),
-                  fw->results[i]);
-      FREE(fw->results[i]);
-    }
-    GROW(fw->results,
-        fw->count,
-        0);
-  } else {
-    errorCode = RPC_ERROR_TIMEOUT;
-  }
-  addOptionalFields(results);
-  if (fw->callback != NULL)
+  if (fw->callback != NULL) {
+    results = RPC_paramNew();
+    addOptionalFields(results);
     fw->callback(results,
-                errorCode,
+                OK,
                 fw->rpc_context);
-  RPC_paramFree(results);
+    RPC_paramFree(results);
+  }
   fw->done = YES;
   MUTEX_UNLOCK(&fw->lock);
 }
@@ -3032,41 +3003,25 @@
  * been accumulated this will also stop the cron-job and trigger
  * sending the cummulative reply via RPC.
  */
-static void rpc_dht_findValue_callback(const DataContainer * value,
-                                      RPC_DHT_FindValue_Context * fw) {
+static int rpc_dht_findValue_callback(const HashCode160 * key,
+                                     const DataContainer * value,
+                                     RPC_DHT_FindValue_Context * fw) {
   ENTER();
   MUTEX_LOCK(&fw->lock);
   GROW(fw->results,
        fw->count,
        fw->count+1);
-  fw->results[fw->count-1].dataLength = value->dataLength;
-  fw->results[fw->count-1].data = MALLOC(value->dataLength);
-  memcpy(fw->results[fw->count-1].data,
-        value->data,
-        value->dataLength);
+  fw->results[fw->count-1] = MALLOC(ntohl(value->size));
+  memcpy(fw->results[fw->count-1],
+        value,
+        ntohl(value->size));
   MUTEX_UNLOCK(&fw->lock);
-  if (stop) {
-    /* don't wait for timeout, run now! */
-    advanceCronJob((CronJob) &rpc_DHT_findValue_abort,
-                  0,
-                  fw);
-  }
+  return OK;
 }
 
-static int addToFindValueResults(const HashCode160 * key,
-                                const DataContainer * data,
-                                RPC_DHT_FindValue_Context * ctx) {
-  MUTEX_LOCK(&ctx->lock);
-  GROW(ctx->results,
-       ctx->count,
-       ctx->count+1);
-  ctx->results[ctx->count-1]
-    = MALLOC(ntohl(data->size));
-  memcpy(ctx->results[ctx->count-1],
-        data,
-        ntohl(data->size));
-  MUTEX_UNLOCK(&ctx->lock);
-  return OK;
+static void rpc_dht_findValue_complete(RPC_DHT_FindValue_Context * ctx) {
+  /* FIXME! */
+  
 }
 
 /**
@@ -3091,6 +3046,7 @@
   unsigned long long * timeout;
   unsigned int * type;
   unsigned int keysLength;
+  unsigned int dataLength;
   RPC_DHT_FindValue_Context * fw_context;
   
   ENTER();
@@ -3138,13 +3094,15 @@
   fw_context->get_record 
     = dht_get_async_start(table,
                          ntohl(*type),
-                         keySize / sizeof(HashCode160),
+                         keysLength / sizeof(HashCode160),
                          keys,
                          ntohll(*timeout),
-                         NULL, /* FIXME: resultCallback required! */
+                         (DataProcessor) &rpc_dht_findValue_callback,
                          fw_context,
-                         &rpc_dht_findValue_callback,
+                         (DHT_OP_Complete) &rpc_dht_findValue_complete,
                          fw_context);
+  /* FIXME: manage abort properly, also fix
+     rpc_dht_findValue_complete! */
   addAbortJob((CronJob)&rpc_DHT_findValue_abort,
              fw_context);
   addCronJob((CronJob)&rpc_DHT_findValue_abort,
@@ -3163,8 +3121,6 @@
  */
 static void rpc_DHT_store_abort(RPC_DHT_store_Context * fw) {
   RPC_Param * results;
-  int errorCode;
-  int i;
 
   ENTER();
   delAbortJob((CronJob) &rpc_DHT_store_abort,
@@ -3178,23 +3134,14 @@
   fw->put_record = NULL;
 
   /* build RPC reply, call RPC callback */ 
-  results = RPC_paramNew();
-  if (fw->count > 0) {
-    errorCode = RPC_ERROR_OK;
-    for (i=fw->count-1;i>=0;i--) 
-      RPC_paramAdd(results,
-                  "peer",
-                  sizeof(PeerIdentity),
-                  &fw->peers[i]);
-  } else {
-    errorCode = RPC_ERROR_TIMEOUT;
-  }
-  addOptionalFields(results);
-  if (fw->callback != NULL)
+  if (fw->callback != NULL) {
+    results = RPC_paramNew();
+    addOptionalFields(results);
     fw->callback(results,
-                errorCode,
+                OK,
                 fw->rpc_context);
-  RPC_paramFree(results);
+    RPC_paramFree(results);
+  }
   fw->done = YES;
   MUTEX_UNLOCK(&fw->lock);
 }
@@ -3205,11 +3152,8 @@
  * the value, this will also stop the cron-job and trigger
  * sending the cummulative reply via RPC.
  */
-static void rpc_dht_store_callback(const PeerIdentity * store,
-                                  RPC_DHT_store_Context * fw) {
-  MUTEX_LOCK(&fw->lock);
-  fw->confirmed_stores++;
-  MUTEX_UNLOCK(&fw->lock);
+static void rpc_dht_store_callback(RPC_DHT_store_Context * fw) {
+  /* FIXME: shutdown coordination! */
 }
 
 static void rpc_DHT_store(const PeerIdentity * sender,
@@ -3242,10 +3186,8 @@
                                   &dataLength,
                                   (void**) &timeout)) ||
        (dataLength != sizeof(unsigned long long)) ||
-       (OK != RPC_paramValueByName(arguments,
-                                  "value",
-                                  &value.dataLength,
-                                  (void**) &value.data)) ) {
+       ((NULL == (value = RPC_paramDataContainerByName(arguments,
+                                                      "value")))) ) {
     LOG(LOG_WARNING,
        _("Received invalid RPC '%s'.\n"),
        "DHT_store");
@@ -3263,12 +3205,8 @@
        "DHT_store");
   }
   MUTEX_UNLOCK(lock);
-  fw_context->count
-    = 0;
   fw_context->done
     = NO;
-  fw_context->peers
-    = NULL;
   fw_context->callback
     = callback;
   fw_context->rpc_context
@@ -3276,16 +3214,20 @@
   fw_context->put_record 
     = dht_put_async_start(table,
                          key,
+                         0, /* FIXME: type */
                          ntohll(*timeout),
-                         &value,
-                         &rpc_dht_store_callback,
+                         value,
+                         (DHT_OP_Complete) &rpc_dht_store_callback,
                          fw_context);
+  /* FIXME: fix shutdown
+     (also fix rpc_dht_store_callback) */
   addAbortJob((CronJob)&rpc_DHT_store_abort,
              fw_context);
   addCronJob((CronJob)&rpc_DHT_store_abort,
             ntohll(*timeout),
             0,
             fw_context);
+  FREE(value);
 }
 
 /**
@@ -3298,8 +3240,6 @@
  */
 static void rpc_DHT_remove_abort(RPC_DHT_remove_Context * fw) {
   RPC_Param * results;
-  int errorCode;
-  int i;
 
   ENTER();
   delAbortJob((CronJob) &rpc_DHT_remove_abort,
@@ -3314,20 +3254,10 @@
 
   /* build RPC reply, call RPC callback */ 
   results = RPC_paramNew();
-  if (fw->count > 0) {
-    errorCode = RPC_ERROR_OK;
-    for (i=fw->count-1;i>=0;i--) 
-      RPC_paramAdd(results,
-                  "peer",
-                  sizeof(PeerIdentity),
-                  &fw->peers[i]);
-  } else {
-    errorCode = RPC_ERROR_TIMEOUT;
-  }
   addOptionalFields(results);
   if (fw->callback != NULL)
     fw->callback(results,
-                errorCode,
+                OK,
                 fw->rpc_context);
   RPC_paramFree(results);
   fw->done = YES;
@@ -3340,12 +3270,8 @@
  * number of replicas this will also stop the cron-job and trigger
  * sending the cummulative reply via RPC.
  */
-static void rpc_dht_remove_callback(const PeerIdentity * store,
-                                   RPC_DHT_remove_Context * fw) {
-  ENTER();
-  MUTEX_LOCK(&fw->lock);
-  fw->confirmed_stores++;
-  MUTEX_UNLOCK(&fw->lock);
+static void rpc_dht_remove_callback(RPC_DHT_remove_Context * fw) {
+  /* FIXME: shutdown sequence! */
 }
 
 /**
@@ -3395,12 +3321,8 @@
     return;
   }   
     
-  if (OK != RPC_paramValueByName(arguments,
-                                "value",
-                                &value.dataLength,
-                                (void**) &value.data))
-    value.dataLength = 0;
-
+  value = RPC_paramDataContainerByName(arguments,
+                                      "value");
   fw_context 
     = MALLOC(sizeof(RPC_DHT_remove_Context));
   MUTEX_CREATE_RECURSIVE(&fw_context->lock);
@@ -3412,12 +3334,8 @@
        "DHT_removed");
   }
   MUTEX_UNLOCK(lock);
-  fw_context->count
-    = 0;
   fw_context->done
     = NO;
-  fw_context->peers
-    = NULL;
   fw_context->callback
     = callback;
   fw_context->rpc_context
@@ -3425,16 +3343,19 @@
   fw_context->remove_record 
     = dht_remove_async_start(table,
                             key,
+                            0, /* FIXME, type */
                             ntohll(*timeout),
-                            (value.dataLength==0) ? NULL : &value,
-                            &rpc_dht_remove_callback,
+                            value,
+                            (DHT_OP_Complete) &rpc_dht_remove_callback,
                             fw_context);
+  /* FIXME: shutdown sequence! */
   addAbortJob((CronJob)&rpc_DHT_remove_abort,
              fw_context);
   addCronJob((CronJob)&rpc_DHT_remove_abort,
             ntohll(*timeout),
             0,
             fw_context);
+  FREE(value);
 }
 
 /**
@@ -3530,8 +3451,11 @@
 
   /* for all of our tables, do a PUT on the master table */
   request_param = vectorNew(4);
-  value.dataLength = sizeof(PeerIdentity);
-  value.data = coreAPI->myIdentity;
+  value = MALLOC(sizeof(PeerIdentity) + sizeof(DataContainer));
+  value->size = htonl(sizeof(PeerIdentity) + sizeof(DataContainer));
+  memcpy(&value[1],
+        coreAPI->myIdentity,
+        sizeof(PeerIdentity));
 #if DEBUG_DHT
   LOG(LOG_CRON,
       "'%s' issues DHT_PUTs to advertise tables this peer participates in.\n",
@@ -3553,15 +3477,16 @@
       putRecords[putRecordsSize-1] 
        = dht_put_async_start(&masterTableId,
                              &tables[i].id,
+                             0, /* FIXME: type */
                              DHT_MAINTAIN_BUCKET_FREQUENCY,
-                             &value,
-                             ALPHA,
+                             value,
                              NULL,
                              NULL);
       putTimes[putTimesSize-1] = now;    
     }
   }
   vectorFree(request_param);
+  FREE(value);
   
   /*
     for each table that we have joined gather OUR neighbours
@@ -3717,8 +3642,7 @@
   masterTableDatastore 
     = create_datastore_dht_master(i);
   dht_join(masterTableDatastore,
-          &masterTableId,
-          0);
+          &masterTableId);
   addCronJob(&dhtMaintainJob,
             0,
             DHT_MAINTAIN_FREQUENCY,





reply via email to

[Prev in Thread] Current Thread [Next in Thread]