gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r35419 - in gnunet/src: datastore include


From: gnunet
Subject: [GNUnet-SVN] r35419 - in gnunet/src: datastore include
Date: Sat, 21 Mar 2015 04:38:29 +0100

Author: amatus
Date: 2015-03-21 04:38:29 +0100 (Sat, 21 Mar 2015)
New Revision: 35419

Modified:
   gnunet/src/datastore/gnunet-service-datastore.c
   gnunet/src/datastore/perf_plugin_datastore.c
   gnunet/src/datastore/plugin_datastore_heap.c
   gnunet/src/datastore/plugin_datastore_mysql.c
   gnunet/src/datastore/plugin_datastore_postgres.c
   gnunet/src/datastore/plugin_datastore_sqlite.c
   gnunet/src/datastore/plugin_datastore_template.c
   gnunet/src/datastore/test_plugin_datastore.c
   gnunet/src/include/gnunet_datastore_plugin.h
Log:
Convert datastore plugin API to asynchronous


Modified: gnunet/src/datastore/gnunet-service-datastore.c
===================================================================
--- gnunet/src/datastore/gnunet-service-datastore.c     2015-03-20 23:44:57 UTC 
(rev 35418)
+++ gnunet/src/datastore/gnunet-service-datastore.c     2015-03-21 03:38:29 UTC 
(rev 35419)
@@ -829,38 +829,24 @@
 };
 
 
-/**
- * Actually put the data message.
- *
- * @param client sender of the message
- * @param dm message with the data to store
- */
 static void
-execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
+put_continuation (void *cls, const struct GNUNET_HashCode *key, uint32_t size,
+                  int status, char *msg)
 {
-  uint32_t size;
-  char *msg;
-  int ret;
+  struct GNUNET_SERVER_Client *client = cls;
 
-  size = ntohl (dm->size);
-  msg = NULL;
-  ret =
-      plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1],
-                        ntohl (dm->type), ntohl (dm->priority),
-                        ntohl (dm->anonymity), ntohl (dm->replication),
-                        GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
-  if (GNUNET_OK == ret)
+  if (GNUNET_OK == status)
   {
     GNUNET_STATISTICS_update (stats,
                               gettext_noop ("# bytes stored"), size,
                               GNUNET_YES);
-    GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key);
+    GNUNET_CONTAINER_bloomfilter_add (filter, key);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Successfully stored %u bytes of type %u under key `%s'\n",
-                size, ntohl (dm->type), GNUNET_h2s (&dm->key));
+                "Successfully stored %u bytes under key `%s'\n",
+                size, GNUNET_h2s (key));
   }
-  transmit_status (client, ret, msg);
-  GNUNET_free_non_null (msg);
+  transmit_status (client, status, msg);
+  GNUNET_SERVER_client_drop (client);
   if (quota - reserved - cache_size < payload)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -872,7 +858,34 @@
   }
 }
 
+/**
+ * Actually put the data message.
+ *
+ * @param client sender of the message
+ * @param dm message with the data to store
+ */
+static void
+execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
+{
+  GNUNET_SERVER_client_keep (client);
+  plugin->api->put (plugin->api->cls, &dm->key, ntohl (dm->size), &dm[1],
+                    ntohl (dm->type), ntohl (dm->priority),
+                    ntohl (dm->anonymity), ntohl (dm->replication),
+                    GNUNET_TIME_absolute_ntoh (dm->expiration),
+                    &put_continuation, client);
+}
 
