gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r11355 - gnunet/src/datastore


From: gnunet
Subject: [GNUnet-SVN] r11355 - gnunet/src/datastore
Date: Wed, 12 May 2010 17:01:03 +0200

Author: grothoff
Date: 2010-05-12 17:01:03 +0200 (Wed, 12 May 2010)
New Revision: 11355

Modified:
   gnunet/src/datastore/datastore_api.c
   gnunet/src/datastore/plugin_datastore_sqlite.c
   gnunet/src/datastore/test_datastore_api.c
   gnunet/src/datastore/test_datastore_api_data.conf
Log:
adding support for request queueing to datastore API

Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c        2010-05-12 14:30:04 UTC (rev 
11354)
+++ gnunet/src/datastore/datastore_api.c        2010-05-12 15:01:03 UTC (rev 
11355)
@@ -67,12 +67,12 @@
   /**
    * Function to call after transmission of the request.
    */
-  GNUNET_DATASTORE_ContinuationWithStatus cont;
+  GNUNET_DATASTORE_ContinuationWithStatus contX;
    
   /**
    * Closure for 'cont'.
    */
-  void *cont_cls;
+  void *cont_clsX;
 
   /**
    * Task for timeout signalling.
@@ -296,8 +296,6 @@
   GNUNET_CONTAINER_DLL_remove (h->queue_head,
                               h->queue_tail,
                               qe);
-  if (qe->cont != NULL)
-    qe->cont (qe->cont_cls, GNUNET_NO, _("timeout"));
   if (qe->response_proc != NULL)
     qe->response_proc (qe, NULL);
   GNUNET_free (qe);
@@ -314,8 +312,6 @@
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout timeout for the operation
- * @param cont continuation to call when done with request transmission (can 
be NULL)
- * @param cont_cls closure for cont
  * @param response_proc function to call with replies (can be NULL)
  * @param client_ctx client context (NOT a closure for response_proc)
  * @return NULL if the queue is full (and this entry was dropped)
@@ -326,8 +322,6 @@
                  unsigned int queue_priority,
                  unsigned int max_queue_size,
                  struct GNUNET_TIME_Relative timeout,
-                 GNUNET_DATASTORE_ContinuationWithStatus cont,
-                 void *cont_cls,
                  GNUNET_CLIENT_MessageHandler response_proc,            
                  void *client_ctx)
 {
@@ -368,8 +362,6 @@
   ret->h = h;
   ret->response_proc = response_proc;
   ret->client_ctx = client_ctx;
-  ret->cont = cont;
-  ret->cont_cls = cont_cls;  
   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
                                            timeout,
                                            &timeout_queue_entry,
@@ -391,8 +383,6 @@
                                       pos);
          GNUNET_SCHEDULER_cancel (h->sched,
                                   pos->task);
-         if (pos->cont != NULL)
-           pos->cont (pos->cont_cls, GNUNET_NO, _("Message queue full"));
          if (pos->response_proc != NULL)
            pos->response_proc (pos, NULL);
          GNUNET_free (pos);
@@ -494,26 +484,13 @@
     }
   memcpy (buf, &qe[1], msize);
   qe->was_transmitted = GNUNET_YES;
-  if (qe->cont != NULL)
-    qe->cont (qe->cont_cls, GNUNET_OK, NULL);
   GNUNET_SCHEDULER_cancel (h->sched,
                           qe->task);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
-  if (qe->response_proc == NULL)
-    {
-      GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                  h->queue_tail,
-                                  qe);
-      GNUNET_free (qe);
-      process_queue (h);
-    }
-  else
-    {
-      GNUNET_CLIENT_receive (h->client,
-                            qe->response_proc,
-                            qe,
-                            GNUNET_TIME_absolute_get_remaining (qe->timeout));
-    }  
+  GNUNET_CLIENT_receive (h->client,
+                        qe->response_proc,
+                        qe,
+                        GNUNET_TIME_absolute_get_remaining (qe->timeout));
   return msize;
 }
 
@@ -546,77 +523,8 @@
 }
 
 
-/**
- * Store an item in the datastore.  If the item is already present,
- * the priorities are summed up and the higher expiration time and
- * lower anonymity level is used.
- *
- * @param h handle to the datastore
- * @param rid reservation ID to use (from "reserve"); use 0 if no
- *            prior reservation was made
- * @param key key for the value
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- *        (if other requests of higher priority are in the queue)
- * @param timeout timeout for the operation
- * @param cont continuation to call when done
- * @param cont_cls closure for cont
- */
-void
-GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
-                     int rid,
-                      const GNUNET_HashCode * key,
-                      uint32_t size,
-                      const void *data,
-                      enum GNUNET_BLOCK_Type type,
-                      uint32_t priority,
-                      uint32_t anonymity,
-                      struct GNUNET_TIME_Absolute expiration,
-                     unsigned int queue_priority,
-                     unsigned int max_queue_size,
-                      struct GNUNET_TIME_Relative timeout,
-                     GNUNET_DATASTORE_ContinuationWithStatus cont,
-                     void *cont_cls)
-{
-  struct QueueEntry *qe;
-  struct DataMessage *dm;
-  size_t msize;
 
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to put %u bytes of data under key `%s'\n",
-             size,
-             GNUNET_h2s (key));
-#endif
-  msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  qe = make_queue_entry (h, msize,
-                        queue_priority, max_queue_size, timeout,
-                        cont, cont_cls, NULL, NULL);
-  if (qe == NULL)
-    return;
-  dm = (struct DataMessage* ) &qe[1];
-  dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
-  dm->header.size = htons(msize);
-  dm->rid = htonl(rid);
-  dm->size = htonl(size);
-  dm->type = htonl(type);
-  dm->priority = htonl(priority);
-  dm->anonymity = htonl(anonymity);
-  dm->uid = GNUNET_htonll(0);
-  dm->expiration = GNUNET_TIME_absolute_hton(expiration);
-  dm->key = *key;
-  memcpy (&dm[1], data, size);
-  process_queue (h);
-}
 
-
 /**
  * Context for processing status messages.
  */
@@ -636,6 +544,20 @@
 
 
 /**
+ * Dummy continuation used to do nothing (but be non-zero).
+ *
+ * @param cls closure
+ * @param result result 
+ * @param emsg error message
+ */
+static void
+drop_status_cont (void *cls, int result, const char *emsg)
+{
+  /* do nothing */
+}
+
+
+/**
  * Type of a function to call when we receive a message
  * from the service.
  *
@@ -711,6 +633,81 @@
 
 
 /**
+ * Store an item in the datastore.  If the item is already present,
+ * the priorities are summed up and the higher expiration time and
+ * lower anonymity level is used.
+ *
+ * @param h handle to the datastore
+ * @param rid reservation ID to use (from "reserve"); use 0 if no
+ *            prior reservation was made
+ * @param key key for the value
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param timeout timeout for the operation
+ * @param cont continuation to call when done
+ * @param cont_cls closure for cont
+ */
+void
+GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
+                     int rid,
+                      const GNUNET_HashCode * key,
+                      uint32_t size,
+                      const void *data,
+                      enum GNUNET_BLOCK_Type type,
+                      uint32_t priority,
+                      uint32_t anonymity,
+                      struct GNUNET_TIME_Absolute expiration,
+                     unsigned int queue_priority,
+                     unsigned int max_queue_size,
+                      struct GNUNET_TIME_Relative timeout,
+                     GNUNET_DATASTORE_ContinuationWithStatus cont,
+                     void *cont_cls)
+{
+  struct StatusContext *scont;
+  struct QueueEntry *qe;
+  struct DataMessage *dm;
+  size_t msize;
+
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Asked to put %u bytes of data under key `%s'\n",
+             size,
+             GNUNET_h2s (key));
+#endif
+  msize = sizeof(struct DataMessage) + size;
+  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  scont = GNUNET_malloc (sizeof (struct StatusContext));
+  scont->cont = cont;
+  scont->cont_cls = cont_cls;
+  qe = make_queue_entry (h, msize,
+                        queue_priority, max_queue_size, timeout,
+                        &process_status_message, scont);
+  if (qe == NULL)
+    return;
+  dm = (struct DataMessage* ) &qe[1];
+  dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
+  dm->header.size = htons(msize);
+  dm->rid = htonl(rid);
+  dm->size = htonl(size);
+  dm->type = htonl(type);
+  dm->priority = htonl(priority);
+  dm->anonymity = htonl(anonymity);
+  dm->uid = GNUNET_htonll(0);
+  dm->expiration = GNUNET_TIME_absolute_hton(expiration);
+  dm->key = *key;
+  memcpy (&dm[1], data, size);
+  process_queue (h);
+}
+
+
+/**
  * Reserve space in the datastore.  This function should be used
  * to avoid "out of space" failures during a longer sequence of "put"
  * operations (for example, when a file is being inserted).
@@ -740,6 +737,8 @@
   struct ReserveMessage *rm;
   struct StatusContext *scont;
 
+  if (cont == NULL)
+    cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Asked to reserve %llu bytes of data and %u entries'\n",
@@ -751,7 +750,7 @@
   scont->cont_cls = cont_cls;
   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
                         queue_priority, max_queue_size, timeout,
-                        NULL, NULL, &process_status_message, scont);
+                        &process_status_message, scont);
   if (qe == NULL)
     return;
   rm = (struct ReserveMessage*) &qe[1];
@@ -794,18 +793,19 @@
   struct ReleaseReserveMessage *rrm;
   struct StatusContext *scont;
 
+  if (cont == NULL)
+    cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to reserve %llu bytes of data and %u entries'\n",
-             (unsigned long long) amount,
-             (unsigned int) entries);
+             "Asked to release reserve %d\n",
+             rid);
 #endif
   scont = GNUNET_malloc (sizeof (struct StatusContext));
   scont->cont = cont;
   scont->cont_cls = cont_cls;
   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
                         queue_priority, max_queue_size, timeout,
-                        NULL, NULL, &process_status_message, scont);
+                        &process_status_message, scont);
   if (qe == NULL)
     return;
   rrm = (struct ReleaseReserveMessage*) &qe[1];
@@ -845,6 +845,8 @@
   struct UpdateMessage *um;
   struct StatusContext *scont;
 
+  if (cont == NULL)
+    cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Asked to update entry %llu raising priority by %u and expiration 
to %llu\n",
@@ -857,7 +859,7 @@
   scont->cont_cls = cont_cls;
   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
                         queue_priority, max_queue_size, timeout,
-                        NULL, NULL, &process_status_message, scont);
+                        &process_status_message, scont);
   if (qe == NULL)
     return;
   um = (struct UpdateMessage*) &qe[1];
@@ -904,6 +906,8 @@
   size_t msize;
   struct StatusContext *scont;
 
+  if (cont == NULL)
+    cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Asked to remove %u bytes under key `%s'\n",
@@ -917,7 +921,7 @@
   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
   qe = make_queue_entry (h, msize,
                         queue_priority, max_queue_size, timeout,
-                        NULL, NULL, &process_status_message, scont);
+                        &process_status_message, scont);
   if (qe == NULL)
     return;
   dm = (struct DataMessage*) &qe[1];
@@ -952,13 +956,6 @@
    */
   void *iter_cls;
 
