gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r27485 - in gnunet/src: consensus dv include mesh set strea


From: gnunet
Subject: [GNUnet-SVN] r27485 - in gnunet/src: consensus dv include mesh set stream util
Date: Wed, 19 Jun 2013 12:48:54 +0200

Author: dold
Date: 2013-06-19 12:48:54 +0200 (Wed, 19 Jun 2013)
New Revision: 27485

Added:
   gnunet/src/set/gnunet-set-ibf-profiler.c
   gnunet/src/set/gnunet-set-profiler.c
Removed:
   gnunet/src/set/gnunet-set-ibf.c
   gnunet/src/set/gnunet-set.c
Modified:
   gnunet/src/consensus/consensus_api.c
   gnunet/src/consensus/gnunet-service-consensus.c
   gnunet/src/dv/gnunet-service-dv.c
   gnunet/src/include/gnunet_container_lib.h
   gnunet/src/include/gnunet_mesh2_service.h
   gnunet/src/include/gnunet_mq_lib.h
   gnunet/src/include/gnunet_set_service.h
   gnunet/src/include/gnunet_stream_lib.h
   gnunet/src/mesh/mesh2_api.c
   gnunet/src/set/Makefile.am
   gnunet/src/set/gnunet-service-set.c
   gnunet/src/set/gnunet-service-set.h
   gnunet/src/set/gnunet-service-set_union.c
   gnunet/src/set/ibf.c
   gnunet/src/set/ibf.h
   gnunet/src/set/set_api.c
   gnunet/src/set/strata_estimator.c
   gnunet/src/set/strata_estimator.h
   gnunet/src/set/test_set_api.c
   gnunet/src/stream/stream_api.c
   gnunet/src/util/mq.c
   gnunet/src/util/test_mq.c
   gnunet/src/util/test_mq_client.c
Log:
- opaque mq structs
- mq for mesh
- faster hashing for IBFs
- mesh replaces stream in set
- new set profiler (work in progress)


Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c        2013-06-19 09:34:15 UTC (rev 
27484)
+++ gnunet/src/consensus/consensus_api.c        2013-06-19 10:48:54 UTC (rev 
27485)
@@ -33,38 +33,7 @@
 
 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
 
-/**
- * Actions that can be queued.
- */
-struct QueuedMessage
-{
-  /**
-   * Queued messages are stored in a doubly linked list.
-   */
-  struct QueuedMessage *next;
 
-  /**
-   * Queued messages are stored in a doubly linked list.
-   */
-  struct QueuedMessage *prev;
-
-  /**
-   * The actual queued message.
-   */
-  struct GNUNET_MessageHeader *msg;
-
-  /**
-   * Will be called after transmit, if not NULL
-   */
-  GNUNET_CONSENSUS_InsertDoneCallback idc;
-
-  /**
-   * Closure for idc
-   */
-  void *idc_cls;
-};
-
-
 /**
  * Handle for the service.
  */
@@ -106,21 +75,11 @@
   struct GNUNET_PeerIdentity **peers;
 
   /**
-   * Currently active transmit request.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
-
-  /**
    * GNUNES_YES iff the join message has been sent to the service.
    */
   int joined;
 
   /**
-   * Closure for the insert done callback.
-   */
-  void *idc_cls;
-
-  /**
    * Called when the conclude operation finishes or fails.
    */
   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
@@ -135,109 +94,36 @@
    */
   struct GNUNET_TIME_Absolute conclude_deadline;
 
-  unsigned int conclude_min_size;
-
-  struct QueuedMessage *messages_head;
-
-  struct QueuedMessage *messages_tail;
-
+  /**
+   * Message queue for the client.
+   */
+  struct GNUNET_MQ_Handle *mq;
 };
 
-
-
 /**
- * Schedule transmitting the next message.
- *
- * @param consensus consensus handle
+ * FIXME: this should not bee necessary when the API
+ * issue has been fixed
  */
-static void
-send_next (struct GNUNET_CONSENSUS_Handle *consensus);
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_queued (void *cls, size_t size,
-                 void *buf)
+struct InsertDoneInfo
 {
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  struct QueuedMessage *qmsg;
-  size_t msg_size;
+  GNUNET_CONSENSUS_InsertDoneCallback idc;
+  void *cls;
+};
 
-  consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
-  consensus->th = NULL;
 
-  qmsg = consensus->messages_head;
-  GNUNET_CONTAINER_DLL_remove (consensus->messages_head, 
consensus->messages_tail, qmsg);
-
-  if (NULL == buf)
-  {
-    if (NULL != qmsg->idc)
-    {
-      qmsg->idc (qmsg->idc_cls, GNUNET_YES);
-    }
-    return 0;
-  }
-
-  msg_size = ntohs (qmsg->msg->size);
-
-  GNUNET_assert (size >= msg_size);
-
-  memcpy (buf, qmsg->msg, msg_size);
-  if (NULL != qmsg->idc)
-  {
-    qmsg->idc (qmsg->idc_cls, GNUNET_YES);
-  }
-  GNUNET_free (qmsg->msg);
-  GNUNET_free (qmsg);
-  /* FIXME: free the messages */
-
-  send_next (consensus);
-
-  return msg_size;
-}
-
-
 /**
- * Schedule transmitting the next message.
- *
- * @param consensus consensus handle
- */
-static void
-send_next (struct GNUNET_CONSENSUS_Handle *consensus)
-{
-  if (NULL != consensus->th)
-    return;
-
-  if (NULL != consensus->messages_head)
-  {
-    consensus->th = 
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs 
(consensus->messages_head->msg->size),
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             GNUNET_NO, &transmit_queued, 
consensus);
-  }
-}
-
-
-/**
  * Called when the server has sent is a new element
  * 
- * @param consensus consensus handle
- * @param msg element message
+ * @param cls consensus handle
+ * @param mh element message
  */
 static void
-handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
-                    struct GNUNET_CONSENSUS_ElementMessage *msg)
+handle_new_element (void *cls,
+                    const struct GNUNET_MessageHeader *mh)
 {
+  struct GNUNET_CONSENSUS_Handle *consensus = cls;
+  const struct GNUNET_CONSENSUS_ElementMessage *msg
+      = (const struct GNUNET_CONSENSUS_ElementMessage *) mh;
   struct GNUNET_SET_Element element;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
@@ -247,8 +133,6 @@
   element.data = &msg[1];
 
   consensus->new_element_cb (consensus->new_element_cls, &element);
-
-  send_next (consensus);
 }
 
 
@@ -256,13 +140,15 @@
  * Called when the server has announced
  * that the conclusion is over.
  * 
- * @param consensus consensus handle
- * @param msg conclude done message
+ * @param cls consensus handle
+ * @param mh conclude done message
  */
 static void
-handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
+handle_conclude_done (void *cls,
                      const struct GNUNET_MessageHeader *msg)
 {
+  struct GNUNET_CONSENSUS_Handle *consensus = cls;
+
   GNUNET_CONSENSUS_ConcludeCallback cc;
 
   GNUNET_assert (NULL != (cc = consensus->conclude_cb));
@@ -272,89 +158,6 @@
 
 
 /**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
-{
-  struct GNUNET_CONSENSUS_Handle *consensus = cls;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
-
-  if (NULL == msg)
-  {
-    /* Error, timeout, death */
-    LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
-    GNUNET_CLIENT_disconnect (consensus->client);
-    consensus->client = NULL;
-    consensus->new_element_cb (consensus->new_element_cls, NULL);
-    return;
-  }
-  GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
-  switch (ntohs (msg->type))
-  {
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
-      handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage 
*) msg);
-      break;
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
-      handle_conclude_done (consensus, msg);
-      break;
-    default:
-      GNUNET_break (0);
-  }
-}
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_join (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_CONSENSUS_JoinMessage *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  int msize;
-
-  GNUNET_assert (NULL != buf);
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
-
-  consensus = cls;
-  consensus->th = NULL;
-  consensus->joined = 1;
-
-  msg = buf;
-
-  msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
-      consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
-
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
-  msg->header.size = htons (msize);
-  msg->session_id = consensus->session_id;
-  msg->num_peers = htonl (consensus->num_peers);
-  memcpy(&msg[1],
-        consensus->peers,
-        consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
-  send_next (consensus);
-  GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
-  
-  return msize;
-}
-
-/**
  * Create a consensus session.
  *
  * @param cfg configuration to use for connecting to the consensus service
@@ -377,7 +180,15 @@
                          void *new_element_cls)
 {
   struct GNUNET_CONSENSUS_Handle *consensus;
-  size_t join_message_size;
+  struct GNUNET_CONSENSUS_JoinMessage *join_msg;
+  struct GNUNET_MQ_Envelope *ev;
+  const static struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+    {handle_new_element,
+      GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0},
+    {handle_conclude_done,
+      GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0},
+    GNUNET_MQ_HANDLERS_END
+  };
 
   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
   consensus->cfg = cfg;
@@ -393,24 +204,33 @@
         GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
 
   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
+  consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
+                                                         mq_handlers, 
consensus);
 
   GNUNET_assert (consensus->client != NULL);
 
-  join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
-      (num_peers * sizeof (struct GNUNET_PeerIdentity));
+  ev = GNUNET_MQ_msg_extra (join_msg,
+                            (num_peers * sizeof (struct GNUNET_PeerIdentity)),
+                            GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
 
-  consensus->th =
-      GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                           join_message_size,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           GNUNET_NO, &transmit_join, 
consensus);
+  join_msg->session_id = consensus->session_id;
+  join_msg->num_peers = htonl (consensus->num_peers);
+  memcpy(&join_msg[1],
+        consensus->peers,
+        consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
 
-
-  GNUNET_assert (consensus->th != NULL);
+  GNUNET_MQ_send (consensus->mq, ev);
   return consensus;
 }
 
 
+static void
+idc_adapter (void *cls)
+{
+  struct InsertDoneInfo *i = cls;
+  i->idc (i->cls, GNUNET_OK);
+  GNUNET_free (i);
+}
 
 /**
  * Insert an element in the set being reconsiled.  Must not be called after
@@ -428,28 +248,24 @@
                         GNUNET_CONSENSUS_InsertDoneCallback idc,
                         void *idc_cls)
 {
-  struct QueuedMessage *qmsg;
   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
-  size_t element_msg_size;
+  struct GNUNET_MQ_Envelope *ev;
+  struct InsertDoneInfo *i;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
 
-  element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
-                               element->size);
+  ev = GNUNET_MQ_msg_extra (element_msg, element->size,
+                            GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
 
-  element_msg = GNUNET_malloc (element_msg_size);
-  element_msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
-  element_msg->header.size = htons (element_msg_size);
   memcpy (&element_msg[1], element->data, element->size);
-
-  qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
-  qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
-  qmsg->idc = idc;
-  qmsg->idc_cls = idc_cls;
-
-  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, 
consensus->messages_tail, qmsg);
-
-  send_next (consensus);
+  
+  if (NULL != idc)
+  {
+    i = GNUNET_new (struct InsertDoneInfo);
+    i->idc = idc;
+    i->cls = idc_cls;
+    GNUNET_MQ_notify_sent (ev, idc_adapter, i);
+  }
 }
 
 
@@ -471,7 +287,7 @@
                           GNUNET_CONSENSUS_ConcludeCallback conclude,
                           void *conclude_cls)
 {
-  struct QueuedMessage *qmsg;
+  struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
 
   GNUNET_assert (NULL != conclude);
@@ -480,17 +296,10 @@
   consensus->conclude_cls = conclude_cls;
   consensus->conclude_cb = conclude;
 
-  conclude_msg = GNUNET_malloc (sizeof (struct 
GNUNET_CONSENSUS_ConcludeMessage));
-  conclude_msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
-  conclude_msg->header.size = htons (sizeof (struct 
GNUNET_CONSENSUS_ConcludeMessage));
+  ev = GNUNET_MQ_msg (conclude_msg, 
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
   conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
 
-  qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
-  qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
-
-  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, 
consensus->messages_tail, qmsg);
-
-  send_next (consensus);
+  GNUNET_MQ_send (consensus->mq, ev);
 }
 
 

Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2013-06-19 09:34:15 UTC 
(rev 27484)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2013-06-19 10:48:54 UTC 
(rev 27485)
@@ -116,7 +116,7 @@
   /**
    * Queued messages to the client.
    */
-  struct GNUNET_MQ_MessageQueue *client_mq;
+  struct GNUNET_MQ_Handle *client_mq;
 
   /**
    * Timeout for all rounds together, single rounds will schedule a timeout 
task
@@ -217,9 +217,9 @@
   struct GNUNET_SET_OperationHandle *set_op;
 
   /**
-   * Has conclude been called on the set_op?
+   * Has commit been called on the set_op?
    */
-  int set_op_concluded;
+  int set_op_commited;
 };
 
 
@@ -548,14 +548,14 @@
       GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
     }
     session->partner_outgoing->set_op =
-        GNUNET_SET_evaluate (&session->partner_outgoing->peer_id,
-                             &session->global_id,
-                             (struct GNUNET_MessageHeader *) msg,
-                             0, /* FIXME */
-                             GNUNET_SET_RESULT_ADDED,
-                             set_result_cb, session->partner_outgoing);
-    GNUNET_SET_conclude (session->partner_outgoing->set_op, 
session->element_set);
-    session->partner_outgoing->set_op_concluded = GNUNET_YES;
+        GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
+                            &session->global_id,
+                            (struct GNUNET_MessageHeader *) msg,
+                            0, /* FIXME */
+                            GNUNET_SET_RESULT_ADDED,
+                            set_result_cb, session->partner_outgoing);
+    GNUNET_SET_commit (session->partner_outgoing->set_op, 
session->element_set);
+    session->partner_outgoing->set_op_commited = GNUNET_YES;
   }
 
 #ifdef GNUNET_EXTRA_LOGGING
@@ -767,12 +767,12 @@
                                        set_result_cb, &session->info[index]);
       if (ntohl (msg->exp_subround) == session->exp_subround)
       {
-        cpi->set_op_concluded = GNUNET_YES;
-        GNUNET_SET_conclude (cpi->set_op, session->element_set);
+        cpi->set_op_commited = GNUNET_YES;
+        GNUNET_SET_commit (cpi->set_op, session->element_set);
       }
       else
       {
-        cpi->set_op_concluded = GNUNET_NO;
+        cpi->set_op_commited = GNUNET_NO;
       }
       break;
     default:

Modified: gnunet/src/dv/gnunet-service-dv.c
===================================================================
--- gnunet/src/dv/gnunet-service-dv.c   2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/dv/gnunet-service-dv.c   2013-06-19 10:48:54 UTC (rev 27485)
@@ -769,7 +769,7 @@
   if (DEFAULT_FISHEYE_DEPTH - 1 == neighbor->consensus_insertion_distance)
   {
     /* we have added all elements to the set, run the operation */
-    GNUNET_SET_conclude (neighbor->set_op,
+    GNUNET_SET_commit (neighbor->set_op,
                         neighbor->my_set);
     GNUNET_SET_destroy (neighbor->my_set);
     neighbor->my_set = NULL;
@@ -1425,13 +1425,13 @@
   neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
   neighbor->my_set = GNUNET_SET_create (cfg,
                                        GNUNET_SET_OPERATION_UNION);
-  neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer,
-                                         &neighbor->real_session_id,
-                                         NULL,
-                                         0 /* FIXME: salt */,
-                                         GNUNET_SET_RESULT_ADDED,
-                                         &handle_set_union_result,
-                                         neighbor);
+  neighbor->set_op = GNUNET_SET_prepare (&neighbor->peer,
+                                         &neighbor->real_session_id,
+                                         NULL,
+                                         0 /* FIXME: salt */,
+                                         GNUNET_SET_RESULT_ADDED,
+                                         &handle_set_union_result,
+                                         neighbor);
   build_set (neighbor);
 }
 

Modified: gnunet/src/include/gnunet_container_lib.h
===================================================================
--- gnunet/src/include/gnunet_container_lib.h   2013-06-19 09:34:15 UTC (rev 
27484)
+++ gnunet/src/include/gnunet_container_lib.h   2013-06-19 10:48:54 UTC (rev 
27485)
@@ -534,7 +534,7 @@
  *         GNUNET_NO if not.
  */
 typedef int (*GNUNET_CONTAINER_HashMapIterator) (void *cls,
-                                                 const struct GNUNET_HashCode 
* key,
+                                                 const struct GNUNET_HashCode 
*key,
                                                  void *value);
 
 

