gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r37359 - in gnunet/src: curl datastore fs identity-provider


From: gnunet
Subject: [GNUnet-SVN] r37359 - in gnunet/src: curl datastore fs identity-provider include json jsonapi my nat peerstore pq psycutil social testbed-logger util
Date: Fri, 24 Jun 2016 22:17:40 +0200

Author: grothoff
Date: 2016-06-24 22:17:39 +0200 (Fri, 24 Jun 2016)
New Revision: 37359

Modified:
   gnunet/src/curl/
   gnunet/src/datastore/datastore.h
   gnunet/src/datastore/datastore_api.c
   gnunet/src/datastore/gnunet-datastore.c
   gnunet/src/datastore/gnunet-service-datastore.c
   gnunet/src/datastore/perf_datastore_api.c
   gnunet/src/datastore/plugin_datastore_mysql.c
   gnunet/src/datastore/test_datastore_api.c
   gnunet/src/datastore/test_datastore_api_management.c
   gnunet/src/datastore/test_plugin_datastore.c
   gnunet/src/datastore/test_plugin_datastore_data_mysql.conf
   gnunet/src/fs/fs_publish.c
   gnunet/src/fs/fs_publish_ublock.c
   gnunet/src/fs/fs_unindex.c
   gnunet/src/fs/gnunet-service-fs_cadet_server.c
   gnunet/src/fs/gnunet-service-fs_indexing.c
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_push.c
   gnunet/src/fs/gnunet-service-fs_put.c
   gnunet/src/identity-provider/
   gnunet/src/include/gnunet_datastore_service.h
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/json/
   gnunet/src/jsonapi/
   gnunet/src/my/
   gnunet/src/nat/
   gnunet/src/peerstore/
   gnunet/src/pq/
   gnunet/src/psycutil/
   gnunet/src/social/
   gnunet/src/testbed-logger/
   gnunet/src/util/
Log:
refactoring datastore API to use MQ API, also fixing misc. bugs in new mysql 
backend

Index: gnunet/src/curl
===================================================================
--- gnunet/src/curl     2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/curl     2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/curl
___________________________________________________________________
Added: svn:ignore
## -0,0 +1,3 ##
+Makefile.in
+Makefile
+.deps
Modified: gnunet/src/datastore/datastore.h
===================================================================
--- gnunet/src/datastore/datastore.h    2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/datastore/datastore.h    2016-06-24 20:17:39 UTC (rev 37359)
@@ -106,12 +106,10 @@
  * Message to the datastore service asking about specific
  * content.
  */
-struct GetMessage
+struct GetKeyMessage
 {
   /**
-   * Type is GNUNET_MESSAGE_TYPE_DATASTORE_GET.  Size
-   * can either be "sizeof(struct GetMessage)" or
-   * "sizeof(struct GetMessage) - sizeof(struct GNUNET_HashCode)"!
+   * Type is #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY.
    */
   struct GNUNET_MessageHeader header;
 
@@ -126,8 +124,7 @@
   uint64_t offset GNUNET_PACKED;
 
   /**
-   * Desired key (optional).  Check the "size" of the
-   * header to see if the key is actually present.
+   * Desired key.
    */
   struct GNUNET_HashCode key;
 
@@ -135,6 +132,30 @@
 
 
 /**
+ * Message to the datastore service asking about specific
+ * content.
+ */
+struct GetMessage
+{
+  /**
+   * Type is #GNUNET_MESSAGE_TYPE_DATASTORE_GET.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Desired content type.  (actually an enum GNUNET_BLOCK_Type)
+   */
+  uint32_t type GNUNET_PACKED;
+
+  /**
+   * Offset of the result.
+   */
+  uint64_t offset GNUNET_PACKED;
+
+};
+
+
+/**
  * Message to the datastore service asking about zero
  * anonymity content.
  */

Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c        2016-06-24 20:12:53 UTC (rev 
37358)
+++ gnunet/src/datastore/datastore_api.c        2016-06-24 20:17:39 UTC (rev 
37359)
@@ -21,7 +21,7 @@
 /**
  * @file datastore/datastore_api.c
  * @brief Management for the datastore for files stored on a GNUnet node.  
Implements
- *        a priority queue for requests (with timeouts).
+ *        a priority queue for requests
  * @author Christian Grothoff
  */
 #include "platform.h"
@@ -95,7 +95,6 @@
 };
 
 
-
 /**
  * Entry in our priority queue.
  */
@@ -118,13 +117,6 @@
   struct GNUNET_DATASTORE_Handle *h;
 
   /**
-   * Response processor (NULL if we are not waiting for a response).
-   * This struct should be used for the closure, function-specific
-   * arguments can be passed via 'qc'.
-   */
-  GNUNET_CLIENT_MessageHandler response_proc;
-
-  /**
    * Function to call after transmission of the request.
    */
   GNUNET_DATASTORE_ContinuationWithStatus cont;
@@ -140,16 +132,12 @@
   union QueueContext qc;
 
   /**
-   * Task for timeout signalling.
+   * Envelope of the request to transmit, NULL after
+   * transmission.
    */
-  struct GNUNET_SCHEDULER_Task *task;
+  struct GNUNET_MQ_Envelope *env;
 
   /**
-   * Timeout for the current operation.
-   */
-  struct GNUNET_TIME_Absolute timeout;
-
-  /**
    * Priority in the queue.
    */
   unsigned int priority;
@@ -161,22 +149,13 @@
   unsigned int max_queue;
 
   /**
-   * Number of bytes in the request message following
-   * this struct.  32-bit value for nicer memory
-   * access (and overall struct alignment).
+   * Expected response type.
    */
-  uint32_t message_size;
+  uint16_t response_type;
 
-  /**
-   * Has this message been transmitted to the service?
-   * Only ever #GNUNET_YES for the head of the queue.
-   * Note that the overall struct should end at a
-   * multiple of 64 bits.
-   */
-  int was_transmitted;
-
 };
 
+
 /**
  * Handle to the datastore service.
  */
@@ -191,7 +170,7 @@
   /**
    * Current connection to the datastore service.
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Handle for statistics.
@@ -199,11 +178,6 @@
   struct GNUNET_STATISTICS_Handle *stats;
 
   /**
-   * Current transmit handle.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
-
-  /**
    * Current head of priority queue.
    */
   struct GNUNET_DATASTORE_QueueEntry *queue_head;
@@ -216,7 +190,7 @@
   /**
    * Task for trying to reconnect.
    */
-  struct GNUNET_SCHEDULER_Task * reconnect_task;
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
 
   /**
    * How quickly should we retry?  Used for exponential back-off on
@@ -237,11 +211,6 @@
   unsigned int result_count;
 
   /**
-   * Are we currently trying to receive from the service?
-   */
-  int in_receive;
-
-  /**
    * We should ignore the next message(s) from the service.
    */
   unsigned int skip_next_messages;
@@ -250,6 +219,110 @@
 
 
 /**
+ * Try reconnecting to the datastore service.
+ *
+ * @param cls the `struct GNUNET_DATASTORE_Handle`
+ */
+static void
+try_reconnect (void *cls);
+
+
+/**
+ * Disconnect from the service and then try reconnecting to the datastore 
service
+ * after some delay.
+ *
+ * @param h handle to datastore to disconnect and reconnect
+ */
+static void
+do_disconnect (struct GNUNET_DATASTORE_Handle *h)
+{
+  if (NULL == h->mq)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  GNUNET_MQ_destroy (h->mq);
+  h->mq = NULL;
+  h->skip_next_messages = 0;
+  h->reconnect_task
+    = GNUNET_SCHEDULER_add_delayed (h->retry_time,
+                                    &try_reconnect,
+                                    h);
+}
+
+
+/**
+ * Free a queue entry.  Removes the given entry from the
+ * queue and releases associated resources.  Does NOT
+ * call the callback.
+ *
+ * @param qe entry to free.
+ */
+static void
+free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
+{
+  struct GNUNET_DATASTORE_Handle *h = qe->h;
+
+  GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                               h->queue_tail,
+                               qe);
+  h->queue_size--;
+  GNUNET_free (qe);
+}
+
+
+/**
+ * Handle error in sending drop request to datastore.
+ *
+ * @param cls closure with the datastore handle
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "MQ error, reconnecting to DATASTORE\n");
+  do_disconnect (h);
+  qe = h->queue_head;
+  if ( (NULL != qe) &&
+       (NULL == qe->env) )
+  {
+    union QueueContext qc = qe->qc;
+    uint16_t rt = qe->response_type;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Failed to receive response from database.\n");
+    free_queue_entry (qe);
+    switch (rt)
+    {
+    case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
+      if (NULL != qc.sc.cont)
+        qc.sc.cont (qc.sc.cont_cls,
+                    GNUNET_SYSERR,
+                    GNUNET_TIME_UNIT_ZERO_ABS,
+                    _("DATASTORE disconnected"));
+      break;
+    case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
+      if (NULL != qc.rc.proc)
+        qc.rc.proc (qc.rc.proc_cls,
+                    NULL,
+                    0,
+                    NULL, 0, 0, 0,
+                    GNUNET_TIME_UNIT_ZERO_ABS,
+                    0);
+      break;
+    default:
+      GNUNET_break (0);
+    }
+  }
+}
+
+
+/**
  * Connect to the datastore service.
  *
  * @param cfg configuration to use
@@ -258,22 +331,27 @@
 struct GNUNET_DATASTORE_Handle *
 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  struct GNUNET_CLIENT_Connection *c;
   struct GNUNET_DATASTORE_Handle *h;
 
-  c = GNUNET_CLIENT_connect ("datastore", cfg);
-  if (c == NULL)
-    return NULL;                /* oops */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Establishing DATASTORE connection!\n");
   h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
-  h->client = c;
   h->cfg = cfg;
-  h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
+  try_reconnect (h);
+  if (NULL == h->mq)
+  {
+    GNUNET_free (h);
+    return NULL;
+  }
+  h->stats = GNUNET_STATISTICS_create ("datastore-api",
+                                       cfg);
   return h;
 }
 
 
 /**
- * Task used by 'transmit_drop' to disconnect the datastore.
+ * Task used by to disconnect from the datastore after
+ * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
  *
  * @param cls the datastore handle
  */
@@ -282,6 +360,8 @@
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Drop sent, disconnecting\n");
   GNUNET_DATASTORE_disconnect (h,
                                GNUNET_NO);
 }
@@ -288,33 +368,21 @@
 
 
 /**
- * Transmit DROP message to datastore service.
+ * Handle error in sending drop request to datastore.
  *
- * @param cls the `struct GNUNET_DATASTORE_Handle`
- * @param size number of bytes that can be copied to @a buf
- * @param buf where to copy the drop message
- * @return number of bytes written to @a buf
+ * @param cls closure with the datastore handle
+ * @param error error code
  */
-static size_t
-transmit_drop (void *cls, size_t size, void *buf)
+static void
+disconnect_on_mq_error (void *cls,
+                        enum GNUNET_MQ_Error error)
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
-  struct GNUNET_MessageHeader *hdr;
 
-  if (buf == NULL)
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-         _("Failed to transmit request to drop database.\n"));
-    GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h);
-    return 0;
-  }
-  GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
-  hdr = buf;
-  hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
-  hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
-  GNUNET_SCHEDULER_add_now (&disconnect_after_drop,
-                            h);
-  return sizeof (struct GNUNET_MessageHeader);
+  LOG (GNUNET_ERROR_TYPE_ERROR,
+       "Failed to ask datastore to drop tables\n");
+  GNUNET_DATASTORE_disconnect (h,
+                               GNUNET_NO);
 }
 
 
@@ -333,16 +401,11 @@
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Datastore disconnect\n");
-  if (NULL != h->th)
+  if (NULL != h->mq)
   {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
-    h->th = NULL;
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
   }
-  if (NULL != h->client)
-  {
-    GNUNET_CLIENT_disconnect (h->client);
-    h->client = NULL;
-  }
   if (NULL != h->reconnect_task)
   {
     GNUNET_SCHEDULER_cancel (h->reconnect_task);
@@ -350,25 +413,52 @@
   }
   while (NULL != (qe = h->queue_head))
   {
-    GNUNET_assert (NULL != qe->response_proc);
-    qe->response_proc (h, NULL);
+    switch (qe->response_type)
+    {
+    case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
+      if (NULL != qe->qc.sc.cont)
+        qe->qc.sc.cont (qe->qc.sc.cont_cls,
+                        GNUNET_SYSERR,
+                        GNUNET_TIME_UNIT_ZERO_ABS,
+                        _("Disconnected from DATASTORE"));
+      break;
+    case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
+      if (NULL != qe->qc.rc.proc)
+        qe->qc.rc.proc (qe->qc.rc.proc_cls,
+                        NULL,
+                        0,
+                        NULL, 0, 0, 0,
+                        GNUNET_TIME_UNIT_ZERO_ABS,
+                        0);
+      break;
+    default:
+      GNUNET_break (0);
+    }
+    free_queue_entry (qe);
   }
   if (GNUNET_YES == drop)
   {
-    h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
-    if (NULL != h->client)
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Re-connecting to issue DROP!\n");
+    GNUNET_assert (NULL == h->mq);
+    h->mq = GNUNET_CLIENT_connecT (h->cfg,
+                                   "datastore",
+                                   NULL,
+                                   &disconnect_on_mq_error,
+                                   h);
+    if (NULL != h->mq)
     {
-      if (NULL !=
-          GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                               sizeof (struct
-                                                       GNUNET_MessageHeader),
-                                               GNUNET_TIME_UNIT_SECONDS,
-                                               GNUNET_YES,
-                                               &transmit_drop,
-                                               h))
-        return;
-      GNUNET_CLIENT_disconnect (h->client);
-      h->client = NULL;
+      struct GNUNET_MessageHeader *hdr;
+      struct GNUNET_MQ_Envelope *env;
+
+      env = GNUNET_MQ_msg (hdr,
+                           GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
+      GNUNET_MQ_notify_sent (env,
+                             &disconnect_after_drop,
+                             h);
+      GNUNET_MQ_send (h->mq,
+                      env);
+      return;
     }
     GNUNET_break (0);
   }
