gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r13628 - in gnunet/src: include transport


From: gnunet
Subject: [GNUnet-SVN] r13628 - in gnunet/src: include transport
Date: Tue, 9 Nov 2010 23:24:58 +0100

Author: brodski
Date: 2010-11-09 23:24:58 +0100 (Tue, 09 Nov 2010)
New Revision: 13628

Modified:
   gnunet/src/include/gnunet_container_lib.h
   gnunet/src/transport/plugin_transport_wlan.c
Log:
wlan: separate fragment and session queue
wlan: schedule for timeouts used
dll: insert at tail
dll: insert before element


Modified: gnunet/src/include/gnunet_container_lib.h
===================================================================
--- gnunet/src/include/gnunet_container_lib.h   2010-11-09 20:42:34 UTC (rev 
13627)
+++ gnunet/src/include/gnunet_container_lib.h   2010-11-09 22:24:58 UTC (rev 
13628)
@@ -694,6 +694,23 @@
   (head) = (element); } while (0)
 
 /**
+ * Insert an element at the tail of a DLL. Assumes that head, tail and
+ * element are structs with prev and next fields.
+ *
+ * @param head pointer to the head of the DLL
+ * @param tail pointer to the tail of the DLL
+ * @param element element to insert
+ */
+#define GNUNET_CONTAINER_DLL_insert_tail(head,tail,element) do { \
+  (element)->prev = (tail); \
+  (element)->next = NULL; \
+  if ((head) == NULL) \
+    (head) = element; \
+  else \
+    (tail)->next = element; \
+  (tail) = (element); } while (0)
+
+/**
  * Insert an element into a DLL after the given other element.  Insert
  * at the head if the other element is NULL.
  *
@@ -719,9 +736,33 @@
   else \
     (element)->next->prev = (element); } while (0)
 
+/**
+ * Insert an element into a DLL before the given other element.  Insert
+ * at the tail if the other element is NULL.
+ *
+ * @param head pointer to the head of the DLL
+ * @param tail pointer to the tail of the DLL
+ * @param other following element, NULL for insertion at tail of DLL
+ * @param element element to insert
+ */
+#define GNUNET_CONTAINER_DLL_insert_before(head,tail,other,element) do { \
+  (element)->next = (other); \
+  if (NULL == other) \
+    { \
+      (element)->prev = (tail); \
+      (tail) = (element); \
+    } \
+  else \
+    { \
+      (element)->prev = (other)->prev; \
+      (other)->prev = (element); \
+    } \
+  if (NULL == (element)->prev) \
+    (head) = (element); \
+  else \
+    (element)->prev->next = (element); } while (0)
 
 
-
 /**
  * Remove an element from a DLL. Assumes
  * that head, tail and element are structs

Modified: gnunet/src/transport/plugin_transport_wlan.c
===================================================================
--- gnunet/src/transport/plugin_transport_wlan.c        2010-11-09 20:42:34 UTC 
(rev 13627)
+++ gnunet/src/transport/plugin_transport_wlan.c        2010-11-09 22:24:58 UTC 
(rev 13628)
@@ -48,6 +48,7 @@
 
 #define FRAGMENT_TIMEOUT 1000
 
+#define FRAGMENT_QUEUE_SIZE 10
 
 #define DEBUG_wlan GNUNET_NO
 
@@ -164,6 +165,24 @@
    */
   uint pendingsessions;
 
+  /**
+   * Messages in the fragmentation queue, head
+   */
+
+  struct FragmentMessage * pending_Fragment_Messages_head;
+
+  /**
+   * Messages in the fragmentation queue, tail
+   */
+
+  struct FragmentMessage * pending_Fragment_Messages_tail;
+
+  /**
+     * number of pending fragment messages
+     */
+
+  uint pending_fragment_messages;
+
 };
 
 //TODO doxigen
@@ -177,14 +196,50 @@
 
 //TODO doxigen
 
-struct FragmentQueue
+struct AckQueue
 {
-       struct FragmentQueue * next;
-       struct FragmentQueue * prev;
+       struct AckQueue * next;
+       struct AckQueue * prev;
        int fragment_num;
 };
 
 /**
+ * Information kept for each message that is yet to
+ * be transmitted.
+ */
+struct PendingMessage
+{
+
+  /**
+   * The pending message
+   */
+  char *msg;
+
+  /**
+   * Continuation function to call once the message
+   * has been sent.  Can be NULL if there is no
+   * continuation to call.
+   */
+  GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
+
+  /**
+   * Cls for transmit_cont
+   */
+  void * transmit_cont_cls;
+
+  /**
+   * Timeout value for the pending message.
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * Size of the message
+   */
+  size_t message_size;
+
+};
+
+/**
  * Session handle for connections.
  */
 struct Session
@@ -201,18 +256,12 @@
   struct Plugin *plugin;
 
   /**
-   * Messages currently pending for transmission
+   * Message currently pending for transmission
    * to this peer, if any.
    */
