[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] [gnunet] 02/02: add prototypes for handlers for incoming messages
From: |
gnunet |
Subject: |
[GNUnet-SVN] [gnunet] 02/02: add prototypes for handlers for incoming messages |
Date: |
Fri, 25 Jan 2019 18:35:10 +0100 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository gnunet.
commit 8e157def22a61958cd4b6c791da505a38062cc1d
Author: Christian Grothoff <address@hidden>
AuthorDate: Fri Jan 25 18:34:41 2019 +0100
add prototypes for handlers for incoming messages
---
src/include/gnunet_mq_lib.h | 45 +++
src/include/gnunet_protocols.h | 5 +
src/transport/gnunet-service-tng.c | 565 ++++++++++++++++++++++++++++++++-----
src/util/mq.c | 44 ++-
4 files changed, 589 insertions(+), 70 deletions(-)
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index 3f67dc365..2a459636a 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -527,6 +527,51 @@ struct GNUNET_MQ_MessageHandler
}
+/**
+ * Insert code for a "check_" function that verifies that
+ * a given variable-length message received over the network
+ * is followed by another variable-length message that fits
+ * exactly with the given size. If the message @a m
+ * is not followed by another `struct GNUNET_MessageHeader`
+ * with a size that adds up to the total size, an error is logged
+ * and the enclosing function returns #GNUNET_NO.
+ *
+ * @param m an IPC message with proper type to determine
+ * the size, starting with a `struct GNUNET_MessageHeader`
+ */
+#define GNUNET_MQ_check_boxed_message(m) \
+ { \
+ const struct GNUNET_MessageHeader *inbox = \
+ (const struct GNUNET_MessageHeader *) &m[1]; \
+ const struct GNUNET_MessageHeader *hdr = \
+ (const struct GNUNET_MessageHeader *) m; \
+ uint16_t slen = ntohs (hdr->size) - sizeof (*m); \
+ if ( (slen < sizeof (struct GNUNET_MessageHeader))||\
+ (slen != ntohs (inbox->size)) ) \
+ { \
+ GNUNET_break (0); \
+ return GNUNET_NO; \
+ } \
+ }
+
+
+/**
+ * Call the message handler that was registered
+ * for the type of the given message in the given @a handlers list.
+ *
+ * This function is intended to be used for the implementation
+ * of message queues.
+ *
+ * @param handlers a set of handlers
+ * @param mh message to dispatch
+ * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
+ * #GNUNET_SYSERR if message was rejected by check function
+ */
+int
+GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
+ const struct GNUNET_MessageHeader *mh);
+
+
/**
* Create a new envelope.
*
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 8593005d7..a8d716b3f 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3124,6 +3124,11 @@ extern "C"
*/
#define GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX 1219
+/**
+ * Transport affirming receipt of an ephemeral key.
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION 1220
+
/**
* Message sent to indicate to the transport that a monitor
* wants to observe certain events.
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 8febbdfff..3cccf5173 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -33,7 +33,7 @@
* transport-to-transport traffic)
*
* Implement:
- * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
+ * - manage defragmentation, retransmission, track RTT, loss, etc.
*
* Easy:
* - use ATS bandwidth allocation callback and schedule transmissions!
@@ -165,8 +165,8 @@ struct TransportBackchannelEncapsulationMessage
/**
- * Message by which a peer confirms that it is using an
- * ephemeral key.
+ * Body by which a peer confirms that it is using an ephemeral
+ * key.
*/
struct EphemeralConfirmation
{
@@ -191,6 +191,37 @@ struct EphemeralConfirmation
};
+/**
+ * Message by which a peer confirms that it is using an ephemeral
+ * key.
+ */
+struct EphemeralConfirmationMessage
+{
+
+ /**
+ * Message header, type is #GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Must be zero.
+ */
+ uint32_t reserved;
+
+ /**
+ * How long is this signature over the ephemeral key
+ * valid?
+ */
+ struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
+
+ /**
+ * Ephemeral key setup by the sender for @e target, used
+ * to encrypt the payload.
+ */
+ struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+};
+
+
/**
* Plaintext of the variable-size payload that is encrypted
* within a `struct TransportBackchannelEncapsulationMessage`
@@ -863,7 +894,12 @@ struct PendingMessage
* Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
*/
struct PendingMessage *prev_frag;
-
+
+ /**
+ * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
+ */
+ struct PendingMessage *bpm;
+
/**
* Target of the request.
*/
@@ -1797,6 +1833,36 @@ free_fragment_tree (struct PendingMessage *root)
}
+/**
+ * Release memory associated with @a pm and remove @a pm from associated
+ * data structures. @a pm must be a top-level pending message and not
+ * a fragment in the tree. The entire tree is freed (if applicable).
+ *
+ * @param pm the pending message to free
+ */
+static void
+free_pending_message (struct PendingMessage *pm)
+{
+ struct TransportClient *tc = pm->client;
+ struct Neighbour *target = pm->target;
+
+ if (NULL != tc)
+ {
+ GNUNET_CONTAINER_MDLL_remove (client,
+ tc->details.core.pending_msg_head,
+ tc->details.core.pending_msg_tail,
+ pm);
+ }
+ GNUNET_CONTAINER_MDLL_remove (neighbour,
+ target->pending_msg_head,
+ target->pending_msg_tail,
+ pm);
+ free_fragment_tree (pm);
+ GNUNET_free_non_null (pm->bpm);
+ GNUNET_free (pm);
+}
+
+
/**
* Send a response to the @a pm that we have processed a
* "send" request with status @a success. We
@@ -1829,17 +1895,8 @@ client_send_response (struct PendingMessage *pm,
som->peer = target->pid;
GNUNET_MQ_send (tc->mq,
env);
- GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.core.pending_msg_head,
- tc->details.core.pending_msg_tail,
- pm);
}
- GNUNET_CONTAINER_MDLL_remove (neighbour,
- target->pending_msg_head,
- target->pending_msg_tail,
- pm);
- free_fragment_tree (pm);
- GNUNET_free (pm);
+ free_pending_message (pm);
}
@@ -2175,37 +2232,292 @@ handle_del_address (void *cls,
}
+/**
+ * Context from #handle_incoming_msg(). Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
+{
+ /**
+ * Which communicator provided us with the message.
+ */
+ struct TransportClient *tc;
+
+ /**
+ * Additional information for flow control and about the sender.
+ */
+ struct GNUNET_TRANSPORT_IncomingMessage im;
+};
+
+
+/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+ // FIXME: if (0 != ntohl (im->fc_on)) => send ACK when done to communicator for flow control!
+ GNUNET_SERVICE_client_continue (cmc->tc->client);
+
+ GNUNET_free (cmc);
+}
+
+
+/**
+ * Communicator gave us an unencapsulated message to pass
+ * as-is to CORE. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param mh the message that was received
+ */
+static void
+handle_raw_message (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a fragment box. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param fb the fragment box message to validate
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_fragment_box (void *cls,
+ const struct TransportFragmentBox *fb)
+{
+ // FIXME! check that off + size-of-payload <= total-length!
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a fragment. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param fb the message that was received
+ */
+static void
+handle_fragment_box (void *cls,
+ const struct TransportFragmentBox *fb)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a fragment acknowledgement. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param fa the message that was received
+ */
+static void
+handle_fragment_ack (void *cls,
+ const struct TransportFragmentAckMessage *fa)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a reliability box. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param rb the reliability box message to validate
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_reliability_box (void *cls,
+ const struct TransportReliabilityBox *rb)
+{
+ GNUNET_MQ_check_boxed_message (rb);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a reliability box. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param rb the message that was received
+ */
+static void
+handle_reliability_box (void *cls,
+ const struct TransportReliabilityBox *rb)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a reliability ack. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param ra the message that was received
+ */
+static void
+handle_reliability_ack (void *cls,
+ const struct TransportReliabilityAckMessage *ra)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a backchannel encapsulation. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param be the backchannel encapsulation message to validate
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_backchannel_encapsulation (void *cls,
+ const struct
TransportBackchannelEncapsulationMessage *be)
+{
+ // FIXME: do work!
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a backchannel encapsulation. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param be the message that was received
+ */
+static void
+handle_backchannel_encapsulation (void *cls,
+ const struct
TransportBackchannelEncapsulationMessage *be)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us an ephemeral confirmation. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param ec the message that was received
+ */
+static void
+handle_ephemeral_confirmation (void *cls,
+ const struct EphemeralConfirmationMessage *ec)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a DV learn message. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvl the DV learn message to validate
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_dv_learn (void *cls,
+ const struct TransportDVLearn *dvl)
+{
+ // FIXME: do work!
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a DV learn message. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param dvl the message that was received
+ */
+static void
+handle_dv_learn (void *cls,
+ const struct TransportDVLearn *dvl)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a DV box. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvb the DV box message to validate
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_dv_box (void *cls,
+ const struct TransportDVBox *dvb)
+{
+ // FIXME: do work!
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a DV box. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param dvb the message that was received
+ */
+static void
+handle_dv_box (void *cls,
+ const struct TransportDVBox *dvb)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
/**
* Client notified us about transmission from a peer. Process the request.
*
- * @param cls the client
+ * @param cls a `struct TransportClient` which sent us the message
* @param obm the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
*/
static int
check_incoming_msg (void *cls,
const struct GNUNET_TRANSPORT_IncomingMessage *im)
{
struct TransportClient *tc = cls;
- uint16_t size;
- const struct GNUNET_MessageHeader *obmm;
if (CT_COMMUNICATOR != tc->type)
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
- size = ntohs (im->header.size) - sizeof (*im);
- if (size < sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- obmm = (const struct GNUNET_MessageHeader *) &im[1];
- if (size != ntohs (obmm->size))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
+ GNUNET_MQ_check_boxed_message (im);
return GNUNET_OK;
}
@@ -2213,7 +2525,6 @@ check_incoming_msg (void *cls,
/**
* Incoming meessage. Process the request.
*
- * @param cls the client
* @param im the send message that was received
*/
static void
@@ -2221,8 +2532,61 @@ handle_incoming_msg (void *cls,
const struct GNUNET_TRANSPORT_IncomingMessage *im)
{
struct TransportClient *tc = cls;
+ struct CommunicatorMessageContext *cmc = GNUNET_new (struct
CommunicatorMessageContext);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (fragment_box,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
+ struct TransportFragmentBox,
+ &cmc),
+ GNUNET_MQ_hd_fixed_size (fragment_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
+ struct TransportFragmentAckMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (reliability_box,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
+ struct TransportReliabilityBox,
+ &cmc),
+ GNUNET_MQ_hd_fixed_size (reliability_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+ struct TransportReliabilityAckMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (backchannel_encapsulation,
+
GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
+ struct TransportBackchannelEncapsulationMessage,
+ &cmc),
+ GNUNET_MQ_hd_fixed_size (ephemeral_confirmation,
+
GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION,
+ struct EphemeralConfirmationMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (dv_learn,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
+ struct TransportDVLearn,
+ &cmc),
+ GNUNET_MQ_hd_var_size (dv_box,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
+ struct TransportDVBox,
+ &cmc),
+ GNUNET_MQ_handler_end()
+ };
+ int ret;
- GNUNET_SERVICE_client_continue (tc->client);
+ cmc->tc = tc;
+ cmc->im = *im;
+ ret = GNUNET_MQ_handle_message (handlers,
+ (const struct GNUNET_MessageHeader *) &im[1]);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ GNUNET_free (cmc);
+ return;
+ }
+ if (GNUNET_NO == ret)
+ {
+ /* unencapsulated 'raw' message */
+ handle_raw_message (&cmc,
+ (const struct GNUNET_MessageHeader *) &im[1]);
+ }
}
@@ -2268,6 +2632,23 @@ tracker_update_in_cb (void *cls)
}
+/**
+ * If necessary, generates the UUID for a @a pm
+ *
+ * @param pm pending message to generate UUID for.
+ */
+static void
+set_pending_message_uuid (struct PendingMessage *pm)
+{
+ if (pm->msg_uuid_set)
+ return;
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &pm->msg_uuid,
+ sizeof (pm->msg_uuid));
+ pm->msg_uuid_set = GNUNET_YES;
+}
+
+
/**
* Fragment the given @a pm to the given @a mtu. Adds
* additional fragments to the neighbour as well. If the
@@ -2284,13 +2665,7 @@ fragment_message (struct PendingMessage *pm,
{
struct PendingMessage *ff;
- if (GNUNET_NO == pm->msg_uuid_set)
- {
- GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
- &pm->msg_uuid,
- sizeof (pm->msg_uuid));
- pm->msg_uuid_set = GNUNET_YES;
- }
+ set_pending_message_uuid (pm);
/* This invariant is established in #handle_add_queue_message() */
GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
@@ -2390,24 +2765,50 @@ fragment_message (struct PendingMessage *pm,
static struct PendingMessage *
reliability_box_message (struct PendingMessage *pm)
{
- if (PMT_CORE != pm->pmt)
- {
- /* already fragmented or reliability boxed, or control message: do nothing */
- return pm;
- }
-
- if (0) // FIXME
+ struct TransportReliabilityBox rbox;
+ struct PendingMessage *bpm;
+ char *msg;
+
+ if (PMT_CORE != pm->pmt)
+ return pm; /* already fragmented or reliability boxed, or control
message: do nothing */
+ if (NULL != pm->bpm)
+ return pm->bpm; /* already computed earlier: do nothing */
+ GNUNET_assert (NULL == pm->head_frag);
+ if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
{
/* failed hard */
- // FIMXE: bitch
+ GNUNET_break (0);
client_send_response (pm,
GNUNET_NO,
0);
return NULL;
}
-
- /* FIXME: return boxed PM here! */
- return NULL;
+ bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (rbox) +
+ pm->bytes_msg);
+ bpm->target = pm->target;
+ bpm->frag_parent = pm;
+ GNUNET_CONTAINER_MDLL_insert (frag,
+ pm->head_frag,
+ pm->tail_frag,
+ bpm);
+ bpm->timeout = pm->timeout;
+ bpm->pmt = PMT_RELIABILITY_BOX;
+ bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
+ set_pending_message_uuid (bpm);
+ rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
+ rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
+ rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
+ rbox.msg_uuid = pm->msg_uuid;
+ msg = (char *) &bpm[1];
+ memcpy (msg,
+ &rbox,
+ sizeof (rbox));
+ memcpy (&msg[sizeof (rbox)],
+ &pm[1],
+ pm->bytes_msg);
+ pm->bpm = bpm;
+ return bpm;
}
@@ -2542,26 +2943,64 @@ transmit_on_queue (void *cls)
else if (PMT_CORE != pm->pmt)
{
/* This was an acknowledgement of some type, always free */
-
- struct Neighbour *neighbour = pm->target;
- GNUNET_CONTAINER_MDLL_remove (neighbour,
- neighbour->pending_msg_head,
- neighbour->pending_msg_tail,
- pm);
- GNUNET_free (pm);
+ free_pending_message (pm);
}
else
{
/* message not finished, waiting for acknowledgement */
- // FIXME: update time by which we might retransmit 's' based on
- // queue characteristics (i.e. RTT)
-
- // FIXME: move 'pm' back in the transmission queue (simplistic: to
- // the end, better: with position depending on type, timeout,
- // etc.)
+ struct Neighbour *neighbour = pm->target;
+ /* Update time by which we might retransmit 's' based on queue
+ characteristics (i.e. RTT); it takes one RTT for the message to
+ arrive and the ACK to come back in the best case; but the other
+ side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
+ retransmitting. Note that in the future this heuristic should
+ likely be improved further (measure RTT stability, consider
+ message urgency and size when delaying ACKs, etc.) */
+ s->next_attempt = GNUNET_TIME_relative_to_absolute
+ (GNUNET_TIME_relative_multiply (queue->rtt,
+ 4));
+ if (s == pm)
+ {
+ struct PendingMessage *pos;
+
+ /* re-insert sort in neighbour list */
+ GNUNET_CONTAINER_MDLL_remove (neighbour,
+ neighbour->pending_msg_head,
+ neighbour->pending_msg_tail,
+ pm);
+ pos = neighbour->pending_msg_tail;
+ while ( (NULL != pos) &&
+ (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
+ pos = pos->prev_neighbour;
+ GNUNET_CONTAINER_MDLL_insert_after (neighbour,
+ neighbour->pending_msg_head,
+ neighbour->pending_msg_tail,
+ pos,
+ pm);
+ }
+ else
+ {
+ /* re-insert sort in fragment list */
+ struct PendingMessage *fp = s->frag_parent;
+ struct PendingMessage *pos;
+
+ GNUNET_CONTAINER_MDLL_remove (frag,
+ fp->head_frag,
+ fp->tail_frag,
+ s);
+ pos = fp->tail_frag;
+ while ( (NULL != pos) &&
+ (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
+ pos = pos->prev_frag;
+ GNUNET_CONTAINER_MDLL_insert_after (frag,
+ fp->head_frag,
+ fp->tail_frag,
+ pos,
+ s);
+ }
}
- /* finally, re-schedule self */
+ /* finally, re-schedule queue transmission task itself */
schedule_transmit_on_queue (queue);
}
diff --git a/src/util/mq.c b/src/util/mq.c
index 4dfcb72be..d2f5add19 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -214,6 +214,35 @@ struct GNUNET_MQ_Handle
void
GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
const struct GNUNET_MessageHeader *mh)
+{
+ int ret;
+
+ ret = GNUNET_MQ_handle_message (mq->handlers,
+ mh);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_MQ_inject_error (mq,
+ GNUNET_MQ_ERROR_MALFORMED);
+ return;
+ }
+}
+
+
+/**
+ * Call the message handler that was registered
+ * for the type of the given message in the given @a handlers list.
+ *
+ * This function is intended to be used for the implementation
+ * of message queues.
+ *
+ * @param handlers a set of handlers
+ * @param mh message to dispatch
+ * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
+ * #GNUNET_SYSERR if message was rejected by check function
+ */
+int
+GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
+ const struct GNUNET_MessageHeader *mh)
{
const struct GNUNET_MQ_MessageHandler *handler;
int handled = GNUNET_NO;
@@ -224,9 +253,9 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
"Received message of type %u and size %u\n",
mtype, msize);
- if (NULL == mq->handlers)
+ if (NULL == handlers)
goto done;
- for (handler = mq->handlers; NULL != handler->cb; handler++)
+ for (handler = handlers; NULL != handler->cb; handler++)
{
if (handler->type == mtype)
{
@@ -240,9 +269,7 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
LOG (GNUNET_ERROR_TYPE_ERROR,
"Received malformed message of type %u\n",
(unsigned int) handler->type);
- GNUNET_MQ_inject_error (mq,
- GNUNET_MQ_ERROR_MALFORMED);
- break;
+ return GNUNET_SYSERR;
}
if ( (NULL == handler->mv) ||
(GNUNET_OK ==
@@ -257,17 +284,20 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
LOG (GNUNET_ERROR_TYPE_ERROR,
"Received malformed message of type %u\n",
(unsigned int) handler->type);
- GNUNET_MQ_inject_error (mq,
- GNUNET_MQ_ERROR_MALFORMED);
+ return GNUNET_SYSERR;
}
break;
}
}
done:
if (GNUNET_NO == handled)
+ {
LOG (GNUNET_ERROR_TYPE_INFO,
"No handler for message of type %u and size %u\n",
mtype, msize);
+ return GNUNET_NO;
+ }
+ return GNUNET_OK;
}
--
To stop receiving notification emails like this one, please contact
address@hidden