@@ -380,67 +470,37 @@
 
 
 /**
- * A request has timed out (before being transmitted to the service).
- *
- * @param cls the `struct GNUNET_DATASTORE_QueueEntry`
- */
-static void
-timeout_queue_entry (void *cls)
-{
-  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
-  struct GNUNET_DATASTORE_Handle *h = qe->h;
-
-  GNUNET_STATISTICS_update (h->stats,
-                            gettext_noop ("# queue entry timeouts"),
-                            1,
-                            GNUNET_NO);
-  qe->task = NULL;
-  GNUNET_assert (GNUNET_NO == qe->was_transmitted);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Timeout of request in datastore queue\n");
-  /* response_proc's expect request at the head of the queue! */
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                               h->queue_tail,
-                               qe);
-  GNUNET_CONTAINER_DLL_insert (h->queue_head,
-                               h->queue_tail,
-                               qe);
-  GNUNET_assert (h->queue_head == qe);
-  qe->response_proc (qe->h, NULL);
-}
-
-
-/**
  * Create a new entry for our priority queue (and possibly discard other 
entires if
  * the queue is getting too long).
  *
  * @param h handle to the datastore
- * @param msize size of the message to queue
+ * @param env envelope with the message to queue
  * @param queue_priority priority of the entry
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout timeout for the operation
- * @param response_proc function to call with replies (can be NULL)
+ * @param expected_type which type of response do we expect,
+ *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
+ *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
  * @param qc client context (NOT a closure for @a response_proc)
  * @return NULL if the queue is full
  */
 static struct GNUNET_DATASTORE_QueueEntry *
 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
-                  size_t msize,
+                  struct GNUNET_MQ_Envelope *env,
                   unsigned int queue_priority,
                   unsigned int max_queue_size,
-                  struct GNUNET_TIME_Relative timeout,
-                  GNUNET_CLIENT_MessageHandler response_proc,
+                  uint16_t expected_type,
                   const union QueueContext *qc)
 {
-  struct GNUNET_DATASTORE_QueueEntry *ret;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GNUNET_DATASTORE_QueueEntry *pos;
   unsigned int c;
 
   c = 0;
   pos = h->queue_head;
-  while ((pos != NULL) && (c < max_queue_size) &&
-         (pos->priority >= queue_priority))
+  while ( (NULL != pos) &&
+          (c < max_queue_size) &&
+          (pos->priority >= queue_priority) )
   {
     c++;
     pos = pos->next;
@@ -451,18 +511,17 @@
                               gettext_noop ("# queue overflows"),
                               1,
                               GNUNET_NO);
+    GNUNET_MQ_discard (env);
     return NULL;
   }
-  ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
-  ret->h = h;
-  ret->response_proc = response_proc;
-  ret->qc = *qc;
-  ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  ret->priority = queue_priority;
-  ret->max_queue = max_queue_size;
-  ret->message_size = msize;
-  ret->was_transmitted = GNUNET_NO;
-  if (pos == NULL)
+  qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
+  qe->h = h;
+  qe->env = env;
+  qe->response_type = expected_type;
+  qe->qc = *qc;
+  qe->priority = queue_priority;
+  qe->max_queue = max_queue_size;
+  if (NULL == pos)
   {
     /* append at the tail */
     pos = h->queue_tail;
@@ -472,7 +531,8 @@
     pos = pos->prev;
     /* do not insert at HEAD if HEAD query was already
      * transmitted and we are still receiving replies! */
-    if ((pos == NULL) && (h->queue_head->was_transmitted))
+    if ( (NULL == pos) &&
+         (NULL == h->queue_head->env) )
       pos = h->queue_head;
   }
   c++;
@@ -479,42 +539,15 @@
 #if INSANE_STATISTICS
   GNUNET_STATISTICS_update (h->stats,
                             gettext_noop ("# queue entries created"),
-                            1, GNUNET_NO);
+                            1,
+                            GNUNET_NO);
 #endif
   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
                                      h->queue_tail,
                                      pos,
-                                     ret);
+                                     qe);
   h->queue_size++;
-  ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
-                                            &timeout_queue_entry,
-                                            ret);
-  for (pos = ret->next; NULL != pos; pos = pos->next)
-  {
-    if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == 
GNUNET_NO))
-    {
-      GNUNET_assert (NULL != pos->response_proc);
-      /* move 'pos' element to head so that it will be
-       * killed on 'NULL' call below */
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Dropping request from datastore queue\n");
-      /* response_proc's expect request at the head of the queue! */
-      GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                   h->queue_tail,
-                                   pos);
-      GNUNET_CONTAINER_DLL_insert (h->queue_head,
-                                   h->queue_tail,
-                                   pos);
-      GNUNET_STATISTICS_update (h->stats,
-                                gettext_noop
-                                ("# Requests dropped from datastore queue"), 1,
-                                GNUNET_NO);
-      GNUNET_assert (h->queue_head == pos);
-      pos->response_proc (h, NULL);
-      break;
-    }
-  }
-  return ret;
+  return qe;
 }
 
 
@@ -525,78 +558,88 @@
  * @param h handle to the datastore
  */
 static void
-process_queue (struct GNUNET_DATASTORE_Handle *h);
-
-
-/**
- * Try reconnecting to the datastore service.
- *
- * @param cls the `struct GNUNET_DATASTORE_Handle`
- */
-static void
-try_reconnect (void *cls)
+process_queue (struct GNUNET_DATASTORE_Handle *h)
 {
-  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
 
-  h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
-  h->reconnect_task = NULL;
-  h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
-  if (h->client == NULL)
+  if (NULL == (qe = h->queue_head))
   {
-    LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
+    /* no entry in queue */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Queue empty\n");
     return;
   }
-  GNUNET_STATISTICS_update (h->stats,
-                            gettext_noop
-                            ("# datastore connections (re)created"), 1,
-                            GNUNET_NO);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
-  process_queue (h);
+  if (NULL == qe->env)
+  {
+    /* waiting for replies */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Head request already transmitted\n");
+    return;
+  }
+  if (NULL == h->mq)
+  {
+    /* waiting for reconnect */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Not connected\n");
+    return;
+  }
+  GNUNET_MQ_send (h->mq,
+                  qe->env);
+  qe->env = NULL;
 }
 
 
+
+
 /**
- * Disconnect from the service and then try reconnecting to the datastore 
service
- * after some delay.
+ * Function called to check status message from the service.
  *
- * @param h handle to datastore to disconnect and reconnect
+ * @param cls closure
+ * @param sm status message received
+ * @return #GNUNET_OK if the message is well-formed
  */
-static void
-do_disconnect (struct GNUNET_DATASTORE_Handle *h)
+static int
+check_status (void *cls,
+              const struct StatusMessage *sm)
 {
-  if (NULL == h->client)
+  uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
+  int32_t status = ntohl (sm->status);
+
+  if (msize > 0)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Client NULL in disconnect, will not try to reconnect\n");
-    return;
+    const char *emsg = (const char *) &sm[1];
+
+    if ('\0' != emsg[msize - 1])
+    {
+      GNUNET_break (0);
+      return GNUNET_SYSERR;
+    }
   }
-  GNUNET_CLIENT_disconnect (h->client);
-  h->skip_next_messages = 0;
-  h->client = NULL;
-  h->reconnect_task =
-      GNUNET_SCHEDULER_add_delayed (h->retry_time,
-                                    &try_reconnect,
-                                    h);
+  else if (GNUNET_SYSERR == status)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
 }
 
 
 /**
- * Function called whenever we receive a message from
- * the service.  Calls the appropriate handler.
+ * Function called to handle status message from the service.
  *
- * @param cls the `struct GNUNET_DATASTORE_Handle`
- * @param msg the received message
+ * @param cls closure
+ * @param sm status message received
  */
 static void
-receive_cb (void *cls,
-            const struct GNUNET_MessageHeader *msg)
+handle_status (void *cls,
+               const struct StatusMessage *sm)
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct StatusContext rc;
+  const char *emsg;
+  int32_t status = ntohl (sm->status);
 
-  h->in_receive = GNUNET_NO;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Receiving reply from datastore\n");
   if (h->skip_next_messages > 0)
   {
     h->skip_next_messages--;
@@ -606,256 +649,259 @@
   if (NULL == (qe = h->queue_head))
   {
     GNUNET_break (0);
-    process_queue (h);
+    do_disconnect (h);
     return;
   }
-  qe->response_proc (h, msg);
+  if (NULL != qe->env)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  rc = qe->qc.sc;
+  free_queue_entry (qe);
+  if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
+    emsg = (const char *) &sm[1];
+  else
+    emsg = NULL;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received status %d/%s\n",
+       (int) status,
+       emsg);
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# status messages received"),
+                            1,
+                            GNUNET_NO);
+  h->retry_time = GNUNET_TIME_UNIT_ZERO;
+  process_queue (h);
+  if (NULL != rc.cont)
+    rc.cont (rc.cont_cls,
+             status,
+            GNUNET_TIME_absolute_ntoh (sm->min_expiration),
+            emsg);
 }
 
 
 /**
- * Transmit request from queue to datastore service.
+ * Check data message we received from the service.
  *
- * @param cls the `struct GNUNET_DATASTORE_Handle`
- * @param size number of bytes that can be copied to @a buf
- * @param buf where to copy the drop message
- * @return number of bytes written to @a buf
+ * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
+ * @param dm message received
  */
-static size_t
-transmit_request (void *cls,
-                  size_t size,
-                  void *buf)
+static int
+check_data (void *cls,
+            const struct DataMessage *dm)
 {
-  struct GNUNET_DATASTORE_Handle *h = cls;
-  struct GNUNET_DATASTORE_QueueEntry *qe;
-  size_t msize;
+  uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
 
-  h->th = NULL;
-  if (NULL == (qe = h->queue_head))
-    return 0;                   /* no entry in queue */
-  if (NULL == buf)
+  if (msize != ntohl (dm->size))
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Failed to transmit request to DATASTORE.\n");
-    GNUNET_STATISTICS_update (h->stats,
-                              gettext_noop ("# transmission request failures"),
-                              1, GNUNET_NO);
-    do_disconnect (h);
-    return 0;
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
-  if (size < (msize = qe->message_size))
-  {
-    process_queue (h);
-    return 0;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Transmitting %u byte request to DATASTORE\n",
-       msize);
-  memcpy (buf, &qe[1], msize);
-  qe->was_transmitted = GNUNET_YES;
-  GNUNET_SCHEDULER_cancel (qe->task);
-  qe->task = NULL;
-  GNUNET_assert (GNUNET_NO == h->in_receive);
-  h->in_receive = GNUNET_YES;
-  GNUNET_CLIENT_receive (h->client,
-                         &receive_cb, h,
-                         GNUNET_TIME_absolute_get_remaining (qe->timeout));
-#if INSANE_STATISTICS
-  GNUNET_STATISTICS_update (h->stats,
-                            gettext_noop ("# bytes sent to datastore"), msize,
-                            GNUNET_NO);
-#endif
-  return msize;
+  return GNUNET_OK;
 }
 
 
 /**
- * Process entries in the queue (or do nothing if we are already
- * doing so).
+ * Handle data message we got from the service.
  *
- * @param h handle to the datastore
+ * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
+ * @param dm message received
  */
 static void
-process_queue (struct GNUNET_DATASTORE_Handle *h)
+handle_data (void *cls,
+             const struct DataMessage *dm)
 {
+  struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct ResultContext rc;
 
-  if (NULL == (qe = h->queue_head))
+  if (h->skip_next_messages > 0)
   {
-    /* no entry in queue */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Queue empty\n");
+    process_queue (h);
     return;
   }
-  if (GNUNET_YES == qe->was_transmitted)
+  qe = h->queue_head;
+  if (NULL == qe)
   {
-    /* waiting for replies */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Head request already transmitted\n");
+    GNUNET_break (0);
+    do_disconnect (h);
     return;
   }
-  if (NULL != h->th)
+  if (NULL != qe->env)
   {
-    /* request pending */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Pending transmission request\n");
+    GNUNET_break (0);
+    do_disconnect (h);
     return;
   }
-  if (NULL == h->client)
+  if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
   {
-    /* waiting for reconnect */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Not connected\n");
+    GNUNET_break (0);
+    do_disconnect (h);
     return;
   }
-  if (GNUNET_YES == h->in_receive)
-  {
-    /* wait for response to previous query */
-    return;
-  }
+#if INSANE_STATISTICS
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# Results received"),
+                            1,
+                            GNUNET_NO);
+#endif
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Queueing %u byte request to DATASTORE\n",
-       qe->message_size);
-  h->th
-    = GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                           qe->message_size,
-                                           GNUNET_TIME_absolute_get_remaining 
(qe->timeout),
-                                           GNUNET_YES,
-                                           &transmit_request,
-                                           h);
-  GNUNET_assert (GNUNET_NO == h->in_receive);
-  GNUNET_break (NULL != h->th);
+       "Received result %llu with type %u and size %u with key %s\n",
+       (unsigned long long) GNUNET_ntohll (dm->uid),
+       ntohl (dm->type),
+       ntohl (dm->size),
+       GNUNET_h2s (&dm->key));
+  rc = qe->qc.rc;
+  free_queue_entry (qe);
+  h->retry_time = GNUNET_TIME_UNIT_ZERO;
+  process_queue (h);
+  if (NULL != rc.proc)
+    rc.proc (rc.proc_cls,
+             &dm->key,
+             ntohl (dm->size),
+             &dm[1],
+             ntohl (dm->type),
+             ntohl (dm->priority),
+             ntohl (dm->anonymity),
+             GNUNET_TIME_absolute_ntoh (dm->expiration),
+             GNUNET_ntohll (dm->uid));
 }
 
 
 /**
- * Dummy continuation used to do nothing (but be non-zero).
+ * Type of a function to call when we receive a
+ * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
  *
- * @param cls closure
- * @param result result
- * @param min_expiration expiration time
- * @param emsg error message
+ * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
+ * @param msg message received
  */
 static void
