gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r19915 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r19915 - gnunet/src/stream
Date: Wed, 22 Feb 2012 11:10:18 +0100

Author: harsha
Date: 2012-02-22 11:10:18 +0100 (Wed, 22 Feb 2012)
New Revision: 19915

Modified:
   gnunet/src/stream/stream_api.c
Log:
added ack sending

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-02-22 10:02:56 UTC (rev 19914)
+++ gnunet/src/stream/stream_api.c      2012-02-22 10:10:18 UTC (rev 19915)
@@ -161,6 +161,26 @@
   struct GNUNET_TIME_Relative retransmit_timeout;
 
   /**
+   * The Acknowledgement Bitmap
+   */
+  GNUNET_STREAM_AckBitmap ack_bitmap;
+
+  /**
+   * Time when the Acknowledgement was queued
+   */
+  struct GNUNET_TIME_Absolute ack_time_registered;
+
+  /**
+   * Queued Acknowledgement deadline
+   */
+  struct GNUNET_TIME_Relative ack_time_deadline;
+
+  /**
+   * The task for sending timely Acks
+   */
+  GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
+
+  /**
    * The mesh handle
    */
   struct GNUNET_MESH_Handle *mesh;
@@ -243,7 +263,7 @@
   uint32_t read_sequence_number;
 
   /**
-   * receiver's available buffer
+   * receiver's available buffer after the last acknowledged packet
    */
   uint32_t receive_window_available;
 };
@@ -422,6 +442,76 @@
 
 
 /**
+ * Callback function for sending ack message
+ *
+ * @param cls closure; the ACK message created in ack_task
+ * @param size number of bytes available in buffer
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+send_ack_notify (void *cls, size_t size, void *buf)
+{
+  struct GNUNET_STREAM_AckMessage *ack_msg = cls;
+  /* header.size was written with htons(); convert it before using it */
+  size_t msg_size = ntohs (ack_msg->header.header.size);
+  if (0 == size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s called with size 0\n", __func__);
+      GNUNET_free (ack_msg);    /* nothing is sent; still release the ACK */
+      return 0;
+    }
+  GNUNET_assert (msg_size <= size);
+  memcpy (buf, ack_msg, msg_size);
+  GNUNET_free (ack_msg);        /* copied out; cls was the only reference */
+  return msg_size;
+}
+
+
+/**
+ * Task for sending ACK message
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+ack_task (void *cls,
+          const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_AckMessage *ack_msg;
+
+  /* The task has fired, so its id is stale on every path, shutdown included */
+  socket->ack_task_id = 0;
+
+  /* tc->reason is a set of flags: test the shutdown bit, not equality */
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+
+  /* Create the ACK Message */
+  ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
+  ack_msg->header.header.size = htons (sizeof (struct 
+                                               GNUNET_STREAM_AckMessage));
+  ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
+  ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
+  /* Bitmap bits are offsets from read_sequence_number, so that is the base */
+  ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
+  ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
+
+  /* Request MESH for sending ACK; ack_msg travels as the closure of
+     send_ack_notify */
+  GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                     0, /* Corking */
+                                     1, /* Priority */
+                                     socket->retransmit_timeout,
+                                     &socket->other_peer,
+                                     ntohs (ack_msg->header.header.size),
+                                     &send_ack_notify,
+                                     ack_msg);
+}
+
+
+/**
  * Function to modify a bit in GNUNET_STREAM_AckBitmap
  *
  * @param bitmap the bitmap to modify
@@ -577,7 +667,23 @@
               * MAX_PACKET_SIZE,
               payload,
               size);
-      /* FIXME: We have to send GNUNET_STREAM_AckMessage */
+      
+      /* Modify the ACK bitmap */
+      ackbitmap_modify_bit (&socket->bitmap,
+                            ntohl (msg->sequence_number) -
+                            socket->read_sequence_number,
+                            GNUNET_YES);
+
+      /* Start ACK sending task if one is not already present */
+      if (0 == socket->ack_task_id)
+       {
+         socket->ack_task_id = 
+           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+                                         (msg->ack_deadline),
+                                         &ack_task,
+                                         socket);
+       }
+      
       break;
 
     default:
@@ -1740,8 +1846,8 @@
   unsigned int num_needed_packets;
   unsigned int packet;
   struct GNUNET_STREAM_IOHandle *io_handle;
+  size_t packet_size;
   struct GNUNET_STREAM_DataMessage *data_msg;
-  size_t packet_size;
   const void *sweep;
 
   /* There is already a write request pending */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]