gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r36096 - in gnunet/src: include psyc psycstore


From: gnunet
Subject: [GNUnet-SVN] r36096 - in gnunet/src: include psyc psycstore
Date: Sat, 18 Jul 2015 02:03:06 +0200

Author: tg
Date: 2015-07-18 02:03:06 +0200 (Sat, 18 Jul 2015)
New Revision: 36096

Modified:
   gnunet/src/include/gnunet_psycstore_plugin.h
   gnunet/src/include/gnunet_psycstore_service.h
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc_util_lib.c
   gnunet/src/psycstore/Makefile.am
   gnunet/src/psycstore/gnunet-service-psycstore.c
   gnunet/src/psycstore/plugin_psycstore_sqlite.c
   gnunet/src/psycstore/psycstore.h
   gnunet/src/psycstore/psycstore_api.c
   gnunet/src/psycstore/test_plugin_psycstore.c
   gnunet/src/psycstore/test_psycstore.c
Log:
psyc/store: apply state modifiers

Modified: gnunet/src/include/gnunet_psycstore_plugin.h
===================================================================
--- gnunet/src/include/gnunet_psycstore_plugin.h        2015-07-18 00:03:00 UTC 
(rev 36095)
+++ gnunet/src/include/gnunet_psycstore_plugin.h        2015-07-18 00:03:06 UTC 
(rev 36096)
@@ -240,9 +240,10 @@
    * @return #GNUNET_OK on success, else #GNUNET_SYSERR
    */
   int
-  (*state_modify_set) (void *cls,
-                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                       const char *name, const void *value, size_t value_size);
+  (*state_modify_op) (void *cls,
+                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+                      enum GNUNET_ENV_Operator op,
+                      const char *name, const void *value, size_t value_size);
 
 
   /**
@@ -270,11 +271,11 @@
                          const struct GNUNET_CRYPTO_EddsaPublicKey 
*channel_key);
 
   /**
-   * Set the value of a state variable while synchronizing state.
+   * Assign value of a state variable while synchronizing state.
    *
    * The state synchronization process is started with state_sync_begin(),
    * which is followed by one or more calls to this function,
-   * and finished with state_sync_end().
+   * and finished using state_sync_end().
    *
    * @see GNUNET_PSYCSTORE_state_sync()
    *
@@ -281,9 +282,9 @@
    * @return #GNUNET_OK on success, else #GNUNET_SYSERR
    */
   int