+
+static void
+check_present_continuation (void *cls, int status, char *msg)
+{
+  struct GNUNET_SERVER_Client *client = cls;
+
+  transmit_status (client, GNUNET_NO, NULL);
+  GNUNET_SERVER_client_drop (client);
+}
+
+
 /**
  * Function that will check if the given datastore entry
  * matches the put and if none match executes the put.
@@ -921,9 +934,13 @@
          expiration.abs_value_us))
       plugin->api->update (plugin->api->cls, uid,
                            (int32_t) ntohl (dm->priority),
-                           GNUNET_TIME_absolute_ntoh (dm->expiration), NULL);
-    transmit_status (pc->client, GNUNET_NO, NULL);
-    GNUNET_SERVER_client_drop (pc->client);
+                           GNUNET_TIME_absolute_ntoh (dm->expiration),
+                           check_present_continuation, pc->client);
+    else
+    {
+      transmit_status (pc->client, GNUNET_NO, NULL);
+      GNUNET_SERVER_client_drop (pc->client);
+    }
     GNUNET_free (pc);
   }
   else
@@ -1051,6 +1068,16 @@
 }
 
 
+static void
+update_continuation (void *cls, int status, char *msg)
+{
+  struct GNUNET_SERVER_Client *client = cls;
+
+  transmit_status (client, status, msg);
+  GNUNET_SERVER_client_drop (client);
+}
+
+
 /**
  * Handle UPDATE-message.
  *
@@ -1063,21 +1090,17 @@
                const struct GNUNET_MessageHeader *message)
 {
   const struct UpdateMessage *msg;
-  int ret;
-  char *emsg;
 
   GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"),
                             1, GNUNET_NO);
   msg = (const struct UpdateMessage *) message;
-  emsg = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n",
               "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
-  ret =
-      plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
-                           (int32_t) ntohl (msg->priority),
-                           GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg);
-  transmit_status (client, ret, emsg);
-  GNUNET_free_non_null (emsg);
+  GNUNET_SERVER_client_keep (client);
+  plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
+                       (int32_t) ntohl (msg->priority),
+                       GNUNET_TIME_absolute_ntoh (msg->expiration),
+                       update_continuation, client);
 }
 
 
@@ -1336,6 +1359,29 @@
 }
 
 
+static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+  {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
+   sizeof (struct ReserveMessage)},
+  {&handle_release_reserve, NULL,
+   GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
+   sizeof (struct ReleaseReserveMessage)},
+  {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
+  {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
+   sizeof (struct UpdateMessage)},
+  {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
+  {&handle_get_replication, NULL,
+   GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
+   sizeof (struct GNUNET_MessageHeader)},
+  {&handle_get_zero_anonymity, NULL,
+   GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
+   sizeof (struct GetZeroAnonymityMessage)},
+  {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
+  {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
+   sizeof (struct GNUNET_MessageHeader)},
+  {NULL, NULL, 0, 0}
+};
+
+
 /**
  * Adds a given @a key to the bloomfilter in @a cls @a count times.
  *
@@ -1350,6 +1396,19 @@
 {
   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
 
+  if (NULL == key)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               _("Bloomfilter construction complete.\n"));
+    GNUNET_SERVER_add_handlers (server, handlers);
+    GNUNET_SERVER_resume (server);
+    expired_kill_task
+      = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                            &delete_expired,
+                                            NULL);
+    return;
+  }
+
   while (0 < count--)
     GNUNET_CONTAINER_bloomfilter_add (bf, key);
 }
@@ -1365,27 +1424,6 @@
 static void
 process_stat_done (void *cls, int success)
 {
-  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
-    {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
-     sizeof (struct ReserveMessage)},
-    {&handle_release_reserve, NULL,
-     GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
-     sizeof (struct ReleaseReserveMessage)},
-    {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
-    {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
-     sizeof (struct UpdateMessage)},
-    {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
-    {&handle_get_replication, NULL,
-     GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
-     sizeof (struct GNUNET_MessageHeader)},
-    {&handle_get_zero_anonymity, NULL,
-     GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
-     sizeof (struct GetZeroAnonymityMessage)},
-    {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
-    {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
-     sizeof (struct GNUNET_MessageHeader)},
-    {NULL, NULL, 0, 0}
-  };
 
   stat_get = NULL;
   plugin = load_plugin ();
@@ -1411,9 +1449,12 @@
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                _("Rebuilding bloomfilter.  Please be patient.\n"));
     if (NULL != plugin->api->get_keys)
+    {
       plugin->api->get_keys (plugin->api->cls,
                              &add_key_to_bloomfilter,
                              filter);
+      return;
+    }
     else
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  _("Plugin does not support get_keys function. Please 
fix!\n"));

Modified: gnunet/src/datastore/perf_plugin_datastore.c
===================================================================
--- gnunet/src/datastore/perf_plugin_datastore.c        2015-03-20 23:44:57 UTC 
(rev 35418)
+++ gnunet/src/datastore/perf_plugin_datastore.c        2015-03-21 03:38:29 UTC 
(rev 35419)
@@ -99,15 +99,60 @@
 
 
 static void
-putValue (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k)
+test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+static void
+put_continuation (void *cls, const struct GNUNET_HashCode *key,
+                  uint32_t size, int status, char *msg)
 {
+  struct CpsRunContext *crc = cls;
+
+  if (GNUNET_OK != status)
+  {
+    FPRINTF (stderr, "ERROR: `%s'\n", msg);
+  }
+  else
+  {
+    stored_bytes += size;
+    stored_ops++;
+    stored_entries++;
+  }
+  GNUNET_SCHEDULER_add_now (&test, crc);
+}
+
+static void
+do_put (struct CpsRunContext *crc)
+{
   char value[65536];
   size_t size;
   static struct GNUNET_HashCode key;
-  static int ic;
-  char *msg;
+  static int i;
   unsigned int prio;
 
+  if (0 == i)
+    crc->start = GNUNET_TIME_absolute_get ();
+  if (PUT_10 == i)
+  {
+    i = 0;
+    crc->end = GNUNET_TIME_absolute_get ();
+    {
+      printf ("%s took %s for %llu items\n", "Storing an item",
+             GNUNET_STRINGS_relative_time_to_string 
(GNUNET_TIME_absolute_get_difference (crc->start,
+                                                                               
           crc->end),
+                                                     GNUNET_YES),
+              PUT_10);
+      if (PUT_10 > 0)
+        GAUGER (category, "Storing an item",
+                (crc->end.abs_value_us - crc->start.abs_value_us) / 1000LL / 
PUT_10,
+                "ms/item");
+    }
+    crc->i++;
+    crc->start = GNUNET_TIME_absolute_get ();
+    crc->phase++;
+    GNUNET_SCHEDULER_add_now (&test, crc);
+    return;
+  }
   /* most content is 32k */
   size = 32 * 1024;
   if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 16) == 0)   /* but 
some of it is less! */
@@ -120,34 +165,23 @@
   memset (value, i, size);
   if (i > 255)
     memset (value, i - 255, size / 2);
