gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3142 - in GNUnet/src: include transports util util/network


From: grothoff
Subject: [GNUnet-SVN] r3142 - in GNUnet/src: include transports util util/network util/network_client
Date: Fri, 28 Jul 2006 01:30:10 -0700 (PDT)

Author: grothoff
Date: 2006-07-28 01:29:51 -0700 (Fri, 28 Jul 2006)
New Revision: 3142

Added:
   GNUnet/src/util/network/select.c
   GNUnet/src/util/network_client/
   GNUnet/src/util/network_client/Makefile.am
   GNUnet/src/util/network_client/daemon.c
   GNUnet/src/util/network_client/tcpio.c
   GNUnet/src/util/network_client/tcpiotest.c
Removed:
   GNUnet/src/util/network/daemon.c
   GNUnet/src/util/network/tcpio.c
   GNUnet/src/util/network/tcpiotest.c
Modified:
   GNUnet/src/include/gnunet_protocols.h
   GNUnet/src/include/gnunet_util_network.h
   GNUnet/src/transports/Makefile.am
   GNUnet/src/transports/tcp.c
   GNUnet/src/util/Makefile.am
   GNUnet/src/util/README
   GNUnet/src/util/network/Makefile.am
   GNUnet/src/util/network/io.c
Log:
improved select abstraction

Modified: GNUnet/src/include/gnunet_protocols.h
===================================================================
--- GNUnet/src/include/gnunet_protocols.h       2006-07-27 08:02:54 UTC (rev 
3141)
+++ GNUnet/src/include/gnunet_protocols.h       2006-07-28 08:29:51 UTC (rev 
3142)
@@ -414,6 +414,8 @@
  */
 #define ONDEMAND_BLOCK 0xFFFFFFFF
 