-  /**
-   * Automatically get the next result, or wait for a call to
-   * GNUNET_DATASTORE_get_next?  GNUNET_YES means we automatically
-   * get the next one (if there are more).
-   */
-  int get_next;
-
 };
 
 
@@ -1048,11 +1045,6 @@
            ntohl(dm->anonymity),
            GNUNET_TIME_absolute_ntoh(dm->expiration),  
            GNUNET_ntohll(dm->uid));
-  if (rc->get_next == GNUNET_YES)
-    GNUNET_CLIENT_receive (h->client,
-                          qe->response_proc,
-                          qe,
-                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
 }
 
 
@@ -1089,10 +1081,9 @@
   rcont = GNUNET_malloc (sizeof (struct ResultContext));
   rcont->iter = iter;
   rcont->iter_cls = iter_cls;
-  rcont->get_next = GNUNET_YES;
   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
                         queue_priority, max_queue_size, timeout,
-                        NULL, NULL, &process_result_message, rcont);
+                        &process_result_message, rcont);
   if (qe == NULL)
     return;
   m = (struct GNUNET_MessageHeader*) &qe[1];
@@ -1144,10 +1135,9 @@
   rcont = GNUNET_malloc (sizeof (struct ResultContext));
   rcont->iter = iter;
   rcont->iter_cls = iter_cls;
