gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: [datastore] Combine put and update plugin APIs


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: [datastore] Combine put and update plugin APIs
Date: Sun, 16 Apr 2017 19:42:49 +0200

This is an automated email from the git hooks/post-receive script.

david-barksdale pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 4907330f5 [datastore] Combine put and update plugin APIs
4907330f5 is described below

commit 4907330f51ffd48af1f7bac6f43c7c7f78c37818
Author: David Barksdale <address@hidden>
AuthorDate: Sun Apr 16 12:39:43 2017 -0500

    [datastore] Combine put and update plugin APIs
    
    This resolves issue #4965.
---
 src/datastore/gnunet-service-datastore.c  | 194 ++++--------------------------
 src/datastore/perf_plugin_datastore.c     |  13 +-
 src/datastore/plugin_datastore_heap.c     | 170 ++++++++++++++++----------
 src/datastore/plugin_datastore_mysql.c    | 127 ++++++++-----------
 src/datastore/plugin_datastore_postgres.c | 129 +++++++++-----------
 src/datastore/plugin_datastore_sqlite.c   | 149 ++++++++++-------------
 src/datastore/plugin_datastore_template.c |  49 ++------
 src/datastore/test_plugin_datastore.c     |  36 ++----
 src/include/gnunet_datastore_plugin.h     |  75 ++----------
 9 files changed, 345 insertions(+), 597 deletions(-)

diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index 277530843..d965ad8e0 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -733,41 +733,23 @@ check_data (const struct DataMessage *dm)
 
 
 /**
- * Context for a PUT request used to see if the content is
- * already present.
- */
-struct PutContext
-{
-  /**
-   * Client to notify on completion.
-   */
-  struct GNUNET_SERVICE_Client *client;
-
-#if ! HAVE_UNALIGNED_64_ACCESS
-  void *reserved;
-#endif
-
-  /* followed by the 'struct DataMessage' */
-};
-
-
-/**
  * Put continuation.
  *
  * @param cls closure
  * @param key key for the item stored
  * @param size size of the item stored
- * @param status #GNUNET_OK or #GNUNET_SYSERROR
+ * @param status #GNUNET_OK if inserted, #GNUNET_NO if updated,
+ *        or #GNUNET_SYSERROR if error
  * @param msg error message on error
  */
 static void
 put_continuation (void *cls,
-                 const struct GNUNET_HashCode *key,
-                 uint32_t size,
+                  const struct GNUNET_HashCode *key,
+                  uint32_t size,
                   int status,
-                 const char *msg)
+                  const char *msg)
 {
-  struct PutContext *pc = cls;
+  struct GNUNET_SERVICE_Client *client = cls;
 
   if (GNUNET_OK == status)
   {
@@ -782,10 +764,9 @@ put_continuation (void *cls,
                 size,
                 GNUNET_h2s (key));
   }
-  transmit_status (pc->client,
-                   status,
+  transmit_status (client,
+                   GNUNET_SYSERR == status ? GNUNET_SYSERR : GNUNET_OK,
                    msg);
-  GNUNET_free (pc);
   if (quota - reserved - cache_size < payload)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -799,125 +780,6 @@ put_continuation (void *cls,
 
 
 /**
- * Actually put the data message.
- *
- * @param pc put context
- */
-static void
-execute_put (struct PutContext *pc)
-{
-  const struct DataMessage *dm;
-
-  dm = (const struct DataMessage *) &pc[1];
-  plugin->api->put (plugin->api->cls,
-                    &dm->key,
-                    ntohl (dm->size),
-                    &dm[1],
-                    ntohl (dm->type),
-                    ntohl (dm->priority),
-                    ntohl (dm->anonymity),
-                    ntohl (dm->replication),
-                    GNUNET_TIME_absolute_ntoh (dm->expiration),
-                    &put_continuation,
-                    pc);
-}
-
-
-/**
- *
- * @param cls closure
- * @param status #GNUNET_OK or #GNUNET_SYSERR
- * @param msg error message on error
- */
-static void
-check_present_continuation (void *cls,
-                           int status,
-                           const char *msg)
-{
-  struct GNUNET_SERVICE_Client *client = cls;
-
-  transmit_status (client,
-                   GNUNET_NO,
-                   NULL);
-}
-
-
-/**
- * Function that will check if the given datastore entry
- * matches the put and if none match executes the put.
- *
- * @param cls closure, pointer to the client (of type `struct PutContext`).
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param replication replication-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- *        maybe 0 if no unique identifier is available
- * @return #GNUNET_OK usually
- *         #GNUNET_NO to delete the item
- */
-static int
-check_present (void *cls,
-               const struct GNUNET_HashCode *key,
-               uint32_t size,
-               const void *data,
-               enum GNUNET_BLOCK_Type type,
-               uint32_t priority,
-               uint32_t anonymity,
-               uint32_t replication,
-               struct GNUNET_TIME_Absolute expiration,
-               uint64_t uid)
-{
-  struct PutContext *pc = cls;
-  const struct DataMessage *dm;
-
-  dm = (const struct DataMessage *) &pc[1];
-  if (key == NULL)
-  {
-    execute_put (pc);
-    return GNUNET_OK;
-  }
-  if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
-       (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
-       ( (size == ntohl (dm->size)) &&
-         (0 == memcmp (&dm[1],
-                       data,
-                       size)) ) )
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Result already present in datastore\n");
-    if ( (ntohl (dm->priority) > 0) ||
-         (ntohl (dm->replication) > 0) ||
-         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value_us >
-          expiration.abs_value_us) )
-      plugin->api->update (plugin->api->cls,
-                           uid,
-                           ntohl (dm->priority),
-                           ntohl (dm->replication),
-                           GNUNET_TIME_absolute_ntoh (dm->expiration),
-                           &check_present_continuation,
-                           pc->client);
-    else
-    {
-      transmit_status (pc->client,
-                       GNUNET_NO,
-                       NULL);
-    }
-    GNUNET_free (pc);
-  }
-  else
-  {
-    execute_put (pc);
-  }
-  return GNUNET_OK;
-}
-
-
-/**
  * Verify PUT-message.
  *
  * @param cls identification of the client
@@ -950,8 +812,6 @@ handle_put (void *cls,
   struct GNUNET_SERVICE_Client *client = cls;
   int rid;
   struct ReservationList *pos;
-  struct PutContext *pc;
-  struct GNUNET_HashCode vhash;
   uint32_t size;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -979,30 +839,20 @@ handle_put (void *cls,
                              GNUNET_NO);
     }
   }
-  pc = GNUNET_malloc (sizeof (struct PutContext) + size +
-                      sizeof (struct DataMessage));
-  pc->client = client;
-  GNUNET_memcpy (&pc[1],
-                 dm,
-                 size + sizeof (struct DataMessage));
-  if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
-                                                       &dm->key))
-  {
-    GNUNET_CRYPTO_hash (&dm[1],
-                        size,
-                        &vhash);
-    plugin->api->get_key (plugin->api->cls,
-                          0,
-                          false,
-                          &dm->key,
-                          &vhash,
-                          ntohl (dm->type),
-                          &check_present,
-                          pc);
-    GNUNET_SERVICE_client_continue (client);
-    return;
-  }
-  execute_put (pc);
+  bool absent = GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (filter,
+                                                                &dm->key);
+  plugin->api->put (plugin->api->cls,
+                    &dm->key,
+                    absent,
+                    ntohl (dm->size),
+                    &dm[1],
+                    ntohl (dm->type),
+                    ntohl (dm->priority),
+                    ntohl (dm->anonymity),
+                    ntohl (dm->replication),
+                    GNUNET_TIME_absolute_ntoh (dm->expiration),
+                    &put_continuation,
+                    client);
   GNUNET_SERVICE_client_continue (client);
 }
 
diff --git a/src/datastore/perf_plugin_datastore.c b/src/datastore/perf_plugin_datastore.c
index 2f9502989..d6f44bf9f 100644
--- a/src/datastore/perf_plugin_datastore.c
+++ b/src/datastore/perf_plugin_datastore.c
@@ -181,8 +181,14 @@ do_put (struct CpsRunContext *crc)
   value[0] = crc->i;
   GNUNET_memcpy (&value[4], &i, sizeof (i));
   prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
-  crc->api->put (crc->api->cls, &key, size, value, 1 + i % 4 /* type */ ,
-                 prio, i % 4 /* anonymity */ ,
+  crc->api->put (crc->api->cls,
+                 &key,
+                 false /* absent */,
+                 size,
+                 value,
+                 1 + i % 4 /* type */ ,
+                 prio,
+                 i % 4 /* anonymity */ ,
                  0 /* replication */ ,
                  GNUNET_TIME_relative_to_absolute
                  (GNUNET_TIME_relative_multiply
@@ -190,7 +196,8 @@ do_put (struct CpsRunContext *crc)
                     60 * 60 * 60 * 1000 +
                     GNUNET_CRYPTO_random_u32
                       (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
-                 put_continuation, crc);
+                 put_continuation,
+                 crc);
   i++;
 }
 
diff --git a/src/datastore/plugin_datastore_heap.c b/src/datastore/plugin_datastore_heap.c
index d04c1cf60..6dbc15ebd 100644
--- a/src/datastore/plugin_datastore_heap.c
+++ b/src/datastore/plugin_datastore_heap.c
@@ -195,10 +195,89 @@ heap_plugin_estimate_size (void *cls, unsigned long long *estimate)
 
 
 /**
+ * Closure for iterator for updating.
+ */
+struct UpdateContext
+{
+  /**
+   * Number of bytes in 'data'.
+   */
+  uint32_t size;
+
+  /**
+   * Pointer to the data.
+   */
+  const void *data;
+
+  /**
+   * Priority of the value.
+   */
+  uint32_t priority;
+
+  /**
+   * Replication level for the value.
+   */
+  uint32_t replication;
+
+  /**
+   * Expiration time for this value.
+   */
+  struct GNUNET_TIME_Absolute expiration;
+
+  /**
+   * True if the value was found and updated.
+   */
+  bool updated;
+};
+
+
+/**
+ * Update the matching value.
+ *
+ * @param cls the 'struct UpdateContext'
+ * @param key unused
+ * @param val the 'struct Value'
+ * @return GNUNET_YES (continue iteration), GNUNET_NO if value was found
+ */
+static int
+update_iterator (void *cls,
+                 const struct GNUNET_HashCode *key,
+                 void *val)
+{
+  struct UpdateContext *uc = cls;
+  struct Value *value = val;
+
+  if (value->size != uc->size)
+    return GNUNET_YES;
+  if (0 != memcmp (value->data, uc->data, uc->size))
+    return GNUNET_YES;
+  uc->expiration = GNUNET_TIME_absolute_max (value->expiration,
+                                             uc->expiration);
+  if (value->expiration.abs_value_us != uc->expiration.abs_value_us)
+  {
+    value->expiration = uc->expiration;
+    GNUNET_CONTAINER_heap_update_cost (value->expire_heap,
+                                       value->expiration.abs_value_us);
+  }
+  /* Saturating adds, don't overflow */
+  if (value->priority > UINT32_MAX - uc->priority)
+    value->priority = UINT32_MAX;
+  else
+    value->priority += uc->priority;
+  if (value->replication > UINT32_MAX - uc->replication)
+    value->replication = UINT32_MAX;
+  else
+    value->replication += uc->replication;
+  uc->updated = true;
+  return GNUNET_NO;
+}
+
+/**
  * Store an item in the datastore.
  *
  * @param cls closure
  * @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
  * @param size number of bytes in data
  * @param data content stored
  * @param type type of the content
@@ -211,19 +290,40 @@ heap_plugin_estimate_size (void *cls, unsigned long long *estimate)
  */
 static void
 heap_plugin_put (void *cls,
-                const struct GNUNET_HashCode * key,
-                uint32_t size,
-                const void *data,
-                enum GNUNET_BLOCK_Type type,
-                uint32_t priority, uint32_t anonymity,
-                uint32_t replication,
-                struct GNUNET_TIME_Absolute expiration,
-                PluginPutCont cont,
-                void *cont_cls)
+                 const struct GNUNET_HashCode *key,
+                 bool absent,
+                 uint32_t size,
+                 const void *data,
+                 enum GNUNET_BLOCK_Type type,
+                 uint32_t priority,
+                 uint32_t anonymity,
+                 uint32_t replication,
+                 struct GNUNET_TIME_Absolute expiration,
+                 PluginPutCont cont,
+                 void *cont_cls)
 {
   struct Plugin *plugin = cls;
   struct Value *value;
 
+  if (!absent) {
+    struct UpdateContext uc;
+
+    uc.size = size;
+    uc.data = data;
+    uc.priority = priority;
+    uc.replication = replication;
+    uc.expiration = expiration;
+    uc.updated = false;
+    GNUNET_CONTAINER_multihashmap_get_multiple (plugin->keyvalue,
+                                                key,
+                                                &update_iterator,
+                                                &uc);
+    if (uc.updated)
+    {
+      cont (cont_cls, key, size, GNUNET_NO, NULL);
+      return;
+    }
+  }
   value = GNUNET_malloc (sizeof (struct Value) + size);
   value->key = *key;
   value->data = &value[1];
@@ -551,57 +651,6 @@ heap_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
 
 
 /**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore.  If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept.  The specified priority and replication is added
- * to the existing value.
- *
- * @param cls our `struct Plugin *`
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- *     change?
- * @param replication by how much should the replication
- *     change?
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-static void
-heap_plugin_update (void *cls,
-                    uint64_t uid,
-                    uint32_t priority,
-                    uint32_t replication,
-                    struct GNUNET_TIME_Absolute expire,
-                    PluginUpdateCont cont,
-                    void *cont_cls)
-{
-  struct Value *value;
-
-  value = (struct Value*) (intptr_t) uid;
-  GNUNET_assert (NULL != value);
-  if (value->expiration.abs_value_us != expire.abs_value_us)
-  {
-    value->expiration = expire;
-    GNUNET_CONTAINER_heap_update_cost (value->expire_heap,
-                                      expire.abs_value_us);
-  }
-  /* Saturating adds, don't overflow */
-  if (value->priority > UINT32_MAX - priority)
-    value->priority = UINT32_MAX;
-  else
-    value->priority += priority;
-  if (value->replication > UINT32_MAX - replication)
-    value->replication = UINT32_MAX;
-  else
-    value->replication += replication;
-  cont (cont_cls, GNUNET_OK, NULL);
-}
-
-
-/**
  * Call the given processor on an item with zero anonymity.
  *
  * @param cls our "struct Plugin*"
@@ -758,7 +807,6 @@ libgnunet_plugin_datastore_heap_init (void *cls)
   api->cls = plugin;
   api->estimate_size = &heap_plugin_estimate_size;
   api->put = &heap_plugin_put;
-  api->update = &heap_plugin_update;
   api->get_key = &heap_plugin_get_key;
   api->get_replication = &heap_plugin_get_replication;
   api->get_expiration = &heap_plugin_get_expiration;
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c
index 6f2a76499..edc459272 100644
--- a/src/datastore/plugin_datastore_mysql.c
+++ b/src/datastore/plugin_datastore_mysql.c
@@ -198,8 +198,8 @@ struct Plugin
 #define UPDATE_ENTRY "UPDATE gn090 SET "\
   "prio = prio + ?, "\
   "repl = repl + ?, "\
-  "expire = IF(expire >= ?, expire, ?) "\
-  "WHERE uid = ?"
+  "expire = GREATEST(expire, ?) "\
+  "WHERE hash = ? AND vhash = ?"
   struct GNUNET_MYSQL_StatementHandle *update_entry;
 
 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
@@ -330,6 +330,7 @@ mysql_plugin_estimate_size (void *cls,
  *
  * @param cls closure
  * @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
  * @param size number of bytes in @a data
  * @param data content stored
  * @param type type of the content
@@ -343,6 +344,7 @@ mysql_plugin_estimate_size (void *cls,
 static void
 mysql_plugin_put (void *cls,
                   const struct GNUNET_HashCode *key,
+                  bool absent,
                   uint32_t size,
                   const void *data,
                   enum GNUNET_BLOCK_Type type,
@@ -355,9 +357,54 @@ mysql_plugin_put (void *cls,
 {
   struct Plugin *plugin = cls;
   uint64_t lexpiration = expiration.abs_value_us;
+  struct GNUNET_HashCode vhash;
+
+  GNUNET_CRYPTO_hash (data,
+                      size,
+                      &vhash);
+  if (!absent)
+  {
+    struct GNUNET_MY_QueryParam params_update[] = {
+      GNUNET_MY_query_param_uint32 (&priority),
+      GNUNET_MY_query_param_uint32 (&replication),
+      GNUNET_MY_query_param_uint64 (&lexpiration),
+      GNUNET_MY_query_param_auto_from_type (key),
+      GNUNET_MY_query_param_auto_from_type (&vhash),
+      GNUNET_MY_query_param_end
+    };
+
+    if (GNUNET_OK !=
+        GNUNET_MY_exec_prepared (plugin->mc,
+                                 plugin->update_entry,
+                                 params_update))
+    {
+      cont (cont_cls,
+            key,
+            size,
+            GNUNET_SYSERR,
+            _("MySQL statement run failure"));
+      return;
+    }
+
+    MYSQL_STMT *stmt = GNUNET_MYSQL_statement_get_stmt (plugin->update_entry);
+    my_ulonglong rows = mysql_stmt_affected_rows (stmt);
+
+    GNUNET_break (GNUNET_NO ==
+                  GNUNET_MY_extract_result (plugin->update_entry,
+                                            NULL));
+    if (0 != rows)
+    {
+      cont (cont_cls,
+            key,
+            size,
+            GNUNET_NO,
+            NULL);
+      return;
+    }
+  }
+
   uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                UINT64_MAX);
-  struct GNUNET_HashCode vhash;
   struct GNUNET_MY_QueryParam params_insert[] = {
     GNUNET_MY_query_param_uint32 (&replication),
     GNUNET_MY_query_param_uint32 (&type),
@@ -377,9 +424,6 @@ mysql_plugin_put (void *cls,
     cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
     return;
   }
-  GNUNET_CRYPTO_hash (data,
-                      size,
-                      &vhash);
 
   if (GNUNET_OK !=
       GNUNET_MY_exec_prepared (plugin->mc,
@@ -412,76 +456,6 @@ mysql_plugin_put (void *cls,
 
 
 /**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore.  If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept.  The specified priority and replication is added
- * to the existing value.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- *     change?
- * @param replication by how much should the replication
- *     change?
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-static void
-mysql_plugin_update (void *cls,
-                     uint64_t uid,
-                     uint32_t priority,
-                     uint32_t replication,
-                     struct GNUNET_TIME_Absolute expire,
-                     PluginUpdateCont cont,
-                     void *cont_cls)
-{
-  struct Plugin *plugin = cls;
-  uint64_t lexpire = expire.abs_value_us;
-  int ret;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Updating value %llu adding %d to priority %d to replication and maxing exp at %s\n",
-              (unsigned long long) uid,
-              priority,
-              replication,
-              GNUNET_STRINGS_absolute_time_to_string (expire));
-
-  struct GNUNET_MY_QueryParam params_update[] = {
-    GNUNET_MY_query_param_uint32 (&priority),
-    GNUNET_MY_query_param_uint32 (&replication),
-    GNUNET_MY_query_param_uint64 (&lexpire),
-    GNUNET_MY_query_param_uint64 (&lexpire),
-    GNUNET_MY_query_param_uint64 (&uid),
-    GNUNET_MY_query_param_end
-  };
-
-  ret = GNUNET_MY_exec_prepared (plugin->mc,
-                                 plugin->update_entry,
-                                 params_update);
-
-  if (GNUNET_OK != ret)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Failed to update value %llu\n",
-                (unsigned long long) uid);
-  }
-  else
-  {
-    GNUNET_break (GNUNET_NO ==
-                  GNUNET_MY_extract_result (plugin->update_entry,
-                                            NULL));
-  }
-  cont (cont_cls,
-        ret,
-        NULL);
-}
-
-
-/**
  * Run the given select statement and call 'proc' on the resulting
  * values (which must be in particular positions).
  *
@@ -1197,7 +1171,6 @@ libgnunet_plugin_datastore_mysql_init (void *cls)
   api->cls = plugin;
   api->estimate_size = &mysql_plugin_estimate_size;
   api->put = &mysql_plugin_put;
-  api->update = &mysql_plugin_update;
   api->get_key = &mysql_plugin_get_key;
   api->get_replication = &mysql_plugin_get_replication;
   api->get_expiration = &mysql_plugin_get_expiration;
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index 87a7acbdc..349848ae6 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -195,8 +195,8 @@ init_connection (struct Plugin *plugin)
                    "UPDATE gn090 "
                    "SET prio = prio + $1, "
                    "repl = repl + $2, "
-                   "expire = CASE WHEN expire < $3 THEN $3 ELSE expire END "
-                   "WHERE oid = $4", 4)) ||
+                   "expire = GREATEST(expire, $3) "
+                   "WHERE hash = $4 AND vhash = $5", 5)) ||
       (GNUNET_OK !=
        GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
                    "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
@@ -288,6 +288,7 @@ postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
  *
  * @param cls closure with the `struct Plugin`
  * @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
  * @param size number of bytes in data
  * @param data content stored
  * @param type type of the content
@@ -300,23 +301,70 @@ postgres_plugin_estimate_size (void *cls, unsigned long long *estimate)
  */
 static void
 postgres_plugin_put (void *cls,
-                    const struct GNUNET_HashCode *key,
-                    uint32_t size,
+                     const struct GNUNET_HashCode *key,
+                     bool absent,
+                     uint32_t size,
                      const void *data,
-                    enum GNUNET_BLOCK_Type type,
+                     enum GNUNET_BLOCK_Type type,
                      uint32_t priority,
-                    uint32_t anonymity,
+                     uint32_t anonymity,
                      uint32_t replication,
                      struct GNUNET_TIME_Absolute expiration,
-                    PluginPutCont cont,
+                     PluginPutCont cont,
                      void *cont_cls)
 {
   struct Plugin *plugin = cls;
-  uint32_t utype = type;
   struct GNUNET_HashCode vhash;
+  PGresult *ret;
+
+  GNUNET_CRYPTO_hash (data,
+                      size,
+                      &vhash);
+
+  if (!absent)
+  {
+    struct GNUNET_PQ_QueryParam params[] = {
+      GNUNET_PQ_query_param_uint32 (&priority),
+      GNUNET_PQ_query_param_uint32 (&replication),
+      GNUNET_PQ_query_param_absolute_time (&expiration),
+      GNUNET_PQ_query_param_auto_from_type (key),
+      GNUNET_PQ_query_param_auto_from_type (&vhash),
+      GNUNET_PQ_query_param_end
+    };
+    ret = GNUNET_PQ_exec_prepared (plugin->dbh,
+                                   "update",
+                                   params);
+    if (GNUNET_OK !=
+        GNUNET_POSTGRES_check_result (plugin->dbh,
+                                      ret,
+                                      PGRES_COMMAND_OK,
+                                      "PQexecPrepared",
+                                      "update"))
+    {
+      cont (cont_cls,
+            key,
+            size,
+            GNUNET_SYSERR,
+            _("Postgress exec failure"));
+      return;
+    }
+    /* What an awful API, this function really does return a string */
+    bool affected = 0 != strcmp ("0", PQcmdTuples (ret));
+    PQclear (ret);
+    if (affected)
+    {
+      cont (cont_cls,
+            key,
+            size,
+            GNUNET_NO,
+            NULL);
+      return;
+    }
+  }
+
+  uint32_t utype = type;
   uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
                                               UINT64_MAX);