Modified: gnunet/src/include/gnunet_mesh2_service.h
===================================================================
--- gnunet/src/include/gnunet_mesh2_service.h   2013-06-19 09:34:15 UTC (rev 
27484)
+++ gnunet/src/include/gnunet_mesh2_service.h   2013-06-19 10:48:54 UTC (rev 
27485)
@@ -162,7 +162,8 @@
  *                the tunnel.
  * @param handlers Callbacks for messages we care about, NULL-terminated. Each
  *                 one must call GNUNET_MESH_receive_done on the tunnel to
- *                 receive the next message.
+ *                 receive the next message.  Messages of a type that is not
+ *                 in the handlers array are ignored if received. 
  * @param ports NULL or 0-terminated array of port numbers for incoming 
tunnels.
  * 
  * @return handle to the mesh service NULL on error
@@ -325,7 +326,7 @@
 /**
  * Request information about the running mesh peer.
  * The callback will be called for every tunnel known to the service,
- * listing all active peers that blong to the tunnel.
+ * listing all active peers that belong to the tunnel.
  *
  * If called again on the same handle, it will overwrite the previous
  * callback and cls. To retrieve the cls, monitor_cancel must be
@@ -375,6 +376,18 @@
 GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h);
 
 
+/**
+ * Create a message queue for a mesh tunnel.
+ * The message queue can only be used to transmit messages,
+ * not to receive them.
+ *
+ * @param tunnel the tunnel to create the message qeue for
+ * @return a message queue to messages over the tunnel
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel);
+
+
 #if 0                           /* keep Emacsens' auto-indent happy */
 {
 #endif

Modified: gnunet/src/include/gnunet_mq_lib.h
===================================================================
--- gnunet/src/include/gnunet_mq_lib.h  2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/include/gnunet_mq_lib.h  2013-06-19 10:48:54 UTC (rev 27485)
@@ -21,7 +21,7 @@
 /**
  * @author Florian Dold
  * @file set/mq.h
- * @brief general purpose request queue
+ * @brief general purpose message queue
  */
 #ifndef GNUNET_MQ_H
 #define GNUNET_MQ_H
@@ -30,7 +30,7 @@
 
 
 /**
- * Allocate a GNUNET_MQ_Message, with extra space allocated after the space 
needed
+ * Allocate an envelope, with extra space allocated after the space needed
  * by the message struct.
  * The allocated message will already have the type and size field set.
  *
@@ -43,19 +43,19 @@
 #define GNUNET_MQ_msg_extra(mvar, esize, type) 
GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) 
&(mvar)), (esize) + sizeof *(mvar), (type))
 
 /**
- * Allocate a GNUNET_MQ_Message.
- * The allocated message will already have the type and size field set.
+ * Allocate a GNUNET_MQ_Envelope.
+ * The contained message will already have the type and size field set.
  *
  * @param mvar variable to store the allocated message in;
  *             must have a header field
  * @param type type of the message
- * @return the MQ message
+ * @return the allocated envelope
  */
 #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
 
 
 /**
- * Allocate a GNUNET_MQ_Message, where the message only consists of a header.
+ * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
  * The allocated message will already have the type and size field set.
  *
  * @param type type of the message
@@ -64,7 +64,7 @@
 
 
 /**
- * Allocate a GNUNET_MQ_Message, where the message only consists of a header 
and extra space.
+ * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header 
and extra space.
  * The allocated message will already have the type and size field set.
  *
  * @param mh pointer that will changed to point at to the allocated message 
header
@@ -75,14 +75,14 @@
 
 
 /**
- * Allocate a GNUNET_MQ_Message, and append a payload message after the given
+ * Allocate a GNUNET_MQ_Envelope, and append a payload message after the given
  * message struct.
  *
  * @param mvar pointer to a message struct, will be changed to point at the 
newly allocated message,
  *        whose size is 'sizeof(*mvar) + ntohs (mh->size)'
  * @param type message type of the allocated message, has no effect on the 
nested message
  * @param mh message to nest
- * @return a newly allocated 'struct GNUNET_MQ_Message *'
+ * @return a newly allocated 'struct GNUNET_MQ_Envelope *'
  */
 #define GNUNET_MQ_msg_nested_mh(mvar, type, mh) 
GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct 
GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh)
 
@@ -98,11 +98,24 @@
 #define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct 
GNUNET_MessageHeader *) (var), sizeof (*(var)))
 
 
+/**
+ * Implementation of the GNUNET_MQ_extract_nexted_mh macro.
+ *
+ * @param mh message header to extract nested message header from
+ * @param base_size size of the message before the nested message's header 
appears
+ * @return pointer to the nested message, does not copy the message
+ */
 struct GNUNET_MessageHeader *
 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t 
base_size);
 
 
-struct GNUNET_MQ_Message *
+/**
+ * Implementation of the GNUNET_MQ_msg_nested_mh macro.
+ *
+ * @param mhp pointer to the message header pointer that will be changed to 
allocate at
+ *        the newly allocated space for the message.
+ */
+struct GNUNET_MQ_Envelope *
 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t 
base_size, uint16_t type,
                           const struct GNUNET_MessageHeader *nested_mh);
 
@@ -114,9 +127,15 @@
 #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0}
 
 
-struct GNUNET_MQ_MessageQueue;
+/**
+ * Opaque handle to a message queue.
+ */
+struct GNUNET_MQ_Handle;
 
-struct GNUNET_MQ_Message;
+/**
+ * Opaque handle to an envelope.
+ */
+struct GNUNET_MQ_Envelope;
 
 enum GNUNET_MQ_Error
 {
@@ -133,25 +152,48 @@
  * @param msg the received message
  */
 typedef void
-(*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader 
*msg);
+(*GNUNET_MQ_MessageCallback) (void *cls,
+                              const struct GNUNET_MessageHeader *msg);
 
 
 /**
  * Signature of functions implementing the
- * sending part of a message queue
+ * sending functionality of a message queue.
  *
- * @param q the message queue
- * @param m the message
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
  */
 typedef void
-(*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct 
GNUNET_MQ_Message *m);
+(*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_Handle *mq,
+                       const struct GNUNET_MessageHeader *msg,
+                       void *impl_state);
 
 
+/**
+ * Signature of functions implementing the
+ * destruction of a message queue.
+ * Implementations must not free 'mq', but should
+ * take care of 'impl_state'.
+ * 
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
 typedef void
-(*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
+(*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state);
 
 
 /**
+ * Implementation function that cancels the currently sent message.
+ * 
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+typedef void
+(*GNUNET_MQ_CancelImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state);
+
+
+/**
  * Callback used for notifications
  *
  * @param cls closure
@@ -160,117 +202,23 @@
 (*GNUNET_MQ_NotifyCallback) (void *cls);
 
 
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure, same closure as for the message handlers
+ * @param error error code
+ */
 typedef void
 (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
 
 
-struct GNUNET_MQ_Message
-{
-  /**
-   * Messages are stored in a linked list
-   */
-  struct GNUNET_MQ_Message *next;
-
-  /**
-   * Messages are stored in a linked list
-   */
-  struct GNUNET_MQ_Message *prev;
-
-  /**
-   * Actual allocated message header,
-   * usually points to the end of the containing GNUNET_MQ_Message
-   */
-  struct GNUNET_MessageHeader *mh;
-
-  /**
-   * Queue the message is queued in, NULL if message is not queued.
-   */
-  struct GNUNET_MQ_MessageQueue *parent_queue;
-
-  /**
-   * Called after the message was sent irrevokably
-   */
-  GNUNET_MQ_NotifyCallback sent_cb;
-
-  /**
-   * Closure for send_cb
-   */
-  void *sent_cls;
-};
-
-
 /**
- * Handle to a message queue.
- */
-struct GNUNET_MQ_MessageQueue
-{
-  /**
-   * Handlers array, or NULL if the queue should not receive messages
-   */
-  const struct GNUNET_MQ_Handler *handlers;
-
-  /**
-   * Closure for the handler callbacks,
-   * as well as for the error handler.
-   */
-  void *handlers_cls;
-
-  /**
-   * Actual implementation of message sending,
-   * called when a message is added
-   */
-  GNUNET_MQ_SendImpl send_impl;
-
-  /**
-   * Implementation-dependent queue destruction function
-   */
-  GNUNET_MQ_DestroyImpl destroy_impl;
-
-  /**
-   * Implementation-specific state
-   */
-  void *impl_state;
-
-  /**
-   * Callback will be called when an error occurs.
-   */
-  GNUNET_MQ_ErrorHandler error_handler;
-
-  /**
-   * Linked list of messages pending to be sent
-   */
-  struct GNUNET_MQ_Message *msg_head;
-
-  /**
-   * Linked list of messages pending to be sent
-   */
-  struct GNUNET_MQ_Message *msg_tail;
-
-  /**
-   * Message that is currently scheduled to be
-   * sent. Not the head of the message queue, as the implementation
-   * needs to know if sending has been already scheduled or not.
-   */
-  struct GNUNET_MQ_Message *current_msg;
-
-  /**
-   * Map of associations, lazily allocated
-   */
-  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
-
-  /**
-   * Next id that should be used for the assoc_map,
-   * initialized lazily to a random value together with
-   * assoc_map
-   */
-  uint32_t assoc_id;
-};
-
-
-/**
  * Message handler for a specific message type.
  */
-struct GNUNET_MQ_Handler
+struct GNUNET_MQ_MessageHandler
 {
   /**
    * Callback, called every time a new message of 
@@ -296,14 +244,14 @@
 
 
 /**
- * Create a new message for MQ.
+ * Create a new envelope.
  * 
  * @param mhp message header to store the allocated message header in, can be 
NULL
  * @param size size of the message to allocate
  * @param type type of the message, will be set in the allocated message
  * @return the allocated MQ message
  */
-struct GNUNET_MQ_Message *
+struct GNUNET_MQ_Envelope *
 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t 
type);
 
 
@@ -315,7 +263,7 @@
  * @param mqm the message to discard
  */
 void
-GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm);
+GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm);
 
 
 /**
@@ -326,7 +274,7 @@
  * @param mqm the message to send.
  */
 void
-GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message 
*mqm);
+GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev);
 
 
 /**
@@ -336,7 +284,7 @@
  * @param mqm queued message to cancel
  */
 void
-GNUNET_MQ_send_cancel (struct GNUNET_MQ_Message *mqm);
+GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev);
 
 
 /**
@@ -347,9 +295,7 @@
  * @param assoc_data to associate
  */
 uint32_t
-GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
-                     struct GNUNET_MQ_Message *mqm,
-                     void *assoc_data);
+GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data);
 
 /**
  * Get the data associated with a request id in a queue
@@ -359,7 +305,7 @@
  * @return the associated data
  */
 void *
-GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
+GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id);
 
 
 /**
@@ -370,7 +316,7 @@
  * @return the associated data
  */
 void *
-GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t 
request_id);
+GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id);
 
 
 
@@ -383,9 +329,9 @@
  * @param cls closure for the handlers
  * @return the message queue
  */
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection 
*connection,
-                                       const struct GNUNET_MQ_Handler 
*handlers,
+                                       const struct GNUNET_MQ_MessageHandler 
*handlers,
                                        void *cls);
 
 
@@ -395,7 +341,7 @@
  * @param client the client
  * @return the message queue
  */
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client);
 
 
@@ -404,16 +350,19 @@
  *
  * @param send function the implements sending messages
  * @param destroy function that implements destroying the queue
+ * @param destroy function that implements canceling a message
  * @param state for the queue, passed to 'send' and 'destroy'
  * @param handlers array of message handlers
  * @param error_handler handler for read and write errors
+ * @param cls closure for handlers
  * @return a new message queue
  */
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
                                GNUNET_MQ_DestroyImpl destroy,
+                               GNUNET_MQ_CancelImpl cancel,
                                void *impl_state,
-                               struct GNUNET_MQ_Handler *handlers,
+                               const struct GNUNET_MQ_MessageHandler *handlers,
                                GNUNET_MQ_ErrorHandler error_handler,
                                void *cls);
                                
@@ -424,27 +373,30 @@
  * Takes effect immediately, even for messages that already have been 
received, but for
  * with the handler has not been called.
  *
+ * If the message queue does not support receiving messages,
+ * this function has no effect.
+ *
  * @param mq message queue
  * @param new_handlers new handlers
  * @param cls new closure for the handlers
  */
 void
-GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
-                            const struct GNUNET_MQ_Handler *new_handlers,
+GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
+                            const struct GNUNET_MQ_MessageHandler 
*new_handlers,
                             void *cls);
 
 
 /**
- * Call a callback once the message has been sent, that is, the message
- * can not be canceled anymore.
- * There can be only one notify sent callback per message.
+ * Call a callback once the envelope has been sent, that is,
+ * sending it can not be canceled anymore.
+ * There can be only one notify sent callback per envelope.
  *
- * @param mqm message to call the notify callback for
+ * @param ev message to call the notify callback for
  * @param cb the notify callback
  * @param cls closure for the callback
  */
 void
-GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
+GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
                        GNUNET_MQ_NotifyCallback cb,
                        void *cls);
 
@@ -455,7 +407,7 @@
  * @param mq message queue to destroy
  */
 void
-GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq);
+GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq);
 
 
 /**
@@ -465,7 +417,70 @@
  * @param mh message to dispatch
  */
 void
-GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq,
-                    const struct GNUNET_MessageHeader *mh);
+GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
+                          const struct GNUNET_MessageHeader *mh);
 
+
+/**
+ * Call the right callback for an error condition.
+ *
+ * @param mq message queue
+ */
+void
+GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
+                        enum GNUNET_MQ_Error error);
+
+
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq);
+
+
+/**
+ * Get the message that should currently be sent.
+ * Fails if there is no current message.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+const struct GNUNET_MessageHeader *
+GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq);
+
+
+/**
+ * Get the implementation state associated with the
+ * message queue.
+ *
+ * While the GNUNET_MQ_Impl* callbacks receive the
+ * implementation state, continuations that are scheduled
+ * by the implementation function often only have one closure
+ * argument, with this function it is possible to get at the
+ * implementation state when only passing the GNUNET_MQ_Handle
+ * as closure.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+void *
+GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq);
+
+/**
+ * Mark the current message as irrevocably sent, but do not
+ * proceed with sending the next message.
+ * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
+ *
+ * @param mq message queue
+ */
+void
+GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq);
+
 #endif

Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h     2013-06-19 09:34:15 UTC (rev 
27484)
+++ gnunet/src/include/gnunet_set_service.h     2013-06-19 10:48:54 UTC (rev 
27485)
@@ -257,9 +257,9 @@
 
 
 /**
- * Create a set operation for evaluation with another peer.
+ * Prepare a set operation to be evaluated with another peer.
  * The evaluation will not start until the client provides
- * a local set with GNUNET_SET_conclude.
+ * a local set with GNUNET_SET_commit.
  *
  * @param other_peer peer with the other set
  * @param app_id hash for the application using the set
@@ -273,14 +273,14 @@
  * @param result_cls closure for result_cb
  * @return a handle to cancel the operation
  */
-struct GNUNET_SET_OperationHandle * // FIXME: rename to _connect?
-GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
-                     const struct GNUNET_HashCode *app_id,
-                     const struct GNUNET_MessageHeader *context_msg,
-                     uint16_t salt,
-                     enum GNUNET_SET_ResultMode result_mode,
-                     GNUNET_SET_ResultIterator result_cb,
-                     void *result_cls);
+struct GNUNET_SET_OperationHandle *
+GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
+                    const struct GNUNET_HashCode *app_id,
+                    const struct GNUNET_MessageHeader *context_msg,
+                    uint16_t salt,
+                    enum GNUNET_SET_ResultMode result_mode,
+                    GNUNET_SET_ResultIterator result_cb,
+                    void *result_cls);
 
 
 /**
@@ -316,7 +316,7 @@
  * Accept a request we got via GNUNET_SET_listen.  Must be called during
  * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
  * afterwards.
- * Call GNUNET_SET_conclude to provide the local set to use for the operation,
+ * Call GNUNET_SET_commit to provide the local set to use for the operation,
  * and to begin the exchange with the remote peer. 
  *
  * @param request request to accept
@@ -334,7 +334,7 @@
 
 
 /**
- * Conclude the given set operation using the given set. 
+ * Commit a set to be used with a set operation.
  * This function is called once we have fully constructed
  * the set that we want to use for the operation.  At this
  * time, the P2P protocol can then begin to exchange the
@@ -344,9 +344,9 @@
  * @param oh handle to the set operation 
  * @param set the set to use for the operation
  */
