gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r27354 - in gnunet/src: . consensus dv include set stream u


From: gnunet
Subject: [GNUnet-SVN] r27354 - in gnunet/src: . consensus dv include set stream util
Date: Mon, 3 Jun 2013 12:53:49 +0200

Author: dold
Date: 2013-06-03 12:53:49 +0200 (Mon, 03 Jun 2013)
New Revision: 27354

Modified:
   gnunet/src/Makefile.am
   gnunet/src/consensus/Makefile.am
   gnunet/src/consensus/consensus_api.c
   gnunet/src/consensus/consensus_protocol.h
   gnunet/src/consensus/gnunet-consensus.c
   gnunet/src/consensus/gnunet-service-consensus.c
   gnunet/src/consensus/test_consensus.conf
   gnunet/src/dv/gnunet-service-dv.c
   gnunet/src/include/gnunet_consensus_service.h
   gnunet/src/include/gnunet_mq_lib.h
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/include/gnunet_set_service.h
   gnunet/src/set/gnunet-service-set.c
   gnunet/src/set/gnunet-service-set.h
   gnunet/src/set/gnunet-service-set_union.c
   gnunet/src/set/gnunet-set.c
   gnunet/src/set/set.h
   gnunet/src/set/set_api.c
   gnunet/src/set/test_set.conf
   gnunet/src/set/test_set_api.c
   gnunet/src/stream/stream_api.c
   gnunet/src/util/crypto_hash.c
   gnunet/src/util/mq.c
   gnunet/src/util/test_mq.c
Log:
- conclude for SET
- consensus with SET


Modified: gnunet/src/Makefile.am
===================================================================
--- gnunet/src/Makefile.am      2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/Makefile.am      2013-06-03 10:53:49 UTC (rev 27354)
@@ -3,7 +3,7 @@
 #endif
 
 if HAVE_EXPERIMENTAL
- EXP_DIR = gns consensus dv set experimentation
+ EXP_DIR = gns set dv consensus experimentation
 endif
 
 if LINUX

Modified: gnunet/src/consensus/Makefile.am
===================================================================
--- gnunet/src/consensus/Makefile.am    2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/consensus/Makefile.am    2013-06-03 10:53:49 UTC (rev 27354)
@@ -61,6 +61,8 @@
   $(top_builddir)/src/mesh/libgnunetmesh.la \
   $(top_builddir)/src/set/libgnunetset.la \
   $(GN_LIBINTL)
+gnunet_service_consensus_DEPENDENCIES = \
+  $(top_builddir)/src/set/libgnunetset.la
 
 gnunet_service_evil_consensus_SOURCES = \
  gnunet-service-consensus.c
@@ -71,6 +73,8 @@
   $(top_builddir)/src/mesh/libgnunetmesh.la \
   $(top_builddir)/src/set/libgnunetset.la \
   $(GN_LIBINTL)
+gnunet_service_evil_consensus_DEPENDENCIES = \
+  $(top_builddir)/src/set/libgnunetset.la
 gnunet_service_evil_consensus_CFLAGS = -DEVIL
 
 libgnunetconsensus_la_SOURCES = \

Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c        2013-06-03 10:51:03 UTC (rev 
27353)
+++ gnunet/src/consensus/consensus_api.c        2013-06-03 10:53:49 UTC (rev 
27354)
@@ -236,9 +236,9 @@
  */
 static void
 handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
-                   struct GNUNET_CONSENSUS_ElementMessage *msg)
+                    struct GNUNET_CONSENSUS_ElementMessage *msg)
 {
-  struct GNUNET_CONSENSUS_Element element;
+  struct GNUNET_SET_Element element;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
 
@@ -424,7 +424,7 @@
  */
 void
 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
-                        const struct GNUNET_CONSENSUS_Element *element,
+                        const struct GNUNET_SET_Element *element,
                         GNUNET_CONSENSUS_InsertDoneCallback idc,
                         void *idc_cls)
 {

Modified: gnunet/src/consensus/consensus_protocol.h
===================================================================
--- gnunet/src/consensus/consensus_protocol.h   2013-06-03 10:51:03 UTC (rev 
27353)
+++ gnunet/src/consensus/consensus_protocol.h   2013-06-03 10:53:49 UTC (rev 
27354)
@@ -38,12 +38,15 @@
 /**
  * Sent as context message for set reconciliation.
  */
-struct ConsensusRoundMessage
+struct GNUNET_CONSENSUS_RoundContextMessage
 {
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT
+   */
   struct GNUNET_MessageHeader header;
-  uint8_t round;
-  uint8_t exp_round;
-  uint8_t exp_subround;
+  uint32_t round;
+  uint32_t exp_round;
+  uint32_t exp_subround;
 };
 
 GNUNET_NETWORK_STRUCT_END

Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c     2013-06-03 10:51:03 UTC (rev 
27353)
+++ gnunet/src/consensus/gnunet-consensus.c     2013-06-03 10:53:49 UTC (rev 
27354)
@@ -133,7 +133,7 @@
   {
     int j;
     struct GNUNET_HashCode *val;
-    struct GNUNET_CONSENSUS_Element *element;
+    struct GNUNET_SET_Element *element;
     generate_indices(unique_indices);
 
     val = GNUNET_malloc (sizeof *val);
@@ -151,6 +151,8 @@
     }
   }
 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "all elements inserted, calling 
conclude\n");
+
   for (i = 0; i < num_peers; i++)
     GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, 
conclude_cb, consensus_handles[i]);
 }
@@ -194,7 +196,7 @@
 
 static void
 new_element_cb (void *cls,
-                const struct GNUNET_CONSENSUS_Element *element)
+                const struct GNUNET_SET_Element *element)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
 }

Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2013-06-03 10:51:03 UTC 
(rev 27353)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2013-06-03 10:53:49 UTC 
(rev 27354)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      (C) 2012 Christian Grothoff (and other contributing authors)
+      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -43,9 +43,9 @@
 
 
 /**
- * Number of exponential rounds, used in the inventory and completion round.
+ * Number of exponential rounds, used in the exp and completion round.
  */
-#define NUM_EXP_ROUNDS (4)
+#define NUM_EXP_ROUNDS 4
 
 /* forward declarations */
 
@@ -155,17 +155,17 @@
    * Permutation of peers for the current round,
    * maps logical index (for current round) to physical index (location in 
info array)
    */
-  int *shuffle;
+  uint32_t *shuffle;
 
   /**
    * Current round of the exponential scheme.
    */
-  int exp_round;
+  uint32_t exp_round;
 
   /**
    * Current sub-round of the exponential scheme.
    */
-  int exp_subround;
+  uint32_t exp_subround;
 
   /**
    * The partner for the current exp-round
@@ -201,17 +201,6 @@
   struct GNUNET_PeerIdentity peer_id;
 
   /**
-   * Do we connect to the peer, or does the peer connect to us?
-   * Only valid for all-to-all phases
-   */
-  int is_outgoing;
-
-  /**
-   * Did we receive/send a consensus hello?
-   */
-  int hello;
-
-  /**
    * Back-reference to the consensus session,
    * to that ConsensusPeerInformation can be used as a closure
    */
@@ -223,22 +212,14 @@
   int exp_subround_finished;
 
   /**
-   * GNUNET_YES if we synced inventory with this peer;
-   * GNUNET_NO otherwise.
+   * Set operation we are currently executing with this peer.
    */
-  int inventory_synced;
+  struct GNUNET_SET_OperationHandle *set_op;
 
   /**
-   * Round this peer seems to be in, according to the last SE we got.
-   * Necessary to store this, as we sometimes need to respond to a request 
from an
-   * older round, while we are already in the next round.
+   * Has conclude been called on the set_op?
    */
-  enum ConsensusRound apparent_round;
-
-  /**
-   * Set operation we are currently executing with this peer.
-   */
-  struct GNUNET_SET_OperationHandle *set_op;
+  int set_op_concluded;
 };
 
 
@@ -268,9 +249,8 @@
 static struct GNUNET_PeerIdentity my_peer;
 
 
-/*
 static int
-exp_subround_finished (const struct ConsensusSession *session)
+have_exp_subround_finished (const struct ConsensusSession *session)
 {
   int not_finished;
   not_finished = 0;
@@ -284,25 +264,9 @@
     return GNUNET_YES;
   return GNUNET_NO;
 }
-*/
 
 
 
-/*
-static int
-inventory_round_finished (struct ConsensusSession *session)
-{
-  int i;
-  int finished;
-  finished = 0;
-  for (i = 0; i < session->num_peers; i++)
-    if (GNUNET_YES == session->info[i].inventory_synced)
-      finished++;
-  if (finished >= (session->num_peers / 2))
-    return GNUNET_YES;
-  return GNUNET_NO;
-}
-*/
 
 /**
  * Destroy a session, free all resources associated with it.
@@ -341,39 +305,6 @@
 
 
 /**
- * Start the inventory round, contact all peers we are supposed to contact.
- *
- * @param session the current session
- */
-static void
-start_inventory (struct ConsensusSession *session)
-{
-  int i;
-  int last;
-
-  for (i = 0; i < session->num_peers; i++)
-    session->info[i].is_outgoing = GNUNET_NO;
-
-  last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % 
session->num_peers;
-  i = (session->local_peer_idx + 1) % session->num_peers;
-  while (i != last)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", 
session->local_peer_idx, i);
-    session->info[i].is_outgoing = GNUNET_YES;
-    // embrace_peer (&session->info[i]);
-    i = (i + 1) % session->num_peers;
-  }
-  // tie-breaker for even number of peers
-  if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all 
(tie-breaker)\n", session->local_peer_idx, i);
-    session->info[last].is_outgoing = GNUNET_YES;
-    // embrace_peer (&session->info[last]);
-  }
-}
-
-
-/**
  * Start the next round.
  * This function can be invoked as a timeout task, or called manually (tc will 
be NULL then).
  *
@@ -407,27 +338,8 @@
       subround_over (session, NULL);
       break;
     case CONSENSUS_ROUND_EXCHANGE:
-      /* handle two peers specially */
-      if (session->num_peers <= 2)
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", 
session->local_peer_idx);
-        //GNUNET_CONTAINER_multihashmap_iterate (session->values, 
send_client_elements_iter, session);
-        //send_client_conclude_done (session);
-        session->current_round = CONSENSUS_ROUND_FINISH;
-        return;
-      }
-      session->current_round = CONSENSUS_ROUND_INVENTORY;
-      start_inventory (session);
-      break;
-    case CONSENSUS_ROUND_INVENTORY:
-      session->current_round = CONSENSUS_ROUND_COMPLETION;
-      session->exp_round = 0;
-      subround_over (session, NULL);
-      break;
-    case CONSENSUS_ROUND_COMPLETION:
+      /* FIXME: send all elements to client */
       session->current_round = CONSENSUS_ROUND_FINISH;