-  struct PendingMessage *pending_messages_head;
+  struct PendingMessage *pending_message;
 
   /**
-   * Messages currently pending for transmission
-   * to this peer, if any.
-   */
-  struct PendingMessage *pending_messages_tail;
-
-  /**
    * To whom are we talking to (set to our identity
    * if we are still waiting for the welcome message)
    */
@@ -241,7 +290,7 @@
   struct GNUNET_TIME_Absolute last_activity;
 
   /**
-   * current number for message incoming , to distinguish between the messages
+   * current number for message incoming, to distinguish between the messages
    */
   uint32_t message_id_in;
 
@@ -250,79 +299,76 @@
    */
   uint32_t message_id_out;
 
+  /**
+   * does this session have a message in the fragment queue
+   */
 
+  int has_fragment;
+
 };
 
-/**
- * Information kept for each message that is yet to
- * be transmitted.
- */
-struct PendingMessage
+
+
+
+struct FragmentMessage
 {
+       /**
+        * Session this message belongs to
+        */
 
-  /**
-   * This is a doubly-linked list.
-   */
-  struct PendingMessage *next;
+       struct Session *session;
 
-  /**
-   * This is a doubly-linked list.
-   */
-  struct PendingMessage *prev;
+       /**
+       * This is a doubly-linked list.
+       */
+       struct FragmentMessage *next;
 
-  /**
-   * The pending message
-   */
-  const char *msg;
+       /**
+       * This is a doubly-linked list.
+       */
+       struct FragmentMessage *prev;
 
-  /**
-   * Continuation function to call once the message
-   * has been sent.  Can be NULL if there is no
-   * continuation to call.
-   */
-  GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
+       /**
+       * The pending message
+       */
+       char *msg;
 
-  /**
-   * Cls for transmit_cont
-   */
-  void * transmit_cont_cls;
+       /**
+       * Timeout value for the pending message.
+       */
+       struct GNUNET_TIME_Absolute timeout;
 
-  /**
-   * Timeout value for the pending message.
-   */
-  struct GNUNET_TIME_Absolute timeout;
+       /**
+       * Timeout value for the pending fragments.
+       * Stores the time when the next msg fragment ack has to be received
+       */
+       struct GNUNET_TIME_Absolute next_ack;
 
-  /**
-   * Timeout value for the pending fragments.
-   * Stores the time when the last msg fragment ack was received
-   */
-  struct GNUNET_TIME_Absolute last_ack;
+       /**
+       * Sorted queue with the acks received for fragments; head
+       */
 
-  /**
-   * Sorted queue with the acks received for fragments; head
-   */
+       struct AckQueue * head;
 
-  struct FragmentQueue * head;
+       /**
+       * Sorted queue with the acks received for fragments; tail
+       */
 
-  /**
-   * Sorted queue with the acks received for fragments; tail
-   */
+       struct AckQueue * tail;
 
-  struct FragmentQueue * tail;
+       /**
+       * Size of the message
+       */
+       size_t message_size;
 
-  /**
-   * Size of the message
-   */
-  size_t message_size;
+       /**
+       * pos / next fragment number in the message, for 
fragmentation/segmentation,
+       * some acks can be missing but there is still time
+       */
+       uint32_t message_pos;
 
-  /**
-   * pos / next fragment number in the message, for fragmentation/segmentation,
-   * some acks can be missing but there is still time
-   */
-  uint32_t message_pos;
 };
 
-
 /**
  * Header for messages which need fragmentation
  */
@@ -401,6 +447,8 @@
                                   const void *addr,
                                   size_t addrlen);
 uint16_t getcrc16 (const char *msgbuf, size_t msgbuf_size);