-void // FIXME: rename to _commit
-GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
-                    struct GNUNET_SET_Handle *set);
+void
+GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
+                   struct GNUNET_SET_Handle *set);
 
 
 /**

Modified: gnunet/src/include/gnunet_stream_lib.h
===================================================================
--- gnunet/src/include/gnunet_stream_lib.h      2013-06-19 09:34:15 UTC (rev 
27484)
+++ gnunet/src/include/gnunet_stream_lib.h      2013-06-19 10:48:54 UTC (rev 
27485)
@@ -403,9 +403,9 @@
  * @param error_handler callback for errors
  * @return the message queue for the socket
  */
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
 GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, 
-                         const struct GNUNET_MQ_Handler *msg_handlers,
+                         const struct GNUNET_MQ_MessageHandler *msg_handlers,
                          GNUNET_MQ_ErrorHandler error_handler,
                          void *cls);
 

Modified: gnunet/src/mesh/mesh2_api.c
===================================================================
--- gnunet/src/mesh/mesh2_api.c 2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/mesh/mesh2_api.c 2013-06-19 10:48:54 UTC (rev 27485)
@@ -320,6 +320,24 @@
 };
 
 
+/**
+ * Implementation state for mesh's message queue.
+ */
+struct MeshMQState
+{
+  /**
+   * The current transmit handle, or NULL
+   * if no transmit is active.
+   */
+  struct GNUNET_MESH_TransmitHandle *th;
+
+  /**
+   * Tunnel to send the data over.
+   */
+  struct GNUNET_MESH_Tunnel *tunnel;
+};
+
+
 
/******************************************************************************/
 /***********************         DECLARATIONS         
*************************/
 
/******************************************************************************/
@@ -1685,4 +1703,115 @@
   h->tunnel_cls = callback_cls;
 
   return;
-}
\ No newline at end of file
+}
+
+
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+mesh_mq_ntr (void *cls, size_t size,
+             void *buf)
+{
+  struct GNUNET_MQ_Handle *mq = cls; 
+  struct MeshMQState *state = GNUNET_MQ_impl_state (mq);
+  const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
+  uint16_t msize;
+
+  state->th = NULL;
+  if (NULL == buf)
+  {
+    GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
+    return 0;
+  }
+  msize = ntohs (msg->size);
+  GNUNET_assert (msize <= size);
+  memcpy (buf, msg, msize);
+  GNUNET_MQ_impl_send_continue (mq);
+  return msize;
+}
+
+
+/**
+ * Signature of functions implementing the
+ * sending functionality of a message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+mesh_mq_send_impl (struct GNUNET_MQ_Handle *mq,
+                   const struct GNUNET_MessageHeader *msg, void *impl_state)
+{
+  struct MeshMQState *state = impl_state;
+
+  GNUNET_assert (NULL == state->th);
+  GNUNET_MQ_impl_send_commit (mq);
+  state->th =
+      GNUNET_MESH_notify_transmit_ready (state->tunnel,
+                                         /* FIXME: add option for corking */
+                                         GNUNET_NO,
+                                         GNUNET_TIME_UNIT_FOREVER_REL, 
+                                         ntohs (msg->size),
+                                         mesh_mq_ntr, mq);
+
+}
+
+
+/**
+ * Signature of functions implementing the
+ * destruction of a message queue.
+ * Implementations must not free 'mq', but should
+ * take care of 'impl_state'.
+ * 
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+mesh_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+  struct MeshMQState *state = impl_state;
+
+  if (NULL != state->th)
+    GNUNET_MESH_notify_transmit_ready_cancel (state->th);
+
+  GNUNET_free (state);
+}
+
+
+/**
+ * Create a message queue for a mesh tunnel.
+ * The message queue can only be used to transmit messages,
+ * not to receive them.
+ *
+ * @param tunnel the tunnel to create the message qeue for
+ * @return a message queue to messages over the tunnel
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel)
+{
+  struct GNUNET_MQ_Handle *mq;
+  struct MeshMQState *state;
+
+  state = GNUNET_new (struct MeshMQState);
+  state->tunnel = tunnel;
+
+  mq = GNUNET_MQ_queue_for_callbacks (mesh_mq_send_impl,
+                                      mesh_mq_destroy_impl,
+                                      NULL, /* FIXME: cancel impl. */
+                                      state,
+                                      NULL, /* no msg handlers */
+                                      NULL, /* no err handlers */
+                                      NULL); /* no handler cls */
+  return mq;
+}
+

Modified: gnunet/src/set/Makefile.am
===================================================================
--- gnunet/src/set/Makefile.am  2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/Makefile.am  2013-06-19 10:48:54 UTC (rev 27485)
@@ -16,7 +16,7 @@
 endif
 
 bin_PROGRAMS = \
- gnunet-set
+ gnunet-set-profiler gnunet-set-ibf-profiler
 
 libexec_PROGRAMS = \
  gnunet-service-set
@@ -24,17 +24,24 @@
 lib_LTLIBRARIES = \
   libgnunetset.la
 
-gnunet_set_SOURCES = \
- gnunet-set.c
-gnunet_set_LDADD = \
+gnunet_set_profiler_SOURCES = \
+ gnunet-set-profiler.c
+gnunet_set_profiler_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/set/libgnunetset.la \
-  $(top_builddir)/src/stream/libgnunetstream.la \
-  $(top_builddir)/src/testbed/libgnunettestbed.la \
+  $(top_builddir)/src/testing/libgnunettesting.la \
   $(GN_LIBINTL)
-gnunet_set_DEPENDENCIES = \
+gnunet_set_profiler_DEPENDENCIES = \
   libgnunetset.la
 
+
+gnunet_set_ibf_profiler_SOURCES = \
+ gnunet-set-ibf-profiler.c \
+ ibf.c
+gnunet_set_ibf_profiler_LDADD = \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(GN_LIBINTL)
+
 gnunet_service_set_SOURCES = \
  gnunet-service-set.c \
  gnunet-service-set_union.c \
@@ -43,8 +50,7 @@
 gnunet_service_set_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/core/libgnunetcore.la \
-  $(top_builddir)/src/stream/libgnunetstream.la \
-  $(top_builddir)/src/mesh/libgnunetmesh.la \
+  $(top_builddir)/src/mesh/libgnunetmesh2.la \
   $(GN_LIBINTL)
 
 libgnunetset_la_SOURCES = \

Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/gnunet-service-set.c 2013-06-19 10:48:54 UTC (rev 27485)
@@ -29,13 +29,16 @@
 
 /**
  * Configuration of our local peer.
+ * (Not declared 'static' as also needed in gnunet-service-set_union.c)
  */
 const struct GNUNET_CONFIGURATION_Handle *configuration;
 
 /**
- * Socket listening for other peers via stream.
+ * Handle to the mesh service, used
+ * to listen for and connect to remote peers.
+ * (Not declared 'static' as also needed in gnunet-service-set_union.c)
  */
-static struct GNUNET_STREAM_ListenSocket *stream_listen_socket;
+struct GNUNET_MESH_Handle *mesh;
 
 /**
  * Sets are held in a doubly linked list.
@@ -78,14 +81,14 @@
 
 
 /**
- * Get set that is owned by the client, if any.
+ * Get set that is owned by the given client, if any.
  *
  * @param client client to look for
  * @return set that the client owns, NULL if the client
  *         does not own a set
  */
 static struct Set *
-get_set (struct GNUNET_SERVER_Client *client)
+set_get (struct GNUNET_SERVER_Client *client)
 {
   struct Set *set;
   for (set = sets_head; NULL != set; set = set->next)
@@ -137,7 +140,7 @@
  * @param listener listener to destroy
  */
 static void
-destroy_listener (struct Listener *listener)
+listener_destroy (struct Listener *listener)
 {
   if (NULL != listener->client_mq)
   {
@@ -155,7 +158,7 @@
  * @param set the set to destroy
  */
 static void
-destroy_set (struct Set *set)
+set_destroy (struct Set *set)
 {
   switch (set->operation)
   {
@@ -187,12 +190,12 @@
   struct Set *set;
   struct Listener *listener;
 
-  set = get_set (client);
+  set = set_get (client);
   if (NULL != set)
-    destroy_set (set);
+    set_destroy (set);
   listener = get_listener (client);
   if (NULL != listener)
-    destroy_listener (listener);
+    listener_destroy (listener);
 }
 
 
@@ -202,18 +205,15 @@
  * @param incoming remote request to destroy
  */
 static void
-destroy_incoming (struct Incoming *incoming)
+incoming_destroy (struct Incoming *incoming)
 {
-  if (NULL != incoming->mq)
+  if (NULL != incoming->tc)
   {
-    GNUNET_MQ_destroy (incoming->mq);
-    incoming->mq = NULL;
+    GNUNET_free (incoming->tc);
+    GNUNET_assert (NULL != incoming->tc->tunnel);
+    GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel);
+    incoming->tc = NULL;
   }
-  if (NULL != incoming->socket)
-  {
-    GNUNET_STREAM_close (incoming->socket);
-    incoming->socket = NULL;
-  }
   GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
   GNUNET_free (incoming);
 }
@@ -237,6 +237,15 @@
 }
 
 
+
+static void
+tunnel_context_destroy (struct TunnelContext *tc)
+{
+  GNUNET_free (tc);
+  /* FIXME destroy the rest */
+}
+
+
 /**
  * Handle a request for a set operation from
  * another peer.
@@ -244,16 +253,31 @@
  * @param cls the incoming socket
  * @param mh the message
  */
-static void
-handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
+static int
+handle_p2p_operation_request (void *cls,
+                              struct GNUNET_MESH_Tunnel *tunnel,
+                              void **tunnel_ctx,
+                              const struct GNUNET_PeerIdentity *sender,
+                              const struct GNUNET_MessageHeader *mh)
 {
-  struct Incoming *incoming = cls;
+  struct TunnelContext *tc = *tunnel_ctx;
+  struct Incoming *incoming;
   const struct OperationRequestMessage *msg = (const struct 
OperationRequestMessage *) mh;
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_RequestMessage *cmsg;
   struct Listener *listener;
   const struct GNUNET_MessageHeader *context_msg;
 
+  if (CONTEXT_INCOMING != tc->type)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected operation request\n");
+    tunnel_context_destroy (tc);
+    /* don't kill the whole mesh connection */
+    return GNUNET_OK;
+  }
+
+  incoming = tc->data;
+
   context_msg = GNUNET_MQ_extract_nested_mh (msg);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, 
app %s)\n",
               ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
@@ -263,20 +287,26 @@
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "set operation request from peer failed: "
                 "no set with matching application ID and operation type\n");
-    return;
+    tunnel_context_destroy (tc);
+    /* don't kill the whole mesh connection */
+    return GNUNET_OK;
   }
   mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, 
context_msg);
   if (NULL == mqm)
   {
     /* FIXME: disconnect the peer */
     GNUNET_break_op (0);
-    return;
+    tunnel_context_destroy (tc);
+    /* don't kill the whole mesh connection */
+    return GNUNET_OK;
   }
   incoming->accept_id = accept_id++;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", 
incoming->accept_id);
   cmsg->accept_id = htonl (incoming->accept_id);
-  cmsg->peer_id = incoming->peer;
+  cmsg->peer_id = incoming->tc->peer;
   GNUNET_MQ_send (listener->client_mq, mqm);
+
+  return GNUNET_OK;
 }
 
 
@@ -298,7 +328,7 @@
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation 
%u)\n",
               ntohs (msg->operation));
 
