gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r20931 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r20931 - gnunet/src/stream
Date: Tue, 10 Apr 2012 22:56:54 +0200

Author: harsha
Date: 2012-04-10 22:56:54 +0200 (Tue, 10 Apr 2012)
New Revision: 20931

Modified:
   gnunet/src/stream/stream_api.c
Log:
-retransmission of close messages

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-04-10 16:40:13 UTC (rev 20930)
+++ gnunet/src/stream/stream_api.c      2012-04-10 20:56:54 UTC (rev 20931)
@@ -445,11 +445,6 @@
   struct GNUNET_STREAM_Socket *socket;
 
   /**
-   * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
-   */
-  int operation;
-
-  /**
    * Shutdown completion callback
    */
   GNUNET_STREAM_ShutdownCompletion completion_cb;
@@ -458,6 +453,16 @@
    * Closure for completion callback
    */
   void *completion_cls;
+
+  /**
+   * Close message retransmission task id
+   */
+  GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
+
+  /**
+   * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
+   */
+  int operation;  
 };
 
 
@@ -719,6 +724,50 @@
 
 
 /**
+ * Retransmission task for shutdown messages
+ *
+ * @param cls the shutdown handle
+ * @param tc the Task Context
+ */
+static void
+close_msg_retransmission_task (void *cls,
+                               const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
+  struct GNUNET_STREAM_MessageHeader *msg;
+  struct GNUNET_STREAM_Socket *socket;
+
+  GNUNET_assert (NULL != shutdown_handle);
+  socket = shutdown_handle->socket;
+
+  msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+  msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  switch (shutdown_handle->operation)
+    {
+    case SHUT_RDWR:
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+      break;
+    case SHUT_RD:
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+      break;
+    case SHUT_WR:
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+      break;
+    default:
+      GNUNET_free (msg);
+      shutdown_handle->close_msg_retransmission_task_id = 
+        GNUNET_SCHEDULER_NO_TASK;
+      return;
+    }
+  queue_message (socket, msg, NULL, NULL);
+  shutdown_handle->close_msg_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &close_msg_retransmission_task,
+                                  shutdown_handle);
+}
+
+
+/**
  * Function to modify a bit in GNUNET_STREAM_AckBitmap
  *
  * @param bitmap the bitmap to modify
@@ -1264,6 +1313,43 @@
 
 
 /**
+ * Callback to set state to RECEIVE_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_receive_close_wait (void *cls,
+                              struct GNUNET_STREAM_Socket *socket)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%x: Attaing RECEIVE_CLOSE_WAIT state\n",
+              socket->our_id);
+  socket->state = STATE_RECEIVE_CLOSE_WAIT;
+  GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+  socket->receive_buffer = NULL;
+  socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to TRANSMIT_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_transmit_close_wait (void *cls,
+                               struct GNUNET_STREAM_Socket *socket)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%x: Attaing TRANSMIT_CLOSE_WAIT state\n",
+              socket->our_id);
+  socket->state = STATE_TRANSMIT_CLOSE_WAIT;
+}
+
+
+/**
  * Callback to set state to CLOSED
  *
  * @param cls the closure from queue_message
@@ -1640,7 +1726,7 @@
                   struct GNUNET_MESH_Tunnel *tunnel,
                   const struct GNUNET_PeerIdentity *sender,
                   const struct GNUNET_STREAM_MessageHeader *message,
-                  const struct GNUNET_ATS_Information*atsi)
+                  const struct GNUNET_ATS_Information *atsi)
 {
   struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
 
@@ -1648,7 +1734,6 @@
   switch (socket->state)
     {
     case STATE_CLOSE_WAIT:
-      socket->state = STATE_CLOSED;
       if ( (NULL == shutdown_handle) ||
            (SHUT_RDWR != shutdown_handle->operation) )
         {
@@ -1658,10 +1743,21 @@
                       socket->our_id);
           return GNUNET_OK;
         }
+
+      if (GNUNET_SCHEDULER_NO_TASK
+          != shutdown_handle->close_msg_retransmission_task_id)
+        {
+          GNUNET_SCHEDULER_cancel 
+            (shutdown_handle->close_msg_retransmission_task_id);
+          shutdown_handle->close_msg_retransmission_task_id =
+            GNUNET_SCHEDULER_NO_TASK;
+        }
+
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "%x: Received CLOSE_ACK from %x\n",
                   socket->our_id,
                   socket->other_peer);
+      socket->state = STATE_CLOSED;
       if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
         shutdown_handle->completion_cb(shutdown_handle->completion_cls,
                                        SHUT_RDWR);
@@ -1695,7 +1791,7 @@
                          void **tunnel_ctx,
                          const struct GNUNET_PeerIdentity *sender,
                          const struct GNUNET_MessageHeader *message,
-                         const struct GNUNET_ATS_Information*atsi)
+                         const struct GNUNET_ATS_Information *atsi)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
 
@@ -2571,10 +2667,23 @@
         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                     "Existing read handle should be cancelled before shutting"
                     " down reading\n");
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+      queue_message (socket,
+                     msg,
+                     &set_state_receive_close_wait,
+                     NULL);
       break;
     case SHUT_WR:
       handle->operation = SHUT_WR;
-      
+      if (NULL != socket->write_handle)
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Existing write handle should be cancelled before shutting"
+                    " down writing\n");
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+      queue_message (socket,
+                     msg,
+                     &set_state_transmit_close_wait,
+                     NULL);
       break;
     case SHUT_RDWR:
       handle->operation = SHUT_RDWR;
@@ -2596,9 +2705,14 @@
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "GNUNET_STREAM_shutdown called with invalid value for "
                   "parameter operation -- Ignoring\n");
+      GNUNET_free (msg);
       GNUNET_free (handle);
       return NULL;
     }
+  handle->close_msg_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &close_msg_retransmission_task,
+                                  handle);
   return handle;
 }
 
@@ -2611,6 +2725,9 @@
 void
 GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
 {
+  if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
+    GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+  GNUNET_free (handle);
   return;
 }
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]