+static void do_transmit (void *cls, const struct GNUNET_SCHEDULER_TaskContext 
*tc);
+static void check_fragment_queue (struct Plugin * plugin);
 
 /**
  * get the next message number, at the moment just a random one
@@ -461,23 +509,25 @@
        queue->content->plugin = plugin;
        memcpy(queue->content->addr, addr, 6);
        queue->content->message_id_out = get_next_message_id();
+       queue->content->has_fragment = 0;
 
        //queue welcome message for new sessions, not realy needed
        //struct WelcomeMessage welcome;
        struct PendingMessage *pm;
-       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
GNUNET_HELLO_size(* (plugin->env->our_hello)));
-       pm->msg = (const char*) &pm[1];
+       pm = GNUNET_malloc (sizeof (struct PendingMessage));
+       pm->msg = GNUNET_malloc(GNUNET_HELLO_size(* (plugin->env->our_hello)));
        pm->message_size = GNUNET_HELLO_size(* (plugin->env->our_hello));
        //welcome.header.size = htons (GNUNET_HELLO_size(* 
(plugin->env->our_hello)));
        //welcome.header.type = htons (GNUNET_MESSAGE_TYPE_WLAN_ADVERTISEMENT);
        //welcome.clientIdentity = *plugin->env->my_identity;
-       memcpy (&pm[1], * plugin->env->our_hello, GNUNET_HELLO_size(* 
(plugin->env->our_hello)));
+       memcpy ( (pm->msg), * plugin->env->our_hello, GNUNET_HELLO_size(* 
(plugin->env->our_hello)));
        pm->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
-       GNUNET_CONTAINER_DLL_insert ((queue->content)->pending_messages_head,
-                                          
(queue->content)->pending_messages_tail,
-                                      pm);
+       queue->content->pending_message = pm;
        plugin->pendingsessions ++;
-       GNUNET_CONTAINER_DLL_insert_after(plugin->pending_Sessions, 
plugin->pending_Sessions_tail, plugin->pending_Sessions_tail, queue);
+       GNUNET_CONTAINER_DLL_insert_tail(plugin->pending_Sessions, 
plugin->pending_Sessions_tail, queue);
+
+       check_fragment_queue(plugin);
+
        return queue->content;
 
 }
@@ -520,8 +570,8 @@
 
 //TODO doxigen
 static void
-free_acks (struct PendingMessage * pm){
-       struct FragmentQueue * fq;
+free_acks (struct FragmentMessage * pm){
+       struct AckQueue * fq;
        while (pm->head != NULL){
                fq = pm->head;
                GNUNET_CONTAINER_DLL_remove(pm->head, pm->tail, fq);
@@ -529,316 +579,347 @@
        }
 }
 
-/**
- * Function called to when wlan helper is ready to get some data
- *
- * @param cls closure
- * @param GNUNET_SCHEDULER_TaskContext
- */
-
+//TODO doxigen
 static void
-do_transmit (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct Plugin * plugin = cls;
-  ssize_t bytes;
+delay_fragment_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc){
+       struct Plugin * plugin = cls;
+       plugin->server_write_task = GNUNET_SCHEDULER_NO_TASK;
 
-  if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
-    return;
+       if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
+           return;
 
-  struct Session * session;
-  struct Sessionqueue * queue;
-  struct PendingMessage * pm;
-  struct IeeeHeader * wlanheader;
-  struct RadiotapHeader * radioHeader;
-  struct GNUNET_MessageHeader * msgheader;
-  struct FragmentationHeader fragheader;
-  uint16_t size = 0;
-  const char * copystart = NULL;
-  uint16_t copysize = 0;
-  uint copyoffset = 0;
-  struct FragmentQueue * akt = NULL;
-  int exit = 0;
+       // GNUNET_TIME_UNIT_FOREVER_REL is needed to clean up old msg
+       plugin->server_write_task
+               = GNUNET_SCHEDULER_add_write_file(GNUNET_TIME_UNIT_FOREVER_REL,
+                                                                               
        plugin->server_stdin_handle,
+                                                                               
   &do_transmit,
+                                                                               
   plugin);
+}
 
-  int i = 0;
 
-  struct GNUNET_TIME_Absolute nextsend;
-  struct GNUNET_TIME_Relative timeout;
-  struct Sessionqueue * nextsession = NULL;
+//TODO doxigen
+struct GNUNET_TIME_Relative
+get_next_frag_timeout (struct FragmentMessage * fm)
+{
+       return 
GNUNET_TIME_relative_min(GNUNET_TIME_absolute_get_remaining(fm->next_ack), 
GNUNET_TIME_absolute_get_remaining(fm->timeout));
+}
 
-  timeout.rel_value = FRAGMENT_TIMEOUT;
-  nextsend = GNUNET_TIME_absolute_get_forever();
+//TODO doxigen
+/**
+ * Function to get the timeout value for acks for this session
+ */
 
-  queue = plugin->pending_Sessions;
+struct GNUNET_TIME_Relative
+get_ack_timeout (struct FragmentMessage * fm){
+       struct GNUNET_TIME_Relative timeout;
+       timeout.rel_value = FRAGMENT_TIMEOUT;
+       return timeout;
+}
 
-  // check if the are some pending sessions/messages ...
-  GNUNET_assert(queue != NULL);
+//TODO doxigen
+/**
+ * Function to set the timer for the next timeout of the fragment queue
+ */
+static void
+check_next_fragment_timeout (struct Plugin * plugin){
+       struct FragmentMessage * fm;
+       if (plugin->server_write_task != GNUNET_SCHEDULER_NO_TASK){
+               GNUNET_SCHEDULER_cancel(plugin->server_write_task);
+       }
+       fm = plugin->pending_Fragment_Messages_head;
+       if (fm != NULL){
+               plugin->server_write_task = 
GNUNET_SCHEDULER_add_delayed(get_next_frag_timeout(fm), &delay_fragment_task, 
plugin);
+       }
+}
 
