gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r25866 - gnunet/src/consensus


From: gnunet
Subject: [GNUnet-SVN] r25866 - gnunet/src/consensus
Date: Thu, 24 Jan 2013 03:55:32 +0100

Author: dold
Date: 2013-01-24 03:55:31 +0100 (Thu, 24 Jan 2013)
New Revision: 25866

Modified:
   gnunet/src/consensus/consensus_api.c
   gnunet/src/consensus/consensus_protocol.h
   gnunet/src/consensus/gnunet-consensus.c
   gnunet/src/consensus/gnunet-service-consensus.c
   gnunet/src/consensus/ibf.c
   gnunet/src/consensus/test_consensus.conf
Log:
implemented value exchange, various fixes


Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c        2013-01-23 15:18:43 UTC (rev 
25865)
+++ gnunet/src/consensus/consensus_api.c        2013-01-24 02:55:31 UTC (rev 
25866)
@@ -176,7 +176,6 @@
 
   qmsg = consensus->messages_head;
   GNUNET_CONTAINER_DLL_remove (consensus->messages_head, 
consensus->messages_tail, qmsg);
-  GNUNET_assert (qmsg);
 
   if (NULL == buf)
   {
@@ -196,9 +195,9 @@
   {
     qmsg->idc (qmsg->idc_cls, GNUNET_YES);
   }
-  GNUNET_free (qmsg->msg);
-  GNUNET_free (qmsg);
 
+  /* FIXME: free the messages */
+
   send_next (consensus);
 
   return msg_size;
@@ -218,7 +217,6 @@
 
   if (NULL != consensus->messages_head)
   {
-    LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
     consensus->th = 
         GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs 
(consensus->messages_head->msg->size),
                                              GNUNET_TIME_UNIT_FOREVER_REL,
@@ -226,7 +224,16 @@
   }
 }
 
+static void
+queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct 
GNUNET_MessageHeader *msg)
+{
+  struct QueuedMessage *qm;
+  qm = GNUNET_malloc (sizeof *qm);
+  qm->msg = msg;
+  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, 
consensus->messages_tail, qm);
+}
 
+
 /**
  * Called when the server has sent is a new element
  * 
@@ -239,23 +246,24 @@
 {
   struct GNUNET_CONSENSUS_Element element;
   struct GNUNET_CONSENSUS_AckMessage *ack_msg;
-  struct QueuedMessage *queued_msg;
   int ret;
 
+  LOG (GNUNET_ERROR_TYPE_INFO, "received new element\n");
+
   element.type = msg->element_type;
-  element.size = msg->header.size - sizeof (struct 
GNUNET_CONSENSUS_ElementMessage);
+  element.size = ntohs (msg->header.size) - sizeof (struct 
GNUNET_CONSENSUS_ElementMessage);
   element.data = &msg[1];
 
   ret = consensus->new_element_cb (consensus->new_element_cls, &element);
 
-  queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct 
GNUNET_CONSENSUS_AckMessage));
-  queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1];
-
-  ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg;
+  ack_msg = GNUNET_malloc (sizeof *ack_msg);
+  ack_msg->header.size = htons (sizeof *ack_msg);
+  ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
   ack_msg->keep = ret;
 
-  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, 
consensus->messages_tail,
-                                    queued_msg);
+  queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg);
+
+  send_next (consensus);
 }
 
 

Modified: gnunet/src/consensus/consensus_protocol.h
===================================================================
--- gnunet/src/consensus/consensus_protocol.h   2013-01-23 15:18:43 UTC (rev 
25865)
+++ gnunet/src/consensus/consensus_protocol.h   2013-01-24 02:55:31 UTC (rev 
25866)
@@ -49,13 +49,15 @@
 
 struct DifferenceDigest
 {
-
   struct GNUNET_MessageHeader header;
+  uint8_t order;
+  uint8_t round;
 };
 
 struct Element
 {
   struct GNUNET_MessageHeader header;
+  struct GNUNET_HashCode hash;
 };
 
 struct ConsensusHello

Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c     2013-01-23 15:18:43 UTC (rev 
25865)
+++ gnunet/src/consensus/gnunet-consensus.c     2013-01-24 02:55:31 UTC (rev 
25866)
@@ -177,6 +177,7 @@
 new_element_cb (void *cls,
                 struct GNUNET_CONSENSUS_Element *element)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
   return GNUNET_YES;
 }
 
@@ -263,8 +264,11 @@
   int i;
 
 
+  GNUNET_log_setup ("gnunet-consensus", "INFO", NULL);
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
 
+
   peers = started_peers;
 
   peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));

Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2013-01-23 15:18:43 UTC 
(rev 25865)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2013-01-24 02:55:31 UTC 
(rev 25866)
@@ -55,12 +55,16 @@
  */
 #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS))
 
