gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r21012 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r21012 - gnunet/src/stream
Date: Wed, 18 Apr 2012 16:21:25 +0200

Author: harsha
Date: 2012-04-18 16:21:25 +0200 (Wed, 18 Apr 2012)
New Revision: 21012

Modified:
   gnunet/src/stream/Makefile.am
   gnunet/src/stream/stream_api.c
   gnunet/src/stream/test_stream_2peers.c
   gnunet/src/stream/test_stream_local.c
   gnunet/src/stream/test_stream_local.conf
Log:
-fixed mesh application types array and makefile

Modified: gnunet/src/stream/Makefile.am
===================================================================
--- gnunet/src/stream/Makefile.am       2012-04-18 14:02:20 UTC (rev 21011)
+++ gnunet/src/stream/Makefile.am       2012-04-18 14:21:25 UTC (rev 21012)
@@ -21,13 +21,14 @@
 
 check_PROGRAMS = \
  test_stream_2peers \
- test_stream_2peers_halfclose
-# test_stream_local
+ test_stream_2peers_halfclose \
+ test_stream_local
 
 EXTRA_DIST = test_stream_local.conf
 
 if ENABLE_TEST_RUN
-TESTS = $(check_PROGRAMS)
+TESTS = test_stream_2peers \
+ test_stream_2peers_halfclose
 endif
 
 test_stream_2peers_SOURCES = \

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-04-18 14:02:20 UTC (rev 21011)
+++ gnunet/src/stream/stream_api.c      2012-04-18 14:21:25 UTC (rev 21012)
@@ -2753,6 +2753,7 @@
   struct GNUNET_STREAM_Socket *socket;
   struct GNUNET_PeerIdentity own_peer_id;
   enum GNUNET_STREAM_Option option;
+  GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
   va_list vargs;                /* Variable arguments */
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2790,7 +2791,7 @@
                                       NULL, /* No inbound tunnel handler */
                                       &tunnel_cleaner, /* FIXME: not required? */
                                       client_message_handlers,
-                                      &app_port); /* We don't get inbound tunnels */
+                                      ports); /* We don't get inbound tunnels */
   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
     {
       GNUNET_free (socket);
@@ -3003,6 +3004,7 @@
 {
   /* FIXME: Add variable args for passing configration options? */
   struct GNUNET_STREAM_ListenSocket *lsocket;
+  GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
   struct GNUNET_PeerIdentity our_peer_id;
 
   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
@@ -3017,7 +3019,7 @@
                                        &new_tunnel_notify,
                                        &tunnel_cleaner,
                                        server_message_handlers,
-                                       &app_port);
+                                       ports);
   GNUNET_assert (NULL != lsocket->mesh);
   return lsocket;
 }

Modified: gnunet/src/stream/test_stream_2peers.c
===================================================================
--- gnunet/src/stream/test_stream_2peers.c      2012-04-18 14:02:20 UTC (rev 21011)
+++ gnunet/src/stream/test_stream_2peers.c      2012-04-18 14:21:25 UTC (rev 21012)
@@ -101,7 +101,6 @@
 static struct GNUNET_CONFIGURATION_Handle *config;
 
 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
-static GNUNET_SCHEDULER_TaskIdentifier read_task;
 
 static char *data = "ABCD";
 static int result;
@@ -111,6 +110,81 @@
 
 
 /**
+ * Input processor
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read 
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ *         given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+                 enum GNUNET_STREAM_Status status,
+                 const void *input_data,
+                 size_t size);
+
+/**
+ * Task for calling STREAM_read
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_read_task (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct PeerData *peer = cls;
+  
+  peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
+                                             GNUNET_TIME_relative_multiply
+                                             (GNUNET_TIME_UNIT_SECONDS, 5),
+                                             &input_processor,
+                                             peer);
+  GNUNET_assert (NULL != peer->io_read_handle);
+}
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes read or written
+ */
+static void 
+write_completion (void *cls,
+                  enum GNUNET_STREAM_Status status,
+                  size_t size);
+
+
+/**
+ * Task for calling STREAM_write
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_write_task (void *cls,
+                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct PeerData *peer = cls;
+  
+  peer->io_write_handle = 
+    GNUNET_STREAM_write (peer->socket,
+                         (void *) data,
+                         strlen(data) - peer->bytes_wrote,
+                         GNUNET_TIME_relative_multiply
+                         (GNUNET_TIME_UNIT_SECONDS, 5),
+                         &write_completion,
+                         peer);
+ 
+  GNUNET_assert (NULL != peer->io_write_handle);
+ }
+
+
+/**
  * Check whether peers successfully shut down.
  */
 static void
@@ -198,32 +272,12 @@
 do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