-  session = queue->content;
-  GNUNET_assert(session != NULL);
+//TODO doxigen
+/**
+ * Function to get the next queued Session, removes the session from the queue
+ */
 
-  pm = session->pending_messages_head;
-  GNUNET_assert(pm != NULL);
+static struct Session *
+get_next_queue_Session (struct Plugin * plugin){
+       struct Session * session;
+       struct Sessionqueue * sessionqueue;
+       struct Sessionqueue * sessionqueue_alt;
+       struct PendingMessage * pm;
+       sessionqueue = plugin->pending_Sessions;
+       while (sessionqueue != NULL){
+               session = sessionqueue->content;
+               pm = session->pending_message;
 
-  // get next valid session
-  // check if this session is only waiting to receive the acks for an already 
send fragments to finish it
-  // timeout is not reached
-  for (i = 0; i < plugin->pendingsessions; i++){
+               //check for message timeout
+               if (GNUNET_TIME_absolute_get_remaining(pm->timeout).rel_value > 
0){
+                       //check if session has no message in the fragment queue
+                       if (! session->has_fragment){
+                               plugin->pendingsessions --;
+                               GNUNET_CONTAINER_DLL_remove 
(plugin->pending_Sessions,
+                                               plugin->pending_Sessions_tail, 
sessionqueue);
+                               GNUNET_free(sessionqueue);
 
-         // check if the are some pending sessions/messages ...
-         GNUNET_assert(queue != NULL);
+                               return session;
+                       } else {
+                               sessionqueue = sessionqueue->next;
+                       }
+               } else {
 
-         session = queue->content;
-         GNUNET_assert(session != NULL);
-
-         pm = session->pending_messages_head;
-         GNUNET_assert(pm != NULL);
-
-         //save next session
-         nextsession = queue->next;
-         // test if message timed out
-         while (GNUNET_TIME_absolute_get_remaining(pm->timeout).rel_value == 
0){
-                 //remove message
-                 //free the acks
-                 free_acks (pm);
-                 //call the cont func that it did not work
-                 if (pm->transmit_cont != NULL)
+                       session->pending_message = NULL;
+                       //call the cont func that it did not work
+                       if (pm->transmit_cont != NULL)
                          pm->transmit_cont (pm->transmit_cont_cls,
-                                               &(session->target), 
GNUNET_SYSERR);
-                 //remove the message
-                 GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
-                                                                         
session->pending_messages_tail,
-                                                                         pm);
-                 GNUNET_free(pm);
+                                               &(session->target), 
GNUNET_SYSERR);
+                       GNUNET_free(pm->msg);
+                       GNUNET_free(pm);
 
-                 //test if there are no more messages pending for this session
-                 if (session->pending_messages_head == NULL){
+                       sessionqueue_alt = sessionqueue;
+                       sessionqueue = sessionqueue->next;
+                       plugin->pendingsessions --;
+                       GNUNET_CONTAINER_DLL_remove (plugin->pending_Sessions,
+                                       plugin->pending_Sessions_tail, 
sessionqueue_alt);
 
-                         //test if tail is null too
-                         GNUNET_assert(session->pending_messages_tail == NULL);
+                       GNUNET_free(sessionqueue_alt);
 
-                         plugin->pendingsessions --;
-                         GNUNET_CONTAINER_DLL_remove 
(plugin->pending_Sessions, plugin->pending_Sessions_tail, queue);
-                         GNUNET_free(queue);
-                         queue = NULL;
-                         break;
+               }
 
-                 } else {
-                         pm = session->pending_messages_head;
-                 }
 
-         }
-         // restore next session if necessary
-         if (queue == NULL){
-                 queue = nextsession;
-                 nextsession = NULL;
-                 //there are no more messages in this session
-                 continue;
-         }
-         nextsession = NULL;
+       }
+       return NULL;
+}
 
