[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] [gnunet] 04/04: implement union via sending whole set
From: gnunet
Subject: [GNUnet-SVN] [gnunet] 04/04: implement union via sending whole set
Date: Thu, 23 Feb 2017 17:47:48 +0100
This is an automated email from the git hooks/post-receive script.
dold pushed a commit to branch master
in repository gnunet.
commit caf375948ecc718bac6d75f415cc1c8324a9199c
Author: Florian Dold <address@hidden>
AuthorDate: Thu Feb 23 17:13:39 2017 +0100
implement union via sending whole set
---
src/include/gnunet_protocols.h | 8 +-
src/set/gnunet-service-set.c | 16 ++
src/set/gnunet-service-set_union.c | 351 +++++++++++++++++++++++++++++++++----
src/set/test_set.conf | 2 +-
4 files changed, 343 insertions(+), 34 deletions(-)
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index a10c0ca5d..f478edd27 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1800,7 +1800,13 @@ extern "C"
* based on their sets and the elements we previously sent
* with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
*/
-#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_GET_MISSING 597
+#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE 597
+
+/**
+ * Send a set element, not as response to a demand but because
+ * we're sending the full set.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT 598
/*******************************************************************************
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index a545e8a06..1072407f1 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -1371,6 +1371,10 @@ handle_client_listen (void *cls,
struct GNUNET_MessageHeader,
NULL),
GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_var_size (p2p_message,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
struct GNUNET_MessageHeader,
NULL),
@@ -1379,6 +1383,10 @@ handle_client_listen (void *cls,
struct GNUNET_MessageHeader,
NULL),
GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_var_size (p2p_message,
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
struct GNUNET_MessageHeader,
NULL),
@@ -1634,6 +1642,14 @@ handle_client_evaluate (void *cls,
struct GNUNET_MessageHeader,
op),
GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_var_size (p2p_message,
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
struct GNUNET_MessageHeader,
op),
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 137216ed7..d2dfe049b 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -115,14 +115,22 @@ enum UnionOperationPhase
* In the penultimate phase,
* we wait until all our demands
* are satisfied. Then we send a done
- * message, and wait for another done message.*/
+ * message, and wait for another done message.
+ */
PHASE_FINISH_WAITING,
/**
* In the ultimate phase, we wait until
* our demands are satisfied and then
- * quit (sending another DONE message). */
- PHASE_DONE
+ * quit (sending another DONE message).
+ */
+ PHASE_DONE,
+
+ /**
+ * After sending the full set, wait for responses with the elements
+ * that the local peer is missing.
+ */
+ PHASE_FULL_SENDING,
};
@@ -214,6 +222,14 @@ struct KeyEntry
* is #GNUNET_YES.
*/
struct ElementEntry *element;
+
+ /**
+ * Did we receive this element?
+ * Even if element->is_foreign is false, we might
+ * have received the element, so this indicates that
+ * the other peer has it.
+ */
+ int received;
};
@@ -373,6 +389,16 @@ get_ibf_key (const struct GNUNET_HashCode *src)
/**
+ * Context for #op_get_element_iterator
+ */
+struct GetElementContext
+{
+ struct GNUNET_HashCode hash;
+ struct KeyEntry *k;
+};
+
+
+/**
* Iterator over the mapping from IBF keys to element entries. Checks if we
* have an element with a given GNUNET_HashCode.
*
@@ -383,17 +409,20 @@ get_ibf_key (const struct GNUNET_HashCode *src)
* #GNUNET_NO if we've found the element.
*/
static int
-op_has_element_iterator (void *cls,
+op_get_element_iterator (void *cls,
uint32_t key,
void *value)
{
- struct GNUNET_HashCode *element_hash = cls;
+ struct GetElementContext *ctx = cls;
struct KeyEntry *k = value;
GNUNET_assert (NULL != k);
if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
- element_hash))
+ &ctx->hash))
+ {
+ ctx->k = k;
return GNUNET_NO;
+ }
return GNUNET_YES;
}
@@ -406,23 +435,29 @@ op_has_element_iterator (void *cls,
* @param element_hash hash of the element to look for
* @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
*/
-static int
-op_has_element (struct Operation *op,
+static struct KeyEntry *
+op_get_element (struct Operation *op,
const struct GNUNET_HashCode *element_hash)
{
int ret;
struct IBF_Key ibf_key;
+ struct GetElementContext ctx = { 0 };
+
+ ctx.hash = *element_hash;
ibf_key = get_ibf_key (element_hash);
ret = GNUNET_CONTAINER_multihashmap32_get_multiple
(op->state->key_to_element,
(uint32_t)
ibf_key.key_val,
- op_has_element_iterator,
- (void *) element_hash);
+ op_get_element_iterator,
+ &ctx);
/* was the iteration aborted because we found the element? */
if (GNUNET_SYSERR == ret)
- return GNUNET_YES;
- return GNUNET_NO;
+ {
+ GNUNET_assert (NULL != ctx.k);
+ return ctx.k;
+ }
+ return NULL;
}
@@ -438,10 +473,12 @@ op_has_element (struct Operation *op,
*
* @param op the union operation
* @param ee the element entry
+ * @param received was this element received from the remote peer?
*/
static void
op_register_element (struct Operation *op,
- struct ElementEntry *ee)
+ struct ElementEntry *ee,
+ int received)
{
struct IBF_Key ibf_key;
struct KeyEntry *k;
@@ -450,6 +487,7 @@ op_register_element (struct Operation *op,
k = GNUNET_new (struct KeyEntry);
k->element = ee;
k->ibf_key = ibf_key;
+ k->received = received;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap32_put
(op->state->key_to_element,
(uint32_t)
ibf_key.key_val,
@@ -535,12 +573,30 @@ init_key_to_element_iterator (void *cls,
GNUNET_assert (GNUNET_NO == ee->remote);
- op_register_element (op, ee);
+ op_register_element (op, ee, GNUNET_NO);
return GNUNET_YES;
}
/**
+ * Initialize the IBF key to element mapping local to this set
+ * operation.
+ *
+ * @param op the set union operation
+ */
+static void
+initialize_key_to_element (struct Operation *op)
+{
+ unsigned int len;
+
+ GNUNET_assert (NULL == op->state->key_to_element);
+ len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
+ op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
+ GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
init_key_to_element_iterator, op);
+}
+
+
+/**
* Create an ibf with the operation's elements
* of the specified size
*
@@ -552,15 +608,8 @@ static int
prepare_ibf (struct Operation *op,
uint32_t size)
{
- if (NULL == op->state->key_to_element)
- {
- unsigned int len;
+ GNUNET_assert (NULL != op->state->key_to_element);
- len = GNUNET_CONTAINER_multihashmap_size
(op->spec->set->content->elements);
- op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len +
1);
- GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
- init_key_to_element_iterator, op);
- }
if (NULL != op->state->local_ibf)
ibf_destroy (op->state->local_ibf);
op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
@@ -709,6 +758,47 @@ get_order_from_difference (unsigned int diff)
/**
+ * Send one set element as a FULL_ELEMENT message (iterator callback).
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the element to send, read as a `struct GNUNET_SET_Element *`;
+ * its type is sent in 16-bit network order, matching the ntohs() on receipt
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+send_element_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct GNUNET_SET_ElementMessage *emsg;
+ struct GNUNET_SET_Element *el = value;
+ struct GNUNET_MQ_Envelope *ev;
+
+ ev = GNUNET_MQ_msg_extra (emsg, el->size,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+ emsg->element_type = htons (el->element_type);
+ GNUNET_memcpy (&emsg[1], el->data, el->size);
+ GNUNET_MQ_send (op->mq, ev);
+ return GNUNET_YES;
+}
+
+
+static void
+send_full_set (struct Operation *op)
+{
+ struct GNUNET_MQ_Envelope *ev;
+
+ op->state->phase = PHASE_FULL_SENDING;
+
+ (void) GNUNET_CONTAINER_multihashmap_iterate
(op->spec->set->content->elements,
+ &send_element_iterator, op);
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+ GNUNET_MQ_send (op->mq, ev);
+}
+
+
+/**
* Handle a strata estimator from a remote peer
*
* @param cls the union operation
@@ -776,16 +866,29 @@ handle_p2p_strata_estimator (void *cls,
"got se diff=%d, using ibf size %d\n",
diff,
1<<get_order_from_difference (diff));
- if (GNUNET_OK !=
- send_ibf (op,
- get_order_from_difference (diff)))
+
+ if (diff > GNUNET_CONTAINER_multihashmap_size
(op->spec->set->content->elements) / 2)
{
- /* Internal error, best we can do is shut the connection */
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to send IBF, closing connection\n");
- fail_union_operation (op);
- return GNUNET_SYSERR;
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Sending full set (diff=%d, own set=%u)\n",
+ diff,
+ GNUNET_CONTAINER_multihashmap_size
(op->spec->set->content->elements));
+ send_full_set (op);
+ }
+ else
+ {
+ if (GNUNET_OK !=
+ send_ibf (op,
+ get_order_from_difference (diff)))
+ {
+ /* Internal error, best we can do is shut the connection */
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to send IBF, closing connection\n");
+ fail_union_operation (op);
+ return GNUNET_SYSERR;
+ }
}
+
return GNUNET_OK;
}
@@ -1288,7 +1391,9 @@ handle_p2p_elements (void *cls,
op->state->received_total += 1;
- if (GNUNET_YES == op_has_element (op, &ee->element_hash))
+ struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+ if (NULL != ke)
{
/* Got repeated element. Should not happen since
* we track demands. */
@@ -1296,6 +1401,7 @@ handle_p2p_elements (void *cls,
"# repeated elements",
1,
GNUNET_NO);
+ ke->received = GNUNET_YES;
GNUNET_free (ee);
}
else
@@ -1303,7 +1409,7 @@ handle_p2p_elements (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Registering new element from remote peer\n");
op->state->received_fresh += 1;
- op_register_element (op, ee);
+ op_register_element (op, ee, GNUNET_YES);
/* only send results immediately if the client wants it */
switch (op->spec->result_mode)
{
@@ -1333,6 +1439,99 @@ handle_p2p_elements (void *cls,
/**
+ * Handle a full element message (#GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT) from a remote peer.
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_p2p_full_element (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ const struct GNUNET_SET_ElementMessage *emsg;
+ uint16_t element_size;
+
+ if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+
+ element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+ ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
+ GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+ ee->element.size = element_size;
+ ee->element.data = &ee[1];
+ ee->element.element_type = ntohs (emsg->element_type);
+ ee->remote = GNUNET_YES;
+ GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got element (full diff, size %u, hash %s) from peer\n",
+ (unsigned int) element_size,
+ GNUNET_h2s (&ee->element_hash));
+
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# received elements",
+ 1,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# exchanged elements",
+ 1,
+ GNUNET_NO);
+
+ op->state->received_total += 1;
+
+ struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+ if (NULL != ke)
+ {
+ /* Got an element we already have. No demand was involved
+ * here: full elements arrive unrequested. */
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# repeated elements",
+ 1,
+ GNUNET_NO);
+ ke->received = GNUNET_YES;
+ GNUNET_free (ee);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Registering new element from remote peer\n");
+ op->state->received_fresh += 1;
+ op_register_element (op, ee, GNUNET_YES);
+ /* only send results immediately if the client wants it */
+ switch (op->spec->result_mode)
+ {
+ case GNUNET_SET_RESULT_ADDED:
+ send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
+ break;
+ case GNUNET_SET_RESULT_SYMMETRIC:
+ send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
+ break;
+ default:
+ /* Result mode not supported, should have been caught earlier. */
+ GNUNET_break (0);
+ break;
+ }
+ }
+
+ if (op->state->received_total > 8 && op->state->received_fresh <
op->state->received_total / 3)
+ {
+ /* The other peer gave us lots of old elements, there's something wrong. */
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+}
+
+/**
* Send offers (for GNUNET_Hash-es) in response
* to inquiries (for IBF_Key-s).
*
@@ -1379,6 +1578,85 @@ handle_p2p_inquiry (void *cls,
/**
+ * Iterator over the key-to-element map, called to send
+ * each element that was not received from the remote peer.
+ *
+ * @param cls closure, the union operation `struct Operation *`
+ * @param key current key code (unused)
+ * @param value the `struct KeyEntry *` of the element
+ * @return #GNUNET_YES in all cases, so that the
+ * iteration continues over the remaining entries.
+ */
+static int
+send_missing_elements_iter (void *cls,
+ uint32_t key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct KeyEntry *ke = value;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SET_ElementMessage *emsg;
+ struct ElementEntry *ee = ke->element;
+
+ if (GNUNET_YES == ke->received)
+ return GNUNET_YES;
+
+ ev = GNUNET_MQ_msg_extra (emsg, ee->element.size,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+ GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+ emsg->reserved = htons (0);
+ emsg->element_type = htons (ee->element.element_type);
+ GNUNET_MQ_send (op->mq, ev);
+
+ return GNUNET_YES;
+}
+
+/**
+ * Handle a "full done" message.
+ *
+ * @param cls closure, a set union operation
+ * @param mh the full done message
+ */
+static void
+handle_p2p_full_done (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+
+ if (PHASE_EXPECT_IBF == op->state->phase)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other
peer is missing\n");
+
+ /* send all the elements that did not come from the remote peer */
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+ &send_missing_elements_iter,
+ op);
+
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+ GNUNET_MQ_send (op->mq, ev);
+ op->state->phase = PHASE_DONE;
+
+ /* we now wait until the other peer shuts the tunnel down*/
+ }
+ else if (PHASE_FULL_SENDING == op->state->phase)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
+ /* We sent the full set, and got the response for that. We're done. */
+ op->state->phase = PHASE_DONE;
+ send_done_and_destroy (op);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n",
(unsigned) op->state->phase);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+}
+
+
+/**
* Handle a demand by the other peer for elements based on a list
* of GNUNET_HashCode-s.
*
@@ -1635,6 +1913,8 @@ union_evaluate (struct Operation *op,
else
LOG (GNUNET_ERROR_TYPE_DEBUG,
"sent op request without context message\n");
+
+ initialize_key_to_element (op);
}
@@ -1664,6 +1944,7 @@ union_accept (struct Operation *op)
op->state->se = strata_estimator_dup (op->spec->set->state->se);
op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
GNUNET_NO);
op->state->salt_receive = op->state->salt_send = 42;
+ initialize_key_to_element (op);
/* kick off the operation */
send_strata_estimator (op);
}
@@ -1771,6 +2052,9 @@ union_handle_p2p_message (struct Operation *op,
case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
handle_p2p_elements (op, mh);
break;
+ case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
+ handle_p2p_full_element (op, mh);
+ break;
case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
handle_p2p_inquiry (op, mh);
break;
@@ -1783,6 +2067,9 @@ union_handle_p2p_message (struct Operation *op,
case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
handle_p2p_demand (op, mh);
break;
+ case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
+ handle_p2p_full_done (op, mh);
+ break;
default:
/* Something wrong with cadet's message handlers? */
GNUNET_assert (0);
diff --git a/src/set/test_set.conf b/src/set/test_set.conf
index 69e7f5c52..30ccbde55 100644
--- a/src/set/test_set.conf
+++ b/src/set/test_set.conf
@@ -5,7 +5,7 @@ GNUNET_TEST_HOME = /tmp/test-gnunet-set/
[set]
AUTOSTART = YES
-# PREFIX = valgrind
+PREFIX = valgrind
#PREFIX = valgrind --leak-check=full
#PREFIX = gdbserver :1234
OPTIONS = -L INFO
--
To stop receiving notification emails like this one, please contact
address@hidden