gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r25806 - gnunet/src/consensus


From: gnunet
Subject: [GNUnet-SVN] r25806 - gnunet/src/consensus
Date: Thu, 17 Jan 2013 01:53:11 +0100

Author: dold
Date: 2013-01-17 01:53:11 +0100 (Thu, 17 Jan 2013)
New Revision: 25806

Added:
   gnunet/src/consensus/consensus_protocol.h
Modified:
   gnunet/src/consensus/Makefile.am
   gnunet/src/consensus/consensus.h
   gnunet/src/consensus/consensus_api.c
   gnunet/src/consensus/gnunet-consensus-start-peers.c
   gnunet/src/consensus/gnunet-consensus.c
   gnunet/src/consensus/gnunet-service-consensus.c
   gnunet/src/consensus/ibf.c
   gnunet/src/consensus/ibf.h
   gnunet/src/consensus/test_consensus.conf
   gnunet/src/consensus/test_consensus_api.c
Log:
- gnunet-consensus now profiling tool
- work on service implementation, not working yet


Modified: gnunet/src/consensus/Makefile.am
===================================================================
--- gnunet/src/consensus/Makefile.am    2013-01-16 17:21:37 UTC (rev 25805)
+++ gnunet/src/consensus/Makefile.am    2013-01-17 00:53:11 UTC (rev 25806)
@@ -31,6 +31,7 @@
 gnunet_consensus_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/consensus/libgnunetconsensus.la \
+  $(top_builddir)/src/testbed/libgnunettestbed.la \
   $(GN_LIBINTL)
 gnunet_consensus_DEPENDENCIES = \
   libgnunetconsensus.la
@@ -53,10 +54,12 @@
   $(GN_LIBINTL)
 
 gnunet_service_consensus_SOURCES = \
- gnunet-service-consensus.c
+ gnunet-service-consensus.c \
+ ibf.c
 gnunet_service_consensus_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/core/libgnunetcore.la \
+  $(top_builddir)/src/stream/libgnunetstream.la \
   $(top_builddir)/src/mesh/libgnunetmesh.la \
   $(GN_LIBINTL)
 

Modified: gnunet/src/consensus/consensus.h
===================================================================
--- gnunet/src/consensus/consensus.h    2013-01-16 17:21:37 UTC (rev 25805)
+++ gnunet/src/consensus/consensus.h    2013-01-17 00:53:11 UTC (rev 25806)
@@ -71,6 +71,10 @@
    */
   struct GNUNET_MessageHeader header;
 
+  uint32_t group_id;
+
+  uint32_t num_elements;
+
   uint16_t num_peers;
 
   /** PeerIdentity[num_peers] */

Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c        2013-01-16 17:21:37 UTC (rev 
25805)
+++ gnunet/src/consensus/consensus_api.c        2013-01-17 00:53:11 UTC (rev 
25806)
@@ -54,11 +54,6 @@
   struct GNUNET_MessageHeader *msg;
 
   /**
-   * Size of the message in msg.
-   */
-  size_t size;
-
-  /**
    * Will be called after transmit, if not NULL
    */
   GNUNET_CONSENSUS_InsertDoneCallback idc;
@@ -154,7 +149,7 @@
  * @param consensus consensus handle
  */
 static void
-schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus);
+send_next (struct GNUNET_CONSENSUS_Handle *consensus);
 
 
 /**
@@ -168,16 +163,17 @@
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
-static size_t transmit_queued (void *cls, size_t size,
-                               void *buf)
+static size_t
+transmit_queued (void *cls, size_t size,
+                 void *buf)
 {
   struct GNUNET_CONSENSUS_Handle *consensus;
   struct QueuedMessage *qmsg;
-  size_t ret_size;
+  size_t msg_size;
 
-  printf("transmitting queued\n");
+  consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
+  consensus->th = NULL;
 
-  consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
   qmsg = consensus->messages_head;
   GNUNET_CONTAINER_DLL_remove (consensus->messages_head, 
consensus->messages_tail, qmsg);
   GNUNET_assert (qmsg);
@@ -188,10 +184,14 @@
     {
       qmsg->idc (qmsg->idc_cls, GNUNET_YES);
     }
+    return 0;
   }
 
-  memcpy (buf, qmsg->msg, qmsg->size);
-  ret_size = qmsg->size;
+  msg_size = ntohs (qmsg->msg->size);
+
+  GNUNET_assert (size >= msg_size);
+
+  memcpy (buf, qmsg->msg, msg_size);
   if (NULL != qmsg->idc)
   {
     qmsg->idc (qmsg->idc_cls, GNUNET_YES);
@@ -199,9 +199,9 @@
   GNUNET_free (qmsg->msg);
   GNUNET_free (qmsg);
 
-  schedule_transmit (consensus);
+  send_next (consensus);
 
-  return ret_size;
+  return msg_size;
 }
 
 
@@ -211,7 +211,7 @@
  * @param consensus consensus handle
  */
 static void
-schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus)
+send_next (struct GNUNET_CONSENSUS_Handle *consensus)
 {
   if (NULL != consensus->th)
     return;
@@ -219,9 +219,10 @@
   if (NULL != consensus->messages_head)
   {
     LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
-    GNUNET_CLIENT_notify_transmit_ready (consensus->client, 
consensus->messages_head->size, 
-                                         GNUNET_TIME_UNIT_FOREVER_REL,
-                                         GNUNET_NO, &transmit_queued, 
consensus);
+    consensus->th = 
+        GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs 
(consensus->messages_head->msg->size),
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             GNUNET_NO, &transmit_queued, 
consensus);
   }
 }
 
@@ -270,8 +271,7 @@
                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
 {
   GNUNET_assert (NULL != consensus->conclude_cb);
-  consensus->conclude_cb (consensus->conclude_cls,
-                         0, NULL);
+  consensus->conclude_cb (consensus->conclude_cls, NULL);
   consensus->conclude_cb = NULL;
 }
 
@@ -356,7 +356,7 @@
            consensus->peers,
            consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
 
-  schedule_transmit (consensus);
+  send_next (consensus);
 
   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
                          GNUNET_TIME_UNIT_FOREVER_REL);
@@ -454,13 +454,12 @@
 
   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
   qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
-  qmsg->size = element_msg_size;
   qmsg->idc = idc;
   qmsg->idc_cls = idc_cls;
 
   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, 
consensus->messages_tail, qmsg);
 
-  schedule_transmit (consensus);
+  send_next (consensus);
 }
 
 
@@ -500,11 +499,10 @@
 
   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
   qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
-  qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
 
   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, 
consensus->messages_tail, qmsg);
 
-  schedule_transmit (consensus);
+  send_next (consensus);
 }
 
 

Added: gnunet/src/consensus/consensus_protocol.h
===================================================================
--- gnunet/src/consensus/consensus_protocol.h                           (rev 0)
+++ gnunet/src/consensus/consensus_protocol.h   2013-01-17 00:53:11 UTC (rev 
25806)
@@ -0,0 +1,71 @@
+/*
+      This file is part of GNUnet
+      (C) 2012 Christian Grothoff (and other contributing authors)
+
+      GNUnet is free software; you can redistribute it and/or modify
+      it under the terms of the GNU General Public License as published
+      by the Free Software Foundation; either version 2, or (at your
+      option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      General Public License for more details.
+
+      You should have received a copy of the GNU General Public License
+      along with GNUnet; see the file COPYING.  If not, write to the
+      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+      Boston, MA 02111-1307, USA.
+*/
+
+
+/**
+ * @file consensus/consensus_protocol.h
+ * @brief p2p message definitions for consensus
+ * @author Florian Dold
+ */
+
+#ifndef GNUNET_CONSENSUS_PROTOCOL_H
+#define GNUNET_CONSENSUS_PROTOCOL_H
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_protocols.h"
+
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+struct StrataMessage
+{
+  struct GNUNET_MessageHeader header;
+  /**
+   * Number of strata in this estimator.
+   */
+  uint16_t num_strata;
+  /* struct GNUNET_HashCode hash_buckets[ibf_size*num_strata] */
+  /* struct GNUNET_HashCode id_buckets[ibf_size*num_strata] */
+  /* uint8_t count_buckets[ibf_size*num_strata] */
+};
+
+struct DifferenceDigest
+{
+
+  struct GNUNET_MessageHeader header;
+};
+
+struct Element
+{
+  struct GNUNET_MessageHeader header;
+};
+
+struct ConsensusHello
+{
+  struct GNUNET_MessageHeader header;
+  struct GNUNET_HashCode global_id;
+  uint8_t round;
+};
+
+
+GNUNET_NETWORK_STRUCT_END
+
+#endif

Modified: gnunet/src/consensus/gnunet-consensus-start-peers.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-01-16 17:21:37 UTC 
(rev 25805)
+++ gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-01-17 00:53:11 UTC 
(rev 25806)
@@ -147,9 +147,6 @@
                                   NULL,
                                   test_master,
                                   NULL);