+#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
 
+#define MAX_IBF_ORDER (64)
 
+
 /* forward declarations */
 
 struct ConsensusSession;
 struct IncomingSocket;
+struct ConsensusPeerInformation;
 
 static void
 send_next (struct ConsensusSession *session);
@@ -68,6 +72,12 @@
 static void 
 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
 
+static void 
+write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size);
+
+static void 
+write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size);
+
 static int
 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct 
ConsensusSession *session);
 
@@ -91,6 +101,9 @@
    * The actual element
    */
   struct GNUNET_CONSENSUS_Element *element;
+
+  /* peer this element is coming from */
+  struct ConsensusPeerInformation *cpi;
 };
 
 struct ConsensusPeerInformation
@@ -130,20 +143,30 @@
    */
   int strata_counter;
 
-  struct InvertibleBloomFilter *my_ibf;
+  int ibf_order;
 
-  int my_ibf_bucket_counter;
+  struct InvertibleBloomFilter *outgoing_ibf;
 
-  struct InvertibleBloomFilter *peer_ibf;
+  int outgoing_bucket_counter;
 
-  int peer_ibf_bucket_counter;
+  struct InvertibleBloomFilter *incoming_ibf;
 
+  int incoming_bucket_counter;
+
   /**
+   * NULL or incoming_ibf - outgoing_ibf.
+   * Decoded values of side '1' are to be requested from the peer.
+   */
+  struct InvertibleBloomFilter *diff_ibf;
+
+  /**
    * Strata estimator of the peer, NULL if our peer
    * initiated the reconciliation.
    */
   struct InvertibleBloomFilter **strata;
 
+  unsigned int diff;
+
   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
 
   struct ConsensusSession *session;
@@ -206,16 +229,6 @@
   struct GNUNET_CONTAINER_MultiHashMap *values;
 
   /**
-   * Elements that have not been sent to the client yet.
-   */
-  struct PendingElement *transmit_pending_head;
-
-  /**
-   * Elements that have not been sent to the client yet.
-   */
-  struct PendingElement *transmit_pending_tail;
-
-  /**
    * Elements that have not been approved (or rejected) by the client yet.
    */
   struct PendingElement *approval_pending_head;
@@ -281,6 +294,8 @@
   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
 
   struct InvertibleBloomFilter **strata;
+
+  struct InvertibleBloomFilter **ibfs;
 };
 
 
@@ -365,6 +380,16 @@
 static struct GNUNET_STREAM_ListenSocket *listener;
 
 
+static void
+queue_client_message (struct ConsensusSession *session, struct 
GNUNET_MessageHeader *msg)
+{
+  struct QueuedMessage *qm;
+  qm = GNUNET_malloc (sizeof *qm);
+  qm->msg = msg;
+  GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, 
session->client_messages_tail, qm);
+}
+
+
 static int
 estimate_difference (struct InvertibleBloomFilter** strata1,
                      struct InvertibleBloomFilter** strata2)
@@ -400,6 +425,7 @@
 }
 
 
+
 /**
  * Functions of this signature are called whenever data is available from the
  * stream.
@@ -412,19 +438,60 @@
  *         given to the next time the read processor is called).
  */
 static size_t