-  if (0 != read_task)
-    {
-      GNUNET_SCHEDULER_cancel (read_task);
-    }
   result = GNUNET_SYSERR;
   abort_task = 0;
   do_close (cls, tc);  
 }
 
-/**
- * Signature for input processor 
- *
- * @param cls the closure from GNUNET_STREAM_write/read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
- */
-static size_t
-input_processor (void *cls,
-                 enum GNUNET_STREAM_Status status,
-                 const void *input_data,
-                 size_t size);
 
-
 /**
  * The write completion function; called upon writing some data to stream or
  * upon error
@@ -237,7 +291,7 @@
                   enum GNUNET_STREAM_Status status,
                   size_t size)
 {
-  struct PeerData *peer = cls;
+  struct PeerData *peer=cls;
 
   GNUNET_assert (GNUNET_STREAM_OK == status);
   GNUNET_assert (size <= strlen (data));
@@ -245,15 +299,7 @@
 
   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
     {
-      peer->io_write_handle =
-        GNUNET_STREAM_write (peer->socket,
-                             (void *) data,
-                             strlen(data) - peer->bytes_wrote,
-                             GNUNET_TIME_relative_multiply
-                             (GNUNET_TIME_UNIT_SECONDS, 5),
-                             &write_completion,
-                             cls);
-      GNUNET_assert (NULL != peer->io_write_handle);
+      GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
     }
   else
     {
@@ -262,14 +308,8 @@
 
       if (&peer1 == peer)   /* Peer1 has finished writing; should read now */
         {
-          peer->io_read_handle =
-            GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
-                                peer->socket,
-                                GNUNET_TIME_relative_multiply
-                                (GNUNET_TIME_UNIT_SECONDS, 5),
-                                &input_processor,
-                                cls);
-          GNUNET_assert (NULL!=peer->io_read_handle);
+          peer->bytes_read = 0;
+          GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
         }
       else
         {
@@ -291,24 +331,15 @@
 stream_open_cb (void *cls,
                 struct GNUNET_STREAM_Socket *socket)
 {
-  struct PeerData *peer;
-
+  struct PeerData *peer=cls;
+  
+  GNUNET_assert (&peer1 == peer);
   GNUNET_assert (socket == peer1.socket);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s: Stream established from peer1\n",
               GNUNET_i2s (&peer1.our_id));
-  peer = (struct PeerData *) cls;
   peer->bytes_wrote = 0;
-  GNUNET_assert (socket == peer1.socket);
-  GNUNET_assert (socket == peer->socket);
-  peer->io_write_handle = GNUNET_STREAM_write (peer->socket, /* socket */
-                                               (void *) data, /* data */
-                                               strlen(data),
-                                               GNUNET_TIME_relative_multiply
-                                               (GNUNET_TIME_UNIT_SECONDS, 5),
-                                               &write_completion,
-                                         cls);
-  GNUNET_assert (NULL != peer->io_write_handle);
+  GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
 }
 
 
@@ -337,13 +368,7 @@
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Read operation timedout - reading again!\n");
       GNUNET_assert (0 == size);
-      peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
-                                                 peer->socket,
-                                                 GNUNET_TIME_relative_multiply
-                                                 (GNUNET_TIME_UNIT_SECONDS, 5),
-                                                 &input_processor,
-                                                 cls);
-      GNUNET_assert (NULL != peer->io_read_handle);
+      GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
       return 0;
     }
 
@@ -356,27 +381,14 @@
   
   if (peer->bytes_read < strlen (data))
     {
-      peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
-                                                 peer->socket,
-                                                 GNUNET_TIME_relative_multiply
-                                                 (GNUNET_TIME_UNIT_SECONDS, 5),
-                                                 &input_processor,
-                                                 cls);
-      GNUNET_assert (NULL != peer->io_read_handle);
+      GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
     }
   else 
     {
       if (&peer2 == peer)    /* Peer2 has completed reading; should write */
         {
           peer->bytes_wrote = 0;
-          peer->io_write_handle = 
-            GNUNET_STREAM_write (peer->socket,
-                                 data,
-                                 strlen(data),
-                                 GNUNET_TIME_relative_multiply
-                                 (GNUNET_TIME_UNIT_SECONDS, 5),
-                                 &write_completion,
-                                 cls);
+          GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
         }
       else                      /* Peer1 has completed reading. End of tests */
         {
@@ -390,26 +402,6 @@
 
   
 /**
- * Scheduler call back; to be executed when a new stream is connected
- * Called from listen connect for peer2
- */
-static void
-stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  read_task = GNUNET_SCHEDULER_NO_TASK;
-  GNUNET_assert (NULL != cls);
-  peer2.bytes_read = 0;
-  peer2.io_read_handle =
-    GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
-                        GNUNET_TIME_relative_multiply
-                        (GNUNET_TIME_UNIT_SECONDS, 5),
-                        &input_processor,
-                        (void *) &peer2);
-  GNUNET_assert (NULL != peer2.io_read_handle);
-}
-
-
-/**
  * Functions of this type are called upon new stream connection from other peers
  *
  * @param cls the closure from GNUNET_STREAM_listen
@@ -434,8 +426,8 @@
               GNUNET_i2s(initiator));
 
   peer2.socket = socket;
-  /* FIXME: reading should be done right now instead of a scheduled call */
-  read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
+  peer2.bytes_read = 0;
+  GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
   return GNUNET_OK;
 }
 