-  rcont->get_next = GNUNET_NO;
   qe = make_queue_entry (h, sizeof(struct GetMessage),
                         queue_priority, max_queue_size, timeout,
-                        NULL, NULL, &process_result_message, rcont);
+                        &process_result_message, rcont);
   if (qe == NULL)
     return;
   gm = (struct GetMessage*) &qe[1];
@@ -1183,7 +1173,6 @@
 
   GNUNET_assert (NULL != qe);
   GNUNET_assert (&process_result_message == qe->response_proc);
-  GNUNET_assert (rc->get_next == GNUNET_NO);
   if (GNUNET_YES == more)
     {     
       GNUNET_CLIENT_receive (h->client,

Modified: gnunet/src/datastore/plugin_datastore_sqlite.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_sqlite.c      2010-05-12 14:30:04 UTC 
(rev 11354)
+++ gnunet/src/datastore/plugin_datastore_sqlite.c      2010-05-12 15:01:03 UTC 
(rev 11355)
@@ -258,11 +258,21 @@
                       "datastore-sqlite");
       return GNUNET_SYSERR;
     }
-  if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
+  if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
     {
-      GNUNET_break (0);
-      GNUNET_free (afsdir);
-      return GNUNET_SYSERR;
+      if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
+       {
+         GNUNET_break (0);
+         GNUNET_free (afsdir);
+         return GNUNET_SYSERR;
+       }
+      /* database is new or got deleted, reset payload to zero! */
+      if (plugin->stat_get != NULL)
+       {
+         GNUNET_STATISTICS_get_cancel (plugin->stat_get);
+         plugin->stat_get = NULL;
+       }
+      plugin->payload = 0;
     }
   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
 #ifdef ENABLE_NLS