-drop_status_cont (void *cls, int32_t result,
-                 struct GNUNET_TIME_Absolute min_expiration,
-                 const char *emsg)
+handle_data_end (void *cls,
+                 const struct GNUNET_MessageHeader *msg)
 {
-  /* do nothing */
-}
-
-
-/**
- * Free a queue entry.  Removes the given entry from the
- * queue and releases associated resources.  Does NOT
- * call the callback.
- *
- * @param qe entry to free.
- */
-static void
-free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
-{
-  struct GNUNET_DATASTORE_Handle *h = qe->h;
-
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                               h->queue_tail,
-                               qe);
-  if (qe->task != NULL)
-  {
-    GNUNET_SCHEDULER_cancel (qe->task);
-    qe->task = NULL;
-  }
-  h->queue_size--;
-  qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
-  GNUNET_free (qe);
-}
-
-
-/**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-process_status_message (void *cls,
-                        const struct GNUNET_MessageHeader *msg)
-{
   struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_DATASTORE_QueueEntry *qe;
-  struct StatusContext rc;
-  const struct StatusMessage *sm;
-  const char *emsg;
-  int32_t status;
-  int was_transmitted;
+  struct ResultContext rc;
 
-  if (NULL == (qe = h->queue_head))
+  if (h->skip_next_messages > 0)
   {
-    GNUNET_break (0);
-    do_disconnect (h);
+    h->skip_next_messages--;
+    process_queue (h);
     return;
   }
-  rc = qe->qc.sc;
-  if (NULL == msg)
+  qe = h->queue_head;
+  if (NULL == qe)
   {
-    was_transmitted = qe->was_transmitted;
-    free_queue_entry (qe);
-    if (was_transmitted == GNUNET_YES)
-      do_disconnect (h);
-    else
-      process_queue (h);
-    if (NULL != rc.cont)
-      rc.cont (rc.cont_cls, GNUNET_SYSERR,
-              GNUNET_TIME_UNIT_ZERO_ABS,
-               _("Failed to receive status response from database."));
+    GNUNET_break (0);
+    do_disconnect (h);
     return;
   }
-  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
-  free_queue_entry (qe);
-  if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
-      (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
+  if (NULL != qe->env)
   {
     GNUNET_break (0);
-    h->retry_time = GNUNET_TIME_UNIT_ZERO;
     do_disconnect (h);
-    if (rc.cont != NULL)
-      rc.cont (rc.cont_cls, GNUNET_SYSERR,
-              GNUNET_TIME_UNIT_ZERO_ABS,
-               _("Error reading response from datastore service"));
     return;
   }
-  sm = (const struct StatusMessage *) msg;
-  status = ntohl (sm->status);
-  emsg = NULL;
-  if (ntohs (msg->size) > sizeof (struct StatusMessage))
+  if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
   {
-    emsg = (const char *) &sm[1];
-    if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
-    {
-      GNUNET_break (0);
-      emsg = _("Invalid error message received from datastore service");
-    }
-  }
-  if ((status == GNUNET_SYSERR) && (emsg == NULL))
-  {
     GNUNET_break (0);
-    emsg = _("Invalid error message received from datastore service");
+    do_disconnect (h);
+    return;
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
+  rc = qe->qc.rc;
+  free_queue_entry (qe);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received end of result set, new queue size is %u\n",
+       h->queue_size);
+  h->retry_time = GNUNET_TIME_UNIT_ZERO;
+  h->result_count = 0;
+  process_queue (h);
+  /* signal end of iteration */
+  if (NULL != rc.proc)
+    rc.proc (rc.proc_cls,
+             NULL,
+             0,
+             NULL,
+             0,
+             0,
+             0,
+             GNUNET_TIME_UNIT_ZERO_ABS,
+             0);
+}
+
+
+/**
+ * Try reconnecting to the datastore service.
+ *
+ * @param cls the `struct GNUNET_DATASTORE_Handle`
+ */
+static void
+try_reconnect (void *cls)
+{
+  GNUNET_MQ_hd_var_size (status,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
+                         struct StatusMessage);
+  GNUNET_MQ_hd_var_size (data,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
+                         struct DataMessage);
+  GNUNET_MQ_hd_fixed_size (data_end,
+                           GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
+                           struct GNUNET_MessageHeader);
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_status_handler (h),
+    make_data_handler (h),
+    make_data_end_handler (h),
+    GNUNET_MQ_handler_end ()
+  };
+
+  h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
+  h->reconnect_task = NULL;
+  GNUNET_assert (NULL == h->mq);
+  h->mq = GNUNET_CLIENT_connecT (h->cfg,
+                                 "datastore",
+                                 handlers,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
+    return;
   GNUNET_STATISTICS_update (h->stats,
-                            gettext_noop ("# status messages received"), 1,
+                            gettext_noop ("# datastore connections 
(re)created"),
+                            1,
                             GNUNET_NO);
-  h->retry_time = GNUNET_TIME_UNIT_ZERO;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Reconnected to DATASTORE\n");
   process_queue (h);
-  if (rc.cont != NULL)
-    rc.cont (rc.cont_cls, status,
-            GNUNET_TIME_absolute_ntoh (sm->min_expiration),
-            emsg);
 }
 
 
 /**
+ * Dummy continuation used to do nothing (but be non-zero).
+ *
+ * @param cls closure
+ * @param result result
+ * @param min_expiration expiration time
+ * @param emsg error message
+ */
+static void
+drop_status_cont (void *cls,
+                  int32_t result,
+                 struct GNUNET_TIME_Absolute min_expiration,
+                 const char *emsg)
+{
+  /* do nothing */
+}
+
+
+/**
  * Store an item in the datastore.  If the item is already present,
  * the priorities are summed up and the higher expiration time and
  * lower anonymity level is used.
@@ -874,7 +920,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout timeout for the operation
  * @param cont continuation to call when done
  * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be 
used to
@@ -894,15 +939,20 @@
                       struct GNUNET_TIME_Absolute expiration,
                       unsigned int queue_priority,
                       unsigned int max_queue_size,
-                      struct GNUNET_TIME_Relative timeout,
                       GNUNET_DATASTORE_ContinuationWithStatus cont,
                       void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
   struct DataMessage *dm;
-  size_t msize;
   union QueueContext qc;
 
+  if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return NULL;
+  }
+
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Asked to put %u bytes of data under key `%s' for %s\n",
        size,
@@ -909,18 +959,31 @@
        GNUNET_h2s (key),
        GNUNET_STRINGS_relative_time_to_string 
(GNUNET_TIME_absolute_get_remaining (expiration),
                                               GNUNET_YES));
-  msize = sizeof (struct DataMessage) + size;
-  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  env = GNUNET_MQ_msg_extra (dm,
+                             size,
+                             GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
+  dm->rid = htonl (rid);
+  dm->size = htonl ((uint32_t) size);
+  dm->type = htonl (type);
+  dm->priority = htonl (priority);
+  dm->anonymity = htonl (anonymity);
+  dm->replication = htonl (replication);
+  dm->reserved = htonl (0);
+  dm->uid = GNUNET_htonll (0);
+  dm->expiration = GNUNET_TIME_absolute_hton (expiration);
+  dm->key = *key;
+  memcpy (&dm[1],
+          data,
+          size);
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qe = make_queue_entry (h,
-                         msize,
+                         env,
                          queue_priority,
                          max_queue_size,
-                         timeout,
-                         &process_status_message,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
                          &qc);
-  if (qe == NULL)
+  if (NULL == qe)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Could not create queue entry for PUT\n");
@@ -930,20 +993,6 @@
                             gettext_noop ("# PUT requests executed"),
                             1,
                             GNUNET_NO);
-  dm = (struct DataMessage *) &qe[1];
-  dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
-  dm->header.size = htons (msize);
-  dm->rid = htonl (rid);
-  dm->size = htonl ((uint32_t) size);
-  dm->type = htonl (type);
-  dm->priority = htonl (priority);
-  dm->anonymity = htonl (anonymity);
-  dm->replication = htonl (replication);
-  dm->reserved = htonl (0);
-  dm->uid = GNUNET_htonll (0);
-  dm->expiration = GNUNET_TIME_absolute_hton (expiration);
-  dm->key = *key;
-  memcpy (&dm[1], data, size);
   process_queue (h);
   return qe;
 }
@@ -972,6 +1021,7 @@
                           void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
   struct ReserveMessage *rm;
   union QueueContext qc;
 
@@ -981,14 +1031,18 @@
        "Asked to reserve %llu bytes of data and %u entries\n",
        (unsigned long long) amount,
        (unsigned int) entries);
+  env = GNUNET_MQ_msg (rm,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
+  rm->entries = htonl (entries);
+  rm->amount = GNUNET_htonll (amount);
+
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qe = make_queue_entry (h,
-                         sizeof (struct ReserveMessage),
+                         env,
                          UINT_MAX,
                          UINT_MAX,
-                         GNUNET_TIME_UNIT_FOREVER_REL,
-                         &process_status_message,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
                          &qc);
   if (NULL == qe)
   {
@@ -1000,11 +1054,6 @@
                             gettext_noop ("# RESERVE requests executed"),
                             1,
                             GNUNET_NO);
-  rm = (struct ReserveMessage *) &qe[1];
-  rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
-  rm->header.size = htons (sizeof (struct ReserveMessage));
-  rm->entries = htonl (entries);
-  rm->amount = GNUNET_htonll (amount);
   process_queue (h);
   return qe;
 }
@@ -1024,7 +1073,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be 
used to
@@ -1036,29 +1084,31 @@
                                   uint32_t rid,
                                   unsigned int queue_priority,
                                   unsigned int max_queue_size,
-                                  struct GNUNET_TIME_Relative timeout,
                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
                                   void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
   struct ReleaseReserveMessage *rrm;
   union QueueContext qc;
 
-  if (cont == NULL)
+  if (NULL == cont)
     cont = &drop_status_cont;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Asked to release reserve %d\n",
        rid);
+  env = GNUNET_MQ_msg (rrm,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
+  rrm->rid = htonl (rid);
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qe = make_queue_entry (h,
-                         sizeof (struct ReleaseReserveMessage),
+                         env,
                          queue_priority,
                          max_queue_size,
-                         timeout,
-                         &process_status_message,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
                          &qc);
-  if (qe == NULL)
+  if (NULL == qe)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Could not create queue entry to release reserve\n");
@@ -1068,10 +1118,6 @@
                             gettext_noop
                             ("# RELEASE RESERVE requests executed"), 1,
                             GNUNET_NO);
-  rrm = (struct ReleaseReserveMessage *) &qe[1];
-  rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
-  rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
-  rrm->rid = htonl (rid);
   process_queue (h);
   return qe;
 }
@@ -1087,7 +1133,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be 
used to
@@ -1101,15 +1146,15 @@
                          struct GNUNET_TIME_Absolute expiration,
                          unsigned int queue_priority,
                          unsigned int max_queue_size,
-                         struct GNUNET_TIME_Relative timeout,
                          GNUNET_DATASTORE_ContinuationWithStatus cont,
                          void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
   struct UpdateMessage *um;
   union QueueContext qc;
 
-  if (cont == NULL)
+  if (NULL == cont)
     cont = &drop_status_cont;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Asked to update entry %llu raising priority by %u and expiration to 
%s\n",
@@ -1116,11 +1161,21 @@
        uid,
        (unsigned int) priority,
        GNUNET_STRINGS_absolute_time_to_string (expiration));
+  env = GNUNET_MQ_msg (um,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
+  um->priority = htonl (priority);
+  um->expiration = GNUNET_TIME_absolute_hton (expiration);
+  um->uid = GNUNET_htonll (uid);
+
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
-                         max_queue_size, timeout, &process_status_message, 
&qc);
-  if (qe == NULL)
+  qe = make_queue_entry (h,
+                         env,
+                         queue_priority,
+                         max_queue_size,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
+                         &qc);
+  if (NULL == qe)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Could not create queue entry for UPDATE\n");
@@ -1129,12 +1184,6 @@
   GNUNET_STATISTICS_update (h->stats,
                             gettext_noop ("# UPDATE requests executed"), 1,
                             GNUNET_NO);
-  um = (struct UpdateMessage *) &qe[1];
-  um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
-  um->header.size = htons (sizeof (struct UpdateMessage));
-  um->priority = htonl (priority);
-  um->expiration = GNUNET_TIME_absolute_hton (expiration);
-  um->uid = GNUNET_htonll (uid);
   process_queue (h);
   return qe;
 }
@@ -1154,7 +1203,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be 
used to
@@ -1168,33 +1216,50 @@
                          const void *data,
                          unsigned int queue_priority,
                          unsigned int max_queue_size,
-                         struct GNUNET_TIME_Relative timeout,
                          GNUNET_DATASTORE_ContinuationWithStatus cont,
                          void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct DataMessage *dm;
-  size_t msize;
+  struct GNUNET_MQ_Envelope *env;
   union QueueContext qc;
 
-  if (cont == NULL)
+  if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return NULL;
+  }
+  if (NULL == cont)
     cont = &drop_status_cont;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Asked to remove %u bytes under key `%s'\n",
        size,
        GNUNET_h2s (key));
+  env = GNUNET_MQ_msg_extra (dm,
+                             size,
+                             GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
+  dm->rid = htonl (0);
+  dm->size = htonl (size);
+  dm->type = htonl (0);
+  dm->priority = htonl (0);
+  dm->anonymity = htonl (0);
+  dm->uid = GNUNET_htonll (0);
+  dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
+  dm->key = *key;
+  memcpy (&dm[1],
+          data,
+          size);
+
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  msize = sizeof (struct DataMessage) + size;
-  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
+
   qe = make_queue_entry (h,
-                         msize,
+                         env,
                          queue_priority,
                          max_queue_size,
-                         timeout,
-                         &process_status_message,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
                          &qc);
-  if (qe == NULL)
+  if (NULL == qe)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Could not create queue entry for REMOVE\n");
@@ -1201,129 +1266,15 @@
     return NULL;
   }
   GNUNET_STATISTICS_update (h->stats,
-                            gettext_noop ("# REMOVE requests executed"), 1,
+                            gettext_noop ("# REMOVE requests executed"),
+                            1,
                             GNUNET_NO);
-  dm = (struct DataMessage *) &qe[1];
-  dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
-  dm->header.size = htons (msize);
-  dm->rid = htonl (0);
-  dm->size = htonl (size);
-  dm->type = htonl (0);
-  dm->priority = htonl (0);
-  dm->anonymity = htonl (0);
-  dm->uid = GNUNET_htonll (0);
-  dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
-  dm->key = *key;
-  memcpy (&dm[1], data, size);
   process_queue (h);
   return qe;
 }
 
 
-/**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
-{
-  struct GNUNET_DATASTORE_Handle *h = cls;
-  struct GNUNET_DATASTORE_QueueEntry *qe;
-  struct ResultContext rc;
-  const struct DataMessage *dm;
-  int was_transmitted;
 
-  if (NULL == msg)
-  {
-    qe = h->queue_head;
-    GNUNET_assert (NULL != qe);
-    rc = qe->qc.rc;
-    was_transmitted = qe->was_transmitted;
-    free_queue_entry (qe);
-    if (GNUNET_YES == was_transmitted)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Failed to receive response from database.\n");
-      do_disconnect (h);
-    }
-    else
-    {
-      process_queue (h);
-    }
-    if (NULL != rc.proc)
-      rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
-               0);
-    return;
-  }
-  if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
-  {
-    GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
-    qe = h->queue_head;
-    rc = qe->qc.rc;
-    GNUNET_assert (GNUNET_YES == qe->was_transmitted);
-    free_queue_entry (qe);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received end of result set, new queue size is %u\n", h->queue_size);
-    h->retry_time = GNUNET_TIME_UNIT_ZERO;
-    h->result_count = 0;
-    process_queue (h);
-    if (NULL != rc.proc)
-      rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
-               0);
-    return;
-  }
-  qe = h->queue_head;
-  GNUNET_assert (NULL != qe);
-  rc = qe->qc.rc;
-  if (GNUNET_YES != qe->was_transmitted)
-  {
-    GNUNET_break (0);
-    free_queue_entry (qe);
-    h->retry_time = GNUNET_TIME_UNIT_ZERO;
-    do_disconnect (h);
-    if (rc.proc != NULL)
-      rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
-               0);
-    return;
-  }
-  if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
-      (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
-      (ntohs (msg->size) !=
-       sizeof (struct DataMessage) +
-       ntohl (((const struct DataMessage *) msg)->size)))
-  {
-    GNUNET_break (0);
-    free_queue_entry (qe);
-    h->retry_time = GNUNET_TIME_UNIT_ZERO;
-    do_disconnect (h);
-    if (rc.proc != NULL)
-      rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
-               0);
-    return;
-  }
-#if INSANE_STATISTICS
-  GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
-                            GNUNET_NO);
-#endif
-  dm = (const struct DataMessage *) msg;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received result %llu with type %u and size %u with key %s\n",
-       (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
-       ntohl (dm->size), GNUNET_h2s (&dm->key));
-  free_queue_entry (qe);
-  h->retry_time = GNUNET_TIME_UNIT_ZERO;
-  process_queue (h);
-  if (rc.proc != NULL)
-    rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
-             ntohl (dm->priority), ntohl (dm->anonymity),
-             GNUNET_TIME_absolute_ntoh (dm->expiration),
-             GNUNET_ntohll (dm->uid));
-}
-
-
 /**
  * Get a random value from the datastore for content replication.
  * Returns a single, random value among those with the highest
@@ -1335,7 +1286,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
  *        and always once with a value of NULL.
@@ -1347,23 +1297,27 @@
 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
                                       unsigned int queue_priority,
                                       unsigned int max_queue_size,
-                                      struct GNUNET_TIME_Relative timeout,
                                       GNUNET_DATASTORE_DatumProcessor proc,
                                       void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_MessageHeader *m;
   union QueueContext qc;
 
   GNUNET_assert (NULL != proc);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Asked to get replication entry in %s\n",
-       GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
+       "Asked to get replication entry\n");
+  env = GNUNET_MQ_msg (m,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
   qc.rc.proc = proc;
   qc.rc.proc_cls = proc_cls;
-  qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
-                         queue_priority, max_queue_size, timeout,
-                         &process_result_message, &qc);
+  qe = make_queue_entry (h,
+                         env,
+                         queue_priority,
+                         max_queue_size,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
+                         &qc);
   if (NULL == qe)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1374,9 +1328,6 @@
                             gettext_noop
                             ("# GET REPLICATION requests executed"), 1,
                             GNUNET_NO);
-  m = (struct GNUNET_MessageHeader *) &qe[1];
-  m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
-  m->size = htons (sizeof (struct GNUNET_MessageHeader));
   process_queue (h);
   return qe;
 }
@@ -1393,7 +1344,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param type allowed type for the operation (never zero)
  * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
@@ -1407,12 +1357,12 @@
                                      uint64_t offset,
                                      unsigned int queue_priority,
                                      unsigned int max_queue_size,
-                                     struct GNUNET_TIME_Relative timeout,
                                      enum GNUNET_BLOCK_Type type,
                                      GNUNET_DATASTORE_DatumProcessor proc,
                                      void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
   struct GetZeroAnonymityMessage *m;
   union QueueContext qc;
 
@@ -1419,19 +1369,20 @@
   GNUNET_assert (NULL != proc);
   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Asked to get %llu-th zero-anonymity entry of type %d in %s\n",
+       "Asked to get %llu-th zero-anonymity entry of type %d\n",
        (unsigned long long) offset,
-       type,
-       GNUNET_STRINGS_relative_time_to_string (timeout,
-                                               GNUNET_YES));
+       type);
+  env = GNUNET_MQ_msg (m,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
+  m->type = htonl ((uint32_t) type);
+  m->offset = GNUNET_htonll (offset);
   qc.rc.proc = proc;
   qc.rc.proc_cls = proc_cls;
   qe = make_queue_entry (h,
-                         sizeof (struct GetZeroAnonymityMessage),
+                         env,
                          queue_priority,
                          max_queue_size,
-                         timeout,
-                         &process_result_message,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
                          &qc);
   if (NULL == qe)
   {
@@ -1443,11 +1394,6 @@
                             gettext_noop
                             ("# GET ZERO ANONYMITY requests executed"), 1,
                             GNUNET_NO);
-  m = (struct GetZeroAnonymityMessage *) &qe[1];
-  m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
-  m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
-  m->type = htonl ((uint32_t) type);
-  m->offset = GNUNET_htonll (offset);
   process_queue (h);
   return qe;
 }
@@ -1467,7 +1413,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param proc function to call on each matching value;
  *        will be called once with a NULL value at the end
  * @param proc_cls closure for @a proc
@@ -1481,11 +1426,12 @@
                           enum GNUNET_BLOCK_Type type,
                           unsigned int queue_priority,
                           unsigned int max_queue_size,
-                          struct GNUNET_TIME_Relative timeout,
                           GNUNET_DATASTORE_DatumProcessor proc,
                           void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
+  struct GetKeyMessage *gkm;
   struct GetMessage *gm;
   union QueueContext qc;
 
@@ -1492,17 +1438,32 @@
   GNUNET_assert (NULL != proc);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Asked to look for data of type %u under key `%s'\n",
-       (unsigned int) type, GNUNET_h2s (key));
+       (unsigned int) type,
+       GNUNET_h2s (key));
+  if (NULL == key)
+  {
+    env = GNUNET_MQ_msg (gm,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_GET);
+    gm->type = htonl (type);
+    gm->offset = GNUNET_htonll (offset);
+  }
+  else
+  {
+    env = GNUNET_MQ_msg (gkm,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
+    gkm->type = htonl (type);
+    gkm->offset = GNUNET_htonll (offset);
+    gkm->key = *key;
+  }
   qc.rc.proc = proc;
   qc.rc.proc_cls = proc_cls;
   qe = make_queue_entry (h,
-                         sizeof (struct GetMessage),
+                         env,
                          queue_priority,
                          max_queue_size,
-                         timeout,
-                         &process_result_message,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
                          &qc);
-  if (qe == NULL)
+  if (NULL == qe)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Could not queue request for `%s'\n",
@@ -1515,20 +1476,6 @@
                             1,
                             GNUNET_NO);
 #endif
-  gm = (struct GetMessage *) &qe[1];
-  gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
-  gm->type = htonl (type);
-  gm->offset = GNUNET_htonll (offset);
-  if (key != NULL)
-  {
-    gm->header.size = htons (sizeof (struct GetMessage));
-    gm->key = *key;
-  }
-  else
-  {
-    gm->header.size =
-        htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
-  }
   process_queue (h);
   return qe;
 }
@@ -1543,16 +1490,14 @@
 void
 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
-  struct GNUNET_DATASTORE_Handle *h;
+  struct GNUNET_DATASTORE_Handle *h = qe->h;
 
-  GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
-  h = qe->h;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Pending DATASTORE request %p cancelled (%d, %d)\n",
        qe,
-       qe->was_transmitted,
+       NULL == qe->env,
        h->queue_head == qe);
-  if (GNUNET_YES == qe->was_transmitted)
+  if (NULL == qe->env)
   {
     free_queue_entry (qe);
     h->skip_next_messages++;

Modified: gnunet/src/datastore/gnunet-datastore.c
===================================================================
--- gnunet/src/datastore/gnunet-datastore.c     2016-06-24 20:12:53 UTC (rev 
37358)
+++ gnunet/src/datastore/gnunet-datastore.c     2016-06-24 20:17:39 UTC (rev 
37359)
@@ -137,7 +137,8 @@
 static void
 do_put (void *cls,
        const struct GNUNET_HashCode *key,
-       size_t size, const void *data,
+       size_t size,
+        const void *data,
        enum GNUNET_BLOCK_Type type,
        uint32_t priority,
        uint32_t anonymity,
@@ -158,7 +159,7 @@
                             priority, anonymity,
                             0 /* FIXME: replication is lost... */,
                             expiration,
-                            0, 1, GNUNET_TIME_UNIT_FOREVER_REL,
+                            0, 1,
                             &do_finish, NULL);
 }
 