+
+
 #if 0 /* keep Emacsens' auto-indent happy */
 {
 #endif

Modified: GNUnet/src/include/gnunet_util_network.h
===================================================================
--- GNUnet/src/include/gnunet_util_network.h    2006-07-27 08:02:54 UTC (rev 
3141)
+++ GNUnet/src/include/gnunet_util_network.h    2006-07-28 08:29:51 UTC (rev 
3142)
@@ -36,8 +36,6 @@
 #include "gnunet_util_string.h"
 #include "gnunet_util_os.h"
 #include "gnunet_util_threads.h"
-#include <sys/socket.h>
-#include <sys/select.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -115,6 +113,24 @@
 } MESSAGE_HEADER;
 
 /**
+ * Client-server communication: simple return value
+ */
+typedef struct {
+
+  /**
+   * The CS header (values: sizeof(CS_returnvalue_MESSAGE) + error-size, 
CS_PROTO_RETURN_VALUE)
+   */
+  MESSAGE_HEADER header;
+
+  /**
+   * The return value (network byte order)
+   */
+  int return_value;
+
+} RETURN_VALUE_MESSAGE;
+
+
+/**
  * @brief an IPv4 address
  */
 typedef struct {
@@ -144,10 +160,56 @@
  */
 struct CIDR6Network;
 
-struct ClientServerConnection;
-
+/**
+ * @brief handle for a system socket
+ */
 struct SocketHandle;
 
+/**
+ * @brief handle for a select manager
+ */
+struct SelectHandle;
+
+/**
+ * @brief callback for handling messages received by select
+ *
+ * @param sock socket on which the message was received
+ *        (should ONLY be used to queue reply using select methods)
+ * @return OK if message was valid, SYSERR if corresponding
+ *  socket should be closed
+ */
+typedef int (*SelectMessageHandler)(void * mh_cls,
+                                  struct SelectHandle * sh,
+                                  struct SocketHandle * sock,
+                                  void * sock_ctx,
+                                  const MESSAGE_HEADER * msg);                 
     
+
+/**
+ * We've accepted a connection, check that
+ * the connection is valid and create the
+ * corresponding sock_ctx for the new
+ * connection.
+ *
+ * @param addr the address of the other side as reported by OS
+ * @param addr_len the size of the address
+ * @return NULL to reject connection, otherwise value of sock_ctx
+ *         for the new connection
+ */
+typedef void * (*SelectAcceptHandler)(void * ah_cls,
+                                     struct SelectHandle * sh,
+                                     struct SocketHandle * sock,
+                                     const void * addr,
+                                     unsigned int addr_len);
+
+/**
+ * Select has been forced to close a connection.
+ * Free the associated context.
+ */
+typedef void (*SelectCloseHandler)(void * ch_cls,
+                                  struct SelectHandle * sh,
+                                  struct SocketHandle * sock,
+                                  void * sock_ctx);
+
 /* *********************** endianess conversion ************* */
 
 /**
@@ -230,171 +292,6 @@
                     const char * hostname,
                     IPaddr * ip);
 
-/* ***************** high-level GNUnet client-server connections *********** */
-
-/**
- * Get a connection with gnunetd.
- */
-struct ClientServerConnection * 
-daemon_connection_create(struct GE_Context * ectx,
-                        struct GC_Configuration * cfg);
-
-/**
- * Initialize a GNUnet server socket.
- * @param sock the open socket
- * @param result the SOCKET (filled in)
- * @return OK (always successful)
- */
-struct ClientServerConnection * 
-client_connection_create(struct GE_Context * ectx,
-                        struct GC_Configuration * cfg,
-                        struct SocketHandle * sock);
-
-/**
- * Close a GNUnet TCP socket for now (use to temporarily close
- * a TCP connection that will probably not be used for a long
- * time; the socket will still be auto-reopened by the
- * readFromSocket/writeToSocket methods if it is a client-socket).
- *
- * Also, you must still call connection_destroy to free all
- * resources associated with the connection.
- */
-void connection_close_temporarily(struct ClientServerConnection * sock);
-
-/**
- * Destroy connection between gnunetd and clients.
- * Also closes the connection if it is still active.
- */
-void connection_destroy(struct ClientServerConnection * con);
-
-/**
- * Check if a socket is open. Will ALWAYS return 'true' for a valid
- * client socket (even if the connection is closed), but will return
- * false for a closed server socket.
- *
- * @return 1 if open, 0 if closed
- */
-int connection_test_open(struct ClientServerConnection * sock);
-
-/**
- * Check a socket, open and connect if it is closed and it is a
- * client-socket.
- *
- * @return OK if the socket is now open, SYSERR if not
- */
-int connection_ensure_connected(struct ClientServerConnection * sock);
-
-/**
- * Read from a GNUnet client-server connection.
- *
- * @param sock the socket
- * @param buffer the buffer to write data to
- *        if NULL == *buffer, *buffer is allocated (caller frees)
- * @return OK if the read was successful, SYSERR if the socket
- *         was closed by the other side (if the socket is a
- *         client socket and is used again, the next
- *         read/write call will automatically attempt
- *         to re-establish the connection).
- */
-int connection_read(struct ClientServerConnection * sock,
-                   MESSAGE_HEADER ** buffer);
-
-/**
- * Write to a GNUnet TCP socket.
- *
- * @param sock the socket to write to
- * @param buffer the buffer to write
- * @return OK if the write was sucessful, 
- *         NO if it would block and isBlocking was NO,
- *         SYSERR if the write failed (error will be logged)
- */
-int connection_write(struct ClientServerConnection * sock,
-                    const MESSAGE_HEADER * buffer);
-
-/**
- * Obtain a simple return value from the connection.
- * Note that the protocol will automatically communicate
- * errors and pass those to the error context used when
- * the socket was created.  In that case, read_result
- * will return SYSERR for the corresponding communication.
- * 
- * @param sock the TCP socket
- * @param ret the return value from TCP
- * @return SYSERR on error, OK if the return value was
- *         read successfully
- */
-int connection_read_result(struct ClientServerConnection * sock,
-                          int * ret);
-
-/**
- * Send a simple return value to the other side.
- *
- * @param sock the TCP socket
- * @param ret the return value to send via TCP
- * @return SYSERR on error, OK if the return value was
- *         send successfully
- */
-int connection_write_result(struct ClientServerConnection * sock,
-                           int ret);
-
-/**
- * Send a return value that indicates
- * a serious error to the other side.
- *
- * @param sock the TCP socket
- * @param mask GE_MASK 
- * @param date date string
- * @param msg message string
- * @return SYSERR on error, OK if the error code was send
- *         successfully
- */
-int connection_write_error(struct ClientServerConnection * sock,
-                          GE_KIND mask,
-                          const char * date,
-                          const char * msg);
-
-/**
- * Stop gnunetd
- *
- * Note that returning an error does NOT mean that
- * gnunetd will continue to run (it may have been
- * shutdown by something else in the meantime or
- * crashed).  Call connection_test_running() frequently
- * to check the status of gnunetd.
- *
- * Furthermore, note that this WILL potentially kill
- * gnunetd processes on remote machines that cannot
- * be restarted with startGNUnetDaemon!
- *
- * This function does NOT need the PID and will also
- * kill daemonized gnunetd's.
- *
- * @return OK successfully stopped, SYSERR: error
- */
-int connection_request_shutdown(struct ClientServerConnection * sock);
-
-/**
- * Checks if gnunetd is running
- *
- * Uses CS_PROTO_traffic_COUNT query to determine if gnunetd is
- * running.
- *
- * @return OK if gnunetd is running, SYSERR if not
- */
-int connection_test_running(struct GE_Context * ectx,
-                           struct GC_Configuration * cfg);
-
-/**
- * Wait until the gnunet daemon is
- * running.
- *
- * @param timeout how long to wait at most in ms
- * @return OK if gnunetd is now running
- */
-int connection_wait_for_running(struct GE_Context * ectx,
-                               struct GC_Configuration * cfg,
-                               cron_t timeout);
-
 /* ********************* low-level socket operations **************** */
 
 /**
@@ -407,17 +304,11 @@
              struct LoadMonitor * mon,
              int osSocket);
 
+/**
+ * Destroy the socket (also closes it).
+ */
 void socket_destroy(struct SocketHandle * s);
 
-void socket_add_to_select_set(struct SocketHandle * s,
-                             fd_set * set,
-                             int * max);
-
-int socket_test_select_set(struct SocketHandle * sock,
-                          fd_set * set);
-
-int socket_get_os_socket(struct SocketHandle * sock);
-
 /**
  * Depending on doBlock, enable or disable the nonblocking mode
  * of socket s.
@@ -459,8 +350,8 @@
                     void * buf,
                     size_t max,
                     size_t * read,
-                    struct sockaddr * from,
-                    socklen_t * fromlen);
+                    char * from,
+                    unsigned int * fromlen);
 
 /**
  * Do a write on the given socket.
@@ -484,8 +375,8 @@
                   const void * buf,
                   size_t max,
                   size_t * sent,
-                  const struct sockaddr * dst,
-                  socklen_t dstlen);
+                  const char * dst,
+                  unsigned int dstlen);
 
 /**
  * Check if socket is valid
@@ -494,6 +385,75 @@
 int socket_test_valid(struct SocketHandle * s);
 
 
+/* ********************* select operations **************** */
+
+
+/**
+ * Start a select thread that will accept connections
+ * from the given socket and pass messages read to the
+ * given message handler.
+ *
+ * @param sock the listen socket
+ * @param max_addr_len maximum expected length of addresses for
+ *        connections accepted on the given socket
+ * @param timeout after how long should inactive connections be
+ *        closed?  Use 0 for no timeout
+ * @param mon maybe NULL
+ * @param memory_quota amount of memory available for
+ *        queueing messages (in bytes)
+ * @return NULL on error
+ */
+struct SelectHandle * select_create(struct GE_Context * ectx,
+                                   struct LoadMonitor * mon,
+                                   int sock,
+                                   unsigned int max_addr_len,
+                                   cron_t timeout,
+                                   SelectMessageHandler mh,
+                                   void * mh_cls,
+                                   SelectAcceptHandler ah,
+                                   void * ah_cls,
+                                   SelectCloseHandler ch,
+                                   void * ch_cls,
+                                   unsigned int memory_quota);
+
+/**
+ * Terminate the select thread, close the socket and
+ * all associated connections.
+ */
+void select_destroy(struct SelectHandle * sh);
+
+/**
+ * Queue the given message with the select thread.
+ *
+ * @param mayBlock if YES, blocks this thread until message
+ *        has been sent
+ * @param force message is important, queue even if
+ *        there is not enough space
+ * @return OK if the message was sent or queued
+ *         NO if there was not enough memory to queue it,
+ *         SYSERR if the sock does not belong with this select
+ */
+int select_write(struct SelectHandle * sh,
+                struct SocketHandle * sock,
+                const MESSAGE_HEADER * msg,
+                int mayBlock,
+                int force);
+
+/**
+ * Add another (already connected) socket to the set of
+ * sockets managed by the select.
+ */
+int select_connect(struct SelectHandle * sh,
+                  struct SocketHandle * sock,
+                  void * sock_ctx);
+
+/**
+ * Close the associated socket and remove it from the
+ * set of sockets managed by select.
+ */
+int select_disconnect(struct SelectHandle * sh,
+                     struct SocketHandle * sock);
+
 #if 0 /* keep Emacsens' auto-indent happy */
 {
 #endif

Modified: GNUnet/src/transports/Makefile.am
===================================================================
--- GNUnet/src/transports/Makefile.am   2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/transports/Makefile.am   2006-07-28 08:29:51 UTC (rev 3142)
@@ -15,7 +15,7 @@
   libip.la
 
 if !MINGW
-# smtptransport = libgnunettransport_smtp.la
+ smtptransport = libgnunettransport_smtp.la
 endif
 
 libip_la_SOURCES = \

Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/transports/tcp.c 2006-07-28 08:29:51 UTC (rev 3142)
@@ -134,36 +134,6 @@
    */
   int expectingWelcome;
 
-  /**
-   * Current read position in the buffer.
-   */
-  unsigned int pos;
-
-  /**
-   * Current size of the read buffer.
-   */
-  unsigned int rsize;
-
-  /**
-   * The read buffer.
-   */
-  char * rbuff;
-
-  /**
-   * Position in the write buffer
-   */
-  unsigned int wpos;
-
-  /**
-   * The write buffer.
-   */
-  char * wbuff;
-
-  /**
-   * Size of the write buffer
-   */
-  unsigned int wsize;
-
 } TCPSession;
 
 /* *********** globals ************* */
@@ -183,66 +153,15 @@
 
 static int stat_bytesDropped;
 
-/**
- * one thread for listening for new connections,
- * and for reading on all open sockets
- */
-static struct PTHREAD * listenThread;
-
-/**
- * sock is the tcp socket that we listen on for new inbound
- * connections.
- */
-static struct SocketHandle * tcp_sock;
-
-/**
- * tcp_pipe is used to signal the thread that is
- * blocked in a select call that the set of sockets to listen
- * to has changed.
- */
-static int tcp_pipe[2];
-
-/**
- * Array of currently active TCP sessions.
- */
-static TSession ** tsessions = NULL;
-
-static unsigned int tsessionCount;
-
-static unsigned int tsessionArrayLength;
-
 /* configuration */
 static struct CIDRNetwork * filteredNetworks_;
 
-/**
- * Lock for access to mutable state of the module,
- * that is the configuration and the tsessions array.
- * Note that we ONLY need to synchronize access to
- * the tsessions array when adding or removing sessions,
- * since removing is done only by one thread and we just
- * need to avoid another thread adding an element at the
- * same point in time. We do not need to synchronize at
- * every access point since adding new elements does not
- * prevent the select thread from operating and removing
- * is done by the only therad that reads from the array.
- */
-static struct MUTEX * tcplock;
+static struct SelectHandle * selector;
 
-/**
- * Semaphore used by the server-thread to signal that
- * the server has been started -- and later again to
- * signal that the server has been stopped.
- */
-static struct SEMAPHORE * serverSignal;
-
-static int tcp_shutdown = YES;
-
 static struct GE_Context * ectx;
 
 static struct GC_Configuration * cfg;
 
-static struct LoadMonitor * load_monitor;
-
 /* ******************** helper functions *********************** */
 
 /**
@@ -259,23 +178,6 @@
 }
 
 /**
- * Write to the pipe to wake up the select thread (the set of
- * files to watch has changed).
- */
-static void signalSelect() {
-  char i = 0;
-  int ret;
-
-  ret = WRITE(tcp_pipe[1],
-             &i,
-             sizeof(char));
-  if (ret != sizeof(char))
-    GE_LOG_STRERROR(ectx,
-                   GE_ERROR | GE_ADMIN | GE_BULK,
-                   "write");
-}
-
-/**
  * Disconnect from a remote node. May only be called
  * on sessions that were aquired by the caller first.
  * For the core, aquiration means to call associate or
@@ -604,254 +506,6 @@
 }                                      
 
 /**
- * Main method for the thread listening on the tcp socket and all tcp
- * connections. Whenever a message is received, it is forwarded to the
- * core. This thread waits for activity on any of the TCP connections
- * and processes deferred (async) writes and buffers reads until an
- * entire message has been received.
- */
-static void * tcpListenMain(void * unused) {
-  struct sockaddr_in clientAddr;
-  fd_set readSet;
-  fd_set errorSet;
-  fd_set writeSet;
-  struct stat buf;
-  socklen_t lenOfIncomingAddr;
-  int i;
-  int max;
-  int ret;
-
-  SEMAPHORE_UP(serverSignal); /* we are there! */
-  MUTEX_LOCK(tcplock);
-  while (tcp_shutdown == NO) {
-    FD_ZERO(&readSet);
-    FD_ZERO(&errorSet);
-    FD_ZERO(&writeSet);
-    if (tcp_pipe[0] != -1) {
-      if (-1 != FSTAT(tcp_pipe[0], &buf)) {
-       FD_SET(tcp_pipe[0], 
-              &readSet);
-      } else {
-       GE_LOG_STRERROR(ectx,
-                       GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
-                       "fstat");
-       tcp_pipe[0] = -1; /* prevent us from error'ing all the time */  
-      }
-    }
-    max = tcp_pipe[0];
-    if (tcp_sock != NULL) {
-      if (socket_test_valid(tcp_sock)) {
-       socket_add_to_select_set(tcp_sock, &readSet, &max);
-      } else { 
-       socket_destroy(tcp_sock);
-       tcp_sock = NULL; /* prevent us from error'ing all the time */
-      }
-    }
-#if DEBUG_TCP
-    else
-      GE_LOG(ectx,
-            GE_USER | GE_WARNING | GE_BULK,
-            _("TCP server socket not open!\n"));
-#endif
-    for (i=0;i<tsessionCount;i++) {
-      TCPSession * tcpSession = tsessions[i]->internal;
-      struct SocketHandle * sock = tcpSession->sock;
-      if (sock != NULL) {
-       if (socket_test_valid(sock)) {
-         socket_add_to_select_set(sock, &readSet, &max);
-         socket_add_to_select_set(sock, &errorSet, &max);
-         if (tcpSession->wpos > 0)
-           socket_add_to_select_set(sock, &writeSet, &max); /* do we have a 
pending write request? */
-       } else {
-         destroySession(i);
-       }
-      } else {
-       GE_BREAK(ectx, 0); /* sock in tsessions array should never be -1 */
-       destroySession(i);
-      }
-    }
-    MUTEX_UNLOCK(tcplock);
-    ret = SELECT(max+1, 
-                &readSet,
-                &writeSet,
-                &errorSet,
-                NULL);
-    MUTEX_LOCK(tcplock);
-    if ( (ret == -1) &&
-        ( (errno == EAGAIN) || (errno == EINTR) ) )
-      continue;
-    if (ret == -1) {
-      if (errno == EBADF) {
-       GE_LOG_STRERROR(ectx,
-                       GE_ERROR | GE_ADMIN | GE_USER | GE_BULK, 
-                       "select");
-      } else {
-       GE_DIE_STRERROR(ectx,
-                       GE_FATAL | GE_ADMIN | GE_USER | GE_IMMEDIATE,
-                       "select");
-      }
-    }
-    if (tcp_sock != NULL) {
-      if (socket_test_select_set(tcp_sock, &readSet)) {
-       int sock;
-       
-       lenOfIncomingAddr = sizeof(clientAddr);
-       sock = ACCEPT(socket_get_os_socket(tcp_sock),
-                     (struct sockaddr *)&clientAddr,
-                     &lenOfIncomingAddr);
-       if (sock != -1) {       
-         /* verify clientAddr for eligibility here (ipcheck-style,
-            user should be able to specify who is allowed to connect,
-            otherwise we just close and reject the communication! */
-
-         IPaddr ipaddr;
-         GE_ASSERT(ectx,
-                   sizeof(struct in_addr) == sizeof(IPaddr));
-         memcpy(&ipaddr,
-                &clientAddr.sin_addr,
-                sizeof(struct in_addr));
-
-         if (YES == isBlacklisted(ipaddr)) {
-           GE_LOG(ectx,
-                  GE_INFO | GE_USER | GE_ADMIN | GE_REQUEST,
-                  _("%s: Rejected connection from blacklisted "
-                    "address %u.%u.%u.%u.\n"),
-                  "TCP",
-                  PRIP(ntohl(*(int*)&clientAddr.sin_addr)));
-           if (0 != SHUTDOWN(sock, 2))
-             GE_LOG_STRERROR(ectx,
-                             GE_USER | GE_ADMIN | GE_WARNING | GE_BULK,
-                             "shutdown");
-           if (0 != CLOSE(sock))
-             GE_LOG_STRERROR(ectx,
-                             GE_USER | GE_ADMIN | GE_WARNING | GE_BULK,
-                             "close");
-         } else {
-#if DEBUG_TCP
-           GE_LOG(ectx,
-                  GE_INFO,
-                  "Accepted connection from %u.%u.%u.%u.\n",
-                  PRIP(ntohl(*(int*)&clientAddr.sin_addr)));   
-#endif
-           createNewSession(sock);
-         }
-       } else {
-         GE_LOG_STRERROR(ectx,
-                         GE_WARNING | GE_ADMIN | GE_BULK,
-                         "accept");
-       }
-      }
-    }
-    if (FD_ISSET(tcp_pipe[0], &readSet)) {
-      /* allow reading multiple signals in one go in case we get many
-        in one shot... */
-#define MAXSIG_BUF 128
-      char buf[MAXSIG_BUF];
-      /* just a signal to refresh sets, eat and continue */
-      if (0 >= READ(tcp_pipe[0],
-                   &buf[0],
-                   MAXSIG_BUF)) {
-       GE_LOG_STRERROR(ectx,
-                       GE_WARNING | GE_USER | GE_BULK, 
-                       "read");
-      }
-    }
-    for (i=0;i<tsessionCount;i++) {
-      TCPSession * tcpSession = tsessions[i]->internal;
-      struct SocketHandle * sock = tcpSession->sock;
-      if (socket_test_select_set(sock, &readSet)) {
-       if (SYSERR == readAndProcess(i)) {
-         destroySession(i);
-         i--;
-         continue;
-       }
-      }
-      if (socket_test_select_set(sock, &writeSet)) {
-       size_t ret;
-       int success;
-
-try_again_1:
-#if DEBUG_TCP
-       GE_LOG(ectx,
-              GE_DEBUG | GE_USER | GE_BULK,
-              "TCP: trying to send %u bytes\n",
-              tcpSession->wpos);
-#endif
-       success = socket_send(sock,
-                             NC_Nonblocking,
-                             tcpSession->wbuff,
-                             tcpSession->wpos,
-                             &ret);
-       if (success == SYSERR) {
-         GE_LOG_STRERROR(ectx,
-                         GE_WARNING | GE_USER | GE_ADMIN | GE_BULK,
-                         "send");
-         destroySession(i);
-         i--;
-         continue;
-        } else if (success == NO) {
-         /* this should only happen under Win9x because
-            of a bug in the socket implementation (KB177346).
-            Let's sleep and try again. */
-         PTHREAD_SLEEP(20 * cronMILLIS);
-         goto try_again_1;
-        }
-       if (stats != NULL)
-         stats->change(stat_bytesSent,
-                       ret);
-
-#if DEBUG_TCP
-       GE_LOG(ectx,
-              GE_DEBUG | GE_USER | GE_BULK,
-              "TCP: transmitted %u bytes\n",
-              ret);
-#endif
-       if (ret == 0) {
-          /* send only returns 0 on error (other side closed connection),
-          * so close the session */
-         destroySession(i);
-         i--;
-         continue;
-       }
-       if (ret == tcpSession->wpos) {
-         FREENONNULL(tcpSession->wbuff);
-         tcpSession->wbuff = NULL;
-         tcpSession->wpos  = 0;
-         tcpSession->wsize = 0;
-       } else {
-         memmove(tcpSession->wbuff,
-                 &tcpSession->wbuff[ret],
-                 tcpSession->wpos - ret);
-         tcpSession->wpos -= ret;
-       }
-      }
-      if (socket_test_select_set(sock, &errorSet)) {
-       destroySession(i);
-       i--;
-       continue;
-      }
-      if ( ( tcpSession->users == 1) &&
-          (get_time() > tcpSession->lastUse + TCP_TIMEOUT) ) {
-       destroySession(i);
-       i--;
-       continue;
-      }
-    }
-  }
-  /* shutdown... */
-  if (tcp_sock != NULL) {
-    socket_destroy(tcp_sock);
-    tcp_sock = NULL;
-  }
-  /* close all sessions */
-  while (tsessionCount > 0)
-    destroySession(0);
-  MUTEX_UNLOCK(tcplock);
-  SEMAPHORE_UP(serverSignal); /* we are there! */
-  return NULL;
-} /* end of tcp listen main */
-
-/**
  * Send a message (already encapsulated if needed) via the
  * tcp socket (or enqueue if sending now would block).
  *
@@ -1425,20 +1079,8 @@
                             s);
   } else
     tcp_sock = NULL;
-  listenThread = PTHREAD_CREATE(&tcpListenMain,
-                               NULL,
-                               5 * 1024);
-  if (listenThread == NULL) {
-    GE_LOG_STRERROR(ectx,
-                   GE_ERROR | GE_IMMEDIATE | GE_ADMIN,
-                   "pthread_create");
-    socket_destroy(tcp_sock);
-    tcp_sock = NULL;
-    SEMAPHORE_DESTROY(serverSignal);
-    serverSignal = NULL;
-    return SYSERR;
-  }
-  SEMAPHORE_DOWN(serverSignal, YES); /* wait for server to be up */
+
+  /* FIXME: call network/select code! */
   return OK;
 }
 