@@ -504,9 +496,9 @@
 {
   struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */
 
-  /* GNUNET_log_setup ("test_stream_local", */
-  /*                   "DEBUG", */
-  /*                   NULL); */
+  GNUNET_log_setup ("test_stream_2peers",
+                    "DEBUG",
+                    NULL);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Starting test\n");
@@ -538,7 +530,7 @@
 {
   int ret;
 
-  char *argv2[] = { "test-stream-local",
+  char *argv2[] = { "test-stream-2peers",
                     "-L", "DEBUG",
                     "-c", "test_stream_local.conf",
                     NULL};
@@ -549,7 +541,7 @@
 
   ret =
       GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
-                          "test-stream-local", "nohelp", options, &run, NULL);
+                          "test-stream-2peers", "nohelp", options, &run, NULL);
 
   if (GNUNET_OK != ret)
   {

Modified: gnunet/src/stream/test_stream_local.c
===================================================================
--- gnunet/src/stream/test_stream_local.c       2012-04-18 14:02:20 UTC (rev 21011)
+++ gnunet/src/stream/test_stream_local.c       2012-04-18 14:21:25 UTC (rev 21012)
@@ -69,15 +69,18 @@
 static struct PeerData peer1;
 static struct PeerData peer2;
 static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
-static struct GNUNET_CONFIGURATION_Handle *config;
+static struct GNUNET_CONFIGURATION_Handle *config_peer1;
+static struct GNUNET_CONFIGURATION_Handle *config_peer2;
 
 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
 static GNUNET_SCHEDULER_TaskIdentifier test_task;
-static GNUNET_SCHEDULER_TaskIdentifier read_task;
 
 static char *data = "ABCD";
 static int result;
 
+static int writing_success;
+static int reading_success;
+
 /**
  * Input processor
  *
@@ -175,7 +178,8 @@
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
   /* Free the duplicated configuration */
-  GNUNET_CONFIGURATION_destroy (config);
+  GNUNET_CONFIGURATION_destroy (config_peer1);
+  GNUNET_CONFIGURATION_destroy (config_peer2);
   GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid));
   GNUNET_OS_process_close (arm_pid);
 }
@@ -192,32 +196,13 @@
   {
     GNUNET_SCHEDULER_cancel (test_task);
   }
-  if (0 != read_task)
-    {
-      GNUNET_SCHEDULER_cancel (read_task);
-    }
+
   result = GNUNET_SYSERR;
   abort_task = 0;
   do_shutdown (cls, tc);
 }
 
-/**
- * Signature for input processor 
- *
- * @param cls the closure from GNUNET_STREAM_write/read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
- */
-static size_t
-input_processor (void *cls,
-                 enum GNUNET_STREAM_Status status,
-                 const void *input_data,
-                 size_t size);
 
-
 /**
  * The write completion function; called upon writing some data to stream or
  * upon error
@@ -231,10 +216,10 @@
                   enum GNUNET_STREAM_Status status,
                   size_t size)
 {
-  struct PeerData *peer=cls;;
+  struct PeerData *peer=cls;
 
   GNUNET_assert (GNUNET_STREAM_OK == status);
-  GNUNET_assert (size < strlen (data));
+  GNUNET_assert (size <= strlen (data));
   peer->bytes_wrote += size;
 
   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
@@ -243,11 +228,20 @@
     }
   else
     {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Writing completed\n");
+
       if (&peer1 == peer)   /* Peer1 has finished writing; should read now */
         {
           peer->bytes_read = 0;
           GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
         }
+      else
+        {
+          writing_success = GNUNET_YES;
+          if (GNUNET_YES == reading_success)
+            GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+        }
     }
 }
 
@@ -264,10 +258,12 @@
 {
   struct PeerData *peer=cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
-  peer->bytes_wrote = 0;
+  GNUNET_assert (&peer1 == peer);
   GNUNET_assert (socket == peer1.socket);
   GNUNET_assert (socket == peer->socket);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
+  peer->bytes_wrote = 0;
   GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
 }
 
