gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3399 - in GNUnet: . src/include src/server src/transports


From: grothoff
Subject: [GNUnet-SVN] r3399 - in GNUnet: . src/include src/server src/transports src/util/network
Date: Wed, 13 Sep 2006 23:15:27 -0700 (PDT)

Author: grothoff
Date: 2006-09-13 23:15:21 -0700 (Wed, 13 Sep 2006)
New Revision: 3399

Modified:
   GNUnet/src/include/gnunet_util_network.h
   GNUnet/src/server/tcpserver.c
   GNUnet/src/transports/tcp.c
   GNUnet/src/transports/tcp6.c
   GNUnet/src/transports/udp.c
   GNUnet/src/transports/udp6.c
   GNUnet/src/transports/udp_helper.c
   GNUnet/src/util/network/select.c
   GNUnet/todo
Log:
working on udp tbench

Modified: GNUnet/src/include/gnunet_util_network.h
===================================================================
--- GNUnet/src/include/gnunet_util_network.h    2006-09-14 05:09:41 UTC (rev 
3398)
+++ GNUnet/src/include/gnunet_util_network.h    2006-09-14 06:15:21 UTC (rev 
3399)
@@ -410,6 +410,7 @@
  * @return NULL on error
  */
 struct SelectHandle * select_create(const char * desc,
+                                   int is_udp,
                                    struct GE_Context * ectx,
                                    struct LoadMonitor * mon,
                                    int sock,

Modified: GNUnet/src/server/tcpserver.c
===================================================================
--- GNUnet/src/server/tcpserver.c       2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/server/tcpserver.c       2006-09-14 06:15:21 UTC (rev 3399)
@@ -319,6 +319,7 @@
     return SYSERR;
   }
   selector = select_create("tcpserver",
+                          NO,
                           ectx,
                           NULL,
                           listenerFD,

Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/tcp.c 2006-09-14 06:15:21 UTC (rev 3399)
@@ -330,10 +330,11 @@
     return SYSERR;
   }
   selector = select_create("tcp",
+                          NO,
                           ectx,
                           coreAPI->load_monitor,
                           s,
-                          sizeof(IPaddr),
+                          sizeof(struct sockaddr_in),
                           TCP_TIMEOUT,
                           &select_message_handler,
                           NULL,

Modified: GNUnet/src/transports/tcp6.c
===================================================================
--- GNUnet/src/transports/tcp6.c        2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/tcp6.c        2006-09-14 06:15:21 UTC (rev 3399)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
+     (C) 2003, 2004, 2005, 2006 Christian Grothoff (and other contributing 
authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -339,6 +339,7 @@
     return SYSERR;
   }
   selector = select_create("tcp6",
+                          NO,
                           ectx,
                           coreAPI->load_monitor,
                           s,

Modified: GNUnet/src/transports/udp.c
===================================================================
--- GNUnet/src/transports/udp.c 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/udp.c 2006-09-14 06:15:21 UTC (rev 3399)
@@ -98,41 +98,46 @@
   const int on = 1;
 
   sock = SOCKET(PF_INET, SOCK_DGRAM, UDP_PROTOCOL_NUMBER);
-  if (sock < 0)
+  if (sock < 0) {
     GE_DIE_STRERROR(ectx,
                    GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
                    "socket");
+    return -1;
+  }
   if ( SETSOCKOPT(sock,
                  SOL_SOCKET,
                  SO_REUSEADDR, 
                  &on,
-                 sizeof(on)) < 0 )
+                 sizeof(on)) < 0 ) {
     GE_DIE_STRERROR(ectx,
                    GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
                    "setsockopt");
-  if (port != 0) {
-    memset(&sin, 
-          0, 
-          sizeof(sin));
-    sin.sin_family      = AF_INET;
-    sin.sin_addr.s_addr = INADDR_ANY;
-    sin.sin_port        = htons(port);
-    if (BIND(sock,
-            (struct sockaddr *)&sin,
-            sizeof(sin)) < 0) {
-      GE_LOG_STRERROR(ectx,
-                     GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
-                     "bind");
-      GE_LOG(ectx,
-            GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
-            _("Failed to bind to UDP port %d.\n"),
-            port);
-      GE_DIE_STRERROR(ectx,
-                     GE_FATAL | GE_USER | GE_IMMEDIATE,
-                     "bind");
-    }
-  } /* do not bind if port == 0, then we use
-       send-only! */
+    return -1;
+  }
+  GE_ASSERT(NULL, port != 0);
+  memset(&sin, 
+        0, 
+        sizeof(sin));
+  sin.sin_family      = AF_INET;
+  sin.sin_addr.s_addr = INADDR_ANY;
+  sin.sin_port        = htons(port);
+  if (BIND(sock,
+          (struct sockaddr *)&sin,
+          sizeof(sin)) < 0) {
+    GE_LOG_STRERROR(ectx,
+                   GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
+                   "bind");
+    GE_LOG(ectx,
+          GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
+          _("Failed to bind to UDP port %d.\n"),
+          port);
+    GE_DIE_STRERROR(ectx,
+                   GE_FATAL | GE_USER | GE_IMMEDIATE,
+                   "bind");
+    return -1;
+  }
+  /* do not bind if port == 0, then we use
+     send-only! */
   return sock;
 }
 