-
-
-  printf("hello there!\n");
 }
 
 

Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c     2013-01-16 17:21:37 UTC (rev 
25805)
+++ gnunet/src/consensus/gnunet-consensus.c     2013-01-17 00:53:11 UTC (rev 
25806)
@@ -20,224 +20,308 @@
 
 /**
  * @file consensus/gnunet-consensus.c
- * @brief 
+ * @brief profiling tool for gnunet-consensus
  * @author Florian Dold
  */
 #include "platform.h"
+#include "gnunet_common.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_consensus_service.h"
+#include "gnunet_testbed_service.h"
 
+static unsigned int num_peers = 2;
 
+static unsigned int replication = 1;
 
-/**
- * Handle to the consensus service
- */
-static struct GNUNET_CONSENSUS_Handle *consensus;
-/**
- * Session id
- */
-static char *session_id_str;
+static unsigned int num_values = 5;
 
-/**
- * File handle to STDIN
- */
-static struct GNUNET_DISK_FileHandle *stdin_fh;
+static struct GNUNET_TIME_Relative conclude_timeout;
 
+static struct GNUNET_CONSENSUS_Handle **consensus_handles;
+
+static unsigned int num_connected_handles;
+
+static struct GNUNET_TESTBED_Peer **peers;
+
+static struct GNUNET_PeerIdentity *peer_ids;
+
+static unsigned int num_retrieved_peer_ids;
+
+static struct GNUNET_HashCode session_id;
+
+
 /**
- * Task for reading from stdin
+ * Signature of the event handler function called by the
+ * respective event controller.
+ *
+ * @param cls closure
+ * @param event information about the event
  */
-static GNUNET_SCHEDULER_TaskIdentifier stdin_tid = GNUNET_SCHEDULER_NO_TASK;
-
-
 static void
-stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+controller_cb(void *cls,
+              const struct GNUNET_TESTBED_EventInformation *event)
+{
+  GNUNET_assert (0);
+}
 
 
 /**
  * Called when a conclusion was successful.
  *
  * @param cls
- * @param num_peers_in_consensus
- * @param peers_in_consensus
+ * @param group
+ * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if 
not
  */
+static int
+conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group)
+{
+  return GNUNET_NO;
+}
+
+
+
 static void
-conclude_cb (void *cls, 
-             unsigned int consensus_group_count,
-             const struct GNUNET_CONSENSUS_Group *groups)
+generate_indices (int *indices)
 {
-  printf("reached conclusion\n");
-  GNUNET_SCHEDULER_shutdown ();
+  int j;
+  j = 0;
+  while (j < replication)
+  {
+    int n;
+    int k;
+    int repeat;
+    n = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
+    repeat = GNUNET_NO;
+    for (k = 0; k < j; k++)
+      if (indices[k] == n)
+      {
+        repeat = GNUNET_YES;
+        break;
+      }
+    if (GNUNET_NO == repeat)
+      indices[j++] = n;
+  }
 }
 
 
 static void
-insert_done_cb (void *cls,
-                int success)
+do_consensus ()
 {
-  struct GNUNET_CONSENSUS_Element *element = cls;
+  int unique_indices[replication];
+  int i;
 
-  GNUNET_free (element);
-  if (GNUNET_YES != success)
+  for (i = 0; i < num_values; i++)
   {
-    printf ("insert failed\n");
-    GNUNET_SCHEDULER_shutdown ();
-    return;
+    int j;
+    struct GNUNET_HashCode *val;
+    struct GNUNET_CONSENSUS_Element *element;
+    generate_indices(unique_indices);
+
+    val = GNUNET_malloc (sizeof *val);
+    GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, val);
+
+    element = GNUNET_malloc (sizeof *element);
+    element->data = val;
+    element->size = sizeof *val;
+
+    for (j = 0; j < replication; j++)
+    {
+      int cid;
+      cid = unique_indices[j];
+      GNUNET_CONSENSUS_insert (consensus_handles[cid], element, NULL, NULL);
+    }
   }
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == stdin_tid);
-  stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, 
stdin_fh,
-                                             &stdin_cb, NULL);    
+
+  for (i = 0; i < num_peers; i++)
+    GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, 0, 
conclude_cb, consensus_handles[i]);
 }
 
 
 /**
- * Called whenever we can read stdin non-blocking 
+ * Callback to be called when a service connect operation is completed
  *
- * @param cls unused
- * @param tc scheduler context 
+ * @param cls the callback closure from functions generating an operation
+ * @param op the operation that has been finished
+ * @param ca_result the service handle returned from 
GNUNET_TESTBED_ConnectAdapter()
+ * @param emsg error message in case the operation has failed; will be NULL if
+ *          operation has executed successfully.
  */
 static void
-stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+connect_complete (void *cls,
+                  struct GNUNET_TESTBED_Operation *op,
+                  void *ca_result,
+                  const char *emsg)
 {
-  char buf[1024];
-  char *ret;
-  struct GNUNET_CONSENSUS_Element *element;
+  struct GNUNET_CONSENSUS_Handle **chp;
 
-  stdin_tid = GNUNET_SCHEDULER_NO_TASK;
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return; /* we're done here */
-  ret = fgets (buf, 1024, stdin);
-  if (NULL == ret)
+  if (NULL != emsg)
   {
-    if (feof (stdin))
-    {
-      printf ("concluding ...\n");
-      GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 0, 
conclude_cb, NULL);
-    }
-    return;
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "testbed connect emsg: %s\n", emsg);
+    GNUNET_assert (0);
   }
 
-  printf("read: %s", buf);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "connect complete\n");
 
-  element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + 
strlen(buf) + 1);
-  element->type = 0;
-  element->size = strlen(buf) + 1;
-  element->data = &element[1];
-  strcpy ((char *) &element[1], buf);
-  GNUNET_CONSENSUS_insert (consensus, element, &insert_done_cb, element); 
+  chp = (struct GNUNET_CONSENSUS_Handle **) cls;
+  *chp = (struct GNUNET_CONSENSUS_Handle *) ca_result;
+  num_connected_handles++;
+
+  if (num_connected_handles == num_peers)
+  {
+    do_consensus ();
+  }
 }
 
 
+static int
+new_element_cb (void *cls,
+                struct GNUNET_CONSENSUS_Element *element)
+{
+  return GNUNET_YES;
+}
+
+
 /**
- * Called when a new element was received from another peer, or an error 
occured.
+ * Adapter function called to establish a connection to
+ * a service.
  *
- * May deliver duplicate values.
- *
- * Elements given to a consensus operation by the local peer are NOT given
- * to this callback.
- *
  * @param cls closure
- * @param element new element, NULL on error
- * @return GNUNET_OK if the valid is well-formed and should be added to the 
consensus,
- *         GNUNET_SYSERR if the element should be ignored and not be propagated
+ * @param cfg configuration of the peer to connect to; will be available until
+ *          GNUNET_TESTBED_operation_done() is called on the operation returned
+ *          from GNUNET_TESTBED_service_connect()
+ * @return service handle to return in 'op_result', NULL on error
  */
-static int
-cb (void *cls,
-    struct GNUNET_CONSENSUS_Element *element)
+static void *
+connect_adapter (void *cls,
+                 const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  if (NULL == element)
-  {
-    printf("error receiving from consensus\n");
-    GNUNET_SCHEDULER_shutdown ();
-    return GNUNET_NO;
-  }
-  printf("got element\n");
-  return GNUNET_YES;
+  struct GNUNET_CONSENSUS_Handle *consensus;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "connect adapter, %d peers\n", 
num_peers);
+  consensus = GNUNET_CONSENSUS_create (cfg, num_peers, peer_ids, &session_id, 
new_element_cb, NULL);
+  GNUNET_assert (NULL != consensus);
+  return consensus;
 }
 
 
 /**
- * Function run on shutdown to clean up.
+ * Adapter function called to destroy a connection to
+ * a service.
  *
- * @param cls the statistics handle
- * @param tc scheduler context
+ * @param cls closure
+ * @param op_result service handle returned from the connect adapter
  */
 static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+disconnect_adapter(void *cls, void *op_result)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n");
-  if (NULL != consensus)
-  {
-    GNUNET_CONSENSUS_destroy (consensus);
-    consensus = NULL;
-  }
+  /* FIXME: what to do here? */
 }
 
 
+/**
+ * Callback to be called when the requested peer information is available
+ *
+ * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
+ * @param op the operation this callback corresponds to
+ * @param pinfo the result; will be NULL if the operation has failed
+ * @param emsg error message if the operation has failed; will be NULL if the
+ *          operation is successfull
+ */
 static void
-run (void *cls, char *const *args, const char *cfgfile,
-     const struct GNUNET_CONFIGURATION_Handle *cfg)
+peer_info_cb (void *cb_cls,
+              struct GNUNET_TESTBED_Operation *op,
+              const struct GNUNET_TESTBED_PeerInformation *pinfo,
+              const char *emsg)
 {
-  struct GNUNET_HashCode sid;
-  struct GNUNET_PeerIdentity *pids;
-  int count;
+  struct GNUNET_PeerIdentity *p;
   int i;
 
-  if (NULL == session_id_str)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given (missing 
-s/--session-id)\n");
-    return;
-  }
+  GNUNET_assert (NULL == emsg);
 
