gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r20755 - in gnunet/src: include stream


From: gnunet
Subject: [GNUnet-SVN] r20755 - in gnunet/src: include stream
Date: Sun, 25 Mar 2012 15:33:20 +0200

Author: harsha
Date: 2012-03-25 15:33:20 +0200 (Sun, 25 Mar 2012)
New Revision: 20755

Modified:
   gnunet/src/include/gnunet_stream_lib.h
   gnunet/src/stream/README
   gnunet/src/stream/stream_api.c
   gnunet/src/stream/test_stream_local.c
Log:
-added write io cancellation

Modified: gnunet/src/include/gnunet_stream_lib.h
===================================================================
--- gnunet/src/include/gnunet_stream_lib.h      2012-03-25 12:48:09 UTC (rev 
20754)
+++ gnunet/src/include/gnunet_stream_lib.h      2012-03-25 13:33:20 UTC (rev 
20755)
@@ -217,13 +217,17 @@
 struct GNUNET_STREAM_IOReadHandle;
 
 /**
- * Tries to write the given data to the stream
+ * Tries to write the given data to the stream. The maximum size of data that
+ * can be written as part of a write operation is (64 * (64000 - sizeof (struct
+ * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
+ * violation, however only the said number of maximum bytes will be written.
  *
  * @param socket the socket representing a stream
  * @param data the data buffer from where the data is written into the stream
  * @param size the number of bytes to be written from the data buffer
  * @param timeout the timeout period
- * @param write_cont the function to call upon writing some bytes into the 
stream
+ * @param write_cont the function to call upon writing some bytes into the
+ *          stream 
  * @param write_cont_cls the closure
  * @return handle to cancel the operation; NULL if a previous write is pending
  */
@@ -270,8 +274,20 @@
 
 
 /**
- * Cancel pending write operation.
+ * Cancels pending write operation. Also cancels packet retransmissions which
+ * may have resulted otherwise.
  *
+ * CAUTION: Normally a write operation is considered successful if the data
+ * given to it is sent and acknowledged by the receiver. As data is divided
+ * into packets, it is possible that not all packets are received by the
+ * receiver. Any missing packets are then retransmitted till the receiver
+ * acknowledges all packets or until a timeout . During this scenario if the
+ * write operation is cancelled all such retransmissions are also
+ * cancelled. This may leave the receiver's receive buffer incompletely filled
+ * as some missing packets are never retransmitted. So this operation should be
+ * used before shutting down transmission from our side or before closing the
+ * socket.
+ *
  * @param ioh handle to operation to cancel
  */
 void

Modified: gnunet/src/stream/README
===================================================================
--- gnunet/src/stream/README    2012-03-25 12:48:09 UTC (rev 20754)
+++ gnunet/src/stream/README    2012-03-25 13:33:20 UTC (rev 20755)
@@ -1,11 +1,11 @@
-The aim of the stream library is to provide stream connections between peers in
-GNUnet. This is a convenience library which hides the complexity of dividing
-data stream into packets, transmitting them and retransmitting them in case of
+Stream library provides stream connections between peers in GNUnet. This is a
+convenience library which hides the complexity of dividing data stream into
+packets, transmitting them and retransmitting them in case of communication
 errors.
 
 This library's API are similar to unix PIPE API. The user is expected to open a
 stream to a listening target peer. Once the stream is established, the user can
-use it as a pipe. Any data written into the stream will be readable by the
-target peer.
+use it as a pipe. Any data written into the stream at one peer will be readable
+by the other peer and vice versa.
 
-This library uses mesh API for establishing streams between peers.
\ No newline at end of file
+This library uses mesh API for establishing tunnels between peers.
\ No newline at end of file

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-03-25 12:48:09 UTC (rev 20754)
+++ gnunet/src/stream/stream_api.c      2012-03-25 13:33:20 UTC (rev 20755)
@@ -25,6 +25,8 @@
  *
  * Decrement PEER intern count during socket close and listen close to free the
  * memory used for PEER interning
+ *
+ * Add code for write io timeout
  **/
 
 /**
@@ -32,6 +34,8 @@
  * @brief Implementation of the stream library
  * @author Sree Harsha Totakura
  */
+
+
 #include "platform.h"
 #include "gnunet_common.h"
 #include "gnunet_crypto_lib.h"
@@ -46,17 +50,17 @@
 #define MAX_PACKET_SIZE 64000
 
 /**
+ * Receive buffer
+ */
+#define RECEIVE_BUFFER_SIZE 4096000
+
+/**
  * The maximum payload a data message packet can carry
  */
 static size_t max_payload_size = 
   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
 
 /**
- * Receive buffer
- */
-#define RECEIVE_BUFFER_SIZE 4096000
-
-/**
  * states in the Protocol
  */
 enum State
@@ -239,7 +243,7 @@
   /**
    * Task identifier for the read io timeout task
    */
-  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
+  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
 
   /**
    * Task identifier for retransmission task after timeout
@@ -374,6 +378,11 @@
 struct GNUNET_STREAM_IOWriteHandle
 {
   /**
+   * The socket to which this write handle is associated
+   */
+  struct GNUNET_STREAM_Socket *socket;
+
+  /**
    * The packet_buffers associated with this Handle
    */
   struct GNUNET_STREAM_DataMessage 
*messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
@@ -398,13 +407,6 @@
    * Number of bytes in this write handle
    */
   size_t size;