-  value[0] = k;
+  value[0] = crc->i;
   memcpy (&value[4], &i, sizeof (i));
-  msg = NULL;
   prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
-  if (GNUNET_OK != api->put (api->cls, &key, size, value, 1 + i % 4 /* type */ 
,
-                             prio, i % 4 /* anonymity */ ,
-                             0 /* replication */ ,
-                             GNUNET_TIME_relative_to_absolute
-                             (GNUNET_TIME_relative_multiply
-                              (GNUNET_TIME_UNIT_MILLISECONDS,
-                               60 * 60 * 60 * 1000 +
-                               GNUNET_CRYPTO_random_u32
-                               (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), &msg))
-  {
-    FPRINTF (stderr, "ERROR: `%s'\n", msg);
-    GNUNET_free_non_null (msg);
-    return;
-  }
-  ic++;
-  stored_bytes += size;
-  stored_ops++;
-  stored_entries++;
+  crc->api->put (crc->api->cls, &key, size, value, 1 + i % 4 /* type */ ,
+                 prio, i % 4 /* anonymity */ ,
+                 0 /* replication */ ,
+                 GNUNET_TIME_relative_to_absolute
+                 (GNUNET_TIME_relative_multiply
+                   (GNUNET_TIME_UNIT_MILLISECONDS,
+                    60 * 60 * 60 * 1000 +
+                    GNUNET_CRYPTO_random_u32
+                      (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
+                 put_continuation, crc);
+  i++;
 }
 
-static void
-test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
-
 static int
 iterate_zeros (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
                const void *data, enum GNUNET_BLOCK_Type type, uint32_t 
priority,
@@ -342,7 +376,6 @@
 test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct CpsRunContext *crc = cls;
-  int j;
 
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
   {
@@ -361,25 +394,7 @@
                                         &cleaning_task, crc);
     break;
   case RP_PUT:
-    crc->start = GNUNET_TIME_absolute_get ();
-    for (j = 0; j < PUT_10; j++)
-      putValue (crc->api, j, crc->i);
-    crc->end = GNUNET_TIME_absolute_get ();
-    {
-      printf ("%s took %s for %llu items\n", "Storing an item",
-             GNUNET_STRINGS_relative_time_to_string 
(GNUNET_TIME_absolute_get_difference (crc->start,
-                                                                               
           crc->end),
-                                                     GNUNET_YES),
-              PUT_10);
-      if (PUT_10 > 0)
-        GAUGER (category, "Storing an item",
-                (crc->end.abs_value_us - crc->start.abs_value_us) / 1000LL / 
PUT_10,
-                "ms/item");
-    }
-    crc->i++;
-    crc->start = GNUNET_TIME_absolute_get ();
-    crc->phase++;
-    GNUNET_SCHEDULER_add_now (&test, crc);
+    do_put (crc);
     break;
   case RP_REP_GET:
     crc->api->get_replication (crc->api->cls, &replication_get, crc);

Modified: gnunet/src/datastore/plugin_datastore_heap.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_heap.c        2015-03-20 23:44:57 UTC 
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_heap.c        2015-03-21 03:38:29 UTC 
(rev 35419)
@@ -206,10 +206,10 @@
  * @param anonymity anonymity-level for the content
  * @param replication replication-level for the content
  * @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
  */
-static int
+static void
 heap_plugin_put (void *cls,
                 const struct GNUNET_HashCode * key,
                 uint32_t size,
@@ -217,7 +217,9 @@
                 enum GNUNET_BLOCK_Type type,
                 uint32_t priority, uint32_t anonymity,
                 uint32_t replication,
-                struct GNUNET_TIME_Absolute expiration, char **msg)
+                struct GNUNET_TIME_Absolute expiration,
+                PluginPutCont cont,
+                void *cont_cls)
 {
   struct Plugin *plugin = cls;
   struct Value *value;
@@ -267,7 +269,7 @@
                                     value,
                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   plugin->size += size;
-  return GNUNET_OK;
+  cont (cont_cls, key, size, GNUNET_OK, NULL);
 }
 
 
@@ -615,14 +617,16 @@
  * @param expire new expiration time should be the
  *     MAX of any existing expiration time and
  *     this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
  */
-static int
+static void
 heap_plugin_update (void *cls,
                    uint64_t uid,
                    int delta,
-                   struct GNUNET_TIME_Absolute expire, char **msg)
+                   struct GNUNET_TIME_Absolute expire,
+                   PluginUpdateCont cont,
+                   void *cont_cls)
 {
   struct Plugin *plugin = cls;
   struct Value *value;
@@ -640,7 +644,7 @@
     value->priority = 0;
   else
     value->priority += delta;
-  return GNUNET_OK;
+  cont (cont_cls, GNUNET_OK, NULL);
 }
 
 
@@ -778,6 +782,7 @@
   GNUNET_CONTAINER_multihashmap_iterate (plugin->keyvalue,
                                         &return_value,
                                         &gac);
+  proc (proc_cls, NULL, 0);
 }
 
 

Modified: gnunet/src/datastore/plugin_datastore_mysql.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_mysql.c       2015-03-20 23:44:57 UTC 
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_mysql.c       2015-03-21 03:38:29 UTC 
(rev 35419)
@@ -280,14 +280,15 @@
  * @param anonymity anonymity-level for the content
  * @param replication replication-level for the content
  * @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
  */
-static int
+static void
 mysql_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
                   const void *data, enum GNUNET_BLOCK_Type type,
                   uint32_t priority, uint32_t anonymity, uint32_t replication,
-                  struct GNUNET_TIME_Absolute expiration, char **msg)
+                  struct GNUNET_TIME_Absolute expiration, PluginPutCont cont,
+                  void *cont_cls)
 {
   struct Plugin *plugin = cls;
   unsigned int irepl = replication;
@@ -305,7 +306,8 @@
   if (size > MAX_DATUM_SIZE)
   {
     GNUNET_break (0);
-    return GNUNET_SYSERR;
+    cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
+    return;
   }
   hashSize = sizeof (struct GNUNET_HashCode);
   hashSize2 = sizeof (struct GNUNET_HashCode);
@@ -322,13 +324,16 @@
                               MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
                               MYSQL_TYPE_BLOB, &vhash, hashSize2, &hashSize2,
                               MYSQL_TYPE_BLOB, data, lsize, &lsize, -1))