@@ -173,7 +174,6 @@
                                 offset,
                                 NULL, GNUNET_BLOCK_TYPE_ANY,
                                 0, 1,
-                                GNUNET_TIME_UNIT_FOREVER_REL,
                                 &do_put, NULL);
 }
 

Modified: gnunet/src/datastore/gnunet-service-datastore.c
===================================================================
--- gnunet/src/datastore/gnunet-service-datastore.c     2016-06-24 20:12:53 UTC 
(rev 37358)
+++ gnunet/src/datastore/gnunet-service-datastore.c     2016-06-24 20:17:39 UTC 
(rev 37359)
@@ -416,16 +416,20 @@
  * @param expiration expiration time for the content
  * @param uid unique identifier for the datum;
  *        maybe 0 if no unique identifier is available
- *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
  *         (continue on call to "next", of course),
- *         GNUNET_NO to delete the item and continue (if supported)
+ *         #GNUNET_NO to delete the item and continue (if supported)
  */
 static int
-quota_processor (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
-                 const void *data, enum GNUNET_BLOCK_Type type,
-                 uint32_t priority, uint32_t anonymity,
-                 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
+quota_processor (void *cls,
+                 const struct GNUNET_HashCode *key,
+                 uint32_t size,
+                 const void *data,
+                 enum GNUNET_BLOCK_Type type,
+                 uint32_t priority,
+                 uint32_t anonymity,
+                 struct GNUNET_TIME_Absolute expiration,
+                 uint64_t uid)
 {
   unsigned long long *need = cls;
 
@@ -473,12 +477,15 @@
   unsigned long long last;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Asked to free up %llu bytes of cache space\n", need);
+              "Asked to free up %llu bytes of cache space\n",
+              need);
   last = 0;
   while ((need > 0) && (last != need))
   {
     last = need;
-    plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
+    plugin->api->get_expiration (plugin->api->cls,
+                                 &quota_processor,
+                                 &need);
   }
 }
 