-stream_data_processor (void *cls,
+session_stream_data_processor (void *cls,
                        enum GNUNET_STREAM_Status status,
                        const void *data,
                        size_t size)
 {
+  struct ConsensusPeerInformation *cpi;
+  int ret;
+
+  GNUNET_assert (GNUNET_STREAM_OK == status);
+
+  cpi = cls;
+
+  GNUNET_assert (NULL != cpi->mst);
+
+  ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, 
GNUNET_YES);
+  if (GNUNET_SYSERR == ret)
+  {
+    /* FIXME: handle this correctly */
+    GNUNET_assert (0);
+  }
+
+  /* read again */
+  cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
+                                &session_stream_data_processor, cpi);
+
+  /* we always read all data */
+  return size;
+}
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on 
timeout 
+ * @return number of bytes processed from 'data' (any data remaining should be
+ *         given to the next time the read processor is called).
+ */
+static size_t
+incoming_stream_data_processor (void *cls,
+                       enum GNUNET_STREAM_Status status,
+                       const void *data,
+                       size_t size)
+{
   struct IncomingSocket *incoming;
   int ret;
 
   GNUNET_assert (GNUNET_STREAM_OK == status);
 
-  incoming = (struct IncomingSocket *) cls;
+  incoming = cls;
 
-  ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, 
GNUNET_NO, GNUNET_NO);
+  ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, 
GNUNET_NO, GNUNET_YES);
   if (GNUNET_SYSERR == ret)
   {
     /* FIXME: handle this correctly */
@@ -433,13 +500,47 @@
 
   /* read again */
   incoming->rh = GNUNET_STREAM_read (incoming->socket, 
GNUNET_TIME_UNIT_FOREVER_REL,
-                                     &stream_data_processor, incoming);
+                                     &incoming_stream_data_processor, 
incoming);
 
   /* we always read all data */
   return size;
 }
 
+
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
 static int
+ibf_values_iterator (void *cls,
+                     const struct GNUNET_HashCode *key,
+                     void *value)
+{
+  struct ConsensusPeerInformation *cpi;
+  cpi = cls;
+  ibf_insert (cpi->session->ibfs[cpi->ibf_order], key);
+  return GNUNET_YES;
+}
+
+
+static void
+create_outgoing_ibf (struct ConsensusPeerInformation *cpi)
+{
+  if (NULL == cpi->session->ibfs[cpi->ibf_order])
+  {
+    cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, 
STRATA_HASH_NUM, 0);
+    GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, 
ibf_values_iterator, cpi);
+  }
+  cpi->outgoing_ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
+}
+
+static int
 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct 
StrataMessage *strata_msg)
 {
   int i;
@@ -477,8 +578,6 @@
 
   for (i = 0; i < num_strata; i++)
   {
-    uint8_t zero[STRATA_IBF_BUCKETS];
-    memset (zero, 0, STRATA_IBF_BUCKETS);
     memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, 
STRATA_IBF_BUCKETS);
     count_src += STRATA_IBF_BUCKETS;
   }
@@ -489,9 +588,17 @@
 
   if (STRATA_COUNT == cpi->strata_counter)
   {
-    int diff;
-    diff = estimate_difference (cpi->session->strata, cpi->strata);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff);
+
+    cpi->diff = estimate_difference (cpi->session->strata, cpi->strata);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", 
cpi->diff);
+    cpi->ibf_order = 0;
+    while ((1 << cpi->ibf_order) < cpi->diff)
+      cpi->ibf_order++;
+    if (cpi->ibf_order > MAX_IBF_ORDER)
+      cpi->ibf_order = MAX_IBF_ORDER;
+    cpi->ibf_order += 2;
+    create_outgoing_ibf (cpi);
+    write_ibf (cpi, GNUNET_STREAM_OK, 0);
   }
 
   return GNUNET_YES;
@@ -499,15 +606,97 @@
 
 
 static int
-handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct 
DifferenceDigest *strata)
+handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct 
DifferenceDigest *digest)
 {
+  struct GNUNET_HashCode *hash_src;
+  int num_buckets;
+  uint8_t *count_src;
+
+  num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / 
IBF_BUCKET_SIZE;
+
+  if (cpi->is_outgoing == GNUNET_YES)
+  {
+    /* we receive the ibf as an initiator, thus we're interested in the order 
*/
+    cpi->ibf_order = digest->order;
+    if ((0 == cpi->outgoing_bucket_counter) && (NULL == cpi->wh))
+    {
+      create_outgoing_ibf (cpi);
+      write_ibf (cpi, GNUNET_STREAM_OK, 0);
+    }
+    /* FIXME: ensure that orders do not differ each time */
+  }
+  else
+  {
+    /* FIXME: handle correctly */
+    GNUNET_assert (cpi->ibf_order == digest->order);
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", 
num_buckets, cpi->incoming_bucket_counter, (1 << cpi->ibf_order));
+
+  if (cpi->incoming_bucket_counter + num_buckets > (1 << cpi->ibf_order))
+  {
+    /* TODO: handle this */
+    GNUNET_assert (0);
+  }
+
+  if (NULL == cpi->incoming_ibf)
+    cpi->incoming_ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
+
+  hash_src = (struct GNUNET_HashCode *) &digest[1];
+
+  memcpy (cpi->incoming_ibf->hash_sum, hash_src, num_buckets * sizeof 
*hash_src);
+  hash_src += num_buckets;
+
+  memcpy (cpi->incoming_ibf->id_sum, hash_src, num_buckets * sizeof *hash_src);
+  hash_src += num_buckets;
+
+  count_src = (uint8_t *) hash_src;
+
+  memcpy (cpi->incoming_ibf->count, count_src, num_buckets * sizeof 
*count_src);
+
+  cpi->incoming_bucket_counter += num_buckets;
+
+  if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
+    if ((NULL == cpi->wh) && (cpi->outgoing_bucket_counter == (1 << 
cpi->ibf_order)))
+      write_values (cpi, GNUNET_STREAM_OK, 0);
+  }
   return GNUNET_YES;
 }
 
 
 static int
-handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element 
*strata)
+handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct 
GNUNET_MessageHeader *element_msg)
 {
+  struct PendingElement *pending_element;
+  struct GNUNET_CONSENSUS_Element *element;
+  struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
+  size_t size;
+
+  size = ntohs (element_msg->size) - sizeof *element_msg;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size);
+
+  element = GNUNET_malloc (size + sizeof *element);
+  element->size = size;
+  memcpy (&element[1], &element_msg[1], size);
+
+  pending_element = GNUNET_malloc (sizeof *pending_element);
+  pending_element->element = element;
+  GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, 
cpi->session->approval_pending_tail, pending_element);
+
+  client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
+  client_element_msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
+  client_element_msg->header.size = htons (size + sizeof *client_element_msg);
+  memcpy (&client_element_msg[1], &element[1], size);
+
+  queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) 
client_element_msg);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n");
+
+  send_next (cpi->session);
+  
   return GNUNET_YES;
 }
 
@@ -556,16 +745,17 @@
 mst_session_callback (void *cls, void *client, const struct 
GNUNET_MessageHeader *message)
 {
   struct ConsensusPeerInformation *cpi;
-  cpi = (struct ConsensusPeerInformation *) cls;
-  switch (ntohs( message->type))
+  cpi =  cls;
+  switch (ntohs (message->type))
   {
     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
       return handle_p2p_strata (cpi, (struct StrataMessage *) message);
     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
       return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
-      return handle_p2p_element (cpi, (struct Element *) message);
+      return handle_p2p_element (cpi, message);
     default:
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: 
%u\n", ntohs (message->type));
       /* FIXME: handle correctly */
       GNUNET_assert (0);
   }
@@ -632,7 +822,7 @@
   incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
 
   incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                     &stream_data_processor, incoming);
+                                     &incoming_stream_data_processor, 
incoming);
 
 
   incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
@@ -727,7 +917,7 @@
   struct QueuedMessage *qmsg;
   size_t msg_size;
 
