gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r160 - GNUnet/src/applications/dht/module


From: grothoff
Subject: [GNUnet-SVN] r160 - GNUnet/src/applications/dht/module
Date: Wed, 2 Feb 2005 04:33:56 -0800 (PST)

Author: grothoff
Date: 2005-02-02 04:33:55 -0800 (Wed, 02 Feb 2005)
New Revision: 160

Modified:
   GNUnet/src/applications/dht/module/cs.c
Log:
making cs.c compile

Modified: GNUnet/src/applications/dht/module/cs.c
===================================================================
--- GNUnet/src/applications/dht/module/cs.c     2005-02-02 11:40:23 UTC (rev 
159)
+++ GNUnet/src/applications/dht/module/cs.c     2005-02-02 12:33:55 UTC (rev 
160)
@@ -104,16 +104,14 @@
   ClientHandle client;
   struct DHT_PUT_RECORD * put_record;
   DHT_TableId table;
-  unsigned int replicas;
-  unsigned int maxReplicas;
+  unsigned int replicas; /* confirmed puts? */
 } CS_PUT_RECORD;
 
 typedef struct {
   ClientHandle client;
   struct DHT_REMOVE_RECORD * remove_record;
   DHT_TableId table;
-  unsigned int replicas;
-  unsigned int maxReplicas;
+  unsigned int replicas; /* confirmed dels? */
 } CS_REMOVE_RECORD;
 
 typedef struct {
@@ -121,8 +119,6 @@
   struct DHT_GET_RECORD * get_record;
   DHT_TableId table;
   unsigned int count;
-  unsigned int replyCount;
-  DataContainer ** replies;
 } CS_GET_RECORD;
 
 static CS_GET_RECORD ** getRecords;