-      //send_client_conclude_done (session);
-      break;
     default:
       GNUNET_assert (0);
   }
@@ -435,68 +347,91 @@
 
 
 /**
- * Adapt the shuffle of the session for the current round.
+ * Create a new permutation for the session's peers in session->shuffle.
+ * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
+ * both the global session id and the current round index.
+ *
+ * @param session the session to create the new permutation for
  */
 static void
 shuffle (struct ConsensusSession *session)
 {
-  /* adapted from random_permute in util/crypto_random.c */
-  /* FIXME
-  unsigned int *ret;
-  unsigned int i;
-  unsigned int tmp;
-  uint32_t x;
+  uint32_t i;
+  uint32_t randomness[session->num_peers-1];
 
-  GNUNET_assert (n > 0);
-  ret = GNUNET_malloc (n * sizeof (unsigned int));
-  for (i = 0; i < n; i++)
-    ret[i] = i;
-  for (i = n - 1; i > 0; i--)
+  if (NULL == session->shuffle)
+    session->shuffle = GNUNET_malloc (session->num_peers * sizeof 
(*session->shuffle));
+
+  GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), &session->exp_round, 
sizeof (uint32_t),
+                     &session->global_id, sizeof (struct GNUNET_HashCode));
+
+  for (i = 0; i < session->num_peers; i++)
+    session->shuffle[i] = i;
+
+  for (i = session->num_peers - 1; i > 0; i--)
   {
-    x = GNUNET_CRYPTO_random_u32 (mode, i + 1);
-    tmp = ret[x];
-    ret[x] = ret[i];
-    ret[i] = tmp;
+    uint32_t x;
+    uint32_t tmp;
+    x = randomness[i-1];
+    tmp = session->shuffle[x];
+    session->shuffle[x] = session->shuffle[i];
+    session->shuffle[i] = tmp;
   }
-  */
 }
 
 
 /**
  * Find and set the partner_incoming and partner_outgoing of our peer,
- * one of them may not exist in most cases.
+ * one of them may not exist (and thus set to NULL) if the number of peers
+ * in the session is not a power of two.
  *
  * @param session the consensus session
  */
 static void
 find_partners (struct ConsensusSession *session)
 {
-  int mark[session->num_peers];
-  int i;
+  int arc;
+  int partner_idx;
+  int largest_arc;
+  int num_ghosts;
 
-  memset (mark, 0, session->num_peers * sizeof (int));
-  session->partner_incoming = session->partner_outgoing = NULL;
-  for (i = 0; i < session->num_peers; i++)
+  /* distance to neighboring peer in current subround */
+  arc = 1 << session->exp_subround;
+  partner_idx = (session->local_peer_idx + arc) % session->num_peers;
+  largest_arc = 1;
+  while (largest_arc < session->num_peers)
+    largest_arc <<= 1;
+  num_ghosts = largest_arc - session->num_peers;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "num ghosts: %d\n", num_ghosts);
+
+  if (0 == (session->local_peer_idx & arc))
   {
-    int arc;
-    if (0 != mark[i])
-      continue;
-    arc = (i + (1 << session->exp_subround)) % session->num_peers;
-    mark[i] = mark[arc] = 1;
-    GNUNET_assert (i != arc);
-    if (i == session->local_peer_idx)
+    /* we are outgoing */
+    session->partner_outgoing = &session->info[session->shuffle[partner_idx]];
+    /* are we a 'ghost' of a peer that would exist if
+     * the number of peers was a power of two, and thus have to partner
+     * with an additional peer?
+     */
+    if (session->local_peer_idx < num_ghosts)
     {
-      GNUNET_assert (NULL == session->partner_outgoing);
-      session->partner_outgoing = &session->info[session->shuffle[arc]];
-      session->partner_outgoing->exp_subround_finished = GNUNET_NO;
+      int ghost_partner_idx;
+      ghost_partner_idx = (session->local_peer_idx - arc) % session->num_peers;
+      /* platform dependent; modulo sometimes returns negative values */
+      if (ghost_partner_idx < 0)
+        ghost_partner_idx += arc;
+      session->partner_incoming = 
&session->info[session->shuffle[ghost_partner_idx]];
     }
-    if (arc == session->local_peer_idx)
+    else
     {
-      GNUNET_assert (NULL == session->partner_incoming);
-      session->partner_incoming = &session->info[session->shuffle[i]];
-      session->partner_incoming->exp_subround_finished = GNUNET_NO;
+      session->partner_incoming = NULL;
     }
   }
+  else
+  {
+    session->partner_outgoing = NULL;
+    session->partner_incoming = &session->info[session->shuffle[partner_idx]];
+  }
 }
 
 
@@ -508,11 +443,42 @@
  * @param element a result element, only valid if status is 
GNUNET_SET_STATUS_OK
  * @param status see enum GNUNET_SET_Status
  */
-static void set_result_cb (void *cls,
-                           const struct GNUNET_SET_Element *element,
-                           enum GNUNET_SET_Status status)
+static void 
+set_result_cb (void *cls,
+               const struct GNUNET_SET_Element *element,
+               enum GNUNET_SET_Status status)
 {
-  /* FIXME */
+  struct ConsensusPeerInformation *cpi = cls;
+
+  switch (status)
+  {
+    case GNUNET_SET_STATUS_OK:
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n");
+      break;
+    case GNUNET_SET_STATUS_FAILURE:
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n");
+      break;
+    case GNUNET_SET_STATUS_HALF_DONE:
+    case GNUNET_SET_STATUS_DONE:
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n");
+      cpi->exp_subround_finished = GNUNET_YES;
+      if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
+        subround_over (cpi->session, NULL);
+      return;
+    default:
+      GNUNET_break (0);
+      return;
+  }
+
+  switch (cpi->session->current_round)
+  {
+    case CONSENSUS_ROUND_EXCHANGE:
+      GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
+      break;
+    default:
+      GNUNET_break (0);
+      return;
+  }
 }
 
 
@@ -540,14 +506,6 @@
     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
   }
-  /* check if we are done with the log phase, 2-peer consensus only does one 
log round */
-  if ( (session->exp_round == NUM_EXP_ROUNDS) ||
-       ((session->num_peers == 2) && (session->exp_round == 1)))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", 
session->local_peer_idx);
-    round_over (session, NULL);
-    return;
-  }
   if (session->exp_round == 0)
   {
     /* initialize everything for the log-rounds */
@@ -575,18 +533,27 @@
 
   if (NULL != session->partner_outgoing)
   {
+    struct GNUNET_CONSENSUS_RoundContextMessage *msg;
+    msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
+    msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
+    msg->header.size = htons (sizeof *msg);
+    msg->round = htonl (session->current_round);
+    msg->exp_round = htonl (session->exp_round);
+    msg->exp_subround = htonl (session->exp_subround);
+
     if (NULL != session->partner_outgoing->set_op)
+    {
       GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
+    }
     session->partner_outgoing->set_op =
-        GNUNET_SET_evaluate (session->element_set, 
-                             &session->partner_outgoing->peer_id,
+        GNUNET_SET_evaluate (&session->partner_outgoing->peer_id,
                              &session->global_id,
-                             NULL, /* FIXME */
+                             (struct GNUNET_MessageHeader *) msg,
                              0, /* FIXME */
                              GNUNET_SET_RESULT_ADDED,
-                             set_result_cb, session);
-
-
+                             set_result_cb, session->partner_outgoing);
+    GNUNET_SET_conclude (session->partner_outgoing->set_op, 
session->element_set);
+    session->partner_outgoing->set_op_concluded = GNUNET_YES;
   }
 
 #ifdef GNUNET_EXTRA_LOGGING
@@ -642,6 +609,8 @@
   int i;
   struct GNUNET_HashCode tmp;
 
+  /* FIXME: use kdf? */
+
   session->global_id = *session_id;
   for (i = 0; i < session->num_peers; ++i)
   {
@@ -727,9 +696,6 @@
 }
 
 
-
-
-
 /**
  * Called when another peer wants to do a set operation with the
  * local peer.
@@ -750,7 +716,67 @@
                const struct GNUNET_MessageHeader *context_msg,
                struct GNUNET_SET_Request *request)
 {
-  /* FIXME */
+  struct ConsensusSession *session = cls;
+  struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct 
GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
+  struct ConsensusPeerInformation *cpi;
+  int index;
+
+  if (NULL == context_msg)
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  index = get_peer_idx (other_peer, session);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s 
(&other_peer->hashPubKey));
+
+  if (index < 0)
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  cpi = &session->info[index];
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", 
session->local_peer_idx, index);
+
+  switch (session->current_round)
+  {
+    case CONSENSUS_ROUND_EXCHANGE:
+      if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE)
+      {
+        GNUNET_break_op (0);
+        return;
+      }
+      if (ntohl (msg->exp_round) < session->exp_round)
+      {
+        GNUNET_break_op (0);
+        return;
+      }
+      if (ntohl (msg->exp_subround) < session->exp_subround)
+      {
+        GNUNET_break_op (0);
+        return;
+      }
+      if (NULL != cpi->set_op)
+        GNUNET_SET_operation_cancel (cpi->set_op);
+      cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+                                       set_result_cb, &session->info[index]);
+      if (ntohl (msg->exp_subround) == session->exp_subround)
+      {
+        cpi->set_op_concluded = GNUNET_YES;
+        GNUNET_SET_conclude (cpi->set_op, session->element_set);
+      }
+      else
+      {
+        cpi->set_op_concluded = GNUNET_NO;
+      }
+      break;
+    default:
+      GNUNET_break_op (0);
+      return;
+  }
 }
 
 