@@ -779,6 +789,9 @@
       LOG_SQLITE (plugin, msg,
                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
"sqlite3_step");
       sqlite3_reset (stmt);
+      database_shutdown (plugin);
+      database_setup (plugin->env->cfg,
+                     plugin);
       return GNUNET_SYSERR;
     }
   if (SQLITE_OK != sqlite3_reset (stmt))

Modified: gnunet/src/datastore/test_datastore_api.c
===================================================================
--- gnunet/src/datastore/test_datastore_api.c   2010-05-12 14:30:04 UTC (rev 
11354)
+++ gnunet/src/datastore/test_datastore_api.c   2010-05-12 15:01:03 UTC (rev 
11355)
@@ -109,7 +109,8 @@
     RP_GET_MULTIPLE_DONE,
     RP_UPDATE,
     RP_UPDATE_VALIDATE,
-    RP_UPDATE_DONE
+    RP_UPDATE_DONE,
+    RP_ERROR
   };
 
 
@@ -139,9 +140,13 @@
 {
   struct CpsRunContext *crc = cls;
   if (GNUNET_OK != success)
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-               "%s\n", msg);
-  GNUNET_assert (GNUNET_OK == success);
+    {
+      ok = 42;
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 "%s\n", msg);
+      GNUNET_SCHEDULER_shutdown (crc->sched);
+      return;
+    }
   GNUNET_free_non_null (crc->data);
   crc->data = NULL;
   GNUNET_SCHEDULER_add_continuation (crc->sched,
@@ -221,7 +226,16 @@
   struct CpsRunContext *crc = cls;
   if (key == NULL)
     {
-      crc->phase = RP_DO_DEL;
+      if (crc->data == NULL)
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                     "Content not found!\n");
+         crc->phase = RP_ERROR;
+       }
+      else
+       {
+         crc->phase = RP_DO_DEL;
+       }
       GNUNET_SCHEDULER_add_continuation (crc->sched,
                                         &run_continuation,
                                         crc,
@@ -386,6 +400,7 @@
                  "DEL",
                  crc->i);
 #endif
+      crc->data = NULL;
       GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
       GNUNET_DATASTORE_get (datastore, 
                            &crc->key,
@@ -515,6 +530,12 @@
       GNUNET_DATASTORE_disconnect (datastore, GNUNET_YES);
       GNUNET_free (crc);
       ok = 0;
+      break;
+    case RP_ERROR:
+      GNUNET_DATASTORE_disconnect (datastore, GNUNET_YES);
+      GNUNET_free (crc);
+      ok = 43;
+      break;
     }
 }
 

Modified: gnunet/src/datastore/test_datastore_api_data.conf
===================================================================
--- gnunet/src/datastore/test_datastore_api_data.conf   2010-05-12 14:30:04 UTC 
(rev 11354)
+++ gnunet/src/datastore/test_datastore_api_data.conf   2010-05-12 15:01:03 UTC 
(rev 11355)
@@ -30,6 +30,7 @@
 # REJECT_FROM =
 # REJECT_FROM6 =
 # PREFIX =
+# DEBUG = YES
 #PREFIX = valgrind --tool=memcheck --leak-check=yes
 #BINARY = /home/grothoff/bin/gnunet-service-datastore
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]