-  if (NULL != get_set (client))
+  if (NULL != set_get (client))
   {
     GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
@@ -379,7 +409,7 @@
 {
   struct Set *set;
 
-  set = get_set (client);
+  set = set_get (client);
   if (NULL == set)
   {
     GNUNET_break (0);
@@ -428,7 +458,7 @@
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
-  destroy_incoming (incoming);
+  incoming_destroy (incoming);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -449,7 +479,7 @@
 {
   struct Set *set;
 
-  set = get_set (client);
+  set = set_get (client);
   if (NULL == set)
   {
     GNUNET_break (0);
@@ -486,7 +516,7 @@
 {
   struct Set *set;
 
-  set = get_set (client);
+  set = set_get (client);
   if (NULL == set)
   {
     GNUNET_break (0);
@@ -558,9 +588,8 @@
     return;
   }
 
+  set = set_get (client);
 
-  set = get_set (client);
-
   if (NULL == set)
   {
     GNUNET_break (0);
@@ -584,51 +613,12 @@
 
   /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL,
    * otherwise they will be destroyed and disconnected */
-  destroy_incoming (incoming);
+  incoming_destroy (incoming);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
 /**
- * Functions of this type are called upon new stream connection from other 
peers
- * or upon binding error which happen when the app_port given in
- * GNUNET_STREAM_listen() is already taken.
- *
- * @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream; NULL on binding error
- * @param initiator the identity of the peer who wants to establish a stream
- *            with us; NULL on binding error
- * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
- *             stream (the socket will be invalid after the call)
- */
-static int
-stream_listen_cb (void *cls,
-                  struct GNUNET_STREAM_Socket *socket,
-                  const struct GNUNET_PeerIdentity *initiator)
-{
-  struct Incoming *incoming;
-  static const struct GNUNET_MQ_Handler handlers[] = {
-    {handle_p2p_operation_request, 
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST},
-    GNUNET_MQ_HANDLERS_END
-  };
-
-  if (NULL == socket)
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-
-  incoming = GNUNET_new (struct Incoming);
-  incoming->peer = *initiator;
-  incoming->socket = socket;
-  incoming->mq = GNUNET_STREAM_mq_create (incoming->socket, handlers, NULL, 
incoming);
-  /* FIXME: timeout for peers that only connect but don't send anything */
-  GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
-  return GNUNET_OK;
-}
-
-
-/**
  * Called to clean up, after a shutdown has been requested.
  *
  * @param cls closure
@@ -638,32 +628,127 @@
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  if (NULL != stream_listen_socket)
+  if (NULL != mesh)
   {
-    GNUNET_STREAM_listen_close (stream_listen_socket);
-    stream_listen_socket = NULL;
+    GNUNET_MESH_disconnect (mesh);
+    mesh = NULL;
   }
 
   while (NULL != incoming_head)
   {
-    destroy_incoming (incoming_head);
+    incoming_destroy (incoming_head);
   }
 
   while (NULL != listeners_head)
   {
-    destroy_listener (listeners_head);
+    listener_destroy (listeners_head);
   }
 
   while (NULL != sets_head)
   {
-    destroy_set (sets_head);
+    set_destroy (sets_head);
   }
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
 }
 
 
+
 /**
+ * Signature of the main function of a task.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+incoming_timeout_cb (void *cls,
+                     const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Incoming *incoming = cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out");
+  incoming_destroy (incoming);
+}
+
+
+/**
+ * Method called whenever another peer has added us to a tunnel
+ * the other peer initiated.
+ * Only called (once) upon reception of data with a message type which was
+ * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
+ * causes te tunnel to be ignored and no further notifications are sent about
+ * the same tunnel.
+ *
+ * @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param port Port this tunnel is for.
+ * @return initial tunnel context for the tunnel
+ *         (can be NULL -- that's not an error)
+ */
+static void *
+tunnel_new_cb (void *cls,
+               struct GNUNET_MESH_Tunnel *tunnel,
+               const struct GNUNET_PeerIdentity *initiator,
+               uint32_t port)
+{
+  struct Incoming *incoming;
+  struct TunnelContext *tc;
+
+  GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
+  tc = GNUNET_new (struct TunnelContext);
+  incoming = GNUNET_new (struct Incoming);
+  incoming->tc = tc;
+  tc->peer = *initiator;
+  tc->tunnel = tunnel;
+  tc->mq = GNUNET_MESH_mq_create (tunnel);
+  tc->data = incoming;
+  tc->type = CONTEXT_INCOMING;
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, 
incoming);
+  GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
+
+  return tc;
+}
+
+
+/**
+ * Function called whenever a tunnel is destroyed.  Should clean up
+ * any associated state.  This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ *                   with the tunnel is stored
+ */
+static void
+tunnel_end_cb (void *cls,
+               const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
+{
+  struct TunnelContext *ctx = tunnel_ctx;
+
+  switch (ctx->type)
+  {
+    case CONTEXT_INCOMING:
+      incoming_destroy ((struct Incoming *) ctx->data);
+      break;
+    case CONTEXT_OPERATION_UNION:
+      _GSS_union_operation_destroy ((struct UnionEvaluateOperation *) 
ctx->data);
+      break;
+    case CONTEXT_OPERATION_INTERSECTION:
+      GNUNET_assert (0);
+      /* FIXME: cfuchs */
+      break;
+    default:
+      GNUNET_assert (0);
+  }
+
+}
+
+
+/**
  * Function called by the service's run
  * method to run service-specific setup code.
  *
@@ -686,16 +771,40 @@
     {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
     {NULL, NULL, 0, 0}
   };
+  static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
+    {handle_p2p_operation_request,
+      GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
+    /* messages for the union operation */
+    {_GSS_union_handle_p2p_message,
+      GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0},
+    {_GSS_union_handle_p2p_message,
+      GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
+    {_GSS_union_handle_p2p_message,
+      GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
+    {_GSS_union_handle_p2p_message,
+      GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
+    {_GSS_union_handle_p2p_message,
+      GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
+    /* FIXME: messages for intersection operation */
+    {NULL, 0, 0}
+  };
+  static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
 
   configuration = cfg;
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, 
NULL);
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                &shutdown_task, NULL);
   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
   GNUNET_SERVER_add_handlers (server, server_handlers);
-  stream_listen_socket = GNUNET_STREAM_listen (cfg, 
GNUNET_APPLICATION_TYPE_SET,
-                                               &stream_listen_cb, NULL,
-                                               GNUNET_STREAM_OPTION_END);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n");
+  mesh = GNUNET_MESH_connect (cfg, NULL, tunnel_new_cb, tunnel_end_cb,
+                              mesh_handlers, mesh_ports);
+  if (NULL == mesh)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not connect to mesh\n");
+    return;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n");
 }
 
 
@@ -710,7 +819,8 @@
 main (int argc, char *const *argv)
 {
   int ret;
-  ret = GNUNET_SERVICE_run (argc, argv, "set", GNUNET_SERVICE_OPTION_NONE, 
&run, NULL);
+  ret = GNUNET_SERVICE_run (argc, argv, "set",
+                            GNUNET_SERVICE_OPTION_NONE, &run, NULL);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
   return (GNUNET_OK == ret) ? 0 : 1;
 }

Modified: gnunet/src/set/gnunet-service-set.h
===================================================================
--- gnunet/src/set/gnunet-service-set.h 2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/gnunet-service-set.h 2013-06-19 10:48:54 UTC (rev 27485)
@@ -33,7 +33,7 @@
 #include "gnunet_applications.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_core_service.h"
-#include "gnunet_stream_lib.h"
+#include "gnunet_mesh2_service.h"
 #include "gnunet_set_service.h"
 #include "set.h"
 
@@ -47,7 +47,9 @@
  */
 struct UnionState;
 
+struct UnionEvaluateOperation;
 
+
 /**
  * A set that supports a specific operation
  * with other peers.
@@ -63,7 +65,7 @@
   /**
    * Message queue for the client
    */
-  struct GNUNET_MQ_MessageQueue *client_mq;
+  struct GNUNET_MQ_Handle *client_mq;
 
   /**
    * Type of operation supported for this set
@@ -116,7 +118,7 @@
   /**
    * Message queue for the client
    */
-  struct GNUNET_MQ_MessageQueue *client_mq;
+  struct GNUNET_MQ_Handle *client_mq;
 
   /**
    * Type of operation supported for this set
@@ -148,21 +150,19 @@
   struct Incoming *prev;
 
   /**
-   * Identity of the peer that connected to us
+   * Tunnel context, stores information about
+   * the tunnel and its peer.
    */
-  struct GNUNET_PeerIdentity peer;
+  struct TunnelContext *tc;
 
   /**
-   * Socket connected to the peer
+   * GNUNET_YES if the incoming peer has sent
+   * an operation request (and we are waiting
+   * for the client to ack/nack), GNUNET_NO otherwise.
    */
-  struct GNUNET_STREAM_Socket *socket;
+  int received_request;
 
   /**
-   * Message queue for the peer
-   */
-  struct GNUNET_MQ_MessageQueue *mq;
-
-  /**
    * App code, set once the peer has
    * requested an operation
    */
@@ -187,19 +187,38 @@
 
   /**
    * Unique request id for the request from
-   * a remote peer, sent to the client with will
+   * a remote peer, sent to the client, which will
    * accept or reject the request.
    */
   uint32_t accept_id;
 };
 
 
+enum TunnelContextType {
+  CONTEXT_INCOMING,
+  CONTEXT_OPERATION_UNION,
+  CONTEXT_OPERATION_INTERSECTION,
+};
+
+struct TunnelContext
+{
+  struct GNUNET_MESH_Tunnel *tunnel;
+  struct GNUNET_PeerIdentity peer;
+  struct GNUNET_MQ_Handle *mq;
+  enum TunnelContextType type;
+  void *data;
+};
+
+
+
 /**
  * Configuration of the local peer
  */
 extern const struct GNUNET_CONFIGURATION_Handle *configuration;
 
+extern struct GNUNET_MESH_Handle *mesh;
 
+
 /**
  * Create a new set supporting the union operation
  *
@@ -262,4 +281,32 @@
                    struct Incoming *incoming);
 
 
+/**
+ * Destroy a union operation, and free all resources
+ * associated with it.
+ *
+ * @param eo the union operation to destroy
+ */
+void
+_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo);
+
+
+/**
+ * Dispatch messages for a union operation.
+ *
+ * @param cls closure
+ * @param tunnel mesh tunnel
+ * @param tunnel_ctx tunnel context
+ * @param sender ???
+ * @param mh message to process
+ * @return ???
+ */
+int
+_GSS_union_handle_p2p_message (void *cls,
+                               struct GNUNET_MESH_Tunnel *tunnel,
+                               void **tunnel_ctx,
+                               const struct GNUNET_PeerIdentity *sender,
+                               const struct GNUNET_MessageHeader *mh);
+
+
 #endif

Modified: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c   2013-06-19 09:34:15 UTC (rev 
27484)
+++ gnunet/src/set/gnunet-service-set_union.c   2013-06-19 10:48:54 UTC (rev 
27485)
@@ -124,17 +124,12 @@
   struct GNUNET_MessageHeader *context_msg;
 
   /**
-   * Stream socket connected to the other peer
+   * Tunnel context for the peer we
+   * evaluate the union operation with.
    */
-  struct GNUNET_STREAM_Socket *socket;
+  struct TunnelContext *tc;
 
   /**
-   * Message queue for the peer on the other
-   * end
-   */
-  struct GNUNET_MQ_MessageQueue *mq;
-
-  /**
    * Request ID to multiplex set operations to
    * the client inhabiting the set.
    */
@@ -397,22 +392,19 @@
  *
  * @param eo the union operation to destroy
  */
-static void
-destroy_union_operation (struct UnionEvaluateOperation *eo)
+void
+_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
   
-  if (NULL != eo->mq)
+  if (NULL != eo->tc)
   {
-    GNUNET_MQ_destroy (eo->mq);
-    eo->mq = NULL;
+    GNUNET_MQ_destroy (eo->tc->mq);
+    GNUNET_MESH_tunnel_destroy (eo->tc->tunnel);
+    GNUNET_free (eo->tc);
+    eo->tc = NULL;
   }
 
-  if (NULL != eo->socket)
-  {
-    GNUNET_STREAM_close (eo->socket);
-    eo->socket = NULL;
-  }
   if (NULL != eo->remote_ibf)
   {
     ibf_destroy (eo->remote_ibf);
@@ -457,14 +449,14 @@
 static void
 fail_union_operation (struct UnionEvaluateOperation *eo)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_ResultMessage *msg;
 
   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
   msg->request_id = htonl (eo->request_id);
   GNUNET_MQ_send (eo->set->client_mq, mqm);
-  destroy_union_operation (eo);
+  _GSS_union_operation_destroy (eo);
 }
 
 
@@ -498,7 +490,7 @@
 static void
 send_operation_request (struct UnionEvaluateOperation *eo)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct OperationRequestMessage *msg;
 
   mqm = GNUNET_MQ_msg_nested_mh (msg, 
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
@@ -512,7 +504,7 @@
   }
   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
   msg->app_id = eo->app_id;
-  GNUNET_MQ_send (eo->mq, mqm);
+  GNUNET_MQ_send (eo->tc->mq, mqm);
 
   if (NULL != eo->context_msg)
   {
@@ -562,7 +554,7 @@
  * Insert an element into the union operation's
  * key-to-element mapping
  *
- * @param the union operation
+ * @param eo the union operation
  * @param ee the element entry
  */
 static void
@@ -685,7 +677,7 @@
   while (buckets_sent < (1 << ibf_order))
   {
     unsigned int buckets_in_message;
-    struct GNUNET_MQ_Message *mqm;
+    struct GNUNET_MQ_Envelope *mqm;
     struct IBFMessage *msg;
 
     buckets_in_message = (1 << ibf_order) - buckets_sent;
@@ -700,7 +692,7 @@
     ibf_write_slice (ibf, buckets_sent,
                      buckets_in_message, &msg[1]);
     buckets_sent += buckets_in_message;
-    GNUNET_MQ_send (eo->mq, mqm);
+    GNUNET_MQ_send (eo->tc->mq, mqm);
   }
 
   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
@@ -715,14 +707,14 @@
 static void
 send_strata_estimator (struct UnionEvaluateOperation *eo)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_MessageHeader *strata_msg;
 
   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * 
SE_IBF_SIZE,
                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
-  GNUNET_MQ_send (eo->mq, mqm);
+  GNUNET_MQ_send (eo->tc->mq, mqm);
   eo->phase = PHASE_EXPECT_IBF;
 }
 
@@ -751,7 +743,7 @@
 /**
  * Handle a strata estimator from a remote peer
  *
- * @param the union operation
+ * @param cls the union operation
  * @param mh the message
  */
 static void
@@ -804,7 +796,7 @@
   while (NULL != ke)
   {
     const struct GNUNET_SET_Element *const element = &ke->element->element;
-    struct GNUNET_MQ_Message *mqm;
+    struct GNUNET_MQ_Envelope *mqm;
     struct GNUNET_MessageHeader *mh;
 
     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
@@ -817,7 +809,7 @@
     }
     memcpy (&mh[1], element->data, element->size);
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
-    GNUNET_MQ_send (eo->mq, mqm);
+    GNUNET_MQ_send (eo->tc->mq, mqm);
     ke = ke->next_colliding;
   }
   return GNUNET_NO;
@@ -889,11 +881,11 @@
     }
     if (GNUNET_NO == res)
     {
-      struct GNUNET_MQ_Message *mqm;
+      struct GNUNET_MQ_Envelope *mqm;
 
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending 
DONE\n");
       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
-      GNUNET_MQ_send (eo->mq, mqm);
+      GNUNET_MQ_send (eo->tc->mq, mqm);
       break;
     }
     if (1 == side)
@@ -902,7 +894,7 @@
     }
     else
     {
-      struct GNUNET_MQ_Message *mqm;
+      struct GNUNET_MQ_Envelope *mqm;
       struct GNUNET_MessageHeader *msg;
 
       /* FIXME: before sending the request, check if we may just have the 
element */
@@ -910,7 +902,7 @@
       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
                                         
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
       *(struct IBF_Key *) &msg[1] = key;
-      GNUNET_MQ_send (eo->mq, mqm);
+      GNUNET_MQ_send (eo->tc->mq, mqm);
     }
   }
   ibf_destroy (diff_ibf);
@@ -987,7 +979,7 @@
 send_client_element (struct UnionEvaluateOperation *eo,
                      struct GNUNET_SET_Element *element)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_ResultMessage *rm;
 
   GNUNET_assert (0 != eo->request_id);
@@ -1006,39 +998,17 @@
 
 
 /**
- * Completion callback for shutdown
- *
- * @param cls the closure from GNUNET_STREAM_shutdown call
- * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
- *          SHUT_RDWR) 
- */
-/*
-static void 
-stream_shutdown_cb (void *cls,
-                    int operation)
-{
-  //struct UnionEvaluateOperation *eo = cls;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n");
-
-  // destroy_union_operation (eo);
-}
-*/
-
-
-/**
  * Send a result message to the client indicating
  * that the operation is over.
  * After the result done message has been sent to the client,
  * destroy the evaluate operation.
  *
  * @param eo union operation
- * @param element element to send
  */
 static void
 send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_ResultMessage *rm;
 
   GNUNET_assert (0 != eo->request_id);
@@ -1047,7 +1017,6 @@
   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
   GNUNET_MQ_send (eo->set->client_mq, mqm);
 
-  // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo);
 }
 
 
@@ -1153,13 +1122,13 @@
   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
   {
     /* we got all requests, but still have to send our elements as response */
-    struct GNUNET_MQ_Message *mqm;
+    struct GNUNET_MQ_Envelope *mqm;
 
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after 
elements\n");
     eo->phase = PHASE_FINISHED;
     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
     GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
-    GNUNET_MQ_send (eo->mq, mqm);
+    GNUNET_MQ_send (eo->tc->mq, mqm);
     return;
   }
   if (eo->phase == PHASE_EXPECT_ELEMENTS)
@@ -1175,48 +1144,11 @@
 
 
 /**
- * The handlers array, used for both evaluate and accept
- */
-static const struct GNUNET_MQ_Handler union_handlers[] = {
-  {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
-  {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
-  {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
-  {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
-  {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
-  GNUNET_MQ_HANDLERS_END
-};
-
-
-/**
- * Functions of this type will be called when a stream is established
- * 
- * @param cls the closure from GNUNET_STREAM_open
- * @param socket socket to use to communicate with the
- *        other side (read/write)
- */
-static void
-stream_open_cb (void *cls,
-                struct GNUNET_STREAM_Socket *socket)
-{
-  struct UnionEvaluateOperation *eo = cls;
-
-  GNUNET_assert (NULL == eo->mq);
-  GNUNET_assert (socket == eo->socket);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
-             "open cb successful\n");
-  eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo);
-  /* we started the operation, thus we have to send the operation request */
-  send_operation_request (eo);
-  eo->phase = PHASE_EXPECT_SE;
-}
-
-
-/**
  * Evaluate a union operation with
  * a remote peer.
  *
  * @param m the evaluate request message from the client
- * @parem set the set to evaluate the operation with
+ * @param set the set to evaluate the operation with
  */
 void
 _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
@@ -1243,14 +1175,20 @@
              "evaluating union operation, (app %s)\n", 
               GNUNET_h2s (&eo->app_id));
 
-  eo->socket = 
-      GNUNET_STREAM_open (configuration, &eo->peer, 
GNUNET_APPLICATION_TYPE_SET,
-                          &stream_open_cb, eo,
-                          GNUNET_STREAM_OPTION_END);
+  eo->tc = GNUNET_new (struct TunnelContext);
+  eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
+                                              GNUNET_APPLICATION_TYPE_SET);
+  GNUNET_assert (NULL != eo->tc->tunnel);
+  eo->tc->peer = eo->peer;
+  eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
+  /* we started the operation, thus we have to send the operation request */
+  eo->phase = PHASE_EXPECT_SE;
+
   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
                                eo->set->state.u->ops_tail,
                                eo);
-  /* the stream open callback will kick off the operation */
+
+  send_operation_request (eo);
 }
 
 
@@ -1270,25 +1208,17 @@
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
 
   eo = GNUNET_new (struct UnionEvaluateOperation);
+  eo->tc = incoming->tc;
   eo->generation_created = set->state.u->current_generation++;
   eo->set = set;
-  eo->peer = incoming->peer;
   eo->salt = ntohs (incoming->salt);
   GNUNET_assert (0 != ntohl (m->request_id));
   eo->request_id = ntohl (m->request_id);
   eo->se = strata_estimator_dup (set->state.u->se);