@@ -165,6 +161,7 @@
  */
 static int tcp_get(void * closure,
                   unsigned int type,
+                  unsigned int prio,
                   unsigned int keyCount,
                   const HashCode160 * keys,
                   DataProcessor resultCallback,
@@ -193,6 +190,7 @@
   req->header.size = htons(size);
   req->header.type = htons(DHT_CS_PROTO_REQUEST_GET);
   req->type = htonl(type);
+  req->priority = htonl(prio);
   req->table = handlers->table;
   memcpy(&req->keys,
         keys,
@@ -216,12 +214,14 @@
  *
  * @param key the key of the item
  * @param value the value to store
+ * @param prio the priority for the store
  * @return OK if the value could be stored, SYSERR if not (i.e. out of space)
  */
 static int tcp_put(void * closure,
                   const HashCode160 * key,
                   unsigned int type,
-                  const DataContainer * value) {
+                  const DataContainer * value,
+                  unsigned int prio) {
   DHT_CS_REQUEST_PUT * req;
   CS_TableHandlers * handlers = closure;
   int ret;
@@ -236,6 +236,7 @@
   req->table = handlers->table;
   req->key = *key;
   req->timeout = htonl(0);
+  req->priority = htonl(prio);
   memcpy(&req[1],
         value,
         ntohl(value->size));
@@ -375,8 +376,7 @@
   ptr->prereply   = SEMAPHORE_NEW(0);
   ptr->postreply  = SEMAPHORE_NEW(0);
   ret = dhtAPI->join(ptr->store,
-                    &req->table,
-                    ntohl(req->timeout));
+                    &req->table);
   if (ret == OK) {
     GROW(csHandlers,
         csHandlersCount,
@@ -461,7 +461,8 @@
                    &record->table,
                    record->replicas)) {
     LOG(LOG_FAILURE,
-       _("sendAck failed.  Terminating connection to client.\n"));
+       _("'%s' failed.  Terminating connection to client.\n"),
+       "sendAck");
     coreAPI->terminateClientConnection(record->client);
   }    
   for (i=putRecordsSize-1;i>=0;i--)
@@ -481,18 +482,13 @@
  */
 static void cs_put_complete_callback(const PeerIdentity * store,
                                     CS_PUT_RECORD * record) {
-  int mark = 0;
   MUTEX_LOCK(&csLock);
   record->replicas++;
-  if (record->replicas == record->maxReplicas) 
-    mark = 1;
   MUTEX_UNLOCK(&csLock);
-  if (mark == 1) {
-    /* trigger cron-job early if replication count reached. */
-    advanceCronJob((CronJob) &cs_put_abort,
-                  0,
-                  record);
-  }
+  /* trigger cron-job early if replication confirmed. */
+  advanceCronJob((CronJob) &cs_put_abort,
+                0,
+                record);  
 }
 
 struct CSPutClosure {
@@ -508,21 +504,28 @@
   DHT_CS_REQUEST_PUT * req;
   DataContainer * data;
   CS_PUT_RECORD * ptr;
+  unsigned int size;
 
   req = cpc->message;
   client = cpc->client;
   FREE(cpc);
-  cont.dataLength = ntohs(req->header.size) - sizeof(DHT_CS_REQUEST_PUT);
-  if (cont.dataLength == 0)
-    cont.data = NULL;
-  else
-    cont.data = &((DHT_CS_REQUEST_PUT_GENERIC*)req)->value[0];
-
+  size = ntohs(req->header.size) 
+    - sizeof(DHT_CS_REQUEST_PUT)
+    + sizeof(DataContainer);
+  GNUNET_ASSERT(size < 0xFFFF);
+  if (size == 0) {
+    data = NULL;
+  } else {
+    data = MALLOC(size);
+    data->size = htonl(size);
+    memcpy(&data[1],
+          &req[1],
+          size - sizeof(DataContainer));
+  }
   ptr = MALLOC(sizeof(CS_PUT_RECORD));
   ptr->client = client;
   ptr->replicas = 0;
   ptr->table = req->table;
-  ptr->maxReplicas = 7;
   ptr->put_record = NULL;
 
   MUTEX_LOCK(&csLock);
@@ -537,11 +540,12 @@
   MUTEX_UNLOCK(&csLock);
   ptr->put_record = dhtAPI->put_start(&req->table,
                                      &req->key,
+                                     ntohl(req->type),
                                      ntohll(req->timeout),
-                                     &cont,
-                                     ptr->maxReplicas,
-                                     (DHT_PUT_Complete) 
&cs_put_complete_callback,
+                                     data,
+                                     (DHT_OP_Complete) 
&cs_put_complete_callback,
                                      ptr);
+  FREE(data);
   FREE(req);
 }
 
@@ -597,18 +601,13 @@
  */
 static void cs_remove_complete_callback(const PeerIdentity * store,
                                        CS_REMOVE_RECORD * record) {
-  int mark = 0;
   MUTEX_LOCK(&csLock);
   record->replicas++;
-  if (record->replicas == record->maxReplicas)
-    mark = 1;
   MUTEX_UNLOCK(&csLock);
-  if (mark == 1) {
-    /* trigger cron-job early if replication count reached. */
-    advanceCronJob((CronJob) &cs_remove_abort,
-                  0,
-                  record);
-  }
+  /* trigger cron-job early if remove confirmed. */
+  advanceCronJob((CronJob) &cs_remove_abort,
+                0,
+                record);
 }
 
 struct CSRemoveClosure {
@@ -621,24 +620,31 @@
  */
 static void csRemoveJob(struct CSRemoveClosure * cpc) {
   DHT_CS_REQUEST_REMOVE * req;
-  DataContainer cont;
+  DataContainer * data;
   CS_REMOVE_RECORD * ptr;
   ClientHandle client;
+  unsigned int size;
 
   req = cpc->message;
   client = cpc->client;
   FREE(cpc);
-  cont.dataLength = ntohs(req->header.size) - sizeof(DHT_CS_REQUEST_REMOVE);
-  if (cont.dataLength == 0)
-    cont.data = NULL;
-  else
-    cont.data = &((DHT_CS_REQUEST_REMOVE_GENERIC*)req)->value[0];
-  
+  size = ntohs(req->header.size) 
+    - sizeof(DHT_CS_REQUEST_REMOVE)
+    + sizeof(DataContainer);
+  GNUNET_ASSERT(size < 0xFFFF);
+  if (size == 0) {
+    data = NULL;
+  } else {
+    data = MALLOC(size);
+    data->size = htonl(size);
+    memcpy(&data[1],
+          &req[1],
+          size - sizeof(DataContainer));
+  }
   ptr = MALLOC(sizeof(CS_REMOVE_RECORD));
   ptr->client = client;
   ptr->replicas = 0;
   ptr->table = req->table;
-  ptr->maxReplicas = 7;
   ptr->remove_record = NULL;
   addCronJob((CronJob) &cs_remove_abort,
             ntohll(req->timeout),
@@ -652,12 +658,13 @@
   MUTEX_UNLOCK(&csLock);
   ptr->remove_record = dhtAPI->remove_start(&req->table,
                                            &req->key,
+                                           ntohl(req->type),
                                            ntohll(req->timeout),
-                                           &cont,
-                                           ptr->maxReplicas,
-                                           (DHT_REMOVE_Complete) 
&cs_remove_complete_callback,
+                                           data,
+                                           (DHT_OP_Complete) 
&cs_remove_complete_callback,
                                            ptr);
   FREE(req);
+  FREE(data);
 }
 
 /**
@@ -683,51 +690,61 @@
 }
 
 
-static void cs_get_abort(CS_GET_RECORD * record) {
-  int i;
+
+static int cs_get_result_callback(const HashCode160 * key,
+                                 const DataContainer * value,
+                                 CS_GET_RECORD * record) {
   DHT_CS_REPLY_RESULTS * msg;
   size_t n;
+  
+  n = sizeof(DHT_CS_REPLY_RESULTS) + ntohl(value->size);
+  msg = MALLOC(n);
+  msg->key = *key;
+  memcpy(&msg[1],
+        value,
+        ntohl(value->size));
+  LOG(LOG_DEBUG,
+      "'%s' processes reply '%.*s'\n",
+      __FUNCTION__,
+      ntohl(value->size) - sizeof(DataContainer),
+      &value[1]);
+  msg->table = record->table;
+  msg->header.size = htons(n);
+  msg->header.type = htons(DHT_CS_PROTO_REPLY_GET);
+  if (OK != coreAPI->sendToClient(record->client,
+                                 &msg->header)) {
+    LOG(LOG_FAILURE,
+       _("'%s' failed. Terminating connection to client.\n"),
+       "sendToClient");
+    coreAPI->terminateClientConnection(record->client);
+  }
+  FREE(msg);
+  return OK;
+}
+                                  
+static void cs_get_abort(CS_GET_RECORD * record) {
+  int i;
 
   dhtAPI->get_stop(record->get_record);
-  for (i=0;i<record->count;i++) {
-    n = sizeof(DHT_CS_REPLY_RESULTS) + record->replies[i].dataLength;
-    msg = MALLOC(n);
-    memcpy(&((DHT_CS_REPLY_RESULTS_GENERIC*)msg)->data[0],
-          record->replies[i].data,
-          record->replies[i].dataLength);
-    LOG(LOG_DEBUG,
-       "'%s' processes reply '%.*s'\n",
-       __FUNCTION__,
-       record->replies[i].dataLength,
-       record->replies[i].data);
-    FREENONNULL(record->replies[i].data);
-    msg->totalResults = htonl(record->count);
-    msg->table = record->table;
-    msg->header.size = htons(n);
-    msg->header.type = htons(DHT_CS_PROTO_REPLY_GET);
-    if (OK != coreAPI->sendToClient(record->client,
-                                   &msg->header)) {
+  if (record->count == 0) {
+    if (OK != sendAck(record->client,
+                     &record->table,
+                     SYSERR)) {
       LOG(LOG_FAILURE,
          _("'%s' failed. Terminating connection to client.\n"),
-         "sendToClient");
+         "sendAck");
       coreAPI->terminateClientConnection(record->client);
-    }
-  }
-  GROW(record->replies,
-       record->count,
-       0);
-  if (record->count == 0) {
+    }   
+  } else {
     if (OK != sendAck(record->client,
                      &record->table,
-                     SYSERR)) {
+                     record->count)) {
       LOG(LOG_FAILURE,
          _("'%s' failed. Terminating connection to client.\n"),
          "sendAck");
       coreAPI->terminateClientConnection(record->client);
     }   
   }
-
-
   MUTEX_LOCK(&csLock);
   for (i=getRecordsSize-1;i>=0;i--)
     if (getRecords[i] == record) {
@@ -742,37 +759,13 @@
 }
 
 /**
- * Notification: peer 'store' agreed to store data.
+ * Notification: peer 'get' operation complete (or timeout)
  */
-static void cs_get_complete_callback(const DataContainer * value,
+static void cs_get_complete_callback(const PeerIdentity * peer,
                                     CS_GET_RECORD * record) {
-  DataContainer * copy;
-  int mark = 0;
-
-  LOG(LOG_EVERYTHING,
-      "'%s' called with result '%.*s'!\n",
-      __FUNCTION__,
-      value->dataLength,
-      value->data);
-  MUTEX_LOCK(&csLock);
-  GROW(record->replies,
-       record->count,
-       record->count+1);
-  copy = &record->replies[record->count-1];
-  copy->dataLength = value->dataLength;
-  copy->data = MALLOC(copy->dataLength);
-  memcpy(copy->data,
-        value->data,
-        copy->dataLength);
-  if (record->count == record->maxReplies)
-    mark = 1;
-  MUTEX_UNLOCK(&csLock);
-  if (mark == 1) {
-    /* trigger cron-job early if maxResult count reached. */
-    advanceCronJob((CronJob) &cs_get_abort,
-                  0,
-                  record);
-  }
+  advanceCronJob((CronJob) &cs_get_abort,
+                0,
+                record);  
 }
 
 struct CSGetClosure {
@@ -787,15 +780,16 @@
   DHT_CS_REQUEST_GET * req;
   CS_GET_RECORD * ptr;
   ClientHandle client;
+  unsigned int keyCount;
 
   client = cpc->client;
   req = cpc->message;
   FREE(cpc);
   
+  keyCount = 1 + ((ntohs(req->header.size) - sizeof(DHT_CS_REQUEST_GET)) / 
sizeof(HashCode160));
   ptr = MALLOC(sizeof(CS_GET_RECORD));
   ptr->client = client;
   ptr->count = 0;
-  ptr->maxReplies = 7;
   ptr->table = req->table;
   ptr->get_record = NULL;
 
@@ -810,10 +804,13 @@
   getRecords[getRecordsSize-1] = ptr;
   MUTEX_UNLOCK(&csLock);
   ptr->get_record = dhtAPI->get_start(&req->table,
-                                     &req->key,
+                                     ntohl(req->type),
+                                     keyCount,
+                                     &req->keys,
                                      ntohll(req->timeout),
-                                     ptr->maxReplies,
-                                     (DHT_GET_Complete) 
&cs_get_complete_callback,
+                                     (DataProcessor) &cs_get_result_callback,
+                                     ptr,
+                                     (DHT_OP_Complete) 
&cs_get_complete_callback,
                                      ptr);
   return OK;
 }
@@ -856,7 +853,8 @@
     return SYSERR;
   req =(DHT_CS_REPLY_ACK*) message;
   LOG(LOG_EVERYTHING,
-      "ACK received from client.\n");
+      "'%s' received from client.\n",
+      "DHT_CS_REPLY_ACK");
   MUTEX_LOCK(&csLock);
   for (i=0;i<csHandlersCount;i++) {
     if ( (csHandlers[i]->handler == client) &&
@@ -872,32 +870,36 @@
   }
   MUTEX_UNLOCK(&csLock);
   LOG(LOG_ERROR,
-      _("Failed to deliver csACK signal.\n"));
+      _("Failed to deliver '%s' message.\n"),
+      "DHT_CS_REPLY_ACK");
   return SYSERR; /* failed to signal */
 }
 
 /**
  * CS handler for results.  Finds the appropriate record
- * and appends the new result.  If all results have been
+ * and passes on the new result.  If all results have been
  * collected, signals using the semaphore.
  */
 static int csResults(ClientHandle client,
                     const CS_HEADER * message) {
   DHT_CS_REPLY_RESULTS * req;
   CS_TableHandlers * ptr;
-  unsigned int tot;
   unsigned int dataLength;
-  DataContainer * cont;
   int i;
 
-  if (ntohs(message->size) < sizeof(DHT_CS_REPLY_RESULTS))
+  if (ntohs(message->size) < sizeof(DHT_CS_REPLY_RESULTS)) {
+    BREAK();
     return SYSERR;
+  }
   req = (DHT_CS_REPLY_RESULTS*) message;
-  tot = ntohl(req->totalResults);
   dataLength = ntohs(message->size) - sizeof(DHT_CS_REPLY_RESULTS);
+  if (dataLength != ntohl(req->data.size)) {
+    BREAK();
+    return SYSERR;
+  }
   LOG(LOG_EVERYTHING,
-      "%d RESULTS received from client.\n",
-      tot);  
+      "'%s' received from client.\n",
+      "DHT_CS_REPLY_RESULTS");  
   MUTEX_LOCK(&csLock);
   for (i=0;i<csHandlersCount;i++) {
     if ( (csHandlers[i]->handler == client) &&
@@ -905,32 +907,24 @@
                            &req->table)) ) {     
       ptr = csHandlers[i];
       SEMAPHORE_DOWN(ptr->postreply);
-      if ( (ptr->status == ptr->maxResults) ||
-          (tot > ptr->maxResults) ) {
-       MUTEX_UNLOCK(&csLock);
-       LOG(LOG_ERROR,
-           _("Received more results than allowed!\n"));
-       return SYSERR;
-      }
       LOG(LOG_EVERYTHING,
          "'%s' received result '%.*s'!\n",
          __FUNCTION__,
-         dataLength,
-         &((DHT_CS_REPLY_RESULTS_GENERIC*)req)->data[0]);
+         dataLength - sizeof(DataContainer),
+         &(&req->data)[1]);
       
-      ptr->resultCallback(data,
+      ptr->resultCallback(&req->key,
+                         &req->data,                     
                          ptr->resultCallbackClosure);
       ptr->status++;
-      if (ptr->status == tot)
-       SEMAPHORE_UP(ptr->prereply); /* all replies received, signal! */
       MUTEX_UNLOCK(&csLock);
       return OK;
     }
   }
   MUTEX_UNLOCK(&csLock);
   LOG(LOG_ERROR,
-      _("Failed to deliver '%s' content.\n"),
-      "CS_REPLY_GET");
+      _("Failed to deliver '%s' message.\n"),
+      "DHT_CS_REPLY_RESULTS");
   return SYSERR; /* failed to deliver */ 
 }
 
@@ -940,7 +934,6 @@
  */
 static void csClientExit(ClientHandle client) {
   int i;
-  int j;
   CS_GET_RECORD * gr;
   CS_PUT_RECORD * pr;
   CS_REMOVE_RECORD * rr;
@@ -973,11 +966,6 @@
                 0,
                 gr);
       dhtAPI->get_stop(gr->get_record);
-      for (j=0;j<gr->count;j++)
-       FREENONNULL(gr->replies[j].data);
-      GROW(gr->replies,
-          gr->count,
-          0);      
       getRecords[i] = getRecords[getRecordsSize-1];
       GROW(getRecords,
           getRecordsSize,





reply via email to

[Prev in Thread] Current Thread [Next in Thread]