-    return GNUNET_SYSERR;
+  {
+    cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run 
failure"));
+    return;
+  }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Inserted value `%s' with size %u into gn090 table\n",
               GNUNET_h2s (key), (unsigned int) size);
   if (size > 0)
     plugin->env->duc (plugin->env->cls, size);
-  return GNUNET_OK;
+  cont (cont_cls, key, size, GNUNET_OK, NULL);
 }
 
 
@@ -352,12 +357,13 @@
  * @param expire new expiration time should be the
  *     MAX of any existing expiration time and
  *     this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
  */
-static int
+static void
 mysql_plugin_update (void *cls, uint64_t uid, int delta,
-                     struct GNUNET_TIME_Absolute expire, char **msg)
+                     struct GNUNET_TIME_Absolute expire,
+                     PluginUpdateCont cont, void *cont_cls)
 {
   struct Plugin *plugin = cls;
   unsigned long long vkey = uid;
@@ -379,7 +385,7 @@
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
                 vkey);
   }
-  return ret;
+  cont (cont_cls, ret, NULL);
 }
 
 
@@ -778,6 +784,7 @@
   if (statement == NULL)
   {
     GNUNET_MYSQL_statements_invalidate (plugin->mc);
+    proc (proc_cls, NULL, 0);
     return;
   }
   if (mysql_stmt_prepare (statement, query, strlen (query)))
@@ -785,6 +792,7 @@
     GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "mysql",
                      _("Failed to prepare statement `%s'\n"), query);
     GNUNET_MYSQL_statements_invalidate (plugin->mc);
+    proc (proc_cls, NULL, 0);
     return;
   }
   GNUNET_assert (proc != NULL);
@@ -795,6 +803,7 @@
                 "mysql_stmt_execute", query, __FILE__, __LINE__,
                 mysql_stmt_error (statement));
     GNUNET_MYSQL_statements_invalidate (plugin->mc);
+    proc (proc_cls, NULL, 0);
     return;
   }
   memset (cbind, 0, sizeof (cbind));
@@ -810,6 +819,7 @@
                 "mysql_stmt_bind_result", __FILE__, __LINE__,
                 mysql_stmt_error (statement));
     GNUNET_MYSQL_statements_invalidate (plugin->mc);
+    proc (proc_cls, NULL, 0);
     return;
   }
   while (0 == (ret = mysql_stmt_fetch (statement)))
@@ -817,6 +827,7 @@
     if (sizeof (struct GNUNET_HashCode) == length)
       proc (proc_cls, &key, 1);
   }