@@ -1068,7 +1075,7 @@
 
 
 /**
- * Handle GET-message.
+ * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
  *
  * @param cls closure
  * @param client identification of the client
@@ -1080,28 +1087,52 @@
             const struct GNUNET_MessageHeader *message)
 {
   const struct GetMessage *msg;
-  uint16_t size;
 
-  size = ntohs (message->size);
-  if ((size != sizeof (struct GetMessage)) &&
-      (size != sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)))
-  {
-    GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    return;
-  }
   msg = (const struct GetMessage *) message;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Processing GET request of type %u\n",
+              ntohl (msg->type));
+  GNUNET_STATISTICS_update (stats,
+                            gettext_noop ("# GET requests received"),
+                            1,
+                            GNUNET_NO);
+  GNUNET_SERVER_client_keep (client);
+  plugin->api->get_key (plugin->api->cls,
+                        GNUNET_ntohll (msg->offset),
+                        NULL,
+                        NULL,
+                        ntohl (msg->type),
+                        &transmit_item,
+                        client);
+}
+
+/**
+ * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ */
+static void
+handle_get_key (void *cls,
+                struct GNUNET_SERVER_Client *client,
+                const struct GNUNET_MessageHeader *message)
+{
+  const struct GetKeyMessage *msg;
+
+  msg = (const struct GetKeyMessage *) message;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Processing GET request for `%s' of type %u\n",
               GNUNET_h2s (&msg->key),
               ntohl (msg->type));
   GNUNET_STATISTICS_update (stats,
-                            gettext_noop ("# GET requests received"),
+                            gettext_noop ("# GET KEY requests received"),
                             1,
                             GNUNET_NO);
   GNUNET_SERVER_client_keep (client);
-  if ( (size == sizeof (struct GetMessage)) &&
-       (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)) )
+  if (GNUNET_YES !=
+      GNUNET_CONTAINER_bloomfilter_test (filter,
+                                         &msg->key))
   {
     /* don't bother database... */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1112,14 +1143,19 @@
                               ("# requests filtered by bloomfilter"),
                               1,
                               GNUNET_NO);
-    transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
+    transmit_item (client,
+                   NULL, 0, NULL, 0, 0, 0,
+                   GNUNET_TIME_UNIT_ZERO_ABS,
                    0);
     return;
   }
-  plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset),
-                        ((size ==
-                          sizeof (struct GetMessage)) ? &msg->key : NULL), 
NULL,
-                        ntohl (msg->type), &transmit_item, client);
+  plugin->api->get_key (plugin->api->cls,
+                        GNUNET_ntohll (msg->offset),
+                        &msg->key,
+                        NULL,
+                        ntohl (msg->type),
+                        &transmit_item,
+                        client);
 }
 
 
@@ -1369,7 +1405,8 @@
                 _("Datastore payload must have been inaccurate (%lld < %lld). 
Recomputing it.\n"),
                 (long long) payload,
                 (long long) -delta);
-    plugin->api->estimate_size (plugin->api->cls, &payload);
+    plugin->api->estimate_size (plugin->api->cls,
+                                &payload);
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 _("New payload: %lld\n"),
                 (long long) payload);
@@ -1474,7 +1511,10 @@
   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
    sizeof (struct UpdateMessage)},
-  {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
+  {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET,
+   sizeof (struct GetMessage) },
+  {&handle_get_key, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
+   sizeof (struct GetKeyMessage) },
   {&handle_get_replication, NULL,
    GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
    sizeof (struct GNUNET_MessageHeader)},
@@ -1555,6 +1595,10 @@
                 "Failed to obtain value from statistics service, recomputing 
it\n");
     plugin->api->estimate_size (plugin->api->cls,
                                 &payload);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                _("New payload: %lld\n"),
+                (long long) payload);
+
   }
   if (GNUNET_YES == refresh_bf)
   {
@@ -1624,7 +1668,13 @@
     expired_kill_task = NULL;
   }
   if (GNUNET_YES == do_drop)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Dropping database!\n");
     plugin->api->drop (plugin->api->cls);
+    payload = 0;
+    last_sync++;
+  }
   if (NULL != plugin)
   {
     unload_plugin (plugin);
@@ -1651,7 +1701,8 @@
     sync_stats ();
   if (NULL != stats)
   {
-    GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
+    GNUNET_STATISTICS_destroy (stats,
+                               GNUNET_YES);
     stats = NULL;
   }
   GNUNET_free (quota_stat_name);

Modified: gnunet/src/datastore/perf_datastore_api.c
===================================================================
--- gnunet/src/datastore/perf_datastore_api.c   2016-06-24 20:12:53 UTC (rev 
37358)
+++ gnunet/src/datastore/perf_datastore_api.c   2016-06-24 20:17:39 UTC (rev 
37359)
@@ -332,7 +332,6 @@
                                           key,
                                           size,
                                           data, 1, 1,
-                                          TIMEOUT,
                                           &remove_next, crc));
 }
 
@@ -396,7 +395,6 @@
                                            (GNUNET_CRYPTO_QUALITY_WEAK, 
1000))),
                                          1,
                                          1,
-                                         TIMEOUT,
                                          &check_success, crc));
     break;
   case RP_CUT:
@@ -404,7 +402,6 @@
     GNUNET_assert (NULL !=
                    GNUNET_DATASTORE_get_for_replication (datastore,
                                                          1, 1,
-                                                         TIMEOUT,
                                                          &delete_value,
                                                          crc));
     break;
@@ -466,7 +463,6 @@
                                            (GNUNET_CRYPTO_QUALITY_WEAK, 
1000))),
                                          1,
                                          1,
-                                         TIMEOUT,
                                          &check_success, crc));
     break;
 
@@ -573,7 +569,6 @@
                             0, 0, 0,
                             GNUNET_TIME_relative_to_absolute 
(GNUNET_TIME_UNIT_SECONDS),
                             0, 1,
-                            TIMEOUT,
                             &run_tests, crc))
   {
     FPRINTF (stderr,

Modified: gnunet/src/datastore/plugin_datastore_mysql.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_mysql.c       2016-06-24 20:12:53 UTC 
(rev 37358)
+++ gnunet/src/datastore/plugin_datastore_mysql.c       2016-06-24 20:17:39 UTC 
(rev 37359)
@@ -180,7 +180,7 @@
 #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
   struct GNUNET_MYSQL_StatementHandle *dec_repl;
 
-#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
+#define SELECT_SIZE "SELECT SUM(LENGTH(value)+256) FROM gn090"
   struct GNUNET_MYSQL_StatementHandle *get_size;
 
 #define SELECT_IT_NON_ANONYMOUS "SELECT 
type,prio,anonLevel,expire,hash,value,uid "\
@@ -221,23 +221,22 @@
  *
  * @param plugin plugin context
  * @param uid unique ID of the entry to delete
- * @return GNUNET_OK on success, GNUNET_NO if no such value exists, 
GNUNET_SYSERR on error
+ * @return #GNUNET_OK on success, #GNUNET_NO if no such value exists, 
#GNUNET_SYSERR on error
  */
 static int
-do_delete_entry (struct Plugin *plugin, unsigned long long uid)
+do_delete_entry (struct Plugin *plugin,
+                 unsigned long long uid)
 {
   int ret;
   uint64_t uid64 = (uint64_t) uid;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Deleting value %llu from gn090 table\n",
-              uid);
-
   struct GNUNET_MY_QueryParam params_delete[] = {
     GNUNET_MY_query_param_uint64 (&uid64),
     GNUNET_MY_query_param_end
   };
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Deleting value %llu from gn090 table\n",
+              uid);
   ret = GNUNET_MY_exec_prepared (plugin->mc,
                                  plugin->delete_entry_by_uid,
                                  params_delete);
@@ -247,7 +246,7 @@
   }
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "Deleting value %llu from gn090 table failed\n",
-              uid);
+              (unsigned long long) uid);
   return ret;
 }
 
@@ -256,7 +255,7 @@
  * Get an estimate of how much space the database is
  * currently using.
  *
- * @param cls our "struct Plugin *"
+ * @param cls our `struct Plugin *`
  * @return number of bytes used on disk
  */
 static void
@@ -266,26 +265,33 @@
   struct Plugin *plugin = cls;
   uint64_t total;
   int ret;
-
   struct GNUNET_MY_QueryParam params_get[] = {
     GNUNET_MY_query_param_end
   };
-
   struct GNUNET_MY_ResultSpec results_get[] = {
     GNUNET_MY_result_spec_uint64 (&total),
     GNUNET_MY_result_spec_end
   };
 
-  ret = GNUNET_MY_exec_prepared (plugin->mc, plugin->get_size, params_get);
-  if (GNUNET_OK == ret)
-    {
-      if (GNUNET_OK == GNUNET_MY_extract_result (plugin->get_size, 
results_get))
-      {
-        *estimate = (unsigned long long)total;
-      }
-    }
-  else
-    *estimate = 0;
+  ret = GNUNET_MY_exec_prepared (plugin->mc,
+                                 plugin->get_size,
+                                 params_get);
+  *estimate = 0;
+  total = UINT64_MAX;
+  if ( (GNUNET_OK == ret) &&
+       (GNUNET_OK ==
+        GNUNET_MY_extract_result (plugin->get_size,
+                                  results_get)) )
+  {
+    *estimate = (unsigned long long) total;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Size estimate for MySQL payload is %lld\n",
+                (long long) total);
+    GNUNET_assert (UINT64_MAX != total);
+    GNUNET_break (GNUNET_NO ==
+                  GNUNET_MY_extract_result (plugin->get_size,
+                                            NULL));
+  }
 }
 
 
@@ -294,7 +300,7 @@
  *
  * @param cls closure
  * @param key key for the item
- * @param size number of bytes in data
+ * @param size number of bytes in @a data
  * @param data content stored
  * @param type type of the content
  * @param priority priority of the content
@@ -302,7 +308,7 @@
  * @param replication replication-level for the content
  * @param expiration expiration time for the content
  * @param cont continuation called with success or failure status
- * @param cont_cls continuation closure
+ * @param cont_cls closure for @a cont
  */
 static void
 mysql_plugin_put (void *cls,
@@ -318,12 +324,9 @@
                   void *cont_cls)
 {
   struct Plugin *plugin = cls;
-
   uint64_t lexpiration = expiration.abs_value_us;
   uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                UINT64_MAX);
-  unsigned long lsize = 0;
-
   struct GNUNET_HashCode vhash;
   struct GNUNET_MY_QueryParam params_insert[] = {
     GNUNET_MY_query_param_uint32 (&replication),
@@ -334,7 +337,7 @@
     GNUNET_MY_query_param_uint64 (&lrvalue),
     GNUNET_MY_query_param_auto_from_type (key),
     GNUNET_MY_query_param_auto_from_type (&vhash),
-    GNUNET_MY_query_param_fixed_size (data, lsize),
+    GNUNET_MY_query_param_fixed_size (data, size),
     GNUNET_MY_query_param_end
   };
 
@@ -344,7 +347,6 @@
     cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
     return;
   }
-  lsize = size;
   GNUNET_CRYPTO_hash (data,
                       size,
                       &vhash);
@@ -354,15 +356,28 @@
                                plugin->insert_entry,
                                params_insert))
   {
-    cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run 
failure"));
+    cont (cont_cls,
+          key,
+          size,
+          GNUNET_SYSERR,
+          _("MySQL statement run failure"));
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Inserted value `%s' with size %u into gn090 table\n",
-              GNUNET_h2s (key), (unsigned int) size);
+              GNUNET_h2s (key),
+              (unsigned int) size);
   if (size > 0)
-    plugin->env->duc (plugin->env->cls, size);
-  cont (cont_cls, key, size, GNUNET_OK, NULL);
+    plugin->env->duc (plugin->env->cls,
+                      size);
+  GNUNET_break (GNUNET_NO ==
+                GNUNET_MY_extract_result (plugin->insert_entry,
+                                          NULL));
+  cont (cont_cls,
+        key,
+        size,
+        GNUNET_OK,
+        NULL);
 }
 
 
@@ -390,18 +405,22 @@
  * @param cons_cls continuation closure
  */
 static void
-mysql_plugin_update (void *cls, uint64_t uid, int delta,
+mysql_plugin_update (void *cls,
+                     uint64_t uid,
+                     int delta,
                      struct GNUNET_TIME_Absolute expire,
-                     PluginUpdateCont cont, void *cont_cls)
+                     PluginUpdateCont cont,
+                     void *cont_cls)
 {
   struct Plugin *plugin = cls;
-  uint32_t idelta = (uint32_t)delta;
+  uint32_t idelta = (uint32_t) delta;
   uint64_t lexpire = expire.abs_value_us;
   int ret;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Updating value %llu adding %d to priority and maxing exp at 
%s\n",
-              (unsigned long long)uid, delta,
+              (unsigned long long) uid,
+              delta,
              GNUNET_STRINGS_absolute_time_to_string (expire));
 
   struct GNUNET_MY_QueryParam params_update[] = {
@@ -416,12 +435,21 @@
                                  plugin->update_entry,
                                  params_update);
 
-  if (ret != GNUNET_OK)
+  if (GNUNET_OK != ret)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n",
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Failed to update value %llu\n",
                 (unsigned long long) uid);
   }
-  cont (cont_cls, ret, NULL);
+  else
+  {
+    GNUNET_break (GNUNET_NO ==
+                  GNUNET_MY_extract_result (plugin->update_entry,
+                                            NULL));
+  }
+  cont (cont_cls,
+        ret,
+        NULL);
 }
 
 
@@ -432,7 +460,7 @@
  * @param plugin the plugin handle
  * @param stmt select statement to run
  * @param proc function to call on result
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  * @param params_select arguments to initialize stmt
  */
 static void
@@ -474,7 +502,7 @@
 
   ret = GNUNET_MY_extract_result (stmt,
                                   results_select);
-  if (ret <= 0)
+  if (GNUNET_OK != ret)
   {
     proc (proc_cls,
           NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
@@ -489,6 +517,9 @@
               (unsigned int) anonymity,
              GNUNET_STRINGS_absolute_time_to_string (expiration));
   GNUNET_assert (value_size < MAX_DATUM_SIZE);
+  GNUNET_break (GNUNET_NO ==
+                GNUNET_MY_extract_result (stmt,
+                                          NULL));
   ret = proc (proc_cls,
               &key,
               value_size,
@@ -498,7 +529,8 @@
               anonymity,
               expiration,
               uid);
-  if (ret == GNUNET_NO)
+  GNUNET_MY_cleanup_result (results_select);
+  if (GNUNET_NO == ret)
   {
     do_delete_entry (plugin, uid);
     if (0 != value_size)
@@ -538,16 +570,15 @@
   struct Plugin *plugin = cls;
   int ret;
   uint64_t total;
-
-  total = -1;
   struct GNUNET_MY_ResultSpec results_get[] = {
     GNUNET_MY_result_spec_uint64 (&total),
     GNUNET_MY_result_spec_end
   };
 
-  if (type != 0)
+  total = UINT64_MAX;
+  if (0 != type)
   {
-    if (vhash != NULL)
+    if (NULL != vhash)
     {
       struct GNUNET_MY_QueryParam params_get[] = {
         GNUNET_MY_query_param_auto_from_type (key),
@@ -560,9 +591,15 @@
         GNUNET_MY_exec_prepared (plugin->mc,
                                  plugin->count_entry_by_hash_vhash_and_type,
                                  params_get);
-      ret =
-        GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
-                                  results_get);
+      GNUNET_break (GNUNET_OK == ret);
+      if (GNUNET_OK == ret)
+        ret =
+          GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type,
+                                    results_get);
+      if (GNUNET_OK == ret)
+        GNUNET_break (GNUNET_NO ==
+                      GNUNET_MY_extract_result 
(plugin->count_entry_by_hash_vhash_and_type,
+                                                NULL));
     }
     else
     {
@@ -576,14 +613,20 @@
         GNUNET_MY_exec_prepared (plugin->mc,
                                  plugin->count_entry_by_hash_and_type,
                                  params_get);
-      ret =
-        GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
-                                  results_get);
+      GNUNET_break (GNUNET_OK == ret);
+      if (GNUNET_OK == ret)
+        ret =
+          GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type,
+                                    results_get);
+      if (GNUNET_OK == ret)
+        GNUNET_break (GNUNET_NO ==
+                      GNUNET_MY_extract_result 
(plugin->count_entry_by_hash_and_type,
+                                                NULL));
     }
   }
   else
   {
-    if (vhash != NULL)
+    if (NULL != vhash)
     {
       struct GNUNET_MY_QueryParam params_get[] = {
         GNUNET_MY_query_param_auto_from_type (key),
@@ -595,9 +638,15 @@
         GNUNET_MY_exec_prepared (plugin->mc,
                                  plugin->count_entry_by_hash_and_vhash,
                                  params_get);
-      ret =
-        GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
-                                  results_get);
+      GNUNET_break (GNUNET_OK == ret);
+      if (GNUNET_OK == ret)
+        ret =
+          GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash,
+                                    results_get);
+      if (GNUNET_OK == ret)
+        GNUNET_break (GNUNET_NO ==
+                      GNUNET_MY_extract_result 
(plugin->count_entry_by_hash_and_vhash,
+                                                NULL));
     }
     else
     {
@@ -610,12 +659,19 @@
         GNUNET_MY_exec_prepared (plugin->mc,
                                  plugin->count_entry_by_hash,
                                  params_get);
-      ret =
-        GNUNET_MY_extract_result (plugin->count_entry_by_hash,
-                                  results_get);
+      GNUNET_break (GNUNET_OK == ret);
+      if (GNUNET_OK == ret)
+        ret =
+          GNUNET_MY_extract_result (plugin->count_entry_by_hash,
+                                    results_get);
+      if (GNUNET_OK == ret)
+        GNUNET_break (GNUNET_NO ==
+                      GNUNET_MY_extract_result (plugin->count_entry_by_hash,
+                                                NULL));
     }
   }
