[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r20755 - in gnunet/src: include stream
From: |
gnunet |
Subject: |
[GNUnet-SVN] r20755 - in gnunet/src: include stream |
Date: |
Sun, 25 Mar 2012 15:33:20 +0200 |
Author: harsha
Date: 2012-03-25 15:33:20 +0200 (Sun, 25 Mar 2012)
New Revision: 20755
Modified:
gnunet/src/include/gnunet_stream_lib.h
gnunet/src/stream/README
gnunet/src/stream/stream_api.c
gnunet/src/stream/test_stream_local.c
Log:
-added write io cancellation
Modified: gnunet/src/include/gnunet_stream_lib.h
===================================================================
--- gnunet/src/include/gnunet_stream_lib.h 2012-03-25 12:48:09 UTC (rev
20754)
+++ gnunet/src/include/gnunet_stream_lib.h 2012-03-25 13:33:20 UTC (rev
20755)
@@ -217,13 +217,17 @@
struct GNUNET_STREAM_IOReadHandle;
/**
- * Tries to write the given data to the stream
+ * Tries to write the given data to the stream. The maximum size of data that
+ * can be written as part of a write operation is (64 * (64000 - sizeof (struct
+ * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
+ * violation, however only the said number of maximum bytes will be written.
*
* @param socket the socket representing a stream
* @param data the data buffer from where the data is written into the stream
* @param size the number of bytes to be written from the data buffer
* @param timeout the timeout period
- * @param write_cont the function to call upon writing some bytes into the
stream
+ * @param write_cont the function to call upon writing some bytes into the
+ * stream
* @param write_cont_cls the closure
* @return handle to cancel the operation; NULL if a previous write is pending
*/
@@ -270,8 +274,20 @@
/**
- * Cancel pending write operation.
+ * Cancels pending write operation. Also cancels packet retransmissions which
+ * may have resulted otherwise.
*
+ * CAUTION: Normally a write operation is considered successful if the data
+ * given to it is sent and acknowledged by the receiver. As data is divided
+ * into packets, it is possible that not all packets are received by the
+ * receiver. Any missing packets are then retransmitted till the receiver
+ * acknowledges all packets or until a timeout . During this scenario if the
+ * write operation is cancelled all such retransmissions are also
+ * cancelled. This may leave the receiver's receive buffer incompletely filled
+ * as some missing packets are never retransmitted. So this operation should be
+ * used before shutting down transmission from our side or before closing the
+ * socket.
+ *
* @param ioh handle to operation to cancel
*/
void
Modified: gnunet/src/stream/README
===================================================================
--- gnunet/src/stream/README 2012-03-25 12:48:09 UTC (rev 20754)
+++ gnunet/src/stream/README 2012-03-25 13:33:20 UTC (rev 20755)
@@ -1,11 +1,11 @@
-The aim of the stream library is to provide stream connections between peers in
-GNUnet. This is a convenience library which hides the complexity of dividing
-data stream into packets, transmitting them and retransmitting them in case of
+Stream library provides stream connections between peers in GNUnet. This is a
+convenience library which hides the complexity of dividing data stream into
+packets, transmitting them and retransmitting them in case of communication
errors.
This library's API are similar to unix PIPE API. The user is expected to open a
stream to a listening target peer. Once the stream is established, the user can
-use it as a pipe. Any data written into the stream will be readable by the
-target peer.
+use it as a pipe. Any data written into the stream at one peer will be readable
+by the other peer and vice versa.
-This library uses mesh API for establishing streams between peers.
\ No newline at end of file
+This library uses mesh API for establishing tunnels between peers.
\ No newline at end of file
Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c 2012-03-25 12:48:09 UTC (rev 20754)
+++ gnunet/src/stream/stream_api.c 2012-03-25 13:33:20 UTC (rev 20755)
@@ -25,6 +25,8 @@
*
* Decrement PEER intern count during socket close and listen close to free the
* memory used for PEER interning
+ *
+ * Add code for write io timeout
**/
/**
@@ -32,6 +34,8 @@
* @brief Implementation of the stream library
* @author Sree Harsha Totakura
*/
+
+
#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_crypto_lib.h"
@@ -46,17 +50,17 @@
#define MAX_PACKET_SIZE 64000
/**
+ * Receive buffer
+ */
+#define RECEIVE_BUFFER_SIZE 4096000
+
+/**
* The maximum payload a data message packet can carry
*/
static size_t max_payload_size =
MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
/**
- * Receive buffer
- */
-#define RECEIVE_BUFFER_SIZE 4096000
-
-/**
* states in the Protocol
*/
enum State
@@ -239,7 +243,7 @@
/**
* Task identifier for the read io timeout task
*/
- GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
+ GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
/**
* Task identifier for retransmission task after timeout
@@ -374,6 +378,11 @@
struct GNUNET_STREAM_IOWriteHandle
{
/**
+ * The socket to which this write handle is associated
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
* The packet_buffers associated with this Handle
*/
struct GNUNET_STREAM_DataMessage
*messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
@@ -398,13 +407,6 @@
* Number of bytes in this write handle
*/
size_t size;
-
- /**
- * Number of packets sent before waiting for an ack
- *
- * FIXME: Do we need this?
- */
- unsigned int sent_packets;
};
@@ -717,24 +719,7 @@
}
-
/**
- * Function called when Data Message is sent
- *
- * @param cls the io_handle corresponding to the Data Message
- * @param socket the socket which was used
- */
-static void
-write_data_finish_cb (void *cls,
- struct GNUNET_STREAM_Socket *socket)
-{
- struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
-
- io_handle->sent_packets++;
-}
-
-
-/**
* Writes data using the given socket. The amount of data written is limited by
* the receiver_window_size
*
@@ -788,15 +773,15 @@
ntohl (io_handle->messages[packet]->sequence_number));
copy_and_queue_message (socket,
&io_handle->messages[packet]->header,
- &write_data_finish_cb,
- io_handle);
+ NULL,
+ NULL);
packet++;
}
if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
socket->retransmission_timeout_task_id =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
+ (GNUNET_TIME_UNIT_SECONDS, 8),
&retransmission_timeout_task,
socket);
}
@@ -810,7 +795,7 @@
*/
static void
call_read_processor (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
size_t read_size;
@@ -842,8 +827,8 @@
GNUNET_assert (0 != valid_read_size);
/* Cancel the read_io_timeout_task */
- GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
- socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
+ socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
/* Call the data processor */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -891,7 +876,8 @@
memmove (socket->receive_buffer,
socket->receive_buffer
+ socket->receive_buffer_boundaries[sequence_increase-1],
- socket->receive_buffer_size -
socket->receive_buffer_boundaries[sequence_increase-1]);
+ socket->receive_buffer_size
+ - socket->receive_buffer_boundaries[sequence_increase-1]);
/* Shift the bitmap */
socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
@@ -936,7 +922,7 @@
GNUNET_STREAM_DataProcessor proc;
void *proc_cls;
- socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1129,6 +1115,7 @@
return GNUNET_YES;
}
+
/**
* Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
*
@@ -2388,6 +2375,9 @@
{
struct MessageQueue *head;
+ GNUNET_break (NULL == socket->read_handle);
+ GNUNET_break (NULL == socket->write_handle);
+
if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
/* socket closed with read task pending!? */
@@ -2548,6 +2538,7 @@
size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
+ io_handle->socket = socket;
io_handle->write_cont = write_cont;
io_handle->write_cont_cls = write_cont_cls;
io_handle->size = size;
@@ -2642,9 +2633,10 @@
}
/* Setup the read timeout task */
- socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
-
&read_io_timeout,
- socket);
+ socket->read_io_timeout_task_id =
+ GNUNET_SCHEDULER_add_delayed (timeout,
+ &read_io_timeout,
+ socket);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%x: %s() END\n",
socket->our_id,
@@ -2661,7 +2653,26 @@
void
GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
{
- /* FIXME: Should cancel the write retransmission task */
+ struct GNUNET_STREAM_Socket *socket = ioh->socket;
+ unsigned int packet;
+
+ GNUNET_assert (NULL != socket->write_handle);
+ GNUNET_assert (socket->write_handle == ioh);
+
+ if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+ socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (NULL == ioh->messages[packet]) break;
+ GNUNET_free (ioh->messages[packet]);
+ }
+
+ GNUNET_free (socket->write_handle);
+ socket->write_handle = NULL;
return;
}
Modified: gnunet/src/stream/test_stream_local.c
===================================================================
--- gnunet/src/stream/test_stream_local.c 2012-03-25 12:48:09 UTC (rev
20754)
+++ gnunet/src/stream/test_stream_local.c 2012-03-25 13:33:20 UTC (rev
20755)
@@ -101,7 +101,10 @@
static char *data = "ABCD";
static int result;
+static int writing_success;
+static int reading_success;
+
/**
* Check whether peers successfully shut down.
*/
@@ -197,9 +200,8 @@
enum GNUNET_STREAM_Status status,
size_t size)
{
- struct PeerData *peer;
+ struct PeerData *peer = cls;
- peer = (struct PeerData *) cls;
GNUNET_assert (GNUNET_STREAM_OK == status);
GNUNET_assert (size <= strlen (data));
peer->bytes_wrote += size;
@@ -232,6 +234,12 @@
cls);
GNUNET_assert (NULL!=peer->io_read_handle);
}
+ else
+ {
+ writing_success = GNUNET_YES;
+ if (GNUNET_YES == reading_success)
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+ }
}
}
@@ -335,7 +343,9 @@
}
else /* Peer1 has completed reading. End of tests */
{
- GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+ reading_success = GNUNET_YES;
+ if (GNUNET_YES == writing_success)
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
}
}
return size;
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r20755 - in gnunet/src: include stream,
gnunet <=