+  proc (proc_cls, NULL, 0);
   if (ret != MYSQL_NO_DATA)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,

Modified: gnunet/src/datastore/plugin_datastore_postgres.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_postgres.c    2015-03-20 23:44:57 UTC 
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_postgres.c    2015-03-21 03:38:29 UTC 
(rev 35419)
@@ -279,15 +279,16 @@
  * @param anonymity anonymity-level for the content
  * @param replication replication-level for the content
  * @param expiration expiration time for the content
- * @param msg set to error message
- * @return #GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
  */
-static int
+static void
 postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t 
size,
                      const void *data, enum GNUNET_BLOCK_Type type,
                      uint32_t priority, uint32_t anonymity,
                      uint32_t replication,
-                     struct GNUNET_TIME_Absolute expiration, char **msg)
+                     struct GNUNET_TIME_Absolute expiration, PluginPutCont 
cont,
+                     void *cont_cls)
 {
   struct Plugin *plugin = cls;
   struct GNUNET_HashCode vhash;
@@ -326,12 +327,15 @@
                       paramFormats, 1);
   if (GNUNET_OK !=
       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, 
"PQexecPrepared", "put"))
-    return GNUNET_SYSERR;
+  {
+    cont (cont_cls, key, size, GNUNET_SYSERR, _("Postgress exec failure"));
+    return;
+  }
   PQclear (ret);
   plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
                    "Stored %u bytes in database\n", (unsigned int) size);
-  return GNUNET_OK;
+  cont (cont_cls, key, size, GNUNET_OK, NULL);
 }
 
 
@@ -753,12 +757,13 @@
  * @param expire new expiration time should be the
  *     MAX of any existing expiration time and
  *     this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
  */
-static int
+static void
 postgres_plugin_update (void *cls, uint64_t uid, int delta,
-                        struct GNUNET_TIME_Absolute expire, char **msg)
+                        struct GNUNET_TIME_Absolute expire,
+                        PluginUpdateCont cont, void *cont_cls)
 {
   struct Plugin *plugin = cls;
   PGresult *ret;
@@ -783,9 +788,12 @@
                       paramFormats, 1);
   if (GNUNET_OK !=
       GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, 
"PQexecPrepared", "update"))
-    return GNUNET_SYSERR;
+  {
+    cont (cont_cls, GNUNET_SYSERR, NULL);
+    return;
+  }
   PQclear (ret);
-  return GNUNET_OK;
+  cont (cont_cls, GNUNET_OK, NULL);
 }
 
 
@@ -819,6 +827,7 @@
     }
   }
   PQclear (res);
+  proc (proc_cls, NULL, 0);
 }
 
 

Modified: gnunet/src/datastore/plugin_datastore_sqlite.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_sqlite.c      2015-03-20 23:44:57 UTC 
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_sqlite.c      2015-03-21 03:38:29 UTC 
(rev 35419)
@@ -470,10 +470,10 @@
  * @param anonymity anonymity-level for the content
  * @param replication replication-level for the content
  * @param expiration expiration time for the content
- * @param msg set to an error message
- * @return #GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
  */
-static int
+static void
 sqlite_plugin_put (void *cls,
                    const struct GNUNET_HashCode *key,
                    uint32_t size,
@@ -483,7 +483,8 @@
                    uint32_t anonymity,
                    uint32_t replication,
                    struct GNUNET_TIME_Absolute expiration,
-                   char **msg)
+                   PluginPutCont cont,
+                   void *cont_cls)
 {
   struct Plugin *plugin = cls;
   int n;
@@ -491,9 +492,13 @@
   sqlite3_stmt *stmt;
   struct GNUNET_HashCode vhash;
   uint64_t rvalue;
+  char *msg = NULL;
 
   if (size > MAX_ITEM_SIZE)
-    return GNUNET_SYSERR;
+  {
+    cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
+    return;
+  }
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite",
                    "Storing in database block with type %u/key `%s'/priority 
%u/expiration in %s (%s).\n",
                    type,
@@ -519,13 +524,15 @@
                           SQLITE_TRANSIENT)) ||
       (SQLITE_OK != sqlite3_bind_blob (stmt, 9, data, size, SQLITE_TRANSIENT)))
   {
-    LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+    LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_bind_XXXX");
     if (SQLITE_OK != sqlite3_reset (stmt))
       LOG_SQLITE (plugin, NULL,
                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                   "sqlite3_reset");
-    return GNUNET_SYSERR;
+    cont (cont_cls, key, size, GNUNET_SYSERR, msg);
+    GNUNET_free_non_null(msg);
+    return;
   }
   n = sqlite3_step (stmt);
   switch (n)
@@ -539,12 +546,12 @@
     break;
   case SQLITE_BUSY:
     GNUNET_break (0);