-  (*state_sync_set) (void *cls,
-                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                     const char *name, const void *value, size_t value_size);
+  (*state_sync_assign) (void *cls,
+                        const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+                        const char *name, const void *value, size_t 
value_size);
 
 
   /**
@@ -296,7 +297,8 @@
   int
   (*state_sync_end) (void *cls,
                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                     uint64_t message_id);
+                     uint64_t max_state_message_id,
+                     uint64_t state_hash_message_id);
 
 
   /**

Modified: gnunet/src/include/gnunet_psycstore_service.h
===================================================================
--- gnunet/src/include/gnunet_psycstore_service.h       2015-07-18 00:03:00 UTC 
(rev 36095)
+++ gnunet/src/include/gnunet_psycstore_service.h       2015-07-18 00:03:06 UTC 
(rev 36096)
@@ -494,10 +494,6 @@
  *        ID of the message that contains the @a modifiers.
  * @param state_delta
  *        Value of the @e state_delta PSYC header variable of the message.
- * @param modifier_count
- *        Number of elements in the @a modifiers array.
- * @param modifiers
- *        List of modifiers to apply.
  * @param rcb
  *        Callback to call with the result of the operation.
  * @param rcb_cls
@@ -510,8 +506,6 @@
                                const struct GNUNET_CRYPTO_EddsaPublicKey 
*channel_key,
                                uint64_t message_id,
                                uint64_t state_delta,
-                               size_t modifier_count,
-                               const struct GNUNET_ENV_Modifier *modifiers,
                                GNUNET_PSYCSTORE_ResultCallback rcb,
                                void *rcb_cls);
 
@@ -523,7 +517,9 @@
  *        Handle for the PSYCstore.
  * @param channel_key
  *        The channel we are interested in.
- * @param message_id
+ * @param max_state_message_id
+ *        ID of the last stateful message before @a state_hash_message_id.
+ * @param state_hash_message_id
  *        ID of the message that contains the state_hash PSYC header variable.
  * @param modifier_count
  *        Number of elements in the @a modifiers array.
@@ -539,7 +535,8 @@
 struct GNUNET_PSYCSTORE_OperationHandle *
 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
                              const struct GNUNET_CRYPTO_EddsaPublicKey 
*channel_key,
-                             uint64_t message_id,
+                             uint64_t max_state_message_id,
+                             uint64_t state_hash_message_id,
                              size_t modifier_count,
                              const struct GNUNET_ENV_Modifier *modifiers,
                              GNUNET_PSYCSTORE_ResultCallback rcb,

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2015-07-18 00:03:00 UTC (rev 
36095)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2015-07-18 00:03:06 UTC (rev 
36096)
@@ -416,6 +416,8 @@
 static void
 transmit_message (struct Channel *chn);
 
+static uint64_t
+message_queue_run (struct Channel *chn);
 
 static uint64_t
 message_queue_drop (struct Channel *chn);
@@ -1274,6 +1276,39 @@
 }
 
 
+struct StateModifyClosure
+{
+  struct Channel *chn;
+  struct FragmentQueue *fragq;
+  uint64_t message_id;
+};
+
+
+void
+store_recv_state_modify_result (void *cls, int64_t result,
+                                const char *err_msg, uint16_t err_msg_size)
+{
+  struct StateModifyClosure *mcls = cls;
+  struct Channel *chn = mcls->chn;
+  struct FragmentQueue *fragq = mcls->fragq;
+  uint64_t msg_id = mcls->message_id;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " 
(%.*s)\n",
+              chn, result, err_msg_size, err_msg);
+
+  if (GNUNET_OK == result)
+  {
+    chn->max_state_message_id = msg_id;
+    chn->max_message_id = msg_id;
+
+    fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == 
fragq->state);
+    GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
+    message_queue_run (chn);
+  }
+}
+
+
 /**
  * Run message queue.
  *
@@ -1294,6 +1329,7 @@
               "%p Running message queue.\n", chn);
   uint64_t n = 0;
   uint64_t msg_id;
+
   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
                                                     &msg_id))
   {
@@ -1325,7 +1361,7 @@
                       "%p Out of order message. "
                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
                       chn, msg_id, chn->max_message_id);
-          break;
+          continue;
         }
       }
       else
@@ -1336,14 +1372,19 @@
                       "%p Out of order stateful message. "
                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
                       chn, msg_id, fragq->state_delta, 
chn->max_state_message_id);
-          break;
+          continue;
         }
-#if TODO
-        /* FIXME: apply modifiers to state in PSYCstore */
-        GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
-                                       store_recv_state_modify_result, cls);
-#endif
-        chn->max_state_message_id = msg_id;
+
+        struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
+        mcls->chn = chn;
+        mcls->fragq = fragq;
+        mcls->message_id = msg_id;
+
+        /* Apply modifiers to state in PSYCstore */
+        GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
+                                       fragq->state_delta,
+                                       store_recv_state_modify_result, mcls);
+        break;
       }
       chn->max_message_id = msg_id;
     }
@@ -1351,6 +1392,7 @@
     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
     n++;
   }
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
   return n;
@@ -2039,6 +2081,11 @@
     {
       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
     }
+
+    if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
+    {
+      /// @todo add state_hash to PSYC header
+    }
   }
 }
 

Modified: gnunet/src/psyc/psyc_util_lib.c
===================================================================
--- gnunet/src/psyc/psyc_util_lib.c     2015-07-18 00:03:00 UTC (rev 36095)
+++ gnunet/src/psyc/psyc_util_lib.c     2015-07-18 00:03:06 UTC (rev 36096)
@@ -343,7 +343,7 @@
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queueing message part of type %u and size %u (end: %u)).\n",
-       ntohs (msg->type), size, end);
+       NULL != msg ? ntohs (msg->type) : 0, size, end);
 
   if (NULL != tmit->msg)
   {
@@ -917,7 +917,8 @@
     }
 
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received message part from PSYC.\n");
+                "Received message part of type %u and size %u from PSYC.\n",
+                ptype, psize);
     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
 
     switch (ptype)
@@ -1118,7 +1119,7 @@
                   ptype, psize);
       return GNUNET_SYSERR;
     }
-    /* FIXME: check message part order */
+    /** @todo FIXME: check message part order */
   }
   return parts;
 }

Modified: gnunet/src/psycstore/Makefile.am
===================================================================
--- gnunet/src/psycstore/Makefile.am    2015-07-18 00:03:00 UTC (rev 36095)
+++ gnunet/src/psycstore/Makefile.am    2015-07-18 00:03:06 UTC (rev 36096)
@@ -49,6 +49,7 @@
 gnunet_service_psycstore_LDADD = \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
   $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/psyc/libgnunetpsycutil.la \
   $(GN_LIBINTL)
 
 plugin_LTLIBRARIES = \

Modified: gnunet/src/psycstore/gnunet-service-psycstore.c
===================================================================
--- gnunet/src/psycstore/gnunet-service-psycstore.c     2015-07-18 00:03:00 UTC 
(rev 36095)
+++ gnunet/src/psycstore/gnunet-service-psycstore.c     2015-07-18 00:03:06 UTC 
(rev 36096)
@@ -32,6 +32,7 @@
 #include "gnunet_constants.h"
 #include "gnunet_protocols.h"
 #include "gnunet_statistics_service.h"