-  eo->mq = incoming->mq;
   /* transfer ownership of mq and socket from incoming to eo */
-  incoming->mq = NULL;
-  eo->socket = incoming->socket;
-  incoming->socket = NULL;
-  /* the peer's socket is now ours, we'll receive all messages */
-  GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
-
   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
                                eo->set->state.u->ops_tail,
                                eo);
-
   /* kick off the operation */
   send_strata_estimator (eo);
 }
@@ -1384,7 +1314,7 @@
 
   while (NULL != set->state.u->ops_head)
   {
-    destroy_union_operation (set->state.u->ops_head);
+    _GSS_union_operation_destroy (set->state.u->ops_head);
   }
 }
 
@@ -1418,3 +1348,57 @@
   ee->generation_removed = set->state.u->current_generation;
 }
 
+
+/**
+ * Dispatch messages for a union operation.
+ *
+ * @param cls closure
+ * @param tunnel mesh tunnel
+ * @param tunnel_ctx tunnel context
+ * @param sender ???
+ * @param mh message to process
+ * @return ???
+ */
+int
+_GSS_union_handle_p2p_message (void *cls,
+                               struct GNUNET_MESH_Tunnel *tunnel,
+                               void **tunnel_ctx,
+                               const struct GNUNET_PeerIdentity *sender,
+                               const struct GNUNET_MessageHeader *mh)
+{
+  struct TunnelContext *tc = *tunnel_ctx;
+  struct UnionEvaluateOperation *eo;
+
+  if (CONTEXT_OPERATION_UNION != tc->type)
+  {
+    /* FIXME: kill the tunnel */
+    /* never kill mesh */
+    return GNUNET_OK;
+  }
+
+  eo = tc->data;
+
+  switch (ntohs (mh->type))
+  {
+    case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
+      handle_p2p_ibf (eo, mh);
+      break;
+    case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
+      handle_p2p_strata_estimator (eo, mh);
+      break;
+    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
+      handle_p2p_elements (eo, mh);
+      break;
+    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
+      handle_p2p_element_requests (eo, mh);
+      break;
+    case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
+      handle_p2p_done (eo, mh);
+      break;
+    default:
+      /* something wrong with mesh's message handlers? */
+      GNUNET_assert (0);
+  }
+  /* never kill mesh! */
+  return GNUNET_OK;
+}

Copied: gnunet/src/set/gnunet-set-ibf-profiler.c (from rev 27417, 
gnunet/src/set/gnunet-set-ibf.c)
===================================================================
--- gnunet/src/set/gnunet-set-ibf-profiler.c                            (rev 0)
+++ gnunet/src/set/gnunet-set-ibf-profiler.c    2013-06-19 10:48:54 UTC (rev 
27485)
@@ -0,0 +1,245 @@
+/*
+     This file is part of GNUnet.
+     (C) 2012 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file set/gnunet-set-ibf-profiler.c
+ * @brief tool for profiling the invertible bloom filter implementation
+ * @author Florian Dold
+ */
+
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_container_lib.h"
+#include "gnunet_util_lib.h"
+
+#include "ibf.h"
+
+static unsigned int asize = 10;
+static unsigned int bsize = 10;
+static unsigned int csize = 10;
+static unsigned int hash_num = 4;
+static unsigned int ibf_size = 80;
+
+/* FIXME: add parameter for this */
+static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
+
+static struct GNUNET_CONTAINER_MultiHashMap *set_a;
+static struct GNUNET_CONTAINER_MultiHashMap *set_b;
+/* common elements in a and b */
+static struct GNUNET_CONTAINER_MultiHashMap *set_c;
+
+static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
+
+static struct InvertibleBloomFilter *ibf_a;
+static struct InvertibleBloomFilter *ibf_b;
+
+
+static void
+register_hashcode (struct GNUNET_HashCode *hash)
+{
+  struct GNUNET_HashCode replicated;
+  struct IBF_Key key;
+  key = ibf_key_from_hashcode (hash);
+  ibf_hashcode_from_key (key, &replicated);
+  GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, 
GNUNET_memdup (hash, sizeof *hash),
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+}
+
+static void
+iter_hashcodes (struct IBF_Key key, GNUNET_CONTAINER_HashMapIterator iter, 
void *cls)
+{
+  struct GNUNET_HashCode replicated;
+  ibf_hashcode_from_key (key, &replicated);
+  GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode, &replicated, 
iter, cls);
+}
+
+
+static int
+insert_iterator (void *cls,
+                 const struct GNUNET_HashCode *key,
+                 void *value)
+{
+  struct InvertibleBloomFilter *ibf = (struct InvertibleBloomFilter *) cls;
+  ibf_insert (ibf, ibf_key_from_hashcode (key));
+  return GNUNET_YES;
+}
+
+
+static int
+remove_iterator (void *cls,
+                 const struct GNUNET_HashCode *key,
+                 void *value)
+{
+  struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
+  /* if remove fails, there just was a collision with another key */
+  (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
+  return GNUNET_YES;
+}
+
+
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct GNUNET_HashCode id;
+  struct IBF_Key ibf_key;
+  int i;
+  int side;
+  int res;
+  struct GNUNET_TIME_Absolute start_time;
+  struct GNUNET_TIME_Relative delta_time;
+
+  set_a = GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + 
csize)),
+                                                 GNUNET_NO);
+  set_b = GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + 
csize)),
+                                                GNUNET_NO);
+  set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
+                                                GNUNET_NO);
+
+  key_to_hashcode = GNUNET_CONTAINER_multihashmap_create (((asize+bsize+csize 
== 0) ? 1 : (asize+bsize+csize)),
+                                                          GNUNET_NO);
+
+  printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
+          hash_num, ibf_size, asize, bsize, csize);
+
+  i = 0;
+  while (i < asize)
+  {
+    GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+      continue;
+    GNUNET_CONTAINER_multihashmap_put (
+        set_a, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+    register_hashcode (&id);
+    i++;
+  }
+  i = 0;
+  while (i < bsize)
+  {
+    GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+      continue;
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
+      continue;
+    GNUNET_CONTAINER_multihashmap_put (
+        set_b, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+    register_hashcode (&id);
+    i++;
+  }
+  i = 0;
+  while (i < csize)
+  {
+    GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+      continue;
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
+      continue;
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
+      continue;
+    GNUNET_CONTAINER_multihashmap_put (
+        set_c, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+    register_hashcode (&id);
+    i++;
+  }
+
+  ibf_a = ibf_create (ibf_size, hash_num);
+  ibf_b = ibf_create (ibf_size, hash_num);
+
+  printf ("generated sets\n");
+
+  start_time = GNUNET_TIME_absolute_get ();
+
+  GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
+  GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b);
+  GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a);
+  GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b);
+
+  delta_time = GNUNET_TIME_absolute_get_duration (start_time);
+
+  printf ("encoded in: %s\n", GNUNET_STRINGS_relative_time_to_string 
(delta_time, GNUNET_NO));
+
+  ibf_subtract (ibf_a, ibf_b);
+
+
+  start_time = GNUNET_TIME_absolute_get ();
+
+  for (i = 0; i <= asize + bsize; i++)
+  {
+    res = ibf_decode (ibf_a, &side, &ibf_key);
+    if (GNUNET_SYSERR == res) 
+    {
+      printf ("decode failed, %u/%u elements left\n",
+         GNUNET_CONTAINER_multihashmap_size (set_a) + 
GNUNET_CONTAINER_multihashmap_size (set_b),
+         asize + bsize);
+      return;
+    }
+    if (GNUNET_NO == res)
+    {
+      if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) &&
+          (0 == GNUNET_CONTAINER_multihashmap_size (set_a)))
+      {
+        delta_time = GNUNET_TIME_absolute_get_duration (start_time);
+        printf ("decoded successfully in: %s\n", 
GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
+      }
+      else
+      {
+        printf ("decode missed elements (should never happen)\n");
+      }
+      return;
+    }
+
+    if (side == 1)
+      iter_hashcodes (ibf_key, remove_iterator, set_a);
+    if (side == -1)
+      iter_hashcodes (ibf_key, remove_iterator, set_b);
+  }
+  printf("cyclic IBF, %u/%u elements left\n",
+         GNUNET_CONTAINER_multihashmap_size (set_a) + 
GNUNET_CONTAINER_multihashmap_size (set_b),
+         asize + bsize);
+}
+
+int
+main (int argc, char **argv)
+{
+  static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+    {'A', "asize", NULL,
+     gettext_noop ("number of element in set A-B"), 1,
+     &GNUNET_GETOPT_set_uint, &asize},
+    {'B', "bsize", NULL,
+     gettext_noop ("number of element in set B-A"), 1,
+     &GNUNET_GETOPT_set_uint, &bsize},
+    {'C', "csize", NULL,
+     gettext_noop ("number of common elements in A and B"), 1,
+     &GNUNET_GETOPT_set_uint, &csize},
+    {'k', "hash-num", NULL,
+     gettext_noop ("hash num"), 1,
+     &GNUNET_GETOPT_set_uint, &hash_num},
+    {'s', "ibf-size", NULL,
+     gettext_noop ("ibf size"), 1,
+     &GNUNET_GETOPT_set_uint, &ibf_size},
+    GNUNET_GETOPT_OPTION_END
+  };
+  GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-ibf",
+                      "help",
+                      options, &run, NULL, GNUNET_YES);
+  return 0;
+}
+

Deleted: gnunet/src/set/gnunet-set-ibf.c
===================================================================
--- gnunet/src/set/gnunet-set-ibf.c     2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/gnunet-set-ibf.c     2013-06-19 10:48:54 UTC (rev 27485)
@@ -1,238 +0,0 @@
-/*
-     This file is part of GNUnet.
-     (C) 2012 Christian Grothoff (and other contributing authors)
-
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file consensus/gnunet-consensus-ibf.c
- * @brief tool for reconciling data with invertible bloom filters
- * @author Florian Dold
- */
-
-
-#include "platform.h"
-#include "gnunet_common.h"
-#include "gnunet_container_lib.h"
-#include "gnunet_util_lib.h"
-
-#include "ibf.h"
-
-static unsigned int asize = 10;
-static unsigned int bsize = 10;
-static unsigned int csize = 10;
-static unsigned int hash_num = 3;
-static unsigned int ibf_size = 80;
-
-/* FIXME: add parameter for this */
-static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
-
-static struct GNUNET_CONTAINER_MultiHashMap *set_a;
-static struct GNUNET_CONTAINER_MultiHashMap *set_b;
-/* common elements in a and b */
-static struct GNUNET_CONTAINER_MultiHashMap *set_c;
-
-static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
-
-static struct InvertibleBloomFilter *ibf_a;
-static struct InvertibleBloomFilter *ibf_b;
-
-
-static void
-register_hashcode (struct GNUNET_HashCode *hash)
-{
-  struct GNUNET_HashCode replicated;
-  struct IBF_Key key;
-  key = ibf_key_from_hashcode (hash);
-  ibf_hashcode_from_key (key, &replicated);
-  GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, 
GNUNET_memdup (hash, sizeof *hash),
-                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-}
-
-static void
-iter_hashcodes (struct IBF_Key key, GNUNET_CONTAINER_HashMapIterator iter, 
void *cls)
-{
-  struct GNUNET_HashCode replicated;
-  ibf_hashcode_from_key (key, &replicated);
-  GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode, &replicated, 
iter, cls);
-}
-
-
-static int
-insert_iterator (void *cls,
-                 const struct GNUNET_HashCode *key,
-                 void *value)
-{
-  struct InvertibleBloomFilter *ibf = (struct InvertibleBloomFilter *) cls;
-  ibf_insert (ibf, ibf_key_from_hashcode (key));
-  return GNUNET_YES;
-}
-
-
-static int
-remove_iterator (void *cls,
-                 const struct GNUNET_HashCode *key,
-                 void *value)
-{
-  struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
-  /* if remove fails, there just was a collision with another key */
-  (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
-  return GNUNET_YES;
-}
-
-
-static void
-run (void *cls, char *const *args, const char *cfgfile,
-     const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
-  struct GNUNET_HashCode id;
-  struct IBF_Key ibf_key;
-  int i;
-  int side;
-  int res;
-  struct GNUNET_TIME_Absolute start_time;
-  struct GNUNET_TIME_Relative delta_time;
-
-  set_a = GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + 
csize)),
-                                                 GNUNET_NO);
-  set_b = GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + 
csize)),
-                                                GNUNET_NO);
-  set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
-                                                GNUNET_NO);
-
-  key_to_hashcode = GNUNET_CONTAINER_multihashmap_create (((asize+bsize+csize 
== 0) ? 1 : (asize+bsize+csize)),
-                                                          GNUNET_NO);
-
-  printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
-          hash_num, ibf_size, asize, bsize, csize);
-
-  i = 0;
-  while (i < asize)
-  {
-    GNUNET_CRYPTO_hash_create_random (random_quality, &id);
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
-      continue;
-    GNUNET_CONTAINER_multihashmap_put (
-        set_a, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-    register_hashcode (&id);
-    i++;
-  }
-  i = 0;
-  while (i < bsize)
-  {
-    GNUNET_CRYPTO_hash_create_random (random_quality, &id);
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
-      continue;
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
-      continue;
-    GNUNET_CONTAINER_multihashmap_put (
-        set_b, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-    register_hashcode (&id);
-    i++;
-  }
-  i = 0;
-  while (i < csize)
-  {
-    GNUNET_CRYPTO_hash_create_random (random_quality, &id);
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
-      continue;
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
-      continue;
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
-      continue;
-    GNUNET_CONTAINER_multihashmap_put (
-        set_c, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-    register_hashcode (&id);
-    i++;
-  }
-
-  ibf_a = ibf_create (ibf_size, hash_num);
-  ibf_b = ibf_create (ibf_size, hash_num);
-
-  printf ("generated sets\n");
-
-  start_time = GNUNET_TIME_absolute_get ();
-
-  GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
-  GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b);
-  GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a);
-  GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b);
-
-  delta_time = GNUNET_TIME_absolute_get_duration (start_time);
-
-  printf ("encoded in: %s\n", GNUNET_STRINGS_relative_time_to_string 
(delta_time, GNUNET_NO));
-
-  ibf_subtract (ibf_a, ibf_b);
-
-
-  start_time = GNUNET_TIME_absolute_get ();
-
-  for (;;)
-  {
-    res = ibf_decode (ibf_a, &side, &ibf_key);
-    if (GNUNET_SYSERR == res) 
-    {
-      printf ("decode failed\n");
-      return;
-    }
-    if (GNUNET_NO == res)
-    {
-      if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) &&
-          (0 == GNUNET_CONTAINER_multihashmap_size (set_a)))
-      {
-        delta_time = GNUNET_TIME_absolute_get_duration (start_time);
-        printf ("decoded successfully in: %s\n", 
GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
-      }
-      else
-        printf ("decode missed elements\n");
-      return;
-    }
-
-    if (side == 1)
-      iter_hashcodes (ibf_key, remove_iterator, set_a);
-    if (side == -1)
-      iter_hashcodes (ibf_key, remove_iterator, set_b);
-  }
-}
-
-int
-main (int argc, char **argv)
-{
-  static const struct GNUNET_GETOPT_CommandLineOption options[] = {
-    {'A', "asize", NULL,
-     gettext_noop ("number of element in set A-B"), 1,
-     &GNUNET_GETOPT_set_uint, &asize},
-    {'B', "bsize", NULL,
-     gettext_noop ("number of element in set B-A"), 1,
-     &GNUNET_GETOPT_set_uint, &bsize},
-    {'C', "csize", NULL,
-     gettext_noop ("number of common elements in A and B"), 1,
-     &GNUNET_GETOPT_set_uint, &csize},
-    {'k', "hash-num", NULL,
-     gettext_noop ("hash num"), 1,
-     &GNUNET_GETOPT_set_uint, &hash_num},
-    {'s', "ibf-size", NULL,
-     gettext_noop ("ibf size"), 1,
-     &GNUNET_GETOPT_set_uint, &ibf_size},
-    GNUNET_GETOPT_OPTION_END
-  };
-  GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-ibf",
-                      "help",
-                      options, &run, NULL, GNUNET_YES);
-  return 0;
-}
-