-  session = (struct ConsensusSession *) cls;
+  session = cls;
   session->th = NULL;
 
 
@@ -773,7 +963,6 @@
   {
     int msize;
     msize = ntohs (session->client_messages_head->msg->size);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
     session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 
                                                        
GNUNET_TIME_UNIT_FOREVER_REL,
                                                        &transmit_queued, 
session);
@@ -821,13 +1010,11 @@
 {
   struct ConsensusPeerInformation *cpi;
 
-  cpi = (struct ConsensusPeerInformation *) cls;
+  cpi = cls;
   cpi->hello = GNUNET_YES;
   
   GNUNET_assert (GNUNET_STREAM_OK == status);
 
-  cpi = (struct ConsensusPeerInformation *) cls;
-
   if (cpi->session->conclude_requested)
   {
     write_strata (cpi, GNUNET_STREAM_OK, 0);  
@@ -848,7 +1035,7 @@
   struct ConsensusHello *hello;
 
 
-  cpi = (struct ConsensusPeerInformation *) cls;
+  cpi = cls;
   cpi->is_connected = GNUNET_YES;
 
   hello = GNUNET_malloc (sizeof *hello);
@@ -856,10 +1043,12 @@
   hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
   memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct 
GNUNET_HashCode));
 
-
   cpi->wh =
       GNUNET_STREAM_write (socket, hello, sizeof *hello, 
GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
 
+  cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
+                                &session_stream_data_processor, cpi);
+
 }
 
 
@@ -874,18 +1063,19 @@
     /* initialize back-references, so consensus peer information can
      * be used as closure */
     session->info[i].session = session;
-
   }
 
-  last = (session->local_peer_idx + (session->num_peers / 2)) % 
session->num_peers;
+  last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % 
session->num_peers;
   i = (session->local_peer_idx + 1) % session->num_peers;
   while (i != last)
   {
     session->info[i].is_outgoing = GNUNET_YES;
     session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], 
GNUNET_APPLICATION_TYPE_CONSENSUS,
                                                   open_cb, &session->info[i], 
GNUNET_STREAM_OPTION_END);
-    session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, 
session);
+    session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, 
&session->info[i]);
     i = (i + 1) % session->num_peers;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", 
session->local_peer_idx, i);
   }
   // tie-breaker for even number of peers
   if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
@@ -893,6 +1083,9 @@
     session->info[last].is_outgoing = GNUNET_YES;
     session->info[last].socket = GNUNET_STREAM_open (cfg, 
&session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
                                                      open_cb, 
&session->info[last], GNUNET_STREAM_OPTION_END);
+    session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, 
&session->info[last]);
+
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d 
(tiebreaker)\n", session->local_peer_idx, last);
   }
 }
 
@@ -949,9 +1142,6 @@
   v = key->bits[0];
   /* count trailing '1'-bits of v */
   for (i = 0; v & 1; v>>=1, i++);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i);
-
   ibf_insert (strata[i], key);
 }
 
@@ -1001,8 +1191,9 @@
   for (i = 0; i < STRATA_COUNT; i++)
     session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
 
+  session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct 
InvertibleBloomFilter *));
+
   session->info = GNUNET_malloc (session->num_peers * sizeof (struct 
ConsensusPeerInformation));
-
   initialize_session_info (session);
 
   GNUNET_free (session->join_msg);
@@ -1053,11 +1244,9 @@
   if (NULL == my_peer)
   {
     GNUNET_SERVER_disable_receive_done_warning (client);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n");
     return;
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n");
   initialize_session (session);
 }
 
@@ -1097,7 +1286,7 @@
   }
 
   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
-  element_size = msg->header.size - sizeof (struct 
GNUNET_CONSENSUS_ElementMessage);
+  element_size = ntohs (msg->header.size )- sizeof (struct 
GNUNET_CONSENSUS_ElementMessage);
 
   element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + 
element_size);
 
@@ -1146,7 +1335,8 @@
   uint8_t *count_dst;
   int num_strata;
 