+#include "gnunet_psyc_util_lib.h"
 #include "gnunet_psycstore_service.h"
 #include "gnunet_psycstore_plugin.h"
 #include "psycstore.h"
@@ -493,8 +494,137 @@
 }
 
 
-/** @todo FIXME: stop processing further state modify messages after an error 
*/
+struct StateModifyClosure
+{
+  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key;
+  struct GNUNET_PSYC_ReceiveHandle *recv;
+  enum GNUNET_PSYC_MessageState msg_state;
+  char mod_oper;
+  char *mod_name;
+  char *mod_value;
+  uint64_t mod_value_size;
+  uint64_t mod_value_remaining;
+};
+
+
 static void
+recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset,
+                         uint32_t flags, const struct GNUNET_MessageHeader 
*msg)
+{
+  struct StateModifyClosure *scls = cls;
+  uint16_t psize;
+  if (NULL == msg)
+  {
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+    return;
+  }
+
+  switch (ntohs (msg->type))
+  {
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+  {
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
+    break;
+  }
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+  {
+    struct GNUNET_PSYC_MessageModifier *
+      pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
+    psize = ntohs (pmod->header.size);
+    uint16_t name_size = ntohs (pmod->name_size);
+    uint16_t value_size = ntohs (pmod->value_size);
+
+    const char *name = (const char *) &pmod[1];
+    const void *value = name + name_size;
+
+    if (GNUNET_ENV_OP_SET != pmod->oper)
+    { // Apply non-transient operation.
+      if (psize == sizeof (*pmod) + name_size + value_size)
+      {
+        db->state_modify_op (db->cls, scls->channel_key,
+                             pmod->oper, name, value, value_size);
+      }
+      else
+      {
+        scls->mod_oper = pmod->oper;
+        scls->mod_name = GNUNET_malloc (name_size);
+        memcpy (scls->mod_name, name, name_size);
+
+        scls->mod_value_size = value_size;
+        scls->mod_value = GNUNET_malloc (scls->mod_value_size);
+        scls->mod_value_remaining
+          = scls->mod_value_size - (psize - sizeof (*pmod) - name_size);
+        memcpy (scls->mod_value, value, value_size - 
scls->mod_value_remaining);
+      }
+    }
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
+    break;
+  }
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+    if (GNUNET_ENV_OP_SET != scls->mod_oper)
+    {
+      if (scls->mod_value_remaining == 0)
+      {
+        GNUNET_break_op (0);
+        scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+      }
+      psize = ntohs (msg->size);
+      memcpy (scls->mod_value + (scls->mod_value_size - 
scls->mod_value_remaining),
+              &msg[1], psize - sizeof (*msg));
+      scls->mod_value_remaining -= psize - sizeof (*msg);
+      if (0 == scls->mod_value_remaining)
+      {
+        db->state_modify_op (db->cls, scls->channel_key,
+                             scls->mod_oper, scls->mod_name,
+                             scls->mod_value, scls->mod_value_size);
+        GNUNET_free (scls->mod_name);
+        GNUNET_free (scls->mod_value);
+        scls->mod_oper = 0;
+        scls->mod_name = NULL;
+        scls->mod_value = NULL;
+        scls->mod_value_size = 0;
+      }
+    }
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
+    break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
+    break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
+    break;
+
+  default:
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+  }
+}
+
+
+static int
+recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
+                     enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+  struct StateModifyClosure *scls = cls;
+
+  if (NULL == scls->recv)
+  {
+    scls->recv = GNUNET_PSYC_receive_create (NULL, &recv_state_message_part,
+                                             scls);
+  }
+
+  const struct GNUNET_PSYC_MessageHeader *
+    pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1];
+  GNUNET_PSYC_receive_message (scls->recv, pmsg);
+
+  return GNUNET_YES;
+}
+
+
+static void
 handle_state_modify (void *cls,
                      struct GNUNET_SERVER_Client *client,
                      const struct GNUNET_MessageHeader *msg)
@@ -502,65 +632,36 @@
   const struct StateModifyRequest *req
     = (const struct StateModifyRequest *) msg;
 
-  int ret = GNUNET_SYSERR;
-  const char *name = (const char *) &req[1];
-  uint16_t name_size = ntohs (req->name_size);
+  uint64_t message_id = GNUNET_ntohll (req->message_id);
+  uint64_t state_delta = GNUNET_ntohll (req->state_delta);
+  uint64_t ret_frags = 0;
 