@@ -769,7 +795,9 @@
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", 
session->num_peers);
   compute_global_id (session, &join_msg->session_id);
 
-  /* check if some local client already owns the session. */
+  /* check if some local client already owns the session.
+   * it is only legal to have a session with an existing global id
+   * if all other sessions with this global id are finished.*/
   other_session = sessions_head;
   while (NULL != other_session)
   {
@@ -789,6 +817,8 @@
 
   session->local_peer_idx = get_peer_idx (&my_peer, session);
   GNUNET_assert (-1 != session->local_peer_idx);
+  session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  GNUNET_assert (NULL != session->element_set);
   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
                                              &session->global_id,
                                              set_listen_cb, session);
@@ -827,6 +857,8 @@
 {
   struct ConsensusSession *session;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join message sent by client\n");
+
   session = get_session_by_client (client);
   if (NULL != session)
   {
@@ -835,9 +867,13 @@
     return;
   }
   session = GNUNET_new (struct ConsensusSession);
+  session->client = client;
   GNUNET_SERVER_client_keep (client);
   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join done\n");
 }
 
 
@@ -858,12 +894,7 @@
   struct GNUNET_SET_Element *element;
   ssize_t element_size;
 
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (session->client == client)
-      break;
-  }
+  session = get_session_by_client (client);
 
   if (NULL == session)
   {
@@ -886,6 +917,7 @@
     GNUNET_break (0);
     return;
   }
+
   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
   element->type = msg->element_type;
   element->size = element_size;
@@ -893,6 +925,8 @@
   element->data = &element[1];
   GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", 
session->local_peer_idx);
 }
 
 
@@ -911,10 +945,10 @@
   struct ConsensusSession *session;
   struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
 
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
   cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
-
   session = get_session_by_client (client);
-
   if (NULL == session)
   {
     /* client not found */
@@ -922,14 +956,12 @@
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
-
   if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
     /* client requested conclude twice */
     GNUNET_break (0);
     return;
   }
-
   if (session->num_peers <= 1)
   {
     //send_client_conclude_done (session);
@@ -937,7 +969,7 @@
   else
   {
     session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
-    /* the 'begin' round is over, start with the next, real round */
+    /* the 'begin' round is over, start with the next, actual round */
     round_over (session, NULL);
   }
 
@@ -964,6 +996,29 @@
 
 
 /**
+ * Clean up after a client after it is
+ * disconnected (either by us or by itself)
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ */
+void
+handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+  struct ConsensusSession *session;
+
+  session = get_session_by_client (client);
+  if (NULL == session)
+    return;
+  if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
+      (CONSENSUS_ROUND_FINISH == session->current_round))
+    destroy_session (session);
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for 
consensus to finish\n");
+}
+
+
+/**
  * Start processing consensus requests.
  *
  * @param cls closure
@@ -971,13 +1026,14 @@
  * @param c configuration to use
  */
 static void
-run (void *cls, struct GNUNET_SERVER_Handle *server, const struct 
GNUNET_CONFIGURATION_Handle *c)
+run (void *cls, struct GNUNET_SERVER_Handle *server,
+     const struct GNUNET_CONFIGURATION_Handle *c)
 {
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
-    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
-    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
+    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
+    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
     {NULL, NULL, 0, 0}
   };
 
@@ -992,6 +1048,7 @@
   }
   GNUNET_SERVER_add_handlers (server, server_handlers);
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, 
NULL);
+  GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
 }
 

Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf    2013-06-03 10:51:03 UTC (rev 
27353)
+++ gnunet/src/consensus/test_consensus.conf    2013-06-03 10:53:49 UTC (rev 
27354)
@@ -5,7 +5,7 @@
 HOME = $SERVICEHOME
 BINARY = gnunet-service-consensus
 #PREFIX = gdbserver :12345
-PREFIX = valgrind --leak-check=full
+PREFIX = valgrind
 ACCEPT_FROM = 127.0.0.1;
 ACCEPT_FROM6 = ::1;
 UNIXPATH = /tmp/gnunet-service-consensus.sock
@@ -19,8 +19,12 @@
 
 
 [arm]
-DEFAULTSERVICES = core consensus
+DEFAULTSERVICES = core consensus set
 
+[set]
+OPTIONS = -L INFO
+PREFIX = valgrind
 
+
 [testbed]
 OVERLAY_TOPOLOGY = CLIQUE

Modified: gnunet/src/dv/gnunet-service-dv.c
===================================================================
--- gnunet/src/dv/gnunet-service-dv.c   2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/dv/gnunet-service-dv.c   2013-06-03 10:53:49 UTC (rev 27354)
@@ -1404,7 +1404,6 @@
   neighbor->my_set = GNUNET_SET_create (cfg,
                                        GNUNET_SET_OPERATION_UNION);
   neighbor->set_op = GNUNET_SET_accept (request,
-                                       neighbor->my_set /* FIXME: pass later! 
*/,
                                        GNUNET_SET_RESULT_ADDED,
                                        &handle_set_union_result,
                                        neighbor);
@@ -1428,8 +1427,7 @@
   neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
   neighbor->my_set = GNUNET_SET_create (cfg,
                                        GNUNET_SET_OPERATION_UNION);
-  neighbor->set_op = GNUNET_SET_evaluate (neighbor->my_set /* FIXME: pass 
later! */,
-                                         &neighbor->peer,
+  neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer,
                                          &neighbor->real_session_id,
                                          NULL,
                                          0 /* FIXME: salt */,

Modified: gnunet/src/include/gnunet_consensus_service.h
===================================================================
--- gnunet/src/include/gnunet_consensus_service.h       2013-06-03 10:51:03 UTC 
(rev 27353)
+++ gnunet/src/include/gnunet_consensus_service.h       2013-06-03 10:53:49 UTC 
(rev 27354)
@@ -39,31 +39,10 @@
 #include "gnunet_common.h"
 #include "gnunet_time_lib.h"
 #include "gnunet_configuration_lib.h"
+#include "gnunet_set_service.h"
 
 
 /**
- * An element of the consensus set.
- */
-struct GNUNET_CONSENSUS_Element
-{
-  /**
-   * The actual data of the element.
-   */
-   const void *data;
-
-   /**
-    * Size of the element's data.
-    */
-   uint16_t size;
-
-   /**
-    * Application specific element type
-    */
-   uint16_t type;
-};
-
-
-/**
  * Called when a new element was received from another peer, or an error 
occured.
  * May deliver duplicate values.
  * Elements given to a consensus operation by the local peer are NOT given
@@ -73,7 +52,7 @@
  * @param element new element, NULL on error
  */
 typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls,
-                                                 const struct 
GNUNET_CONSENSUS_Element *element);
+                                                 const struct 
GNUNET_SET_Element *element);
 
 
 
@@ -138,7 +117,7 @@
  */
 void
 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
-                         const struct GNUNET_CONSENSUS_Element *element,
+                         const struct GNUNET_SET_Element *element,
                          GNUNET_CONSENSUS_InsertDoneCallback idc,
                          void *idc_cls);
 

Modified: gnunet/src/include/gnunet_mq_lib.h
===================================================================
--- gnunet/src/include/gnunet_mq_lib.h  2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/include/gnunet_mq_lib.h  2013-06-03 10:53:49 UTC (rev 27354)
@@ -53,38 +53,8 @@
  */
 #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
 
-/**
- * Append data to the end of an existing MQ message.
- * If the operation is successful, mqm is changed to point to the new MQ 
message,
- * and GNUNET_OK is returned.
- * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed,
- * the user of this API must take care of disposing the already allocated 
message
- * (either by sending it, or by using GNUNET_MQ_discard)
- *
- * @param mqm MQ message to augment with additional data
- * @param src source buffer for the additional data
- * @param len length of the additional data
- * @return GNUNET_SYSERR if nesting the message failed,
- *         GNUNET_OK on success
- */
-#define GNUNET_MQ_nest(mqm, src, len) GNUNET_MQ_nest_ (&mqm, src, len)
 
-
 /**
- * Append a message to the end of an existing MQ message.
- * If the operation is successful, mqm is changed to point to the new MQ 
message,
- * and GNUNET_OK is returned.
- * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed,
- * the user of this API must take care of disposing the already allocated 
message
- * (either by sending it, or by using GNUNET_MQ_discard)
- *
- * @param mqm MQ message to augment with additional data
- * @param mh the message to append, must be of type 'struct 
GNUNET_MessageHeader *'
- */
-#define GNUNET_MQ_nest_mh(mqm, mh) ((NULL == mh) ? (GNUNET_OK) : 
GNUNET_MQ_nest((mqm), (mh), ntohs ((mh)->size)))
-
-
-/**
  * Allocate a GNUNET_MQ_Message, where the message only consists of a header.
  * The allocated message will already have the type and size field set.
  *
@@ -105,6 +75,40 @@
 
 
 /**
+ * Allocate a GNUNET_MQ_Message, and append a payload message after the given
+ * message struct.
+ *
+ * @param mvar pointer to a message struct, will be changed to point at the 
newly allocated message,
+ *        whose size is 'sizeof(*mvar) + ntohs (mh->size)'
+ * @param type message type of the allocated message, has no effect on the 
nested message
+ * @param mh message to nest
+ * @return a newly allocated 'struct GNUNET_MQ_Message *'
+ */
+#define GNUNET_MQ_msg_nested_mh(mvar, type, mh) 
GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct 
GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh)
+
+
+/**
+ * Return a pointer to the message at the end of the given message.
+ *
+ * @param var pointer to a message struct, the type of the expression 
determines the base size,
+ *        the space after the base size is the nested message
+ * @return a 'struct GNUNET_MessageHeader *' that points at the nested message 
of the given message,
+ *         or NULL if the given message in 'var' does not have any space after 
the message struct
+ */
+#define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct 
GNUNET_MessageHeader *) (var), sizeof (*(var)))
+
+
+struct GNUNET_MessageHeader *
+GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t 
base_size);
+
+
+struct GNUNET_MQ_Message *
+GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t 
base_size, uint16_t type,
+                          const struct GNUNET_MessageHeader *nested_mh);
+
+
+
+/**
  * End-marker for the handlers array
  */
 #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0}