-  cpi = (struct ConsensusPeerInformation *) cls;
+  cpi = cls;
+  cpi->wh = NULL;
 
   GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
 
@@ -1156,6 +1346,7 @@
   if (STRATA_COUNT == cpi->strata_counter)
   {
     /* strata have been written, wait for other side's IBF */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n");
     return;
   }
 
@@ -1223,8 +1414,57 @@
 write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
 {
   struct ConsensusPeerInformation *cpi;
+  struct DifferenceDigest *digest;
+  int msize;
+  struct GNUNET_HashCode *hash_dst;
+  uint8_t *count_dst;
+  int num_buckets;
 
-  cpi = (struct ConsensusPeerInformation *) cls;
+  cpi = cls;
+  cpi->wh = NULL;
+
+  if (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
+    if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order))
+      write_values (cpi, GNUNET_STREAM_OK, 0);
+    return;
+  }
+
+  /* remaining buckets */
+  num_buckets = (1 << cpi->ibf_order) - cpi->outgoing_bucket_counter;
+
+  /* limit to maximum */
+  if (num_buckets > BUCKETS_PER_MESSAGE)
+    num_buckets = BUCKETS_PER_MESSAGE;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", 
cpi->outgoing_bucket_counter, (1<<cpi->ibf_order));
+
+  msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
+
+  digest = GNUNET_malloc (msize);
+  digest->header.size = htons (msize);
+  digest->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
+  digest->order = cpi->ibf_order;
+
+  hash_dst = (struct GNUNET_HashCode *) &digest[1];
+
+  memcpy (hash_dst, cpi->outgoing_ibf->hash_sum, num_buckets * sizeof 
*hash_dst);
+  hash_dst += num_buckets;
+
+  memcpy (hash_dst, cpi->outgoing_ibf->id_sum, num_buckets * sizeof *hash_dst);
+  hash_dst += num_buckets;
+
+  count_dst = (uint8_t *) hash_dst;
+
+  memcpy (count_dst, cpi->outgoing_ibf->count, num_buckets * sizeof 
*count_dst);
+
+  cpi->outgoing_bucket_counter += num_buckets;
+
+  cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, 
GNUNET_TIME_UNIT_FOREVER_REL,
+                                 write_ibf, cpi);
+
+  GNUNET_assert (NULL != cpi->wh);
 }
 
 
@@ -1247,8 +1487,71 @@
 write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
 {
   struct ConsensusPeerInformation *cpi;
+  struct GNUNET_HashCode key;
+  struct GNUNET_CONSENSUS_Element *element;
+  struct GNUNET_MessageHeader *element_msg;
+  int side;
+  int msize;
 
-  cpi = (struct ConsensusPeerInformation *) cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n");
+
+  cpi = cls;
+  cpi->wh = NULL;
+
+  if (NULL == cpi->diff_ibf)
+  {
+    GNUNET_assert (NULL != cpi->incoming_ibf);
+    GNUNET_assert (NULL != cpi->outgoing_ibf);
+    GNUNET_assert (cpi->outgoing_ibf->size == cpi->incoming_ibf->size);
+    cpi->diff_ibf = ibf_dup (cpi->incoming_ibf);
+    ibf_subtract (cpi->diff_ibf, cpi->outgoing_ibf);
+  }
+
+  for (;;)
+  {
+    int res;
+    res = ibf_decode (cpi->diff_ibf, &side, &key);
+    if (GNUNET_SYSERR == res)
+    {
+      /* TODO: handle this correctly, request new ibf */
+      GNUNET_break (0);
+      return;
+    }
+    if (GNUNET_NO == res)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n");
+      return;
+    }
+    if (-1 == side)
+      break;
+  }
+
+  element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &key);
+
+  if (NULL == element)
+  {
+    /* FIXME: handle correctly */
+    GNUNET_break (0);
+    return;
+  }
+
+  msize = sizeof (struct GNUNET_MessageHeader) + element->size;
+
+  element_msg = GNUNET_malloc (msize);
+  element_msg->size = htons (msize);
+  element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
+
+  memcpy (&element_msg[1], element->data, element->size);
+
+  cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, 
GNUNET_TIME_UNIT_FOREVER_REL,
+                                 write_values, cpi);
+
+  GNUNET_free (element_msg);
+
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n");
+
+  GNUNET_assert (NULL != cpi->wh);
 }
 
 