-         // test if retransmit is needed
-         if (GNUNET_TIME_absolute_get_duration(pm->last_ack).rel_value < 
FRAGMENT_TIMEOUT) {
-                 // get last offset for this message
-                 copyoffset = pm->message_size /(WLAN_MTU - sizeof(struct 
FragmentationHeader));
-                 // one more is the end
-                 copyoffset ++;
-                 // test if it is not the end
-                 if (copyoffset > pm->message_pos){
-                         nextsession = queue;
-                         break;
-                 }
+//TODO doxigen
+/**
+ * Function to sort the message into the message fragment queue
+ */
+static void
+sort_fragment_into_queue (struct Plugin * plugin, struct FragmentMessage * fm){
+       struct FragmentMessage * fm2;
+       //sort into the list at the right position
 
-                 nextsend = 
GNUNET_TIME_absolute_min(GNUNET_TIME_absolute_add(pm->last_ack, timeout), 
nextsend);
+       fm2 = plugin->pending_Fragment_Messages_head;
 
-                 GNUNET_CONTAINER_DLL_remove (plugin->pending_Sessions, 
plugin->pending_Sessions_tail, queue);
-                 //insert at the tail
-                 GNUNET_CONTAINER_DLL_insert_after (plugin->pending_Sessions,
-                                 plugin->pending_Sessions_tail,
-                                 plugin->pending_Sessions_tail, queue);
+       while (fm2 != NULL){
+               if (GNUNET_TIME_absolute_get_difference(fm2->next_ack, 
fm->next_ack).rel_value == 0){
+                       break;
+               } else {
+                       fm2 = fm2->next;
+               }
+       }
 
-                 //get next pending session
-                 queue = queue->next;
+       
GNUNET_CONTAINER_DLL_insert_after(plugin->pending_Fragment_Messages_head,
+                       plugin->pending_Fragment_Messages_tail,fm2,fm);
+}
 
-         } else {
-                 // retransmit
-                 nextsession = queue;
-                 break;
-         }
-  }
+//TODO doxigen
+/**
+ * Function to check if there is some space in the fragment queue
+ */
 
+static void
+check_fragment_queue (struct Plugin * plugin){
+       struct Session * session;
+       struct FragmentMessage * fm;
 
-  //test if there is one session to send something
-  if (nextsession != NULL){
-         queue = nextsession;
+       struct PendingMessage * pm;
 
-           // check if the are some pending sessions/messages ...
-           GNUNET_assert(queue != NULL);
+       if (plugin->pending_fragment_messages < FRAGMENT_QUEUE_SIZE){
+               session = get_next_queue_Session(plugin);
+               if (session != NULL){
+                       pm = session->pending_message;
+                       session->pending_message = NULL;
+                       session->has_fragment = 1;
+                       GNUNET_assert(pm != NULL);
 
-           session = queue->content;
-           GNUNET_assert(session != NULL);
+                       fm = GNUNET_malloc(sizeof(struct FragmentMessage));
+                       fm->message_size = pm->message_size;
+                       fm->msg = pm->msg;
+                       fm->session = session;
+                       fm->timeout.abs_value = pm->timeout.abs_value;
+                       fm->message_pos = 0;
+                       fm->next_ack = GNUNET_TIME_absolute_get();
 
-           pm = session->pending_messages_head;
-           GNUNET_assert(pm != NULL);
-  } else {
-         //nothing to send at the moment
-         plugin->server_read_task =
-                         GNUNET_SCHEDULER_add_delayed 
(GNUNET_TIME_absolute_get_remaining(nextsend),
-                                         &do_transmit, plugin);
+                       if (pm->transmit_cont != NULL)
+                                 pm->transmit_cont (pm->transmit_cont_cls,
+                                                       &(session->target), 
GNUNET_OK);
+                       GNUNET_free(pm);
 
-  }
+                       sort_fragment_into_queue(plugin,fm);
+                       plugin->pending_fragment_messages ++;
 
+                       //generate new message id
+                       session->message_id_out = get_next_message_id();
 
+                       //check if timeout changed
+                       check_next_fragment_timeout(plugin);
+               }
+       }
+}
 