-  GNUNET_CRYPTO_hash (session_id_str, strlen (session_id_str), &sid);
+  p = (struct GNUNET_PeerIdentity *) cb_cls;
 
-  for (count = 0; NULL != args[count]; count++);
- 
-  if (0 != count)
-  { 
-    pids = GNUNET_malloc (count * sizeof (struct GNUNET_PeerIdentity));
+  if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY)
+  {
+    *p = *pinfo->result.id;
+    num_retrieved_peer_ids++;
+    if (num_retrieved_peer_ids == num_peers)
+      for (i = 0; i < num_peers; i++)
+        GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus", 
connect_complete, &consensus_handles[i],
+                                        connect_adapter, disconnect_adapter, 
NULL);
   }
   else
   {
-    pids = NULL;
+    GNUNET_assert (0);
   }
+}
 
-  for (i = 0; i < count; i++)
-  {
-    int ret;
-    ret = GNUNET_CRYPTO_hash_from_string (args[i], &pids[i].hashPubKey);
-    if (GNUNET_OK != ret)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "peer identity '%s' is 
malformed\n", args[i]);
-      return;
-    }
-  }
 
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
-                                &shutdown_task, NULL);
-  
-  consensus = 
-      GNUNET_CONSENSUS_create (cfg,
-                               count, pids,
-                               &sid,
-                               &cb, NULL);
+static void
+test_master (void *cls,
+             unsigned int num_peers,
+             struct GNUNET_TESTBED_Peer **started_peers)
+{
+  int i;
 
-  stdin_fh = GNUNET_DISK_get_handle_from_native (stdin);
-  stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, 
stdin_fh,
-                                        &stdin_cb, NULL);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
+
+  peers = started_peers;
+
+  peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
+
+  consensus_handles = GNUNET_malloc (num_peers * sizeof (struct 
ConsensusHandle *));
+
+  for (i = 0; i < num_peers; i++)
+    GNUNET_TESTBED_peer_get_information (peers[i],
+                                         GNUNET_TESTBED_PIT_IDENTITY,
+                                         peer_info_cb,
+                                         &peer_ids[i]);
 }
 
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  static char *session_str = "gnunet-consensus/test";
 
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "running gnunet-consensus\n");
+
+  GNUNET_CRYPTO_hash (session_str, strlen(session_str), &session_id);
+
+  (void) GNUNET_TESTBED_test_run ("gnunet-consensus",
+                                  cfgfile,
+                                  num_peers,
+                                  0,
+                                  controller_cb,
+                                  NULL,
+                                  test_master,
+                                  NULL);
+}
+
+
 int
 main (int argc, char **argv)
 {
    static const struct GNUNET_GETOPT_CommandLineOption options[] = {
-      { 's', "session-id", "ID",
-        gettext_noop ("session identifier"),
-        GNUNET_YES, &GNUNET_GETOPT_set_string, &session_id_str },
-        GNUNET_GETOPT_OPTION_END
-   };
-  GNUNET_PROGRAM_run (argc, argv, "gnunet-consensus",
+      { 'n', "num-peers", NULL,
+        gettext_noop ("number of peers in consensus"),
+        GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers },
+      { 'k', "value-replication", NULL,
+        gettext_noop ("how many peers receive one value?"),
+        GNUNET_YES, &GNUNET_GETOPT_set_uint, &replication },
+      { 'x', "num-values", NULL,
+        gettext_noop ("number of values"),
+        GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_values },
+      { 't', "timeout", NULL,
+        gettext_noop ("consensus timeout"),
+        GNUNET_YES, &GNUNET_GETOPT_set_relative_time, &conclude_timeout },
+      GNUNET_GETOPT_OPTION_END
+  };
+  conclude_timeout = GNUNET_TIME_UNIT_SECONDS;
+  GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus",
                      "help",
-                     options, &run, NULL);
+                     options, &run, NULL, GNUNET_YES);
   return 0;
 }
+

Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2013-01-16 17:21:37 UTC 
(rev 25805)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2013-01-17 00:53:11 UTC 
(rev 25806)
@@ -32,16 +32,46 @@
 #include "gnunet_util_lib.h"
 #include "gnunet_consensus_service.h"
 #include "gnunet_core_service.h"
-#include "gnunet_mesh_service.h"
+#include "gnunet_stream_lib.h"
+#include "consensus_protocol.h"
+#include "ibf.h"
 #include "consensus.h"
 
 
+/**
+ * Number of IBFs in a strata estimator.
+ */
+#define STRATA_COUNT 32
+/**
+ * Number of buckets per IBF.
+ */
+#define STRATA_IBF_BUCKETS 80
+/**
+ * hash num parameter of the IBF
+ */
+#define STRATA_HASH_NUM 3
+/**
+ * Number of strata that can be transmitted in one message.
+ */
+#define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS))
+
+
+
+/* forward declarations */
+
 struct ConsensusSession;
+struct IncomingSocket;
 
 static void
 send_next (struct ConsensusSession *session);
 
+static void 
+write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
 
+static int
+get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct 
ConsensusSession *session);
+
+
 /**
  * An element that is waiting to be transmitted to a client.
  */
@@ -63,25 +93,78 @@
   struct GNUNET_CONSENSUS_Element *element;
 };
 
-
-/*
- * A peer that is also in a consensus session.
- * Note that 'this' peer is not in the list.
- */
-struct ConsensusPeer
+struct ConsensusPeerInformation
 {
-  struct GNUNET_PeerIdentity *peer_id;
+  struct GNUNET_STREAM_Socket *socket;
 
   /**
-   * Incoming tunnel from the peer.
+   * Is socket's connection established, i.e. can we write to it?
+   * Only relevent on outgoing cpi.
    */
-  struct GNUNET_MESH_Tunnel *incoming_tunnel;
+  int is_connected;
 
-  struct InvertibleBloomFilter *last_ibf;
+  /**
+   * Type of the peer in the all-to-all rounds,
+   * GNUNET_YES if we initiate reconciliation.
+   */
+  int is_outgoing;
 
+  /**
+   * Did we receive/send a consensus hello?
+   */
+  int hello;
+
+  /**
+   * Handle for currently active read
+   */
+  struct GNUNET_STREAM_ReadHandle *rh;
+
+  /**
+   * Handle for currently active read
+   */
+  struct GNUNET_STREAM_WriteHandle *wh;
+
+  /**
+   * How many of the strate in the ibf were
+   * sent or received in this round?
+   */
+  int strata_counter;
+
+  struct InvertibleBloomFilter *my_ibf;
+
+  int my_ibf_bucket_counter;
+
+  struct InvertibleBloomFilter *peer_ibf;
+
+  int peer_ibf_bucket_counter;
+
+  /**
+   * Strata estimator of the peer, NULL if our peer
+   * initiated the reconciliation.
+   */
+  struct InvertibleBloomFilter **strata;
+
+  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
+  struct ConsensusSession *session;
 };
 
+struct QueuedMessage
+{
+  struct GNUNET_MessageHeader *msg;
 
+  /**
+   * Queued messages are stored in a doubly linked list.
+   */
+  struct QueuedMessage *next;
+
+  /**
+   * Queued messages are stored in a doubly linked list.
+   */
+  struct QueuedMessage *prev;
+};
+
+
 /**
  * A consensus session consists of one local client and the remote authorities.
  */
@@ -98,15 +181,17 @@
   struct ConsensusSession *prev;
 
   /**
-   * Local consensus identification, chosen by clients.
+   * Join message. Used to initialize the session later,
+   * if the identity of the local peer is not yet known.
+   * NULL if the session has been fully initialized.
    */
-  struct GNUNET_HashCode *local_id;
- 
+  struct GNUNET_CONSENSUS_JoinMessage *join_msg;
+
   /**
   * Global consensus identification, computed
   * from the local id and participating authorities.
   */
-  struct GNUNET_HashCode *global_id;
+  struct GNUNET_HashCode global_id;
 
   /**
    * Local client in this consensus session.
@@ -140,6 +225,10 @@
    */
   struct PendingElement *approval_pending_tail;
 
+  struct QueuedMessage *client_messages_head;
+
+  struct QueuedMessage *client_messages_tail;
+
   /**
    * Currently active transmit handle for sending to the client
    */
@@ -152,11 +241,6 @@
   int conclude_requested;
 
   /**
-   * Client has been informed about the conclusion.
-   */
-  int conclude_sent;
-
-  /**
    * Minimum number of peers to form a consensus group
    */
   int conclude_group_min;
@@ -178,30 +262,74 @@
    */
   unsigned int num_peers;
 
-  /**
-   * Other peers in the consensus, array of ConsensusPeer
-   */
-  struct ConsensusPeer *peers;
+  struct ConsensusPeerInformation *info;
 
   /**
-   * Tunnel for broadcasting to all other authorities
+   * Sorted array of peer identities in this consensus session,
+   * includes the local peer.
    */