@@ -128,7 +132,8 @@
  * @param cls closure
  * @param msg the received message
  */
-typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct 
GNUNET_MessageHeader *msg);
+typedef void
+(*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader 
*msg);
 
 
 /**
@@ -151,10 +156,12 @@
  *
  * @param cls closure
  */
-typedef void (*GNUNET_MQ_NotifyCallback) (void *cls);
+typedef void
+(*GNUNET_MQ_NotifyCallback) (void *cls);
 
 
-typedef void (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
+typedef void
+(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
 
 
 struct GNUNET_MQ_Message
@@ -287,6 +294,7 @@
 };
 
 
+
 /**
  * Create a new message for MQ.
  * 
@@ -300,21 +308,6 @@
 
 
 /**
- * Resize the the mq message pointed to by mqmp,
- * and append the given data to it.
- *
- * @param mqmp pointer to a mq message pointer
- * @param src source of the data to append
- * @param len length of the data to append
- * @return GNUNET_OK on success,
- *         GNUNET_SYSERR on error (e.g. if len is too large)
- */
-int
-GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
-                 const void *src, uint16_t len);
-
-
-/**
  * Discard the message queue message, free all
  * allocated resources. Must be called in the event
  * that a message is created but should not actually be sent.

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2013-06-03 10:51:03 UTC (rev 
27353)
+++ gnunet/src/include/gnunet_protocols.h       2013-06-03 10:53:49 UTC (rev 
27354)
@@ -1754,11 +1754,18 @@
  */
 #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548
 
+/**
+ * Abort a round, don't send requested elements anymore
+ */
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT 547
 
+
 
/*******************************************************************************
  * SET message types
  
******************************************************************************/
 
+#define GNUNET_MESSAGE_TYPE_SET_REJECT 569
+
 /**
  * Cancel a set operation
  */
@@ -1800,44 +1807,49 @@
 #define GNUNET_MESSAGE_TYPE_SET_EVALUATE 577
 
 /**
- * Evaluate a set operation
+ * Start a set operation with the given set
  */
-#define GNUNET_MESSAGE_TYPE_SET_REQUEST 578
+#define GNUNET_MESSAGE_TYPE_SET_CONCLUDE 578
 
 /**
- * Evaluate a set operation.
+ * Notify the client of a request from a remote peer
  */
-#define GNUNET_MESSAGE_TYPE_SET_CREATE 579
+#define GNUNET_MESSAGE_TYPE_SET_REQUEST 579
 
 /**
- * Evaluate a set operation.
+ * Create a new local set
  */
-#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 580
+#define GNUNET_MESSAGE_TYPE_SET_CREATE 580
 
 /**
+ * Request a set operation from a remote peer.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 581
+
+/**
  * Strata estimator.
  */
-#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 581
+#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 582
 
 /**
  * Invertible bloom filter.
  */
-#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 582
+#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 583
 
 /**
  * Actual set elements.
  */
-#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 583
+#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 584
 
 /**
  * Requests for the elements with the given hashes.
  */
-#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 584
+#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 585
 
 /**
  * Operation is done.
  */
-#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 585
+#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 586
 
 
 

Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h     2013-06-03 10:51:03 UTC (rev 
27353)
+++ gnunet/src/include/gnunet_set_service.h     2013-06-03 10:53:49 UTC (rev 
27354)
@@ -257,11 +257,10 @@
 
 
 /**
- * Evaluate a set operation with our set and the set of another peer.
+ * Create a set operation for evaluation with another peer.
+ * The evaluation will not start until the client provides
+ * a local set with GNUNET_SET_conclude.
  *
- * @param set set to use -- FIXME: remove
- *            this argument, use GNUNET_SET_conclude instead! 
- * @param salt salt for HKDF (explain more here)
  * @param other_peer peer with the other set
  * @param app_id hash for the application using the set
  * @param context_msg additional information for the request
@@ -275,8 +274,7 @@
  * @return a handle to cancel the operation
  */
 struct GNUNET_SET_OperationHandle *
-GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
-                     const struct GNUNET_PeerIdentity *other_peer,
+GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
                      const struct GNUNET_HashCode *app_id,
                      const struct GNUNET_MessageHeader *context_msg,
                      uint16_t salt,
@@ -315,13 +313,13 @@
 
 
 /**
- * Accept a request we got via GNUNET_SET_listen.  Must be called
- * during GNUNET_SET_listen, as the 'struct GNUNET_SET_Request'
- * becomes invalid afterwards.
+ * Accept a request we got via GNUNET_SET_listen.  Must be called during
+ * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
+ * afterwards.
+ * Call GNUNET_SET_conclude to provide the local set to use for the operation,
+ * and to begin the exchange with the remote peer. 
  *
  * @param request request to accept
- * @param set set used for the requested operation -- FIXME: remove
- *            this argument, use GNUNET_SET_conclude instead! 
  * @param result_mode specified how results will be returned,
  *        see 'GNUNET_SET_ResultMode'.
  * @param result_cb callback for the results
@@ -330,7 +328,6 @@
  */
 struct GNUNET_SET_OperationHandle *
 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
-                   struct GNUNET_SET_Handle *set,
                    enum GNUNET_SET_ResultMode result_mode,
                    GNUNET_SET_ResultIterator result_cb,
                    void *cls);
@@ -353,9 +350,9 @@
 
 
 /**
- * Cancel the given set operation.  FIXME: do clients have
- * to cancel the operatino if the GNUNET_SET_ResultIterator
- * has been called with timeout/error/done?
+ * Cancel the given set operation.
+ * May not be called after the operation's GNUNET_SET_ResultIterator has been
+ * called with a status that indicates error, timeout or done.
  *
  * @param oh set operation to cancel
  */

Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/set/gnunet-service-set.c 2013-06-03 10:53:49 UTC (rev 27354)
@@ -226,7 +226,24 @@
   GNUNET_free (incoming);
 }
 
+static struct Listener *
+get_listener_by_target (enum GNUNET_SET_OperationType op,
+                        const struct GNUNET_HashCode *app_id)
+{
+  struct Listener *l;
 
+  for (l = listeners_head; NULL != l; l = l->next)
+  {
+    if (l->operation != op)
+      continue;
+    if (0 != GNUNET_CRYPTO_hash_cmp (app_id, &l->app_id))
+      continue;
+    return l;
+  }
+  return NULL;
+}
+
+
 /**
  * Handle a request for a set operation from
  * another peer.
@@ -240,62 +257,33 @@
   struct Incoming *incoming = cls;
   const struct OperationRequestMessage *msg = (const struct 
OperationRequestMessage *) mh;
   struct GNUNET_MQ_Message *mqm;
-  struct RequestMessage *cmsg;
+  struct GNUNET_SET_RequestMessage *cmsg;
   struct Listener *listener;
   const struct GNUNET_MessageHeader *context_msg;
 
-  if (ntohs (mh->size) < sizeof *msg)
+  context_msg = GNUNET_MQ_extract_nested_mh (msg);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, 
app %s)\n",
+              ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
+  listener = get_listener_by_target (ntohs (msg->operation), &msg->app_id);
+  if (NULL == listener)
   {
-    /* message is to small for its type */
-    GNUNET_break (0);
-    destroy_incoming (incoming);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "set operation request from peer failed: "
+                "no set with matching application ID and operation type\n");
     return;
   }
-  else if (ntohs (mh->size) == sizeof *msg)
+  mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, 
context_msg);
+  if (NULL == mqm)
   {
-    /* there is no context message */
-    context_msg = NULL;
-  }
-  else
-  {
-    context_msg = &msg[1].header;
-    if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size))
-    {
-      /* size of context message is invalid */
-      GNUNET_break (0);
-      destroy_incoming (incoming);
-      return;
-    }
-  }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, 
app %s)\n",
-              ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
-
-  /* find the appropriate listener */
-  for (listener = listeners_head;
-       listener != NULL;
-       listener = listener->next)
-  {
-    if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
-         (ntohs (msg->operation) != listener->operation) )
-      continue;
-    mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
-    if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg))
-    {
-      /* FIXME: disconnect the peer */
-      GNUNET_MQ_discard (mqm);
-      GNUNET_break (0);
-      return;
-    }
-    incoming->accept_id = accept_id++;
-    cmsg->accept_id = htonl (incoming->accept_id);
-    GNUNET_MQ_send (listener->client_mq, mqm);
+    /* FIXME: disconnect the peer */
+    GNUNET_break_op (0);
     return;
   }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-              "set operation request from peer failed: "
-              "no set with matching application ID and operation type\n");
+  incoming->accept_id = accept_id++;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", 
incoming->accept_id);
+  cmsg->accept_id = htonl (incoming->accept_id);
+  cmsg->peer_id = incoming->peer;
+  GNUNET_MQ_send (listener->client_mq, mqm);
 }
 
 