Added: gnunet/src/set/gnunet-set-profiler.c
===================================================================
--- gnunet/src/set/gnunet-set-profiler.c                                (rev 0)
+++ gnunet/src/set/gnunet-set-profiler.c        2013-06-19 10:48:54 UTC (rev 
27485)
@@ -0,0 +1,320 @@
+/*
+      This file is part of GNUnet
+      (C) 2013 Christian Grothoff (and other contributing authors)
+
+      GNUnet is free software; you can redistribute it and/or modify
+      it under the terms of the GNU General Public License as published
+      by the Free Software Foundation; either version 2, or (at your
+      option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      General Public License for more details.
+
+      You should have received a copy of the GNU General Public License
+      along with GNUnet; see the file COPYING.  If not, write to the
+      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+      Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * @file set/gnunet-set-profiler.c
+ * @brief profiling tool for set
+ * @author Florian Dold
+ */
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_set_service.h"
+#include "gnunet_testbed_service.h"
+
+
+static int ret;
+
+static unsigned int num_a = 5;
+static unsigned int num_b = 5;
+static unsigned int num_c = 20;
+
+static unsigned int salt = 42;
+
+static char* op_str = "union";
+
+const static struct GNUNET_CONFIGURATION_Handle *config;
+
+struct GNUNET_CONTAINER_MultiHashMap *map_a;
+struct GNUNET_CONTAINER_MultiHashMap *map_b;
+struct GNUNET_CONTAINER_MultiHashMap *map_c;
+
+
+/**
+ * Elements that set a received, should match map_c
+ * in the end.
+ */
+struct GNUNET_CONTAINER_MultiHashMap *map_a_received;
+
+/**
+ * Elements that set b received, should match map_c
+ * in the end.
+ */
+struct GNUNET_CONTAINER_MultiHashMap *map_b_received;
+
+struct GNUNET_SET_Handle *set_a;
+struct GNUNET_SET_Handle *set_b;
+
+struct GNUNET_HashCode app_id;
+
+struct GNUNET_PeerIdentity local_peer;
+
+struct GNUNET_SET_ListenHandle *set_listener;
+
+struct GNUNET_SET_OperationHandle *set_oh1;
+struct GNUNET_SET_OperationHandle *set_oh2;
+
+
+int a_done;
+int b_done;
+
+
+
+static int
+map_remove_iterator (void *cls,
+                     const struct GNUNET_HashCode *key,
+                     void *value)
+{
+  struct GNUNET_CONTAINER_MultiHashMap *m = cls;
+  int ret;
+
+  ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL);
+  GNUNET_assert (GNUNET_OK == ret);
+  return GNUNET_YES;
+
+}
+
+
+static void
+set_result_cb_1 (void *cls,
+                 const struct GNUNET_SET_Element *element,
+                 enum GNUNET_SET_Status status)
+{
+  GNUNET_assert (GNUNET_NO == a_done);
+  GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
+  switch (status)
+  {
+    case GNUNET_SET_STATUS_DONE:
+    case GNUNET_SET_STATUS_HALF_DONE:
+      a_done = GNUNET_YES;
+      GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, 
map_a_received);
+      GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_a_received));
+      return;
+    case GNUNET_SET_STATUS_FAILURE:
+      GNUNET_assert (0);
+      return;
+    case GNUNET_SET_STATUS_OK:
+      break;
+    default:
+      GNUNET_assert (0);
+  }
+  GNUNET_CONTAINER_multihashmap_put (map_a_received,
+                                     element->data, NULL,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+}
+
+
+static void
+set_result_cb_2 (void *cls,
+                 const struct GNUNET_SET_Element *element,
+                 enum GNUNET_SET_Status status)
+{
+  GNUNET_assert (GNUNET_NO == b_done);
+  GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
+  switch (status)
+  {
+    case GNUNET_SET_STATUS_DONE:
+    case GNUNET_SET_STATUS_HALF_DONE:
+      b_done = GNUNET_YES;
+      GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, 
map_b_received);
+      GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_b_received));
+      return;
+    case GNUNET_SET_STATUS_FAILURE:
+      GNUNET_assert (0);
+      return;
+    case GNUNET_SET_STATUS_OK:
+      break;
+    default:
+      GNUNET_assert (0);
+  }
+  GNUNET_CONTAINER_multihashmap_put (map_b_received,
+                                     element->data, NULL,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+}
+
+
+static void
+set_listen_cb (void *cls,
+               const struct GNUNET_PeerIdentity *other_peer,
+               const struct GNUNET_MessageHeader *context_msg,
+               struct GNUNET_SET_Request *request)
+{
+  GNUNET_assert (NULL == set_oh2);
+  set_oh2 = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+                               set_result_cb_2, NULL);
+  GNUNET_SET_commit (set_oh2, set_b);
+}
+
+
+
+static int
+set_insert_iterator (void *cls,
+                     const struct GNUNET_HashCode *key,
+                     void *value)
+{
+  struct GNUNET_SET_Handle *set = cls;
+  struct GNUNET_SET_Element *el;
+
+  el = GNUNET_malloc (sizeof *el + sizeof *key);
+  el->type = 0;
+  memcpy (&el[1], key, sizeof *key);
+  el->data = &el[1];
+  el->size = sizeof *key;
+  GNUNET_SET_add_element (set, el, NULL, NULL);
+  GNUNET_free (el);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Signature of the 'main' function for a (single-peer) testcase that
+ * is run using 'GNUNET_TESTING_peer_run'.
+ * 
+ * @param cls closure
+ * @param cfg configuration of the peer that was started
+ * @param peer identity of the peer that was created
+ */
+static void
+test_main (void *cls,
+          const struct GNUNET_CONFIGURATION_Handle *cfg,
+          struct GNUNET_TESTING_Peer *peer)
+{
+  unsigned int i;
+  struct GNUNET_HashCode hash;
+
+  config = cfg;
+
+  if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer))
+  {
+    GNUNET_assert (0);
+    return;
+  }
+  
+  map_a = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
+  map_b = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
+  map_c = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO);
+
+  for (i = 0; i < num_a; i++)
+  {
+    GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
+    {
+      i--;
+      continue;
+    }
+    GNUNET_CONTAINER_multihashmap_put (map_a, &hash, &hash,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  }
+
+  for (i = 0; i < num_b; i++)
+  {
+    GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
+    {
+      i--;
+      continue;
+    }
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
+    {
+      i--;
+      continue;
+    }
+    GNUNET_CONTAINER_multihashmap_put (map_b, &hash, NULL,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+  }
+
+  for (i = 0; i < num_c; i++)
+  {
+    GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
+    {
+      i--;
+      continue;
+    }
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
+    {
+      i--;
+      continue;
+    }
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_c, &hash))
+    {
+      i--;
+      continue;
+    }
+    GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+    GNUNET_CONTAINER_multihashmap_put (map_c, &hash, NULL,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+  }
+
+  /* use last hash for app id */
+  app_id = hash;
+
+  /* FIXME: also implement intersection etc. */
+  set_a = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+  set_b = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+
+  GNUNET_CONTAINER_multihashmap_iterate (map_a, set_insert_iterator, set_a);
+  GNUNET_CONTAINER_multihashmap_iterate (map_b, set_insert_iterator, set_b);
+  GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_a);
+  GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_b);
+
+  set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
+                                    &app_id, set_listen_cb, NULL);
+
+  set_oh1 = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, 
GNUNET_SET_RESULT_ADDED,
+                       set_result_cb_1, NULL);
+  GNUNET_SET_commit (set_oh1, set_a);
+}
+
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+
+  ret = GNUNET_TESTING_peer_run ("test_set_api",
+                                 "test_set.conf",
+                                 &test_main, NULL);
+}
+
+
+int
+main (int argc, char **argv)
+{
+   static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+      { 'A', "num-first", NULL,
+        gettext_noop ("number of values"),
+        GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_a },
+      { 'B', "num-second", NULL,
+        gettext_noop ("number of values"),
+        GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_b },
+      { 'B', "num-common", NULL,
+        gettext_noop ("number of values"),
+        GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c },
+      { 'x', "operation", NULL,
+        gettext_noop ("oeration to execute"),
+        GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str },
+      GNUNET_GETOPT_OPTION_END
+  };
+  GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus",
+                     "help",
+                     options, &run, NULL, GNUNET_YES);
+  return ret;
+}
+

Deleted: gnunet/src/set/gnunet-set.c
===================================================================
--- gnunet/src/set/gnunet-set.c 2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/gnunet-set.c 2013-06-19 10:48:54 UTC (rev 27485)
@@ -1,203 +0,0 @@
-/*
-      This file is part of GNUnet
-      (C) 2012 Christian Grothoff (and other contributing authors)
-
-      GNUnet is free software; you can redistribute it and/or modify
-      it under the terms of the GNU General Public License as published
-      by the Free Software Foundation; either version 2, or (at your
-      option) any later version.
-
-      GNUnet is distributed in the hope that it will be useful, but
-      WITHOUT ANY WARRANTY; without even the implied warranty of
-      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-      General Public License for more details.
-
-      You should have received a copy of the GNU General Public License
-      along with GNUnet; see the file COPYING.  If not, write to the
-      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-      Boston, MA 02111-1307, USA.
- */
-
-/**
- * @file set/gnunet-set.c
- * @brief profiling tool for the set service
- * @author Florian Dold
- */
-#include "platform.h"
-#include "gnunet_common.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_testbed_service.h"
-#include "gnunet_set_service.h"
-
-
-static struct GNUNET_PeerIdentity local_id;
-
-static struct GNUNET_HashCode app_id;
-static struct GNUNET_SET_Handle *set1;
-static struct GNUNET_SET_Handle *set2;
-static struct GNUNET_SET_ListenHandle *listen_handle;
-const static struct GNUNET_CONFIGURATION_Handle *config;
-
-int num_done;
-
-
-static void
-result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element,
-                enum GNUNET_SET_Status status)
-{
-  switch (status)
-  {
-    case GNUNET_SET_STATUS_OK:
-      printf ("set 1: got element\n");
-      break;
-    case GNUNET_SET_STATUS_FAILURE:
-      printf ("set 1: failure\n");
-      break;
-    case GNUNET_SET_STATUS_DONE:
-      printf ("set 1: done\n");
-      GNUNET_SET_destroy (set1);
-      break;
-    default:
-      GNUNET_assert (0);
-  }
-}
-
-
-static void
-result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element,
-           enum GNUNET_SET_Status status)
-{
-  switch (status)
-  {
-    case GNUNET_SET_STATUS_OK:
-      printf ("set 2: got element\n");
-      break;
-    case GNUNET_SET_STATUS_FAILURE:
-      printf ("set 2: failure\n");
-      break;
-    case GNUNET_SET_STATUS_DONE:
-      printf ("set 2: done\n");
-      GNUNET_SET_destroy (set2);
-      break;
-    default:
-      GNUNET_assert (0);
-  }
-}
-
-
-static void
-listen_cb (void *cls,
-           const struct GNUNET_PeerIdentity *other_peer,
-           const struct GNUNET_MessageHeader *context_msg,
-           struct GNUNET_SET_Request *request)
-{
-  struct GNUNET_SET_OperationHandle *oh;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
-  GNUNET_SET_listen_cancel (listen_handle);
-
-  oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, 
NULL);
-  GNUNET_SET_conclude (oh, set2);
-}
-
-
-/**
- * Start the set operation.
- *
- * @param cls closure, unused
- */
-static void
-start (void *cls)
-{
-  struct GNUNET_SET_OperationHandle *oh;
-  
-  listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
-                                     &app_id, listen_cb, NULL);
-  oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
-                            GNUNET_SET_RESULT_ADDED,
-                            result_cb_set1, NULL);
-  GNUNET_SET_conclude (oh, set1);
-}
-
-
-/**
- * Initialize the second set, continue
- *
- * @param cls closure, unused
- */
-static void
-init_set2 (void *cls)
-{
-  struct GNUNET_SET_Element element;
-
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
-
-  element.data = "hello";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (set2, &element, NULL, NULL);
-  element.data = "quux";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (set2, &element, start, NULL);
-}
-
-
-/**
- * Initialize the first set, continue.
- */
-static void
-init_set1 (void)
-{
-  struct GNUNET_SET_Element element;
-
-  element.data = "hello";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (set1, &element, NULL, NULL);
-  element.data = "bar";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (set1, &element, init_set2, NULL);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
-}
-
-
-/**
- * Main function that will be run.
- *
- * @param cls closure
- * @param args remaining command-line arguments
- * @param cfgfile name of the configuration file used (for saving, can be 
NULL!)
- * @param cfg configuration
- */
-static void
-run (void *cls, char *const *args,
-     const char *cfgfile,
-     const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
-  static const char* app_str = "gnunet-set";
-
-  config = cfg;
-  
-  GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
-
-  GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
-
-  set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
-  set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
-
-  init_set1 ();
-}
-
-
-
-int
-main (int argc, char **argv)
-{
-   static const struct GNUNET_GETOPT_CommandLineOption options[] = {
-      GNUNET_GETOPT_OPTION_END
-  };
-  GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set",
-                     "help",
-                     options, &run, NULL, GNUNET_NO);
-  return 0;
-}
-

Modified: gnunet/src/set/ibf.c
===================================================================
--- gnunet/src/set/ibf.c        2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/ibf.c        2013-06-19 10:48:54 UTC (rev 27485)
@@ -19,7 +19,7 @@
 */
 
 /**
- * @file consensus/ibf.c
+ * @file set/ibf.c
  * @brief implementation of the invertible bloom filter
  * @author Florian Dold
  */
@@ -27,6 +27,12 @@
 #include "ibf.h"
 
 /**
+ * Compute the key's hash from the key.
+ * Redefine to use a different hash function.
+ */
+#define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof (struct 
IBF_KeyHash)))
+
+/**
  * Create a key from a hashcode.
  *
  * @param hash the hashcode
@@ -89,23 +95,21 @@
 ibf_get_indices (const struct InvertibleBloomFilter *ibf,
                  struct IBF_Key key, int *dst)
 {
-  struct GNUNET_HashCode bucket_indices;
-  unsigned int filled;
-  int i;
-  GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices);
-  filled = 0;
-  for (i = 0; filled < ibf->hash_num; i++)
+  uint32_t filled;
+  uint32_t i;
+  uint32_t bucket = key.key_val & 0xFFFFFFFF;
+
+  for (i = 0, filled=0; filled < ibf->hash_num; i++)
   {
-    unsigned int bucket;
     unsigned int j;
-    if ( (0 != i) && (0 == (i % 16)) )
-      GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), 
&bucket_indices);
-    bucket = bucket_indices.bits[i % 16] % ibf->size;
+    uint64_t x;
     for (j = 0; j < filled; j++)
       if (dst[j] == bucket)
         goto try_next;
-    dst[filled++] = bucket;
+    dst[filled++] = bucket % ibf->size;
     try_next: ;
+    x = ((uint64_t) bucket << 32) | i;
+    bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
   }
 }
 
@@ -116,16 +120,14 @@
                   const int *buckets, int side)
 {
   int i;
-  struct GNUNET_HashCode key_hash_sha;
-  struct IBF_KeyHash key_hash;
-  GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha);
-  key_hash.key_hash_val = key_hash_sha.bits[0];
+
   for (i = 0; i < ibf->hash_num; i++)
   {
     const int bucket = buckets[i];
     ibf->count[bucket].count_val += side;
     ibf->key_sum[bucket].key_val ^= key.key_val;
-    ibf->key_hash_sum[bucket].key_hash_val ^= key_hash.key_hash_val;
+    ibf->key_hash_sum[bucket].key_hash_val
+        ^= IBF_KEY_HASH_VAL (key);
   }
 }
 
@@ -183,7 +185,6 @@
 {
   struct IBF_KeyHash hash;
   int i;
-  struct GNUNET_HashCode key_hash_sha;
   int buckets[ibf->hash_num];
 
   GNUNET_assert (NULL != ibf);
@@ -197,8 +198,7 @@
     if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val))
       continue;
 
-    GNUNET_CRYPTO_hash (&ibf->key_sum[i], sizeof (struct IBF_Key), 
&key_hash_sha);
-    hash.key_hash_val = key_hash_sha.bits[0];
+    hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]);
 
     /* test if the hash matches the key */
     if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val)