-  if ((ret != GNUNET_OK) || (0 >= total))
+  if ( (GNUNET_OK != ret) ||
+       (0 >= total) )
   {
     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
     return;
@@ -640,7 +696,8 @@
 
       execute_select (plugin,
                       plugin->select_entry_by_hash_vhash_and_type,
-                      proc, proc_cls,
+                      proc,
+                      proc_cls,
                       params_select);
     }
     else
@@ -654,7 +711,8 @@
 
       execute_select (plugin,
                       plugin->select_entry_by_hash_and_type,
-                      proc,  proc_cls,
+                      proc,
+                      proc_cls,
                       params_select);
     }
   }
@@ -671,7 +729,8 @@
 
       execute_select (plugin,
                       plugin->select_entry_by_hash_and_vhash,
-                      proc, proc_cls,
+                      proc,
+                      proc_cls,
                       params_select);
     }
     else
@@ -684,11 +743,12 @@
 
       execute_select (plugin,
                       plugin->select_entry_by_hash,
-                      proc, proc_cls,
+                      proc,
+                      proc_cls,
                       params_select);
     }
   }
- 
+
 }
 
 
@@ -695,17 +755,19 @@
 /**
  * Get a zero-anonymity datum from the datastore.
  *
- * @param cls our "struct Plugin*"
+ * @param cls our `struct Plugin *`
  * @param offset offset of the result
  * @param type entries of which type should be considered?
  *        Use 0 for any type.
  * @param proc function to call on a matching value or NULL
- * @param proc_cls closure for iter
+ * @param proc_cls closure for @a proc
  */
 static void
-mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset,
+mysql_plugin_get_zero_anonymity (void *cls,
+                                 uint64_t offset,
                                  enum GNUNET_BLOCK_Type type,
-                                 PluginDatumProcessor proc, void *proc_cls)
+                                 PluginDatumProcessor proc,
+                                 void *proc_cls)
 {
   struct Plugin *plugin = cls;
   uint32_t typei = (uint32_t) type;
@@ -719,8 +781,10 @@
     GNUNET_MY_query_param_end
   };
 
-  execute_select (plugin, plugin->zero_iter,
-                  proc, proc_cls,
+  execute_select (plugin,
+                  plugin->zero_iter,
+                  proc,
+                  proc_cls,
                   params_zero_iter);
 }
 
@@ -749,13 +813,13 @@
 
 
 /**
- * Wrapper for the processor for 'mysql_plugin_get_replication'.
+ * Wrapper for the processor for #mysql_plugin_get_replication().
  * Decrements the replication counter and calls the original
  * iterator.
  *
  * @param cls closure
  * @param key key for the content
- * @param size number of bytes in data
+ * @param size number of bytes in @a data
  * @param data content stored
  * @param type type of the content
  * @param priority priority of the content
@@ -763,19 +827,18 @@
  * @param expiration expiration time for the content
  * @param uid unique identifier for the datum;
  *        maybe 0 if no unique identifier is available
- *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
  *         (continue on call to "next", of course),
- *         GNUNET_NO to delete the item and continue (if supported)
+ *         #GNUNET_NO to delete the item and continue (if supported)
  */
 static int
 repl_proc (void *cls,
-           const struct GNUNET_HashCode * key, 
+           const struct GNUNET_HashCode *key,
            uint32_t size,
-           const void *data, 
-           enum GNUNET_BLOCK_Type type, 
+           const void *data,
+           enum GNUNET_BLOCK_Type type,
            uint32_t priority,
-           uint32_t anonymity, 
+           uint32_t anonymity,
            struct GNUNET_TIME_Absolute expiration,
            uint64_t uid)
 {
@@ -784,21 +847,26 @@
   int ret;
   int iret;
 
-  ret =
-      rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
-                expiration, uid);
+  ret = rc->proc (rc->proc_cls,
+                  key,
+                  size,
+                  data,
+                  type,
+                  priority,
+                  anonymity,
+                  expiration,
+                  uid);
   if (NULL != key)
   {
-      struct GNUNET_MY_QueryParam params_proc[] = {
-        GNUNET_MY_query_param_uint64 (&uid),
-        GNUNET_MY_query_param_end
-      };
+    struct GNUNET_MY_QueryParam params_proc[] = {
+      GNUNET_MY_query_param_uint64 (&uid),
+      GNUNET_MY_query_param_end
+    };
 
-      iret =
-      GNUNET_MY_exec_prepared (plugin->mc,
-                               plugin->dec_repl,
-                               params_proc);
-    if (iret == GNUNET_SYSERR)
+    iret = GNUNET_MY_exec_prepared (plugin->mc,
+                                    plugin->dec_repl,
+                                    params_proc);
+    if (GNUNET_SYSERR == iret)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "Failed to reduce replication counter\n");
@@ -813,35 +881,29 @@
  * Get a random item for replication.  Returns a single, not expired,
  * random item from those with the highest replication counters.  The
  * item's replication counter is decremented by one IF it was positive
- * before.  Call 'proc' with all values ZERO or NULL if the datastore
+ * before.  Call @a proc with all values ZERO or NULL if the datastore
  * is empty.
  *
  * @param cls closure
  * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  */
 static void
-mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc,
+mysql_plugin_get_replication (void *cls,
+                              PluginDatumProcessor proc,
                               void *proc_cls)
 {
   struct Plugin *plugin = cls;
   uint64_t rvalue;
   uint32_t repl;
-
   struct ReplCtx rc;
-  rc.plugin = plugin;
-  rc.proc = proc;
-  rc.proc_cls = proc_cls;
-
   struct GNUNET_MY_QueryParam params_get[] = {
     GNUNET_MY_query_param_end
   };
-
   struct GNUNET_MY_ResultSpec results_get[] = {
     GNUNET_MY_result_spec_uint32 (&repl),
     GNUNET_MY_result_spec_end
   };
-
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_uint32 (&repl),
     GNUNET_MY_query_param_uint64 (&rvalue),
@@ -850,27 +912,36 @@
     GNUNET_MY_query_param_end
   };
 
+  rc.plugin = plugin;
+  rc.proc = proc;
+  rc.proc_cls = proc_cls;
+
   if (1 !=
-      GNUNET_MY_exec_prepared (plugin->mc, plugin->max_repl, params_get))
+      GNUNET_MY_exec_prepared (plugin->mc,
+                               plugin->max_repl,
+                               params_get))
   {
     proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
     return;
   }
 
-  if (1 !=
-      GNUNET_MY_extract_result (plugin->max_repl, results_get))
+  if (GNUNET_OK !=
+      GNUNET_MY_extract_result (plugin->max_repl,
+                                results_get))
   {
-      proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      return;
+    proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+    return;
   }
+  GNUNET_break (GNUNET_NO ==
+                GNUNET_MY_extract_result (plugin->max_repl,
+                                          NULL));
+  rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                     UINT64_MAX);
 
-  rvalue =
-      (unsigned long long) GNUNET_CRYPTO_random_u64 
(GNUNET_CRYPTO_QUALITY_WEAK,
-                                                     UINT64_MAX);
-
-  execute_select (plugin, 
+  execute_select (plugin,
                   plugin->select_replication,
-                  &repl_proc, &rc,
+                  &repl_proc,
+                  &rc,
                   params_select);
 }
 
@@ -880,69 +951,91 @@
  *
  * @param cls closure
  * @param proc function to call on each key
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  */
 static void
 mysql_plugin_get_keys (void *cls,
-                                        PluginKeyProcessor proc,
-                                        void *proc_cls)
+                       PluginKeyProcessor proc,
+                       void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  char *query = "SELECT hash FROM gn090";
   int ret;
   MYSQL_STMT *statement;
-  struct GNUNET_MYSQL_StatementHandle *statements_handle_select = NULL;
-
-
+  unsigned int cnt;
   struct GNUNET_HashCode key;
-
-  statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
-
-  statements_handle_select = GNUNET_MYSQL_statement_prepare (plugin->mc,
-                                                             query);
-  GNUNET_assert (proc != NULL);
-
+  struct GNUNET_HashCode last;
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_end
   };
-
   struct GNUNET_MY_ResultSpec results_select[] = {
     GNUNET_MY_result_spec_auto_from_type (&key),
     GNUNET_MY_result_spec_end
   };
 
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc,
-                              statements_handle_select,
-                              params_select))
+  GNUNET_assert (NULL != proc);
+  statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys);
+  if (GNUNET_OK !=
+      GNUNET_MY_exec_prepared (plugin->mc,
+                               plugin->get_all_keys,
+                               params_select))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 _("`%s' for `%s' failed at %s:%d with error: %s\n"),
-                "mysql_stmt_execute", query, __FILE__, __LINE__,
+                "mysql_stmt_execute",
+                GET_ALL_KEYS,
+                __FILE__,
+                __LINE__,
                 mysql_stmt_error (statement));
     GNUNET_MYSQL_statements_invalidate (plugin->mc);
     proc (proc_cls, NULL, 0);
     return;
   }
-
-  ret = GNUNET_MY_extract_result (statements_handle_select,
-                                  results_select);
-
-  if (ret != MYSQL_NO_DATA)
+  ret = GNUNET_YES;
+  cnt = 0;
+  while (ret == GNUNET_YES)
   {
+    ret = GNUNET_MY_extract_result (plugin->get_all_keys,
+                                    results_select);
+    if (0 != memcmp (&last,
+                     &key,
+                     sizeof (key)))
+    {
+      if (0 != cnt)
+        proc (proc_cls,
+              &last,
+              cnt);
+      cnt = 1;
+      last = key;
+    }
+    else
+    {
+      cnt++;
+    }
+  }
+  if (0 != cnt)
+    proc (proc_cls,
+          &last,
+          cnt);
+  /* finally, let app know we are done */
+  proc (proc_cls,
+        NULL,
+        0);
+  if (GNUNET_SYSERR == ret)
+  {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 _("`%s' failed at %s:%d with error: %s\n"),
-                    "mysql_stmt_fetch", __FILE__, __LINE__,
-                    mysql_stmt_error (statement));
+                "mysql_stmt_fetch",
+                __FILE__,
+                __LINE__,
+                mysql_stmt_error (statement));
     GNUNET_MYSQL_statements_invalidate (plugin->mc);
     return;
   }
-
-  mysql_stmt_reset (statement);
 }
 
 
 /**
- * Context for 'expi_proc' function.
+ * Context for #expi_proc() function.
  */
 struct ExpiCtx
 {
@@ -958,7 +1051,7 @@
   PluginDatumProcessor proc;
 
   /**
-   * Closure for proc.
+   * Closure for @e proc.
    */
   void *proc_cls;
 };
@@ -966,7 +1059,7 @@
 
 
 /**
- * Wrapper for the processor for 'mysql_plugin_get_expiration'.
+ * Wrapper for the processor for #mysql_plugin_get_expiration().
  * If no expired value was found, we do a second query for
  * low-priority content.
  *
@@ -980,25 +1073,23 @@
  * @param expiration expiration time for the content
  * @param uid unique identifier for the datum;
  *        maybe 0 if no unique identifier is available
- *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
  *         (continue on call to "next", of course),
- *         GNUNET_NO to delete the item and continue (if supported)
+ *         #GNUNET_NO to delete the item and continue (if supported)
  */
 static int
 expi_proc (void *cls,
-           const struct GNUNET_HashCode * key,
+           const struct GNUNET_HashCode *key,
            uint32_t size,
            const void *data,
            enum GNUNET_BLOCK_Type type,
            uint32_t priority,
-           uint32_t anonymity, 
+           uint32_t anonymity,
            struct GNUNET_TIME_Absolute expiration,
            uint64_t uid)
 {
   struct ExpiCtx *rc = cls;
   struct Plugin *plugin = rc->plugin;
-
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_end
   };
@@ -1005,44 +1096,55 @@
 
   if (NULL == key)
   {
-    execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls,
+    execute_select (plugin,
+                    plugin->select_priority,
+                    rc->proc,
+                    rc->proc_cls,
                     params_select);
     return GNUNET_SYSERR;
   }
-  return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
-                   expiration, uid);
+  return rc->proc (rc->proc_cls,
+                   key,
+                   size,
+                   data,
+                   type,
+                   priority,
+                   anonymity,
+                   expiration,
+                   uid);
 }
 
 
 /**
  * Get a random item for expiration.
- * Call 'proc' with all values ZERO or NULL if the datastore is empty.
+ * Call @a proc with all values ZERO or NULL if the datastore is empty.
  *
  * @param cls closure
  * @param proc function to call the value (once only).
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  */
 static void
-mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
+mysql_plugin_get_expiration (void *cls,
+                             PluginDatumProcessor proc,
                              void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  uint64_t nt;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_MY_QueryParam params_select[] = {
+    GNUNET_MY_query_param_absolute_time (&now),
+    GNUNET_MY_query_param_end
+  };
   struct ExpiCtx rc;
 
   rc.plugin = plugin;
   rc.proc = proc;
   rc.proc_cls = proc_cls;
-  nt = GNUNET_TIME_absolute_get ().abs_value_us;
-  
-  struct GNUNET_MY_QueryParam params_select[] = {
-    GNUNET_MY_query_param_uint64 (&nt),
-    GNUNET_MY_query_param_end
-  };
-
-  execute_select (plugin, plugin->select_expiration, expi_proc, &rc,
+  now = GNUNET_TIME_absolute_get ();
+  execute_select (plugin,
+                  plugin->select_expiration,
+                  expi_proc,
+                  &rc,
                   params_select);
-
 }
 
 
@@ -1049,7 +1151,7 @@
 /**
  * Drop database.
  *
- * @param cls the "struct Plugin*"
+ * @param cls the `struct Plugin *`
  */
 static void
 mysql_plugin_drop (void *cls)
@@ -1056,7 +1158,9 @@
 {
   struct Plugin *plugin = cls;
 
-  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090"))
+  if (GNUNET_OK !=
+      GNUNET_MYSQL_statement_run (plugin->mc,
+                                  "DROP TABLE gn090"))
     return;                     /* error */
   plugin->env->duc (plugin->env->cls, 0);
 }
@@ -1065,8 +1169,8 @@
 /**
  * Entry point for the plugin.
  *
- * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
- * @return our "struct Plugin*"
+ * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *`
+ * @return our `struct Plugin *`
  */
 void *
 libgnunet_plugin_datastore_mysql_init (void *cls)
@@ -1077,7 +1181,8 @@
 
   plugin = GNUNET_new (struct Plugin);
   plugin->env = env;
-  plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql");
+  plugin->mc = GNUNET_MYSQL_context_create (env->cfg,
+                                            "datastore-mysql");
   if (NULL == plugin->mc)
   {
     GNUNET_free (plugin);
@@ -1155,7 +1260,8 @@
 
 /**
  * Exit point from the plugin.
- * @param cls our "struct Plugin*"
+ *
+ * @param cls our `struct Plugin *`
  * @return always NULL
  */
 void *

Modified: gnunet/src/datastore/test_datastore_api.c
===================================================================
--- gnunet/src/datastore/test_datastore_api.c   2016-06-24 20:12:53 UTC (rev 
37358)
+++ gnunet/src/datastore/test_datastore_api.c   2016-06-24 20:17:39 UTC (rev 
37359)
@@ -412,7 +412,7 @@
     GNUNET_DATASTORE_put (datastore, 0, &crc->key, get_size (crc->i),
                           get_data (crc->i), get_type (crc->i),
                           get_priority (crc->i), get_anonymity (crc->i), 0,
-                          get_expiration (crc->i), 1, 1, TIMEOUT,
+                          get_expiration (crc->i), 1, 1,
                           &check_success, crc);
     crc->i++;
     if (crc->i == ITERATIONS)
@@ -423,10 +423,17 @@
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Executing GET number %u\n",
                 crc->i);
-    GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
-    GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key,
-                              get_type (crc->i), 1, 1, TIMEOUT,
-                              &check_value, crc);
+    GNUNET_CRYPTO_hash (&crc->i,
+                        sizeof (int),
+                        &crc->key);
+    GNUNET_DATASTORE_get_key (datastore,
+                              crc->offset,
+                              &crc->key,
+                              get_type (crc->i),
+                              1,
+                              1,
+                              &check_value,
+                              crc);
     break;
   case RP_DEL:
     crc->i--;
@@ -436,9 +443,14 @@
     crc->data = NULL;
     GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
     GNUNET_assert (NULL !=
-                   GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key,
-                                             get_type (crc->i), 1, 1, TIMEOUT,
-                                             &delete_value, crc));
+                   GNUNET_DATASTORE_get_key (datastore,
+                                             crc->offset,
+                                             &crc->key,
+                                             get_type (crc->i),
+                                             1,
+                                             1,
+                                             &delete_value,
+                                             crc));
     break;
   case RP_DO_DEL:
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -455,7 +467,7 @@
     }
     GNUNET_assert (NULL !=
                    GNUNET_DATASTORE_remove (datastore, &crc->key, crc->size,
-                                            crc->data, 1, 1, TIMEOUT,
+                                            crc->data, 1, 1,
                                             &check_success, crc));
     break;
   case RP_DELVALIDATE:
@@ -466,7 +478,7 @@
     GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
     GNUNET_assert (NULL !=
                    GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key,
-                                             get_type (crc->i), 1, 1, TIMEOUT,
+                                             get_type (crc->i), 1, 1,
                                              &check_nothing, crc));
     break;
   case RP_RESERVE:
@@ -479,7 +491,7 @@
     GNUNET_DATASTORE_put (datastore, crc->rid, &crc->key, get_size (42),
                           get_data (42), get_type (42), get_priority (42),
                           get_anonymity (42), 0, get_expiration (42), 1, 1,
-                          TIMEOUT, &check_success, crc);
+                          &check_success, crc);
     break;
   case RP_PUT_MULTIPLE_NEXT:
     crc->phase = RP_GET_MULTIPLE;
@@ -493,7 +505,6 @@
                           0,
                           get_expiration (43),
                           1, 1,
-                          TIMEOUT,
                           &check_success, crc);
     break;
   case RP_GET_MULTIPLE:
@@ -502,7 +513,6 @@
                                              crc->offset,
                                              &crc->key,
                                              get_type (42), 1, 1,
-                                             TIMEOUT,
                                              &check_multiple, crc));
     break;
   case RP_GET_MULTIPLE_NEXT:
@@ -512,7 +522,6 @@
                                              &crc->key,
                                              get_type (42),
                                              1, 1,
-                                             TIMEOUT,
                                              &check_multiple, crc));
     break;
   case RP_UPDATE:
@@ -521,7 +530,7 @@
     GNUNET_DATASTORE_update (datastore,
                              crc->uid, 100,
                              get_expiration (42), 1,
-                             1, TIMEOUT,
+                             1,
                              &check_success, crc);
     break;
   case RP_UPDATE_VALIDATE:
@@ -531,7 +540,6 @@
                                              &crc->key,
                                              get_type (42),
                                              1, 1,
-                                             TIMEOUT,
                                              &check_update, crc));
     break;
   case RP_DONE:
@@ -631,7 +639,6 @@
                             GNUNET_TIME_relative_to_absolute
                             (GNUNET_TIME_UNIT_SECONDS),
                             0, 1,
-                            TIMEOUT,
                             &run_tests, crc))
   {
     FPRINTF (stderr,

Modified: gnunet/src/datastore/test_datastore_api_management.c
===================================================================
--- gnunet/src/datastore/test_datastore_api_management.c        2016-06-24 
20:12:53 UTC (rev 37358)
+++ gnunet/src/datastore/test_datastore_api_management.c        2016-06-24 
20:17:39 UTC (rev 37359)
@@ -193,10 +193,18 @@
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "PUT",
                 crc->i);
     GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
-    GNUNET_DATASTORE_put (datastore, 0, &crc->key, get_size (crc->i),
-                          get_data (crc->i), get_type (crc->i),
-                          get_priority (crc->i), get_anonymity (crc->i), 0,
-                          get_expiration (crc->i), 1, 1, TIMEOUT,
+    GNUNET_DATASTORE_put (datastore,
+                          0,
+                          &crc->key,
+                          get_size (crc->i),
+                          get_data (crc->i),
+                          get_type (crc->i),
+                          get_priority (crc->i),
+                          get_anonymity (crc->i),
+                          0,
+                          get_expiration (crc->i),
+                          1,
+                          1,
                           &check_success, crc);
     crc->i++;
     if (crc->i == ITERATIONS)
@@ -213,7 +221,8 @@
                 crc->i);
     GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
     GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key,
-                              get_type (crc->i), 1, 1, TIMEOUT, &check_value,
+                              get_type (crc->i), 1, 1,
+                              &check_value,
                               crc);
     break;
   case RP_GET_FAIL:
@@ -221,7 +230,8 @@
                 crc->i);
     GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
     GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key,
-                              get_type (crc->i), 1, 1, TIMEOUT, &check_nothing,
+                              get_type (crc->i), 1, 1,
+                              &check_nothing,
                               crc);
     break;
   case RP_DONE:
@@ -266,11 +276,18 @@
   now = GNUNET_TIME_absolute_get ();
   datastore = GNUNET_DATASTORE_connect (cfg);
   if (NULL ==
-      GNUNET_DATASTORE_put (datastore, 0, &zkey, 4, "TEST",
-                            GNUNET_BLOCK_TYPE_TEST, 0, 0, 0,
-                            GNUNET_TIME_relative_to_absolute
-                            (GNUNET_TIME_UNIT_SECONDS), 0, 1,
-                            GNUNET_TIME_UNIT_MINUTES, &run_tests, crc))
+      GNUNET_DATASTORE_put (datastore,
+                            0,
+                            &zkey,
+                            4,
+                            "TEST",
+                            GNUNET_BLOCK_TYPE_TEST,
+                            0, 0, 0,
+                            GNUNET_TIME_relative_to_absolute 
(GNUNET_TIME_UNIT_SECONDS),
+                            0,
+                            1,
+                            &run_tests,
+                            crc))
   {
     FPRINTF (stderr, "%s",  "Test 'put' operation failed.\n");
     GNUNET_free (crc);

Modified: gnunet/src/datastore/test_plugin_datastore.c
===================================================================
--- gnunet/src/datastore/test_plugin_datastore.c        2016-06-24 20:12:53 UTC 
(rev 37358)
+++ gnunet/src/datastore/test_plugin_datastore.c        2016-06-24 20:17:39 UTC 
(rev 37359)
@@ -109,11 +109,14 @@
 
   if (GNUNET_OK != status)
   {
-    FPRINTF (stderr, "ERROR: `%s'\n", msg);
+    FPRINTF (stderr,
+             "ERROR: `%s'\n",
+             msg);
   }
   else
   {
-    crc->api->estimate_size (crc->api->cls, &cs);
+    crc->api->estimate_size (crc->api->cls,
+                             &cs);
     GNUNET_assert (os <= cs);
     os = cs;
     stored_bytes += size;
@@ -184,22 +187,30 @@
 
 
 static int
-iterate_one_shot (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
-                  const void *data, enum GNUNET_BLOCK_Type type,
-                  uint32_t priority, uint32_t anonymity,
-                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
+iterate_one_shot (void *cls,
+                  const struct GNUNET_HashCode *key,
+                  uint32_t size,
+                  const void *data,
+                  enum GNUNET_BLOCK_Type type,
+                  uint32_t priority,
+                  uint32_t anonymity,
+                  struct GNUNET_TIME_Absolute expiration,
+                  uint64_t uid)
 {
   struct CpsRunContext *crc = cls;
 
-  GNUNET_assert (key != NULL);
+  GNUNET_assert (NULL != key);
   guid = uid;
   crc->phase++;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Found result type=%u, priority=%u, size=%u, expire=%s, key %s\n",
-             type, priority, size,
+             (unsigned int) type,
+              (unsigned int) priority,
+              (unsigned int) size,
              GNUNET_STRINGS_absolute_time_to_string (expiration),
              GNUNET_h2s (key));
-  GNUNET_SCHEDULER_add_now (&test, crc);
+  GNUNET_SCHEDULER_add_now (&test,
+                            crc);
   return GNUNET_OK;
 }
 
@@ -219,11 +230,14 @@
   char *libname;
 
   if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_string (cfg, "DATASTORE", "DATABASE",
+      GNUNET_CONFIGURATION_get_value_string (cfg,
+                                             "DATASTORE",
+                                             "DATABASE",
                                              &name))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("No `%s' specified for `%s' in configuration!\n"), 
"DATABASE",
+                _("No `%s' specified for `%s' in configuration!\n"),
+                "DATABASE",
                 "DATASTORE");
     return;
   }
@@ -290,8 +304,16 @@
       break;
     }
     gen_key (5, &key);
-    crc->api->get_key (crc->api->cls, crc->offset++, &key, NULL,
-                       GNUNET_BLOCK_TYPE_ANY, &iterate_one_shot, crc);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Looking for %s\n",
+                GNUNET_h2s (&key));
+    crc->api->get_key (crc->api->cls,
+                       crc->offset++,
+                       &key,
+                       NULL,
+                       GNUNET_BLOCK_TYPE_ANY,
+                       &iterate_one_shot,
+                       crc);
     break;
   case RP_UPDATE:
     crc->api->update (crc->api->cls,

Modified: gnunet/src/datastore/test_plugin_datastore_data_mysql.conf
===================================================================
--- gnunet/src/datastore/test_plugin_datastore_data_mysql.conf  2016-06-24 
20:12:53 UTC (rev 37358)
+++ gnunet/src/datastore/test_plugin_datastore_data_mysql.conf  2016-06-24 
20:17:39 UTC (rev 37359)
@@ -6,5 +6,4 @@
 DATABASE = mysql
 
 [datastore-mysql]
-DATABASE = gnunet
-
+DATABASE = gnunetcheck

Modified: gnunet/src/fs/fs_publish.c
===================================================================
--- gnunet/src/fs/fs_publish.c  2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/fs/fs_publish.c  2016-06-24 20:17:39 UTC (rev 37359)
@@ -266,7 +266,6 @@
   {
     pc->qre =
         GNUNET_DATASTORE_release_reserve (pc->dsh, pc->rid, UINT_MAX, UINT_MAX,
-                                          GNUNET_TIME_UNIT_FOREVER_REL,
                                           &finish_release_reserve, pc);
   }
   else
@@ -526,7 +525,6 @@
                               p->bo.replication_level,
                               p->bo.expiration_time,
                               -2, 1,
-                              GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                               &ds_put_cont, pc);
     return;
   }
@@ -547,7 +545,6 @@
                             p->bo.replication_level,
                             p->bo.expiration_time,
                             -2, 1,
-                            GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                             &ds_put_cont,
                             pc);
 }

Modified: gnunet/src/fs/fs_publish_ublock.c
===================================================================
--- gnunet/src/fs/fs_publish_ublock.c   2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/fs/fs_publish_ublock.c   2016-06-24 20:17:39 UTC (rev 37359)
@@ -294,7 +294,6 @@
                             bo->replication_level,
                             bo->expiration_time,
                             -2, 1,
-                            GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                             &ublock_put_cont, uc);
   }
   else

Modified: gnunet/src/fs/fs_unindex.c
===================================================================
--- gnunet/src/fs/fs_unindex.c  2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/fs/fs_unindex.c  2016-06-24 20:17:39 UTC (rev 37359)
@@ -215,7 +215,7 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending REMOVE request to DATASTORE service\n");
   GNUNET_DATASTORE_remove (uc->dsh, &chk->query, size, data, -2, 1,
-                           GNUNET_CONSTANTS_SERVICE_TIMEOUT, &process_cont, 
uc);
+                           &process_cont, uc);
   uc->chk = *chk;
 }
 