@@ -311,7 +299,7 @@
                       struct GNUNET_SERVER_Client *client,
                       const struct GNUNET_MessageHeader *m)
 {
-  struct SetCreateMessage *msg = (struct SetCreateMessage *) m;
+  struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m;
   struct Set *set;
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation 
%u)\n",
@@ -363,7 +351,7 @@
                       struct GNUNET_SERVER_Client *client,
                       const struct GNUNET_MessageHeader *m)
 {
-  struct ListenMessage *msg = (struct ListenMessage *) m;
+  struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m;
   struct Listener *listener;
 
   if (NULL != get_listener (client))
@@ -410,7 +398,7 @@
   switch (set->operation)
   {
     case GNUNET_SET_OPERATION_UNION:
-      _GSS_union_remove ((struct ElementMessage *) m, set);
+      _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set);
     case GNUNET_SET_OPERATION_INTERSECTION:
       /* FIXME: cfuchs */
       break;
@@ -423,7 +411,39 @@
 }
 
 
+
 /**
+ * Called when the client wants to reject an operation
+ * request from another peer.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_reject (void *cls,
+                      struct GNUNET_SERVER_Client *client,
+                      const struct GNUNET_MessageHeader *m)
+{
+  struct Incoming *incoming;
+  struct GNUNET_SET_AcceptRejectMessage *msg = (struct 
GNUNET_SET_AcceptRejectMessage *) m;
+
+  GNUNET_break (0 == ntohl (msg->request_id));
+
+  incoming = get_incoming (ntohl (msg->accept_reject_id));
+  if (NULL == incoming)
+  {
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
+  destroy_incoming (incoming);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+
+/**
  * Called when a client wants to add an element to a
  * set it inhabits.
  *
@@ -448,7 +468,7 @@
   switch (set->operation)
   {
     case GNUNET_SET_OPERATION_UNION:
-      _GSS_union_add ((struct ElementMessage *) m, set);
+      _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set);
     case GNUNET_SET_OPERATION_INTERSECTION:
       /* FIXME: cfuchs */
       break;
@@ -490,7 +510,7 @@
       /* FIXME: cfuchs */
       break;
     case GNUNET_SET_OPERATION_UNION:
-      _GSS_union_evaluate ((struct EvaluateMessage *) m, set);
+      _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set);
       break;
     default:
       GNUNET_assert (0);
@@ -502,23 +522,6 @@
 
 
 /**
- * Handle a cancel request from a client.
- *
- * @param cls unused
- * @param client the client
- * @param m the cancel message
- */
-static void
-handle_client_cancel (void *cls,
-                      struct GNUNET_SERVER_Client *client,
-                      const struct GNUNET_MessageHeader *m)
-{
-  /* FIXME: implement */
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
  * Handle an ack from a client.
  *
  * @param cls unused
@@ -550,25 +553,20 @@
 {
   struct Set *set;
   struct Incoming *incoming;
-  struct AcceptMessage *msg = (struct AcceptMessage *) mh;
+  struct GNUNET_SET_AcceptRejectMessage *msg = (struct 
GNUNET_SET_AcceptRejectMessage *) mh;
 
+  incoming = get_incoming (ntohl (msg->accept_reject_id));
 
-  incoming = get_incoming (ntohl (msg->accept_id));
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl 
(msg->accept_reject_id));
 
   if (NULL == incoming)
   {
+
     GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
 
-  if (0 == ntohl (msg->request_id))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
-    destroy_incoming (incoming);
-    GNUNET_SERVER_receive_done (client, GNUNET_OK);
-    return;
-  }
 
   set = get_set (client);
 
@@ -687,14 +685,14 @@
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
+    {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
+    {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
+    {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
     {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
+    {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
     {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
-    {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
+    {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0},
     {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
-    {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0},
-    {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
-    {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
-    {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
     {NULL, NULL, 0, 0}
   };
 
@@ -705,6 +703,8 @@
   stream_listen_socket = GNUNET_STREAM_listen (cfg, 
GNUNET_APPLICATION_TYPE_SET,
                                                &stream_listen_cb, NULL,
                                                GNUNET_STREAM_OPTION_END);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n");
 }
 
 

Modified: gnunet/src/set/gnunet-service-set.h
===================================================================
--- gnunet/src/set/gnunet-service-set.h 2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/set/gnunet-service-set.h 2013-06-03 10:53:49 UTC (rev 27354)
@@ -217,7 +217,7 @@
  * @parem set the set to evaluate the operation with
  */
 void
-_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set);
+_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set);
 
 
 /**
@@ -227,7 +227,7 @@
  * @param set set to add the element to
  */
 void
-_GSS_union_add (struct ElementMessage *m, struct Set *set);
+_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set);
 
 
 /**
@@ -238,7 +238,7 @@
  * @param set set to remove the element from
  */
 void
-_GSS_union_remove (struct ElementMessage *m, struct Set *set);
+_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set);
 
 
 /**
@@ -258,7 +258,7 @@
  * @param incoming information about the requesting remote peer
  */
 void
-_GSS_union_accept (struct AcceptMessage *m, struct Set *set,
+_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
                    struct Incoming *incoming);
 
 

Modified: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c   2013-06-03 10:51:03 UTC (rev 
27353)
+++ gnunet/src/set/gnunet-service-set_union.c   2013-06-03 10:53:49 UTC (rev 
27354)
@@ -245,8 +245,7 @@
 
 
 /**
- * Information about the element used for 
- * a specific union operation.
+ * Entries in the key-to-element map of the union set.
  */
 struct KeyEntry
 {
@@ -401,11 +400,14 @@
 static void
 destroy_union_operation (struct UnionEvaluateOperation *eo)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
+  
   if (NULL != eo->mq)
   {
     GNUNET_MQ_destroy (eo->mq);
     eo->mq = NULL;
   }
+
   if (NULL != eo->socket)
   {
     GNUNET_STREAM_close (eo->socket);
@@ -433,12 +435,16 @@
     eo->key_to_element = NULL;
   }
 
-
   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
                                eo->set->state.u->ops_tail,
                                eo);
   GNUNET_free (eo);
-  /* FIXME: free and destroy everything else */
+
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
+
+
+  /* FIXME: do a garbage collection of the set generations */
 }
 
 
@@ -452,7 +458,7 @@
 fail_union_operation (struct UnionEvaluateOperation *eo)
 {
   struct GNUNET_MQ_Message *mqm;
-  struct ResultMessage *msg;
+  struct GNUNET_SET_ResultMessage *msg;
 
   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
@@ -495,20 +501,25 @@
   struct GNUNET_MQ_Message *mqm;
   struct OperationRequestMessage *msg;
 
-  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
-  if (NULL != eo->context_msg)
-    if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs 
(eo->context_msg->size)))
-    {
-      /* the context message is too large */
-      GNUNET_break (0);
-      GNUNET_SERVER_client_disconnect (eo->set->client);
-      GNUNET_MQ_discard (mqm);
-      return;
-    }
+  mqm = GNUNET_MQ_msg_nested_mh (msg, 
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
+
+  if (NULL == mqm)
+  {
+    /* the context message is too large */
+    GNUNET_break (0);
+    GNUNET_SERVER_client_disconnect (eo->set->client);
+    return;
+  }
   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
   msg->app_id = eo->app_id;
   GNUNET_MQ_send (eo->mq, mqm);
 
+  if (NULL != eo->context_msg)
+  {
+    GNUNET_free (eo->context_msg);
+    eo->context_msg = NULL;
+  }
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
 }
 
@@ -537,7 +548,7 @@
   {
     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
     {
-      new_k->next_colliding = old_k;
+      new_k->next_colliding = old_k->next_colliding;
       old_k->next_colliding = new_k;
       return GNUNET_NO;
     }
@@ -568,12 +579,11 @@
   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
                                                       (uint32_t) 
ibf_key.key_val,
                                                       insert_element_iterator, 
k);
+
   /* was the element inserted into a colliding bucket? */
   if (GNUNET_SYSERR == ret)
-  {
-    GNUNET_assert (NULL != k->next_colliding);
     return;
-  }
+
   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) 
ibf_key.key_val, k,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
 }
@@ -781,8 +791,8 @@
  */
 static int
 send_element_iterator (void *cls,
-                      uint32_t key,
-                      void *value)
+                       uint32_t key,
+                       void *value)
 {
   struct SendElementClosure *sec = cls;
   struct IBF_Key ibf_key = sec->ibf_key;
@@ -795,15 +805,18 @@
   {
     const struct GNUNET_SET_Element *const element = &ke->element->element;
     struct GNUNET_MQ_Message *mqm;
+    struct GNUNET_MessageHeader *mh;
 
     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
-    mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
-    if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
+    mqm = GNUNET_MQ_msg_header_extra (mh, element->size, 
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
+    if (NULL == mqm)
     {
+      /* element too large */
       GNUNET_break (0);
-      GNUNET_MQ_discard (mqm);
       continue;
     }
+    memcpy (&mh[1], element->data, element->size);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
     GNUNET_MQ_send (eo->mq, mqm);
     ke = ke->next_colliding;
   }
@@ -975,34 +988,42 @@
                      struct GNUNET_SET_Element *element)
 {
   struct GNUNET_MQ_Message *mqm;
-  struct ResultMessage *rm;
+  struct GNUNET_SET_ResultMessage *rm;
 
   GNUNET_assert (0 != eo->request_id);
-  mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
-  rm->result_status = htons (GNUNET_SET_STATUS_OK);
-  rm->request_id = htonl (eo->request_id);
-  if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
+  mqm = GNUNET_MQ_msg_extra (rm, element->size, 
GNUNET_MESSAGE_TYPE_SET_RESULT);
+  if (NULL == mqm)
   {
     GNUNET_MQ_discard (mqm);
     GNUNET_break (0);
     return;
   }
-
+  rm->result_status = htons (GNUNET_SET_STATUS_OK);
+  rm->request_id = htonl (eo->request_id);
+  memcpy (&rm[1], element->data, element->size);
   GNUNET_MQ_send (eo->set->client_mq, mqm);
 }
 
 
 /**
- * Callback used for notifications
+ * Completion callback for shutdown
  *
- * @param cls closure
+ * @param cls the closure from GNUNET_STREAM_shutdown call
+ * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
+ *          SHUT_RDWR) 
  */
-static void
-client_done_sent_cb (void *cls)
+/*
+static void 
+stream_shutdown_cb (void *cls,
+                    int operation)
 {
   //struct UnionEvaluateOperation *eo = cls;
-  /* FIXME: destroy eo */
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n");
+
+  // destroy_union_operation (eo);
 }
+*/
 
 
 /**
@@ -1018,16 +1039,15 @@
 send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
 {
   struct GNUNET_MQ_Message *mqm;
-  struct ResultMessage *rm;
+  struct GNUNET_SET_ResultMessage *rm;
 
   GNUNET_assert (0 != eo->request_id);
   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
   rm->request_id = htonl (eo->request_id);
   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
-  GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
   GNUNET_MQ_send (eo->set->client_mq, mqm);
 
-  /* FIXME: destroy the eo */
+  // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo);
 }
 
 