Modified: gnunet/src/set/ibf.h
===================================================================
--- gnunet/src/set/ibf.h        2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/ibf.h        2013-06-19 10:48:54 UTC (rev 27485)
@@ -19,7 +19,7 @@
 */
 
 /**
- * @file consensus/ibf.h
+ * @file set/ibf.h
  * @brief invertible bloom filter
  * @author Florian Dold
  */

Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c    2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/set_api.c    2013-06-19 10:48:54 UTC (rev 27485)
@@ -40,7 +40,7 @@
 struct GNUNET_SET_Handle
 {
   struct GNUNET_CLIENT_Connection *client;
-  struct GNUNET_MQ_MessageQueue *mq;
+  struct GNUNET_MQ_Handle *mq;
   unsigned int messages_since_ack;
 };
 
@@ -73,7 +73,7 @@
    * Message sent to the server on calling conclude,
    * NULL if conclude has been called.
    */
-  struct GNUNET_MQ_Message *conclude_mqm;
+  struct GNUNET_MQ_Envelope *conclude_mqm;
 
   /**
    * Address of the request if in the conclude message,
@@ -89,7 +89,7 @@
 struct GNUNET_SET_ListenHandle
 {
   struct GNUNET_CLIENT_Connection *client;
-  struct GNUNET_MQ_MessageQueue* mq;
+  struct GNUNET_MQ_Handle* mq;
   GNUNET_SET_ListenCallback listen_cb;
   void *listen_cls;
 };
@@ -115,7 +115,7 @@
 
   if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
   {
-    struct GNUNET_MQ_Message *mqm;
+    struct GNUNET_MQ_Envelope *mqm;
     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
     GNUNET_MQ_send (set->mq, mqm);
   }
@@ -162,7 +162,7 @@
 
   if (GNUNET_NO == req->accepted)
   {
-    struct GNUNET_MQ_Message *mqm;
+    struct GNUNET_MQ_Envelope *mqm;
     struct GNUNET_SET_AcceptRejectMessage *amsg;
 
     mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT);
@@ -197,9 +197,9 @@
                    enum GNUNET_SET_OperationType op)
 {
   struct GNUNET_SET_Handle *set;
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_CreateMessage *msg;
-  static const struct GNUNET_MQ_Handler mq_handlers[] = {
+  static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
     {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT},
     GNUNET_MQ_HANDLERS_END
   };
@@ -234,7 +234,7 @@
                         GNUNET_SET_Continuation cont,
                         void *cont_cls)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_ElementMessage *msg;
 
   mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD);
@@ -262,7 +262,7 @@
                            GNUNET_SET_Continuation cont,
                            void *cont_cls)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_ElementMessage *msg;
 
   mqm = GNUNET_MQ_msg_extra (msg, element->size, 
GNUNET_MESSAGE_TYPE_SET_REMOVE);
@@ -287,9 +287,9 @@
 
 
 /**
- * Create a set operation for evaluation with another peer.
+ * Prepare a set operation to be evaluated with another peer.
  * The evaluation will not start until the client provides
- * a local set with GNUNET_SET_conclude.
+ * a local set with GNUNET_SET_commit.
  *
  * @param other_peer peer with the other set
  * @param app_id hash for the application using the set
@@ -304,15 +304,15 @@
  * @return a handle to cancel the operation
  */
 struct GNUNET_SET_OperationHandle *
-GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
-                     const struct GNUNET_HashCode *app_id,
-                     const struct GNUNET_MessageHeader *context_msg,
-                     uint16_t salt,
-                     enum GNUNET_SET_ResultMode result_mode,
-                     GNUNET_SET_ResultIterator result_cb,
-                     void *result_cls)
+GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
+                    const struct GNUNET_HashCode *app_id,
+                    const struct GNUNET_MessageHeader *context_msg,
+                    uint16_t salt,
+                    enum GNUNET_SET_ResultMode result_mode,
+                    GNUNET_SET_ResultIterator result_cb,
+                    void *result_cls)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_OperationHandle *oh;
   struct GNUNET_SET_EvaluateMessage *msg;
 
@@ -322,9 +322,6 @@
 
   mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 
context_msg);
 
-  if (NULL != context_msg)
-    LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n");
-
   msg->app_id = *app_id;
   msg->target_peer = *other_peer;
   msg->salt = salt;
@@ -356,9 +353,9 @@
                    void *listen_cls)
 {
   struct GNUNET_SET_ListenHandle *lh;
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_ListenMessage *msg;
-  static const struct GNUNET_MQ_Handler mq_handlers[] = {
+  static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
     {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
     GNUNET_MQ_HANDLERS_END
   };
@@ -403,7 +400,7 @@
  * @param result_mode specified how results will be returned,
  *        see 'GNUNET_SET_ResultMode'.
  * @param result_cb callback for the results
- * @param result_cls closure for result_cb
+ * @param cls closure for result_cb
  * @return a handle to cancel the operation
  */
 struct GNUNET_SET_OperationHandle *
@@ -412,7 +409,7 @@
                    GNUNET_SET_ResultIterator result_cb,
                    void *cls)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_OperationHandle *oh;
   struct GNUNET_SET_AcceptRejectMessage *msg;
 
@@ -441,7 +438,7 @@
 void
 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_OperationHandle *h_assoc;
 
   if (NULL != oh->set)
@@ -460,7 +457,7 @@
 
 
 /**
- * Conclude the given set operation using the given set. 
+ * Commit a set to be used with a set operation.
  * This function is called once we have fully constructed
  * the set that we want to use for the operation.  At this
  * time, the P2P protocol can then begin to exchange the
@@ -471,13 +468,13 @@
  * @param set the set to use for the operation
  */
 void
-GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
+GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
                     struct GNUNET_SET_Handle *set)
 {
   GNUNET_assert (NULL == oh->set);
   GNUNET_assert (NULL != oh->conclude_mqm);
   oh->set = set;
-  oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh);
+  oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, oh);
   *oh->request_id_addr = htonl (oh->request_id);
   GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm);
   oh->conclude_mqm = NULL;

Modified: gnunet/src/set/strata_estimator.c
===================================================================
--- gnunet/src/set/strata_estimator.c   2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/strata_estimator.c   2013-06-19 10:48:54 UTC (rev 27485)
@@ -19,7 +19,7 @@
 */
 
 /**
- * @file consensus/ibf.h
+ * @file set/ibf.h
  * @brief invertible bloom filter
  * @author Florian Dold
  */

Modified: gnunet/src/set/strata_estimator.h
===================================================================
--- gnunet/src/set/strata_estimator.h   2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/strata_estimator.h   2013-06-19 10:48:54 UTC (rev 27485)
@@ -19,7 +19,7 @@
 */
 
 /**
- * @file consensus/strata_estimator.h
+ * @file set/strata_estimator.h
  * @brief estimator of set difference
  * @author Florian Dold
  */

Modified: gnunet/src/set/test_set_api.c
===================================================================
--- gnunet/src/set/test_set_api.c       2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/set/test_set_api.c       2013-06-19 10:48:54 UTC (rev 27485)
@@ -95,7 +95,7 @@
   GNUNET_SET_listen_cancel (listen_handle);
 
   oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, 
NULL);
-  GNUNET_SET_conclude (oh, set2);
+  GNUNET_SET_commit (oh, set2);
 }
 
 
@@ -111,10 +111,10 @@
 
   listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
                                      &app_id, listen_cb, NULL);
-  oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
-                            GNUNET_SET_RESULT_ADDED,
-                            result_cb_set1, NULL);
-  GNUNET_SET_conclude (oh, set1);
+  oh = GNUNET_SET_prepare (&local_id, &app_id, NULL, 42,
+                           GNUNET_SET_RESULT_ADDED,
+                           result_cb_set1, NULL);
+  GNUNET_SET_commit (oh, set1);
 }
 
 

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/stream/stream_api.c      2013-06-19 10:48:54 UTC (rev 27485)
@@ -3779,11 +3779,11 @@
  * @param size the number of bytes written
  */
 static void 
-mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t 
size)
+mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status,
+                        size_t size)
 {
-  struct GNUNET_MQ_MessageQueue *mq = cls;
-  struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
 
   switch (status)
   {
@@ -3793,56 +3793,32 @@
       /* FIXME: call shutdown handler */
       return;
     case GNUNET_STREAM_TIMEOUT:
-      if (NULL == mq->error_handler)
-        LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler 
installed for message queue\n");
-      else
-        mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
+      GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
       return;
     case GNUNET_STREAM_SYSERR:
-      if (NULL == mq->error_handler)
-        LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler 
installed for message queue\n");
-      else
-        mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE);
+      GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
       return;
     default:
       GNUNET_assert (0);
       return;
   }
-  
-  /* call cb for message we finished sending */
-  mqm = mq->current_msg;
-  GNUNET_assert (NULL != mq->current_msg);
-  if (NULL != mqm->sent_cb)
-    mqm->sent_cb (mqm->sent_cls);
-  GNUNET_free (mqm);
 
   mss->wh = NULL;
 
-  mqm = mq->msg_head;
-  mq->current_msg = mqm;
-  if (NULL == mqm)
-    return;
-  GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
-  mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
-                                 GNUNET_TIME_UNIT_FOREVER_REL,
-                                 mq_stream_write_queued, mq);
-  GNUNET_assert (NULL != mss->wh);
+  GNUNET_MQ_impl_send_continue (mq);
 }
 
 
 static void
-mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq,
-                     struct GNUNET_MQ_Message *mqm)
+mq_stream_send_impl (struct GNUNET_MQ_Handle *mq,
+                     const struct GNUNET_MessageHeader *msg, void *impl_state)
 {
-  struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+  struct MQStreamState *mss = impl_state;
 
-  if (NULL != mq->current_msg)
-  {
-    GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
-    return;
-  }
-  mq->current_msg = mqm;
-  mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
+  /* no way to cancel sending now */
+  GNUNET_MQ_impl_send_commit (mq);
+
+  mss->wh = GNUNET_STREAM_write (mss->socket, msg, ntohs (msg->size),
                                  GNUNET_TIME_UNIT_FOREVER_REL,
                                  mq_stream_write_queued, mq);
 }
@@ -3862,12 +3838,12 @@
  */
 static int
 mq_stream_mst_callback (void *cls, void *client,
-                     const struct GNUNET_MessageHeader *message)
+                        const struct GNUNET_MessageHeader *message)
 {
-  struct GNUNET_MQ_MessageQueue *mq = cls;
+  struct GNUNET_MQ_Handle *mq = cls;
 
   GNUNET_assert (NULL != message);
-  GNUNET_MQ_dispatch (mq, message);
+  GNUNET_MQ_inject_message (mq, message);
   return GNUNET_OK;
 }
 
@@ -3889,8 +3865,8 @@
                           const void *data,
                           size_t size)
 {
-  struct GNUNET_MQ_MessageQueue *mq = cls;
-  struct MQStreamState *mss;
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
   int ret;
 
   switch (status)
@@ -3901,45 +3877,33 @@
       /* FIXME: call shutdown handler */
       return 0;
     case GNUNET_STREAM_TIMEOUT:
-      if (NULL == mq->error_handler)
-        LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler 
installed for message queue\n");
-      else
-        mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
+      GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
       return 0;
     case GNUNET_STREAM_SYSERR:
-      if (NULL == mq->error_handler)
-        LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler 
installed for message queue\n");
-      else
-        mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+      GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
       return 0;
     default:
       GNUNET_assert (0);
       return 0;
   }
   
-  mss = (struct MQStreamState *) mq->impl_state;
-  GNUNET_assert (GNUNET_STREAM_OK == status);
   ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, 
GNUNET_NO);
   if (GNUNET_OK != ret)
   {
-    if (NULL == mq->error_handler)
-      LOG (GNUNET_ERROR_TYPE_WARNING,
-           "read error (message stream malformed), but no error handler 
installed for message queue\n");
-    else
-      mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+    GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
     return 0;
   }
-  /* we always read all data */
   mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 
                                 mq_stream_data_processor, mq);
+  /* we always read all data */
   return size;
 }
 
 
 static void
-mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+mq_stream_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
 {
-  struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+  struct MQStreamState *mss = impl_state;
 
   if (NULL != mss->rh)
   {
@@ -3972,24 +3936,21 @@
  * @param error_handler callback for errors
  * @return the message queue for the socket
  */
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
 GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, 
-                         const struct GNUNET_MQ_Handler *msg_handlers,
+                         const struct GNUNET_MQ_MessageHandler *msg_handlers,
                          GNUNET_MQ_ErrorHandler error_handler,
                          void *cls)
 {
-  struct GNUNET_MQ_MessageQueue *mq;
+  struct GNUNET_MQ_Handle *mq;
   struct MQStreamState *mss;
 
-  mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
   mss = GNUNET_new (struct MQStreamState);
   mss->socket = socket;
-  mq->impl_state = mss;
-  mq->send_impl = mq_stream_send_impl;
-  mq->destroy_impl = mq_stream_destroy_impl;
-  mq->handlers = msg_handlers;
-  mq->handlers_cls = cls;
-  mq->error_handler = error_handler;
+  mq = GNUNET_MQ_queue_for_callbacks (mq_stream_send_impl,
+                                      mq_stream_destroy_impl, 
+                                      NULL,
+                                      mss, msg_handlers, error_handler, cls);
   if (NULL != msg_handlers)
   {
     mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);

Modified: gnunet/src/util/mq.c
===================================================================
--- gnunet/src/util/mq.c        2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/util/mq.c        2013-06-19 10:48:54 UTC (rev 27485)
@@ -31,7 +31,119 @@
 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
 
 
+struct GNUNET_MQ_Envelope
+{
+  /**
+   * Messages are stored in a linked list.
+   * Each queue has its own list of envelopes.
+   */
+  struct GNUNET_MQ_Envelope *next;
 
+  /**
+   * Messages are stored in a linked list
+   * Each queue has its own list of envelopes.
+   */
+  struct GNUNET_MQ_Envelope *prev;
+
+  /**
+   * Actual allocated message header,
+   * usually points to the end of the containing GNUNET_MQ_Envelope
+   */
+  struct GNUNET_MessageHeader *mh;
+
+  /**
+   * Queue the message is queued in, NULL if message is not queued.
+   */
+  struct GNUNET_MQ_Handle *parent_queue;
+
+  /**
+   * Called after the message was sent irrevocably.
+   */
+  GNUNET_MQ_NotifyCallback sent_cb;
+
+  /**
+   * Closure for send_cb
+   */
+  void *sent_cls;
+};
+
+
+/**
+ * Handle to a message queue.
+ */
+struct GNUNET_MQ_Handle
+{
+  /**
+   * Handlers array, or NULL if the queue should not receive messages
+   */
+  const struct GNUNET_MQ_MessageHandler *handlers;
+
+  /**
+   * Closure for the handler callbacks,
+   * as well as for the error handler.
+   */
+  void *handlers_cls;
+
+  /**
+   * Actual implementation of message sending,
+   * called when a message is added
+   */
+  GNUNET_MQ_SendImpl send_impl;
+
+  /**
+   * Implementation-dependent queue destruction function
+   */
+  GNUNET_MQ_DestroyImpl destroy_impl;
+
+  /**
+   * Implementation-specific state
+   */
+  void *impl_state;
+
+  /**
+   * Callback will be called when an error occurs.
+   */
+  GNUNET_MQ_ErrorHandler error_handler;
+
+  /**
+   * Linked list of messages pending to be sent
+   */
+  struct GNUNET_MQ_Envelope *envelope_head;
+
+  /**
+   * Linked list of messages pending to be sent
+   */
+  struct GNUNET_MQ_Envelope *envelope_tail;
+
+  /**
+   * Message that is currently scheduled to be
+   * sent. Not the head of the message queue, as the implementation
+   * needs to know if sending has been already scheduled or not.
+   */
+  struct GNUNET_MQ_Envelope *current_envelope;
+
+  /**
+   * Has the current envelope been commited?
+   * Either GNUNET_YES or GNUNET_NO.
+   */
+  int commited;
+
+  /**
+   * Map of associations, lazily allocated
+   */
+  struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+
+  /**
+   * Next id that should be used for the assoc_map,
+   * initialized lazily to a random value together with
+   * assoc_map
+   */
+  uint32_t assoc_id;
+};
+
+
+
+
 struct ServerClientSocketState
 {
   struct GNUNET_SERVER_Client *client;
@@ -42,9 +154,14 @@
 struct ClientConnectionState
 {
   /**
-   * Did we call receive?
+   * Did we call receive alread alreadyy?
    */
   int receive_active;
+
+  /**
+   * Do we also want to receive?
+   */
+  int receive_requested;
   struct GNUNET_CLIENT_Connection *connection;
   struct GNUNET_CLIENT_TransmitHandle *th;
 };
@@ -59,9 +176,9 @@
  * @param mh message to dispatch
  */
 void
-GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct 
GNUNET_MessageHeader *mh)
+GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct 
GNUNET_MessageHeader *mh)
 {
-  const struct GNUNET_MQ_Handler *handler;
+  const struct GNUNET_MQ_MessageHandler *handler;
   int handled = GNUNET_NO;
 
   handler = mq->handlers;
@@ -81,9 +198,28 @@
 }
 
 
+/**
+ * Call the right callback for an error condition.
+ *
+ * @param mq message queue
+ */
 void
-GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
+GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
+                        enum GNUNET_MQ_Error error)
 {
+  if (NULL == mq->error_handler)
+  {
+    /* FIXME: log what kind of error occured */
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler 
installed\n");
+    return;
+  }
+  mq->error_handler (mq->handlers_cls, error);
+}
+
+
+void
+GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
+{
   GNUNET_assert (NULL == mqm->parent_queue);
   GNUNET_free (mqm);
 }