Modified: GNUnet/src/util/Makefile.am
===================================================================
--- GNUnet/src/util/Makefile.am 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/Makefile.am 2006-07-28 08:29:51 UTC (rev 3142)
@@ -3,7 +3,7 @@
   getopt disk threads \
   os network . \
   config_impl cron crypto \
-  containers loggers 
+  containers loggers network_client
 
 INCLUDES = -I$(top_srcdir)/src/include
 

Modified: GNUnet/src/util/README
===================================================================
--- GNUnet/src/util/README      2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/README      2006-07-28 08:29:51 UTC (rev 3142)
@@ -25,6 +25,8 @@
       => linked to gnunetutil_containers.so (also requires 
libgnunetutil_crypto)
 util/loggers: specific logging implementations (depends on gnunetutil.so)
       => linked to gnunetutil_logging.so
+util/network_client: library for synchronous communiation of clients with 
gnunetd (depends on gnunetutil.so)
+      => linked to gnunetutil_network_client.so
 
 Most GNUnet libraries and plugins will only need to (directly) link
 against gnunetutil.so.  Some may also require crypto or containers.

Modified: GNUnet/src/util/network/Makefile.am
===================================================================
--- GNUnet/src/util/network/Makefile.am 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/Makefile.am 2006-07-28 08:29:51 UTC (rev 3142)
@@ -6,11 +6,10 @@
   libnetwork.la
 
 libnetwork_la_SOURCES = \
- daemon.c \
  endian.c \
  io.c \
  ipcheck.c \
- tcpio.c 
+ select.c 
 
 check_PROGRAMS = \
  tcpiotest 

Deleted: GNUnet/src/util/network/daemon.c
===================================================================
--- GNUnet/src/util/network/daemon.c    2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/daemon.c    2006-07-28 08:29:51 UTC (rev 3142)
@@ -1,101 +0,0 @@
-/*
-     This file is part of GNUnet.
-     (C) 2005, 2006 Christian Grothoff (and other contributing authors)
-
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
-     option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file src/util/network/daemon.c
- * @brief code for client-gnunetd interaction (stop, check running)
- * @author Christian Grothoff
- */
-
-#include "platform.h"
-#include "gnunet_util_network.h"
-#include "gnunet_protocols.h"
-#include "gnunet_util_threads.h"
-
-int connection_test_running(struct GE_Context * ectx,
-                           struct GC_Configuration * cfg) {
-  struct ClientServerConnection * sock;
-  MESSAGE_HEADER csHdr;
-  int ret;
-
-  sock = daemon_connection_create(ectx, cfg);
-  if (sock == NULL) 
-    return SYSERR;
-  csHdr.size
-    = htons(sizeof(MESSAGE_HEADER));
-  csHdr.type
-    = htons(CS_PROTO_traffic_COUNT);
-  if (SYSERR == connection_write(sock,
-                                &csHdr)) {
-    connection_destroy(sock);
-    return SYSERR;
-  }
-  if (SYSERR == connection_read_result(sock,
-                                      &ret)) {
-    connection_destroy(sock);
-    return SYSERR;
-  }
-  connection_destroy(sock);
-  return OK;
-}
-
-int connection_request_shutdown(struct ClientServerConnection * sock) {
-  MESSAGE_HEADER csHdr;
-  int ret;
-
-  csHdr.size
-    = htons(sizeof(MESSAGE_HEADER));
-  csHdr.type
-    = htons(CS_PROTO_SHUTDOWN_REQUEST);
-  if (SYSERR == connection_write(sock,
-                                &csHdr)) {
-    connection_close_temporarily(sock);
-    return SYSERR;
-  }
-  if (SYSERR == connection_read_result(sock,
-                                      &ret)) {
-    connection_close_temporarily(sock);
-    return SYSERR;
-  }
-  return ret;
-}
-
-/**
- * Wait until the gnunet daemon is
- * running.
- *
- * @param timeout how long to wait at most
- * @return OK if gnunetd is now running
- */
-int connection_wait_for_running(struct GE_Context * ectx,
-                               struct GC_Configuration * cfg,
-                               cron_t timeout) {
-  timeout += get_time();
-  while (OK != connection_test_running(ectx,
-                                      cfg)) {
-    PTHREAD_SLEEP(100 * cronMILLIS);
-    if (timeout < get_time())
-      return connection_test_running(ectx,
-                                    cfg);
-  }
-  return OK;
-}
-
-/* end of daemon.c */

Modified: GNUnet/src/util/network/io.c
===================================================================
--- GNUnet/src/util/network/io.c        2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/io.c        2006-07-28 08:29:51 UTC (rev 3142)
@@ -28,36 +28,8 @@
 
 #include "gnunet_util_network.h"
 #include "platform.h"
+#include "network.h"
 