-  struct GNUNET_MESH_Tunnel *broadcast_tunnel;
+  struct GNUNET_PeerIdentity *peers;
 
   /**
-   * Time limit for one round of pairwise exchange.
-   * FIXME: should not actually be a constant
+   * Index of the local peer in the peers array
    */
-  struct GNUNET_TIME_Relative round_time;
+  int local_peer_idx;
 
   /**
    * Task identifier for the round timeout task
    */
   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
+
+  struct InvertibleBloomFilter **strata;
 };
 
 
 /**
+ * Sockets from other peers who want to communicate with us.
+ * It may not be known yet which consensus session they belong to.
+ */
+struct IncomingSocket
+{
+  /**
+   * Incoming sockets are kept in a double linked list.
+   */
+  struct IncomingSocket *next;
+
+  /**
+   * Incoming sockets are kept in a double linked list.
+   */
+  struct IncomingSocket *prev;
+
+  /**
+   * The actual socket.
+   */
+  struct GNUNET_STREAM_Socket *socket;
+
+  /**
+   * Handle for currently active read
+   */
+  struct GNUNET_STREAM_ReadHandle *rh;
+
+  /**
+   * Peer that connected to us with the socket.
+   */
+  struct GNUNET_PeerIdentity *peer;
+
+  /**
+   * Message stream tokenizer for this socket.
+   */
+  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
+  /**
+   * Peer-in-session this socket belongs to, once known, otherwise NULL.
+   */
+  struct ConsensusPeerInformation *cpi;
+};
+
+static struct IncomingSocket *incoming_sockets_head;
+static struct IncomingSocket *incoming_sockets_tail;
+
+/**
  * Linked list of sesstions this peer participates in.
  */
 static struct ConsensusSession *sessions_head;
@@ -222,32 +350,349 @@
 static struct GNUNET_SERVER_Handle *srv;
 
 /**
- * Peer that runs this service
+ * Peer that runs this service.
  */
 static struct GNUNET_PeerIdentity *my_peer;
 
 /**
- * Handle to the mesh service.
+ * Handle to the core service. Only used during service startup, will be NULL 
after that.
  */
-static struct GNUNET_MESH_Handle *mesh;
+static struct GNUNET_CORE_Handle *core;
 
 /**
- * Handle to the core service. Only used during service startup, will be NULL 
after that.
+ * Listener for sockets from peers that want to reconcile with us.
  */
-static struct GNUNET_CORE_Handle *core;
+static struct GNUNET_STREAM_ListenSocket *listener;
 
+
+static int
+estimate_difference (struct InvertibleBloomFilter** strata1,
+                     struct InvertibleBloomFilter** strata2)
+{
+  int i;
+  int count;
+  count = 0;
+  for (i = STRATA_COUNT - 1; i >= 0; i--)
+  {
+    struct InvertibleBloomFilter *diff;
+    int ibf_count;
+    int more;
+    ibf_count = 0;
+    diff = ibf_dup (strata1[i]);
+    ibf_subtract (diff, strata2[i]);
+    for (;;)
+    {
+      more = ibf_decode (diff, NULL, NULL);
+      if (GNUNET_NO == more)
+      {
+        count += ibf_count;
+        break;
+      }
+      if (GNUNET_SYSERR == more)
+      {
+        return count * (1 << (i + 1));
+      }
+      ibf_count++;
+    }
+    ibf_destroy (diff);
+  }
+  return count;
+}
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on 
timeout 
+ * @return number of bytes of processed from 'data' (any data remaining should 
be
+ *         given to the next time the read processor is called).
+ */
+static size_t
+stream_data_processor (void *cls,
+                       enum GNUNET_STREAM_Status status,
+                       const void *data,
+                       size_t size)
+{
+  struct IncomingSocket *incoming;
+  int ret;
+
+  GNUNET_assert (GNUNET_STREAM_OK == status);
+
+  incoming = (struct IncomingSocket *) cls;
+
+  ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, 
GNUNET_NO, GNUNET_NO);
+  if (GNUNET_SYSERR == ret)
+  {
+    /* FIXME: handle this correctly */
+    GNUNET_assert (0);
+  }
+
+  /* read again */
+  incoming->rh = GNUNET_STREAM_read (incoming->socket, 
GNUNET_TIME_UNIT_FOREVER_REL,
+                                     &stream_data_processor, incoming);
+
+  /* we always read all data */
+  return size;
+}
+
+static int
+handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct 
StrataMessage *strata_msg)
+{
+  int i;
+  int num_strata;
+  struct GNUNET_HashCode *hash_src;
+  uint8_t *count_src;
+
+  GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
+
+  if (NULL == cpi->strata)
+  {
+    cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct 
InvertibleBloomFilter *));
+    for (i = 0; i < STRATA_COUNT; i++)
+      cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
+  }
+
+  num_strata = ntohs (strata_msg->num_strata);
+
+  /* for correct message alignment, copy bucket types seperately */
+  hash_src = (struct GNUNET_HashCode *) &strata_msg[1];
+
+  for (i = 0; i < num_strata; i++)
+  {
+    memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src, 
STRATA_IBF_BUCKETS * sizeof *hash_src);
+    hash_src += STRATA_IBF_BUCKETS;
+  }
+
+  for (i = 0; i < num_strata; i++)
+  {
+    memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src, 
STRATA_IBF_BUCKETS * sizeof *hash_src);
+    hash_src += STRATA_IBF_BUCKETS;
+  }
+
+  count_src = (uint8_t *) hash_src;
+
+  for (i = 0; i < num_strata; i++)
+  {
+    uint8_t zero[STRATA_IBF_BUCKETS];
+    memset (zero, 0, STRATA_IBF_BUCKETS);
+    memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, 
STRATA_IBF_BUCKETS);
+    count_src += STRATA_IBF_BUCKETS;
+  }
+
+  GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) + 
STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE));
+
+  cpi->strata_counter += num_strata;
+
+  if (STRATA_COUNT == cpi->strata_counter)
+  {
+    int diff;
+    diff = estimate_difference (cpi->session->strata, cpi->strata);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff);
+  }
+
+  return GNUNET_YES;
+}
+
+
+static int
+handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct 
DifferenceDigest *strata)
+{
+  return GNUNET_YES;
+}
+
+
+static int
+handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element 
*strata)
+{
+  return GNUNET_YES;
+}
+
+
+static int
+handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello 
*hello)
+{
+  struct ConsensusSession *session;
+  session = sessions_head;
+  while (NULL != session)
+  {
+    if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
+    {
+      int idx;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
+      idx = get_peer_idx (inc->peer, session);
+      GNUNET_assert (-1 != idx);
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
+      inc->cpi = &session->info[idx];
+      GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
+      inc->cpi->mst = inc->mst;
+      inc->cpi->hello = GNUNET_YES;
+      inc->cpi->socket = inc->socket;
+      return GNUNET_YES;
+    }
+    session = session->next;
+  }
+  GNUNET_assert (0);
+  return GNUNET_NO;
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete message is received by the tokenizer.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ *
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+mst_session_callback (void *cls, void *client, const struct 
GNUNET_MessageHeader *message)
+{
+  struct ConsensusPeerInformation *cpi;
+  cpi = (struct ConsensusPeerInformation *) cls;
+  switch (ntohs( message->type))
+  {
+    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
+      return handle_p2p_strata (cpi, (struct StrataMessage *) message);
+    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
+      return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
+    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
+      return handle_p2p_element (cpi, (struct Element *) message);
+    default:
+      /* FIXME: handle correctly */
+      GNUNET_assert (0);
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle tokenized messages from stream sockets.
+ * Delegate them if the socket belongs to a session,
+ * handle hello messages otherwise.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure, unused
+ * @param client incoming socket this message comes from
+ * @param message the actual message
+ *
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+mst_incoming_callback (void *cls, void *client, const struct 
GNUNET_MessageHeader *message)
+{
+  struct IncomingSocket *inc;
+  inc = (struct IncomingSocket *) client;
+  switch (ntohs( message->type))
+  {
+    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
+      return handle_p2p_hello (inc, (struct ConsensusHello *) message);
+    default:
+      if (NULL != inc->cpi)
+        return mst_session_callback (inc->cpi, client, message);
+      /* FIXME: disconnect peer properly */
+      GNUNET_assert (0);
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other 
peers
+ * or upon binding error which happen when the app_port given in
+ * GNUNET_STREAM_listen() is already taken.
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream; NULL on binding error
+ * @param initiator the identity of the peer who wants to establish a stream
+ *            with us; NULL on binding error
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ *             stream (the socket will be invalid after the call)
+ */
+static int
+listen_cb (void *cls,
+           struct GNUNET_STREAM_Socket *socket,
+           const struct GNUNET_PeerIdentity *initiator)
+{
+  struct IncomingSocket *incoming;
+
+  GNUNET_assert (NULL != socket);
+
+  incoming = GNUNET_malloc (sizeof *incoming);
+
+  incoming->socket = socket;
+  incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
+
+  incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
+                                     &stream_data_processor, incoming);
+
+
+  incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
+
+  GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, 
incoming_sockets_tail, incoming);
+
+  return GNUNET_OK;
+}
+
+
 static void
+destroy_session (struct ConsensusSession *session)
+{
+  /* FIXME: more stuff to free! */
+  GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
+  GNUNET_SERVER_client_drop (session->client);
+  GNUNET_free (session);
+}
+
+
+/**
+ * Disconnect a client, and destroy all sessions associated with it.
+ *
+ * @param client the client to disconnect
+ */
+static void
 disconnect_client (struct GNUNET_SERVER_Client *client)
 {
+  struct ConsensusSession *session;
   GNUNET_SERVER_client_disconnect (client);
-  /* FIXME: free data structures that this client owns */
+  
+  /* if the client owns a session, remove it */
+  session = sessions_head;
+  while (NULL != session)
+  {
+    if (client == session->client)
+    {
+      destroy_session (session);
+      break;
+    }
+    session = session->next;
+  }
 }
 
+
+/**
+ * Compute a global, (hopefully) unique consensus session id,
+ * from the local id of the consensus session, and the identities of all 
participants.
+ * Thus, if the local id of two consensus sessions coincide, but are not 
comprised of
+ * exactly the same peers, the global id will be different.
+ *
+ * @param local_id local id of the consensus session
+ * @param peers array of all peers participating in the consensus session
+ * @param num_peers number of elements in the peers array
+ * @param dst where the result is stored, may not be NULL
+ */
 static void
-compute_global_id (struct GNUNET_HashCode *dst,
-                   const struct GNUNET_HashCode *local_id,
-                   const struct GNUNET_PeerIdentity *peers,
-                   int num_peers)
+compute_global_id (const struct GNUNET_HashCode *local_id,
+                   const struct GNUNET_PeerIdentity *peers, int num_peers, 
+                   struct GNUNET_HashCode *dst)
 {
   int i;
   struct GNUNET_HashCode tmp;
@@ -263,45 +708,50 @@
 }
 
 
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls consensus session
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
 static size_t
-transmit_pending (void *cls, size_t size, void *buf)
+transmit_queued (void *cls, size_t size,
+                 void *buf)
 {
-  struct GNUNET_CONSENSUS_Element *element;
-  struct GNUNET_CONSENSUS_ElementMessage *msg;
   struct ConsensusSession *session;
+  struct QueuedMessage *qmsg;
+  size_t msg_size;
 
   session = (struct ConsensusSession *) cls;
-  msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf;
-  element = session->transmit_pending_head->element;
-
-  GNUNET_assert (NULL != element);
-
   session->th = NULL;
 
-  msg->element_type = element->type;
-  msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
-  msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + 
element->size);
-  memcpy (&msg[1], element->data, element->size);
 
-  session->transmit_pending_head = session->transmit_pending_head->next;
+  qmsg = session->client_messages_head;
+  GNUNET_CONTAINER_DLL_remove (session->client_messages_head, 
session->client_messages_tail, qmsg);
+  GNUNET_assert (qmsg);
 
-  send_next (session);
+  if (NULL == buf)
+  {
+    destroy_session (session);
+    return 0;
+  }
 
-  return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size;
-}
+  msg_size = ntohs (qmsg->msg->size);
 
+  GNUNET_assert (size >= msg_size);
 
-static size_t
-transmit_conclude_done (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg;
+  memcpy (buf, qmsg->msg, msg_size);
+  GNUNET_free (qmsg->msg);
+  GNUNET_free (qmsg);
 
-  msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf;
-  msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
-  msg->header.size = htons (sizeof (struct 
GNUNET_CONSENSUS_ConcludeDoneMessage));
-  msg->num_peers = htons (0);
+  send_next (session);
 
-  return sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage);
+  return msg_size;
 }
 
 
@@ -313,65 +763,253 @@
 static void
 send_next (struct ConsensusSession *session)
 {
-  int msize;
 
   GNUNET_assert (NULL != session);
 
   if (NULL != session->th)
-  {
     return;
-  }
 
-  if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent 
== GNUNET_NO))
+  if (NULL != session->client_messages_head)
   {
-    /* FIXME */
-    msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
-    session->th =
-        GNUNET_SERVER_notify_transmit_ready (session->client, msize,
-                                             GNUNET_TIME_UNIT_FOREVER_REL, 
&transmit_conclude_done, session);
-    session->conclude_sent = GNUNET_YES;
+    int msize;
+    msize = ntohs (session->client_messages_head->msg->size);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
+    session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 
+                                                       
GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_queued, 
session);
   }