@@ -94,20 +230,156 @@
  * May only be called once per message.
  * 
  * @param mq message queue
- * @param mqm the message to send.
+ * @param ev the message to send.
  */
 void
-GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message 
*mqm)
+GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
 {
   GNUNET_assert (NULL != mq);
-  mq->send_impl (mq, mqm);
+  GNUNET_assert (NULL == ev->parent_queue);
+  
+  /* is the implementation busy? queue it! */
+  if (NULL != mq->current_envelope)
+  {
+    GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, 
ev);
+    return;
+  }
+  mq->current_envelope = ev;
+  mq->send_impl (mq, ev->mh, mq->impl_state);
 }
 
 
-struct GNUNET_MQ_Message *
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
+{
+  /* call is only valid if we're actually currently sending
+   * a message */
+  GNUNET_assert (NULL != mq);
+  GNUNET_assert (NULL != mq->current_envelope);
+  GNUNET_assert (GNUNET_YES == mq->commited);
+  mq->commited = GNUNET_NO;
+  GNUNET_free (mq->current_envelope);
+  if (NULL == mq->envelope_head)
+  {
+    mq->current_envelope = NULL;
+    return;
+  }
+
+
+  GNUNET_assert (NULL != mq->envelope_tail);
+  GNUNET_assert (NULL != mq->envelope_head);
+  mq->current_envelope = mq->envelope_head;
+  GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
+                               mq->current_envelope);
+  mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
+}
+
+
+/**
+ * Create a message queue for the specified handlers.
+ *
+ * @param send function the implements sending messages
+ * @param destroy function that implements destroying the queue
+ * @param destroy function that implements canceling a message
+ * @param state for the queue, passed to 'send' and 'destroy'
+ * @param handlers array of message handlers
+ * @param error_handler handler for read and write errors
+ * @return a new message queue
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
+                               GNUNET_MQ_DestroyImpl destroy,
+                               GNUNET_MQ_CancelImpl cancel,
+                               void *impl_state,
+                               const struct GNUNET_MQ_MessageHandler *handlers,
+                               GNUNET_MQ_ErrorHandler error_handler,
+                               void *cls)
+{
+  struct GNUNET_MQ_Handle *mq;
+
+  mq = GNUNET_new (struct GNUNET_MQ_Handle);
+  mq->send_impl = send;
+  mq->destroy_impl = destroy;
+  mq->handlers = handlers;
+  mq->handlers_cls = cls;
+  mq->impl_state = impl_state;
+
+  return mq;
+}
+
+
+/**
+ * Get the message that should currently be sent.
+ * Fails if there is no current message.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+const struct GNUNET_MessageHeader *
+GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
+{
+  if (NULL == mq->current_envelope)
+    GNUNET_abort ();
+  if (NULL == mq->current_envelope->mh)
+    GNUNET_abort ();
+  return mq->current_envelope->mh;
+}
+
+
+/**
+ * Get the implementation state associated with the
+ * message queue.
+ *
+ * While the GNUNET_MQ_Impl* callbacks receive the
+ * implementation state, continuations that are scheduled
+ * by the implementation function often only have one closure
+ * argument, with this function it is possible to get at the
+ * implementation state when only passing the GNUNET_MQ_Handle
+ * as closure.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+void *
+GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
+{
+  return mq->impl_state;
+}
+
+
+
+/**
+ * Mark the current message as irrevocably sent, but do not
+ * proceed with sending the next message.
+ * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
+ *
+ * @param mq message queue
+ */
+void
+GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
+{
+  GNUNET_assert (NULL != mq->current_envelope);
+  GNUNET_assert (GNUNET_NO == mq->commited);
+  mq->commited = GNUNET_YES;
+  if (NULL != mq->current_envelope->sent_cb)
+    mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
+}
+
+
+struct GNUNET_MQ_Envelope *
 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t 
type)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
 
   mqm = GNUNET_malloc (sizeof *mqm + size);
   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
@@ -119,11 +391,11 @@
 }
 
 
-struct GNUNET_MQ_Message *
+struct GNUNET_MQ_Envelope *
 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t 
base_size, uint16_t type,
                           const struct GNUNET_MessageHeader *nested_mh)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   uint16_t size;
 
   if (NULL == nested_mh)
@@ -154,85 +426,62 @@
 transmit_queued (void *cls, size_t size,
                  void *buf)
 {
-  struct GNUNET_MQ_MessageQueue *mq = cls;
-  struct GNUNET_MQ_Message *mqm = mq->current_msg;
-  struct ServerClientSocketState *state = mq->impl_state;
+  struct GNUNET_MQ_Handle *mq = cls;
+  struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
+  const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
   size_t msg_size;
 
   GNUNET_assert (NULL != buf);
 
-  if (NULL != mqm->sent_cb)
-  {
-    mqm->sent_cb (mqm->sent_cls);
-  }
-
-  mq->current_msg = NULL;
-  GNUNET_assert (NULL != mqm);
-  msg_size = ntohs (mqm->mh->size);
+  msg_size = ntohs (msg->size);
   GNUNET_assert (size >= msg_size);
-  memcpy (buf, mqm->mh, msg_size);
-  GNUNET_free (mqm);
+  memcpy (buf, msg, msg_size);
   state->th = NULL;
 
-  if (NULL != mq->msg_head)
-  {
-    mq->current_msg = mq->msg_head;
-    GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
-    state->th = 
-        GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, 
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             &transmit_queued, mq);
-  }
+  GNUNET_MQ_impl_send_continue (mq);
+
   return msg_size;
 }
 
 
 
 static void
-server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
+                            void *impl_state)
 {
-  struct ServerClientSocketState *state;
+  struct ServerClientSocketState *state = impl_state;
   
   GNUNET_assert (NULL != mq);
-  state = mq->impl_state;
   GNUNET_assert (NULL != state);
   GNUNET_SERVER_client_drop (state->client);
   GNUNET_free (state);
 }
 
 static void
-server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct 
GNUNET_MQ_Message *mqm)
+server_client_send_impl (struct GNUNET_MQ_Handle *mq,
+                         const struct GNUNET_MessageHeader *msg, void 
*impl_state)
 {
-  struct ServerClientSocketState *state;
-  int msize;
+  struct ServerClientSocketState *state = impl_state;
 
   GNUNET_assert (NULL != mq);
-  state = mq->impl_state;
   GNUNET_assert (NULL != state);
 
-  if (NULL != state->th)
-  {
-    GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
-    return;
-  }
-  GNUNET_assert (NULL == mq->msg_head);
-  GNUNET_assert (NULL == mq->current_msg);
-  msize = ntohs (mqm->mh->size);
-  mq->current_msg = mqm;
+  GNUNET_MQ_impl_send_commit (mq);
+
   state->th = 
-      GNUNET_SERVER_notify_transmit_ready (state->client, msize, 
+      GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            &transmit_queued, mq);
 }
 
 
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
 {
-  struct GNUNET_MQ_MessageQueue *mq;
+  struct GNUNET_MQ_Handle *mq;
   struct ServerClientSocketState *scss;
 
-  mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+  mq = GNUNET_new (struct GNUNET_MQ_Handle);
   scss = GNUNET_new (struct ServerClientSocketState);
   mq->impl_state = scss;
   scss->client = client;
@@ -254,24 +503,21 @@
 handle_client_message (void *cls,
                        const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_MQ_MessageQueue *mq = cls;
+  struct GNUNET_MQ_Handle *mq = cls;
   struct ClientConnectionState *state;
 
   state = mq->impl_state;
   
   if (NULL == msg)
   {
-    if (NULL == mq->error_handler)
-      LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler 
installed)\n");
-    else
-      mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+    GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
     return;
   }
 
   GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
                          GNUNET_TIME_UNIT_FOREVER_REL);
 
-  GNUNET_MQ_dispatch (mq, msg);
+  GNUNET_MQ_inject_message (mq, msg);
 }
 
 
@@ -287,23 +533,22 @@
 connection_client_transmit_queued (void *cls, size_t size,
                  void *buf)
 {
-  struct GNUNET_MQ_MessageQueue *mq = cls;
-  struct GNUNET_MQ_Message *mqm = mq->current_msg;
+  struct GNUNET_MQ_Handle *mq = cls;
+  const struct GNUNET_MessageHeader *msg;
   struct ClientConnectionState *state = mq->impl_state;
   size_t msg_size;
 
+  GNUNET_assert (NULL != mq);
+  msg = GNUNET_MQ_impl_current (mq);
+
   if (NULL == buf)
   {
-    if (NULL == mq->error_handler)
-    {
-      LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler 
installed\n");
-      return 0;
-    }
-    mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+    GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
     return 0;
   }
 
-  if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active))
+  if ( (GNUNET_YES == state->receive_requested) &&
+       (GNUNET_NO == state->receive_active) )
   {
     state->receive_active = GNUNET_YES;
     GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
@@ -311,78 +556,53 @@
   }
 
 
-  GNUNET_assert (NULL != mqm);
+  msg_size = ntohs (msg->size);
+  GNUNET_assert (size >= msg_size);
+  memcpy (buf, msg, msg_size);
+  state->th = NULL;
 
-  if (NULL != mqm->sent_cb)
-  {
-    mqm->sent_cb (mqm->sent_cls);
-  }
+  GNUNET_MQ_impl_send_continue (mq);
 
-  mq->current_msg = NULL;
-  GNUNET_assert (NULL != buf);
-  msg_size = ntohs (mqm->mh->size);
-  GNUNET_assert (size >= msg_size);
-  memcpy (buf, mqm->mh, msg_size);
-  GNUNET_free (mqm);
-  state->th = NULL;
-  if (NULL != mq->msg_head)
-  {
-    mq->current_msg = mq->msg_head;
-    GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
-    state->th = 
-      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs 
(mq->current_msg->mh->size), 
-                                             GNUNET_TIME_UNIT_FOREVER_REL, 
GNUNET_NO,
-                                             
&connection_client_transmit_queued, mq);
-  }
   return msg_size;
 }
 
 
 
 static void
-connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
 {
-  GNUNET_free (mq->impl_state);
+  GNUNET_free (impl_state);
 }
 
 static void
-connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
-                             struct GNUNET_MQ_Message *mqm)
+connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
+                             const struct GNUNET_MessageHeader *msg, void 
*impl_state)
 {
-  struct ClientConnectionState *state = mq->impl_state;
-  int msize;
+  struct ClientConnectionState *state = impl_state;
 
   GNUNET_assert (NULL != state);
+  GNUNET_assert (NULL == state->th);
 
-  if (NULL != state->th)
-  {
-    GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
-    return;
-  }
-  GNUNET_assert (NULL == mq->current_msg);
-  mq->current_msg = mqm;
-  msize = ntohs (mqm->mh->size);
+  GNUNET_MQ_impl_send_commit (mq);
+
   state->th = 
-      GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 
+      GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs 
(msg->size), 
                                            GNUNET_TIME_UNIT_FOREVER_REL, 
GNUNET_NO,
                                            &connection_client_transmit_queued, 
mq);
 }
 
 
-
-
-
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection 
*connection,
-                                       const struct GNUNET_MQ_Handler 
*handlers,
+                                       const struct GNUNET_MQ_MessageHandler 
*handlers,
                                        void *cls)
 {
-  struct GNUNET_MQ_MessageQueue *mq;
+  struct GNUNET_MQ_Handle *mq;
   struct ClientConnectionState *state;
 
   GNUNET_assert (NULL != connection);
 
-  mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+  mq = GNUNET_new (struct GNUNET_MQ_Handle);
   mq->handlers = handlers;
   mq->handlers_cls = cls;
   state = GNUNET_new (struct ClientConnectionState);
@@ -390,16 +610,20 @@
   mq->impl_state = state;
   mq->send_impl = connection_client_send_impl;
   mq->destroy_impl = connection_client_destroy_impl;
+  if (NULL != handlers)
+    state->receive_requested = GNUNET_YES;
 
   return mq;
 }
 
 
 void
-GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
-                            const struct GNUNET_MQ_Handler *new_handlers,
+GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
+                            const struct GNUNET_MQ_MessageHandler 
*new_handlers,
                             void *cls)
 {
+  /* FIXME: notify implementation? */
+  /* FIXME: what about NULL handlers? abort receive? */
   mq->handlers = new_handlers;
   mq->handlers_cls = cls;
 }
@@ -413,8 +637,7 @@
  * @param assoc_data to associate
  */
 uint32_t
-GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
-                     struct GNUNET_MQ_Message *mqm,
+GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
                      void *assoc_data)
 {
   uint32_t id;
@@ -433,7 +656,7 @@
 
 
 void *
-GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
+GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 {
   if (NULL == mq->assoc_map)
     return NULL;
@@ -442,7 +665,7 @@
 
 
 void *
-GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
+GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
 {
   void *val;
 
@@ -456,7 +679,7 @@
 
 
 void
-GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
+GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
                        GNUNET_MQ_NotifyCallback cb,
                        void *cls)
 {
@@ -466,13 +689,13 @@
 
 
 void
-GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
+GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
 {
   /* FIXME: destroy all pending messages in the queue */
 
   if (NULL != mq->destroy_impl)
   {
-    mq->destroy_impl (mq);
+    mq->destroy_impl (mq, mq->impl_state);
   }
 
   GNUNET_free (mq);
@@ -480,7 +703,6 @@
 
 
 
-
 struct GNUNET_MessageHeader *
 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t 
base_size)
 {

Modified: gnunet/src/util/test_mq.c
===================================================================
--- gnunet/src/util/test_mq.c   2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/util/test_mq.c   2013-06-19 10:48:54 UTC (rev 27485)
@@ -40,7 +40,7 @@
 void
 test1 (void)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct MyMessage *mm;
   
   mm = NULL;
@@ -57,7 +57,7 @@
 void
 test2 (void)
 {
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_MessageHeader *mh;
 
   mqm = GNUNET_MQ_msg_header (42);

Modified: gnunet/src/util/test_mq_client.c
===================================================================
--- gnunet/src/util/test_mq_client.c    2013-06-19 09:34:15 UTC (rev 27484)
+++ gnunet/src/util/test_mq_client.c    2013-06-19 10:48:54 UTC (rev 27485)
@@ -60,6 +60,9 @@
     return;
   }
 
+  /* can happen if notify does not work */
+  GNUNET_assert (received < 2);
+
   GNUNET_SERVER_receive_done (argclient, GNUNET_YES);
 }
 
@@ -98,14 +101,16 @@
 
 void send_cb (void *cls)
 {
+  /* the notify should only be called once */
+  GNUNET_assert (GNUNET_NO == notify);
   printf ("notify sent\n");
   notify = GNUNET_YES;
 }
 
 void test_mq (struct GNUNET_CLIENT_Connection *client)
 {
-  struct GNUNET_MQ_MessageQueue *mq;
-  struct GNUNET_MQ_Message *mqm;
+  struct GNUNET_MQ_Handle *mq;
+  struct GNUNET_MQ_Envelope *mqm;
 
   /* FIXME: test handling responses */
   mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]