gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r19757 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r19757 - gnunet/src/stream
Date: Sun, 12 Feb 2012 13:28:31 +0100

Author: harsha
Date: 2012-02-12 13:28:31 +0100 (Sun, 12 Feb 2012)
New Revision: 19757

Modified:
   gnunet/src/stream/stream_api.c
   gnunet/src/stream/stream_protocol.h
Log:
-added write operation

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-02-12 11:17:21 UTC (rev 19756)
+++ gnunet/src/stream/stream_api.c      2012-02-12 12:28:31 UTC (rev 19757)
@@ -30,10 +30,23 @@
 #include "stream_protocol.h"
 
 
+/**
+ * The maximum packet size of a stream packet
+ */
 #define MAX_PACKET_SIZE 64000
 
+/**
+ * The maximum payload a data message packet can carry
+ */
+static size_t max_payload_size = 
+  MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
 
 /**
+ * Receive buffer
+ */
+#define RECEIVE_BUFFER_SIZE 4096000
+
+/**
  * states in the Protocol
  */
 enum State
@@ -221,6 +234,11 @@
    * Read sequence number. This number's value is determined during handshake
    */
   uint32_t read_sequence_number;
+
+  /**
+   * receiver's available buffer
+   */
+  uint32_t receive_window_available;
 };
 
 
@@ -266,7 +284,19 @@
    * The bitmap of this IOHandle; Corresponding bit for a message is set when
    * it has been acknowledged by the receiver
    */
-  GNUNET_STREAM_AckBitmap bitmap;
+  GNUNET_STREAM_AckBitmap ack_bitmap;
+
+  /**
+   * receiver's available buffer
+   */
+  uint32_t receive_window_available;
+
+  /**
+   * Number of packets sent before waiting for an ack
+   *
+   * FIXME: Do we need this?
+   */
+  unsigned int sent_packets;
 };
 
 
@@ -392,7 +422,7 @@
  * @param value GNUNET_YES to on, GNUNET_NO to off
  */
 static void
-AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
+ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
                      unsigned int bit, 
                      int value)
 {
@@ -411,7 +441,7 @@
  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
  */
 static uint8_t
-AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
+ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
                       unsigned int bit)
 {
   GNUNET_assert (bit < 64);
@@ -419,7 +449,72 @@
 }
 
 
+
 /**
+ * Function called when Data Message is sent
+ *
+ * @param cls the io_handle corresponding to the Data Message
+ * @param socket the socket which was used
+ */
+static void
+write_data_finish_cb (void *cls,
+                      struct GNUNET_STREAM_Socket *socket)
+{
+  struct GNUNET_STREAM_IOHandle *io_handle = cls;
+
+  io_handle->sent_packets++;
+}
+
+
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receive_window_size
+ *
+ * @param socket the socket to use
+ */
+static void 
+write_data (struct GNUNET_STREAM_Socket *socket)
+{
+  struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
+  unsigned int packet;
+  int ack_packet;
+
+  ack_packet = -1;
+  /* Find the last acknowledged packet */
+  for (packet=0; packet < 64; packet++)
+    {
+      if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+                                              packet))
+        {
+          ack_packet = packet;
+        }
+    }
+  /* Resend packets which weren't ack'ed */
+  for (packet=0; packet < ack_packet; packet++)
+    {
+      if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+                                             packet))
+        {
+          queue_message (socket,
+                         &io_handle->messages[packet]->header,
+                         NULL,
+                         NULL);
+        }
+    }
+  packet = ack_packet + 1;
+  /* Now send new packets if there is enough buffer space */
+  while (io_handle->receive_window_available -=
+         io_handle->messages[packet]->header.header.size > 0)
+    {
+      queue_message (socket,
+                     &io_handle->messages[packet]->header,
+                     &write_data_finish_cb,
+                     io_handle);
+    }
+}
+
+
+/**
  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
  *
  * @param cls the socket (set from GNUNET_MESH_connect)
@@ -520,6 +615,7 @@
   {
   case STATE_HELLO_WAIT:
       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+      socket->receive_window_available = ntohl (ack_msg->receive_window_size);
       /* Get the random sequence number */
       socket->write_sequence_number = 
         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