-  else if (NULL != session->transmit_pending_head)
+}
+
+
+/**
+ * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
+ * the correct signature to be used with e.g. qsort.
+ * We use this function instead.
+ *
+ * @param h1 some hash code
+ * @param h2 some hash code
+ * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
+ */
+static int
+hash_cmp (const void *a, const void *b)
+{
+  return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct 
GNUNET_HashCode *) b);
+}
+
+
+/**
+ * Search peer in the list of peers in session.
+ *
+ * @param peer peer to find
+ * @param session session with peer
+ * @return index of peer, -1 if peer is not in session
+ */
+static int
+get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct 
ConsensusSession *session)
+{
+  const struct GNUNET_PeerIdentity *needle;
+  needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct 
GNUNET_PeerIdentity), &hash_cmp);
+  if (NULL == needle)
+    return -1;
+  return needle - session->peers;
+}
+
+
+
+static void
+hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+{
+  struct ConsensusPeerInformation *cpi;
+
+  cpi = (struct ConsensusPeerInformation *) cls;
+  cpi->hello = GNUNET_YES;
+  
+  GNUNET_assert (GNUNET_STREAM_OK == status);
+
+  cpi = (struct ConsensusPeerInformation *) cls;
+
+  if (cpi->session->conclude_requested)
   {
-    msize = session->transmit_pending_head->element->size + sizeof (struct 
GNUNET_CONSENSUS_ElementMessage);
-    session->th =
-        GNUNET_SERVER_notify_transmit_ready (session->client, msize,
-                                             GNUNET_TIME_UNIT_FOREVER_REL, 
&transmit_pending, session);
-    /* TODO: insert into ack pending */
+    write_strata (cpi, GNUNET_STREAM_OK, 0);  
   }
 }
 
 
 /**
- * Method called whenever a peer has disconnected from the tunnel.
- * Implementations of this callback must NOT call
- * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those
- * to run in some other task later.  However, calling 
- * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed.
+ * Functions of this type will be called when a stream is established
  *
- * @param cls closure
- * @param peer peer identity the tunnel stopped working with
+ * @param cls the closure from GNUNET_STREAM_open
+ * @param socket socket to use to communicate with the other side (read/write)
  */
 static void
-disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer)
+open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
 {
-  /* FIXME: how do we handle this */
+  struct ConsensusPeerInformation *cpi;
+  struct ConsensusHello *hello;
+
+
+  cpi = (struct ConsensusPeerInformation *) cls;
+  cpi->is_connected = GNUNET_YES;
+
+  hello = GNUNET_malloc (sizeof *hello);
+  hello->header.size = htons (sizeof *hello);
+  hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
+  memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct 
GNUNET_HashCode));
+
+
+  cpi->wh =
+      GNUNET_STREAM_write (socket, hello, sizeof *hello, 
GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
+
 }
 
 
+static void
+initialize_session_info (struct ConsensusSession *session)
+{
+  int i;
+  int last;
+
+  for (i = 0; i < session->num_peers; ++i)
+  {
+    /* initialize back-references, so consensus peer information can
+     * be used as closure */
+    session->info[i].session = session;
+
+  }
+
+  last = (session->local_peer_idx + (session->num_peers / 2)) % 
session->num_peers;
+  i = (session->local_peer_idx + 1) % session->num_peers;
+  while (i != last)
+  {
+    session->info[i].is_outgoing = GNUNET_YES;
+    session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], 
GNUNET_APPLICATION_TYPE_CONSENSUS,
+                                                  open_cb, &session->info[i], 
GNUNET_STREAM_OPTION_END);
+    session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, 
session);
+    i = (i + 1) % session->num_peers;
+  }
+  // tie-breaker for even number of peers
+  if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
+  {
+    session->info[last].is_outgoing = GNUNET_YES;
+    session->info[last].socket = GNUNET_STREAM_open (cfg, 
&session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
+                                                     open_cb, 
&session->info[last], GNUNET_STREAM_OPTION_END);
+  }
+}
+
+
 /**
- * Method called whenever a peer has connected to the tunnel.
+ * Create the sorted list of peers for the session,
+ * add the local peer if not in the join message.
+ */
+static void
+initialize_session_peer_list (struct ConsensusSession *session)
+{
+  int local_peer_in_list;
+  int listed_peers;
+  const struct GNUNET_PeerIdentity *msg_peers;
+  unsigned int i;
+
+  GNUNET_assert (NULL != session->join_msg);
+
+  /* peers in the join message, may or may not include the local peer */
+  listed_peers = ntohs (session->join_msg->num_peers);
+  
+  session->num_peers = listed_peers;
+
+  msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
+
+  local_peer_in_list = GNUNET_NO;
+  for (i = 0; i < listed_peers; i++)
+  {
+    if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct 
GNUNET_PeerIdentity)))
+    {
+      local_peer_in_list = GNUNET_YES;
+      break;
+    }
+  }
+
+  if (GNUNET_NO == local_peer_in_list)
+    session->num_peers++;
+
+  session->peers = GNUNET_malloc (session->num_peers * sizeof (struct 
GNUNET_PeerIdentity));
+
+  if (GNUNET_NO == local_peer_in_list)
+    session->peers[session->num_peers - 1] = *my_peer;
+
+  memcpy (session->peers, msg_peers, listed_peers * sizeof (struct 
GNUNET_PeerIdentity));
+  qsort (session->peers, session->num_peers, sizeof (struct 
GNUNET_PeerIdentity), &hash_cmp);
+}
+
+
+static void
+strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode 
*key)
+{
+  uint32_t v;
+  int i;
+  v = key->bits[0];
+  /* count trailing '1'-bits of v */
+  for (i = 0; v & 1; v>>=1, i++);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i);
+
+  ibf_insert (strata[i], key);
+}
+
+
+/**
+ * Initialize the session, continue receiving messages from the owning client
  *
- * @param cls closure
- * @param peer peer identity the tunnel was created to, NULL on timeout
- * @param atsi performance data for the connection
+ * @param session the session to initialize
  */
 static void