-  if (name_size <= 2 || '\0' != name[name_size - 1])
+  struct StateModifyClosure scls = { 0 };
+
+  if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key,
+                                           message_id, state_delta))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Tried to set invalid state variable name!\n"));
-    GNUNET_break_op (0);
+                _("Failed to begin modifying state!\n"));
+    GNUNET_break (0);
   }
-  else
+
+  int ret = db->message_get (db->cls, &req->channel_key,
+                             message_id, message_id,
+                             &ret_frags, &recv_state_fragment, &scls);
+
+  if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, 
message_id))
   {
-    ret = GNUNET_OK;
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                _("Failed to end modifying state!\n"));
+    GNUNET_break (0);
+  }
 
-    if (req->flags & STATE_OP_FIRST)
-    {
-      ret = db->state_modify_begin (db->cls, &req->channel_key,
-                                    GNUNET_ntohll (req->message_id),
-                                    GNUNET_ntohll (req->state_delta));
-    }
-    if (ret != GNUNET_OK)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  _("Failed to begin modifying state!\n"));
-    }
-    else
-    {
-      switch (req->oper)
-      {
-      case GNUNET_ENV_OP_ASSIGN:
-        ret = db->state_modify_set (db->cls, &req->channel_key,
-                                    (const char *) &req[1],
-                                    name + ntohs (req->name_size),
-                                    ntohs (req->header.size) - sizeof (*req)
-                                    - ntohs (req->name_size));
-        break;
-      default:
-#if TODO
-        ret = GNUNET_ENV_operation ((const char *) &req[1],
-                                    current_value, current_value_size,
-                                    req->oper, name + ntohs (req->name_size),
-                                    ntohs (req->header.size) - sizeof (*req)
-                                    - ntohs (req->name_size), &value, 
&value_size);
-#endif
-        ret = GNUNET_SYSERR;
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                    _("Unknown operator: %c\n"), req->oper);
-      }
-    }
+  if (NULL != scls.recv)
+  {
+    GNUNET_PSYC_receive_destroy (scls.recv);
+  }
 
-    if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
-    {
-      ret = db->state_modify_end (db->cls, &req->channel_key,
-                                  GNUNET_ntohll (req->message_id));
-      if (ret != GNUNET_OK)
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                    _("Failed to end modifying state!\n"));
-    }
-  }
   send_result_code (client, req->op_id, ret, NULL);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -600,16 +701,17 @@
     }
     else
     {
-      ret = db->state_sync_set (db->cls, &req->channel_key, name,
-                                name + ntohs (req->name_size),
-                                ntohs (req->header.size) - sizeof (*req)
-                                - ntohs (req->name_size));
+      ret = db->state_sync_assign (db->cls, &req->channel_key, name,
+                                   name + ntohs (req->name_size),
+                                   ntohs (req->header.size) - sizeof (*req)
+                                   - ntohs (req->name_size));
     }
 
     if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
     {
       ret = db->state_sync_end (db->cls, &req->channel_key,
-                                GNUNET_ntohll (req->message_id));
+                                GNUNET_ntohll (req->max_state_message_id),
+                                GNUNET_ntohll (req->state_hash_message_id));
       if (ret != GNUNET_OK)
         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                     _("Failed to end synchronizing state!\n"));

Modified: gnunet/src/psycstore/plugin_psycstore_sqlite.c
===================================================================
--- gnunet/src/psycstore/plugin_psycstore_sqlite.c      2015-07-18 00:03:00 UTC 
(rev 36095)
+++ gnunet/src/psycstore/plugin_psycstore_sqlite.c      2015-07-18 00:03:06 UTC 
(rev 36096)
@@ -35,6 +35,7 @@
 #include "gnunet_psycstore_service.h"
 #include "gnunet_multicast_service.h"
 #include "gnunet_crypto_lib.h"
+#include "gnunet_env_lib.h"
 #include "psycstore.h"
 #include <sqlite3.h>
 
@@ -172,15 +173,9 @@
    */
   sqlite3_stmt *update_max_state_message_id;
 
-
   /**
-   * Precompiled SQL for message_modify_begin()
+   * Precompiled SQL for state_modify_op()
    */
