[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28534 - in gnunet/src: include set
From: gnunet
Subject: [GNUnet-SVN] r28534 - in gnunet/src: include set
Date: Mon, 12 Aug 2013 16:34:16 +0200
Author: dold
Date: 2013-08-12 16:34:16 +0200 (Mon, 12 Aug 2013)
New Revision: 28534
Modified:
gnunet/src/include/gnunet_set_service.h
gnunet/src/set/gnunet-service-set.c
gnunet/src/set/gnunet-service-set_union.c
gnunet/src/set/set.h
gnunet/src/set/set_api.c
Log:
- listener re-connects transparently
- bugs
Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h 2013-08-12 14:33:26 UTC (rev 28533)
+++ gnunet/src/include/gnunet_set_service.h 2013-08-12 14:34:16 UTC (rev 28534)
@@ -208,8 +208,7 @@
* @param other_peer the other peer
* @param context_msg message with application specific information from
* the other peer
- * @param request request from the other peer, use GNUNET_SET_accept
- * Will be NULL if the listener failed.
+ * @param request request from the other peer (never NULL), use GNUNET_SET_accept
* to accept it, otherwise the request will be refused
* Note that we can't just return value from the listen callback,
* as it is also necessary to specify the set we want to do the
@@ -315,7 +314,9 @@
/**
- * Wait for set operation requests for the given application id
+ * Wait for set operation requests for the given application ID.
+ * If the connection to the set service is lost, the listener is
+ * re-created transparently with exponential backoff.
*
* @param cfg configuration to use for connecting to
* the set service
@@ -336,6 +337,8 @@
/**
* Cancel the given listen operation.
+ * After calling cancel, the listen callback for this listen handle
+ * will not be called again.
*
* @param lh handle for the listen operation
*/
Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-08-12 14:33:26 UTC (rev 28533)
+++ gnunet/src/set/gnunet-service-set.c 2013-08-12 14:34:16 UTC (rev 28534)
@@ -442,8 +442,8 @@
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u,
app %s)\n",
- ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
- listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id);
+ ntohl (msg->operation), GNUNET_h2s (&msg->app_id));
+ listener = listener_get_by_target (ntohl (msg->operation), &msg->app_id);
if (NULL == listener)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -477,6 +477,7 @@
return;
}
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
set->vt->iterate (set);
}
@@ -557,21 +558,30 @@
listener->client = client;
listener->client_mq = GNUNET_MQ_queue_for_server_client (client);
listener->app_id = msg->app_id;
- listener->operation = ntohs (msg->operation);
+ listener->operation = ntohl (msg->operation);
GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app
%s)\n",
listener->operation, GNUNET_h2s (&listener->app_id));
for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
{
- if ( (NULL == incoming->spec) ||
- (0 != incoming->suggest_id) )
+ if (NULL == incoming->spec)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n");
continue;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s,
suggest: %u)\n",
+ incoming->spec->operation, GNUNET_h2s
(&incoming->spec->app_id), incoming->suggest_id);
+
+ if (0 != incoming->suggest_id)
+ continue;
if (listener->operation != incoming->spec->operation)
continue;
if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
&incoming->spec->app_id))
continue;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n");
incoming_suggest (incoming, listener);
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n");
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -942,8 +952,9 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n",
ntohs (message->type));
+ /* FIXME: do this before or after the handler? */
+ GNUNET_MESH_receive_done (tunnel);
ret = tc->vt->msg_handler (tc->op, message);
- GNUNET_MESH_receive_done (tunnel);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n",
ntohs (message->type));
return ret;
@@ -1023,7 +1034,7 @@
int ret;
ret = GNUNET_SERVICE_run (argc, argv, "set",
GNUNET_SERVICE_OPTION_NONE, &run, NULL);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
return (GNUNET_OK == ret) ? 0 : 1;
}
Modified: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c 2013-08-12 14:33:26 UTC (rev 28533)
+++ gnunet/src/set/gnunet-service-set_union.c 2013-08-12 14:34:16 UTC (rev 28534)
@@ -147,6 +147,8 @@
/**
* Maps IBF-Keys (specific to the current salt) to elements.
+ * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
+ * Colliding IBF-Keys are linked.
*/
struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
@@ -493,7 +495,7 @@
GNUNET_SERVER_client_disconnect (eo->spec->set->client);
return;
}
- msg->operation = htons (GNUNET_SET_OPERATION_UNION);
+ msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
msg->app_id = eo->spec->app_id;
msg->salt = htonl (eo->spec->salt);
GNUNET_MQ_send (eo->mq, ev);
@@ -524,7 +526,7 @@
* GNUNET_NO if not.
*/
static int
-insert_element_iterator (void *cls,
+op_register_element_iterator (void *cls,
uint32_t key,
void *value)
{
@@ -549,12 +551,16 @@
/**
* Insert an element into the union operation's
* key-to-element mapping. Takes ownership of 'ee'.
+ * Note that this does not insert the element in the set,
+ * only in the operation's key-element mapping.
+ * This is done to speed up re-tried operations, if some elements
+ * were transmitted, and then the IBF fails to decode.
*
* @param eo the union operation
* @param ee the element entry
*/
static void
-insert_element (struct OperationState *eo, struct ElementEntry *ee)
+op_register_element (struct OperationState *eo, struct ElementEntry *ee)
{
int ret;
struct IBF_Key ibf_key;
@@ -566,14 +572,14 @@
k->ibf_key = ibf_key;
ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
(uint32_t)
ibf_key.key_val,
- insert_element_iterator,
k);
+
op_register_element_iterator, k);
/* was the element inserted into a colliding bucket? */
if (GNUNET_SYSERR == ret)
return;
GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t)
ibf_key.key_val, k,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
}
@@ -623,7 +629,7 @@
e->remote = GNUNET_NO;
- insert_element (eo, e);
+ op_register_element (eo, e);
return GNUNET_YES;
}
@@ -861,27 +867,32 @@
ibf_destroy (eo->remote_ibf);
eo->remote_ibf = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n",
diff_ibf->size);
+
num_decoded = 0;
+ last_key.key_val = 0;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n",
diff_ibf->size);
-
while (1)
{
int res;
+ int cycle_detected = GNUNET_NO;
- if (num_decoded > 0)
- last_key = key;
+ last_key = key;
res = ibf_decode (diff_ibf, &side, &key);
if (res == GNUNET_OK)
+ {
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
key.key_val);
- num_decoded += 1;
- if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val
== key.key_val))
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded
%u/%u)\n",
- num_decoded, diff_ibf->size);
- if ((GNUNET_SYSERR == res) || (num_decoded > diff_ibf->size) ||
- (num_decoded > 1 && last_key.key_val == key.key_val))
+ num_decoded += 1;
+ if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val
== key.key_val))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded
%u/%u)\n",
+ num_decoded, diff_ibf->size);
+ cycle_detected = GNUNET_YES;
+ }
+ }
+ if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
{
int next_order;
next_order = 0;
@@ -922,6 +933,8 @@
/* FIXME: before sending the request, check if we may just have the element */
/* FIXME: merge multiple requests */
+ /* FIXME: remember somewhere that we already requested the element,
+ * so that we don't request it again with the next ibf if decoding fails */
ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
@@ -1089,7 +1102,9 @@
ee->remote = GNUNET_YES;
GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
- insert_element (eo, ee);
+ /* FIXME: see if the element has already been inserted! */
+
+ op_register_element (eo, ee);
send_client_element (eo, &ee->element);
}
@@ -1386,6 +1401,8 @@
union_handle_p2p_message (struct OperationState *eo,
const struct GNUNET_MessageHeader *mh)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
+ ntohs (mh->type), ntohs (mh->size));
switch (ntohs (mh->type))
{
case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
@@ -1490,6 +1507,8 @@
{
struct GNUNET_MQ_Envelope *ev;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "iterating union set with %u
elements\n",
+ GNUNET_CONTAINER_multihashmap_size (set->state->elements));
GNUNET_CONTAINER_multihashmap_iterate (set->state->elements,
send_iter_element_iter, set);
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
GNUNET_MQ_send (set->client_mq, ev);
Modified: gnunet/src/set/set.h
===================================================================
--- gnunet/src/set/set.h 2013-08-12 14:33:26 UTC (rev 28533)
+++ gnunet/src/set/set.h 2013-08-12 14:34:16 UTC (rev 28534)
@@ -59,7 +59,7 @@
/**
* Operation type, values of enum GNUNET_SET_OperationType
*/
- uint16_t operation GNUNET_PACKED;
+ uint32_t operation GNUNET_PACKED;
/**
* application id
Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c 2013-08-12 14:33:26 UTC (rev 28533)
+++ gnunet/src/set/set_api.c 2013-08-12 14:34:16 UTC (rev 28534)
@@ -169,6 +169,13 @@
struct GNUNET_MQ_Handle* mq;
/**
+ * Configuration handle for the listener, stored
+ * here to be able to reconnect transparently on
+ * connection failure.
+ */
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ /**
* Function to call on a new incoming request,
* or on error.
*/
@@ -178,9 +185,30 @@
* Closure for listen_cb.
*/
void *listen_cls;
+
+ /**
+ * Operation we listen for.
+ */
+ enum GNUNET_SET_OperationType operation;
+
+ /**
+ * Application ID we listen for.
+ */
+ struct GNUNET_HashCode app_id;
+
+ /**
+ * Time to wait until we try to reconnect on failure.
+ */
+ struct GNUNET_TIME_Relative reconnect_backoff;
};
+/* forward declaration */
+static void
+listen_connect (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
/**
* Handle element for iteration over the set.
*
@@ -198,7 +226,8 @@
if (NULL == set->iterator)
return;
- element.type = htons (mh->type);
+ element.size = ntohs (mh->size) - sizeof (struct
GNUNET_SET_IterResponseMessage);
+ element.type = htons (msg->element_type);
element.data = &msg[1];
set->iterator (set->iterator_cls, &element);
}
@@ -266,6 +295,7 @@
oh->result_cb (oh->result_cls, &e, result_status);
}
+
/**
* Handle request message for a listen operation
*
@@ -297,9 +327,9 @@
amsg->request_id = htonl (0);
amsg->accept_reject_id = msg->accept_id;
GNUNET_MQ_send (lh->mq, mqm);
- GNUNET_free (req);
LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n");
}
+ GNUNET_free (req);
LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n");
@@ -313,8 +343,14 @@
{
struct GNUNET_SET_ListenHandle *lh = cls;
- /* FIXME: why do you do this? */
- lh->listen_cb (lh->listen_cls, NULL, NULL, NULL);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listener broke down, re-connecting\n");
+ GNUNET_CLIENT_disconnect (lh->client);
+ lh->client = NULL;
+ GNUNET_MQ_destroy (lh->mq);
+ lh->mq = NULL;
+
+ GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, listen_connect, lh);
+ lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
}
@@ -465,6 +501,7 @@
set->client = NULL;
GNUNET_MQ_destroy (set->mq);
set->mq = NULL;
+ GNUNET_free (set);
}
@@ -514,11 +551,49 @@
return oh;
}
+
/**
+ * Connect to the set service in order to listen
+ * for request.
+ *
+ * @param cls the listen handle to connect
+ * @param tc task context if invoked as a task, NULL otherwise
+ */
+static void
+listen_connect (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_SET_ListenMessage *msg;
+ struct GNUNET_SET_ListenHandle *lh = cls;
+ static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
+ GNUNET_MQ_HANDLERS_END
+ };
+
+ GNUNET_assert (NULL == lh->client);
+ lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
+ if (NULL == lh->client)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "could not connect to set (wrong configuration?), giving up
listening\n");
+ return;
+ }
+ GNUNET_assert (NULL == lh->mq);
+ lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
+
handle_client_listener_error, lh);
+ mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
+ msg->operation = htonl (lh->operation);
+ msg->app_id = lh->app_id;
+ GNUNET_MQ_send (lh->mq, mqm);
+}
+
+
+/**
* Wait for set operation requests for the given application id
*
* @param cfg configuration to use for connecting to
- * the set service
+ * the set service, needs to be valid for the lifetime of the listen handle
* @param operation operation we want to listen for
* @param app_id id of the application that handles set operation requests
* @param listen_cb called for each incoming request matching the operation
@@ -534,25 +609,15 @@
void *listen_cls)
{
struct GNUNET_SET_ListenHandle *lh;
- struct GNUNET_MQ_Envelope *mqm;
- struct GNUNET_SET_ListenMessage *msg;
- static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
- {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
- GNUNET_MQ_HANDLERS_END
- };
lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
- lh->client = GNUNET_CLIENT_connect ("set", cfg);
lh->listen_cb = listen_cb;
lh->listen_cls = listen_cls;
- GNUNET_assert (NULL != lh->client);
- lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
-
handle_client_listener_error, lh);
- mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
- msg->operation = htons (operation);
- msg->app_id = *app_id;
- GNUNET_MQ_send (lh->mq, mqm);
-
+ lh->cfg = cfg;
+ lh->operation = operation;
+ lh->app_id = *app_id;
+ lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+ listen_connect (lh, NULL);
return lh;
}
@@ -680,7 +745,6 @@
}
-
/**
* Iterate over all elements in the given set.
* Note that this operation involves transferring every element of the set
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28534 - in gnunet/src: include set,
gnunet <=