-connect_handler (void *cls,
-                 const struct GNUNET_PeerIdentity *peer,
-                 const struct GNUNET_ATS_Information *atsi)
+initialize_session (struct ConsensusSession *session)
 {
-  /* not much we can do here, now we know the other peer has been added to our 
broadcast tunnel */
+  const struct ConsensusSession *other_session;
+  int i;
+
+  GNUNET_assert (NULL != session->join_msg);
+
+  initialize_session_peer_list (session);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", 
session->num_peers);
+
+  compute_global_id (&session->join_msg->session_id, session->peers, 
session->num_peers, &session->global_id);
+
+  /* Check if some local client already owns the session. */
+  other_session = sessions_head;
+  while (NULL != other_session)
+  {
+    if ((other_session != session) && 
+        (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, 
&other_session->global_id)))
+    {
+      /* session already owned by another client */
+      GNUNET_break (0);
+      disconnect_client (session->client);
+      return;
+    }
+    other_session = other_session->next;
+  }
+
+  session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
+
+  session->local_peer_idx = get_peer_idx (my_peer, session);
+  GNUNET_assert (-1 != session->local_peer_idx);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", 
session->local_peer_idx);
+
+  session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct 
InvertibleBloomFilter *));
+  for (i = 0; i < STRATA_COUNT; i++)
+    session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
+
+  session->info = GNUNET_malloc (session->num_peers * sizeof (struct 
ConsensusPeerInformation));
+
+  initialize_session_info (session);
+
+  GNUNET_free (session->join_msg);
+  session->join_msg = NULL;
+
+  GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s 
(&session->global_id));
 }
 
 
@@ -387,88 +1025,49 @@
              struct GNUNET_SERVER_Client *client,
              const struct GNUNET_MessageHeader *m)
 {
-  struct GNUNET_HashCode global_id;
-  const struct GNUNET_CONSENSUS_JoinMessage *msg;
   struct ConsensusSession *session;
-  unsigned int i;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join received\n");
 
-  msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s 
(&msg->session_id));
-
-  compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity 
*) &m[1], msg->num_peers);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s 
(&global_id));
-
+  // make sure the client has not already joined a session
   session = sessions_head;
   while (NULL != session)
   {
-    if (client == session->client)
+    if (session->client == client)
     {
-
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
+      GNUNET_break (0);
       disconnect_client (client);
       return;
     }
-    if (0 == memcmp (session->global_id, &global_id, sizeof (struct 
GNUNET_HashCode)))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "session already owned by another 
client\n");
-      disconnect_client (client);
-      return;
-    }
     session = session->next;
   }
 
-  GNUNET_SERVER_client_keep (client);
-
-  /* session does not exist yet, create it */
   session = GNUNET_malloc (sizeof (struct ConsensusSession));
-  session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct 
GNUNET_HashCode));
-  session->global_id = GNUNET_memdup (&global_id, sizeof (struct 
GNUNET_HashCode));
-  session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
+  session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) 
GNUNET_copy_message (m);
   session->client = client;
-  /* FIXME: should not be a constant, but chosen adaptively */
-  session->round_time = GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 5);
+  GNUNET_SERVER_client_keep (client);
 
-  session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session, 
connect_handler, disconnect_handler, session);
+  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
 
-  session->num_peers = 0;
-
-  /* count the peers that are not the local peer */
-  for (i = 0; i < msg->num_peers; i++)
+  // Initialize session later if local peer identity is not known yet.
+  if (NULL == my_peer)
   {
-    struct GNUNET_PeerIdentity *peers;
-    peers = (struct GNUNET_PeerIdentity *) &msg[1];
-    if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
-      session->num_peers++;
+    GNUNET_SERVER_disable_receive_done_warning (client);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n");
+    return;
   }
 
-  session->peers = GNUNET_malloc (session->num_peers * sizeof (struct 
ConsensusPeer));
-
-  /* copy the peer identities and add peers to broadcast tunnel */
-  for (i = 0; i < msg->num_peers; i++)
-  {
-    struct GNUNET_PeerIdentity *peers;
-    peers = (struct GNUNET_PeerIdentity *) &msg[1];
-    if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
-    {
-      *session->peers->peer_id = peers[i];
-      GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel, 
&peers[i]);
-    }
-  }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n");
-
-  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n");
+  initialize_session (session);
 }
 
 
 /**
  * Called when a client performs an insert operation.
+ *
+ * @param cls (unused)
+ * @param client client handle
+ * @param message message sent by the client
  */
 void
 client_insert (void *cls,
@@ -510,39 +1109,155 @@
   GNUNET_CRYPTO_hash (element, element_size, &key);
 
   GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
-                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
 
+  strata_insert (session->strata, &key);
+
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 
   send_next (session);
 }
 
 
+
 /**
- * Do one round of the conclusion.
- * Start by broadcasting the set difference estimator (IBF strata).
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
  *
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ *          GNUNET_STREAM_OK if writing to stream was completed successfully;
+ *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ *          (this doesn't mean that the data is never sent, the receiver may
+ *          have read the data but its ACKs may have been lost);
+ *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ *          be processed.
+ * @param size the number of bytes written
  */
-void
-conclude_do_round (struct ConsensusSession *session)
+static void 
+write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
 {
-  /* FIXME */
+  struct ConsensusPeerInformation *cpi;
+  struct StrataMessage *strata_msg;
+  size_t msize;
+  int i;
+  struct GNUNET_HashCode *hash_dst;
+  uint8_t *count_dst;
+  int num_strata;
+
+  cpi = (struct ConsensusPeerInformation *) cls;
+
+  GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
+
+  /* FIXME: handle this */
+  GNUNET_assert (GNUNET_STREAM_OK == status);
+
+  if (STRATA_COUNT == cpi->strata_counter)
+  {
+    /* strata have been written, wait for other side's IBF */
+    return;
+  }
+
+  if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE)
+    num_strata = (STRATA_COUNT - cpi->strata_counter);
+  else
+    num_strata = STRATA_PER_MESSAGE;
+
+
+  msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE * 
STRATA_IBF_BUCKETS);
+
+  strata_msg = GNUNET_malloc (msize);
+  strata_msg->header.size = htons (msize);
+  strata_msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
+  strata_msg->num_strata = htons (num_strata);
+
+  /* for correct message alignment, copy bucket types seperately */
+  hash_dst = (struct GNUNET_HashCode *) &strata_msg[1];
+
+  for (i = 0; i < num_strata; i++)
+  {
+    memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum, 
STRATA_IBF_BUCKETS * sizeof *hash_dst);
+    hash_dst += STRATA_IBF_BUCKETS;
+  }
+
+  for (i = 0; i < num_strata; i++)
+  {
+    memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum, 
STRATA_IBF_BUCKETS * sizeof *hash_dst);
+    hash_dst += STRATA_IBF_BUCKETS;
+  }
+
+  count_dst = (uint8_t *) hash_dst;
+
+  for (i = 0; i < num_strata; i++)
+  {
+    memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count, 
STRATA_IBF_BUCKETS);
+    count_dst += STRATA_IBF_BUCKETS;
+  }
+
+  cpi->strata_counter += num_strata;
+
+  cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, 
GNUNET_TIME_UNIT_FOREVER_REL,
+                                 write_strata, cpi);
+
+  GNUNET_assert (NULL != cpi->wh);
 }
 
 
 /**
- * Cancel the current round if necessary, decide to run another round or
- * terminate.
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
+ *
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ *          GNUNET_STREAM_OK if writing to stream was completed successfully;
+ *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ *          (this doesn't mean that the data is never sent, the receiver may
+ *          have read the data but its ACKs may have been lost);
+ *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ *          be processed.
+ * @param size the number of bytes written
  */
