[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r25806 - gnunet/src/consensus
From: |
gnunet |
Subject: |
[GNUnet-SVN] r25806 - gnunet/src/consensus |
Date: |
Thu, 17 Jan 2013 01:53:11 +0100 |
Author: dold
Date: 2013-01-17 01:53:11 +0100 (Thu, 17 Jan 2013)
New Revision: 25806
Added:
gnunet/src/consensus/consensus_protocol.h
Modified:
gnunet/src/consensus/Makefile.am
gnunet/src/consensus/consensus.h
gnunet/src/consensus/consensus_api.c
gnunet/src/consensus/gnunet-consensus-start-peers.c
gnunet/src/consensus/gnunet-consensus.c
gnunet/src/consensus/gnunet-service-consensus.c
gnunet/src/consensus/ibf.c
gnunet/src/consensus/ibf.h
gnunet/src/consensus/test_consensus.conf
gnunet/src/consensus/test_consensus_api.c
Log:
- gnunet-consensus now profiling tool
- work on service implementation, not working yet
Modified: gnunet/src/consensus/Makefile.am
===================================================================
--- gnunet/src/consensus/Makefile.am 2013-01-16 17:21:37 UTC (rev 25805)
+++ gnunet/src/consensus/Makefile.am 2013-01-17 00:53:11 UTC (rev 25806)
@@ -31,6 +31,7 @@
gnunet_consensus_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/consensus/libgnunetconsensus.la \
+ $(top_builddir)/src/testbed/libgnunettestbed.la \
$(GN_LIBINTL)
gnunet_consensus_DEPENDENCIES = \
libgnunetconsensus.la
@@ -53,10 +54,12 @@
$(GN_LIBINTL)
gnunet_service_consensus_SOURCES = \
- gnunet-service-consensus.c
+ gnunet-service-consensus.c \
+ ibf.c
gnunet_service_consensus_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/core/libgnunetcore.la \
+ $(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/mesh/libgnunetmesh.la \
$(GN_LIBINTL)
Modified: gnunet/src/consensus/consensus.h
===================================================================
--- gnunet/src/consensus/consensus.h 2013-01-16 17:21:37 UTC (rev 25805)
+++ gnunet/src/consensus/consensus.h 2013-01-17 00:53:11 UTC (rev 25806)
@@ -71,6 +71,10 @@
*/
struct GNUNET_MessageHeader header;
+ uint32_t group_id;
+
+ uint32_t num_elements;
+
uint16_t num_peers;
/** PeerIdentity[num_peers] */
Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c 2013-01-16 17:21:37 UTC (rev
25805)
+++ gnunet/src/consensus/consensus_api.c 2013-01-17 00:53:11 UTC (rev
25806)
@@ -54,11 +54,6 @@
struct GNUNET_MessageHeader *msg;
/**
- * Size of the message in msg.
- */
- size_t size;
-
- /**
* Will be called after transmit, if not NULL
*/
GNUNET_CONSENSUS_InsertDoneCallback idc;
@@ -154,7 +149,7 @@
* @param consensus consensus handle
*/
static void
-schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus);
+send_next (struct GNUNET_CONSENSUS_Handle *consensus);
/**
@@ -168,16 +163,17 @@
* @param buf where the callee should write the message
* @return number of bytes written to buf
*/
-static size_t transmit_queued (void *cls, size_t size,
- void *buf)
+static size_t
+transmit_queued (void *cls, size_t size,
+ void *buf)
{
struct GNUNET_CONSENSUS_Handle *consensus;
struct QueuedMessage *qmsg;
- size_t ret_size;
+ size_t msg_size;
- printf("transmitting queued\n");
+ consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
+ consensus->th = NULL;
- consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
qmsg = consensus->messages_head;
GNUNET_CONTAINER_DLL_remove (consensus->messages_head,
consensus->messages_tail, qmsg);
GNUNET_assert (qmsg);
@@ -188,10 +184,14 @@
{
qmsg->idc (qmsg->idc_cls, GNUNET_YES);
}
+ return 0;
}
- memcpy (buf, qmsg->msg, qmsg->size);
- ret_size = qmsg->size;
+ msg_size = ntohs (qmsg->msg->size);
+
+ GNUNET_assert (size >= msg_size);
+
+ memcpy (buf, qmsg->msg, msg_size);
if (NULL != qmsg->idc)
{
qmsg->idc (qmsg->idc_cls, GNUNET_YES);
@@ -199,9 +199,9 @@
GNUNET_free (qmsg->msg);
GNUNET_free (qmsg);
- schedule_transmit (consensus);
+ send_next (consensus);
- return ret_size;
+ return msg_size;
}
@@ -211,7 +211,7 @@
* @param consensus consensus handle
*/
static void
-schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus)
+send_next (struct GNUNET_CONSENSUS_Handle *consensus)
{
if (NULL != consensus->th)
return;
@@ -219,9 +219,10 @@
if (NULL != consensus->messages_head)
{
LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
consensus->messages_head->size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_queued,
consensus);
+ consensus->th =
+ GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs
(consensus->messages_head->msg->size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_NO, &transmit_queued,
consensus);
}
}
@@ -270,8 +271,7 @@
struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
{
GNUNET_assert (NULL != consensus->conclude_cb);
- consensus->conclude_cb (consensus->conclude_cls,
- 0, NULL);
+ consensus->conclude_cb (consensus->conclude_cls, NULL);
consensus->conclude_cb = NULL;
}
@@ -356,7 +356,7 @@
consensus->peers,
consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
- schedule_transmit (consensus);
+ send_next (consensus);
GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
GNUNET_TIME_UNIT_FOREVER_REL);
@@ -454,13 +454,12 @@
qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
- qmsg->size = element_msg_size;
qmsg->idc = idc;
qmsg->idc_cls = idc_cls;
GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head,
consensus->messages_tail, qmsg);
- schedule_transmit (consensus);
+ send_next (consensus);
}
@@ -500,11 +499,10 @@
qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
- qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head,
consensus->messages_tail, qmsg);
- schedule_transmit (consensus);
+ send_next (consensus);
}
Added: gnunet/src/consensus/consensus_protocol.h
===================================================================
--- gnunet/src/consensus/consensus_protocol.h (rev 0)
+++ gnunet/src/consensus/consensus_protocol.h 2013-01-17 00:53:11 UTC (rev
25806)
@@ -0,0 +1,71 @@
+/*
+ This file is part of GNUnet
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+
+/**
+ * @file consensus/consensus_protocol.h
+ * @brief p2p message definitions for consensus
+ * @author Florian Dold
+ */
+
+#ifndef GNUNET_CONSENSUS_PROTOCOL_H
+#define GNUNET_CONSENSUS_PROTOCOL_H
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_protocols.h"
+
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+struct StrataMessage
+{
+ struct GNUNET_MessageHeader header;
+ /**
+ * Number of strata in this estimator.
+ */
+ uint16_t num_strata;
+ /* struct GNUNET_HashCode hash_buckets[ibf_size*num_strata] */
+ /* struct GNUNET_HashCode id_buckets[ibf_size*num_strata] */
+ /* uint8_t count_buckets[ibf_size*num_strata] */
+};
+
+struct DifferenceDigest
+{
+
+ struct GNUNET_MessageHeader header;
+};
+
+struct Element
+{
+ struct GNUNET_MessageHeader header;
+};
+
+struct ConsensusHello
+{
+ struct GNUNET_MessageHeader header;
+ struct GNUNET_HashCode global_id;
+ uint8_t round;
+};
+
+
+GNUNET_NETWORK_STRUCT_END
+
+#endif
Modified: gnunet/src/consensus/gnunet-consensus-start-peers.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-01-16 17:21:37 UTC
(rev 25805)
+++ gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-01-17 00:53:11 UTC
(rev 25806)
@@ -147,9 +147,6 @@
NULL,
test_master,
NULL);
-
-
- printf("hello there!\n");
}
Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c 2013-01-16 17:21:37 UTC (rev
25805)
+++ gnunet/src/consensus/gnunet-consensus.c 2013-01-17 00:53:11 UTC (rev
25806)
@@ -20,224 +20,308 @@
/**
* @file consensus/gnunet-consensus.c
- * @brief
+ * @brief profiling tool for gnunet-consensus
* @author Florian Dold
*/
#include "platform.h"
+#include "gnunet_common.h"
#include "gnunet_util_lib.h"
#include "gnunet_consensus_service.h"
+#include "gnunet_testbed_service.h"
+static unsigned int num_peers = 2;
+static unsigned int replication = 1;
-/**
- * Handle to the consensus service
- */
-static struct GNUNET_CONSENSUS_Handle *consensus;
-/**
- * Session id
- */
-static char *session_id_str;
+static unsigned int num_values = 5;
-/**
- * File handle to STDIN
- */
-static struct GNUNET_DISK_FileHandle *stdin_fh;
+static struct GNUNET_TIME_Relative conclude_timeout;
+static struct GNUNET_CONSENSUS_Handle **consensus_handles;
+
+static unsigned int num_connected_handles;
+
+static struct GNUNET_TESTBED_Peer **peers;
+
+static struct GNUNET_PeerIdentity *peer_ids;
+
+static unsigned int num_retrieved_peer_ids;
+
+static struct GNUNET_HashCode session_id;
+
+
/**
- * Task for reading from stdin
+ * Signature of the event handler function called by the
+ * respective event controller.
+ *
+ * @param cls closure
+ * @param event information about the event
*/
-static GNUNET_SCHEDULER_TaskIdentifier stdin_tid = GNUNET_SCHEDULER_NO_TASK;
-
-
static void
-stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+controller_cb(void *cls,
+ const struct GNUNET_TESTBED_EventInformation *event)
+{
+ GNUNET_assert (0);
+}
/**
* Called when a conclusion was successful.
*
* @param cls
- * @param num_peers_in_consensus
- * @param peers_in_consensus
+ * @param group
+ * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if
not
*/
+static int
+conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group)
+{
+ return GNUNET_NO;
+}
+
+
+
static void
-conclude_cb (void *cls,
- unsigned int consensus_group_count,
- const struct GNUNET_CONSENSUS_Group *groups)
+generate_indices (int *indices)
{
- printf("reached conclusion\n");
- GNUNET_SCHEDULER_shutdown ();
+ int j;
+ j = 0;
+ while (j < replication)
+ {
+ int n;
+ int k;
+ int repeat;
+ n = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, num_peers);
+ repeat = GNUNET_NO;
+ for (k = 0; k < j; k++)
+ if (indices[k] == n)
+ {
+ repeat = GNUNET_YES;
+ break;
+ }
+ if (GNUNET_NO == repeat)
+ indices[j++] = n;
+ }
}
static void
-insert_done_cb (void *cls,
- int success)
+do_consensus ()
{
- struct GNUNET_CONSENSUS_Element *element = cls;
+ int unique_indices[replication];
+ int i;
- GNUNET_free (element);
- if (GNUNET_YES != success)
+ for (i = 0; i < num_values; i++)
{
- printf ("insert failed\n");
- GNUNET_SCHEDULER_shutdown ();
- return;
+ int j;
+ struct GNUNET_HashCode *val;
+ struct GNUNET_CONSENSUS_Element *element;
+ generate_indices(unique_indices);
+
+ val = GNUNET_malloc (sizeof *val);
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, val);
+
+ element = GNUNET_malloc (sizeof *element);
+ element->data = val;
+ element->size = sizeof *val;
+
+ for (j = 0; j < replication; j++)
+ {
+ int cid;
+ cid = unique_indices[j];
+ GNUNET_CONSENSUS_insert (consensus_handles[cid], element, NULL, NULL);
+ }
}
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == stdin_tid);
- stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
stdin_fh,
- &stdin_cb, NULL);
+
+ for (i = 0; i < num_peers; i++)
+ GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, 0,
conclude_cb, consensus_handles[i]);
}
/**
- * Called whenever we can read stdin non-blocking
+ * Callback to be called when a service connect operation is completed
*
- * @param cls unused
- * @param tc scheduler context
+ * @param cls the callback closure from functions generating an operation
+ * @param op the operation that has been finished
+ * @param ca_result the service handle returned from
GNUNET_TESTBED_ConnectAdapter()
+ * @param emsg error message in case the operation has failed; will be NULL if
+ * operation has executed successfully.
*/
static void
-stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+connect_complete (void *cls,
+ struct GNUNET_TESTBED_Operation *op,
+ void *ca_result,
+ const char *emsg)
{
- char buf[1024];
- char *ret;
- struct GNUNET_CONSENSUS_Element *element;
+ struct GNUNET_CONSENSUS_Handle **chp;
- stdin_tid = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return; /* we're done here */
- ret = fgets (buf, 1024, stdin);
- if (NULL == ret)
+ if (NULL != emsg)
{
- if (feof (stdin))
- {
- printf ("concluding ...\n");
- GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 0,
conclude_cb, NULL);
- }
- return;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "testbed connect emsg: %s\n", emsg);
+ GNUNET_assert (0);
}
- printf("read: %s", buf);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "connect complete\n");
- element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) +
strlen(buf) + 1);
- element->type = 0;
- element->size = strlen(buf) + 1;
- element->data = &element[1];
- strcpy ((char *) &element[1], buf);
- GNUNET_CONSENSUS_insert (consensus, element, &insert_done_cb, element);
+ chp = (struct GNUNET_CONSENSUS_Handle **) cls;
+ *chp = (struct GNUNET_CONSENSUS_Handle *) ca_result;
+ num_connected_handles++;
+
+ if (num_connected_handles == num_peers)
+ {
+ do_consensus ();
+ }
}
+static int
+new_element_cb (void *cls,
+ struct GNUNET_CONSENSUS_Element *element)
+{
+ return GNUNET_YES;
+}
+
+
/**
- * Called when a new element was received from another peer, or an error
occured.
+ * Adapter function called to establish a connection to
+ * a service.
*
- * May deliver duplicate values.
- *
- * Elements given to a consensus operation by the local peer are NOT given
- * to this callback.
- *
* @param cls closure
- * @param element new element, NULL on error
- * @return GNUNET_OK if the valid is well-formed and should be added to the
consensus,
- * GNUNET_SYSERR if the element should be ignored and not be propagated
+ * @param cfg configuration of the peer to connect to; will be available until
+ * GNUNET_TESTBED_operation_done() is called on the operation returned
+ * from GNUNET_TESTBED_service_connect()
+ * @return service handle to return in 'op_result', NULL on error
*/
-static int
-cb (void *cls,
- struct GNUNET_CONSENSUS_Element *element)
+static void *
+connect_adapter (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
- if (NULL == element)
- {
- printf("error receiving from consensus\n");
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_NO;
- }
- printf("got element\n");
- return GNUNET_YES;
+ struct GNUNET_CONSENSUS_Handle *consensus;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "connect adapter, %d peers\n",
num_peers);
+ consensus = GNUNET_CONSENSUS_create (cfg, num_peers, peer_ids, &session_id,
new_element_cb, NULL);
+ GNUNET_assert (NULL != consensus);
+ return consensus;
}
/**
- * Function run on shutdown to clean up.
+ * Adapter function called to destroy a connection to
+ * a service.
*
- * @param cls the statistics handle
- * @param tc scheduler context
+ * @param cls closure
+ * @param op_result service handle returned from the connect adapter
*/
static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+disconnect_adapter(void *cls, void *op_result)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n");
- if (NULL != consensus)
- {
- GNUNET_CONSENSUS_destroy (consensus);
- consensus = NULL;
- }
+ /* FIXME: what to do here? */
}
+/**
+ * Callback to be called when the requested peer information is available
+ *
+ * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
+ * @param op the operation this callback corresponds to
+ * @param pinfo the result; will be NULL if the operation has failed
+ * @param emsg error message if the operation has failed; will be NULL if the
+ * operation is successfull
+ */
static void
-run (void *cls, char *const *args, const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
+peer_info_cb (void *cb_cls,
+ struct GNUNET_TESTBED_Operation *op,
+ const struct GNUNET_TESTBED_PeerInformation *pinfo,
+ const char *emsg)
{
- struct GNUNET_HashCode sid;
- struct GNUNET_PeerIdentity *pids;
- int count;
+ struct GNUNET_PeerIdentity *p;
int i;
- if (NULL == session_id_str)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given (missing
-s/--session-id)\n");
- return;
- }
+ GNUNET_assert (NULL == emsg);
- GNUNET_CRYPTO_hash (session_id_str, strlen (session_id_str), &sid);
+ p = (struct GNUNET_PeerIdentity *) cb_cls;
- for (count = 0; NULL != args[count]; count++);
-
- if (0 != count)
- {
- pids = GNUNET_malloc (count * sizeof (struct GNUNET_PeerIdentity));
+ if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY)
+ {
+ *p = *pinfo->result.id;
+ num_retrieved_peer_ids++;
+ if (num_retrieved_peer_ids == num_peers)
+ for (i = 0; i < num_peers; i++)
+ GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus",
connect_complete, &consensus_handles[i],
+ connect_adapter, disconnect_adapter,
NULL);
}
else
{
- pids = NULL;
+ GNUNET_assert (0);
}
+}
- for (i = 0; i < count; i++)
- {
- int ret;
- ret = GNUNET_CRYPTO_hash_from_string (args[i], &pids[i].hashPubKey);
- if (GNUNET_OK != ret)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "peer identity '%s' is
malformed\n", args[i]);
- return;
- }
- }
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
- &shutdown_task, NULL);
-
- consensus =
- GNUNET_CONSENSUS_create (cfg,
- count, pids,
- &sid,
- &cb, NULL);
+static void
+test_master (void *cls,
+ unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **started_peers)
+{
+ int i;
- stdin_fh = GNUNET_DISK_get_handle_from_native (stdin);
- stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
stdin_fh,
- &stdin_cb, NULL);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
+
+ peers = started_peers;
+
+ peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
+
+ consensus_handles = GNUNET_malloc (num_peers * sizeof (struct
ConsensusHandle *));
+
+ for (i = 0; i < num_peers; i++)
+ GNUNET_TESTBED_peer_get_information (peers[i],
+ GNUNET_TESTBED_PIT_IDENTITY,
+ peer_info_cb,
+ &peer_ids[i]);
}
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ static char *session_str = "gnunet-consensus/test";
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "running gnunet-consensus\n");
+
+ GNUNET_CRYPTO_hash (session_str, strlen(session_str), &session_id);
+
+ (void) GNUNET_TESTBED_test_run ("gnunet-consensus",
+ cfgfile,
+ num_peers,
+ 0,
+ controller_cb,
+ NULL,
+ test_master,
+ NULL);
+}
+
+
int
main (int argc, char **argv)
{
static const struct GNUNET_GETOPT_CommandLineOption options[] = {
- { 's', "session-id", "ID",
- gettext_noop ("session identifier"),
- GNUNET_YES, &GNUNET_GETOPT_set_string, &session_id_str },
- GNUNET_GETOPT_OPTION_END
- };
- GNUNET_PROGRAM_run (argc, argv, "gnunet-consensus",
+ { 'n', "num-peers", NULL,
+ gettext_noop ("number of peers in consensus"),
+ GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers },
+ { 'k', "value-replication", NULL,
+ gettext_noop ("how many peers receive one value?"),
+ GNUNET_YES, &GNUNET_GETOPT_set_uint, &replication },
+ { 'x', "num-values", NULL,
+ gettext_noop ("number of values"),
+ GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_values },
+ { 't', "timeout", NULL,
+ gettext_noop ("consensus timeout"),
+ GNUNET_YES, &GNUNET_GETOPT_set_relative_time, &conclude_timeout },
+ GNUNET_GETOPT_OPTION_END
+ };
+ conclude_timeout = GNUNET_TIME_UNIT_SECONDS;
+ GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus",
"help",
- options, &run, NULL);
+ options, &run, NULL, GNUNET_YES);
return 0;
}
+
Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c 2013-01-16 17:21:37 UTC
(rev 25805)
+++ gnunet/src/consensus/gnunet-service-consensus.c 2013-01-17 00:53:11 UTC
(rev 25806)
@@ -32,16 +32,46 @@
#include "gnunet_util_lib.h"
#include "gnunet_consensus_service.h"
#include "gnunet_core_service.h"
-#include "gnunet_mesh_service.h"
+#include "gnunet_stream_lib.h"
+#include "consensus_protocol.h"
+#include "ibf.h"
#include "consensus.h"
+/**
+ * Number of IBFs in a strata estimator.
+ */
+#define STRATA_COUNT 32
+/**
+ * Number of buckets per IBF.
+ */
+#define STRATA_IBF_BUCKETS 80
+/**
+ * hash num parameter of the IBF
+ */
+#define STRATA_HASH_NUM 3
+/**
+ * Number of strata that can be transmitted in one message.
+ */
+#define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS))
+
+
+
+/* forward declarations */
+
struct ConsensusSession;
+struct IncomingSocket;
static void
send_next (struct ConsensusSession *session);
+static void
+write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
+static int
+get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct
ConsensusSession *session);
+
+
/**
* An element that is waiting to be transmitted to a client.
*/
@@ -63,25 +93,78 @@
struct GNUNET_CONSENSUS_Element *element;
};
-
-/*
- * A peer that is also in a consensus session.
- * Note that 'this' peer is not in the list.
- */
-struct ConsensusPeer
+struct ConsensusPeerInformation
{
- struct GNUNET_PeerIdentity *peer_id;
+ struct GNUNET_STREAM_Socket *socket;
/**
- * Incoming tunnel from the peer.
+ * Is socket's connection established, i.e. can we write to it?
+ * Only relevent on outgoing cpi.
*/
- struct GNUNET_MESH_Tunnel *incoming_tunnel;
+ int is_connected;
- struct InvertibleBloomFilter *last_ibf;
+ /**
+ * Type of the peer in the all-to-all rounds,
+ * GNUNET_YES if we initiate reconciliation.
+ */
+ int is_outgoing;
+ /**
+ * Did we receive/send a consensus hello?
+ */
+ int hello;
+
+ /**
+ * Handle for currently active read
+ */
+ struct GNUNET_STREAM_ReadHandle *rh;
+
+ /**
+ * Handle for currently active read
+ */
+ struct GNUNET_STREAM_WriteHandle *wh;
+
+ /**
+ * How many of the strate in the ibf were
+ * sent or received in this round?
+ */
+ int strata_counter;
+
+ struct InvertibleBloomFilter *my_ibf;
+
+ int my_ibf_bucket_counter;
+
+ struct InvertibleBloomFilter *peer_ibf;
+
+ int peer_ibf_bucket_counter;
+
+ /**
+ * Strata estimator of the peer, NULL if our peer
+ * initiated the reconciliation.
+ */
+ struct InvertibleBloomFilter **strata;
+
+ struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
+ struct ConsensusSession *session;
};
+struct QueuedMessage
+{
+ struct GNUNET_MessageHeader *msg;
+ /**
+ * Queued messages are stored in a doubly linked list.
+ */
+ struct QueuedMessage *next;
+
+ /**
+ * Queued messages are stored in a doubly linked list.
+ */
+ struct QueuedMessage *prev;
+};
+
+
/**
* A consensus session consists of one local client and the remote authorities.
*/
@@ -98,15 +181,17 @@
struct ConsensusSession *prev;
/**
- * Local consensus identification, chosen by clients.
+ * Join message. Used to initialize the session later,
+ * if the identity of the local peer is not yet known.
+ * NULL if the session has been fully initialized.
*/
- struct GNUNET_HashCode *local_id;
-
+ struct GNUNET_CONSENSUS_JoinMessage *join_msg;
+
/**
* Global consensus identification, computed
* from the local id and participating authorities.
*/
- struct GNUNET_HashCode *global_id;
+ struct GNUNET_HashCode global_id;
/**
* Local client in this consensus session.
@@ -140,6 +225,10 @@
*/
struct PendingElement *approval_pending_tail;
+ struct QueuedMessage *client_messages_head;
+
+ struct QueuedMessage *client_messages_tail;
+
/**
* Currently active transmit handle for sending to the client
*/
@@ -152,11 +241,6 @@
int conclude_requested;
/**
- * Client has been informed about the conclusion.
- */
- int conclude_sent;
-
- /**
* Minimum number of peers to form a consensus group
*/
int conclude_group_min;
@@ -178,30 +262,74 @@
*/
unsigned int num_peers;
- /**
- * Other peers in the consensus, array of ConsensusPeer
- */
- struct ConsensusPeer *peers;
+ struct ConsensusPeerInformation *info;
/**
- * Tunnel for broadcasting to all other authorities
+ * Sorted array of peer identities in this consensus session,
+ * includes the local peer.
*/
- struct GNUNET_MESH_Tunnel *broadcast_tunnel;
+ struct GNUNET_PeerIdentity *peers;
/**
- * Time limit for one round of pairwise exchange.
- * FIXME: should not actually be a constant
+ * Index of the local peer in the peers array
*/
- struct GNUNET_TIME_Relative round_time;
+ int local_peer_idx;
/**
* Task identifier for the round timeout task
*/
GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
+
+ struct InvertibleBloomFilter **strata;
};
/**
+ * Sockets from other peers who want to communicate with us.
+ * It may not be known yet which consensus session they belong to.
+ */
+struct IncomingSocket
+{
+ /**
+ * Incoming sockets are kept in a double linked list.
+ */
+ struct IncomingSocket *next;
+
+ /**
+ * Incoming sockets are kept in a double linked list.
+ */
+ struct IncomingSocket *prev;
+
+ /**
+ * The actual socket.
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Handle for currently active read
+ */
+ struct GNUNET_STREAM_ReadHandle *rh;
+
+ /**
+ * Peer that connected to us with the socket.
+ */
+ struct GNUNET_PeerIdentity *peer;
+
+ /**
+ * Message stream tokenizer for this socket.
+ */
+ struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
+ /**
+ * Peer-in-session this socket belongs to, once known, otherwise NULL.
+ */
+ struct ConsensusPeerInformation *cpi;
+};
+
+static struct IncomingSocket *incoming_sockets_head;
+static struct IncomingSocket *incoming_sockets_tail;
+
+/**
* Linked list of sesstions this peer participates in.
*/
static struct ConsensusSession *sessions_head;
@@ -222,32 +350,349 @@
static struct GNUNET_SERVER_Handle *srv;
/**
- * Peer that runs this service
+ * Peer that runs this service.
*/
static struct GNUNET_PeerIdentity *my_peer;
/**
- * Handle to the mesh service.
+ * Handle to the core service. Only used during service startup, will be NULL
after that.
*/
-static struct GNUNET_MESH_Handle *mesh;
+static struct GNUNET_CORE_Handle *core;
/**
- * Handle to the core service. Only used during service startup, will be NULL
after that.
+ * Listener for sockets from peers that want to reconcile with us.
*/
-static struct GNUNET_CORE_Handle *core;
+static struct GNUNET_STREAM_ListenSocket *listener;
+
+static int
+estimate_difference (struct InvertibleBloomFilter** strata1,
+ struct InvertibleBloomFilter** strata2)
+{
+ int i;
+ int count;
+ count = 0;
+ for (i = STRATA_COUNT - 1; i >= 0; i--)
+ {
+ struct InvertibleBloomFilter *diff;
+ int ibf_count;
+ int more;
+ ibf_count = 0;
+ diff = ibf_dup (strata1[i]);
+ ibf_subtract (diff, strata2[i]);
+ for (;;)
+ {
+ more = ibf_decode (diff, NULL, NULL);
+ if (GNUNET_NO == more)
+ {
+ count += ibf_count;
+ break;
+ }
+ if (GNUNET_SYSERR == more)
+ {
+ return count * (1 << (i + 1));
+ }
+ ibf_count++;
+ }
+ ibf_destroy (diff);
+ }
+ return count;
+}
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on
timeout
+ * @return number of bytes of processed from 'data' (any data remaining should
be
+ * given to the next time the read processor is called).
+ */
+static size_t
+stream_data_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *data,
+ size_t size)
+{
+ struct IncomingSocket *incoming;
+ int ret;
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+
+ incoming = (struct IncomingSocket *) cls;
+
+ ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size,
GNUNET_NO, GNUNET_NO);
+ if (GNUNET_SYSERR == ret)
+ {
+ /* FIXME: handle this correctly */
+ GNUNET_assert (0);
+ }
+
+ /* read again */
+ incoming->rh = GNUNET_STREAM_read (incoming->socket,
GNUNET_TIME_UNIT_FOREVER_REL,
+ &stream_data_processor, incoming);
+
+ /* we always read all data */
+ return size;
+}
+
+static int
+handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct
StrataMessage *strata_msg)
+{
+ int i;
+ int num_strata;
+ struct GNUNET_HashCode *hash_src;
+ uint8_t *count_src;
+
+ GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
+
+ if (NULL == cpi->strata)
+ {
+ cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct
InvertibleBloomFilter *));
+ for (i = 0; i < STRATA_COUNT; i++)
+ cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
+ }
+
+ num_strata = ntohs (strata_msg->num_strata);
+
+ /* for correct message alignment, copy bucket types seperately */
+ hash_src = (struct GNUNET_HashCode *) &strata_msg[1];
+
+ for (i = 0; i < num_strata; i++)
+ {
+ memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src,
STRATA_IBF_BUCKETS * sizeof *hash_src);
+ hash_src += STRATA_IBF_BUCKETS;
+ }
+
+ for (i = 0; i < num_strata; i++)
+ {
+ memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src,
STRATA_IBF_BUCKETS * sizeof *hash_src);
+ hash_src += STRATA_IBF_BUCKETS;
+ }
+
+ count_src = (uint8_t *) hash_src;
+
+ for (i = 0; i < num_strata; i++)
+ {
+ uint8_t zero[STRATA_IBF_BUCKETS];
+ memset (zero, 0, STRATA_IBF_BUCKETS);
+ memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src,
STRATA_IBF_BUCKETS);
+ count_src += STRATA_IBF_BUCKETS;
+ }
+
+ GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) +
STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE));
+
+ cpi->strata_counter += num_strata;
+
+ if (STRATA_COUNT == cpi->strata_counter)
+ {
+ int diff;
+ diff = estimate_difference (cpi->session->strata, cpi->strata);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff);
+ }
+
+ return GNUNET_YES;
+}
+
+
+static int
+handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct
DifferenceDigest *strata)
+{
+ return GNUNET_YES;
+}
+
+
+static int
+handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element
*strata)
+{
+ return GNUNET_YES;
+}
+
+
+static int
+handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello
*hello)
+{
+ struct ConsensusSession *session;
+ session = sessions_head;
+ while (NULL != session)
+ {
+ if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
+ {
+ int idx;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
+ idx = get_peer_idx (inc->peer, session);
+ GNUNET_assert (-1 != idx);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
+ inc->cpi = &session->info[idx];
+ GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
+ inc->cpi->mst = inc->mst;
+ inc->cpi->hello = GNUNET_YES;
+ inc->cpi->socket = inc->socket;
+ return GNUNET_YES;
+ }
+ session = session->next;
+ }
+ GNUNET_assert (0);
+ return GNUNET_NO;
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete message is received by the tokenizer.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ *
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+mst_session_callback (void *cls, void *client, const struct
GNUNET_MessageHeader *message)
+{
+ struct ConsensusPeerInformation *cpi;
+ cpi = (struct ConsensusPeerInformation *) cls;
+ switch (ntohs( message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
+ return handle_p2p_strata (cpi, (struct StrataMessage *) message);
+ case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
+ return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
+ case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
+ return handle_p2p_element (cpi, (struct Element *) message);
+ default:
+ /* FIXME: handle correctly */
+ GNUNET_assert (0);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle tokenized messages from stream sockets.
+ * Delegate them if the socket belongs to a session,
+ * handle hello messages otherwise.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure, unused
+ * @param client incoming socket this message comes from
+ * @param message the actual message
+ *
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+mst_incoming_callback (void *cls, void *client, const struct
GNUNET_MessageHeader *message)
+{
+ struct IncomingSocket *inc;
+ inc = (struct IncomingSocket *) client;
+ switch (ntohs( message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
+ return handle_p2p_hello (inc, (struct ConsensusHello *) message);
+ default:
+ if (NULL != inc->cpi)
+ return mst_session_callback (inc->cpi, client, message);
+ /* FIXME: disconnect peer properly */
+ GNUNET_assert (0);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other
peers
+ * or upon binding error which happen when the app_port given in
+ * GNUNET_STREAM_listen() is already taken.
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream; NULL on binding error
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us; NULL on binding error
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+listen_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ struct IncomingSocket *incoming;
+
+ GNUNET_assert (NULL != socket);
+
+ incoming = GNUNET_malloc (sizeof *incoming);
+
+ incoming->socket = socket;
+ incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
+
+ incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
+ &stream_data_processor, incoming);
+
+
+ incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
+
+ GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head,
incoming_sockets_tail, incoming);
+
+ return GNUNET_OK;
+}
+
+
static void
+destroy_session (struct ConsensusSession *session)
+{
+ /* FIXME: more stuff to free! */
+ GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
+ GNUNET_SERVER_client_drop (session->client);
+ GNUNET_free (session);
+}
+
+
+/**
+ * Disconnect a client, and destroy all sessions associated with it.
+ *
+ * @param client the client to disconnect
+ */
+static void
disconnect_client (struct GNUNET_SERVER_Client *client)
{
+ struct ConsensusSession *session;
GNUNET_SERVER_client_disconnect (client);
- /* FIXME: free data structures that this client owns */
+
+ /* if the client owns a session, remove it */
+ session = sessions_head;
+ while (NULL != session)
+ {
+ if (client == session->client)
+ {
+ destroy_session (session);
+ break;
+ }
+ session = session->next;
+ }
}
+
+/**
+ * Compute a global, (hopefully) unique consensus session id,
+ * from the local id of the consensus session, and the identities of all
participants.
+ * Thus, if the local id of two consensus sessions coincide, but are not
comprised of
+ * exactly the same peers, the global id will be different.
+ *
+ * @param local_id local id of the consensus session
+ * @param peers array of all peers participating in the consensus session
+ * @param num_peers number of elements in the peers array
+ * @param dst where the result is stored, may not be NULL
+ */
static void
-compute_global_id (struct GNUNET_HashCode *dst,
- const struct GNUNET_HashCode *local_id,
- const struct GNUNET_PeerIdentity *peers,
- int num_peers)
+compute_global_id (const struct GNUNET_HashCode *local_id,
+ const struct GNUNET_PeerIdentity *peers, int num_peers,
+ struct GNUNET_HashCode *dst)
{
int i;
struct GNUNET_HashCode tmp;
@@ -263,45 +708,50 @@
}
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls consensus session
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
static size_t
-transmit_pending (void *cls, size_t size, void *buf)
+transmit_queued (void *cls, size_t size,
+ void *buf)
{
- struct GNUNET_CONSENSUS_Element *element;
- struct GNUNET_CONSENSUS_ElementMessage *msg;
struct ConsensusSession *session;
+ struct QueuedMessage *qmsg;
+ size_t msg_size;
session = (struct ConsensusSession *) cls;
- msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf;
- element = session->transmit_pending_head->element;
-
- GNUNET_assert (NULL != element);
-
session->th = NULL;
- msg->element_type = element->type;
- msg->header.type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
- msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
element->size);
- memcpy (&msg[1], element->data, element->size);
- session->transmit_pending_head = session->transmit_pending_head->next;
+ qmsg = session->client_messages_head;
+ GNUNET_CONTAINER_DLL_remove (session->client_messages_head,
session->client_messages_tail, qmsg);
+ GNUNET_assert (qmsg);
- send_next (session);
+ if (NULL == buf)
+ {
+ destroy_session (session);
+ return 0;
+ }
- return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size;
-}
+ msg_size = ntohs (qmsg->msg->size);
+ GNUNET_assert (size >= msg_size);
-static size_t
-transmit_conclude_done (void *cls, size_t size, void *buf)
-{
- struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg;
+ memcpy (buf, qmsg->msg, msg_size);
+ GNUNET_free (qmsg->msg);
+ GNUNET_free (qmsg);
- msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf;
- msg->header.type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
- msg->header.size = htons (sizeof (struct
GNUNET_CONSENSUS_ConcludeDoneMessage));
- msg->num_peers = htons (0);
+ send_next (session);
- return sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage);
+ return msg_size;
}
@@ -313,65 +763,253 @@
static void
send_next (struct ConsensusSession *session)
{
- int msize;
GNUNET_assert (NULL != session);
if (NULL != session->th)
- {
return;
- }
- if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent
== GNUNET_NO))
+ if (NULL != session->client_messages_head)
{
- /* FIXME */
- msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
- session->th =
- GNUNET_SERVER_notify_transmit_ready (session->client, msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
&transmit_conclude_done, session);
- session->conclude_sent = GNUNET_YES;
+ int msize;
+ msize = ntohs (session->client_messages_head->msg->size);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
+ session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize,
+
GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_queued,
session);
}
- else if (NULL != session->transmit_pending_head)
+}
+
+
+/**
+ * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
+ * the correct signature to be used with e.g. qsort.
+ * We use this function instead.
+ *
+ * @param h1 some hash code
+ * @param h2 some hash code
+ * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
+ */
+static int
+hash_cmp (const void *a, const void *b)
+{
+ return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct
GNUNET_HashCode *) b);
+}
+
+
+/**
+ * Search peer in the list of peers in session.
+ *
+ * @param peer peer to find
+ * @param session session with peer
+ * @return index of peer, -1 if peer is not in session
+ */
+static int
+get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct
ConsensusSession *session)
+{
+ const struct GNUNET_PeerIdentity *needle;
+ needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct
GNUNET_PeerIdentity), &hash_cmp);
+ if (NULL == needle)
+ return -1;
+ return needle - session->peers;
+}
+
+
+
+static void
+hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+{
+ struct ConsensusPeerInformation *cpi;
+
+ cpi = (struct ConsensusPeerInformation *) cls;
+ cpi->hello = GNUNET_YES;
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+
+ cpi = (struct ConsensusPeerInformation *) cls;
+
+ if (cpi->session->conclude_requested)
{
- msize = session->transmit_pending_head->element->size + sizeof (struct
GNUNET_CONSENSUS_ElementMessage);
- session->th =
- GNUNET_SERVER_notify_transmit_ready (session->client, msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
&transmit_pending, session);
- /* TODO: insert into ack pending */
+ write_strata (cpi, GNUNET_STREAM_OK, 0);
}
}
/**
- * Method called whenever a peer has disconnected from the tunnel.
- * Implementations of this callback must NOT call
- * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those
- * to run in some other task later. However, calling
- * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed.
+ * Functions of this type will be called when a stream is established
*
- * @param cls closure
- * @param peer peer identity the tunnel stopped working with
+ * @param cls the closure from GNUNET_STREAM_open
+ * @param socket socket to use to communicate with the other side (read/write)
*/
static void
-disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer)
+open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
{
- /* FIXME: how do we handle this */
+ struct ConsensusPeerInformation *cpi;
+ struct ConsensusHello *hello;
+
+
+ cpi = (struct ConsensusPeerInformation *) cls;
+ cpi->is_connected = GNUNET_YES;
+
+ hello = GNUNET_malloc (sizeof *hello);
+ hello->header.size = htons (sizeof *hello);
+ hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
+ memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct
GNUNET_HashCode));
+
+
+ cpi->wh =
+ GNUNET_STREAM_write (socket, hello, sizeof *hello,
GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
+
}
+static void
+initialize_session_info (struct ConsensusSession *session)
+{
+ int i;
+ int last;
+
+ for (i = 0; i < session->num_peers; ++i)
+ {
+ /* initialize back-references, so consensus peer information can
+ * be used as closure */
+ session->info[i].session = session;
+
+ }
+
+ last = (session->local_peer_idx + (session->num_peers / 2)) %
session->num_peers;
+ i = (session->local_peer_idx + 1) % session->num_peers;
+ while (i != last)
+ {
+ session->info[i].is_outgoing = GNUNET_YES;
+ session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i],
GNUNET_APPLICATION_TYPE_CONSENSUS,
+ open_cb, &session->info[i],
GNUNET_STREAM_OPTION_END);
+ session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback,
session);
+ i = (i + 1) % session->num_peers;
+ }
+ // tie-breaker for even number of peers
+ if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
+ {
+ session->info[last].is_outgoing = GNUNET_YES;
+ session->info[last].socket = GNUNET_STREAM_open (cfg,
&session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
+ open_cb,
&session->info[last], GNUNET_STREAM_OPTION_END);
+ }
+}
+
+
/**
- * Method called whenever a peer has connected to the tunnel.
+ * Create the sorted list of peers for the session,
+ * add the local peer if not in the join message.
+ */
+static void
+initialize_session_peer_list (struct ConsensusSession *session)
+{
+ int local_peer_in_list;
+ int listed_peers;
+ const struct GNUNET_PeerIdentity *msg_peers;
+ unsigned int i;
+
+ GNUNET_assert (NULL != session->join_msg);
+
+ /* peers in the join message, may or may not include the local peer */
+ listed_peers = ntohs (session->join_msg->num_peers);
+
+ session->num_peers = listed_peers;
+
+ msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
+
+ local_peer_in_list = GNUNET_NO;
+ for (i = 0; i < listed_peers; i++)
+ {
+ if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct
GNUNET_PeerIdentity)))
+ {
+ local_peer_in_list = GNUNET_YES;
+ break;
+ }
+ }
+
+ if (GNUNET_NO == local_peer_in_list)
+ session->num_peers++;
+
+ session->peers = GNUNET_malloc (session->num_peers * sizeof (struct
GNUNET_PeerIdentity));
+
+ if (GNUNET_NO == local_peer_in_list)
+ session->peers[session->num_peers - 1] = *my_peer;
+
+ memcpy (session->peers, msg_peers, listed_peers * sizeof (struct
GNUNET_PeerIdentity));
+ qsort (session->peers, session->num_peers, sizeof (struct
GNUNET_PeerIdentity), &hash_cmp);
+}
+
+
+static void
+strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode
*key)
+{
+ uint32_t v;
+ int i;
+ v = key->bits[0];
+ /* count trailing '1'-bits of v */
+ for (i = 0; v & 1; v>>=1, i++);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i);
+
+ ibf_insert (strata[i], key);
+}
+
+
+/**
+ * Initialize the session, continue receiving messages from the owning client
*
- * @param cls closure
- * @param peer peer identity the tunnel was created to, NULL on timeout
- * @param atsi performance data for the connection
+ * @param session the session to initialize
*/
static void
-connect_handler (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_ATS_Information *atsi)
+initialize_session (struct ConsensusSession *session)
{
- /* not much we can do here, now we know the other peer has been added to our
broadcast tunnel */
+ const struct ConsensusSession *other_session;
+ int i;
+
+ GNUNET_assert (NULL != session->join_msg);
+
+ initialize_session_peer_list (session);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n",
session->num_peers);
+
+ compute_global_id (&session->join_msg->session_id, session->peers,
session->num_peers, &session->global_id);
+
+ /* Check if some local client already owns the session. */
+ other_session = sessions_head;
+ while (NULL != other_session)
+ {
+ if ((other_session != session) &&
+ (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
&other_session->global_id)))
+ {
+ /* session already owned by another client */
+ GNUNET_break (0);
+ disconnect_client (session->client);
+ return;
+ }
+ other_session = other_session->next;
+ }
+
+ session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
+
+ session->local_peer_idx = get_peer_idx (my_peer, session);
+ GNUNET_assert (-1 != session->local_peer_idx);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n",
session->local_peer_idx);
+
+ session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct
InvertibleBloomFilter *));
+ for (i = 0; i < STRATA_COUNT; i++)
+ session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
+
+ session->info = GNUNET_malloc (session->num_peers * sizeof (struct
ConsensusPeerInformation));
+
+ initialize_session_info (session);
+
+ GNUNET_free (session->join_msg);
+ session->join_msg = NULL;
+
+ GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s
(&session->global_id));
}
@@ -387,88 +1025,49 @@
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *m)
{
- struct GNUNET_HashCode global_id;
- const struct GNUNET_CONSENSUS_JoinMessage *msg;
struct ConsensusSession *session;
- unsigned int i;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join received\n");
- msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s
(&msg->session_id));
-
- compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity
*) &m[1], msg->num_peers);
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s
(&global_id));
-
+ // make sure the client has not already joined a session
session = sessions_head;
while (NULL != session)
{
- if (client == session->client)
+ if (session->client == client)
{
-
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
+ GNUNET_break (0);
disconnect_client (client);
return;
}
- if (0 == memcmp (session->global_id, &global_id, sizeof (struct
GNUNET_HashCode)))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "session already owned by another
client\n");
- disconnect_client (client);
- return;
- }
session = session->next;
}
- GNUNET_SERVER_client_keep (client);
-
- /* session does not exist yet, create it */
session = GNUNET_malloc (sizeof (struct ConsensusSession));
- session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct
GNUNET_HashCode));
- session->global_id = GNUNET_memdup (&global_id, sizeof (struct
GNUNET_HashCode));
- session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
+ session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *)
GNUNET_copy_message (m);
session->client = client;
- /* FIXME: should not be a constant, but chosen adaptively */
- session->round_time = GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 5);
+ GNUNET_SERVER_client_keep (client);
- session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session,
connect_handler, disconnect_handler, session);
+ GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
- session->num_peers = 0;
-
- /* count the peers that are not the local peer */
- for (i = 0; i < msg->num_peers; i++)
+ // Initialize session later if local peer identity is not known yet.
+ if (NULL == my_peer)
{
- struct GNUNET_PeerIdentity *peers;
- peers = (struct GNUNET_PeerIdentity *) &msg[1];
- if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
- session->num_peers++;
+ GNUNET_SERVER_disable_receive_done_warning (client);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n");
+ return;
}
- session->peers = GNUNET_malloc (session->num_peers * sizeof (struct
ConsensusPeer));
-
- /* copy the peer identities and add peers to broadcast tunnel */
- for (i = 0; i < msg->num_peers; i++)
- {
- struct GNUNET_PeerIdentity *peers;
- peers = (struct GNUNET_PeerIdentity *) &msg[1];
- if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
- {
- *session->peers->peer_id = peers[i];
- GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel,
&peers[i]);
- }
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n");
-
- GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
-
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n");
+ initialize_session (session);
}
/**
* Called when a client performs an insert operation.
+ *
+ * @param cls (unused)
+ * @param client client handle
+ * @param message message sent by the client
*/
void
client_insert (void *cls,
@@ -510,39 +1109,155 @@
GNUNET_CRYPTO_hash (element, element_size, &key);
GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ strata_insert (session->strata, &key);
+
GNUNET_SERVER_receive_done (client, GNUNET_OK);
send_next (session);
}
+
/**
- * Do one round of the conclusion.
- * Start by broadcasting the set difference estimator (IBF strata).
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
*
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ * GNUNET_STREAM_OK if writing to stream was completed successfully;
+ * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ * (this doesn't mean that the data is never sent, the receiver may
+ * have read the data but its ACKs may have been lost);
+ * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ * be processed.
+ * @param size the number of bytes written
*/
-void
-conclude_do_round (struct ConsensusSession *session)
+static void
+write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
{
- /* FIXME */
+ struct ConsensusPeerInformation *cpi;
+ struct StrataMessage *strata_msg;
+ size_t msize;
+ int i;
+ struct GNUNET_HashCode *hash_dst;
+ uint8_t *count_dst;
+ int num_strata;
+
+ cpi = (struct ConsensusPeerInformation *) cls;
+
+ GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
+
+ /* FIXME: handle this */
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+
+ if (STRATA_COUNT == cpi->strata_counter)
+ {
+ /* strata have been written, wait for other side's IBF */
+ return;
+ }
+
+ if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE)
+ num_strata = (STRATA_COUNT - cpi->strata_counter);
+ else
+ num_strata = STRATA_PER_MESSAGE;
+
+
+ msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE *
STRATA_IBF_BUCKETS);
+
+ strata_msg = GNUNET_malloc (msize);
+ strata_msg->header.size = htons (msize);
+ strata_msg->header.type = htons
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
+ strata_msg->num_strata = htons (num_strata);
+
+ /* for correct message alignment, copy bucket types seperately */
+ hash_dst = (struct GNUNET_HashCode *) &strata_msg[1];
+
+ for (i = 0; i < num_strata; i++)
+ {
+ memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum,
STRATA_IBF_BUCKETS * sizeof *hash_dst);
+ hash_dst += STRATA_IBF_BUCKETS;
+ }
+
+ for (i = 0; i < num_strata; i++)
+ {
+ memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum,
STRATA_IBF_BUCKETS * sizeof *hash_dst);
+ hash_dst += STRATA_IBF_BUCKETS;
+ }
+
+ count_dst = (uint8_t *) hash_dst;
+
+ for (i = 0; i < num_strata; i++)
+ {
+ memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count,
STRATA_IBF_BUCKETS);
+ count_dst += STRATA_IBF_BUCKETS;
+ }
+
+ cpi->strata_counter += num_strata;
+
+ cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize,
GNUNET_TIME_UNIT_FOREVER_REL,
+ write_strata, cpi);
+
+ GNUNET_assert (NULL != cpi->wh);
}
/**
- * Cancel the current round if necessary, decide to run another round or
- * terminate.
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
+ *
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ * GNUNET_STREAM_OK if writing to stream was completed successfully;
+ * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ * (this doesn't mean that the data is never sent, the receiver may
+ * have read the data but its ACKs may have been lost);
+ * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ * be processed.
+ * @param size the number of bytes written
*/
-void
-conclude_round_done (struct ConsensusSession *session)
+static void
+write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
{
- /* FIXME */
+ struct ConsensusPeerInformation *cpi;
+
+ cpi = (struct ConsensusPeerInformation *) cls;
}
/**
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
+ *
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ * GNUNET_STREAM_OK if writing to stream was completed successfully;
+ * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ * (this doesn't mean that the data is never sent, the receiver may
+ * have read the data but its ACKs may have been lost);
+ * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ * be processed.
+ * @param size the number of bytes written
+ */
+static void
+write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+{
+ struct ConsensusPeerInformation *cpi;
+
+ cpi = (struct ConsensusPeerInformation *) cls;
+}
+
+
+/**
* Called when a client performs the conclude operation.
+ *
+ * @param cls (unused)
+ * @param client client handle
+ * @param message message sent by the client
*/
void
client_conclude (void *cls,
@@ -550,40 +1265,55 @@
const struct GNUNET_MessageHeader *message)
{
struct ConsensusSession *session;
+ int i;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
session = sessions_head;
while ((session != NULL) && (session->client != client))
- {
session = session->next;
- }
if (NULL == session)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n");
+ /* client not found */
+ GNUNET_break (0);
GNUNET_SERVER_client_disconnect (client);
return;
}
if (GNUNET_YES == session->conclude_requested)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude
twice\n");
- GNUNET_SERVER_client_disconnect (client);
+ /* client requested conclude twice */
+ GNUNET_break (0);
+ disconnect_client (client);
return;
}
session->conclude_requested = GNUNET_YES;
- conclude_do_round (session);
+ /* FIXME: write to already connected sockets */
+ for (i = 0; i < session->num_peers; i++)
+ {
+ if ( (GNUNET_YES == session->info[i].is_outgoing) &&
+ (GNUNET_YES == session->info[i].hello) )
+ {
+ /* kick off transmitting strata by calling the write continuation */
+ write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
+ }
+ }
+
+
GNUNET_SERVER_receive_done (client, GNUNET_OK);
-
send_next (session);
}
/**
* Called when a client sends an ack
+ *
+ * @param cls (unused)
+ * @param client client handle
+ * @param message message sent by the client
*/
void
client_ack (void *cls,
@@ -614,72 +1344,24 @@
struct GNUNET_CORE_Handle *core,
const struct GNUNET_PeerIdentity *peer)
{
- static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
- {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
- {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
- sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
- {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
- sizeof (struct GNUNET_CONSENSUS_AckMessage)},
- {NULL, NULL, 0, 0}
- };
+ struct ConsensusSession *session;
- GNUNET_SERVER_add_handlers (srv, handlers);
my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
/* core can't be disconnected directly in the core startup callback,
schedule a task to do it! */
GNUNET_SCHEDULER_add_now (&disconnect_core, core);
GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
-}
-
-
-/**
- * Method called whenever another peer has added us to a tunnel
- * the other peer initiated.
- * Only called (once) upon reception of data with a message type which was
- * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
- * causes te tunnel to be ignored and no further notifications are sent about
- * the same tunnel.
- *
- * @param cls closure
- * @param tunnel new handle to the tunnel
- * @param initiator peer that started the tunnel
- * @param atsi performance information for the tunnel
- * @return initial tunnel context for the tunnel
- * (can be NULL -- that's not an error)
- */
-static void *
-new_tunnel (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- const struct GNUNET_PeerIdentity *initiator,
- const struct GNUNET_ATS_Information *atsi)
-{
- /* there's nothing we can do here, as we don't have the global consensus id
yet */
- return NULL;
+ session = sessions_head;
+ while (NULL != session)
+ {
+ if (NULL != session->join_msg)
+ initialize_session (session);
+ session = session->next;
+ }
}
/**
- * Function called whenever an inbound tunnel is destroyed. Should clean up
- * any associated state. This function is NOT called if the client has
- * explicitly asked for the tunnel to be destroyed using
- * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
- * the tunnel.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end (henceforth invalid)
- * @param tunnel_ctx place where local state associated
- * with the tunnel is stored
- */
-static void
-cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
-{
- /* FIXME: what to do here? */
-}
-
-
-
-/**
* Called to clean up, after a shutdown has been requested.
*
* @param cls closure
@@ -689,109 +1371,31 @@
shutdown_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- /* mesh requires all the tunnels to be destroyed manually */
while (NULL != sessions_head)
{
struct ConsensusSession *session;
session = sessions_head;
- GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel);
sessions_head = sessions_head->next;
GNUNET_free (session);
}
- if (NULL != mesh)
- {
- GNUNET_MESH_disconnect (mesh);
- mesh = NULL;
- }
if (NULL != core)
{
GNUNET_CORE_disconnect (core);
core = NULL;
}
-}
+ if (NULL != listener)
+ {
+ GNUNET_STREAM_listen_close (listener);
+ listener = NULL;
+ }
-
-/**
- * Functions with this signature are called whenever a message is
- * received.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end
- * @param tunnel_ctx place to store local state associated with the tunnel
- * @param sender who sent the message
- * @param message the actual message
- * @param atsi performance data for the connection
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-p2p_delta_estimate (void *cls,
- struct GNUNET_MESH_Tunnel * tunnel,
- void **tunnel_ctx,
- const struct GNUNET_PeerIdentity *sender,
- const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information *atsi)
-{
- /* FIXME */
- return GNUNET_OK;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
}
/**
- * Functions with this signature are called whenever a message is
- * received.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end
- * @param tunnel_ctx place to store local state associated with the tunnel
- * @param sender who sent the message
- * @param message the actual message
- * @param atsi performance data for the connection
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-p2p_difference_digest (void *cls,
- struct GNUNET_MESH_Tunnel * tunnel,
- void **tunnel_ctx,
- const struct GNUNET_PeerIdentity *sender,
- const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information *atsi)
-{
- /* FIXME */
- return GNUNET_OK;
-}
-
-
-/**
- * Functions with this signature are called whenever a message is
- * received.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end
- * @param tunnel_ctx place to store local state associated with the tunnel
- * @param sender who sent the message
- * @param message the actual message
- * @param atsi performance data for the connection
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-p2p_elements_and_requests (void *cls,
- struct GNUNET_MESH_Tunnel * tunnel,
- void **tunnel_ctx,
- const struct GNUNET_PeerIdentity *sender,
- const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information *atsi)
-{
- /* FIXME */
- return GNUNET_OK;
-}
-
-
-/**
* Start processing consensus requests.
*
* @param cls closure
@@ -801,33 +1405,38 @@
static void
run (void *cls, struct GNUNET_SERVER_Handle *server, const struct
GNUNET_CONFIGURATION_Handle *c)
{
- static const struct GNUNET_CORE_MessageHandler handlers[] = {
+ static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
{NULL, 0, 0}
};
- static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
- {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0},
- {p2p_difference_digest,
GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0},
- {p2p_elements_and_requests,
GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0},
- {NULL, 0, 0}
+ static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
+ {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
+ {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
+ {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
+ sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
+ {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
+ sizeof (struct GNUNET_CONSENSUS_AckMessage)},
+ {NULL, NULL, 0, 0}
};
- static const GNUNET_MESH_ApplicationType app_types[] = {
- GNUNET_APPLICATION_TYPE_CONSENSUS,
- GNUNET_APPLICATION_TYPE_END
- };
- GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
-
cfg = c;
srv = server;
+ GNUNET_SERVER_add_handlers (server, server_handlers);
+
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
NULL);
- mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers,
app_types);
- GNUNET_assert (NULL != mesh);
+ listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
+ listen_cb, NULL,
+ GNUNET_STREAM_OPTION_END);
+
+
/* we have to wait for the core_startup callback before proceeding with the
consensus service startup */
- core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL,
GNUNET_NO, NULL, GNUNET_NO, handlers);
+ core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL,
GNUNET_NO, NULL, GNUNET_NO, core_handlers);
GNUNET_assert (NULL != core);
+
+ GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n",
STRATA_PER_MESSAGE);
}
@@ -843,6 +1452,7 @@
{
int ret;
ret = GNUNET_SERVICE_run (argc, argv, "consensus",
GNUNET_SERVICE_OPTION_NONE, &run, NULL);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
return (GNUNET_OK == ret) ? 0 : 1;
}
Modified: gnunet/src/consensus/ibf.c
===================================================================
--- gnunet/src/consensus/ibf.c 2013-01-16 17:21:37 UTC (rev 25805)
+++ gnunet/src/consensus/ibf.c 2013-01-17 00:53:11 UTC (rev 25806)
@@ -25,53 +25,11 @@
* @author Florian Dold
*/
+
#include "ibf.h"
/**
- * Opaque handle to an invertible bloom filter (IBF).
- *
- * An IBF is a counting bloom filter that has the ability to restore
- * the hashes of its stored elements with high probability.
- */
-struct InvertibleBloomFilter
-{
- /**
- * How many cells does this IBF have?
- */
- unsigned int size;
-
- /**
- * In how many cells do we hash one element?
- * Usually 4 or 3.
- */
- unsigned int hash_num;
-
- /**
- * Salt for mingling hashes
- */
- uint32_t salt;
-
- /**
- * How many times has a bucket been hit?
- * Can be negative, as a result of IBF subtraction.
- */
- int8_t *count;
-
- /**
- * xor sums of the elements' hash codes, used to identify the elements.
- */
- struct GNUNET_HashCode *id_sum;
-
- /**
- * xor sums of the "hash of the hash".
- */
- struct GNUNET_HashCode *hash_sum;
-
-};
-
-
-/**
* Create an invertible bloom filter.
*
* @param size number of IBF buckets
@@ -152,6 +110,8 @@
used_buckets[i] = bucket;
ibf->count[bucket] += side;
+
+ GNUNET_log_from(GNUNET_ERROR_TYPE_INFO, "ibf", "inserting in bucket %d
\n", bucket);
GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket],
&ibf->id_sum[bucket]);
@@ -214,8 +174,6 @@
int i;
GNUNET_assert (NULL != ibf);
- GNUNET_assert (NULL != ret_id);
- GNUNET_assert (NULL != ret_side);
for (i = 0; i < ibf->size; i++)
{
@@ -227,8 +185,10 @@
if (0 != memcmp (&hash, &ibf->hash_sum[i], sizeof (struct
GNUNET_HashCode)))
continue;
- *ret_side = ibf->count[i];
- *ret_id = ibf->id_sum[i];
+ if (NULL != ret_side)
+ *ret_side = ibf->count[i];
+ if (NULL != ret_id)
+ *ret_id = ibf->id_sum[i];
/* insert on the opposite side, effectively removing the element */
ibf_insert_on_side (ibf, &ibf->id_sum[i], -ibf->count[i]);
@@ -269,3 +229,36 @@
}
}
+/**
+ * Create a copy of an IBF, the copy has to be destroyed properly.
+ *
+ * @param ibf the IBF to copy
+ */
+struct InvertibleBloomFilter *
+ibf_dup (struct InvertibleBloomFilter *ibf)
+{
+ struct InvertibleBloomFilter *copy;
+ copy = GNUNET_malloc (sizeof *copy);
+ copy->hash_num = ibf->hash_num;
+ copy->salt = ibf->salt;
+ copy->size = ibf->size;
+ copy->hash_sum = GNUNET_memdup (ibf->hash_sum, ibf->size * sizeof (struct
GNUNET_HashCode));
+ copy->id_sum = GNUNET_memdup (ibf->id_sum, ibf->size * sizeof (struct
GNUNET_HashCode));
+ copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof (uint8_t));
+ return copy;
+}
+
+/**
+ * Destroy all resources associated with the invertible bloom filter.
+ * No more ibf_*-functions may be called on ibf after calling destroy.
+ *
+ * @param ibf the intertible bloom filter to destroy
+ */
+void
+ibf_destroy (struct InvertibleBloomFilter *ibf)
+{
+ GNUNET_free (ibf->hash_sum);
+ GNUNET_free (ibf->id_sum);
+ GNUNET_free (ibf->count);
+ GNUNET_free (ibf);
+}
Modified: gnunet/src/consensus/ibf.h
===================================================================
--- gnunet/src/consensus/ibf.h 2013-01-16 17:21:37 UTC (rev 25805)
+++ gnunet/src/consensus/ibf.h 2013-01-17 00:53:11 UTC (rev 25806)
@@ -39,16 +39,54 @@
#endif
#endif
+/**
+ * Size of one ibf bucket in bytes
+ */
+#define IBF_BUCKET_SIZE (64+64+1)
+
/**
- * Opaque handle to an invertible bloom filter (IBF).
+ * Invertible bloom filter (IBF).
*
* An IBF is a counting bloom filter that has the ability to restore
* the hashes of its stored elements with high probability.
*/
-struct InvertibleBloomFilter;
+struct InvertibleBloomFilter
+{
+ /**
+ * How many cells does this IBF have?
+ */
+ unsigned int size;
+ /**
+ * In how many cells do we hash one element?
+ * Usually 4 or 3.
+ */
+ unsigned int hash_num;
+ /**
+ * Salt for mingling hashes
+ */
+ uint32_t salt;
+
+ /**
+ * xor sums of the elements' hash codes, used to identify the elements.
+ */
+ struct GNUNET_HashCode *id_sum;
+
+ /**
+ * xor sums of the "hash of the hash".
+ */
+ struct GNUNET_HashCode *hash_sum;
+
+ /**
+ * How many times has a bucket been hit?
+ * Can be negative, as a result of IBF subtraction.
+ */
+ int8_t *count;
+};
+
+
/**
* Create an invertible bloom filter.
*
@@ -106,15 +144,6 @@
struct InvertibleBloomFilter *
ibf_dup (struct InvertibleBloomFilter *ibf);
-
-/*
-ibf_hton ();
-
-ibf_ntoh ();
-
-ibf_get_nbo_size ();
-*/
-
/**
* Destroy all resources associated with the invertible bloom filter.
* No more ibf_*-functions may be called on ibf after calling destroy.
Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf 2013-01-16 17:21:37 UTC (rev
25805)
+++ gnunet/src/consensus/test_consensus.conf 2013-01-17 00:53:11 UTC (rev
25806)
@@ -1,6 +1,6 @@
[consensus]
AUTOSTART = YES
-PORT = 2103
+PORT = 2110
HOSTNAME = localhost
HOME = $SERVICEHOME
BINARY = gnunet-service-consensus
Modified: gnunet/src/consensus/test_consensus_api.c
===================================================================
--- gnunet/src/consensus/test_consensus_api.c 2013-01-16 17:21:37 UTC (rev
25805)
+++ gnunet/src/consensus/test_consensus_api.c 2013-01-17 00:53:11 UTC (rev
25806)
@@ -17,6 +17,7 @@
Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*/
+
/**
* @file consensus/test_consensus_api.c
* @brief testcase for consensus_api.c
@@ -29,18 +30,20 @@
static struct GNUNET_CONSENSUS_Handle *consensus;
-static int insert;
-
static struct GNUNET_HashCode session_id;
-static void conclude_done (void *cls,
- unsigned int num_peers_in_consensus,
- const struct GNUNET_PeerIdentity
*peers_in_consensus)
+static int
+conclude_done (void *cls, const struct GNUNET_CONSENSUS_Group *group)
{
- struct GNUNET_CONSENSUS_Handle *consensus;
- consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n");
+ if (NULL == group)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude over\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_NO;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "concluded\n");
+ return GNUNET_YES;
}
static int
@@ -54,11 +57,34 @@
static void
insert_done (void *cls, int success)
{
+ /* make sure cb is only called once */
+ static int called = GNUNET_NO;
+ GNUNET_assert (GNUNET_NO == called);
+ called = GNUNET_YES;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "insert done\n");
+ GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_SECONDS, 0,
&conclude_done, NULL);
}
+/**
+ * Signature of the main function of a task.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
static void
+on_shutdown (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (NULL != consensus)
+ {
+ GNUNET_CONSENSUS_destroy (consensus);
+ consensus = NULL;
+ }
+}
+
+
+static void
run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg,
struct GNUNET_TESTING_Peer *peer)
@@ -69,18 +95,19 @@
struct GNUNET_CONSENSUS_Element el2 = {"bar", 4, 0};
GNUNET_log_setup ("test_consensus_api",
- "DEBUG",
+ "INFO",
NULL);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n");
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &on_shutdown,
NULL);
+
GNUNET_CRYPTO_hash (str, strlen (str), &session_id);
consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id,
on_new_element, &consensus);
GNUNET_assert (consensus != NULL);
- /*
- GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1);
- GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2);
- */
+
+ GNUNET_CONSENSUS_insert (consensus, &el1, NULL, &consensus);
+ GNUNET_CONSENSUS_insert (consensus, &el2, &insert_done, &consensus);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r25806 - gnunet/src/consensus,
gnunet <=