-  PGresult *ret;
   struct GNUNET_PQ_QueryParam params[] = {
     GNUNET_PQ_query_param_uint32 (&replication),
     GNUNET_PQ_query_param_uint32 (&utype),
@@ -330,7 +378,6 @@ postgres_plugin_put (void *cls,
     GNUNET_PQ_query_param_end
   };
 
-  GNUNET_CRYPTO_hash (data, size, &vhash);
   ret = GNUNET_PQ_exec_prepared (plugin->dbh,
                                 "put",
                                 params);
@@ -750,67 +797,6 @@ postgres_plugin_get_expiration (void *cls,
 
 
 /**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore.  If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept.  The specified priority and replication is added
- * to the existing value.
- *
- * @param cls our `struct Plugin *`
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- *     change?
- * @param replication by how much should the replication
- *     change?
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-static void
-postgres_plugin_update (void *cls,
-                        uint64_t uid,
-                        uint32_t priority,
-                        uint32_t replication,
-                        struct GNUNET_TIME_Absolute expire,
-                        PluginUpdateCont cont,
-                        void *cont_cls)
-{
-  struct Plugin *plugin = cls;
-  uint32_t oid = (uint32_t) uid;
-  struct GNUNET_PQ_QueryParam params[] = {
-    GNUNET_PQ_query_param_uint32 (&priority),
-    GNUNET_PQ_query_param_uint32 (&replication),
-    GNUNET_PQ_query_param_absolute_time (&expire),
-    GNUNET_PQ_query_param_uint32 (&oid),
-    GNUNET_PQ_query_param_end
-  };
-  PGresult *ret;
-
-  ret = GNUNET_PQ_exec_prepared (plugin->dbh,
-                                "update",
-                                params);
-  if (GNUNET_OK !=
-      GNUNET_POSTGRES_check_result (plugin->dbh,
-                                   ret,
-                                   PGRES_COMMAND_OK,
-                                   "PQexecPrepared",
-                                   "update"))
-  {
-    cont (cont_cls,
-         GNUNET_SYSERR,
-         NULL);
-    return;
-  }
-  PQclear (ret);
-  cont (cont_cls,
-       GNUNET_OK,
-       NULL);
-}
-
-
-/**
  * Get all of the keys in the datastore.
  *
  * @param cls closure with the `struct Plugin *`
@@ -891,7 +877,6 @@ libgnunet_plugin_datastore_postgres_init (void *cls)
   api->cls = plugin;
   api->estimate_size = &postgres_plugin_estimate_size;
   api->put = &postgres_plugin_put;
-  api->update = &postgres_plugin_update;
   api->get_key = &postgres_plugin_get_key;
   api->get_replication = &postgres_plugin_get_replication;
   api->get_expiration = &postgres_plugin_get_expiration;
diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c
index 1f874e190..469dd7717 100644
--- a/src/datastore/plugin_datastore_sqlite.c
+++ b/src/datastore/plugin_datastore_sqlite.c
@@ -95,7 +95,7 @@ struct Plugin
   /**
    * Precompiled SQL for update.
    */
-  sqlite3_stmt *updPrio;
+  sqlite3_stmt *update;
 
   /**
    * Get maximum repl value in database.
@@ -356,8 +356,8 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
                     "SET prio = prio + ?, "
                     "repl = repl + ?, "
                     "expire = MAX(expire, ?) "
-                    "WHERE _ROWID_ = ?",
-                    &plugin->updPrio)) ||
+                    "WHERE hash = ? AND vhash = ?",
+                    &plugin->update)) ||
        (SQLITE_OK !=
         sq_prepare (plugin->dbh,
                     "UPDATE gn090 " "SET repl = MAX (0, repl - 1) WHERE 
_ROWID_ = ?",
@@ -450,8 +450,8 @@ database_shutdown (struct Plugin *plugin)
 
   if (NULL != plugin->delRow)
     sqlite3_finalize (plugin->delRow);
-  if (NULL != plugin->updPrio)
-    sqlite3_finalize (plugin->updPrio);
+  if (NULL != plugin->update)
+    sqlite3_finalize (plugin->update);
   if (NULL != plugin->updRepl)
     sqlite3_finalize (plugin->updRepl);
   if (NULL != plugin->selRepl)
@@ -541,6 +541,7 @@ delete_by_rowid (struct Plugin *plugin,
  *
  * @param cls closure
  * @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
  * @param size number of bytes in @a data
  * @param data content stored
  * @param type type of the content
@@ -554,6 +555,7 @@ delete_by_rowid (struct Plugin *plugin,
 static void
 sqlite_plugin_put (void *cls,
                    const struct GNUNET_HashCode *key,
+                   bool absent,
                    uint32_t size,
                    const void *data,
                    enum GNUNET_BLOCK_Type type,
@@ -564,8 +566,63 @@ sqlite_plugin_put (void *cls,
                    PluginPutCont cont,
                    void *cont_cls)
 {
-  uint64_t rvalue;
+  struct Plugin *plugin = cls;
   struct GNUNET_HashCode vhash;
+  char *msg = NULL;
+
+  GNUNET_CRYPTO_hash (data,
+                      size,
+                      &vhash);
+
+  if (!absent)
+  {
+    struct GNUNET_SQ_QueryParam params[] = {
+      GNUNET_SQ_query_param_uint32 (&priority),
+      GNUNET_SQ_query_param_uint32 (&replication),
+      GNUNET_SQ_query_param_absolute_time (&expiration),
+      GNUNET_SQ_query_param_auto_from_type (key),
+      GNUNET_SQ_query_param_auto_from_type (&vhash),
+      GNUNET_SQ_query_param_end
+    };
+
+    if (GNUNET_OK !=
+        GNUNET_SQ_bind (plugin->update,
+                        params))
+    {
+      cont (cont_cls,
+            key,
+            size,
+            GNUNET_SYSERR,
+            _("sqlite bind failure"));
+      return;
+    }
+    if (SQLITE_DONE != sqlite3_step (plugin->update))
+    {
+      LOG_SQLITE_MSG (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                      "sqlite3_step");
+      cont (cont_cls,
+            key,
+            size,
+            GNUNET_SYSERR,
+            msg);
+      GNUNET_free_non_null (msg);
+      return;
+    }
+    int changes = sqlite3_changes (plugin->dbh);
+    GNUNET_SQ_reset (plugin->dbh,
+                     plugin->update);
+    if (0 != changes)
+    {
+      cont (cont_cls,
+            key,
+            size,
+            GNUNET_NO,
+            NULL);
+      return;
+    }
+  }
+
+  uint64_t rvalue;
   uint32_t type32 = (uint32_t) type;
   struct GNUNET_SQ_QueryParam params[] = {
     GNUNET_SQ_query_param_uint32 (&replication),
@@ -579,11 +636,9 @@ sqlite_plugin_put (void *cls,
     GNUNET_SQ_query_param_fixed_size (data, size),
     GNUNET_SQ_query_param_end
   };
-  struct Plugin *plugin = cls;
   int n;
   int ret;
   sqlite3_stmt *stmt;
-  char *msg = NULL;
 
   if (size > MAX_ITEM_SIZE)
   {
@@ -598,15 +653,13 @@ sqlite_plugin_put (void *cls,
                    GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
                                                           GNUNET_YES),
                    GNUNET_STRINGS_absolute_time_to_string (expiration));
-  GNUNET_CRYPTO_hash (data, size, &vhash);
   stmt = plugin->insertContent;
   rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
   if (GNUNET_OK !=
       GNUNET_SQ_bind (stmt,
                       params))
   {
-    cont (cont_cls, key, size, GNUNET_SYSERR, msg);
-    GNUNET_free_non_null(msg);
+    cont (cont_cls, key, size, GNUNET_SYSERR, NULL);
     return;
   }
   n = sqlite3_step (stmt);
@@ -646,79 +699,6 @@ sqlite_plugin_put (void *cls,
 
 
 /**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore.  If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept.  The specified priority and replication is added
- * to the existing value.
- *
- * @param cls the plugin context (state for this module)
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- *     change?
- * @param replication by how much should the replication
- *     change?
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param cont continuation called with success or failure status
- * @param cons_cls closure for @a cont
- */
-static void
-sqlite_plugin_update (void *cls,
-                      uint64_t uid,
-                      uint32_t priority,
-                      uint32_t replication,
-                      struct GNUNET_TIME_Absolute expire,
-                      PluginUpdateCont cont,
-                      void *cont_cls)
-{
-  struct Plugin *plugin = cls;
-  struct GNUNET_SQ_QueryParam params[] = {
-    GNUNET_SQ_query_param_uint32 (&priority),
-    GNUNET_SQ_query_param_uint32 (&replication),
-    GNUNET_SQ_query_param_absolute_time (&expire),
-    GNUNET_SQ_query_param_uint64 (&uid),
-    GNUNET_SQ_query_param_end
-  };
-  int n;
-  char *msg = NULL;
-
-  if (GNUNET_OK !=
-      GNUNET_SQ_bind (plugin->updPrio,
-                      params))
-  {
-    cont (cont_cls, GNUNET_SYSERR, msg);
-    GNUNET_free_non_null(msg);
-    return;
-  }
-  n = sqlite3_step (plugin->updPrio);
-  GNUNET_SQ_reset (plugin->dbh,
-                   plugin->updPrio);
-  switch (n)
-  {
-  case SQLITE_DONE:
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", "Block updated\n");
-    cont (cont_cls, GNUNET_OK, NULL);
-    return;
-  case SQLITE_BUSY:
-    LOG_SQLITE_MSG (plugin, &msg,
-                    GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
-                    "sqlite3_step");
-    cont (cont_cls, GNUNET_NO, msg);
-    GNUNET_free_non_null(msg);
-    return;
-  default:
-    LOG_SQLITE_MSG (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                    "sqlite3_step");
-    cont (cont_cls, GNUNET_SYSERR, msg);
-    GNUNET_free_non_null(msg);
-    return;
-  }
-}
-
-
-/**
  * Execute statement that gets a row and call the callback
  * with the result.  Resets the statement afterwards.
  *
@@ -1300,7 +1280,6 @@ libgnunet_plugin_datastore_sqlite_init (void *cls)
   api->cls = &plugin;
   api->estimate_size = &sqlite_plugin_estimate_size;
   api->put = &sqlite_plugin_put;
-  api->update = &sqlite_plugin_update;
   api->get_key = &sqlite_plugin_get_key;
   api->get_replication = &sqlite_plugin_get_replication;
   api->get_expiration = &sqlite_plugin_get_expiration;
diff --git a/src/datastore/plugin_datastore_template.c b/src/datastore/plugin_datastore_template.c
index 8e44f020d..704d586bc 100644
--- a/src/datastore/plugin_datastore_template.c
+++ b/src/datastore/plugin_datastore_template.c
@@ -62,6 +62,7 @@ template_plugin_estimate_size (void *cls, unsigned long long *estimate)
  *
  * @param cls closure
  * @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
  * @param size number of bytes in data
  * @param data content stored
  * @param type type of the content
@@ -73,11 +74,17 @@ template_plugin_estimate_size (void *cls, unsigned long long *estimate)
  * @param cont_cls continuation closure
  */
 static void
-template_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
-                     const void *data, enum GNUNET_BLOCK_Type type,
-                     uint32_t priority, uint32_t anonymity,
+template_plugin_put (void *cls,
+                     const struct GNUNET_HashCode *key,
+                     bool absent,
+                     uint32_t size,
+                     const void *data,
+                     enum GNUNET_BLOCK_Type type,
+                     uint32_t priority,
+                     uint32_t anonymity,
                      uint32_t replication,
-                     struct GNUNET_TIME_Absolute expiration, PluginPutCont cont,
+                     struct GNUNET_TIME_Absolute expiration,
+                     PluginPutCont cont,
                      void *cont_cls)
 {
   GNUNET_break (0);
@@ -151,39 +158,6 @@ template_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
 
 
 /**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore.  If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept.  The specified priority and replication is added
- * to the existing value.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- *     change?
- * @param replication by how much should the replication
- *     change?
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-static void
-template_plugin_update (void *cls,
-                        uint64_t uid,
-                        uint32_t priority,
-                        uint32_t replication,
-                        struct GNUNET_TIME_Absolute expire,
-                        PluginUpdateCont cont,
-                        void *cont_cls)
-{
-  GNUNET_break (0);
-  cont (cont_cls, GNUNET_SYSERR, "not implemented");
-}
-
-
-/**
  * Call the given processor on an item with zero anonymity.
  *
  * @param cls our "struct Plugin*"
@@ -248,7 +222,6 @@ libgnunet_plugin_datastore_template_init (void *cls)
   api->cls = plugin;
   api->estimate_size = &template_plugin_estimate_size;
   api->put = &template_plugin_put;
-  api->update = &template_plugin_update;
   api->get_key = &template_plugin_get_key;
   api->get_replication = &template_plugin_get_replication;
   api->get_expiration = &template_plugin_get_expiration;
diff --git a/src/datastore/test_plugin_datastore.c b/src/datastore/test_plugin_datastore.c
index 1867d6755..0c34a5f66 100644
--- a/src/datastore/test_plugin_datastore.c
+++ b/src/datastore/test_plugin_datastore.c
@@ -49,7 +49,6 @@ enum RunPhase
   RP_ERROR = 0,
   RP_PUT,
   RP_GET,
-  RP_UPDATE,
   RP_ITER_ZERO,
   RP_REPL_GET,
   RP_EXPI_GET,
@@ -168,8 +167,13 @@ do_put (struct CpsRunContext *crc)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "putting type %u, anon %u under key %s\n", i + 1, i,
              GNUNET_h2s (&key));
-  crc->api->put (crc->api->cls, &key, size, value, i + 1 /* type */ ,
-                 prio, i /* anonymity */ ,
+  crc->api->put (crc->api->cls,
+                 &key,
+                 false /* absent */,
+                 size,
+                 value, i + 1 /* type */ ,
+                 prio,
+                 i /* anonymity */ ,
                  0 /* replication */ ,
                  GNUNET_TIME_relative_to_absolute
                    (GNUNET_TIME_relative_multiply
@@ -177,7 +181,8 @@ do_put (struct CpsRunContext *crc)
                       60 * 60 * 60 * 1000 +
                       GNUNET_CRYPTO_random_u32
                       (GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
-                 put_continuation, crc);
+                 put_continuation,
+                 crc);
   i++;
 }
 
@@ -264,19 +269,6 @@ cleaning_task (void *cls)
 
 
 static void
-update_continuation (void *cls,
-                    int status,
-                    const char *msg)
-{
-  struct CpsRunContext *crc = cls;
-
-  GNUNET_assert (GNUNET_OK == status);
-  crc->phase++;
-  GNUNET_SCHEDULER_add_now (&test, crc);
-}
-
-
-static void
 test (void *cls)
 {
   struct CpsRunContext *crc = cls;
@@ -316,16 +308,6 @@ test (void *cls)
                        &iterate_one_shot,
                        crc);
     break;
-  case RP_UPDATE:
-    crc->api->update (crc->api->cls,
-                      guid,
-                      1,
-                      1,
-                      GNUNET_TIME_UNIT_ZERO_ABS,
-                      &update_continuation,
-                      crc);
-    break;
-
   case RP_ITER_ZERO:
     if (crc->cnt == 1)
     {
diff --git a/src/include/gnunet_datastore_plugin.h b/src/include/gnunet_datastore_plugin.h
index 516ba525c..28c8241b1 100644
--- a/src/include/gnunet_datastore_plugin.h
+++ b/src/include/gnunet_datastore_plugin.h
@@ -134,7 +134,8 @@ typedef void
  * @param cls closure
  * @param key key for the item stored
  * @param size size of the item stored
- * @param status #GNUNET_OK or #GNUNET_SYSERROR
+ * @param status #GNUNET_OK if inserted, #GNUNET_NO if updated,
+ *        or #GNUNET_SYSERR if error
  * @param msg error message on error
  */
 typedef void
@@ -152,6 +153,7 @@ typedef void
  *
  * @param cls closure
  * @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
  * @param size number of bytes in @a data
  * @param data content stored
  * @param type type of the content
@@ -165,15 +167,16 @@ typedef void
 typedef void
 (*PluginPut) (void *cls,
               const struct GNUNET_HashCode *key,
-             uint32_t size,
-             const void *data,
-             enum GNUNET_BLOCK_Type type,
-             uint32_t priority,
-             uint32_t anonymity,
-             uint32_t replication,
-             struct GNUNET_TIME_Absolute expiration,
-             PluginPutCont cont,
-             void *cont_cls);
+              bool absent,
+              uint32_t size,
+              const void *data,
+              enum GNUNET_BLOCK_Type type,
+              uint32_t priority,
+              uint32_t anonymity,
+              uint32_t replication,
+              struct GNUNET_TIME_Absolute expiration,
+              PluginPutCont cont,
+              void *cont_cls);
 
 
 /**
@@ -248,48 +251,6 @@ typedef void
 
 
 /**
- * Update continuation.
- *
- * @param cls closure
- * @param status #GNUNET_OK or #GNUNET_SYSERR
- * @param msg error message on error
- */
-typedef void
-(*PluginUpdateCont) (void *cls,
-                    int status,
-                    const char *msg);
-
-
-/**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore.  If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept.  The specified priority and replication is added
- * to the existing value.
- *
- * @param cls closure
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- *     change?
- * @param replication by how much should the replication
- *     change?
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-typedef void
-(*PluginUpdate) (void *cls,
-                 uint64_t uid,
-                 uint32_t priority,
-                 uint32_t replication,
-                 struct GNUNET_TIME_Absolute expire,
-                 PluginUpdateCont cont,
-                 void *cont_cls);
-
-
-/**
  * Select a single item from the datastore (among those applicable).
  *
  * @param cls closure
@@ -342,16 +303,6 @@ struct GNUNET_DATASTORE_PluginFunctions
   PluginPut put;
 
   /**
-   * Update the priority for a particular key in the datastore.  If
-   * the expiration time in value is different than the time found in
-   * the datastore, the higher value should be kept.  For the
-   * anonymity level, the lower value is to be used.  The specified
-   * priority should be added to the existing priority, ignoring the
-   * priority in value.
-   */
-  PluginUpdate update;
-
-  /**
    * Get a particular datum matching a given hash from the datastore.
    */
   PluginGetKey get_key;

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]