-void
-conclude_round_done (struct ConsensusSession *session)
+static void 
+write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
 {
-  /* FIXME */
+  struct ConsensusPeerInformation *cpi;
+
+  cpi = (struct ConsensusPeerInformation *) cls;
 }
 
 
 /**
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
+ *
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ *          GNUNET_STREAM_OK if writing to stream was completed successfully;
+ *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ *          (this doesn't mean that the data is never sent, the receiver may
+ *          have read the data but its ACKs may have been lost);
+ *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ *          be processed.
+ * @param size the number of bytes written
+ */
+static void 
+write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+{
+  struct ConsensusPeerInformation *cpi;
+
+  cpi = (struct ConsensusPeerInformation *) cls;
+}
+
+
+/**
  * Called when a client performs the conclude operation.
+ *
+ * @param cls (unused)
+ * @param client client handle
+ * @param message message sent by the client
  */
 void
 client_conclude (void *cls,
@@ -550,40 +1265,55 @@
              const struct GNUNET_MessageHeader *message)
 {
   struct ConsensusSession *session;
+  int i;
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
 
   session = sessions_head;
   while ((session != NULL) && (session->client != client))
-  {
     session = session->next;
-  }
   if (NULL == session)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n");
+    /* client not found */
+    GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
 
   if (GNUNET_YES == session->conclude_requested)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude 
twice\n");
-    GNUNET_SERVER_client_disconnect (client);
+    /* client requested conclude twice */
+    GNUNET_break (0);
+    disconnect_client (client);
     return;
   }
 
   session->conclude_requested = GNUNET_YES;
 
-  conclude_do_round (session);
+  /* FIXME: write to already connected sockets */
 
+  for (i = 0; i < session->num_peers; i++)
+  {
+    if ( (GNUNET_YES == session->info[i].is_outgoing) &&
+         (GNUNET_YES == session->info[i].hello) )
+    {
+      /* kick off transmitting strata by calling the write continuation */
+      write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
+    }
+  }
+  
+
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
-
   send_next (session);
 }
 
 
 /**
  * Called when a client sends an ack
+ *
+ * @param cls (unused)
+ * @param client client handle
+ * @param message message sent by the client
  */
 void
 client_ack (void *cls,
@@ -614,72 +1344,24 @@
               struct GNUNET_CORE_Handle *core,
               const struct GNUNET_PeerIdentity *peer)
 {
-  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
-    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
-    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
-    {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
-        sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
-    {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
-        sizeof (struct GNUNET_CONSENSUS_AckMessage)},
-    {NULL, NULL, 0, 0}
-  };
+  struct ConsensusSession *session;
 
-  GNUNET_SERVER_add_handlers (srv, handlers);
   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
   /* core can't be disconnected directly in the core startup callback, 
schedule a task to do it! */
   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
-}
 
-
-
-/**
- * Method called whenever another peer has added us to a tunnel
- * the other peer initiated.
- * Only called (once) upon reception of data with a message type which was
- * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
- * causes te tunnel to be ignored and no further notifications are sent about
- * the same tunnel.
- *
- * @param cls closure
- * @param tunnel new handle to the tunnel
- * @param initiator peer that started the tunnel
- * @param atsi performance information for the tunnel
- * @return initial tunnel context for the tunnel
- *         (can be NULL -- that's not an error)
- */
-static void *
-new_tunnel (void *cls,
-            struct GNUNET_MESH_Tunnel *tunnel,
-            const struct GNUNET_PeerIdentity *initiator,
-            const struct GNUNET_ATS_Information *atsi)
-{
-  /* there's nothing we can do here, as we don't have the global consensus id 
yet */
-  return NULL;
+  session = sessions_head;
+  while (NULL != session)
+  {
+    if (NULL != session->join_msg)
+      initialize_session (session);
+    session = session->next;
+  }
 }
 
 
 /**
- * Function called whenever an inbound tunnel is destroyed.  Should clean up
- * any associated state.  This function is NOT called if the client has
- * explicitly asked for the tunnel to be destroyed using
- * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
- * the tunnel.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end (henceforth invalid)
- * @param tunnel_ctx place where local state associated
- *                   with the tunnel is stored
- */
-static void
-cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
-{
-  /* FIXME: what to do here? */
-}
-
-
-
-/**
  * Called to clean up, after a shutdown has been requested.
  *
  * @param cls closure
@@ -689,109 +1371,31 @@
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  /* mesh requires all the tunnels to be destroyed manually */
   while (NULL != sessions_head)
   {
     struct ConsensusSession *session;
     session = sessions_head;
-    GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel);
     sessions_head = sessions_head->next;
     GNUNET_free (session);
   }
 
-  if (NULL != mesh)
-  {
-    GNUNET_MESH_disconnect (mesh);
-    mesh = NULL;
-  }
   if (NULL != core)
   {
     GNUNET_CORE_disconnect (core);
     core = NULL;
   }
-}
 
+  if (NULL != listener)
+  {
+    GNUNET_STREAM_listen_close (listener);
+    listener = NULL;
+  } 
 
-
-/**
- * Functions with this signature are called whenever a message is
- * received.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end
- * @param tunnel_ctx place to store local state associated with the tunnel
- * @param sender who sent the message
- * @param message the actual message
- * @param atsi performance data for the connection
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-p2p_delta_estimate (void *cls,
-                    struct GNUNET_MESH_Tunnel * tunnel,
-                    void **tunnel_ctx,
-                    const struct GNUNET_PeerIdentity *sender,
-                    const struct GNUNET_MessageHeader *message,
-                    const struct GNUNET_ATS_Information *atsi)
-{
-  /* FIXME */
-  return GNUNET_OK;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
 }
 
 
 /**
- * Functions with this signature are called whenever a message is
- * received.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end
- * @param tunnel_ctx place to store local state associated with the tunnel
- * @param sender who sent the message
- * @param message the actual message
- * @param atsi performance data for the connection
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-p2p_difference_digest (void *cls,
-                       struct GNUNET_MESH_Tunnel * tunnel,
-                       void **tunnel_ctx,
-                       const struct GNUNET_PeerIdentity *sender,
-                       const struct GNUNET_MessageHeader *message,
-                       const struct GNUNET_ATS_Information *atsi)
-{
-  /* FIXME */
-  return GNUNET_OK;
-}
-
-
-/**
- * Functions with this signature are called whenever a message is
- * received.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end
- * @param tunnel_ctx place to store local state associated with the tunnel
- * @param sender who sent the message
- * @param message the actual message
- * @param atsi performance data for the connection
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-p2p_elements_and_requests (void *cls,
-                           struct GNUNET_MESH_Tunnel * tunnel,
-                           void **tunnel_ctx,
-                           const struct GNUNET_PeerIdentity *sender,
-                           const struct GNUNET_MessageHeader *message,
-                           const struct GNUNET_ATS_Information *atsi)
-{
-  /* FIXME */
-  return GNUNET_OK;
-}
-
-
-/**
  * Start processing consensus requests.
  *
  * @param cls closure
@@ -801,33 +1405,38 @@
 static void
 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct 
GNUNET_CONFIGURATION_Handle *c)
 {
-  static const struct GNUNET_CORE_MessageHandler handlers[] = {
+  static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
     {NULL, 0, 0}
   };
-  static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
-    {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0},
-    {p2p_difference_digest, 
GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0},
-    {p2p_elements_and_requests, 
GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0},
-    {NULL, 0, 0}
+  static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
+    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
+    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
+    {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
+        sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
+    {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
+        sizeof (struct GNUNET_CONSENSUS_AckMessage)},
+    {NULL, NULL, 0, 0}
   };
-  static const GNUNET_MESH_ApplicationType app_types[] = { 
-    GNUNET_APPLICATION_TYPE_CONSENSUS,
-    GNUNET_APPLICATION_TYPE_END
-  };
 
-  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
-
   cfg = c;
   srv = server;
 
+  GNUNET_SERVER_add_handlers (server, server_handlers);
+
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, 
NULL);
 
-  mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers, 
app_types);
-  GNUNET_assert (NULL != mesh);
 
+  listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
+                                   listen_cb, NULL,
+                                   GNUNET_STREAM_OPTION_END);
+
+
   /* we have to wait for the core_startup callback before proceeding with the 
consensus service startup */
-  core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, 
GNUNET_NO, NULL, GNUNET_NO, handlers);
+  core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, 
GNUNET_NO, NULL, GNUNET_NO, core_handlers);
   GNUNET_assert (NULL != core);
+
+  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n", 
STRATA_PER_MESSAGE);
 }
 
 