-
-  /**
-   * Number of packets sent before waiting for an ack
-   *
-   * FIXME: Do we need this?
-   */
-  unsigned int sent_packets;
 };
 
 
@@ -717,24 +719,7 @@
 }
 
 
-
 /**
- * Function called when Data Message is sent
- *
- * @param cls the io_handle corresponding to the Data Message
- * @param socket the socket which was used
- */
-static void
-write_data_finish_cb (void *cls,
-                      struct GNUNET_STREAM_Socket *socket)
-{
-  struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
-
-  io_handle->sent_packets++;
-}
-
-
-/**
  * Writes data using the given socket. The amount of data written is limited by
  * the receiver_window_size
  *
@@ -788,15 +773,15 @@
                   ntohl (io_handle->messages[packet]->sequence_number));
       copy_and_queue_message (socket,
                               &io_handle->messages[packet]->header,
-                              &write_data_finish_cb,
-                              io_handle);
+                              NULL,
+                              NULL);
       packet++;
     }
 
   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
     socket->retransmission_timeout_task_id = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
-                                    (GNUNET_TIME_UNIT_SECONDS, 5),
+                                    (GNUNET_TIME_UNIT_SECONDS, 8),
                                     &retransmission_timeout_task,
                                     socket);
 }
@@ -810,7 +795,7 @@
  */
 static void
 call_read_processor (void *cls,
-                          const struct GNUNET_SCHEDULER_TaskContext *tc)
+                     const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   size_t read_size;
@@ -842,8 +827,8 @@
   GNUNET_assert (0 != valid_read_size);
 
   /* Cancel the read_io_timeout_task */
-  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
-  socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
+  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
 
   /* Call the data processor */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -891,7 +876,8 @@
   memmove (socket->receive_buffer,
            socket->receive_buffer 
            + socket->receive_buffer_boundaries[sequence_increase-1],
-           socket->receive_buffer_size - 
socket->receive_buffer_boundaries[sequence_increase-1]);
+           socket->receive_buffer_size
+           - socket->receive_buffer_boundaries[sequence_increase-1]);
   
   /* Shift the bitmap */
   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
@@ -936,7 +922,7 @@
   GNUNET_STREAM_DataProcessor proc;
   void *proc_cls;
 
-  socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1129,6 +1115,7 @@
   return GNUNET_YES;
 }
 
+
 /**
  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
  *
@@ -2388,6 +2375,9 @@
 {
   struct MessageQueue *head;
 
+  GNUNET_break (NULL == socket->read_handle);
+  GNUNET_break (NULL == socket->write_handle);
+
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
     {
       /* socket closed with read task pending!? */
@@ -2548,6 +2538,7 @@
     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
+  io_handle->socket = socket;
   io_handle->write_cont = write_cont;
   io_handle->write_cont_cls = write_cont_cls;
   io_handle->size = size;
@@ -2642,9 +2633,10 @@
     }
   
   /* Setup the read timeout task */
-  socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
-                                                               
&read_io_timeout,
-                                                               socket);
+  socket->read_io_timeout_task_id =
+    GNUNET_SCHEDULER_add_delayed (timeout,
+                                  &read_io_timeout,
+                                  socket);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%x: %s() END\n",
               socket->our_id,
@@ -2661,7 +2653,26 @@
 void
 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
 {
-  /* FIXME: Should cancel the write retransmission task */
+  struct GNUNET_STREAM_Socket *socket = ioh->socket;
+  unsigned int packet;
+
+  GNUNET_assert (NULL != socket->write_handle);
+  GNUNET_assert (socket->write_handle == ioh);
+
+  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+    {
+      GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+      socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    }
+
+  for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+    {
+      if (NULL == ioh->messages[packet]) break;
+      GNUNET_free (ioh->messages[packet]);
+    }
+      
+  GNUNET_free (socket->write_handle);
+  socket->write_handle = NULL;
   return;
 }
 

Modified: gnunet/src/stream/test_stream_local.c
===================================================================
--- gnunet/src/stream/test_stream_local.c       2012-03-25 12:48:09 UTC (rev 
20754)
+++ gnunet/src/stream/test_stream_local.c       2012-03-25 13:33:20 UTC (rev 
20755)
@@ -101,7 +101,10 @@
 static char *data = "ABCD";
 static int result;
 
+static int writing_success;
+static int reading_success;
 
+
 /**
  * Check whether peers successfully shut down.
  */
@@ -197,9 +200,8 @@
                   enum GNUNET_STREAM_Status status,
                   size_t size)
 {
-  struct PeerData *peer;
+  struct PeerData *peer = cls;
 
-  peer = (struct PeerData *) cls;
   GNUNET_assert (GNUNET_STREAM_OK == status);
   GNUNET_assert (size <= strlen (data));
   peer->bytes_wrote += size;
@@ -232,6 +234,12 @@
                                 cls);
           GNUNET_assert (NULL!=peer->io_read_handle);
         }
+      else
+        {
+          writing_success = GNUNET_YES;
+          if (GNUNET_YES == reading_success) 
+            GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+        }
     }
 }
 
@@ -335,7 +343,9 @@
         }
       else                      /* Peer1 has completed reading. End of tests */
         {
-          GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+          reading_success = GNUNET_YES;
+          if (GNUNET_YES == writing_success)
+            GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
         }
     }
   return size;




reply via email to

[Prev in Thread] Current Thread [Next in Thread]