-  if (pm->message_size > WLAN_MTU) {
-       size += sizeof(struct FragmentationHeader);
-       // check for retransmission
-       if (GNUNET_TIME_absolute_get_duration(pm->last_ack).rel_value > 
FRAGMENT_TIMEOUT) {
-               // TODO retransmit
-               // be positive and try again later :-D
-               pm->last_ack = GNUNET_TIME_absolute_get();
-               // find first missing fragment
-               exit = 0;
-               akt = pm->head;
-               pm->message_pos = 0;
+/**
+ * Function called when the wlan helper is ready to get some data
+ *
+ * @param cls closure
+ * @param GNUNET_SCHEDULER_TaskContext
+ */
 
-               //test if ack 0 was already received
-               while (akt != NULL){
-                       //if fragment is present, take next
-                       if (akt->fragment_num == pm->message_pos) {
-                               pm->message_pos ++;
-                       }
-                       //next ack is bigger then the fragment number
-                       //in case there is something like this: (acks) 1, 2, 5, 
6, ...
-                       //and we send 3 again, the next number should be 4
-                       if (akt->fragment_num > pm->message_pos) {
-                                                               break;
-                                                       }
+static void
+do_transmit (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
 
-                       akt = akt->next;
+  struct Plugin * plugin = cls;
+  plugin->server_write_task = GNUNET_SCHEDULER_NO_TASK;
 
-               }
+  ssize_t bytes;
 
+  if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
+    return;
 
-       }
+  struct Session * session;
+  struct FragmentMessage * fm;
+  struct IeeeHeader * wlanheader;
+  struct RadiotapHeader * radioHeader;
+  struct GNUNET_MessageHeader * msgheader;
+  struct FragmentationHeader fragheader;
+  uint16_t size = 0;
+  const char * copystart = NULL;
+  uint16_t copysize = 0;
+  uint copyoffset = 0;
+  struct AckQueue * akt = NULL;
+  //int exit = 0;
 
-       copyoffset = (WLAN_MTU - sizeof(struct FragmentationHeader)) * 
pm->message_pos;
-       fragheader.fragment_off_or_num = pm->message_pos;
-       fragheader.message_id = session->message_id_out;
+  fm = plugin->pending_Fragment_Messages_head;
+  GNUNET_assert(fm != NULL);
+  session = fm->session;
+  GNUNET_assert(session != NULL);
 
-       // start should be smaller then the packet size
-       //TODO send some other data if everything was send but not all acks are 
present
-       GNUNET_assert(copyoffset < pm->message_size);
-       copystart = pm->msg + copyoffset;
+  // test if message timed out
+  if (GNUNET_TIME_absolute_get_remaining(fm->timeout).rel_value == 0){
+         free_acks(fm);
+         GNUNET_assert(plugin->pending_fragment_messages > 0);
+         plugin->pending_fragment_messages --;
+         GNUNET_CONTAINER_DLL_remove(plugin->pending_Fragment_Messages_head,
+                         plugin->pending_Fragment_Messages_tail, fm);
 
-       //size of the fragment is either the MTU - overhead
-       //or the missing part of the message in case this is the last fragment
-       copysize = GNUNET_MIN(pm->message_size - copyoffset,
-                       WLAN_MTU - sizeof(struct FragmentationHeader));
-       fragheader.header.size = copysize;
-       fragheader.header.type = GNUNET_MESSAGE_TYPE_WLAN_FRAGMENT;
+         GNUNET_free(fm->msg);
 
-       //get the next missing fragment
-       exit = 0;
-       akt = pm->head;
-       pm->message_pos ++;
+         GNUNET_free(fm);
+         check_fragment_queue(plugin);
+  } else {
 
-       //test if ack was already received
-       while (akt != NULL){
-               //if fragment is present, take next
-               if (akt->fragment_num == pm->message_pos) {
-                       pm->message_pos ++;
-               }
-               //next ack is bigger then the fragment number
-               //in case there is something like this: (acks) 1, 2, 5, 6, ...
-               //and we send 3 again, the next number should be 4
-               if (akt->fragment_num > pm->message_pos) {
-                                                       break;
-                                               }
+         if (fm->message_size > WLAN_MTU) {
+               size += sizeof(struct FragmentationHeader);
+               // check/set for retransmission
+               if (GNUNET_TIME_absolute_get_duration(fm->next_ack).rel_value 
== 0) {
 
-               akt = akt->next;
-       }
+                       // be positive and try again later :-D
+                       fm->next_ack = 
GNUNET_TIME_relative_to_absolute(get_ack_timeout(fm));
+                       // find first missing fragment
+                       akt = fm->head;
+                       fm->message_pos = 0;
 
+                       //test if ack 0 was already received
+                       while (akt != NULL){
+                               //if fragment is present, take next
+                               if (akt->fragment_num == fm->message_pos) {
+                                       fm->message_pos ++;
+                               }
+                               //next ack is bigger then the fragment number
+                               //in case there is something like this: (acks) 
1, 2, 5, 6, ...
+                               //and we send 3 again, the next number should 
be 4
+                               else if (akt->fragment_num > fm->message_pos) {
+                                       break;
+                               }
 
-       } else {
-       // there is no need to split
-       copystart = pm->msg;
-       copysize = pm->message_size;
-       }
-  size += copysize;
-  size += sizeof(struct RadiotapHeader) + sizeof(struct IeeeHeader)
-                       + sizeof(struct GNUNET_MessageHeader);
-  msgheader = GNUNET_malloc(size);
-  msgheader->size = htons(size - sizeof(struct GNUNET_MessageHeader));
-  msgheader->type = GNUNET_MESSAGE_TYPE_WLAN_HELPER_DATA;
+                               akt = akt->next;
 
-  radioHeader = (struct RadiotapHeader*) &msgheader[1];
-  getRadiotapHeader(radioHeader);
+                       }
 
-  wlanheader = (struct IeeeHeader *) &radioHeader[1];
-  getWlanHeader(wlanheader);
 
+               }
 
-  //could be faster if content is just send and not copyed before
-  //fragmentheader is needed
-  if (pm->message_size > WLAN_MTU){
-  fragheader.message_crc = getcrc16(copystart, copysize);
-  memcpy(&wlanheader[1],&fragheader, sizeof(struct FragmentationHeader));
-  memcpy(&wlanheader[1] + sizeof(struct 
FragmentationHeader),copystart,copysize);
-  } else {
-  memcpy(&wlanheader[1],copystart,copysize);
-  }
+               copyoffset = (WLAN_MTU - sizeof(struct FragmentationHeader)) * 
fm->message_pos;
+               fragheader.fragment_off_or_num = htons(fm->message_pos);
+               fragheader.message_id = htonl(session->message_id_out);
 
-  bytes = GNUNET_DISK_file_write(plugin->server_stdin_handle, msgheader, size);
+               // start should be smaller then the packet size
+               GNUNET_assert(copyoffset < fm->message_size);
+               copystart = fm->msg + copyoffset;
 
+               //size of the fragment is either the MTU - overhead
+               //or the missing part of the message in case this is the last 
fragment
+               copysize = GNUNET_MIN(fm->message_size - copyoffset,
+                               WLAN_MTU - sizeof(struct FragmentationHeader));
+               fragheader.header.size = htons(copysize);
+               fragheader.header.type = GNUNET_MESSAGE_TYPE_WLAN_FRAGMENT;
 
 
+               //get the next missing fragment
+               akt = fm->head;
+               fm->message_pos ++;
 
+               //test if ack was already received
+               while (akt != NULL){
+                       //if fragment is present, take next
+                       if (akt->fragment_num == fm->message_pos) {
+                               fm->message_pos ++;
+                       }
+                       //next ack is bigger then the fragment number
+                       //in case there is something like this: (acks) 1, 2, 5, 
6, ...
+                       //and we send 3 again, the next number should be 4
+                       else if (akt->fragment_num > fm->message_pos) {
+                               break;
+                       }
 
+                       akt = akt->next;
+               }
+         } else {
+               // there is no need to split
+               copystart = fm->msg;
+               copysize = fm->message_size;
+         }
 
+       size += copysize;
+       size += sizeof(struct RadiotapHeader) + sizeof(struct IeeeHeader)
+               + sizeof(struct GNUNET_MessageHeader);
+       msgheader = GNUNET_malloc(size);
+       msgheader->size = htons(size - sizeof(struct GNUNET_MessageHeader));
+       msgheader->type = GNUNET_MESSAGE_TYPE_WLAN_HELPER_DATA;
 
+       radioHeader = (struct RadiotapHeader*) &msgheader[1];
+       getRadiotapHeader(radioHeader);
 
-  if (bytes < 1)
-    {
-      return;
-    }
+       wlanheader = (struct IeeeHeader *) &radioHeader[1];
+       getWlanHeader(wlanheader);
 
-  //plugin->server_read_task =
-  //GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
-  //                                plugin->server_stdout_handle, 
&wlan_plugin_helper_read, plugin);
 
-}
+       //could be faster if content is just sent and not copied before
+       //fragmentheader is needed
+       if (fm->message_size > WLAN_MTU){
+               fragheader.message_crc = htons(getcrc16(copystart, copysize));
+               memcpy(&wlanheader[1],&fragheader, sizeof(struct 
FragmentationHeader));
+               memcpy(&wlanheader[1] + sizeof(struct 
FragmentationHeader),copystart,copysize);
+       } else {
+               memcpy(&wlanheader[1],copystart,copysize);
+       }
 