@@ -1199,18 +1219,25 @@
  * @parem set the set to evaluate the operation with
  */
 void
-_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
+_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
 {
   struct UnionEvaluateOperation *eo;
+  struct GNUNET_MessageHeader *context_msg;
 
   eo = GNUNET_new (struct UnionEvaluateOperation);
-  eo->peer = m->peer;
+  eo->peer = m->target_peer;
   eo->set = set;
   eo->request_id = htonl (m->request_id);
   GNUNET_assert (0 != eo->request_id);
   eo->se = strata_estimator_dup (set->state.u->se);
   eo->salt = ntohs (m->salt);
   eo->app_id = m->app_id;
+  
+  context_msg = GNUNET_MQ_extract_nested_mh (m);
+  if (NULL != context_msg)
+  {
+    eo->context_msg = GNUNET_copy_message (context_msg);
+  }
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
              "evaluating union operation, (app %s)\n", 
@@ -1235,7 +1262,7 @@
  * @param incoming information about the requesting remote peer
  */
 void
-_GSS_union_accept (struct AcceptMessage *m, struct Set *set,
+_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
                    struct Incoming *incoming)
 {
   struct UnionEvaluateOperation *eo;
@@ -1250,7 +1277,6 @@
   GNUNET_assert (0 != ntohl (m->request_id));
   eo->request_id = ntohl (m->request_id);
   eo->se = strata_estimator_dup (set->state.u->se);
-  eo->set = set; // FIXME: redundant!?
   eo->mq = incoming->mq;
   /* transfer ownership of mq and socket from incoming to eo */
   incoming->mq = NULL;
@@ -1299,7 +1325,7 @@
  * @param set set to add the element to
  */
 void
-_GSS_union_add (struct ElementMessage *m, struct Set *set)
+_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
 {
   struct ElementEntry *ee;
   struct ElementEntry *ee_dup;
@@ -1357,7 +1383,9 @@
   destroy_elements (set->state.u);
 
   while (NULL != set->state.u->ops_head)
+  {
     destroy_union_operation (set->state.u->ops_head);
+  }
 }
 
 /**
@@ -1368,7 +1396,7 @@
  * @param set set to remove the element from
  */
 void
-_GSS_union_remove (struct ElementMessage *m, struct Set *set)
+_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
 {
   struct GNUNET_HashCode hash;
   struct ElementEntry *ee;

Modified: gnunet/src/set/gnunet-set.c
===================================================================
--- gnunet/src/set/gnunet-set.c 2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/set/gnunet-set.c 2013-06-03 10:53:49 UTC (rev 27354)
@@ -91,11 +91,12 @@
            const struct GNUNET_MessageHeader *context_msg,
            struct GNUNET_SET_Request *request)
 {
+  struct GNUNET_SET_OperationHandle *oh;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
   GNUNET_SET_listen_cancel (listen_handle);
 
-  GNUNET_SET_accept (request, set2, 
-                     GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
+  oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, 
NULL);
+  GNUNET_SET_conclude (oh, set2);
 }
 
 
@@ -107,11 +108,14 @@
 static void
 start (void *cls)
 {
+  struct GNUNET_SET_OperationHandle *oh;
+  
   listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
                                      &app_id, listen_cb, NULL);
-  GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
-                       GNUNET_SET_RESULT_ADDED,
-                       result_cb_set1, NULL);
+  oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
+                            GNUNET_SET_RESULT_ADDED,
+                            result_cb_set1, NULL);
+  GNUNET_SET_conclude (oh, set1);
 }
 
 

Modified: gnunet/src/set/set.h
===================================================================
--- gnunet/src/set/set.h        2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/set/set.h        2013-06-03 10:53:49 UTC (rev 27354)
@@ -29,17 +29,12 @@
 #include "platform.h"
 #include "gnunet_common.h"
 
+#define GNUNET_SET_ACK_WINDOW 10
 
-/**
- * The service sends up to GNUNET_SET_ACK_WINDOW messages per client handle,
- * the client should send an ack every GNUNET_SET_ACK_WINDOW/2 messages.
- */
-#define GNUNET_SET_ACK_WINDOW 8
 
-
 GNUNET_NETWORK_STRUCT_BEGIN
 
-struct SetCreateMessage
+struct GNUNET_SET_CreateMessage
 {
   /**
    * Type: GNUNET_MESSAGE_TYPE_SET_CREATE
@@ -54,7 +49,7 @@
 };
 
 
-struct ListenMessage
+struct GNUNET_SET_ListenMessage
 {
   /**
    * Type: GNUNET_MESSAGE_TYPE_SET_LISTEN
@@ -74,32 +69,31 @@
 };
 
 
-struct AcceptMessage
+struct GNUNET_SET_AcceptRejectMessage
 {
   /**
-   * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT
+   * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT or
+   *       GNUNET_MESSAGE_TYPE_SET_REJECT
    */
   struct GNUNET_MessageHeader header;
 
   /**
-   * Request id that will be sent along with
-   * results for the accepted operation.
-   * Chosen by the client.
-   * Must be 0 if the request has been rejected.
+   * ID of the incoming request we want to accept / reject.
    */
-  uint32_t request_id GNUNET_PACKED;
+  uint32_t accept_reject_id GNUNET_PACKED;
 
   /**
-   * ID of the incoming request we want to accept / reject.
+   * Request ID to identify responses,
+   * must be 0 if we don't accept the request.
    */
-  uint32_t accept_id GNUNET_PACKED;
+  uint32_t request_id GNUNET_PACKED;
 };
 
 
 /**
  * A request for an operation with another client.
  */
-struct RequestMessage
+struct GNUNET_SET_RequestMessage
 {
   /**
    * Type: GNUNET_MESSAGE_TYPE_SET_Request.
@@ -107,21 +101,21 @@
   struct GNUNET_MessageHeader header;
 
   /**
-   * ID of the request we want to accept,
-   * chosen by the service.
+   * Identity of the requesting peer.
    */
-  uint32_t accept_id GNUNET_PACKED;
+  struct GNUNET_PeerIdentity peer_id;
 
   /**
-   * Identity of the requesting peer.
+   * ID of the to identify the request when accepting or
+   * rejecting it.
    */
-  struct GNUNET_PeerIdentity peer_id;
+  uint32_t accept_id GNUNET_PACKED;
 
   /* rest: nested context message */
 };
 
 
-struct EvaluateMessage
+struct GNUNET_SET_EvaluateMessage
 {
   /**
    * Type: GNUNET_MESSAGE_TYPE_SET_EVALUATE
@@ -136,7 +130,7 @@
   /**
    * Peer to evaluate the operation with
    */
-  struct GNUNET_PeerIdentity peer;
+  struct GNUNET_PeerIdentity target_peer;
 
   /**
    * Application id
@@ -157,7 +151,7 @@
 };
 
 
-struct ResultMessage
+struct GNUNET_SET_ResultMessage
 {
   /**
    * Type: GNUNET_MESSAGE_TYPE_SET_RESULT
@@ -184,7 +178,7 @@
 };
 
 
-struct ElementMessage
+struct GNUNET_SET_ElementMessage
 {
   /**
    * Type: GNUNET_MESSAGE_TYPE_SET_ADD or
@@ -200,20 +194,6 @@
 };
 
 
-struct CancelMessage
-{
-  /**
-   * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * id we want to cancel result belongs to
-   */
-  uint32_t request_id GNUNET_PACKED;
-};
-
-
 GNUNET_NETWORK_STRUCT_END
 
 #endif

Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c    2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/set/set_api.c    2013-06-03 10:53:49 UTC (rev 27354)
@@ -33,6 +33,7 @@
 
 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
 
+
 /**
  * Opaque handle to a set.
  */
@@ -52,13 +53,33 @@
   int accepted;
 };
 
-
 struct GNUNET_SET_OperationHandle
 {
   GNUNET_SET_ResultIterator result_cb;
   void *result_cls;
+
+  /**
+   * Local set used for the operation,
+   * NULL if no set has been provided by conclude yet.
+   */
   struct GNUNET_SET_Handle *set;
+
+  /**
+   * Request ID to identify the operation within the set.
+   */
   uint32_t request_id;
+
+  /**
+   * Message sent to the server on calling conclude,
+   * NULL if conclude has been called.
+   */
+  struct GNUNET_MQ_Message *conclude_mqm;
+
+  /**
+   * Address of the request if in the conclude message,
+   * used to patch the request id into the message when the set is known.
+   */
+  uint32_t *request_id_addr;
 };
 
 
@@ -83,18 +104,21 @@
 static void
 handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct ResultMessage *msg = (struct ResultMessage *) mh;