@@ -843,6 +1452,7 @@
 {
   int ret;
   ret = GNUNET_SERVICE_run (argc, argv, "consensus", 
GNUNET_SERVICE_OPTION_NONE, &run, NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
   return (GNUNET_OK == ret) ? 0 : 1;
 }
 

Modified: gnunet/src/consensus/ibf.c
===================================================================
--- gnunet/src/consensus/ibf.c  2013-01-16 17:21:37 UTC (rev 25805)
+++ gnunet/src/consensus/ibf.c  2013-01-17 00:53:11 UTC (rev 25806)
@@ -25,53 +25,11 @@
  * @author Florian Dold
  */
 
+
 #include "ibf.h"
 
 
 /**
- * Opaque handle to an invertible bloom filter (IBF).
- *
- * An IBF is a counting bloom filter that has the ability to restore
- * the hashes of its stored elements with high probability.
- */
-struct InvertibleBloomFilter
-{
-  /**
-   * How many cells does this IBF have?
-   */
-  unsigned int size;
-
-  /**
-   * In how many cells do we hash one element?
-   * Usually 4 or 3.
-   */
-  unsigned int hash_num;
-
-  /**
-   * Salt for mingling hashes
-   */
-  uint32_t salt;
-
-  /**
-   * How many times has a bucket been hit?
-   * Can be negative, as a result of IBF subtraction.
-   */
-  int8_t *count;
-
-  /**
-   * xor sums of the elements' hash codes, used to identify the elements.
-   */
-  struct GNUNET_HashCode *id_sum;
-
-  /**
-   * xor sums of the "hash of the hash".
-   */
-  struct GNUNET_HashCode *hash_sum;
-
-};
-
-
-/**
  * Create an invertible bloom filter.
  *
  * @param size number of IBF buckets
@@ -152,6 +110,8 @@
       used_buckets[i] = bucket;
       
       ibf->count[bucket] += side;
+
+      GNUNET_log_from(GNUNET_ERROR_TYPE_INFO, "ibf", "inserting in bucket %d 
\n", bucket);
       
       GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket],
                              &ibf->id_sum[bucket]);
@@ -214,8 +174,6 @@
   int i;
 
   GNUNET_assert (NULL != ibf);
-  GNUNET_assert (NULL != ret_id);
-  GNUNET_assert (NULL != ret_side);
 
   for (i = 0; i < ibf->size; i++)
   {
@@ -227,8 +185,10 @@
     if (0 != memcmp (&hash, &ibf->hash_sum[i], sizeof (struct 
GNUNET_HashCode)))
       continue;
 
-    *ret_side = ibf->count[i];
-    *ret_id = ibf->id_sum[i];
+    if (NULL != ret_side)
+      *ret_side = ibf->count[i];
+    if (NULL != ret_id)
+      *ret_id = ibf->id_sum[i];
 
     /* insert on the opposite side, effectively removing the element */
     ibf_insert_on_side (ibf, &ibf->id_sum[i], -ibf->count[i]);
@@ -269,3 +229,36 @@
   }
 }
 
+/**
+ * Create a copy of an IBF, the copy has to be destroyed properly.
+ *
+ * @param ibf the IBF to copy
+ */
+struct InvertibleBloomFilter *
+ibf_dup (struct InvertibleBloomFilter *ibf)
+{
+  struct InvertibleBloomFilter *copy;
+  copy = GNUNET_malloc (sizeof *copy);
+  copy->hash_num = ibf->hash_num;
+  copy->salt = ibf->salt;
+  copy->size = ibf->size;
+  copy->hash_sum = GNUNET_memdup (ibf->hash_sum, ibf->size * sizeof (struct 
GNUNET_HashCode));
+  copy->id_sum = GNUNET_memdup (ibf->id_sum, ibf->size * sizeof (struct 
GNUNET_HashCode));
+  copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof (uint8_t));
+  return copy;
+}
+
+/**
+ * Destroy all resources associated with the invertible bloom filter.
+ * No more ibf_*-functions may be called on ibf after calling destroy.
+ *
+ * @param ibf the intertible bloom filter to destroy
+ */
+void
+ibf_destroy (struct InvertibleBloomFilter *ibf)
+{
+  GNUNET_free (ibf->hash_sum);
+  GNUNET_free (ibf->id_sum);
+  GNUNET_free (ibf->count);
+  GNUNET_free (ibf);
+}

Modified: gnunet/src/consensus/ibf.h
===================================================================
--- gnunet/src/consensus/ibf.h  2013-01-16 17:21:37 UTC (rev 25805)
+++ gnunet/src/consensus/ibf.h  2013-01-17 00:53:11 UTC (rev 25806)
@@ -39,16 +39,54 @@
 #endif
 #endif
 
+/**
+ * Size of one ibf bucket in bytes
+ */
+#define IBF_BUCKET_SIZE (64+64+1)
 
+
 /**
- * Opaque handle to an invertible bloom filter (IBF).
+ * Invertible bloom filter (IBF).
  *
  * An IBF is a counting bloom filter that has the ability to restore
  * the hashes of its stored elements with high probability.
  */
-struct InvertibleBloomFilter;
+struct InvertibleBloomFilter
+{
+  /**
+   * How many cells does this IBF have?
+   */
+  unsigned int size;
 
+  /**
+   * In how many cells do we hash one element?
+   * Usually 4 or 3.
+   */
+  unsigned int hash_num;
 
+  /**
+   * Salt for mingling hashes
+   */
+  uint32_t salt;
+
+  /**
+   * xor sums of the elements' hash codes, used to identify the elements.
+   */
+  struct GNUNET_HashCode *id_sum;
+
+  /**
+   * xor sums of the "hash of the hash".
+   */
+  struct GNUNET_HashCode *hash_sum;
+
+  /**
+   * How many times has a bucket been hit?
+   * Can be negative, as a result of IBF subtraction.
+   */
+  int8_t *count;
+};
+
+
 /**
  * Create an invertible bloom filter.
  *
@@ -106,15 +144,6 @@
 struct InvertibleBloomFilter *
 ibf_dup (struct InvertibleBloomFilter *ibf);
 
-
-/*
-ibf_hton ();
-
-ibf_ntoh ();
-
-ibf_get_nbo_size ();
-*/
-
 /**
  * Destroy all resources associated with the invertible bloom filter.
  * No more ibf_*-functions may be called on ibf after calling destroy.

Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf    2013-01-16 17:21:37 UTC (rev 
25805)
+++ gnunet/src/consensus/test_consensus.conf    2013-01-17 00:53:11 UTC (rev 
25806)
@@ -1,6 +1,6 @@
 [consensus]
 AUTOSTART = YES
-PORT = 2103
+PORT = 2110
 HOSTNAME = localhost
 HOME = $SERVICEHOME
 BINARY = gnunet-service-consensus

Modified: gnunet/src/consensus/test_consensus_api.c
===================================================================
--- gnunet/src/consensus/test_consensus_api.c   2013-01-16 17:21:37 UTC (rev 
25805)
+++ gnunet/src/consensus/test_consensus_api.c   2013-01-17 00:53:11 UTC (rev 
25806)
@@ -17,6 +17,7 @@
      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
      Boston, MA 02111-1307, USA.
 */
+
 /**
  * @file consensus/test_consensus_api.c
  * @brief testcase for consensus_api.c
@@ -29,18 +30,20 @@
 
 static struct GNUNET_CONSENSUS_Handle *consensus;
 
-static int insert;
-
 static struct GNUNET_HashCode session_id;
 
 
-static void conclude_done (void *cls, 
-                           unsigned int num_peers_in_consensus,
-                           const struct GNUNET_PeerIdentity 
*peers_in_consensus)
+static int
+conclude_done (void *cls, const struct GNUNET_CONSENSUS_Group *group)
 {
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n");
+  if (NULL == group)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude over\n");
+    GNUNET_SCHEDULER_shutdown ();
+    return GNUNET_NO;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "concluded\n");
+  return GNUNET_YES;
 }
 
 static int
@@ -54,11 +57,34 @@
 static void
 insert_done (void *cls, int success)
 {
+  /* make sure cb is only called once */
+  static int called = GNUNET_NO;
+  GNUNET_assert (GNUNET_NO == called);
+  called = GNUNET_YES;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "insert done\n");
+  GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_SECONDS, 0, 
&conclude_done, NULL);
 }
 
 
+/**
+ * Signature of the main function of a task.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
 static void
+on_shutdown (void *cls,
+          const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  if (NULL != consensus)
+  {
+    GNUNET_CONSENSUS_destroy (consensus);
+    consensus = NULL;
+  }
+}
+
+
+static void
 run (void *cls, 
      const struct GNUNET_CONFIGURATION_Handle *cfg,
      struct GNUNET_TESTING_Peer *peer)
@@ -69,18 +95,19 @@
   struct GNUNET_CONSENSUS_Element el2 = {"bar", 4, 0};
 
   GNUNET_log_setup ("test_consensus_api",
-                    "DEBUG",
+                    "INFO",
                     NULL);
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n");
 
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &on_shutdown, 
NULL);
+
   GNUNET_CRYPTO_hash (str, strlen (str), &session_id);
   consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, 
on_new_element, &consensus);
   GNUNET_assert (consensus != NULL);
-  /*
-  GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1);
-  GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2);
-  */
+
+  GNUNET_CONSENSUS_insert (consensus, &el1, NULL, &consensus);
+  GNUNET_CONSENSUS_insert (consensus, &el2, &insert_done, &consensus);
 }
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]