[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r15914 - in gnunet/src: fragmentation include
From: |
gnunet |
Subject: |
[GNUnet-SVN] r15914 - in gnunet/src: fragmentation include |
Date: |
Sat, 9 Jul 2011 19:25:44 +0200 |
Author: grothoff
Date: 2011-07-09 19:25:43 +0200 (Sat, 09 Jul 2011)
New Revision: 15914
Modified:
gnunet/src/fragmentation/defragmentation_new.c
gnunet/src/include/gnunet_fragmentation_lib.h
gnunet/src/include/gnunet_protocols.h
Log:
defrag
Modified: gnunet/src/fragmentation/defragmentation_new.c
===================================================================
--- gnunet/src/fragmentation/defragmentation_new.c 2011-07-09 16:48:59 UTC
(rev 15913)
+++ gnunet/src/fragmentation/defragmentation_new.c 2011-07-09 17:25:43 UTC
(rev 15914)
@@ -35,7 +35,7 @@
/**
* The time the fragment was received.
*/
- GNUNET_TIME_Absolute time;
+ struct GNUNET_TIME_Absolute time;
/**
* Number of the bit for the fragment (in [0,..,63]).
@@ -45,7 +45,10 @@
/**
- * Information we keep for one message that is being assembled.
+ * Information we keep for one message that is being assembled. Note
+ * that we keep the context around even after the assembly is done to
+ * handle 'stray' messages that are received 'late'. A message
+ * context is ONLY discarded when the queue gets too big.
*/
struct MessageContext
{
@@ -60,6 +63,11 @@
struct MessageContext *prev;
/**
+ * Associated defragmentation context.
+ */
+ struct GNUNET_DEFRAGMENT_Context *dc;
+
+ /**
* Pointer to the assembled message, allocated at the
* end of this struct.
*/
@@ -76,7 +84,7 @@
* Task scheduled for transmitting the next ACK to the
* other peer.
*/
- struct GNUNET_SCHEDULER_TaskIdentifier ack_task;
+ GNUNET_SCHEDULER_TaskIdentifier ack_task;
/**
* When did we receive which fragment? Used to calculate
@@ -127,11 +135,6 @@
struct GNUNET_STATISTICS_Handle *stats;
/**
- * Closure for 'proc' and 'ackp'.
- */
- void *cls;
-
- /**
* Head of list of messages we're defragmenting.
*/
struct MessageContext *head;
@@ -142,6 +145,11 @@
struct MessageContext *tail;
/**
+ * Closure for 'proc' and 'ackp'.
+ */
+ void *cls;
+
+ /**
* Function to call with defragmented messages.
*/
GNUNET_FRAGMENT_MessageProcessor proc;
@@ -169,7 +177,10 @@
*/
unsigned int list_size;
-
+ /**
+ * Maximum message size for each fragment.
+ */
+ uint16_t mtu;
};
@@ -177,6 +188,7 @@
* Create a defragmentation context.
*
* @param stats statistics context
+ * @param mtu the maximum message size for each fragment
* @param num_msgs how many fragmented messages
* to we defragment at most at the same time?
* @param cls closure for proc and ackp
@@ -187,6 +199,7 @@
*/
struct GNUNET_DEFRAGMENT_Context *
GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
+ uint16_t mtu,
unsigned int num_msgs,
void *cls,
GNUNET_FRAGMENT_MessageProcessor proc,
@@ -200,6 +213,7 @@
dc->proc = proc;
dc->ackp = ackp;
dc->num_msgs = num_msgs;
+ dc->mtu = mtu;
dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
return dc;
}
@@ -213,11 +227,50 @@
void
GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
{
+ struct MessageContext *mc;
+
+ while (NULL != (mc = dc->head))
+ {
+ GNUNET_CONTAINER_DLL_remove (dc->head,
+ dc->tail,
+ mc);
+ dc->list_size--;
+ if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+ {
+ GNUNET_SCHEDULER_cancel (mc->ack_task);
+ mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_free (mc);
+ }
+ GNUNET_assert (0 == dc->list_size);
GNUNET_free (dc);
}
/**
+ * Send acknowledgement to the other peer now.
+ *
+ * @param cls the message context
+ * @param tc the scheduler context
+ */
+static void
+send_ack (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct MessageContext *mc = cls;
+ struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
+ struct FragmentAcknowledgement fa;
+
+ mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+ fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
+ fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
+ fa.fragment_id = htonl (mc->fragment_id);
+ fa.bits = GNUNET_htonll (mc->bits);
+ dc->ackp (dc->cls, &fa.header);
+}
+
+
+/**
* We have received a fragment. Process it.
*
* @param dc the context
@@ -227,6 +280,127 @@
GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
const struct GNUNET_MessageHeader *msg)
{
+ struct MessageContext *mc;
+ const struct FragmentHeader *fh;
+ uint16_t msize;
+ uint16_t foff;
+ uint32_t fid;
+ char *mbuf;
+ unsigned int bit;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Relative delay;
+
+ if (ntohs(msg->size) < sizeof (struct FragmentHeader))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (ntohs (msg->size) > dc->mtu)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ fh = (const struct FragmentHeader*) msg;
+ msize = ntohs (fh->total_size);
+ fid = ntohl (fh->fragment_id);
+ foff = ntohl (fh->offset);
+ if (foff >= msize)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ GNUNET_STATISTICS_update (dc->stats,
+ _("Fragments received"),
+ 1,
+ GNUNET_NO);
+ mc = dc->head;
+ while ( (NULL != mc) &&
+ (fid != mc->fragment_id) )
+ mc = mc->next;
+ bit = foff / dc->mtu;
+ if (bit * dc->mtu + ntohs (msg->size)
+ - sizeof (struct FragmentHeader) > msize)
+ {
+ /* payload extends past total message size */
+ GNUNET_break_op (0);
+ return;
+ }
+ if ( (NULL != mc) && (msize != mc->total_size) )
+ {
+ /* inconsistent message size */
+ GNUNET_break_op (0);
+ return;
+ }
+ now = GNUNET_TIME_absolute_get ();
+ if (NULL == mc)
+ {
+ mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
+ mc->msg = (const struct GNUNET_MessageHeader*) &mc[1];
+ mc->dc = dc;
+ mc->total_size = msize;
+ mc->fragment_id = fid;
+ mc->last_update = now;
+ mc->bits = (msize + dc->mtu - 1) / (dc->mtu - sizeof (struct
FragmentHeader));
+ GNUNET_CONTAINER_DLL_insert (dc->head,
+ dc->tail,
+ mc);
+ dc->list_size++;
+ if (dc->list_size > dc->num_msgs)
+ {
+ /* FIXME: discard oldest entry... */
+ }
+ }
+
+ /* copy data to 'mc' */
+ if (0 != (mc->bits & (1 << bit)))
+ {
+ mc->bits -= 1 << bit;
+ mbuf = (char* )&mc[1];
+ memcpy (&mbuf[bit * dc->mtu],
+ &fh[1],
+ ntohs (msg->size) - sizeof (struct FragmentHeader));
+ mc->last_update = now;
+ mc->frag_times[mc->frag_times_write_offset].time = now;
+ mc->frag_times[mc->frag_times_write_offset].bit = bit;
+ mc->frag_times_write_offset++;
+ if (0 == mc->bits)
+ {
+ /* message complete, notify! */
+ dc->proc (dc->cls,
+ mc->msg);
+ GNUNET_STATISTICS_update (dc->stats,
+ _("Messages defragmented"),
+ 1,
+ GNUNET_NO);
+ }
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (dc->stats,
+ _("Duplicate fragments received"),
+ 1,
+ GNUNET_NO);
+ }
+
+ /* FIXME: update ACK timer (if 0==mc->bits, always ACK now!) */
+ delay = GNUNET_TIME_UNIT_SECONDS; /* FIXME: bad! */
+ if (mc->frag_times_write_offset == 1)
+ {
+ /* FIXME: use number-of-fragments * dc->delay */
+ }
+ else
+ {
+ /* FIXME: use best-fit regression */
+ }
+ /* FIXME: update dc->latency! */
+
+ if (0 == mc->bits)
+ delay = GNUNET_TIME_UNIT_ZERO;
+ if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+ GNUNET_SCHEDULER_cancel (mc->ack_task);
+ mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &send_ack,
+ mc);
}
/* end of defragmentation_new.c */
Modified: gnunet/src/include/gnunet_fragmentation_lib.h
===================================================================
--- gnunet/src/include/gnunet_fragmentation_lib.h 2011-07-09 16:48:59 UTC
(rev 15913)
+++ gnunet/src/include/gnunet_fragmentation_lib.h 2011-07-09 17:25:43 UTC
(rev 15914)
@@ -112,7 +112,7 @@
/**
- * Defragmentation context.
+ * Defragmentation context (one per connection).
*/
struct GNUNET_DEFRAGMENT_Context;
@@ -121,6 +121,9 @@
* Create a defragmentation context.
*
* @param stats statistics context
+ * @param mtu the maximum message size for each fragment
+ * @param num_msgs how many fragmented messages
+ * to we defragment at most at the same time?
* @param cls closure for proc and ackp
* @param proc function to call with defragmented messages
* @param ackp function to call with acknowledgements (to send
@@ -129,6 +132,8 @@
*/
struct GNUNET_DEFRAGMENT_Context *
GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
+ uint16_t mtu,
+ unsigned int num_msgs,
void *cls,
GNUNET_FRAGMENT_MessageProcessor proc,
GNUNET_FRAGMENT_MessageProcessor ackp);
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2011-07-09 16:48:59 UTC (rev
15913)
+++ gnunet/src/include/gnunet_protocols.h 2011-07-09 17:25:43 UTC (rev
15914)
@@ -103,6 +103,12 @@
#define GNUNET_MESSAGE_TYPE_FRAGMENT 18
/**
+ * Acknowledgement of a FRAGMENT of a larger message.
+ * Managed by libgnunetfragment.
+ */
+#define GNUNET_MESSAGE_TYPE_FRAGMENT_ACK 19
+
+/**
* Message from the core saying that the transport
* server should start giving it messages. This
* should automatically trigger the transmission of
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r15914 - in gnunet/src: fragmentation include,
gnunet <=