+       bytes = GNUNET_DISK_file_write(plugin->server_stdin_handle, msgheader, 
size);
+       GNUNET_assert(bytes == size);
 
+       //check if this was the last fragment of this message, if true then 
queue at the end of the list
+       if (copysize + copyoffset >= fm->message_size){
+               GNUNET_assert(copysize + copyoffset == fm->message_size);
 
-/**
- * If we have pending messages, ask the server to
- * transmit them (schedule the respective tasks, etc.)
- *
- * @param Plugin env to get everything needed
- */
-static void
-process_pending_messages (struct Plugin * plugin)
-{
-  struct Sessionqueue * queue;
-  struct Session * session;
+               GNUNET_CONTAINER_DLL_remove 
(plugin->pending_Fragment_Messages_head,
+                               plugin->pending_Fragment_Messages_tail, fm);
 
-  if (plugin->pending_Sessions == NULL)
-    return;
+               
GNUNET_CONTAINER_DLL_insert_tail(plugin->pending_Fragment_Messages_head,
+                               plugin->pending_Fragment_Messages_tail, fm);
+               // if fragments have optimized timeouts
+               //sort_fragment_into_queue(plugin,fm);
 
-  queue = plugin->pending_Sessions;
-  //contet should not be empty
-  GNUNET_assert(queue->content != NULL);
+       }
+       check_next_fragment_timeout(plugin);
 
-  session = queue->content;
-  //pending sessions should have some msg
-  GNUNET_assert(session->pending_messages_head != NULL);
-
-  // GNUNET_TIME_UNIT_FOREVER_REL is needed to clean up old msg
-  plugin->server_write_task
-    = GNUNET_SCHEDULER_add_write_file(GNUNET_TIME_UNIT_FOREVER_REL,
-                                                                               
        plugin->server_stdin_handle,
-                                           &do_transmit,
-                                           plugin);
+  }
 }
 
 