-  sqlite3_stmt *select_message_state_delta;
-
-  /**
-   * Precompiled SQL for state_modify_set()
-   */
   sqlite3_stmt *insert_state_current;
 
   /**
@@ -353,8 +348,8 @@
             "CREATE TABLE IF NOT EXISTS channels (\n"
             "  id INTEGER PRIMARY KEY,\n"
             "  pub_key BLOB UNIQUE,\n"
-            "  max_state_message_id INTEGER,\n"
-            "  state_hash_message_id INTEGER\n"
+            "  max_state_message_id INTEGER,\n" // last applied state message 
ID
+            "  state_hash_message_id INTEGER\n" // last message ID with a 
state hash
             ");");
 
   sql_exec (plugin->dbh,
@@ -543,17 +538,6 @@
                &plugin->update_state_hash_message_id);
 
   sql_prepare (plugin->dbh,
-               "SELECT 1\n"
-               "FROM channels AS c\n"
-               "LEFT JOIN messages AS m\n"
-               "ON c.id = m.channel_id\n"
-               "WHERE c.pub_key = ?\n"
-               "      AND ((? < c.state_hash_message_id AND 
c.state_hash_message_id < ?)\n"
-               "           OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
-               "LIMIT 1;",
-               &plugin->select_message_state_delta);
-
-  sql_prepare (plugin->dbh,
                "INSERT OR REPLACE INTO state\n"
                "  (channel_id, name, value_current, value_signed)\n"
                "SELECT new.channel_id, new.name,\n"
@@ -1447,14 +1431,14 @@
 
 
 /**
- * Set a state variable to the given value.
+ * Assign a value to a state variable.
  *
  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
  */
 static int
-state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
-           const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-           const char *name, const void *value, size_t value_size)
+state_assign (struct Plugin *plugin, sqlite3_stmt *stmt,
+              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+              const char *name, const void *value, size_t value_size)
 {
   int ret = GNUNET_SYSERR;
 
@@ -1527,50 +1511,25 @@
                     uint64_t message_id, uint64_t state_delta)
 {
   struct Plugin *plugin = cls;
-  sqlite3_stmt *stmt = plugin->select_message_state_delta;
 
   if (state_delta > 0)
   {
-    int ret = GNUNET_SYSERR;
-    if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
-                                        sizeof (*channel_key), SQLITE_STATIC)
-        || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
-                                            message_id - state_delta)
-        || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
-                                            message_id)
-        || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
-                                            message_id - state_delta)
-        || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
-                                            
GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
-    {
-      LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                  "sqlite3_bind");
-    }
-    else
-    {
-      switch (sqlite3_step (stmt))
-      {
-      case SQLITE_DONE:
-        ret = GNUNET_NO;
-        break;
-      case SQLITE_ROW:
-        ret = GNUNET_OK;
-        break;
-      default:
-        LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                    "sqlite3_step");
-      }
-    }
-    if (SQLITE_OK != sqlite3_reset (stmt))
-    {
-      ret = GNUNET_SYSERR;
-      LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                  "sqlite3_reset");
-     }
+    /**
+     * We can only apply state modifiers in the current message if modifiers in
+     * the previous stateful message (message_id - state_delta) were already
+     * applied.
+     */
+
+    uint64_t max_state_message_id = 0;
+    int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
     if (GNUNET_OK != ret)
       return ret;
+
+    if (message_id - state_delta != max_state_message_id)
+      return GNUNET_NO;
   }
 
+  // Make sure no other transaction is going on.
   if (TRANSACTION_NONE != plugin->transaction)
       if (GNUNET_OK != transaction_rollback (plugin))
           return GNUNET_SYSERR;
@@ -1587,16 +1546,24 @@
  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
  */
 static int
-state_modify_set (void *cls,
-                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                  const char *name, const void *value, size_t value_size)
+state_modify_op (void *cls,
+                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+                 enum GNUNET_ENV_Operator op,
+                 const char *name, const void *value, size_t value_size)
 {
   struct Plugin *plugin = cls;
   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
 
-  return state_set (plugin, plugin->insert_state_current, channel_key,
-                    name, value, value_size);
+  switch (op)
+  {
+  case GNUNET_ENV_OP_ASSIGN:
+    return state_assign (plugin, plugin->insert_state_current, channel_key,
+                         name, value, value_size);
 
+  /// @todo implement more state operations
+  default:
+    return GNUNET_SYSERR;
+  }
 }
 
 
@@ -1634,7 +1601,7 @@
 
 
 /**
- * Set the current value of state variable.
+ * Assign current value of a state variable.
  *
  * @see GNUNET_PSYCSTORE_state_modify()
  *
@@ -1641,13 +1608,13 @@
  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
  */
 static int
-state_sync_set (void *cls,
+state_sync_assign (void *cls,
                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                 const char *name, const void *value, size_t value_size)
 {
   struct Plugin *plugin = cls;
-  return state_set (cls, plugin->insert_state_sync, channel_key,
-                    name, value, value_size);
+  return state_assign (cls, plugin->insert_state_sync, channel_key,
+                       name, value, value_size);
 }
 
 
@@ -1657,7 +1624,8 @@
 static int
 state_sync_end (void *cls,
                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                uint64_t message_id)
+                uint64_t max_state_message_id,
+                uint64_t state_hash_message_id)
 {
   struct Plugin *plugin = cls;
   int ret = GNUNET_SYSERR;
@@ -1670,7 +1638,10 @@
                                   channel_key)
     && GNUNET_OK == update_message_id (plugin,
                                        plugin->update_state_hash_message_id,
-                                       channel_key, message_id)
+                                       channel_key, state_hash_message_id)
+    && GNUNET_OK == update_message_id (plugin,
+                                       plugin->update_max_state_message_id,
+                                       channel_key, max_state_message_id)
     && GNUNET_OK == transaction_commit (plugin)
     ? ret = GNUNET_OK
     : transaction_rollback (plugin);