@@ -291,7 +287,7 @@
   struct PeerData *peer = cls;
 
   GNUNET_assert (GNUNET_STREAM_OK == status);
-  GNUNET_assert (size < strlen (data));
+  GNUNET_assert (size <= strlen (data));
   GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, 
                           (const char *) input_data,
                           size));
@@ -310,7 +306,9 @@
         }
       else                      /* Peer1 has completed reading. End of tests */
         {
-          GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+          reading_success = GNUNET_YES;
+          if (GNUNET_YES == writing_success)
+            GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
         }
     } 
   return size;
@@ -318,23 +316,9 @@
 
   
 /**
- * Scheduler call back; to be executed when a new stream is connected
- * Called from listen connect for peer2
- */
-static void
-stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  read_task = GNUNET_SCHEDULER_NO_TASK;
-  GNUNET_assert (NULL != cls);
-  peer2.bytes_read = 0;
-  GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
-}
-
-
-/**
  * Functions of this type are called upon new stream connection from other peers
  *
- * @param cls the closure from GNUNET_STREAM_listen
+ * @param cls the PeerData of peer2
  * @param socket the socket representing the stream
  * @param initiator the identity of the peer who wants to establish a stream
  *            with us
@@ -346,13 +330,15 @@
            struct GNUNET_STREAM_Socket *socket,
            const struct GNUNET_PeerIdentity *initiator)
 {
+  struct PeerData *peer=cls;
   struct GNUNET_PeerIdentity self;
 
   GNUNET_assert (NULL != socket);
   GNUNET_assert (socket != peer1.socket);
+  GNUNET_assert (&peer2 == peer);
 
   /* Get our identity */
-  GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+  GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1,
                                                                 &self));
   GNUNET_assert (0 == memcmp (&self,
                               initiator,
@@ -361,8 +347,9 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Peer connected: %s\n", GNUNET_i2s(initiator));
 
-  peer2.socket = socket;
-  read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
+  peer->socket = socket;
+  peer->bytes_read = 0;
+  GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
   return GNUNET_OK;
 }
 
@@ -380,22 +367,22 @@
 
   test_task = GNUNET_SCHEDULER_NO_TASK;
   /* Get our identity */
-  GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+  GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1,
                                                                 &self));
 
-  peer2_listen_socket = GNUNET_STREAM_listen (config,
+  peer2_listen_socket = GNUNET_STREAM_listen (config_peer2,
                                               10, /* App port */
                                               &stream_listen_cb,
-                                              NULL);
+                                              &peer2);
   GNUNET_assert (NULL != peer2_listen_socket);
 
   /* Connect to stream library */
-  peer1.socket = GNUNET_STREAM_open (config,
+  peer1.socket = GNUNET_STREAM_open (config_peer1,
                                      &self,         /* Null for local peer? */
                                      10,           /* App port */
                                      &stream_open_cb,
-                                     (void *) &peer1);
-  GNUNET_assert (NULL != peer1.socket);                  
+                                     &peer1);
+  GNUNET_assert (NULL != peer1.socket);
 }
 
 /**
@@ -413,7 +400,8 @@
 #endif
                     NULL);
    /* Duplicate the configuration */
-   config = GNUNET_CONFIGURATION_dup (cfg);
+   config_peer1 = GNUNET_CONFIGURATION_dup (cfg);
+   config_peer2 = GNUNET_CONFIGURATION_dup (cfg);
    arm_pid =
      GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm",
                               "gnunet-service-arm",
@@ -428,7 +416,6 @@
                                     NULL);
    
    test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
-
 }
 
 /**
@@ -449,7 +436,7 @@
   struct GNUNET_GETOPT_CommandLineOption options[] = {
     GNUNET_GETOPT_OPTION_END
   };
-  
+
   ret =
       GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
                           "test-stream-local", "nohelp", options, &run, NULL);
@@ -465,6 +452,6 @@
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
     return 1;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test ok\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
   return 0;
 }

Modified: gnunet/src/stream/test_stream_local.conf
===================================================================
--- gnunet/src/stream/test_stream_local.conf    2012-04-18 14:02:20 UTC (rev 21011)
+++ gnunet/src/stream/test_stream_local.conf    2012-04-18 14:21:25 UTC (rev 21012)
@@ -9,7 +9,7 @@
 AUTOSTART = YES
 ACCEPT_FROM = 127.0.0.1;
 HOSTNAME = localhost
-PORT = 10511
+PORT = 10700
 # PREFIX = valgrind --leak-check=full
 # PREFIX = xterm -geometry 100x85 -T peer1 -e gdb --args
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]