-typedef struct SocketHandle {
-
-  struct LoadMonitor * mon;
-  
-  struct GE_Context * ectx;
-
-  int handle;
-  
-} SocketHandle;
-
-
-void socket_add_to_select_set(struct SocketHandle * s,
-                             fd_set * set,
-                             int * max) {
-  FD_SET(s->handle,
-        set);
-  if (*max < s->handle)
-    *max = s->handle;
-}
-
-int socket_test_select_set(struct SocketHandle * sock,
-                          fd_set * set) {
-  return FD_ISSET(sock->handle, set);
-}
-
-int socket_get_os_socket(struct SocketHandle * sock) {
-  return sock->handle;
-}
-
 struct SocketHandle * 
 socket_create(struct GE_Context * ectx,
              struct LoadMonitor * mon,
@@ -190,8 +162,8 @@
                     void * buf,
                     size_t max,
                     size_t * read,
-                    struct sockaddr * from,
-                    socklen_t * fromlen) {
+                    char * from,
+                    unsigned int * fromlen) {
   int flags;
   size_t pos;
   size_t ret;
@@ -221,7 +193,7 @@
                            &((char*)buf)[pos],
                            max - pos,
                            flags,
-                           from,
+                           (struct sockaddr*) from,
                            fromlen);
     if ( (ret == (size_t) -1) &&
         (errno == EINTR) &&
@@ -321,8 +293,8 @@
                   const void * buf,
                   size_t max,
                   size_t * sent,
-                  const struct sockaddr * dst,
-                  socklen_t dstlen) {
+                  const char * dst,
+                  unsigned int dstlen) {
   int flags;
   size_t pos;
   size_t ret;
@@ -353,7 +325,7 @@
                          &((char*)buf)[pos],
                          max - pos,
                          flags,
-                         dst,
+                         (const struct sockaddr*) dst,
                          dstlen);
     if ( (ret == (size_t) -1) &&
         (errno == EINTR) &&

Added: GNUnet/src/util/network/select.c
===================================================================
--- GNUnet/src/util/network/select.c    2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/select.c    2006-07-28 08:29:51 UTC (rev 3142)
@@ -0,0 +1,740 @@
+/*
+     This file is part of GNUnet.
+     (C) 2003, 2006 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 2, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file util/network/select.c
+ * @brief (network) input/output operations
+ * @author Christian Grothoff
+ *
+ * TODO: memory management (pool allocation!)
+ */
+
+#include "gnunet_util_network.h"
+#include "platform.h"
+#include "network.h"
+
+/**
+ * Select Session handle.
+ */
+typedef struct {
+
+  /**
+   * the socket
+   */
+  struct SocketHandle * sock;
+
+  /**
+   * Client connection context.
+   */
+  void * sock_ctx;
+
+  cron_t lastUse;
+
+  /**
+   * Current read position in the buffer.
+   */
+  unsigned int pos;
+
+  /**
+   * Current size of the read buffer.
+   */
+  unsigned int rsize;
+
+  /**
+   * The read buffer.
+   */
+  char * rbuff;
+
+  /**
+   * Position in the write buffer
+   */
+  unsigned int wpos;
+
+  /**
+   * The write buffer.
+   */
+  char * wbuff;
+
+  /**
+   * Size of the write buffer
+   */
+  unsigned int wsize;
+
+} Session;
+
+typedef struct SelectHandle {
+
+  /**
+   * mutex for synchronized access
+   */
+  struct MUTEX * lock;
+
+  /**
+   * one thread for listening for new connections,
+   * and for reading on all open sockets
+   */
+  struct PTHREAD * thread;
+
+  /**
+   * sock is the tcp socket that we listen on for new inbound
+   * connections.
+   */
+  struct SocketHandle * listen_sock;
+
+  /**
+   * tcp_pipe is used to signal the thread that is
+   * blocked in a select call that the set of sockets to listen
+   * to has changed.
+   */
+  int signal_pipe[2];
+
+  /**
+   * Array of currently active TCP sessions.
+   */
+  Session ** sessions;
+
+  unsigned int sessionCount;
+
+  unsigned int sessionArrayLength;
+
+  int shutdown;
+
+  struct GE_Context * ectx;
+  
+  struct LoadMonitor * load_monitor;
+
+  unsigned int max_addr_len;
+
+  cron_t timeout;
+  
+  SelectMessageHandler mh;
+
+  void * mh_cls;
+  
+  SelectAcceptHandler ah;
+  
+  void * ah_cls;
+  
+  SelectCloseHandler ch;
+
+  void * ch_cls;
+  
+  unsigned int memory_quota;
+
+} SelectHandle;
+
+static void add_to_select_set(struct SocketHandle * s,
+                             fd_set * set,
+                             int * max) {
+  FD_SET(s->handle,
+        set);
+  if (*max < s->handle)
+    *max = s->handle;
+}
+
+/**
+ * Write to the pipe to wake up the select thread (the set of
+ * files to watch has changed).
+ */
+static void signalSelect(SelectHandle * sh) {
+  static char i = '\0';
+  int ret;
+
+  ret = WRITE(sh->signal_pipe[1],
+             &i,
+             sizeof(char));
+  if (ret != sizeof(char))
+    GE_LOG_STRERROR(sh->ectx,
+                   GE_ERROR | GE_ADMIN | GE_BULK,
+                   "write");
+}
+
+/**
+ * Destroy the given session by closing the socket,
+ * releasing the buffers and removing it from the
+ * select set.
+ *
+ * This function may only be called if the tcplock is
+ * already held by the caller.
+ */
+static void destroySession(SelectHandle * sh,
+                          Session * s) { 
+  int i;
+
+  sh->ch(sh->ch_cls,
+        sh,
+        s->sock,
+        s->sock_ctx);
+  socket_destroy(s->sock);
+  GROW(s->rbuff,
+       s->rsize,
+       0);
+  GROW(s->wbuff,
+       s->wsize,
+       0);
+  for (i=0;i<sh->sessionCount;i++) {
+    if (sh->sessions[i] == s) {
+      sh->sessions[i] = sh->sessions[sh->sessionCount-1];
+      sh->sessionCount--;
+      break;
+    }
+  }
+  FREE(s);
+  if (sh->sessionCount * 2 < sh->sessionArrayLength)
+    GROW(sh->sessions,
+        sh->sessionArrayLength,
+        sh->sessionCount);
+}
+
+/**
+ * The socket of a session has data waiting, read and 
+ * process!
+ *
+ * This function may only be called if the lock is
+ * already held by the caller.
+ * @return OK for success, SYSERR if session was destroyed
+ */
+static int readAndProcess(SelectHandle * sh,
+                         Session * session) {
+  const MESSAGE_HEADER * pack;
+  int ret;
+  size_t recvd;
+  unsigned short len;
+  
+  if (session->rsize == session->pos) {
+    /* read buffer too small, grow */
+    GROW(session->rbuff,
+        session->rsize,
+        session->rsize + 1024);
+  }
+  ret = socket_recv(session->sock,
+                   NC_Blocking | NC_IgnoreInt, 
+                   &session->rbuff[session->pos],
+                   session->rsize - session->pos,
+                   &recvd);
+  if (ret != OK) {
+    destroySession(sh, session);
+    return SYSERR; /* other side closed connection */
+  }
+  session->pos += recvd;
+  while (sh->shutdown == NO) {
+    pack = (const MESSAGE_HEADER*) &session->rbuff[0];
+    len = ntohs(pack->size);
+    /* check minimum size */
+    if (len < sizeof(MESSAGE_HEADER)) {
+      GE_LOG(sh->ectx,
+            GE_WARNING | GE_USER | GE_BULK,
+            _("Received malformed message (too small) from connection. 
Closing.\n"));
+      destroySession(sh, session);
+      return SYSERR;
+    }
+    if (len > session->rsize) /* if message larger than read buffer, grow! */
+      GROW(session->rbuff,
+          session->rsize,
+          len);
+
+    /* do we have the entire message? */
+    if (session->pos < len) 
+      break; /* wait for more */    
+
+    sh->mh(sh->mh_cls,
+          sh,
+          session->sock,
+          session->sock_ctx,
+          pack);
+    /* shrink buffer adequately */
+    memmove(&session->rbuff[0],
+           &session->rbuff[len],
+           session->pos - len);
+    session->pos -= len;
+  }
+  session->lastUse = get_time();
+  return OK;
+}
+
+/**
+ * The socket of a session has data waiting that can be 
+ * transmitted, do it!
+ *
+ * This function may only be called if the lock is
+ * already held by the caller.
+ * @return OK for success, SYSERR if session was destroyed
+ */
+static int writeAndProcess(SelectHandle * sh,
+                          Session * session) {
+  SocketHandle * sock;
+  int ret;
+  size_t size;
+
+  sock = session->sock;
+  while (sh->shutdown == NO) {
+    ret = socket_send(sock,
+                     NC_Nonblocking,
+                     session->wbuff,
+                     session->wpos,
+                     &size);
+    if (ret == SYSERR) {
+      GE_LOG_STRERROR(sh->ectx,
+                     GE_WARNING | GE_USER | GE_ADMIN | GE_BULK,
+                     "send");
+      destroySession(sh, session);
+      return SYSERR;
+    } 
+    if (ret == OK) {
+      if (size == 0) {
+       /* send only returns 0 on error (happens if 
+          other side closed connection), so close 
+          the session */
+       destroySession(sh, session);
+       return SYSERR;
+      }
+      if (size == session->wpos) {
+       session->wpos = 0;
+       GROW(session->wbuff,
+            session->wsize,
+            0);
+       break;
+      }
+      memmove(session->wbuff,
+             &session->wbuff[size],
+             session->wpos - size);
+      session->wpos -= size;
+      break;
+    }
+    GE_ASSERT(sh->ectx, ret == NO);
+    /* this should only happen under Win9x because
+       of a bug in the socket implementation (KB177346).
+       Let's sleep and try again. */
+    PTHREAD_SLEEP(20 * cronMILLIS);
+  }
+  session->lastUse = get_time();
+  return OK;
+}
+
+/**
+ * Thread that selects until it is signaled to shut down.
+ */
+static void * selectThread(void * ctx) {
+  struct SelectHandle * sh = ctx;
+  char * clientAddr;
+  fd_set readSet;
+  fd_set errorSet;
+  fd_set writeSet;
+  struct stat buf;
+  socklen_t lenOfIncomingAddr;
+  int i;
+  int max;
+  int ret;
+  int s;
+  void * sctx;
+  SocketHandle * sock;
+  Session * session;   
+
+  clientAddr = MALLOC(sh->max_addr_len);
+  MUTEX_LOCK(sh->lock);
+  while (sh->shutdown == NO) {
+    FD_ZERO(&readSet);
+    FD_ZERO(&errorSet);
+    FD_ZERO(&writeSet);
+    if (sh->signal_pipe[0] != -1) {
+      if (-1 == FSTAT(sh->signal_pipe[0], &buf)) {
+       GE_LOG_STRERROR(sh->ectx,
+                       GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+                       "fstat");
+       sh->signal_pipe[0] = -1; /* prevent us from error'ing all the time */   
+      } else {
+       FD_SET(sh->signal_pipe[0], 
+              &readSet);
+      }
+    }
+    max = sh->signal_pipe[0];
+    if (sh->listen_sock != NULL) {
+      if (! socket_test_valid(sh->listen_sock)) {
+       socket_destroy(sh->listen_sock);
+       GE_LOG(sh->ectx,
+              GE_USER | GE_ERROR | GE_BULK,
+              _("select listen socket not valid!\n"));
+       sh->listen_sock = NULL; /* prevent us from error'ing all the time */
+      } else {
+       add_to_select_set(sh->listen_sock, &readSet, &max);
+      } 
+    }
+    for (i=0;i<sh->sessionCount;i++) {
+      Session * session = sh->sessions[i];
+      struct SocketHandle * sock = session->sock;
+
+      if (! socket_test_valid(sock)) {
+       destroySession(sh, session);
+      } else {
+       add_to_select_set(sock, &readSet, &max);
+       add_to_select_set(sock, &errorSet, &max);
+       if (session->wpos > 0)
+         add_to_select_set(sock, &writeSet, &max); /* do we have a pending 
write request? */
+      }
+    }
+    MUTEX_UNLOCK(sh->lock);
+    ret = SELECT(max+1, 
+                &readSet,
+                &writeSet,
+                &errorSet,
+                NULL);
+    MUTEX_LOCK(sh->lock);
+    if ( (ret == -1) &&
+        ( (errno == EAGAIN) || (errno == EINTR) ) )
+      continue;
+    if (ret == -1) {
+      if (errno == EBADF) {
+       GE_LOG_STRERROR(sh->ectx,
+                       GE_ERROR | GE_ADMIN | GE_USER | GE_BULK, 
+                       "select");
+      } else {
+       GE_DIE_STRERROR(sh->ectx,
+                       GE_FATAL | GE_ADMIN | GE_USER | GE_IMMEDIATE,
+                       "select");
+      }
+    }
+    if ( (sh->listen_sock != NULL) &&
+        (FD_ISSET(sh->listen_sock->handle, &readSet)) ) {
+      lenOfIncomingAddr = sh->max_addr_len;
+      memset(clientAddr,
+            0,
+            lenOfIncomingAddr);
+      s = ACCEPT(sh->listen_sock->handle,
+                (struct sockaddr *) clientAddr,
+                &lenOfIncomingAddr);
+      if (s == -1) {   
+       GE_LOG_STRERROR(sh->ectx,
+                       GE_WARNING | GE_ADMIN | GE_BULK,
+                       "accept");
+      } else {
+       sock = socket_create(sh->ectx,
+                            sh->load_monitor,
+                            s);
+       sctx = sh->ah(sh->ah_cls,
+                     sh,
+                     sock,
+                     clientAddr,
+                     lenOfIncomingAddr);
+       if (sctx == NULL) {
+         socket_destroy(sock);
+       } else {
+         session = MALLOC(sizeof(Session));
+         memset(session, 0, sizeof(Session));
+         session->sock = sock;
+         session->sock_ctx = sctx;
+         session->lastUse = get_time();
+         if (sh->sessionArrayLength == sh->sessionCount)
+           GROW(sh->sessions,
+                sh->sessionArrayLength,
+                sh->sessionArrayLength + 4);
+         sh->sessions[sh->sessionCount++] = session;
+       }
+      } 
+    }
+    if (FD_ISSET(sh->signal_pipe[0], &readSet)) {
+      /* allow reading multiple signals in one go in case we get many
+        in one shot... */
+#define MAXSIG_BUF 128
+      char buf[MAXSIG_BUF];
+      /* just a signal to refresh sets, eat and continue */
+      if (0 >= READ(sh->signal_pipe[0],
+                   &buf[0],
+                   MAXSIG_BUF)) {
+       GE_LOG_STRERROR(sh->ectx,
+                       GE_WARNING | GE_USER | GE_BULK, 
+                       "read");
+      }
+    }
+    for (i=0;i<sh->sessionCount;i++) {
+      session = sh->sessions[i];
+      sock = session->sock;
+      if ( (FD_ISSET(sock->handle, &readSet)) &&
+          (SYSERR == readAndProcess(sh,
+                                    session)) ) {
+       i--;
+       continue;       
+      }
+      if ( (FD_ISSET(sock->handle, &writeSet)) &&
+          (SYSERR == writeAndProcess(sh,
+                                     session)) ) {
+       i--;
+       continue;
+      }     
+      if (FD_ISSET(sock->handle, &errorSet)) {
+       destroySession(sh, 
+                      session);
+       i--;
+       continue;
+      }
+      if ( (sh->timeout != 0) &&
+          (get_time() > session->lastUse + sh->timeout) ) {
+       destroySession(sh, session);
+       i--;
+       continue;
+      }
+    }
+  }
+  MUTEX_UNLOCK(sh->lock);
+  FREE(clientAddr);
+  return NULL;
+}
+
+static int makeNonblocking(struct GE_Context * ectx,
+                           int handle) {
+#if MINGW
+  u_long l = 1;
+  if (ioctlsocket(handle, 
+                 FIONBIO, 
+                 &l) == SOCKET_ERROR) {
+    SetErrnoFromWinsockError(WSAGetLastError());
+    return SYSERR;
+  } else {
+    /* store the blocking mode */
+    __win_SetHandleBlockingMode(handle, 0);
+  }
+#else
+  int flags = fcntl(handle, F_GETFL);
+  flags |= O_NONBLOCK;
+  if (-1 == fcntl(handle,
+                 F_SETFL,
+                 flags)) {
+    GE_LOG_STRERROR(ectx,
+                   GE_WARNING | GE_USER | GE_ADMIN | GE_IMMEDIATE,
+                   "fcntl");
+    return SYSERR;
+  }
+#endif
+  return OK;
+}
+
+/**
+ * Start a select thread that will accept connections
+ * from the given socket and pass messages read to the
+ * given message handler.
+ *
+ * @param sock the listen socket
+ * @param max_addr_len maximum expected length of addresses for
+ *        connections accepted on the given socket
+ * @param mon maybe NULL
+ * @param memory_quota amount of memory available for
+ *        queueing messages (in bytes)
+ * @return NULL on error
+ */
+SelectHandle * select_create(struct GE_Context * ectx,
+                            struct LoadMonitor * mon,
+                            int sock,
+                            unsigned int max_addr_len,
+                            cron_t timeout,
+                            SelectMessageHandler mh,
+                            void * mh_cls,
+                            SelectAcceptHandler ah,
+                            void * ah_cls,
+                            SelectCloseHandler ch,
+                            void * ch_cls,
+                            unsigned int memory_quota) {
+  SelectHandle * sh;
+
+  if (0 != LISTEN(sock, 5)) {
+    GE_LOG_STRERROR(ectx,
+                   GE_ERROR | GE_USER | GE_IMMEDIATE,
+                   "listen");
+    return NULL;
+  }
+  sh = MALLOC(sizeof(SelectHandle));
+  memset(sh, 0, sizeof(SelectHandle));
+  if (0 != PIPE(sh->signal_pipe)) {
+    GE_LOG_STRERROR(ectx,
+                   GE_ERROR | GE_USER | GE_IMMEDIATE,
+                   "pipe");
+    FREE(sh);
+    return NULL;
+  }
+  if (OK != makeNonblocking(sh->ectx,
+                           sh->signal_pipe[0])) {
+    if ( (0 != CLOSE(sh->signal_pipe[0])) ||
+        (0 != CLOSE(sh->signal_pipe[1])) )
+      GE_LOG_STRERROR(ectx,
+                     GE_ERROR | GE_IMMEDIATE | GE_ADMIN,
+                     "close");    
+    FREE(sh);
+    return NULL;    
+  }      
+  sh->shutdown = NO;
+  sh->ectx = ectx;
+  sh->load_monitor = mon;
+  sh->max_addr_len = max_addr_len;
+  sh->mh = mh;
+  sh->mh_cls = mh_cls;
+  sh->ah = ah;
+  sh->ah_cls = ah_cls;
+  sh->ch = ch;
+  sh->ch_cls = ch_cls;
+  sh->memory_quota = memory_quota;
+  sh->timeout = timeout;
+  sh->lock = MUTEX_CREATE(NO);
+  sh->listen_sock = socket_create(ectx,
+                                 mon,
+                                 sock);
+  sh->thread = PTHREAD_CREATE(&selectThread,
+                             sh,
+                             4 * 1024);
+  if (sh->thread == NULL) {
+    GE_LOG_STRERROR(ectx,
+                   GE_ERROR | GE_IMMEDIATE | GE_ADMIN,
+                   "pthread_create");
+    socket_destroy(sh->listen_sock);
+    if ( (0 != CLOSE(sh->signal_pipe[0])) ||
+        (0 != CLOSE(sh->signal_pipe[1])) )
+      GE_LOG_STRERROR(ectx,
+                     GE_ERROR | GE_IMMEDIATE | GE_ADMIN,
+                     "close");
+    MUTEX_DESTROY(sh->lock);
+    FREE(sh);
+    return NULL;
+  }
+  return sh;
+}
+
+/**
+ * Terminate the select thread, close the socket and
+ * all associated connections.
+ */
+void select_destroy(struct SelectHandle * sh) {
+  void * unused;
+
+  sh->shutdown = YES;
+  signalSelect(sh);
+  PTHREAD_STOP_SLEEP(sh->thread);
+  PTHREAD_JOIN(sh->thread, &unused);
+  while (sh->sessionCount > 0)
+    destroySession(sh, sh->sessions[0]);
+  GROW(sh->sessions,
+       sh->sessionArrayLength,
+       0);
+  MUTEX_DESTROY(sh->lock);    
+  if (0 != CLOSE(sh->signal_pipe[1]))
+    GE_LOG_STRERROR(sh->ectx,
+                   GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
+                   "close");
+  if (0 != CLOSE(sh->signal_pipe[0])) 
+    GE_LOG_STRERROR(sh->ectx,
+                   GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
+                   "close");
+  if (sh->listen_sock != NULL)
+    socket_destroy(sh->listen_sock);
+  FREE(sh);
+}
+
+/**
+ * Queue the given message with the select thread.
+ *
+ * @param mayBlock if YES, blocks this thread until message
+ *        has been sent
+ * @param force message is important, queue even if
+ *        there is not enough space
+ * @return OK if the message was sent or queued, 
+ *         NO if there was not enough memory to queue it,
+ *         SYSERR if the sock does not belong with this select
+ */
+int select_write(struct SelectHandle * sh,
+                struct SocketHandle * sock,
+                const MESSAGE_HEADER * msg,
+                int mayBlock,
+                int force) {
+  Session * session;
+  int i;
+  unsigned short len;
+  int fresh_write;
+
+  session = NULL;
+  len = ntohs(msg->size);
+  MUTEX_LOCK(sh->lock);
+  for (i=0;i<sh->sessionCount;i++) 
+    if (sh->sessions[i]->sock == sock) {
+      session = sh->sessions[i];
+      break;
+    }
+  if (session == NULL) {
+    MUTEX_UNLOCK(sh->lock);
+    return SYSERR;    
+  }
+  fresh_write = (session->wsize == 0);
+  GROW(session->wbuff,
+       session->wsize,
+       session->wsize + len);
+  memcpy(&session->wbuff[session->wpos],
+        msg,
+        len);
+  session->wpos += len;
+  MUTEX_UNLOCK(sh->lock);
+  if (fresh_write)
+    signalSelect(sh);
+  return OK;
+}
+
+/**
+ * Add another (already connected) socket to the set of
+ * sockets managed by the select.
+ */
+int select_connect(struct SelectHandle * sh,
+                  struct SocketHandle * sock,
+                  void * sock_ctx) {
+  Session * session;   
+  
+  session = MALLOC(sizeof(Session));
+  memset(session, 0, sizeof(Session));
+  session->sock = sock;
+  session->sock_ctx = sock_ctx;
+  session->lastUse = get_time();
+  MUTEX_LOCK(sh->lock);
+  if (sh->sessionArrayLength == sh->sessionCount)
+    GROW(sh->sessions,
+        sh->sessionArrayLength,
+        sh->sessionArrayLength + 4);
+  sh->sessions[sh->sessionCount++] = session;
+  MUTEX_UNLOCK(sh->lock);
+  signalSelect(sh);
+  return OK;
+}
+
+/**
+ * Close the associated socket and remove it from the
+ * set of sockets managed by select.
+ */
+int select_disconnect(struct SelectHandle * sh,
+                     struct SocketHandle * sock) {
+  Session * session;
+  int i;
+ 
+  MUTEX_LOCK(sh->lock);
+  for (i=0;i<sh->sessionCount;i++) 
+    if (sh->sessions[i]->sock == sock) {
+      session = sh->sessions[i];
+      break;
+    } 
+  if (session == NULL) {
+    MUTEX_UNLOCK(sh->lock);
+    return SYSERR;
+  }
+  destroySession(sh, session);
+  MUTEX_UNLOCK(sh->lock);
+  signalSelect(sh);
+  return OK;
+}


Property changes on: GNUnet/src/util/network/select.c
___________________________________________________________________
Name: svn:eol-style
   + native

Deleted: GNUnet/src/util/network/tcpio.c
===================================================================
--- GNUnet/src/util/network/tcpio.c     2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/tcpio.c     2006-07-28 08:29:51 UTC (rev 3142)
@@ -1,459 +0,0 @@
-/*
-     This file is part of GNUnet.
-     (C) 2001, 2002, 2006 Christian Grothoff (and other contributing authors)
-
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
-     option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file util/network/tcpio.c
- * @brief code for synchronized access to TCP streams
- * @author Christian Grothoff
- *
- * Generic TCP code for reliable, mostly blocking, record-oriented TCP
- * connections. GNUnet uses the "tcpio" code for trusted client-server
- * (e.g. gnunet-gtk to gnunetd via loopback) communications.  Note
- * that an unblocking write is also provided since if both client and
- * server use blocking IO, both may block on a write and cause a
- * mutual inter-process deadlock.
- *
- * Since we do not want other peers (!) to be able to block a peer by
- * not reading from the TCP stream, the peer-to-peer TCP transport
- * uses unreliable, buffered, non-blocking, record-oriented TCP code
- * with a select call to reduce the number of threads which is
- * provided in transports/tcp.c.
- */
-
-#include "gnunet_util_network.h"
-#include "gnunet_util_os.h"
-#include "gnunet_util_config.h"
-#include "gnunet_protocols.h"
-#include "platform.h"
-
-#define DEBUG_TCPIO NO
-
-/**
- * Struct to refer to a GNUnet TCP connection.
- * This is more than just a socket because if the server
- * drops the connection, the client automatically tries
- * to reconnect (and for that needs connection information).
- */
-typedef struct ClientServerConnection {
-
-  /**
-   * the socket handle, NULL if not life
-   */
-  struct SocketHandle * sock;
-
-  struct MUTEX * readlock;
-  
-  struct MUTEX * writelock;
-
-  struct GE_Context * ectx;
-
-  struct GC_Configuration * cfg;
-
-  /**
-   * If this is gnunetd's server socket, then we cannot
-   * automatically reconnect after closing the connection
-   * (since it is an "accept" that gives the socket).<p>
-   *
-   * If this is NO, we should query the configuration and
-   * automagically try to reconnect.
-   */
-  int isServerSocket;
-
-} ClientServerConnection;
-
-
-/**
- * Return the port-number (in host byte order)
- * @return 0 on error
- */
-static unsigned short getGNUnetPort(struct GE_Context * ectx,
-                                   struct GC_Configuration * cfg) {
-  unsigned long long port;
-
-  port = 2087;
-  if (-1 == GC_get_configuration_value_number(cfg,
-                                             "NETWORK",
-                                             "PORT",
-                                             1,
-                                             65535,
-                                             2087,
-                                             &port)) {
-    GE_LOG(ectx,
-          GE_ERROR | GE_USER | GE_BULK,
-          _("Could not find valid value for PORT in section NETWORK."));
-    return 0;
-  }
-  return (unsigned short) port;
-}
-
-/**
- * Configuration: get the GNUnetd host where the client
- * should connect to (via TCP)
- *
- * @return the name of the host, NULL on error
- */
-static char * getGNUnetdHost(struct GE_Context * ectx,
-                            struct GC_Configuration * cfg) {
-  char * res;
-
-  res = NULL;
-  if (-1 == GC_get_configuration_value_string(cfg,
-                                             "NETWORK",
-                                             "HOST",
-                                             "localhost",
-                                             &res)) {
-    GE_LOG(ectx,
-          GE_ERROR | GE_USER | GE_BULK,
-          _("Could not find valid value for HOST in section NETWORK."));
-    return NULL;
-  }
-  return res;
-}
-
-struct ClientServerConnection * 
-client_connection_create(struct GE_Context * ectx,
-                        struct GC_Configuration * cfg,
-                        struct SocketHandle * sock) {
-  ClientServerConnection * result;
-
-  result = MALLOC(sizeof(ClientServerConnection));  
-  result->sock = sock;
-  result->readlock = MUTEX_CREATE(NO);
-  result->writelock = MUTEX_CREATE(NO);
-  result->ectx = ectx;
-  result->cfg = cfg;
-  result->isServerSocket = YES;
-  return result;
-}
-
-
-/**
- * Get a GNUnet TCP socket that is connected to gnunetd.
- */
-struct ClientServerConnection * 
-daemon_connection_create(struct GE_Context * ectx,
-                        struct GC_Configuration * cfg) {
-  ClientServerConnection * result;
-
-  result = MALLOC(sizeof(ClientServerConnection));  
-  result->sock = NULL;
-  result->readlock = MUTEX_CREATE(NO);
-  result->writelock = MUTEX_CREATE(NO);
-  result->ectx = ectx;
-  result->cfg = cfg;
-  result->isServerSocket = NO;
-  return result;
-}
-
-void connection_close_temporarily(struct ClientServerConnection * sock) {
-  if (sock->sock != NULL) {
-    socket_destroy(sock->sock);
-    sock->sock = NULL;
-  }
-}
-
-void connection_destroy(struct ClientServerConnection * sock) {
-  connection_close_temporarily(sock);
-  MUTEX_DESTROY(sock->readlock);
-  MUTEX_DESTROY(sock->writelock);
-  FREE(sock);
-}
-
-int connection_test_open(struct ClientServerConnection * sock) {
-  return (sock->sock != NULL);
-}
-
-/**
- * Check a socket, open and connect if it is closed and it is a client-socket.
- */
-int connection_ensure_connected(struct ClientServerConnection * sock) {
-  struct sockaddr_in soaddr;
-  fd_set rset;
-  fd_set wset;
-  fd_set eset;
-  struct timeval timeout;
-  int ret;
-  int osock;
-  unsigned short port;
-  char * host;
-  IPaddr ip;
-
-  if (sock->sock != NULL)
-    return OK;
-  port = getGNUnetPort(sock->ectx,
-                      sock->cfg);
-  if (port == 0)
-    return SYSERR;
-  host = getGNUnetdHost(sock->ectx, 
-                       sock->cfg);
-  if (host == NULL)
-    return SYSERR;
-  if (SYSERR == get_host_by_name(sock->ectx, 
-                                host,
-                                &ip)) {
-    FREE(host);
-    return SYSERR;
-  }
-  osock = SOCKET(PF_INET, SOCK_STREAM, 6); /* 6: TCP */
-  if (osock == -1) {
-    GE_LOG_STRERROR(sock->ectx,
-                   GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
-                   "socket");
-    FREE(host);
-    return SYSERR;
-  }
-  sock->sock = socket_create(sock->ectx,
-                            NULL,
-                            osock);
-  socket_set_blocking(sock->sock, NO);
-  soaddr.sin_family = AF_INET;
-  GE_ASSERT(sock->ectx,
-           sizeof(struct in_addr) == sizeof(IPaddr));
-  memcpy(&soaddr.sin_addr,
-        &ip,
-        sizeof(struct in_addr));
-  soaddr.sin_port = htons(port);
-  ret = CONNECT(osock,
-               (struct sockaddr*)&soaddr,
-               sizeof(soaddr));
-  if ( (ret < 0) &&
-       (errno != EINPROGRESS) ) {
-    GE_LOG(sock->ectx,
-          GE_WARNING | GE_USER | GE_BULK,
-          _("Cannot connect to %s:u: %s\n"),
-          host,
-          port,
-          STRERROR(errno));
-    socket_destroy(sock->sock);
-    FREE(host);
-    return SYSERR;
-  }
-  /* we call select() first with a timeout of 5s to
-     avoid blocking on a later write indefinitely;
-     Important if a local firewall decides to just drop 
-     the TCP handshake...*/
-  FD_ZERO(&rset);
-  FD_ZERO(&wset);
-  FD_ZERO(&eset);
-  FD_SET(osock, &wset);
-  timeout.tv_sec = 5;
-  timeout.tv_usec = 0;
-  ret = SELECT(osock + 1, 
-              &rset,
-              &wset,
-              &eset, 
-              &timeout);
-  if ( (ret == -1) ||
-       (! FD_ISSET(osock,
-                  &wset)) ) {
-    GE_LOG(sock->ectx,
-          GE_WARNING | GE_USER | GE_BULK,
-          _("Cannot connect to %s:u: %s\n"),
-          host,
-          port,
-          STRERROR(errno));
-    socket_destroy(sock->sock);
-    FREE(host);
-    return SYSERR;
-  }
-  FREE(host);
-  socket_set_blocking(sock->sock, YES);
-  return OK;
-}
-
-/**
- * Write to a GNUnet TCP socket.  Will also potentially complete the
- * sending of a previous non-blocking writeToSocket call.
- *
- * @param sock the socket to write to
- * @param buffer the buffer to write
- * @return OK if the write was sucessful, otherwise SYSERR.
- */
-int connection_write(struct ClientServerConnection * sock,
-                    const MESSAGE_HEADER * buffer) {
-  size_t size;
-  size_t sent;
-  int res;
-
-  if (SYSERR == connection_ensure_connected(sock))
-    return SYSERR;
-  size = ntohs(buffer->size);
-  MUTEX_LOCK(sock->writelock);
-  res = socket_send(sock->sock,
-                   NC_Complete,
-                   buffer,
-                   size,
-                   &sent);
-  if ( (res != YES) ||
-       (sent != size) ) {
-    connection_close_temporarily(sock);
-    MUTEX_UNLOCK(sock->writelock);
-    return SYSERR;
-  }
-  MUTEX_UNLOCK(sock->writelock);
-  return OK;
-}
-
-int connection_read(struct ClientServerConnection * sock,
-                   MESSAGE_HEADER ** buffer) {
-  int res;
-  unsigned int pos;
-  char * buf;
-  unsigned short size;
-
-  if (OK != connection_ensure_connected(sock))
-    return SYSERR;
-  
-  MUTEX_LOCK(sock->readlock);
-  pos = 0;
-  res = 0;
-  if ( (OK != socket_recv(sock->sock,
-                         NC_Complete,
-                         &size,
-                         sizeof(unsigned short),
-                         &pos)) ||
-       (pos != sizeof(unsigned short)) ) {
-    connection_close_temporarily(sock);
-    MUTEX_UNLOCK(sock->readlock);
-    return SYSERR;
-  }
-  size = ntohs(size);
-  if (size < sizeof(MESSAGE_HEADER)) {
-    connection_close_temporarily(sock);
-    MUTEX_UNLOCK(sock->readlock);
-    return SYSERR; /* invalid header */
-  }
-
-  buf = MALLOC(size);
-  if ( (OK != socket_recv(sock->sock,
-                         NC_Complete,
-                         &buf[pos],
-                         size - pos,
-                         &pos)) ||
-       (pos != sizeof(unsigned short) + size) ) {
-    connection_close_temporarily(sock);
-    FREE(buf);
-    MUTEX_UNLOCK(sock->readlock);
-    return SYSERR;
-  }
-#if DEBUG_TCPIO
-  LOG(LOG_DEBUG,
-      "Successfully received %d bytes from TCP socket.\n",
-      size);
-#endif
-  MUTEX_UNLOCK(sock->readlock);
-  *buffer = (MESSAGE_HEADER*) buf;
-  (*buffer)->size = htons(size);
-  return OK; /* success */
-}
-
-
-
-/**
- * CS communication: simple return value
- */
-typedef struct {
-
-  /**
-   * The CS header (values: sizeof(CS_returnvalue_MESSAGE) + error-size, 
CS_PROTO_RETURN_VALUE)
-   */
-  MESSAGE_HEADER header;
-
-  /**
-   * The return value (network byte order)
-   */
-  int return_value;
-
-} RETURN_VALUE_MESSAGE;
-
-/**
- * Obtain a return value from a remote call from TCP.
- *
- * @param sock the TCP socket
- * @param ret the return value from TCP
- * @return SYSERR on error, OK if the return value was read
- * successfully
- */
-int connection_read_result(struct ClientServerConnection * sock,
-                          int * ret) {
-  RETURN_VALUE_MESSAGE * rv;
-
-  rv = NULL;
-  if (SYSERR == connection_read(sock,
-                               (MESSAGE_HEADER **) &rv)) 
-    return SYSERR;
-  if ( (ntohs(rv->header.size) != sizeof(RETURN_VALUE_MESSAGE)) ||
-       (ntohs(rv->header.type) != CS_PROTO_RETURN_VALUE) ) {
-    GE_LOG(sock->ectx,
-          GE_WARNING | GE_DEVELOPER | GE_BULK,
-          _("`%s' failed, reply invalid!\n"),
-          __FUNCTION__);
-    FREE(rv);
-    return SYSERR;
-  }
-  *ret = ntohl(rv->return_value);
-  FREE(rv);
-  return OK;
-}
-
-/**
- * Send a return value to the caller of a remote call via
- * TCP.
- * @param sock the TCP socket
- * @param ret the return value to send via TCP
- * @return SYSERR on error, OK if the return value was
- *         send successfully
- */
-int connection_write_result(struct ClientServerConnection * sock,
-                           int ret) {
-  RETURN_VALUE_MESSAGE rv;
-
-  rv.header.size
-    = htons(sizeof(RETURN_VALUE_MESSAGE));
-  rv.header.type
-    = htons(CS_PROTO_RETURN_VALUE);
-  rv.return_value
-    = htonl(ret);
-  return connection_write(sock,
-                         &rv.header);
-}
-
-/**
- * Send a return value that indicates
- * a serious error to the other side.
- *
- * @param sock the TCP socket
- * @param mask GE_MASK 
- * @param date date string
- * @param msg message string
- * @return SYSERR on error, OK if the error code was send
- *         successfully
- */
-int connection_write_error(struct ClientServerConnection * sock,
-                          GE_KIND mask,
-                          const char * date,
-                          const char * msg) {
-  return SYSERR; /* not implemented! */
-}
-
-
-
-
-/*  end of tcpio.c */

Deleted: GNUnet/src/util/network/tcpiotest.c
===================================================================
--- GNUnet/src/util/network/tcpiotest.c 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/tcpiotest.c 2006-07-28 08:29:51 UTC (rev 3142)
@@ -1,261 +0,0 @@
-/**
- * @file test/tcpiotest.c
- * @brief testcase for util/tcpiotest.c
- */
-
-#include "gnunet_util.h"
-#include "platform.h"
-
-static int openServerSocket() {
-  int listenerFD;
-  int listenerPort;
-  struct sockaddr_in serverAddr;
-  const int on = 1;
-
-  listenerPort = getGNUnetPort();
-  /* create the socket */
-  while ( (listenerFD = SOCKET(PF_INET, SOCK_STREAM, 0)) < 0) {
-    LOG(LOG_ERROR,
-       "ERROR opening socket (%s).  "
-       "No client service started.  "
-       "Trying again in 30 seconds.\n",
-       STRERROR(errno));
-    sleep(30);
-  }
-
-  /* fill in the inet address structure */
-  memset((char *) &serverAddr,
-        0,
-        sizeof(serverAddr));
-  serverAddr.sin_family = AF_INET;
-  serverAddr.sin_addr.s_addr=htonl(INADDR_ANY);
-  serverAddr.sin_port=htons(listenerPort);
-
-  if ( SETSOCKOPT(listenerFD,
-                 SOL_SOCKET,
-                 SO_REUSEADDR,
-                 &on, sizeof(on)) < 0 )
-    perror("setsockopt");
-
-  /* bind the socket */
-  if (BIND (listenerFD,
-          (struct sockaddr *) &serverAddr,
-           sizeof(serverAddr)) < 0) {
-    LOG(LOG_ERROR,
-       "ERROR (%s) binding the TCP listener to port %d. "
-       "Test failed.  Is gnunetd running?\n",
-       STRERROR(errno),
-       listenerPort);
-    return -1;
-  }
-
-  /* start listening for new connections */
-  if (0 != LISTEN(listenerFD, 5)) {
-    LOG(LOG_ERROR,
-       " listen failed: %s\n",
-       STRERROR(errno));
-    return -1;
-  }
-  return listenerFD;
-}
-
-static int doAccept(int serverSocket) {
-  int incomingFD;
-  int lenOfIncomingAddr;
-  struct sockaddr_in clientAddr;
-
-  incomingFD = -1;
-  while (incomingFD < 0) {
-    lenOfIncomingAddr = sizeof(clientAddr);
-    incomingFD = ACCEPT(serverSocket,
-                       (struct sockaddr *)&clientAddr,
-                       &lenOfIncomingAddr);
-    if (incomingFD < 0) {
-      LOG(LOG_ERROR,
-         "ERROR accepting new connection (%s).\n",
-         STRERROR(errno));
-      continue;
-    }
-  }
-  return incomingFD;
-}
-
-/**
- * Perform option parsing from the command line.
- */
-static int parseCommandLine(int argc,
-                           char * argv[]) {
-  char c;
-
-  while (1) {
-    int option_index = 0;
-    static struct GNoption long_options[] = {
-      { "config",  1, 0, 'c' },
-      { 0,0,0,0 }
-    };
-
-    c = GNgetopt_long(argc,
-                     argv,
-                     "c:",
-                     long_options,
-                     &option_index);
-
-    if (c == -1)
-      break;  /* No more flags to process */
-
-    switch(c) {
-    case 'c':
-      FREENONNULL(setConfigurationString("FILES",
-                                        "gnunet.conf",
-                                        GNoptarg));
-      break;
-    } /* end of parsing commandline */
-  }
-  FREENONNULL(setConfigurationString("GNUNETD",
-                                    "LOGFILE",
-                                    NULL));
-  FREENONNULL(setConfigurationString("GNUNETD",
-                                    "LOGLEVEL",
-                                    "DEBUG"));
-  return OK;
-}
-
-static int testTransmission(GNUNET_TCP_SOCKET * a,
-                           GNUNET_TCP_SOCKET * b) {
-  CS_MESSAGE_HEADER * hdr;
-  CS_MESSAGE_HEADER * buf;
-  int i;
-  int j;
-
-  hdr = MALLOC(1024);
-  for (i=0;i<1024-sizeof(CS_MESSAGE_HEADER);i+=7) {
-    fprintf(stderr, ".");
-    for (j=0;j<i;j++)
-      ((char*)&hdr[1])[j] = (char)i+j;
-    hdr->size = htons(i+sizeof(CS_MESSAGE_HEADER));
-    hdr->type = 0;
-    if (OK != writeToSocket(a, hdr)) {
-      FREE(hdr);
-      return 1;
-    }
-    buf = NULL;
-    if (OK != readFromSocket(b, &buf)) {
-      FREE(hdr);
-      return 2;
-    }
-    if (0 != memcmp(buf, hdr, i+sizeof(CS_MESSAGE_HEADER))) {
-      FREE(buf);
-      FREE(hdr);
-      return 4;
-    }
-    FREE(buf);
-  }
-  FREE(hdr);
-  return 0;
-}
-
-static int testNonblocking(GNUNET_TCP_SOCKET * a,
-                          GNUNET_TCP_SOCKET * b) {
-  CS_MESSAGE_HEADER * hdr;
-  CS_MESSAGE_HEADER * buf;
-  int i;
-  int cnt;
-
-  hdr = MALLOC(1024);
-  for (i=0;i<1024-sizeof(CS_MESSAGE_HEADER);i+=11)
-    ((char*)&hdr[1])[i] = (char)i;
-  hdr->size = htons(64+sizeof(CS_MESSAGE_HEADER));
-  hdr->type = 0;
-  while (OK == writeToSocketNonBlocking(a,
-                                       hdr))
-    hdr->type++;
-  i = 0;
-  cnt = hdr->type;
-  /* printf("Reading %u messages.\n", cnt); */
-  if (cnt < 2)
-    return 8; /* could not write ANY data non-blocking!? */
-  for (i=0;i<cnt;i++) {
-    hdr->type = i;
-    buf = NULL;
-    if (OK != readFromSocket(b, &buf)) {
-      FREE(hdr);
-      return 16;
-    }
-    if (0 != memcmp(buf, hdr, 64+sizeof(CS_MESSAGE_HEADER))) {
-      printf("Failure in message %u.  Headers: %d ? %d\n",
-            i,
-            buf->type,
-            hdr->type);
-      FREE(buf);
-      FREE(hdr);
-      return 32;
-    }
-    FREE(buf);
-    if (i == cnt - 2) {
-      /* printf("Blocking write to flush last non-blocking message.\n"); */
-      hdr->type = cnt;
-      if (OK != writeToSocket(a,
-                             hdr)) {
-       FREE(hdr);
-       return 64;
-      }
-    }
-  }
-  hdr->type = i;
-  buf = NULL;
-  if (OK != readFromSocket(b, &buf)) {
-    FREE(hdr);
-    return 128;
-  }
-  if (0 != memcmp(buf, hdr, 64+sizeof(CS_MESSAGE_HEADER))) {
-    FREE(buf);
-    FREE(hdr);
-    return 256;
-  }
-  FREE(buf);
-  FREE(hdr);
-  return 0;
-}
-
-int main(int argc, char * argv[]){
-  int i;
-  int ret;
-  int serverSocket;
-  GNUNET_TCP_SOCKET * clientSocket;
-  GNUNET_TCP_SOCKET acceptSocket;
-
-  ret = 0;
-  initUtil(argc, argv, &parseCommandLine);
-  serverSocket = openServerSocket();
-  clientSocket = getClientSocket();
-  if (serverSocket == -1) {
-    releaseClientSocket(clientSocket);
-    doneUtil();
-    return 1;
-  }
-  for (i=0;i<2;i++) {
-    if (OK == checkSocket(clientSocket)) {
-      if (OK == initGNUnetServerSocket(doAccept(serverSocket),
-                                      &acceptSocket)) {
-       ret = ret | testTransmission(clientSocket, &acceptSocket);
-       ret = ret | testTransmission(&acceptSocket, clientSocket);
-       ret = ret | testNonblocking(clientSocket, &acceptSocket);
-       ret = ret | testNonblocking(&acceptSocket, clientSocket);
-       closeSocketTemporarily(clientSocket);
-       destroySocket(&acceptSocket);
-       fprintf(stderr, "\n");
-      } else {
-       fprintf(stderr, "initGNUnetServerSocket failed.\n");
-       ret = -1;
-      }
-    } else {
-      fprintf(stderr, "checkSocket faild.\n");
-      ret = -1;
-    }
-  }
-  releaseClientSocket(clientSocket);
-  doneUtil();
-  if (ret > 0)
-    fprintf(stderr, "Error %d\n", ret);
-  return ret;
-}

Copied: GNUnet/src/util/network_client/Makefile.am (from rev 3130, 
GNUnet/src/util/network/Makefile.am)
===================================================================
--- GNUnet/src/util/network/Makefile.am 2006-07-24 23:05:12 UTC (rev 3130)
+++ GNUnet/src/util/network_client/Makefile.am  2006-07-28 08:29:51 UTC (rev 
3142)
@@ -0,0 +1,22 @@
+INCLUDES = -I$(top_srcdir)/src/include
+
+SUBDIRS = .
+
+lib_LTLIBRARIES = \
+  libgnunetutil_network_client.la
+
+libgnunetutil_network_client_la_SOURCES = \
+ daemon.c \
+ tcpio.c 
+
+check_PROGRAMS = \
+ tcpiotest 
+
+TESTS = $(check_PROGRAMS)
+
+tcpiotest_SOURCES = \
+ tcpiotest.c
+tcpiotest_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/util/network_client/libgnunetutil_network_client.la 
+

Copied: GNUnet/src/util/network_client/daemon.c (from rev 3130, 
GNUnet/src/util/network/daemon.c)

Copied: GNUnet/src/util/network_client/tcpio.c (from rev 3130, 
GNUnet/src/util/network/tcpio.c)
===================================================================
--- GNUnet/src/util/network/tcpio.c     2006-07-24 23:05:12 UTC (rev 3130)
+++ GNUnet/src/util/network_client/tcpio.c      2006-07-28 08:29:51 UTC (rev 
3142)
@@ -0,0 +1,409 @@
+/*
+     This file is part of GNUnet.
+     (C) 2001, 2002, 2006 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 2, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file util/network/tcpio.c
+ * @brief code for synchronized access to TCP streams
+ * @author Christian Grothoff
+ *
+ * Generic TCP code for reliable, mostly blocking, record-oriented TCP
+ * connections. GNUnet uses the "tcpio" code for trusted client-server
+ * (e.g. gnunet-gtk to gnunetd via loopback) communications.  Note
+ * that an unblocking write is also provided since if both client and
+ * server use blocking IO, both may block on a write and cause a
+ * mutual inter-process deadlock.
+ *
+ * Since we do not want other peers (!) to be able to block a peer by
+ * not reading from the TCP stream, the peer-to-peer TCP transport
+ * uses unreliable, buffered, non-blocking, record-oriented TCP code
+ * with a select call to reduce the number of threads which is
+ * provided in transports/tcp.c.
+ */
+
+#include "gnunet_util_network.h"
+#include "gnunet_util_os.h"
+#include "gnunet_util_config.h"
+#include "gnunet_protocols.h"
+#include "platform.h"
+
+#define DEBUG_TCPIO NO
+
+/**
+ * Struct to refer to a GNUnet TCP connection.
+ * This is more than just a socket because if the server
+ * drops the connection, the client automatically tries
+ * to reconnect (and for that needs connection information).
+ */
+typedef struct ClientServerConnection {
+
+  /**
+   * the socket handle, NULL if not life
+   */
+  struct SocketHandle * sock;
+
+  struct MUTEX * readlock;
+  
+  struct MUTEX * writelock;
+
+  struct GE_Context * ectx;
+
+  struct GC_Configuration * cfg;
+
+} ClientServerConnection;
+
+
+/**
+ * Return the port-number (in host byte order)
+ * @return 0 on error
+ */
+static unsigned short getGNUnetPort(struct GE_Context * ectx,
+                                   struct GC_Configuration * cfg) {
+  unsigned long long port;
+
+  port = 2087;
+  if (-1 == GC_get_configuration_value_number(cfg,
+                                             "NETWORK",
+                                             "PORT",
+                                             1,
+                                             65535,
+                                             2087,
+                                             &port)) {
+    GE_LOG(ectx,
+          GE_ERROR | GE_USER | GE_BULK,
+          _("Could not find valid value for PORT in section NETWORK."));
+    return 0;
+  }
+  return (unsigned short) port;
+}
+
+/**
+ * Configuration: get the GNUnetd host where the client
+ * should connect to (via TCP)
+ *
+ * @return the name of the host, NULL on error
+ */
+static char * getGNUnetdHost(struct GE_Context * ectx,
+                            struct GC_Configuration * cfg) {
+  char * res;
+
+  res = NULL;
+  if (-1 == GC_get_configuration_value_string(cfg,
+                                             "NETWORK",
+                                             "HOST",
+                                             "localhost",
+                                             &res)) {
+    GE_LOG(ectx,
+          GE_ERROR | GE_USER | GE_BULK,
+          _("Could not find valid value for HOST in section NETWORK."));
+    return NULL;
+  }
+  return res;
+}
+
+struct ClientServerConnection * 
+client_connection_create(struct GE_Context * ectx,
+                        struct GC_Configuration * cfg,
+                        struct SocketHandle * sock) {
+  ClientServerConnection * result;
+
+  result = MALLOC(sizeof(ClientServerConnection));  
+  result->sock = sock;
+  result->readlock = MUTEX_CREATE(NO);
+  result->writelock = MUTEX_CREATE(NO);
+  result->ectx = ectx;
+  result->cfg = cfg;
+  result->isServerSocket = YES;
+  return result;
+}
+
+void connection_close_temporarily(struct ClientServerConnection * sock) {
+  if (sock->sock != NULL) {
+    socket_destroy(sock->sock);
+    sock->sock = NULL;
+  }
+}
+
+void connection_destroy(struct ClientServerConnection * sock) {
+  connection_close_temporarily(sock);
+  MUTEX_DESTROY(sock->readlock);
+  MUTEX_DESTROY(sock->writelock);
+  FREE(sock);
+}
+
+int connection_test_open(struct ClientServerConnection * sock) {
+  return (sock->sock != NULL);
+}
+
+/**
+ * Check a socket, open and connect if it is closed and it is a client-socket.
+ */
+int connection_ensure_connected(struct ClientServerConnection * sock) {
+  struct sockaddr_in soaddr;
+  fd_set rset;
+  fd_set wset;
+  fd_set eset;
+  struct timeval timeout;
+  int ret;
+  int osock;
+  unsigned short port;
+  char * host;
+  IPaddr ip;
+
+  if (sock->sock != NULL)
+    return OK;
+  port = getGNUnetPort(sock->ectx,
+                      sock->cfg);
+  if (port == 0)
+    return SYSERR;
+  host = getGNUnetdHost(sock->ectx, 
+                       sock->cfg);
+  if (host == NULL)
+    return SYSERR;
+  if (SYSERR == get_host_by_name(sock->ectx, 
+                                host,
+                                &ip)) {
+    FREE(host);
+    return SYSERR;
+  }
+  osock = SOCKET(PF_INET, SOCK_STREAM, 6); /* 6: TCP */
+  if (osock == -1) {
+    GE_LOG_STRERROR(sock->ectx,
+                   GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
+                   "socket");
+    FREE(host);
+    return SYSERR;
+  }
+  sock->sock = socket_create(sock->ectx,
+                            NULL,
+                            osock);
+  socket_set_blocking(sock->sock, NO);
+  soaddr.sin_family = AF_INET;
+  GE_ASSERT(sock->ectx,
+           sizeof(struct in_addr) == sizeof(IPaddr));
+  memcpy(&soaddr.sin_addr,
+        &ip,
+        sizeof(struct in_addr));
+  soaddr.sin_port = htons(port);
+  ret = CONNECT(osock,
+               (struct sockaddr*)&soaddr,
+               sizeof(soaddr));
+  if ( (ret < 0) &&
+       (errno != EINPROGRESS) ) {
+    GE_LOG(sock->ectx,
+          GE_WARNING | GE_USER | GE_BULK,
+          _("Cannot connect to %s:u: %s\n"),
+          host,
+          port,
+          STRERROR(errno));
+    socket_destroy(sock->sock);
+    FREE(host);
+    return SYSERR;
+  }
+  /* we call select() first with a timeout of 5s to
+     avoid blocking on a later write indefinitely;
+     Important if a local firewall decides to just drop 
+     the TCP handshake...*/
+  FD_ZERO(&rset);
+  FD_ZERO(&wset);
+  FD_ZERO(&eset);
+  FD_SET(osock, &wset);
+  timeout.tv_sec = 5;
+  timeout.tv_usec = 0;
+  ret = SELECT(osock + 1, 
+              &rset,
+              &wset,
+              &eset, 
+              &timeout);
+  if ( (ret == -1) ||
+       (! FD_ISSET(osock,
+                  &wset)) ) {
+    GE_LOG(sock->ectx,
+          GE_WARNING | GE_USER | GE_BULK,
+          _("Cannot connect to %s:u: %s\n"),
+          host,
+          port,
+          STRERROR(errno));
+    socket_destroy(sock->sock);
+    FREE(host);
+    return SYSERR;
+  }
+  FREE(host);
+  socket_set_blocking(sock->sock, YES);
+  return OK;
+}
+
+/**
+ * Write to a GNUnet TCP socket.  Will also potentially complete the
+ * sending of a previous non-blocking writeToSocket call.
+ *
+ * @param sock the socket to write to
+ * @param buffer the buffer to write
+ * @return OK if the write was sucessful, otherwise SYSERR.
+ */
+int connection_write(struct ClientServerConnection * sock,
+                    const MESSAGE_HEADER * buffer) {
+  size_t size;
+  size_t sent;
+  int res;
+
+  if (SYSERR == connection_ensure_connected(sock))
+    return SYSERR;
+  size = ntohs(buffer->size);
+  MUTEX_LOCK(sock->writelock);
+  res = socket_send(sock->sock,
+                   NC_Complete,
+                   buffer,
+                   size,
+                   &sent);
+  if ( (res != YES) ||
+       (sent != size) ) {
+    connection_close_temporarily(sock);
+    MUTEX_UNLOCK(sock->writelock);
+    return SYSERR;
+  }
+  MUTEX_UNLOCK(sock->writelock);
+  return OK;
+}
+
+int connection_read(struct ClientServerConnection * sock,
+                   MESSAGE_HEADER ** buffer) {
+  int res;
+  unsigned int pos;
+  char * buf;
+  unsigned short size;
+
+  if (OK != connection_ensure_connected(sock))
+    return SYSERR;
+  
+  MUTEX_LOCK(sock->readlock);
+  pos = 0;
+  res = 0;
+  if ( (OK != socket_recv(sock->sock,
+                         NC_Complete,
+                         &size,
+                         sizeof(unsigned short),
+                         &pos)) ||
+       (pos != sizeof(unsigned short)) ) {
+    connection_close_temporarily(sock);
+    MUTEX_UNLOCK(sock->readlock);
+    return SYSERR;
+  }
+  size = ntohs(size);
+  if (size < sizeof(MESSAGE_HEADER)) {
+    connection_close_temporarily(sock);
+    MUTEX_UNLOCK(sock->readlock);
+    return SYSERR; /* invalid header */
+  }
+
+  buf = MALLOC(size);
+  if ( (OK != socket_recv(sock->sock,
+                         NC_Complete,
+                         &buf[pos],
+                         size - pos,
+                         &pos)) ||
+       (pos != sizeof(unsigned short) + size) ) {
+    connection_close_temporarily(sock);
+    FREE(buf);
+    MUTEX_UNLOCK(sock->readlock);
+    return SYSERR;
+  }
+#if DEBUG_TCPIO
+  LOG(LOG_DEBUG,
+      "Successfully received %d bytes from TCP socket.\n",
+      size);
+#endif
+  MUTEX_UNLOCK(sock->readlock);
+  *buffer = (MESSAGE_HEADER*) buf;
+  (*buffer)->size = htons(size);
+  return OK; /* success */
+}
+
+/**
+ * Obtain a return value from a remote call from TCP.
+ *
+ * @param sock the TCP socket
+ * @param ret the return value from TCP
+ * @return SYSERR on error, OK if the return value was read
+ * successfully
+ */
+int connection_read_result(struct ClientServerConnection * sock,
+                          int * ret) {
+  RETURN_VALUE_MESSAGE * rv;
+
+  rv = NULL;
+  if (SYSERR == connection_read(sock,
+                               (MESSAGE_HEADER **) &rv)) 
+    return SYSERR;
+  if ( (ntohs(rv->header.size) != sizeof(RETURN_VALUE_MESSAGE)) ||
+       (ntohs(rv->header.type) != CS_PROTO_RETURN_VALUE) ) {
+    GE_LOG(sock->ectx,
+          GE_WARNING | GE_DEVELOPER | GE_BULK,
+          _("`%s' failed, reply invalid!\n"),
+          __FUNCTION__);
+    FREE(rv);
+    return SYSERR;
+  }
+  *ret = ntohl(rv->return_value);
+  FREE(rv);
+  return OK;
+}
+
+/**
+ * Send a return value to the caller of a remote call via
+ * TCP.
+ * @param sock the TCP socket
+ * @param ret the return value to send via TCP
+ * @return SYSERR on error, OK if the return value was
+ *         send successfully
+ */
+int connection_write_result(struct ClientServerConnection * sock,
+                           int ret) {
+  RETURN_VALUE_MESSAGE rv;
+
+  rv.header.size
+    = htons(sizeof(RETURN_VALUE_MESSAGE));
+  rv.header.type
+    = htons(CS_PROTO_RETURN_VALUE);
+  rv.return_value
+    = htonl(ret);
+  return connection_write(sock,
+                         &rv.header);
+}
+
+/**
+ * Send a return value that indicates
+ * a serious error to the other side.
+ *
+ * @param sock the TCP socket
+ * @param mask GE_MASK 
+ * @param date date string
+ * @param msg message string
+ * @return SYSERR on error, OK if the error code was send
+ *         successfully
+ */
+int connection_write_error(struct ClientServerConnection * sock,
+                          GE_KIND mask,
+                          const char * date,
+                          const char * msg) {
+  return SYSERR; /* not implemented! */
+}
+
+
+/*  end of tcpio.c */

Copied: GNUnet/src/util/network_client/tcpiotest.c (from rev 3130, 
GNUnet/src/util/network/tcpiotest.c)





reply via email to

[Prev in Thread] Current Thread [Next in Thread]