@@ -940,48 +1021,24 @@
   queue_Session(plugin, session);
 
   //queue message in session
-  newmsg = GNUNET_malloc(sizeof(struct PendingMessage) + msgbuf_size + 
sizeof(struct WlanHeader));
-  newmsg->msg = (const char*) &newmsg[1];
-  wlanheader = (struct WlanHeader *) &newmsg[1];
-  //copy msg to buffer, not fragmented / segmented yet, but with message header
-  wlanheader->header.size = msgbuf_size;
-  wlanheader->header.type = GNUNET_MESSAGE_TYPE_WLAN_DATA;
-  wlanheader->target = *target;
-  wlanheader->crc = getcrc32(msgbuf, msgbuf_size);
-  memcpy(&wlanheader[1], msgbuf, msgbuf_size);
-  newmsg->transmit_cont = cont;
-  newmsg->transmit_cont_cls = cont_cls;
-  newmsg->timeout = GNUNET_TIME_relative_to_absolute(timeout);
-  newmsg->message_pos = 0;
-  newmsg->message_size = msgbuf_size + sizeof(struct WlanHeader);
-  newmsg->next = NULL;
-
-  //check if queue is empty
-  struct PendingMessage * tailmsg;
-  tailmsg = session->pending_messages_tail;
-
-  //new tail is the new msg
-  session->pending_messages_tail = newmsg;
-  newmsg->prev = tailmsg;
-
-  //test if tail was not NULL (queue is empty)
-  if (tailmsg == NULL){
-         // head should be NULL too
-         GNUNET_assert(session->pending_messages_head == NULL);
-
-         session->pending_messages_head = newmsg;
-
+  if (session->pending_message == NULL){
+       newmsg = GNUNET_malloc(sizeof(struct PendingMessage));
+       (newmsg->msg) = GNUNET_malloc(msgbuf_size + sizeof(struct WlanHeader));
+       wlanheader = (struct WlanHeader *) newmsg->msg;
+       //copy msg to buffer, not fragmented / segmented yet, but with message header
+       wlanheader->header.size = htons(msgbuf_size);
+       wlanheader->header.type = GNUNET_MESSAGE_TYPE_WLAN_DATA;
+       wlanheader->target = *target;
+       wlanheader->crc = getcrc32(msgbuf, msgbuf_size);
+       memcpy(&wlanheader[1], msgbuf, msgbuf_size);
+       newmsg->transmit_cont = cont;
+       newmsg->transmit_cont_cls = cont_cls;
+       newmsg->timeout = GNUNET_TIME_relative_to_absolute(timeout);
+       newmsg->message_size = msgbuf_size + sizeof(struct WlanHeader);
   } else {
-         //next at the tail should be NULL
-         GNUNET_assert(tailmsg->next == NULL);
-
-         //queue the msg
-         tailmsg->next = newmsg;
+         //TODO if message is sent while hello is still pending, other cases should not occur
   }
-
-  process_pending_messages(plugin);
-
-
+  check_fragment_queue(plugin);
   //FIXME not the correct size
   return msgbuf_size;
 
@@ -1012,16 +1069,12 @@
                GNUNET_assert (queue->content == NULL);
                if (memcmp(target, &(queue->content->target), sizeof(struct 
GNUNET_PeerIdentity)) == 0)
                  {
-                       // sesion found
+                       // session found
                        // remove PendingMessage
-                       while (queue->content->pending_messages_head != NULL){
-                               pm = queue->content->pending_messages_head;
-                               free_acks(pm);
-                               
GNUNET_CONTAINER_DLL_remove(queue->content->pending_messages_head,queue->content->pending_messages_tail,
 pm);
-                               GNUNET_free(pm);
+                       pm = queue->content->pending_message;
+                       GNUNET_free(pm->msg);
+                       GNUNET_free(pm);
 
-                       }
-
                        GNUNET_free(queue->content);
                        GNUNET_CONTAINER_DLL_remove(plugin->sessions, 
plugin->sessions_tail, queue);
                        GNUNET_free(queue);
@@ -1295,6 +1348,8 @@
   plugin = GNUNET_malloc (sizeof (struct Plugin));
   plugin->env = env;
   plugin->pendingsessions = 0;
+  plugin->server_write_task = GNUNET_SCHEDULER_NO_TASK;
+  plugin->server_read_task = GNUNET_SCHEDULER_NO_TASK;
 
   wlan_transport_start_wlan_helper(plugin);
   plugin->consoltoken = GNUNET_SERVER_mst_create(&wlan_process_helper,plugin);
@@ -1310,6 +1365,7 @@
   api->check_address = &wlan_plugin_address_suggested;
   api->address_to_string = &wlan_plugin_address_to_string;
 
+
   start_next_message_id();
 
   return api;




reply via email to

[Prev in Thread] Current Thread [Next in Thread]