@@ -1679,7 +1650,7 @@
 
 
 /**
- * Reset the state of a channel.
+ * Delete the whole state.
  *
  * @see GNUNET_PSYCSTORE_state_reset()
  *
@@ -1922,10 +1893,10 @@
   api->counters_message_get = &counters_message_get;
   api->counters_state_get = &counters_state_get;
   api->state_modify_begin = &state_modify_begin;
-  api->state_modify_set = &state_modify_set;
+  api->state_modify_op = &state_modify_op;
   api->state_modify_end = &state_modify_end;
   api->state_sync_begin = &state_sync_begin;
-  api->state_sync_set = &state_sync_set;
+  api->state_sync_assign = &state_sync_assign;
   api->state_sync_end = &state_sync_end;
   api->state_reset = &state_reset;
   api->state_update_signed = &state_update_signed;

Modified: gnunet/src/psycstore/psycstore.h
===================================================================
--- gnunet/src/psycstore/psycstore.h    2015-07-18 00:03:00 UTC (rev 36095)
+++ gnunet/src/psycstore/psycstore.h    2015-07-18 00:03:06 UTC (rev 36096)
@@ -441,35 +441,24 @@
   struct GNUNET_MessageHeader header;
 
   /**
-   * Size of name, including NUL terminator.
+   * Operation ID.
    */
-  uint16_t name_size GNUNET_PACKED;
+  uint64_t op_id GNUNET_PACKED;
 
   /**
-   * OR'd StateOpFlags
+   * ID of the message to apply the state changes in.
    */
-  uint8_t flags;
+  uint64_t message_id GNUNET_PACKED;
 
   /**
-   * enum GNUNET_ENV_Operator
+   * State delta of the message with ID @a message_id.
    */
-  uint8_t oper;
+  uint64_t state_delta GNUNET_PACKED;
 
   /**
-   * Operation ID.
-   */
-  uint64_t op_id GNUNET_PACKED;
-
-  /**
    * Channel's public key.
    */
   struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
-
-  uint64_t message_id GNUNET_PACKED;
-
-  uint64_t state_delta GNUNET_PACKED;
-
-  /* Followed by NUL-terminated name, then the value. */
 };
 
 
@@ -495,8 +484,6 @@
 
   uint8_t reserved;
 
-  uint64_t message_id GNUNET_PACKED;
-
   /**
    * Operation ID.
    */
@@ -503,6 +490,16 @@
   uint64_t op_id GNUNET_PACKED;
 
   /**
+   * ID of the message that contains the state_hash PSYC header variable.
+   */
+  uint64_t state_hash_message_id GNUNET_PACKED;
+
+  /**
+   * ID of the last stateful message before @a state_hash_message_id.
+   */
+  uint64_t max_state_message_id GNUNET_PACKED;
+
+  /**
    * Channel's public key.
    */
   struct GNUNET_CRYPTO_EddsaPublicKey channel_key;