@@ -552,7 +552,6 @@
                                      data,
                                     0 /* priority */,
                                      1 /* queue size */,
-                                    GNUNET_TIME_UNIT_FOREVER_REL,
                                     &continue_after_remove,
                                     uc);
   return;
@@ -563,7 +562,6 @@
                                      GNUNET_BLOCK_TYPE_FS_UBLOCK,
                                      0 /* priority */,
                                       1 /* queue size */,
-                                     GNUNET_TIME_UNIT_FOREVER_REL,
                                      &process_kblock_for_unindex,
                                      uc);
 }
@@ -615,7 +613,6 @@
                                      GNUNET_BLOCK_TYPE_FS_UBLOCK,
                                      0 /* priority */,
                                       1 /* queue size */,
-                                     GNUNET_TIME_UNIT_FOREVER_REL,
                                      &process_kblock_for_unindex,
                                      uc);
 }

Modified: gnunet/src/fs/gnunet-service-fs_cadet_server.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cadet_server.c      2016-06-24 20:12:53 UTC 
(rev 37358)
+++ gnunet/src/fs/gnunet-service-fs_cadet_server.c      2016-06-24 20:17:39 UTC 
(rev 37359)
@@ -445,7 +445,6 @@
                                     ntohl (sqm->type),
                                     0 /* priority */,
                                     GSF_datastore_queue_size,
-                                    GNUNET_TIME_UNIT_FOREVER_REL,
                                     &handle_datastore_reply, sc);
   if (NULL == sc->qe)
   {

Modified: gnunet/src/fs/gnunet-service-fs_indexing.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_indexing.c  2016-06-24 20:12:53 UTC (rev 
37358)
+++ gnunet/src/fs/gnunet-service-fs_indexing.c  2016-06-24 20:17:39 UTC (rev 
37359)
@@ -523,7 +523,7 @@
   {
     GNUNET_break (0);
     GNUNET_DATASTORE_remove (dsh, key, size, data, -1, -1,
-                             GNUNET_TIME_UNIT_FOREVER_REL, &remove_cont, NULL);
+                             &remove_cont, NULL);
     return GNUNET_SYSERR;
   }
   odb = (const struct OnDemandBlock *) data;
@@ -542,7 +542,7 @@
                               ("# index blocks removed: original file 
inaccessible"),
                               1, GNUNET_YES);
     GNUNET_DATASTORE_remove (dsh, key, size, data, -1, -1,
-                             GNUNET_TIME_UNIT_FOREVER_REL, &remove_cont, NULL);
+                             &remove_cont, NULL);
     return GNUNET_SYSERR;
   }
   if ((NULL ==
@@ -560,7 +560,7 @@
     if (fh != NULL)
       GNUNET_DISK_file_close (fh);
     GNUNET_DATASTORE_remove (dsh, key, size, data, -1, -1,
-                             GNUNET_TIME_UNIT_FOREVER_REL, &remove_cont, NULL);
+                             &remove_cont, NULL);
     return GNUNET_SYSERR;
   }
   GNUNET_DISK_file_close (fh);
@@ -574,7 +574,7 @@
                 _("Indexed file `%s' changed at offset %llu\n"), fn,
                 (unsigned long long) off);
     GNUNET_DATASTORE_remove (dsh, key, size, data, -1, -1,
-                             GNUNET_TIME_UNIT_FOREVER_REL, &remove_cont, NULL);
+                             &remove_cont, NULL);
     return GNUNET_SYSERR;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2016-06-24 20:12:53 UTC (rev 
37358)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2016-06-24 20:17:39 UTC (rev 
37359)
@@ -1149,7 +1149,6 @@
                               1 /* anonymity */ ,
                               0 /* replication */ ,
                               exp, 1 + prq.priority, MAX_DATASTORE_QUEUE,
-                              GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                               &put_migration_continuation, pmc))
     {
       put_migration_continuation (pmc,
@@ -1472,7 +1471,6 @@
                                     pr->public_data.options)) ? UINT_MAX :
                                   GSF_datastore_queue_size
                                   /* max queue size */ ,
-                                  GNUNET_TIME_UNIT_FOREVER_REL,
                                   &process_local_reply, pr);
     if (NULL != pr->qe)
       return;                   /* we're done */
@@ -1492,7 +1490,7 @@
   {
     GNUNET_break (0);
     GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1,
-                             GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
+                             NULL, NULL);
     pr->qe_start = GNUNET_TIME_absolute_get ();
     pr->warn_task =
         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
@@ -1512,7 +1510,6 @@
                                     pr->public_data.options)) ? UINT_MAX :
                                   GSF_datastore_queue_size
                                   /* max queue size */ ,
-                                  GNUNET_TIME_UNIT_FOREVER_REL,
                                   &process_local_reply, pr);
     if (NULL == pr->qe)
     {
@@ -1574,7 +1571,6 @@
                                   public_data.options)) ? UINT_MAX :
                                 GSF_datastore_queue_size
                                 /* max queue size */ ,
-                                GNUNET_TIME_UNIT_FOREVER_REL,
                                 &process_local_reply, pr);
   /* check if we successfully queued another datastore request;
    * if so, return, otherwise call our continuation (if we have
@@ -1681,7 +1677,6 @@
                                   public_data.options)) ? UINT_MAX :
                                 GSF_datastore_queue_size
                                 /* max queue size */ ,
-                                GNUNET_TIME_UNIT_FOREVER_REL,
                                 &process_local_reply, pr);
   if (NULL != pr->qe)
     return;
@@ -1795,7 +1790,6 @@
                               prq.priority, 1 /* anonymity */ ,
                               0 /* replication */ ,
                               expiration, 1 + prq.priority, 
MAX_DATASTORE_QUEUE,
-                              GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                               &put_migration_continuation, pmc))
     {
       put_migration_continuation (pmc,

Modified: gnunet/src/fs/gnunet-service-fs_push.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_push.c      2016-06-24 20:12:53 UTC (rev 
37358)
+++ gnunet/src/fs/gnunet-service-fs_push.c      2016-06-24 20:17:39 UTC (rev 
37359)
@@ -582,7 +582,6 @@
   value_found = GNUNET_NO;
   mig_qe =
     GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
-                                          GNUNET_TIME_UNIT_FOREVER_REL,
                                           &process_migration_content, NULL);
   if (NULL == mig_qe)
     consider_gathering ();

Modified: gnunet/src/fs/gnunet-service-fs_put.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_put.c       2016-06-24 20:12:53 UTC (rev 
37358)
+++ gnunet/src/fs/gnunet-service-fs_put.c       2016-06-24 20:17:39 UTC (rev 
37359)
@@ -225,7 +225,6 @@
   po->dht_qe =
       GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0,
                                            UINT_MAX,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
                                            po->dht_put_type,
                                            &process_dht_put_content, po);
   if (NULL == po->dht_qe)

Index: gnunet/src/identity-provider
===================================================================
--- gnunet/src/identity-provider        2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/identity-provider        2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/identity-provider
___________________________________________________________________
Modified: svn:ignore
## -1,3 +1,4 ##
+gnunet-service-identity-provider
 .deps
 Makefile
 Makefile.in
Modified: gnunet/src/include/gnunet_datastore_service.h
===================================================================
--- gnunet/src/include/gnunet_datastore_service.h       2016-06-24 20:12:53 UTC 
(rev 37358)
+++ gnunet/src/include/gnunet_datastore_service.h       2016-06-24 20:17:39 UTC 
(rev 37359)
@@ -92,7 +92,7 @@
  * operation.
  *
  * @param cls closure
- * @param success #GNUNET_SYSERR on failure (including timeout/queue drop)
+ * @param success #GNUNET_SYSERR on failure
  *                #GNUNET_NO if content was already there
  *                #GNUNET_YES (or other positive value) on success
  * @param min_expiration minimum expiration time required for 0-priority 
content to be stored
@@ -149,7 +149,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout timeout for the operation
  * @param cont continuation to call when done
  * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be 
used to
@@ -169,7 +168,6 @@
                       struct GNUNET_TIME_Absolute expiration,
                       unsigned int queue_priority,
                       unsigned int max_queue_size,
-                      struct GNUNET_TIME_Relative timeout,
                       GNUNET_DATASTORE_ContinuationWithStatus cont,
                       void *cont_cls);
 
@@ -188,7 +186,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be 
used to
@@ -199,7 +196,6 @@
 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
                                   uint32_t rid, unsigned int queue_priority,
                                   unsigned int max_queue_size,
-                                  struct GNUNET_TIME_Relative timeout,
                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
                                   void *cont_cls);
 
@@ -214,7 +210,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be 
used to
@@ -228,7 +223,6 @@
                          struct GNUNET_TIME_Absolute expiration,
                          unsigned int queue_priority,
                          unsigned int max_queue_size,
-                         struct GNUNET_TIME_Relative timeout,
                          GNUNET_DATASTORE_ContinuationWithStatus cont,
                          void *cont_cls);
 
@@ -246,7 +240,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be 
used to
@@ -260,7 +253,6 @@
                          const void *data,
                          unsigned int queue_priority,
                          unsigned int max_queue_size,
-                         struct GNUNET_TIME_Relative timeout,
                          GNUNET_DATASTORE_ContinuationWithStatus cont,
                          void *cont_cls);
 
@@ -305,7 +297,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param proc function to call on a matching value;
  *        or with a NULL value if no datum matches
  * @param proc_cls closure for @a proc
@@ -319,7 +310,6 @@
                           enum GNUNET_BLOCK_Type type,
                           unsigned int queue_priority,
                           unsigned int max_queue_size,
-                          struct GNUNET_TIME_Relative timeout,
                           GNUNET_DATASTORE_DatumProcessor proc,
                           void *proc_cls);
 
@@ -339,7 +329,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param type allowed type for the operation (never zero)
  * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
@@ -353,7 +342,6 @@
                                      uint64_t offset,
                                      unsigned int queue_priority,
                                      unsigned int max_queue_size,
-                                     struct GNUNET_TIME_Relative timeout,
                                      enum GNUNET_BLOCK_Type type,
                                      GNUNET_DATASTORE_DatumProcessor proc,
                                      void *proc_cls);
@@ -370,7 +358,6 @@
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
  *        and always once with a value of NULL.
@@ -382,7 +369,6 @@
 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
                                       unsigned int queue_priority,
                                       unsigned int max_queue_size,
-                                      struct GNUNET_TIME_Relative timeout,
                                       GNUNET_DATASTORE_DatumProcessor proc,
                                       void *proc_cls);
 

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2016-06-24 20:12:53 UTC (rev 
37358)
+++ gnunet/src/include/gnunet_protocols.h       2016-06-24 20:17:39 UTC (rev 
37359)
@@ -459,7 +459,12 @@
  */
 #define GNUNET_MESSAGE_TYPE_DATASTORE_DROP 103
 
+/**
+ * Message sent by datastore client to get data by key.
+ */
+#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY 104
 
+
 
/*******************************************************************************
  * FS message types
  
******************************************************************************/

Index: gnunet/src/json
===================================================================
--- gnunet/src/json     2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/json     2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/json
___________________________________________________________________
Modified: svn:ignore
## -1,3 +1,4 ##
+test_json
 .deps
 Makefile
 Makefile.in
Index: gnunet/src/jsonapi
===================================================================
--- gnunet/src/jsonapi  2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/jsonapi  2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/jsonapi
___________________________________________________________________
Added: svn:ignore
## -0,0 +1,4 ##
+test_jsonapi
+Makefile.in
+Makefile
+.deps
Index: gnunet/src/my
===================================================================
--- gnunet/src/my       2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/my       2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/my
___________________________________________________________________
Added: svn:ignore
## -0,0 +1,4 ##
+test_my
+Makefile.in
+Makefile
+.deps
Index: gnunet/src/nat
===================================================================
--- gnunet/src/nat      2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/nat      2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/nat
___________________________________________________________________
Modified: svn:ignore
## -1,3 +1,4 ##
+test_stun
 test-nat-test
 test_nat_mini
 test_nat_test
Index: gnunet/src/peerstore
===================================================================
--- gnunet/src/peerstore        2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/peerstore        2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/peerstore
___________________________________________________________________
Modified: svn:ignore
## -1,3 +1,4 ##
+test_plugin_peerstore_sqlite
 Makefile.in
 Makefile
 gnunet-peerstore
Index: gnunet/src/pq
===================================================================
--- gnunet/src/pq       2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/pq       2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/pq
___________________________________________________________________
Modified: svn:ignore
## -1,3 +1,4 ##
+test_pq
 .deps
 Makefile
 Makefile.in
Index: gnunet/src/psycutil
===================================================================
--- gnunet/src/psycutil 2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/psycutil 2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/psycutil
___________________________________________________________________
Modified: svn:ignore
## -1,3 +1,4 ##
+test_psyc_env
 .deps
 Makefile
 Makefile.in
Index: gnunet/src/social
===================================================================
--- gnunet/src/social   2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/social   2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/social
___________________________________________________________________
Modified: svn:ignore
## -1,3 +1,4 ##
+gnunet-social
 gnunet-service-social
 test_social
 social.conf
Index: gnunet/src/testbed-logger
===================================================================
--- gnunet/src/testbed-logger   2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/testbed-logger   2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/testbed-logger
___________________________________________________________________
Added: svn:ignore
## -0,0 +1,6 ##
+testbed-logger.conf
+test_testbed_logger_api
+Makefile.in
+Makefile
+gnunet-service-testbed-logger
+.deps
Index: gnunet/src/util
===================================================================
--- gnunet/src/util     2016-06-24 20:12:53 UTC (rev 37358)
+++ gnunet/src/util     2016-06-24 20:17:39 UTC (rev 37359)

Property changes on: gnunet/src/util
___________________________________________________________________
Modified: svn:ignore
## -1,3 +1,5 ##
+test_socks.nc
+test_crypto_kdf
 perf_crypto_rsa
 perf_crypto_ecc_dlog
 test_crypto_ecc_dlog



reply via email to

[Prev in Thread] Current Thread [Next in Thread]