@@ -339,15 +344,16 @@
     if (sock == -1)
       return SYSERR;
     selector = select_create("udp",
+                            YES,
                             ectx,
                             load_monitor,
                             sock,
-                            sizeof(IPaddr),
+                            sizeof(struct sockaddr_in),
                             0, /* timeout */
                             &select_message_handler,
                             NULL,
                             &select_accept_handler,
-                            NULL,
+                            &isBlacklisted,
                             &select_close_handler,
                             NULL,
                             0 /* memory quota */ );

Modified: GNUnet/src/transports/udp6.c
===================================================================
--- GNUnet/src/transports/udp6.c        2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/udp6.c        2006-09-14 06:15:21 UTC (rev 3399)
@@ -337,15 +337,16 @@
     if (sock == -1)
       return SYSERR;
     selector = select_create("udp6",
+                            YES,
                             ectx,
                             load_monitor,
                             sock,
-                            sizeof(IPaddr),
+                            sizeof(struct sockaddr_in6),
                             0, /* timeout */
                             &select_message_handler,
                             NULL,
                             &select_accept_handler,
-                            NULL,
+                            &isBlacklisted,
                             &select_close_handler,
                             NULL,
                             0 /* memory quota */ );

Modified: GNUnet/src/transports/udp_helper.c
===================================================================
--- GNUnet/src/transports/udp_helper.c  2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/udp_helper.c  2006-09-14 06:15:21 UTC (rev 3399)
@@ -24,6 +24,9 @@
  * @author Christian Grothoff
  */
 
+typedef int (*BlacklistedTester)(const void * addr,
+                                unsigned int addr_len);
+
 /**
  * Message-Packet header.
  */
@@ -115,6 +118,10 @@
                                    const void * addr,
                                    unsigned int addr_len) {
   static int nonnullpointer;
+  BlacklistedTester blt = ah_cls;
+  if (NO != blt(addr,
+               addr_len))
+    return NULL;  
   return &nonnullpointer;
 }
 

Modified: GNUnet/src/util/network/select.c
===================================================================
--- GNUnet/src/util/network/select.c    2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/util/network/select.c    2006-09-14 06:15:21 UTC (rev 3399)
@@ -104,17 +104,37 @@
    */
   struct SocketHandle * listen_sock;
 
+  struct GE_Context * ectx;
+  
+  struct LoadMonitor * load_monitor;
+
   /**
+   * Array of currently active TCP sessions.
+   */
+  Session ** sessions;
+
+  SelectMessageHandler mh;
+
+  SelectAcceptHandler ah;
+
+  SelectCloseHandler ch;
+
+  void * mh_cls; 
+  
+  void * ah_cls;
+  
+  void * ch_cls;
+  
+  cron_t timeout;
+  
+  /**
    * tcp_pipe is used to signal the thread that is
    * blocked in a select call that the set of sockets to listen
    * to has changed.
    */
   int signal_pipe[2];
 
-  /**
-   * Array of currently active TCP sessions.
-   */
-  Session ** sessions;
+  int is_udp;
 
   unsigned int sessionCount;
 
@@ -122,26 +142,8 @@
 
   int shutdown;
 
-  struct GE_Context * ectx;
-  
-  struct LoadMonitor * load_monitor;
-
   unsigned int max_addr_len;
 
-  cron_t timeout;
-  
-  SelectMessageHandler mh;
-
-  void * mh_cls;
-  
-  SelectAcceptHandler ah;
-  
-  void * ah_cls;
-  
-  SelectCloseHandler ch;
-
-  void * ch_cls;
-  
   unsigned int memory_quota;
 
 } SelectHandle;