+  struct GNUNET_SET_ResultMessage *msg = (struct GNUNET_SET_ResultMessage *) 
mh;
   struct GNUNET_SET_Handle *set = cls;
   struct GNUNET_SET_OperationHandle *oh;
   struct GNUNET_SET_Element e;
 
+
+  GNUNET_assert (NULL != set);
+  GNUNET_assert (NULL != set->mq);
+
   if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
   {
     struct GNUNET_MQ_Message *mqm;
     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
     GNUNET_MQ_send (set->mq, mqm);
   }
-
   oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id));
   GNUNET_assert (NULL != oh);
   /* status is not STATUS_OK => there's no attached element,
@@ -109,7 +133,7 @@
   }
 
   e.data = &msg[1];
-  e.size = ntohs (mh->size) - sizeof (struct ResultMessage);
+  e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
   e.type = msg->element_type;
   if (NULL != oh->result_cb)
     oh->result_cb (oh->result_cls, &e, htons (msg->result_status));
@@ -124,28 +148,34 @@
 static void
 handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct RequestMessage *msg = (struct RequestMessage *) mh;
+  struct GNUNET_SET_RequestMessage *msg = (struct GNUNET_SET_RequestMessage *) 
mh;
   struct GNUNET_SET_ListenHandle *lh = cls;
   struct GNUNET_SET_Request *req;
+  struct GNUNET_MessageHeader *context_msg;
 
+  LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n");
   req = GNUNET_new (struct GNUNET_SET_Request);
   req->accept_id = ntohl (msg->accept_id);
+  context_msg = GNUNET_MQ_extract_nested_mh (msg);
   /* calling GNUNET_SET_accept in the listen cb will set req->accepted */
-  lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req);
+  lh->listen_cb (lh->listen_cls, &msg->peer_id, context_msg, req);
 
   if (GNUNET_NO == req->accepted)
   {
     struct GNUNET_MQ_Message *mqm;
-    struct AcceptMessage *amsg;
-    
-    mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
+    struct GNUNET_SET_AcceptRejectMessage *amsg;
+
+    mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT);
     /* no request id, as we refused */
     amsg->request_id = htonl (0);
-    amsg->accept_id = msg->accept_id;
+    amsg->accept_reject_id = msg->accept_id;
     GNUNET_MQ_send (lh->mq, mqm);
     GNUNET_free (req);
+    LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n");
   }
 
+  LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n");
+
   /* the accept-case is handled in GNUNET_SET_accept,
    * as we have the accept message available there */
 }
@@ -168,7 +198,7 @@
 {
   struct GNUNET_SET_Handle *set;
   struct GNUNET_MQ_Message *mqm;
-  struct SetCreateMessage *msg;
+  struct GNUNET_SET_CreateMessage *msg;
   static const struct GNUNET_MQ_Handler mq_handlers[] = {
     {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT},
     GNUNET_MQ_HANDLERS_END
@@ -179,6 +209,7 @@
   LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n");
   GNUNET_assert (NULL != set->client);
   set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, 
set);
+  GNUNET_assert (NULL != set->mq);
   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
   msg->operation = htons (op);
   GNUNET_MQ_send (set->mq, mqm);
@@ -204,7 +235,7 @@
                         void *cont_cls)
 {
   struct GNUNET_MQ_Message *mqm;
-  struct ElementMessage *msg;
+  struct GNUNET_SET_ElementMessage *msg;
 
   mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD);
   msg->element_type = element->type;
@@ -232,7 +263,7 @@
                            void *cont_cls)
 {
   struct GNUNET_MQ_Message *mqm;
-  struct ElementMessage *msg;
+  struct GNUNET_SET_ElementMessage *msg;
 
   mqm = GNUNET_MQ_msg_extra (msg, element->size, 
GNUNET_MESSAGE_TYPE_SET_REMOVE);
   msg->element_type = element->type;
@@ -256,10 +287,10 @@
 
 
 /**
- * Evaluate a set operation with our set and the set of another peer.
+ * Create a set operation for evaluation with another peer.
+ * The evaluation will not start until the client provides
+ * a local set with GNUNET_SET_conclude.
  *
- * @param set set to use
- * @param salt salt for HKDF (explain more here)
  * @param other_peer peer with the other set
  * @param app_id hash for the application using the set
  * @param context_msg additional information for the request
@@ -273,8 +304,7 @@
  * @return a handle to cancel the operation
  */
 struct GNUNET_SET_OperationHandle *
-GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
-                     const struct GNUNET_PeerIdentity *other_peer,
+GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
                      const struct GNUNET_HashCode *app_id,
                      const struct GNUNET_MessageHeader *context_msg,
                      uint16_t salt,
@@ -283,25 +313,25 @@
                      void *result_cls)
 {
   struct GNUNET_MQ_Message *mqm;
-  struct EvaluateMessage *msg;
   struct GNUNET_SET_OperationHandle *oh;
+  struct GNUNET_SET_EvaluateMessage *msg;
 
   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
   oh->result_cb = result_cb;
   oh->result_cls = result_cls;
-  oh->set = set;
 
-  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE);
-  msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh));
-  msg->peer = *other_peer;
-  msg->app_id = *app_id;
-  
+  mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 
context_msg);
+
   if (NULL != context_msg)
-    if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs 
(context_msg->size)))
-      GNUNET_assert (0);
-  
-  GNUNET_MQ_send (set->mq, mqm);
+    LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n");
 
+  msg->app_id = *app_id;
+  msg->target_peer = *other_peer;
+  msg->salt = salt;
+  msg->reserved = 0;
+  oh->conclude_mqm = mqm;
+  oh->request_id_addr = &msg->request_id;
+
   return oh;
 }
 
@@ -327,7 +357,7 @@
 {
   struct GNUNET_SET_ListenHandle *lh;
   struct GNUNET_MQ_Message *mqm;
-  struct ListenMessage *msg;
+  struct GNUNET_SET_ListenMessage *msg;
   static const struct GNUNET_MQ_Handler mq_handlers[] = {
     {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
     GNUNET_MQ_HANDLERS_END
@@ -363,10 +393,13 @@
 
 
 /**
- * Accept a request we got via GNUNET_SET_listen.
+ * Accept a request we got via GNUNET_SET_listen.  Must be called during
+ * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
+ * afterwards.
+ * Call GNUNET_SET_conclude to provide the local set to use for the operation,
+ * and to begin the exchange with the remote peer. 
  *
  * @param request request to accept
- * @param set set used for the requested operation 
  * @param result_mode specified how results will be returned,
  *        see 'GNUNET_SET_ResultMode'.
  * @param result_cb callback for the results
@@ -375,29 +408,27 @@
  */
 struct GNUNET_SET_OperationHandle *
 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
-                   struct GNUNET_SET_Handle *set,
                    enum GNUNET_SET_ResultMode result_mode,
                    GNUNET_SET_ResultIterator result_cb,
-                   void *result_cls)
+                   void *cls)
 {
   struct GNUNET_MQ_Message *mqm;
-  struct AcceptMessage *msg;
   struct GNUNET_SET_OperationHandle *oh;
+  struct GNUNET_SET_AcceptRejectMessage *msg;
 
-  /* don't accept a request twice! */
   GNUNET_assert (GNUNET_NO == request->accepted);
   request->accepted = GNUNET_YES;
 
   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
   oh->result_cb = result_cb;
-  oh->result_cls = result_cls;
-  oh->set = set;
+  oh->result_cls = cls;
 
-  mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT);
-  msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, NULL, oh));
-  msg->accept_id = htonl (request->accept_id);
-  GNUNET_MQ_send (set->mq, mqm);
+  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
+  msg->accept_reject_id = htonl (request->accept_id);
 
+  oh->conclude_mqm = mqm;
+  oh->request_id_addr = &msg->request_id;
+
   return oh;
 }
 
@@ -413,10 +444,43 @@
   struct GNUNET_MQ_Message *mqm;
   struct GNUNET_SET_OperationHandle *h_assoc;
 
-  h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
-  GNUNET_assert (h_assoc == oh);
-  mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
-  GNUNET_MQ_send (oh->set->mq, mqm);
+  if (NULL != oh->set)
+  {
+    h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
+    GNUNET_assert (h_assoc == oh);
+    mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
+    GNUNET_MQ_send (oh->set->mq, mqm);
+  }
+
+  if (NULL != oh->conclude_mqm)
+    GNUNET_MQ_discard (oh->conclude_mqm);
+
   GNUNET_free (oh);
 }
 
+
+/**
+ * Conclude the given set operation using the given set. 
+ * This function is called once we have fully constructed
+ * the set that we want to use for the operation.  At this
+ * time, the P2P protocol can then begin to exchange the
+ * set information and call the result callback with the
+ * result information.
+ *
+ * @param oh handle to the set operation 
+ * @param set the set to use for the operation
+ */
+void
+GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
+                    struct GNUNET_SET_Handle *set)
+{
+  GNUNET_assert (NULL == oh->set);
+  GNUNET_assert (NULL != oh->conclude_mqm);
+  oh->set = set;
+  oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh);
+  *oh->request_id_addr = htonl (oh->request_id);
+  GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm);
+  oh->conclude_mqm = NULL;
+  oh->request_id_addr = NULL;
+}
+

Modified: gnunet/src/set/test_set.conf
===================================================================
--- gnunet/src/set/test_set.conf        2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/set/test_set.conf        2013-06-03 10:53:49 UTC (rev 27354)
@@ -8,8 +8,8 @@
 HOSTNAME = localhost
 HOME = $SERVICEHOME
 BINARY = gnunet-service-set
-#PREFIX = gdbserver :12345
 #PREFIX = valgrind --leak-check=full
+#PREFIX = gdbserver :1234
 ACCEPT_FROM = 127.0.0.1;
 ACCEPT_FROM6 = ::1;
 UNIXPATH = /tmp/gnunet-service-set.sock