-    LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+    LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_step");
     ret = GNUNET_SYSERR;
     break;
   default:
-    LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+    LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_step");
     if (SQLITE_OK != sqlite3_reset (stmt))
       LOG_SQLITE (plugin, NULL,
@@ -552,12 +559,15 @@
                   "sqlite3_reset");
     database_shutdown (plugin);
     database_setup (plugin->env->cfg, plugin);
-    return GNUNET_SYSERR;
+    cont (cont_cls, key, size, GNUNET_SYSERR, msg);
+    GNUNET_free_non_null(msg);
+    return;
   }
   if (SQLITE_OK != sqlite3_reset (stmt))
     LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_reset");
-  return ret;
+  cont (cont_cls, key, size, ret, msg);
+  GNUNET_free_non_null(msg);
 }
 
 
@@ -581,31 +591,34 @@
  * @param expire new expiration time should be the
  *     MAX of any existing expiration time and
  *     this value
- * @param msg set to an error message
- * @return #GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
  */
-static int
+static void
 sqlite_plugin_update (void *cls,
                       uint64_t uid,
                       int delta,
                       struct GNUNET_TIME_Absolute expire,
-                      char **msg)
+                      PluginUpdateCont cont,
+                      void *cont_cls)
 {
   struct Plugin *plugin = cls;
   int n;
+  char *msg = NULL;
 
   if ((SQLITE_OK != sqlite3_bind_int (plugin->updPrio, 1, delta)) ||
       (SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 2, 
expire.abs_value_us))
       || (SQLITE_OK != sqlite3_bind_int64 (plugin->updPrio, 3, uid)))
   {
-    LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+    LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_bind_XXXX");
     if (SQLITE_OK != sqlite3_reset (plugin->updPrio))
       LOG_SQLITE (plugin, NULL,
                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                   "sqlite3_reset");
-    return GNUNET_SYSERR;
-
+    cont (cont_cls, GNUNET_SYSERR, msg);
+    GNUNET_free_non_null(msg);
+    return;
   }
   n = sqlite3_step (plugin->updPrio);
   if (SQLITE_OK != sqlite3_reset (plugin->updPrio))
@@ -615,15 +628,21 @@
   {
   case SQLITE_DONE:
     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", "Block updated\n");
-    return GNUNET_OK;
+    cont (cont_cls, GNUNET_OK, NULL);
+    return;
   case SQLITE_BUSY:
-    LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_WARNING | 
GNUNET_ERROR_TYPE_BULK,
+    LOG_SQLITE (plugin, &msg,
+                GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_step");
-    return GNUNET_NO;
+    cont (cont_cls, GNUNET_NO, msg);
+    GNUNET_free_non_null(msg);
+    return;
   default:
-    LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+    LOG_SQLITE (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "sqlite3_step");
-    return GNUNET_SYSERR;
+    cont (cont_cls, GNUNET_SYSERR, msg);
+    GNUNET_free_non_null(msg);
+    return;
   }
 }
 
@@ -1098,6 +1117,7 @@
   {
     LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                "sqlite_prepare");
+    proc (proc_cls, NULL, 0);
     return;
   }
   while (SQLITE_ROW == (ret = sqlite3_step (stmt)))
@@ -1111,6 +1131,7 @@
   if (SQLITE_DONE != ret)
     LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR, "sqlite_step");
   sqlite3_finalize (stmt);
+  proc (proc_cls, NULL, 0);
 }
 
 

Modified: gnunet/src/datastore/plugin_datastore_template.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_template.c    2015-03-20 23:44:57 UTC 
(rev 35418)
+++ gnunet/src/datastore/plugin_datastore_template.c    2015-03-21 03:38:29 UTC 
(rev 35419)
@@ -69,19 +69,19 @@
  * @param anonymity anonymity-level for the content
  * @param replication replication-level for the content
  * @param expiration expiration time for the content
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
  */
-static int
+static void
 template_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t 
size,
                      const void *data, enum GNUNET_BLOCK_Type type,
                      uint32_t priority, uint32_t anonymity,
                      uint32_t replication,
-                     struct GNUNET_TIME_Absolute expiration, char **msg)
+                     struct GNUNET_TIME_Absolute expiration, PluginPutCont 
cont,
+                     void *cont_cls)
 {
   GNUNET_break (0);
-  *msg = GNUNET_strdup ("not implemented");
-  return GNUNET_SYSERR;
+  cont (cont_cls, key, size, GNUNET_SYSERR, "not implemented");
 }
 
 
@@ -170,16 +170,16 @@
  * @param expire new expiration time should be the
  *     MAX of any existing expiration time and
  *     this value
- * @param msg set to error message
- * @return GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
  */