@@ -413,6 +415,7 @@
   void * sctx;
   SocketHandle * sock;
   Session * session;   
+  size_t size;
 
   clientAddr = MALLOC(sh->max_addr_len);
   MUTEX_LOCK(sh->lock);
@@ -491,63 +494,149 @@
                        "select");
       }
     }
-    if ( (sh->listen_sock != NULL) &&
-        (FD_ISSET(sh->listen_sock->handle, &readSet)) ) {
+    if (sh->is_udp == NO) {
+      if ( (sh->listen_sock != NULL) &&
+          (FD_ISSET(sh->listen_sock->handle, &readSet)) ) {
+       lenOfIncomingAddr = sh->max_addr_len;
+       memset(clientAddr,
+              0,
+            lenOfIncomingAddr);
+       s = ACCEPT(sh->listen_sock->handle,
+                  (struct sockaddr *) clientAddr,
+                  &lenOfIncomingAddr);
+       if (s == -1) {  
+         GE_LOG(sh->ectx,
+                GE_WARNING | GE_ADMIN | GE_BULK,
+                "Select %s failed to accept!\n",
+                sh->description); 
+         GE_LOG_STRERROR(sh->ectx,
+                         GE_WARNING | GE_ADMIN | GE_BULK,
+                         "accept");
+         break;
+       } else {
+#if DEBUG_SELECT
+         GE_LOG(sh->ectx,
+                GE_DEBUG | GE_DEVELOPER | GE_BULK,
+                "Select %p is accepting connection: %d\n",
+                sh,
+                s); 
+#endif
+         sock = socket_create(sh->ectx,
+                              sh->load_monitor,
+                              s);
+         sctx = sh->ah(sh->ah_cls,
+                       sh,
+                       sock,
+                       clientAddr,
+                       lenOfIncomingAddr);
+#if DEBUG_SELECT
+         GE_LOG(sh->ectx,
+                GE_DEBUG | GE_DEVELOPER | GE_BULK,
+                "Select %p is accepting connection: %p\n",
+                sh,
+                sctx);  
+#endif
+         if (sctx == NULL) {
+           socket_destroy(sock);
+         } else {
+           session = MALLOC(sizeof(Session));
+           memset(session, 0, sizeof(Session));
+           session->sock = sock;
+           session->sock_ctx = sctx;
+           session->lastUse = get_time();
+           if (sh->sessionArrayLength == sh->sessionCount)
+             GROW(sh->sessions,
+                  sh->sessionArrayLength,
+                  sh->sessionArrayLength + 4);
+           sh->sessions[sh->sessionCount++] = session;
+         }
+       } 
+      }
+    } else {  /* is_udp == YES */
+      int pending;
+      int udp_sock;
+      int error;
+
+      udp_sock = sh->listen_sock->handle;
       lenOfIncomingAddr = sh->max_addr_len;
       memset(clientAddr,
             0,
             lenOfIncomingAddr);
-      s = ACCEPT(sh->listen_sock->handle,
-                (struct sockaddr *) clientAddr,
-                &lenOfIncomingAddr);
-      if (s == -1) {   
-       GE_LOG(sh->ectx,
-              GE_WARNING | GE_ADMIN | GE_BULK,
-              "Select %s failed to accept!\n",
-              sh->description); 
+      pending = 0;
+      /* @todo FIXME in PlibC */
+#ifdef MINGW
+      error = ioctlsocket(udp_sock,
+                         FIONREAD,
+                         &pending);
+#else
+      error = ioctl(udp_sock,
+                   FIONREAD,
+                   &pending);
+#endif
+      if (error != 0) {
        GE_LOG_STRERROR(sh->ectx,
-                       GE_WARNING | GE_ADMIN | GE_BULK,
-                       "accept");
-       break;
+                       GE_ERROR | GE_ADMIN | GE_BULK,
+                       "ioctl");
+       pending = 65535; /* max */
+      }
+      GE_ASSERT(sh->ectx, pending >= 0);
+      if (pending == 0) {
+       /* maybe empty UDP packet was sent (see report on bug-gnunet,
+          5/11/6; read 0 bytes from UDP just to kill potential empty packet! */
+       socket_recv_from(sh->listen_sock,
+                        NC_Blocking,
+                        NULL,
+                        0,
+                        &size,
+                        clientAddr,
+                        &lenOfIncomingAddr);
+      } else if (pending >= 65536) {
+       GE_BREAK(sh->ectx, 0);
+       socket_close(sh->listen_sock);
       } else {
-#if DEBUG_SELECT
-       GE_LOG(sh->ectx,
-              GE_DEBUG | GE_DEVELOPER | GE_BULK,
-              "Select %p is accepting connection: %d\n",
-              sh,
-              s); 
-#endif
-       sock = socket_create(sh->ectx,
-                            sh->load_monitor,
-                            s);
-       sctx = sh->ah(sh->ah_cls,
-                     sh,
-                     sock,
-                     clientAddr,
-                     lenOfIncomingAddr);
-#if DEBUG_SELECT
-       GE_LOG(sh->ectx,
-              GE_DEBUG | GE_DEVELOPER | GE_BULK,
-              "Select %p is accepting connection: %p\n",
-              sh,
-              sctx);    
-#endif
-       if (sctx == NULL) {
-         socket_destroy(sock);
+       char * msg;
+       
+       msg = MALLOC(pending);    
+       size = 0;
+       if (YES != socket_recv_from(sh->listen_sock,
+                                   NC_Blocking,
+                                   msg,
+                                   pending,
+                                   &size,
+                                   clientAddr,
+                                   &lenOfIncomingAddr)) {
+         socket_close(sh->listen_sock);
        } else {
-         session = MALLOC(sizeof(Session));
-         memset(session, 0, sizeof(Session));
-         session->sock = sock;
-         session->sock_ctx = sctx;
-         session->lastUse = get_time();
-         if (sh->sessionArrayLength == sh->sessionCount)
-           GROW(sh->sessions,
-                sh->sessionArrayLength,
-                sh->sessionArrayLength + 4);
-         sh->sessions[sh->sessionCount++] = session;
+         /* validate msg format! */
+         const MESSAGE_HEADER * hdr;
+
+         hdr = (const MESSAGE_HEADER*) msg;
+         if ( (size == pending) &&
+              (size >= sizeof(MESSAGE_HEADER)) &&
+              (ntohs(hdr->size) == size) ) {
+           void * sctx;
+           
+           sctx = sh->ah(sh->ah_cls,
+                         sh,
+                         NULL,
+                         clientAddr,
+                         lenOfIncomingAddr);
+           if (sctx != NULL) {
+             sh->mh(sh->mh_cls,
+                    sh,
+                    NULL,
+                    sctx,
+                    hdr);
+             sh->ch(sh->ch_cls,
+                    sh,
+                    NULL,
+                    sctx);                
+           }
+         }
        }
-      } 
-    }
+       FREE(msg);
+      }
+    } /* end UDP processing */
     if (FD_ISSET(sh->signal_pipe[0], &readSet)) {
       /* allow reading multiple signals in one go in case we get many
         in one shot... */
@@ -638,6 +727,7 @@
  * @return NULL on error
  */
 SelectHandle * select_create(const char * description,
+                            int is_udp,
                             struct GE_Context * ectx,
                             struct LoadMonitor * mon,
                             int sock,
@@ -652,16 +742,18 @@
                             unsigned int memory_quota) {
   SelectHandle * sh;
 
-  if ( (0 != LISTEN(sock, 5)) &&
-       (errno != EOPNOTSUPP) ) { /* udp: not supported */
+  if ( (is_udp == NO) &&
+       (0 != LISTEN(sock, 5)) ) {
     GE_LOG_STRERROR(ectx,
                    GE_ERROR | GE_USER | GE_IMMEDIATE,
                    "listen");
-    return NULL;
+    return NULL;    
   }
+  GE_ASSERT(ectx, description != NULL);
   sh = MALLOC(sizeof(SelectHandle));
+  memset(sh, 0, sizeof(SelectHandle));
+  sh->is_udp = is_udp;
   sh->description = description;
-  memset(sh, 0, sizeof(SelectHandle));
   if (0 != PIPE(sh->signal_pipe)) {
     GE_LOG_STRERROR(ectx,
                    GE_ERROR | GE_USER | GE_IMMEDIATE,

Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/todo 2006-09-14 06:15:21 UTC (rev 3399)
@@ -18,6 +18,7 @@
     + loggers: SMTP logger
     + use new loggers in for CS error reporting
   * make testcases compile & pass again:
+    + tbench_udp (bugs in udp transport?)
     + gap -- does not yet compile
     + fs/namespace -- does not yet compile
     + fs/fsui -- downloadtest does not yet compile





reply via email to

[Prev in Thread] Current Thread [Next in Thread]