@@ -1301,7 +1604,6 @@
       write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
     }
   }
-  
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
   send_next (session);
@@ -1320,7 +1622,48 @@
              struct GNUNET_SERVER_Client *client,
              const struct GNUNET_MessageHeader *message)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n");
+  struct ConsensusSession *session;
+  struct GNUNET_CONSENSUS_AckMessage *msg;
+  struct PendingElement *pending;
+  struct GNUNET_CONSENSUS_Element *element;
+  struct GNUNET_HashCode key;
+
+  session = sessions_head;
+  while (NULL != session)
+  {
+    if (session->client == client)
+      break;
+  }
+
+  if (NULL == session)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is 
not in any session\n");
+    GNUNET_SERVER_client_disconnect (client);
+    return;
+  }
+
+  pending = session->approval_pending_head;
+
+  GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, 
session->approval_pending_tail, pending);
+
+  msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
+
+  if (msg->keep)
+  {
+
+    element = pending->element;
+
+    GNUNET_CRYPTO_hash (element, element->size, &key);
+
+    GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+    strata_insert (session->strata, &key);
+  }
+
+  /* FIXME: also remove element from strata */
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 /**
@@ -1371,10 +1714,41 @@
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+
+  /* FIXME: complete; write separate destructors for different data types */
+
+  while (NULL != incoming_sockets_head)
+  {
+    struct IncomingSocket *socket;
+    socket = incoming_sockets_head;
+    if (NULL == socket->cpi)
+    {
+      GNUNET_STREAM_close (socket->socket);
+    }
+    incoming_sockets_head = incoming_sockets_head->next;
+    GNUNET_free (socket);
+  }
+
   while (NULL != sessions_head)
   {
     struct ConsensusSession *session;
+    int i;
+
     session = sessions_head;
+
+    for (i = 0; session->num_peers; i++)
+    {
+      struct ConsensusPeerInformation *cpi;
+      cpi = &session->info[i];
+      if ((NULL != cpi) && (NULL != cpi->socket))
+      {
+        GNUNET_STREAM_close (cpi->socket);
+      }
+    }
+
+    if (NULL != session->client)
+      GNUNET_SERVER_client_disconnect (session->client);
+
     sessions_head = sessions_head->next;
     GNUNET_free (session);
   }
@@ -1436,7 +1810,6 @@
   GNUNET_assert (NULL != core);
 
   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n", 
STRATA_PER_MESSAGE);
 }
 
 

Modified: gnunet/src/consensus/ibf.c
===================================================================
--- gnunet/src/consensus/ibf.c  2013-01-23 15:18:43 UTC (rev 25865)
+++ gnunet/src/consensus/ibf.c  2013-01-24 02:55:31 UTC (rev 25866)
@@ -111,8 +111,6 @@
       
       ibf->count[bucket] += side;
 
-      GNUNET_log_from(GNUNET_ERROR_TYPE_INFO, "ibf", "inserting in bucket %d 
\n", bucket);
-      
       GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket],
                              &ibf->id_sum[bucket]);
       GNUNET_CRYPTO_hash_xor (&key_hash, &ibf->hash_sum[bucket],

Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf    2013-01-23 15:18:43 UTC (rev 
25865)
+++ gnunet/src/consensus/test_consensus.conf    2013-01-24 02:55:31 UTC (rev 
25866)
@@ -5,6 +5,7 @@
 HOME = $SERVICEHOME
 BINARY = gnunet-service-consensus
 #PREFIX = gdbserver :12345
+PREFIX = valgrind
 ACCEPT_FROM = 127.0.0.1;
 ACCEPT_FROM6 = ::1;
 UNIXPATH = /tmp/gnunet-service-consensus.sock




reply via email to

[Prev in Thread] Current Thread [Next in Thread]