-static int
+static void
 template_plugin_update (void *cls, uint64_t uid, int delta,
-                        struct GNUNET_TIME_Absolute expire, char **msg)
+                        struct GNUNET_TIME_Absolute expire,
+                        PluginUpdateCont cont, void *cont_cls)
 {
   GNUNET_break (0);
-  *msg = GNUNET_strdup ("not implemented");
-  return GNUNET_SYSERR;
+  cont (cont_cls, GNUNET_SYSERR, "not implemented");
 }
 
 
@@ -226,6 +226,7 @@
                   PluginKeyProcessor proc,
                   void *proc_cls)
 {
+  proc (proc_cls, NULL, 0);
 }
 
 

Modified: gnunet/src/datastore/test_plugin_datastore.c
===================================================================
--- gnunet/src/datastore/test_plugin_datastore.c        2015-03-20 23:44:57 UTC 
(rev 35418)
+++ gnunet/src/datastore/test_plugin_datastore.c        2015-03-21 03:38:29 UTC 
(rev 35419)
@@ -84,6 +84,35 @@
 
 
 static void
+test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+static void
+put_continuation (void *cls, const struct GNUNET_HashCode *key,
+                  uint32_t size, int status, char *msg)
+{
+  struct CpsRunContext *crc = cls;
+  static unsigned long long os;
+  unsigned long long cs;
+
+  if (GNUNET_OK != status)
+  {
+    FPRINTF (stderr, "ERROR: `%s'\n", msg);
+  }
+  else
+  {
+    crc->api->estimate_size (crc->api->cls, &cs);
+    GNUNET_assert (os <= cs);
+    os = cs;
+    stored_bytes += size;
+    stored_ops++;
+    stored_entries++;
+  }
+  GNUNET_SCHEDULER_add_now (&test, crc);
+}
+
+
+static void
 gen_key (int i, struct GNUNET_HashCode * key)
 {
   memset (key, 0, sizeof (struct GNUNET_HashCode));
@@ -93,14 +122,21 @@
 
 
 static void
-put_value (struct GNUNET_DATASTORE_PluginFunctions *api, int i, int k)
+do_put (struct CpsRunContext *crc)
 {
   char value[65536];
   size_t size;
   struct GNUNET_HashCode key;
-  char *msg;
   unsigned int prio;
+  static int i;
 
+  if (PUT_10 == i)
+  {
+    i = 0;
+    crc->phase++;
+    GNUNET_SCHEDULER_add_now (&test, crc);
+    return;
+  }
   /* most content is 32k */
   size = 32 * 1024;
 
@@ -113,36 +149,25 @@
   memset (value, i, size);
   if (i > 255)
     memset (value, i - 255, size / 2);
-  value[0] = k;
-  msg = NULL;
+  value[0] = crc->i;
   prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "putting type %u, anon %u under key %s\n", i + 1, i,
              GNUNET_h2s (&key));
-  if (GNUNET_OK != api->put (api->cls, &key, size, value, i + 1 /* type */ ,
-                             prio, i /* anonymity */ ,
-                             0 /* replication */ ,
-                             GNUNET_TIME_relative_to_absolute
-                             (GNUNET_TIME_relative_multiply
-                              (GNUNET_TIME_UNIT_MILLISECONDS,
-                               60 * 60 * 60 * 1000 +
-                               GNUNET_CRYPTO_random_u32
-                               (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), &msg))
-  {
-    FPRINTF (stderr, "ERROR: `%s'\n", msg);
-    GNUNET_free_non_null (msg);
-    return;
-  }
-  stored_bytes += size;
-  stored_ops++;
-  stored_entries++;
+  crc->api->put (crc->api->cls, &key, size, value, i + 1 /* type */ ,
+                 prio, i /* anonymity */ ,
+                 0 /* replication */ ,
+                 GNUNET_TIME_relative_to_absolute
+                   (GNUNET_TIME_relative_multiply
+                     (GNUNET_TIME_UNIT_MILLISECONDS,
+                      60 * 60 * 60 * 1000 +
+                      GNUNET_CRYPTO_random_u32
+                      (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
+                 put_continuation, crc);
+  i++;
 }
 
 
-static void
-test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
 static uint64_t guid;
 
 
@@ -213,12 +238,20 @@
 
 
 static void
+update_continuation (void *cls, int status, char *msg)
+{
+  struct CpsRunContext *crc = cls;
+
+  GNUNET_assert (GNUNET_OK == status);
+  crc->phase++;
+  GNUNET_SCHEDULER_add_now (&test, crc);
+}
+
+
+static void
 test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct CpsRunContext *crc = cls;
-  int j;
-  unsigned long long os;
-  unsigned long long cs;
   struct GNUNET_HashCode key;
 
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
@@ -237,16 +270,7 @@
     GNUNET_SCHEDULER_add_now (&cleaning_task, crc);
     break;
   case RP_PUT:
-    os = 0;
-    for (j = 0; j < PUT_10; j++)
-    {
-      put_value (crc->api, j, crc->i);
-      crc->api->estimate_size (crc->api->cls, &cs);
-      GNUNET_assert (os <= cs);
-      os = cs;
-    }
-    crc->phase++;
-    GNUNET_SCHEDULER_add_now (&test, crc);
+    do_put (crc);
     break;
   case RP_GET:
     if (crc->cnt == 1)
@@ -261,11 +285,8 @@
                        GNUNET_BLOCK_TYPE_ANY, &iterate_one_shot, crc);
     break;
   case RP_UPDATE:
-    GNUNET_assert (GNUNET_OK ==
-                   crc->api->update (crc->api->cls, guid, 1,
-                                     GNUNET_TIME_UNIT_ZERO_ABS, NULL));
-    crc->phase++;
-    GNUNET_SCHEDULER_add_now (&test, crc);
+    crc->api->update (crc->api->cls, guid, 1, GNUNET_TIME_UNIT_ZERO_ABS,
+                      update_continuation, crc);
     break;
 
   case RP_ITER_ZERO:

Modified: gnunet/src/include/gnunet_datastore_plugin.h
===================================================================
--- gnunet/src/include/gnunet_datastore_plugin.h        2015-03-20 23:44:57 UTC 
(rev 35418)
+++ gnunet/src/include/gnunet_datastore_plugin.h        2015-03-21 03:38:29 UTC 
(rev 35419)
@@ -113,6 +113,19 @@
 
 
 /**
+ * Put continuation.
+ *
+ * @param cls closure
+ * @param key key for the item stored
+ * @param size size of the item stored
+ * @param status GNUNET_OK or GNUNET_SYSERROR
+ * @param msg error message on error
+ */
+typedef void (*PluginPutCont) (void *cls, const struct GNUNET_HashCode *key,
+                               uint32_t size, int status, char *msg);
+
+
+/**
  * Store an item in the datastore.  If the item is already present,
  * the priorities and replication levels are summed up and the higher
  * expiration time and lower anonymity level is used.
@@ -126,22 +139,23 @@
  * @param anonymity anonymity-level for the content
  * @param replication replication-level for the content
  * @param expiration expiration time for the content
- * @param msg set to an error message (on failure)
- * @return #GNUNET_OK on success,
- *         #GNUNET_SYSERR on failure
+ * @param cont continuation called with success or failure status
+ * @param cont_cls continuation closure
  */
-typedef int (*PluginPut) (void *cls, const struct GNUNET_HashCode * key, 
uint32_t size,
-                          const void *data, enum GNUNET_BLOCK_Type type,
-                          uint32_t priority, uint32_t anonymity,
-                          uint32_t replication,
-                          struct GNUNET_TIME_Absolute expiration, char **msg);
+typedef void (*PluginPut) (void *cls, const struct GNUNET_HashCode * key,
+                           uint32_t size,
+                           const void *data, enum GNUNET_BLOCK_Type type,
+                           uint32_t priority, uint32_t anonymity,
+                           uint32_t replication,
+                           struct GNUNET_TIME_Absolute expiration,
+                           PluginPutCont cont, void *cont_cls);
 
 
 /**
  * An processor over a set of keys stored in the datastore.
  *
  * @param cls closure
- * @param key key in the data store
+ * @param key key in the data store, if NULL iteration is finished
  * @param count how many values are stored under this key in the datastore
  */
 typedef void (*PluginKeyProcessor) (void *cls,
@@ -174,8 +188,6 @@
  *        there may be!
  * @param type entries of which type are relevant?
  *     Use 0 for any type.
- * @param min find the smallest key that is larger than the given min,
- *            NULL for no minimum (return smallest key)
  * @param proc function to call on the matching value;
  *        proc should be called with NULL if there is no result
  * @param proc_cls closure for @a proc
@@ -201,6 +213,14 @@
                                  void *proc_cls);
 
 
+/**
+ * Update continuation.
+ *
+ * @param cls closure
+ * @param status GNUNET_OK or GNUNET_SYSERROR
+ * @param msg error message on error
+ */
+typedef void (*PluginUpdateCont) (void *cls, int status, char *msg);
 
 
 /**
@@ -220,11 +240,12 @@
  * @param expire new expiration time should be the
  *     MAX of any existing expiration time and
  *     this value
- * @param msg set to an error message (on error)
- * @return #GNUNET_OK on success
+ * @param cont continuation called with success or failure status
+ * @param cons_cls continuation closure
  */
-typedef int (*PluginUpdate) (void *cls, uint64_t uid, int delta,
-                             struct GNUNET_TIME_Absolute expire, char **msg);
+typedef void (*PluginUpdate) (void *cls, uint64_t uid, int delta,
+                              struct GNUNET_TIME_Absolute expire,
+                              PluginUpdateCont cont, void *cont_cls);
 
 
 /**




reply via email to

[Prev in Thread] Current Thread [Next in Thread]