@@ -530,6 +626,7 @@
       reply->header.header.type = 
         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
       reply->sequence_number = htonl (socket->write_sequence_number);
+      reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
       queue_message (socket, 
                      &reply->header, 
                      &set_state_established, 
@@ -840,7 +937,9 @@
   GNUNET_assert (socket->tunnel == tunnel);
   if (STATE_HELLO_WAIT == socket->state)
     {
-      socket->read_sequence_number = ntohs (ack_message->sequence_number);
+      socket->read_sequence_number = ntohl (ack_message->sequence_number);
+      socket->receive_window_available = 
+        ntohl (ack_message->receive_window_size);
       socket->state = STATE_ESTABLISHED;
     }
   else
@@ -1039,11 +1138,10 @@
 /**
  * Message Handler for mesh
  *
- * @param cls closure (set from GNUNET_MESH_connect)
+ * @param socket the socket through which the ack was received
  * @param tunnel connection to the other end
- * @param tunnel_ctx place to store local state associated with the tunnel
  * @param sender who sent the message
- * @param ack the actual message
+ * @param ack the acknowledgment message
  * @param atsi performance data for the connection
  * @return GNUNET_OK to keep the connection open,
  *         GNUNET_SYSERR to close it (signal serious error)
@@ -1055,6 +1153,24 @@
            const struct GNUNET_STREAM_AckMessage *ack,
            const struct GNUNET_ATS_Information*atsi)
 {
+  switch (socket->state)
+    {
+    case (STATE_ESTABLISHED):
+      if (NULL == socket->write_handle)
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Received DATA ACK when write_handle is NULL\n");
+          return GNUNET_OK;
+        }
+
+      socket->write_handle->ack_bitmap = ntoh64 (ack->bitmap);
+      socket->write_handle->receive_window_available = 
+        ntohl (ack->receive_window_remaining);
+      write_data (socket);
+      break;
+    default:
+      break;
+    }
   return GNUNET_OK;
 }
 
@@ -1502,7 +1618,6 @@
   unsigned int packet;
   struct GNUNET_STREAM_IOHandle *io_handle;
   struct GNUNET_STREAM_DataMessage *data_msg;
-  size_t max_payload_size;
   size_t packet_size;
   const void *sweep;
 
@@ -1517,8 +1632,6 @@
       return NULL;
     }
       
-  max_payload_size = 
-    MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
   num_needed_packets = ceil (size / max_payload_size);
   if (64 < num_needed_packets) 
     {
@@ -1528,8 +1641,9 @@
     }
 
   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
+  io_handle->receive_window_available = socket->receive_window_available;
   sweep = data;
-  /* Divide the given area into packets for sending */
+  /* Divide the given buffer into packets for sending */
   for (packet=0; packet < num_needed_packets; packet++)
     {
       if ((packet + 1) * max_payload_size < size) 
@@ -1545,6 +1659,8 @@
       io_handle->messages[packet]->header.header.size = htons (packet_size);
       io_handle->messages[packet]->header.header.type =
         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
+      io_handle->messages[packet]->sequence_number =
+        htons (socket->write_sequence_number++);
       data_msg = io_handle->messages[packet];
       memcpy (&data_msg[1],
               sweep,
@@ -1552,5 +1668,7 @@
       sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
     }
 
+  write_data (socket);
+
   return io_handle;
 }

Modified: gnunet/src/stream/stream_protocol.h
===================================================================
--- gnunet/src/stream/stream_protocol.h 2012-02-12 11:17:21 UTC (rev 19756)
+++ gnunet/src/stream/stream_protocol.h 2012-02-12 12:28:31 UTC (rev 19757)
@@ -123,6 +123,8 @@
   /**
    * The sequence number of the Data Message upto which the receiver has filled
    * its buffer without any missing packets
+   *
+   * FIXME: Do we need this?
    */
   uint32_t base_sequence_number GNUNET_PACKED;
 
@@ -147,6 +149,10 @@
    */
   uint32_t sequence_number;
 
+  /**
+   * The size(in bytes) of the receive window on the peer sending this message
+   */
+  uint32_t receive_window_size;
 };
 
 GNUNET_NETWORK_STRUCT_END




reply via email to

[Prev in Thread] Current Thread [Next in Thread]