gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r20644 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r20644 - gnunet/src/stream
Date: Wed, 21 Mar 2012 13:32:55 +0100

Author: harsha
Date: 2012-03-21 13:32:55 +0100 (Wed, 21 Mar 2012)
New Revision: 20644

Modified:
   gnunet/src/stream/stream_api.c
   gnunet/src/stream/test_stream_local.c
Log:
fixed read timeout problem and added ack sending incase of ignored data messages

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-03-21 09:43:54 UTC (rev 20643)
+++ gnunet/src/stream/stream_api.c      2012-03-21 12:32:55 UTC (rev 20644)
@@ -845,12 +845,19 @@
   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
 
   /* Call the data processor */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%x: Calling read processor\n",
+              socket->our_id);
   read_size = 
     socket->read_handle->proc (socket->read_handle->proc_cls,
                                socket->status,
                                socket->receive_buffer + socket->copy_offset,
                                valid_read_size);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%x: Read processor read %d bytes\n",
+              socket->our_id,
+              read_size);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%x: Read processor completed successfully\n",
               socket->our_id);
 
@@ -917,17 +924,29 @@
                 const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  GNUNET_STREAM_DataProcessor proc;
+  void *proc_cls;
 
   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%x: Read task timedout - Cancelling it\n",
+                socket->our_id);
     GNUNET_SCHEDULER_cancel (socket->read_task_id);
     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
   GNUNET_assert (NULL != socket->read_handle);
-  
+  proc = socket->read_handle->proc;
+  proc_cls = socket->read_handle->proc_cls;
+
   GNUNET_free (socket->read_handle);
   socket->read_handle = NULL;
+  /* Call the read processor to signal timeout */
+  proc (proc_cls,
+        GNUNET_STREAM_TIMEOUT,
+        NULL,
+        0);
 }
 
 
@@ -986,9 +1005,18 @@
                       "%x: Ignoring received message with sequence number 
%u\n",
                       socket->our_id,
                       ntohl (msg->sequence_number));
+          /* Start ACK sending task if one is not already present */
+          if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+            {
+              socket->ack_task_id = 
+                GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+                                              (msg->ack_deadline),
+                                              &ack_task,
+                                              socket);
+            }
           return GNUNET_YES;
         }
-
+      
       /* Check if we have already seen this message */
       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
                                               relative_sequence_number))
@@ -998,6 +1026,15 @@
                       "number %u\n",
                       socket->our_id,
                       ntohl (msg->sequence_number));
+          /* Start ACK sending task if one is not already present */
+          if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+            {
+              socket->ack_task_id = 
+                GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+                                              (msg->ack_deadline),
+                                              &ack_task,
+                                              socket);
+            }
           return GNUNET_YES;
         }
 
@@ -1063,6 +1100,10 @@
           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
                                                  0)))
         {
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "%x: Scheduling read processor\n",
+                      socket->our_id);
+
           socket->read_task_id = 
             GNUNET_SCHEDULER_add_now (&call_read_processor,
                                       socket);
@@ -1864,12 +1905,13 @@
                       socket->our_id);
           return GNUNET_OK;
         }
-      
+      /* FIXME: increment in the base sequence number is breaking current flow
+       */
       if (!((socket->write_sequence_number 
              - htonl (ack->base_sequence_number)) < 
GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Received DATA_ACK with unexpected base sequence",
+                      "%x: Received DATA_ACK with unexpected base sequence "
                       "number\n",
                       socket->our_id);
           return GNUNET_OK;
@@ -2532,14 +2574,19 @@
   struct GNUNET_STREAM_IOReadHandle *read_handle;
   
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s()\n", __func__);
+              "%x: %s()\n", 
+              socket->our_id,
+              __func__);
 
   /* Return NULL if there is already a read handle; the user has to cancel that
   first before continuing or has to wait until it is completed */
   if (NULL != socket->read_handle) return NULL;
 
+  GNUNET_assert (NULL != proc);
+
   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
   read_handle->proc = proc;
+  read_handle->proc_cls = proc_cls;
   socket->read_handle = read_handle;
 
   /* Check if we have a packet at bitmap 0 */
@@ -2556,7 +2603,9 @@
                                                                
&read_io_timeout,
                                                                socket);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s() END\n", __func__);
+              "%x: %s() END\n",
+              socket->our_id,
+              __func__);
   return read_handle;
 }
 
@@ -2569,6 +2618,7 @@
 void
 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
 {
+  /* FIXME: Should cancel the write retransmission task */
   return;
 }
 

Modified: gnunet/src/stream/test_stream_local.c
===================================================================
--- gnunet/src/stream/test_stream_local.c       2012-03-21 09:43:54 UTC (rev 
20643)
+++ gnunet/src/stream/test_stream_local.c       2012-03-21 12:32:55 UTC (rev 
20644)
@@ -287,11 +287,26 @@
 
   peer = (struct PeerData *) cls;
 
+  if (GNUNET_STREAM_TIMEOUT == status)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Read operation timedout - reading again!\n");
+      GNUNET_assert (0 == size);
+      peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket 
*)
+                                                 peer->socket,
+                                                 GNUNET_TIME_relative_multiply
+                                                 (GNUNET_TIME_UNIT_SECONDS, 5),
+                                                 &input_processor,
+                                                 cls);
+      GNUNET_assert (NULL != peer->io_read_handle);
+      return 0;
+    }
+
   GNUNET_assert (GNUNET_STREAM_OK == status);
-  GNUNET_assert (size < strlen (data));
-  GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, 
-                          (const char *) input_data,
-                          size));
+  GNUNET_assert (size <= strlen (data));
+  GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, 
+                               (const char *) input_data,
+                               size));
   peer->bytes_read += size;
   
   if (peer->bytes_read < strlen (data))




reply via email to

[Prev in Thread] Current Thread [Next in Thread]