Modified: gnunet/src/set/test_set_api.c
===================================================================
--- gnunet/src/set/test_set_api.c       2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/set/test_set_api.c       2013-06-03 10:53:49 UTC (rev 27354)
@@ -20,7 +20,7 @@
 
 /**
  * @file set/test_set_api.c
- * @brief testcase for consensus_api.c
+ * @brief testcase for set_api.c
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
@@ -89,11 +89,13 @@
            const struct GNUNET_MessageHeader *context_msg,
            struct GNUNET_SET_Request *request)
 {
+  struct GNUNET_SET_OperationHandle *oh;
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
   GNUNET_SET_listen_cancel (listen_handle);
 
-  GNUNET_SET_accept (request, set2, 
-                     GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
+  oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, 
NULL);
+  GNUNET_SET_conclude (oh, set2);
 }
 
 
@@ -105,11 +107,14 @@
 static void
 start (void *cls)
 {
+  struct GNUNET_SET_OperationHandle *oh;
+
   listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
                                      &app_id, listen_cb, NULL);
-  GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
-                       GNUNET_SET_RESULT_ADDED,
-                       result_cb_set1, NULL);
+  oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
+                            GNUNET_SET_RESULT_ADDED,
+                            result_cb_set1, NULL);
+  GNUNET_SET_conclude (oh, set1);
 }
 
 
@@ -168,12 +173,14 @@
      struct GNUNET_TESTING_Peer *peer)
 {
 
-  static const char* app_str = "gnunet-set";
-
   config = cfg;
+  GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
+  printf ("my id (from CRYPTO): %s\n", GNUNET_h2s (&local_id.hashPubKey));
   GNUNET_TESTING_peer_get_identity (peer, &local_id);
+  printf ("my id (from TESTING): %s\n", GNUNET_h2s (&local_id.hashPubKey));
   set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
   set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
   init_set1 ();
 }
 

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/stream/stream_api.c      2013-06-03 10:53:49 UTC (rev 27354)
@@ -3785,7 +3785,29 @@
   struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
   struct GNUNET_MQ_Message *mqm;
 
-  GNUNET_assert (GNUNET_STREAM_OK == status);
+  switch (status)
+  {
+    case GNUNET_STREAM_OK:
+      break;
+    case GNUNET_STREAM_SHUTDOWN:
+      /* FIXME: call shutdown handler */
+      return;
+    case GNUNET_STREAM_TIMEOUT:
+      if (NULL == mq->error_handler)
+        LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler 
installed for message queue\n");
+      else
+        mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
+      return;
+    case GNUNET_STREAM_SYSERR:
+      if (NULL == mq->error_handler)
+        LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler 
installed for message queue\n");
+      else
+        mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE);
+      return;
+    default:
+      GNUNET_assert (0);
+      return;
+  }
   
   /* call cb for message we finished sending */
   mqm = mq->current_msg;
@@ -3863,21 +3885,53 @@
  */
 static size_t
 mq_stream_data_processor (void *cls,
-                       enum GNUNET_STREAM_Status status,
-                       const void *data,
-                       size_t size)
+                          enum GNUNET_STREAM_Status status,
+                          const void *data,
+                          size_t size)
 {
   struct GNUNET_MQ_MessageQueue *mq = cls;
   struct MQStreamState *mss;
   int ret;
+
+  switch (status)
+  {
+    case GNUNET_STREAM_OK:
+      break;
+    case GNUNET_STREAM_SHUTDOWN:
+      /* FIXME: call shutdown handler */
+      return 0;
+    case GNUNET_STREAM_TIMEOUT:
+      if (NULL == mq->error_handler)
+        LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler 
installed for message queue\n");
+      else
+        mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
+      return 0;
+    case GNUNET_STREAM_SYSERR:
+      if (NULL == mq->error_handler)
+        LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler 
installed for message queue\n");
+      else
+        mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+      return 0;
+    default:
+      GNUNET_assert (0);
+      return 0;
+  }
   
   mss = (struct MQStreamState *) mq->impl_state;
   GNUNET_assert (GNUNET_STREAM_OK == status);
   ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, 
GNUNET_NO);
-  GNUNET_assert (GNUNET_OK == ret);
+  if (GNUNET_OK != ret)
+  {
+    if (NULL == mq->error_handler)
+      LOG (GNUNET_ERROR_TYPE_WARNING,
+           "read error (message stream malformed), but no error handler 
installed for message queue\n");
+    else
+      mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+    return 0;
+  }
   /* we always read all data */
-    mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 
-                                  mq_stream_data_processor, mq);
+  mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 
+                                mq_stream_data_processor, mq);
   return size;
 }
 
@@ -3935,6 +3989,7 @@
   mq->destroy_impl = mq_stream_destroy_impl;
   mq->handlers = msg_handlers;
   mq->handlers_cls = cls;
+  mq->error_handler = error_handler;
   if (NULL != msg_handlers)
   {
     mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);

Modified: gnunet/src/util/crypto_hash.c
===================================================================
--- gnunet/src/util/crypto_hash.c       2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/util/crypto_hash.c       2013-06-03 10:53:49 UTC (rev 27354)
@@ -339,7 +339,7 @@
  */
 void
 GNUNET_CRYPTO_hash_create_random (enum GNUNET_CRYPTO_Quality mode,
-                                  struct GNUNET_HashCode * result)
+                                  struct GNUNET_HashCode *result)
 {
   int i;
 

Modified: gnunet/src/util/mq.c
===================================================================
--- gnunet/src/util/mq.c        2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/util/mq.c        2013-06-03 10:53:49 UTC (rev 27354)
@@ -119,33 +119,31 @@
 }
 
 
-int
-GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
-                 const void *data, uint16_t len)
+struct GNUNET_MQ_Message *
+GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t 
base_size, uint16_t type,
+                          const struct GNUNET_MessageHeader *nested_mh)
 {
-  size_t new_size;
-  size_t old_size;
+  struct GNUNET_MQ_Message *mqm;
+  uint16_t size;
 
-  GNUNET_assert (NULL != mqmp);
-  /* there's no data to append => do nothing */
-  if (NULL == data)
-    return GNUNET_OK;
-  old_size = ntohs ((*mqmp)->mh->size);
-  /* message too large to concatenate? */
-  if (((uint16_t) (old_size + len)) < len)
-    return GNUNET_SYSERR;
-  new_size = old_size + len;
-  *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
-  (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
-  memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
-  (*mqmp)->mh->size = htons (new_size);
-  return GNUNET_OK;
-}
+  if (NULL == nested_mh)
+    return GNUNET_MQ_msg_ (mhp, base_size, type);
 
+  size = base_size + ntohs (nested_mh->size);
 
+  /* check for uint16_t overflow */
+  if (size < base_size)
+    return NULL;
 
+  mqm = GNUNET_MQ_msg_ (mhp, size, type);
+  memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size));
 
-/*** Transmit a queued message to the session's client.
+  return mqm;
+}
+
+
+/**
+ * Transmit a queued message to the session's client.
  *
  * @param cls consensus session
  * @param size number of bytes available in buf
@@ -265,7 +263,8 @@
   {
     if (NULL == mq->error_handler)
       LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler 
installed)\n");
-    mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+    else
+      mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
     return;
   }
 
@@ -479,3 +478,39 @@
   GNUNET_free (mq);
 }
 
+
+
+
+struct GNUNET_MessageHeader *
+GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t 
base_size)
+{
+  uint16_t whole_size;
+  uint16_t nested_size;
+  struct GNUNET_MessageHeader *nested_msg;
+
+  whole_size = ntohs (mh->size);
+  GNUNET_assert (whole_size >= base_size);
+
+  nested_size = whole_size - base_size;
+
+  if (0 == nested_size)
+    return NULL;
+
+  if (nested_size < sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return NULL;
+  }
+
+  nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
+
+  if (ntohs (nested_msg->size) != nested_size)
+  {
+    GNUNET_break_op (0);
+    nested_msg->size = htons (nested_size);
+  }
+
+  return nested_msg;
+}
+
+

Modified: gnunet/src/util/test_mq.c
===================================================================
--- gnunet/src/util/test_mq.c   2013-06-03 10:51:03 UTC (rev 27353)
+++ gnunet/src/util/test_mq.c   2013-06-03 10:53:49 UTC (rev 27354)
@@ -58,35 +58,6 @@
 test2 (void)
 {
   struct GNUNET_MQ_Message *mqm;
-  struct MyMessage *mm;
-  int res;
-  char *s = "foo";
-
-  mqm = GNUNET_MQ_msg (mm, 42);
-  res = GNUNET_MQ_nest (mqm, s, strlen(s));
-  GNUNET_assert (GNUNET_OK == res);
-  res = GNUNET_MQ_nest (mqm, s, strlen(s));
-  GNUNET_assert (GNUNET_OK == res);
-  res = GNUNET_MQ_nest (mqm, NULL, 0);
-  GNUNET_assert (GNUNET_OK == res);
-
-  GNUNET_assert (strlen (s) * 2 + sizeof (struct MyMessage) == ntohs 
(mm->header.size));
-
-  res = GNUNET_MQ_nest_mh (mqm, &mm->header);
-  GNUNET_assert (GNUNET_OK == res);
-  GNUNET_assert (2 * (strlen (s) * 2 + sizeof (struct MyMessage)) == ntohs 
(mm->header.size));
-
-  res = GNUNET_MQ_nest (mqm, (void *) 0xF00BA, 0xFFF0);
-  GNUNET_assert (GNUNET_OK != res);
-
-  GNUNET_MQ_discard (mqm);
-}
-
-
-void
-test3 (void)
-{
-  struct GNUNET_MQ_Message *mqm;
   struct GNUNET_MessageHeader *mh;
 
   mqm = GNUNET_MQ_msg_header (42);
@@ -107,7 +78,6 @@
   GNUNET_log_setup ("test-mq", "INFO", NULL);
   test1 ();
   test2 ();
-  test3 ();
 
   return 0;
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]