Modified: gnunet/src/psycstore/psycstore_api.c
===================================================================
--- gnunet/src/psycstore/psycstore_api.c        2015-07-18 00:03:00 UTC (rev 
36095)
+++ gnunet/src/psycstore/psycstore_api.c        2015-07-18 00:03:06 UTC (rev 
36096)
@@ -302,16 +302,9 @@
       GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
       if (NULL != op->res_cb)
       {
-        const struct StateModifyRequest *smreq;
         const struct StateSyncRequest *ssreq;
         switch (ntohs (op->msg->type))
         {
-        case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
-          smreq = (const struct StateModifyRequest *) op->msg;
-          if (!(smreq->flags & STATE_OP_LAST
-                || GNUNET_OK != result_code))
-            op->res_cb = NULL;
-          break;
         case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
           ssreq = (const struct StateSyncRequest *) op->msg;
           if (!(ssreq->flags & STATE_OP_LAST
@@ -1234,10 +1227,6 @@
  *        ID of the message that contains the @a modifiers.
  * @param state_delta
  *        Value of the _state_delta PSYC header variable of the message.
- * @param modifier_count
- *        Number of elements in the @a modifiers array.
- * @param modifiers
- *        List of modifiers to apply.
  * @param rcb
  *        Callback to call with the result of the operation.
  * @param rcb_cls
@@ -1250,50 +1239,31 @@
                                const struct GNUNET_CRYPTO_EddsaPublicKey 
*channel_key,
                                uint64_t message_id,
                                uint64_t state_delta,
-                               size_t modifier_count,
-                               const struct GNUNET_ENV_Modifier *modifiers,
                                GNUNET_PSYCSTORE_ResultCallback rcb,
                                void *rcb_cls)
 {
   struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
-  size_t i;
+  struct StateModifyRequest *req;
 
-  for (i = 0; i < modifier_count; i++) {
-    struct StateModifyRequest *req;
-    uint16_t name_size = strlen (modifiers[i].name) + 1;
+  op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+  op->h = h;
+  op->res_cb = rcb;
+  op->cls = rcb_cls;
 
-    op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
-                        modifiers[i].value_size);
-    op->h = h;
-    op->res_cb = rcb;
-    op->cls = rcb_cls;
+  req = (struct StateModifyRequest *) &op[1];
+  op->msg = (struct GNUNET_MessageHeader *) req;
+  req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
+  req->header.size = htons (sizeof (*req));
+  req->channel_key = *channel_key;
+  req->message_id = GNUNET_htonll (message_id);
+  req->state_delta = GNUNET_htonll (state_delta);
 
-    req = (struct StateModifyRequest *) &op[1];
-    op->msg = (struct GNUNET_MessageHeader *) req;
-    req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
-    req->header.size = htons (sizeof (*req) + name_size
-                              + modifiers[i].value_size);
-    req->channel_key = *channel_key;
-    req->message_id = GNUNET_htonll (message_id);
-    req->state_delta = GNUNET_htonll (state_delta);
-    req->oper = modifiers[i].oper;
-    req->name_size = htons (name_size);
-    req->flags
-      = 0 == i
-      ? STATE_OP_FIRST
-      : modifier_count - 1 == i
-      ? STATE_OP_LAST
-      : 0;
+  op->op_id = get_next_op_id (h);
+  req->op_id = GNUNET_htonll (op->op_id);
 
-    memcpy (&req[1], modifiers[i].name, name_size);
-    memcpy ((char *) &req[1] + name_size, modifiers[i].value, 
modifiers[i].value_size);
+  GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+  transmit_next (h);
 
-    op->op_id = get_next_op_id (h);
-    req->op_id = GNUNET_htonll (op->op_id);
-
-    GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
-    transmit_next (h);
-  }
   return op;
   /* FIXME: only the last operation is returned,
    *        operation_cancel() should be able to cancel all of them.
@@ -1308,7 +1278,9 @@
  *        Handle for the PSYCstore.
  * @param channel_key
  *        The channel we are interested in.
- * @param message_id
+ * @param max_state_message_id
+ *        ID of the last stateful message before @a state_hash_message_id.
+ * @param state_hash_message_id
  *        ID of the message that contains the state_hash PSYC header variable.
  * @param modifier_count
  *        Number of elements in the @a modifiers array.
@@ -1324,7 +1296,8 @@
 struct GNUNET_PSYCSTORE_OperationHandle *
 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
                              const struct GNUNET_CRYPTO_EddsaPublicKey 
*channel_key,
-                             uint64_t message_id,
+                             uint64_t max_state_message_id,
+                             uint64_t state_hash_message_id,
                              size_t modifier_count,
                              const struct GNUNET_ENV_Modifier *modifiers,
                              GNUNET_PSYCSTORE_ResultCallback rcb,
@@ -1349,7 +1322,8 @@
     req->header.size = htons (sizeof (*req) + name_size
                               + modifiers[i].value_size);
     req->channel_key = *channel_key;
-    req->message_id = GNUNET_htonll (message_id);
+    req->max_state_message_id = GNUNET_htonll (max_state_message_id);
+    req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
     req->name_size = htons (name_size);
     req->flags
       = (0 == i)

Modified: gnunet/src/psycstore/test_plugin_psycstore.c
===================================================================
--- gnunet/src/psycstore/test_plugin_psycstore.c        2015-07-18 00:03:00 UTC 
(rev 36095)
+++ gnunet/src/psycstore/test_plugin_psycstore.c        2015-07-18 00:03:06 UTC 
(rev 36096)
@@ -85,7 +85,7 @@
   struct GNUNET_PSYCSTORE_PluginFunctions *ret;
   char *libname;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' psycstore plugin\n"),
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Loading `%s' psycstore plugin\n"),
               plugin_name);
   GNUNET_asprintf (&libname, "libgnunet_plugin_psycstore_%s", plugin_name);
   if (NULL == (ret = GNUNET_PLUGIN_load (libname, (void*) cfg)))
@@ -306,15 +306,17 @@
 
   message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 1;
   GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, 
&channel_pub_key,
-                                                      message_id, 1));
+                                                      message_id, 0));
 
-  GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
-                                                    "_foo",
-                                                    C2ARG("one two three")));
+  GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
+                                                   GNUNET_ENV_OP_ASSIGN,
+                                                   "_foo",
+                                                   C2ARG("one two three")));
 
-  GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
-                                                    "_foo_bar", slave_key,
-                                                    sizeof (*slave_key)));
+  GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
+                                                   GNUNET_ENV_OP_ASSIGN,
+                                                   "_foo_bar", slave_key,
+                                                   sizeof (*slave_key)));
 
   GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
                                                     message_id));
@@ -366,15 +368,16 @@
 
   GNUNET_assert (GNUNET_OK == db->state_sync_begin (db->cls, 
&channel_pub_key));
 
-  GNUNET_assert (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
-                                                  "_sync_bar", scls.value[0],
-                                                  scls.value_size[0]));
+  GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key,
+                                                     "_sync_bar", 
scls.value[0],
+                                                     scls.value_size[0]));
 
-  GNUNET_assert (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
-                                                  "_sync_foo", scls.value[1],
-                                                  scls.value_size[1]));
+  GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key,
+                                                     "_sync_foo", 
scls.value[1],
+                                                     scls.value_size[1]));
 
   GNUNET_assert (GNUNET_OK == db->state_sync_end (db->cls, &channel_pub_key,
+                                                  max_state_msg_id,
                                                   INT64_MAX - 5));
 
   GNUNET_assert (GNUNET_NO == db->state_get_prefix (db->cls, &channel_pub_key,
@@ -394,11 +397,13 @@
 
   message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 6;
   GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, 
&channel_pub_key,
-                                                      message_id, 3));
+                                                      message_id,
+                                                      message_id - 
max_state_msg_id));
 
-  GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
-                                                    "_sync_foo",
-                                                    C2ARG("five six seven")));
+  GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
+                                                   GNUNET_ENV_OP_ASSIGN,
+                                                   "_sync_foo",
+                                                   C2ARG("five six seven")));
 
   GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
                                                     message_id));

Modified: gnunet/src/psycstore/test_psycstore.c
===================================================================
--- gnunet/src/psycstore/test_psycstore.c       2015-07-18 00:03:00 UTC (rev 
36095)
+++ gnunet/src/psycstore/test_psycstore.c       2015-07-18 00:03:06 UTC (rev 
36096)
@@ -224,8 +224,8 @@
   scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
 
   scls.name[1] = "_sync_foo";
-  scls.value[1] = "one two three";
-  scls.value_size[1] = sizeof ("one two three") - 1;
+  scls.value[1] = "three two one";
+  scls.value_size[1] = sizeof ("three two one") - 1;
 
   op = GNUNET_PSYCSTORE_state_get_prefix (h, &channel_pub_key, "_sync",
                                           &state_result,
@@ -253,11 +253,11 @@
   GNUNET_assert (result == 1);
 
   scls.n = 0;
-  scls.name[0] = "_bar";
-  scls.value[0] = "four five six";
-  scls.value_size[0] = sizeof ("four five six") - 1;
+  scls.name[0] = "_sync_bar";
+  scls.value[0] = "ten eleven twelve";
+  scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
 
-  op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_bar_x_yy_zzz",
+  op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_sync_bar_x_yy_zzz",
                                    &state_result, &state_get_result, &scls);
 }
 
@@ -284,22 +284,9 @@
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_sync_result:\t%d\n", result);
   GNUNET_assert (GNUNET_OK == result);
 
-  modifiers[0] = (struct GNUNET_ENV_Modifier) {
-    .oper = '=',
-    .name = "_sync_foo",
-    .value = "one two three",
-    .value_size = sizeof ("one two three") - 1
-  };
-  modifiers[1] = (struct GNUNET_ENV_Modifier) {
-    .oper = '=',
-    .name = "_bar",
-    .value = "four five six",
-    .value_size = sizeof ("four five six") - 1
-  };
-
   op = GNUNET_PSYCSTORE_state_modify (h, &channel_pub_key,
-                                      GNUNET_ntohll 
(fcls->msg[0]->message_id), 0,
-                                      2, modifiers, state_modify_result, fcls);
+                                      GNUNET_ntohll (fcls->msg[0]->message_id),
+                                      0, state_modify_result, fcls);
 }
 
 
@@ -356,6 +343,7 @@
 
   op = GNUNET_PSYCSTORE_state_sync (h, &channel_pub_key,
                                     GNUNET_ntohll (fcls->msg[0]->message_id) + 
1,
+                                    GNUNET_ntohll (fcls->msg[0]->message_id) + 
2,
                                     2, modifiers, state_sync_result, fcls);
 }
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]