gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3149 - GNUnet/src/transports


From: grothoff
Subject: [GNUnet-SVN] r3149 - GNUnet/src/transports
Date: Fri, 28 Jul 2006 17:22:45 -0700 (PDT)

Author: grothoff
Date: 2006-07-28 17:22:42 -0700 (Fri, 28 Jul 2006)
New Revision: 3149

Added:
   GNUnet/src/transports/tcp_helper.c
Modified:
   GNUnet/src/transports/http.c
   GNUnet/src/transports/tcp.c
   GNUnet/src/transports/tcp6.c
   GNUnet/src/transports/udp.c
Log:
tcp cleanup

Modified: GNUnet/src/transports/http.c
===================================================================
--- GNUnet/src/transports/http.c        2006-07-28 22:31:20 UTC (rev 3148)
+++ GNUnet/src/transports/http.c        2006-07-29 00:22:42 UTC (rev 3149)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
+     (C) 2003, 2004, 2005, 2006 Christian Grothoff (and other contributing 
authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published

Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2006-07-28 22:31:20 UTC (rev 3148)
+++ GNUnet/src/transports/tcp.c 2006-07-29 00:22:42 UTC (rev 3149)
@@ -41,6 +41,8 @@
 
 #define TARGET_BUFFER_SIZE 4092
 
+#include "tcp_helper.c"
+
 /**
  * Host-Address in a TCP network.
  */
@@ -62,87 +64,27 @@
 
 } HostAddress;
 
-/**
- * Initial handshake message. Note that the beginning
- * must match the CS_MESSAGE_HEADER since we are using tcpio.
- */
-typedef struct {
-  MESSAGE_HEADER header;
-
-  /**
-   * Identity of the node connecting (TCP client)
-   */
-  PeerIdentity clientIdentity;
-} TCPWelcome;
-
-/**
- * Transport Session handle.
- */
-typedef struct {
-  /**
-   * the tcp socket (used to identify this connection with selector)
-   */
-  struct SocketHandle * sock;
-
-  /**
-   * number of users of this session (reference count)
-   */
-  int users;
-
-  /**
-   * mutex for synchronized access to 'users'
-   */
-  struct MUTEX * lock;
-
-  /**
-   * To whom are we talking to (set to our identity
-   * if we are still waiting for the welcome message)
-   */
-  PeerIdentity sender;
-
-  /**
-   * Are we still expecting the welcome? (YES/NO)
-   */
-  int expectingWelcome;
-
-} TCPSession;
-
 /* *********** globals ************* */
 
-/**
- * apis (our advertised API and the core api )
- */
-static CoreAPIForTransport * coreAPI;
-
 static TransportAPI tcpAPI;
 
-static Stats_ServiceAPI * stats;
-
-static int stat_bytesReceived;
-
-static int stat_bytesSent;
-
-static int stat_bytesDropped;
-
-/* configuration */
 static struct CIDRNetwork * filteredNetworks_;
 
-static struct SelectHandle * selector;
-
-static struct GE_Context * ectx;
-
 static struct GC_Configuration * cfg;
 
-static struct MUTEX * tcplock;
-
-/* ******************** helper functions *********************** */
-
 /**
  * Check if we are allowed to connect to the given IP.
  */
-static int isBlacklisted(IPaddr ip) {
+static int isBlacklisted(const void * addr,
+                        unsigned int addr_len) {
+  IPaddr ip;
   int ret;
-
+  
+  if (addr_len != sizeof(IPaddr))
+    return SYSERR;
+  memcpy(&ip,
+        addr,
+        addr_len);
   MUTEX_LOCK(tcplock);
   ret = check_ipv4_listed(filteredNetworks_,
                          ip);
@@ -175,215 +117,6 @@
 }
 
 /**
- * Disconnect from a remote node. May only be called
- * on sessions that were aquired by the caller first.
- * For the core, aquiration means to call associate or
- * connect. The number of disconnects must match the
- * number of calls to connect+associate.
- *
- * @param tsession the session that is closed
- * @return OK on success, SYSERR if the operation failed
- */
-static int tcpDisconnect(TSession * tsession) {
-  TCPSession * tcpsession = tsession->internal;
-
-  GE_ASSERT(ectx, tcpsession != NULL);
-  MUTEX_LOCK(tcpsession->lock);
-  tcpsession->users--;
-  if (tcpsession->users > 0) {
-    MUTEX_UNLOCK(tcpsession->lock);
-    return OK;
-  }  
-  select_disconnect(selector,
-                   tcpsession->sock);
-  MUTEX_UNLOCK(tcpsession->lock);
-  MUTEX_DESTROY(tcpsession->lock);
-  FREE(tcpsession);  
-  FREE(tsession);
-  return OK;
-}
-
-/**
- * A (core) Session is to be associated with a transport session. The
- * transport service may want to know in order to call back on the
- * core if the connection is being closed. Associate can also be
- * called to test if it would be possible to associate the session
- * later, in this case the argument session is NULL. This can be used
- * to test if the connection must be closed by the core or if the core
- * can assume that it is going to be self-managed (if associate
- * returns OK and session was NULL, the transport layer is responsible
- * for eventually freeing resources associated with the tesession). If
- * session is not NULL, the core takes responsbility for eventually
- * calling disconnect.
- *
- * @param tsession the session handle passed along
- *   from the call to receive that was made by the transport
- *   layer
- * @return OK if the session could be associated,
- *         SYSERR if not.
- */
-static int tcpAssociate(TSession * tsession) {
-  TCPSession * tcpSession;
-
-  GE_ASSERT(ectx, tsession != NULL);
-  tcpSession = tsession->internal;
-  MUTEX_LOCK(tcpSession->lock);
-  tcpSession->users++;
-  MUTEX_UNLOCK(tcpSession->lock);
-  return OK;
-}
-
-/**
- * The socket of session i has data waiting, process!
- *
- * This function may only be called if the tcplock is
- * already held by the caller.
- */
-static int select_message_handler(void * mh_cls,
-                                 struct SelectHandle * sh,
-                                 struct SocketHandle * sock,
-                                 void * sock_ctx,
-                                 const MESSAGE_HEADER * msg) {
-  TSession * tsession = sock_ctx;
-  TCPSession * tcpSession;
-  unsigned int len;
-  P2P_PACKET * mp;
-  const TCPWelcome * welcome;
-
-  if (SYSERR == tcpAssociate(tsession))
-    return SYSERR;
-  len = ntohs(msg->size);
-  tcpSession = tsession->internal;
-  if (YES == tcpSession->expectingWelcome) {    
-    welcome = (const TCPWelcome*) msg;
-    if ( (ntohs(welcome->header.type) != 0) ||
-        (len != sizeof(TCPWelcome)) ) {
-      tcpDisconnect(tsession);
-      return SYSERR;    
-    }
-    tcpSession->expectingWelcome = NO;
-    tcpSession->sender = welcome->clientIdentity;
-  } else {
-    /* send msg to core! */
-    if (len <= sizeof(MESSAGE_HEADER)) {
-      GE_LOG(ectx,
-            GE_WARNING | GE_USER | GE_BULK,
-            _("Received malformed message from tcp-peer connection. 
Closing.\n"));
-      tcpDisconnect(tsession);
-      return SYSERR;
-    }
-    mp      = MALLOC(sizeof(P2P_PACKET));
-    mp->msg = MALLOC(len - sizeof(MESSAGE_HEADER));
-    memcpy(mp->msg,
-          &msg[1],
-          len - sizeof(MESSAGE_HEADER));
-    mp->sender   = tcpSession->sender;
-    mp->size     = len - sizeof(MESSAGE_HEADER);
-    mp->tsession = tsession;
-    coreAPI->receive(mp);
-  }
-  tcpDisconnect(tsession);
-  return OK;
-}
-
-
-/**
- * Create a new session for an inbound connection on the given
- * socket. Adds the session to the array of sessions watched
- * by the select thread.
- */
-static void * select_accept_handler(void * ah_cls,
-                                   struct SelectHandle * sh,
-                                   struct SocketHandle * sock,
-                                   const void * addr,
-                                   unsigned int addr_len) {
-  TSession * tsession;
-  TCPSession * tcpSession;
-  IPaddr ip;
-
-  if (addr_len != sizeof(IPaddr))
-    return NULL;
-  memcpy(&ip,
-        addr,
-        addr_len);
-  if (isBlacklisted(ip))
-    return NULL;
-  tcpSession = MALLOC(sizeof(TCPSession));
-  tcpSession->sock = sock;
-  /* fill in placeholder identity to mark that we
-     are waiting for the welcome message */
-  tcpSession->sender = *(coreAPI->myIdentity);
-  tcpSession->expectingWelcome = YES;
-  tcpSession->lock = MUTEX_CREATE(YES);
-  tcpSession->users = 1; /* us only, core has not seen this tsession! */
-  tsession = MALLOC(sizeof(TSession));
-  tsession->ttype = TCP_PROTOCOL_NUMBER;
-  tsession->internal = tcpSession;
-
-  return tsession;
-}                                      
-
-static void select_close_handler(void * ch_cls,
-                                struct SelectHandle * sh,
-                                struct SocketHandle * sock,
-                                void * sock_ctx) {
-  TSession * tsession = sock_ctx;
-  tcpDisconnect(tsession);
-}
-
-/**
- * Send a message to the specified remote node.
- *
- * @param tsession the handle identifying the remote node
- * @param msg the message
- * @param size the size of the message
- * @return SYSERR on error, OK on success
- */
-static int tcpSend(TSession * tsession,
-                  const void * msg,
-                  const unsigned int size,
-                  int important) {
-  TCPSession * tcpSession;
-  MESSAGE_HEADER * mp;
-  int ok;
-
-  tcpSession = tsession->internal;
-  if (size >= MAX_BUFFER_SIZE - sizeof(MESSAGE_HEADER)) {
-    GE_BREAK(ectx, 0);
-    return SYSERR; /* too big */
-  }
-  if (selector == NULL) {
-    if (stats != NULL)
-      stats->change(stat_bytesDropped,
-                   size);
-    return SYSERR;
-  }
-  if (size == 0) {
-    GE_BREAK(ectx, 0);
-    return SYSERR;
-  }
-  if (tcpSession->sock == NULL) {
-    if (stats != NULL)
-      stats->change(stat_bytesDropped,
-                   size);
-    return SYSERR; /* other side closed connection */
-  }
-  mp = MALLOC(sizeof(MESSAGE_HEADER) + size);
-  mp->size = htons(size + sizeof(MESSAGE_HEADER));
-  mp->type = 0;
-  memcpy(&mp[1],
-        msg,
-        size);
-  ok = select_write(selector,
-                   tcpSession->sock,
-                   mp,
-                   NO,
-                   important);
-  FREE(mp);
-  return ok;
-}
-
-/**
  * Verify that a Hello-Message is correct (a node
  * is reachable at that address). Since the reply
  * will be asynchronous, a method must be called on
@@ -400,7 +133,8 @@
        (ntohs(helo->header.size) != P2P_hello_MESSAGE_size(helo)) ||
        (ntohs(helo->header.type) != p2p_PROTO_hello) ||
        (ntohs(helo->protocol) != TCP_PROTOCOL_NUMBER) ||
-       (YES == isBlacklisted(haddr->ip)) )
+       (YES == isBlacklisted(&haddr->ip,
+                            sizeof(IPaddr))) )
     return SYSERR; /* obviously invalid */
   else
     return OK;
@@ -463,10 +197,7 @@
 static int tcpConnect(const P2P_hello_MESSAGE * helo,
                      TSession ** tsessionPtr) {
   HostAddress * haddr;
-  TCPWelcome welcome;
   int sock;
-  TSession * tsession;
-  TCPSession * tcpSession;
   struct sockaddr_in soaddr;
   struct SocketHandle * s;
   int i;
@@ -521,39 +252,10 @@
     socket_destroy(s);
     return SYSERR;
   }
-  tcpSession = MALLOC(sizeof(TCPSession));
-  tcpSession->sock = s;
-  tsession = MALLOC(sizeof(TSession));
-  tsession->internal = tcpSession;
-  tsession->ttype = tcpAPI.protocolNumber;
-  tcpSession->lock = MUTEX_CREATE(YES);
-  tcpSession->users = 2; /* caller + us */
-  tcpSession->sender = helo->senderIdentity;
-  tcpSession->expectingWelcome = NO;
-  MUTEX_LOCK(tcplock);
-  select_connect(selector,
-                tcpSession->sock,
-                tsession);
-
-  /* send our node identity to the other side to fully establish the
-     connection! */
-  welcome.header.size
-    = htons(sizeof(TCPWelcome));
-  welcome.header.type
-    = htons(0);
-  welcome.clientIdentity
-    = *(coreAPI->myIdentity);
-  if (SYSERR == tcpSend(tsession,
-                       &welcome.header,
-                       sizeof(TCPWelcome),
-                       YES)) {
-    tcpDisconnect(tsession);
-    MUTEX_UNLOCK(tcplock);
-    return SYSERR;
-  }
-  MUTEX_UNLOCK(tcplock);
-  *tsessionPtr = tsession;
-  return OK;
+  return tcpConnectHelper(helo,
+                         s,
+                         tcpAPI.protocolNumber,
+                         tsessionPtr);
 }
 
 /**
@@ -618,11 +320,11 @@
                           coreAPI->load_monitor,
                           s,
                           sizeof(IPaddr),
-                          0, /* timeout */
+                          TCP_TIMEOUT,
                           &select_message_handler,
                           NULL,
                           &select_accept_handler,
-                          NULL,
+                          &isBlacklisted,
                           &select_close_handler,
                           NULL,
                           0 /* memory quota */ );
@@ -630,18 +332,6 @@
 }
 
 /**
- * Shutdown the server process (stop receiving inbound
- * traffic). Maybe restarted later!
- */
-static int stopTransportServer() {
-  if (selector != NULL) {
-    select_destroy(selector);
-    selector = NULL;
-  }
-  return OK;
-}
-
-/**
  * Reload the configuration. Should never fail (keep old
  * configuration on error, syslog errors!)
  */

Modified: GNUnet/src/transports/tcp6.c
===================================================================
--- GNUnet/src/transports/tcp6.c        2006-07-28 22:31:20 UTC (rev 3148)
+++ GNUnet/src/transports/tcp6.c        2006-07-29 00:22:42 UTC (rev 3149)
@@ -27,6 +27,7 @@
 #include "gnunet_util.h"
 #include "gnunet_protocols.h"
 #include "gnunet_transport.h"
+#include "gnunet_stats_service.h"
 #include "platform.h"
 #include "ip6.h"
 
@@ -40,6 +41,8 @@
 
 #define TARGET_BUFFER_SIZE 4092
 
+#include "tcp_helper.c"
+
 /**
  * @brief Host-Address in a TCP6 network.
  */
@@ -61,854 +64,58 @@
 
 } Host6Address;
 
-/**
- * @brief TCP6 Message-Packet header.
- */
-typedef struct {
-  /**
-   * size of the message, in bytes, including this header;
-   * max 65536-header (network byte order)
-   */
-  unsigned short size;
-
-  /**
-   * For alignment, always 0.
-   */
-  unsigned short reserved;
-
-} TCP6P2P_PACKET;
-
-/**
- * Initial handshake message. Note that the beginning
- * must match the CS_MESSAGE_HEADER since we are using tcp6io.
- */
-typedef struct {
-  TCP6P2P_PACKET header;
-
-  /**
-   * Identity of the node connecting (TCP6 client)
-   */
-  PeerIdentity clientIdentity;
-} TCP6Welcome;
-
-/**
- * @brief TCP6 Transport Session handle.
- */
-typedef struct {
-  /**
-   * the tcp6 socket
-   */
-  int sock;
-
-  /**
-   * number of users of this session
-   */
-  int users;
-
-  /**
-   * Last time this connection was used
-   */
-  cron_t lastUse;
-
-  /**
-   * mutex for synchronized access to 'users'
-   */
-  Mutex lock;
-
-  /**
-   * To whom are we talking to (set to our identity
-   * if we are still waiting for the welcome message)
-   */
-  PeerIdentity sender;
-
-  /**
-   * Are we still expecting the welcome? (YES/NO)
-   */
-  int expectingWelcome;
-
-  /**
-   * Current read position in the buffer.
-   */
-  unsigned int pos;
-
-  /**
-   * Current size of the buffer.
-   */
-  unsigned int rsize;
-
-  /**
-   * The read buffer.
-   */
-  char * rbuff;
-
-  /**
-   * Position in the write buffer
-   */
-  unsigned int wpos;
-
-  /**
-   * The write buffer.
-   */
-  char * wbuff;
-
-  /**
-   * Size of the write buffer
-   */
-  unsigned int wsize;
-
-} TCP6Session;
-
 /* *********** globals ************* */
 
-/**
- * apis (our advertised API and the core api )
- */
-static CoreAPIForTransport * coreAPI;
 static TransportAPI tcp6API;
 
-/**
- * one thread for listening for new connections,
- * and for reading on all open sockets
- */
-static PTHREAD_T listenThread;
-
-/**
- * sock is the tcp6 socket that we listen on for new inbound
- * connections.
- */
-static int tcp6_sock;
-
-/**
- * tcp6_pipe is used to signal the thread that is
- * blocked in a select call that the set of sockets to listen
- * to has changed.
- */
-static int tcp6_pipe[2];
-
-/**
- * Array of currently active TCP6 sessions.
- */
-static TSession ** tsessions = NULL;
-static unsigned int tsessionCount;
-static unsigned int tsessionArrayLength;
-
-/* configuration */
 static struct CIDR6Network * filteredNetworks_;
 
-/**
- * Lock for access to mutable state of the module,
- * that is the configuration and the tsessions array.
- * Note that we ONLY need to synchronize access to
- * the tsessions array when adding or removing sessions,
- * since removing is done only by one thread and we just
- * need to avoid another thread adding an element at the
- * same point in time. We do not need to synchronize at
- * every access point since adding new elements does not
- * prevent the select thread from operating and removing
- * is done by the only therad that reads from the array.
- */
-static Mutex tcp6lock;
+static struct GC_Configuration * cfg;
 
-/**
- * Semaphore used by the server-thread to signal that
- * the server has been started -- and later again to
- * signal that the server has been stopped.
- */
-static Semaphore * serverSignal = NULL;
-static int tcp6_shutdown = YES;
-
 /* ******************** helper functions *********************** */
 
 /**
  * Check if we are allowed to connect to the given IP.
  */
-static int isBlacklisted(IP6addr * ip) {
+static int isBlacklisted(const void * addr,
+                        unsigned int addr_len) {
+  const IP6addr * ip = addr;
   int ret;
 
-  MUTEX_LOCK(&tcp6lock);
-  ret = checkIP6Listed(filteredNetworks_,
-                      ip);
-  MUTEX_UNLOCK(&tcp6lock);
+  if (addr_len != sizeof(IP6addr))
+    return SYSERR;
+  MUTEX_LOCK(tcplock);
+  ret = check_ipv6_listed(filteredNetworks_,
+                         *ip);
+  MUTEX_UNLOCK(tcplock);
   return ret;
 }
 
 /**
- * Write to the pipe to wake up the select thread (the set of
- * files to watch has changed).
- */
-static void signalSelect() {
-  char i = 0;
-  int ret;
-
-  ret = WRITE(tcp6_pipe[1],
-             &i,
-             sizeof(char));
-  if (ret != sizeof(char))
-    LOG_STRERROR(LOG_ERROR, "write");
-}
-
-/**
- * Disconnect from a remote node. May only be called
- * on sessions that were aquired by the caller first.
- * For the core, aquiration means to call associate or
- * connect. The number of disconnects must match the
- * number of calls to connect+associate.
- *
- * @param tsession the session that is closed
- * @return OK on success, SYSERR if the operation failed
- */
-static int tcp6Disconnect(TSession * tsession) {
-  if (tsession->internal != NULL) {
-    TCP6Session * tcp6session = tsession->internal;
-
-    MUTEX_LOCK(&tcp6session->lock);
-    tcp6session->users--;
-    if (tcp6session->users > 0) {
-      MUTEX_UNLOCK(&tcp6session->lock);
-      return OK;
-    }
-    MUTEX_UNLOCK(&tcp6session->lock);
-    MUTEX_DESTROY(&tcp6session->lock);
-    FREE(tcp6session->rbuff);
-    FREENONNULL(tcp6session->wbuff);
-    FREE(tcp6session);
-    FREE(tsession);
-  }
-  return OK;
-}
-
-/**
- * Remove a session, either the other side closed the connection
- * or we have otherwise reason to believe that it should better
- * be killed. Destroy session closes the session as far as the
- * TCP6 layer is concerned, but since the core may still have
- * references to it, tcp6Disconnect may not instantly free all
- * the associated resources. <p>
- *
- * destroySession may only be called if the tcp6lock is already
- * held.
- *
- * @param i index to the session handle
- */
-static void destroySession(int i) {
-  TCP6Session * tcp6Session;
-
-  tcp6Session = tsessions[i]->internal;
-  if (-1 != tcp6Session->sock)
-    if (0 != SHUTDOWN(tcp6Session->sock, SHUT_RDWR))
-      LOG_STRERROR(LOG_EVERYTHING, "shutdown");
-  closefile(tcp6Session->sock);
-  tcp6Session->sock = -1;
-  tcp6Disconnect(tsessions[i]);
-  tsessions[i] = tsessions[--tsessionCount];
-  tsessions[tsessionCount] = NULL;
-}
-
-/**
  * Get the GNUnet UDP port from the configuration,
  * or from /etc/services if it is not specified in
  * the config file.
  */
 static unsigned short getGNUnetTCP6Port() {
   struct servent * pse;        /* pointer to service information entry */
-  unsigned short port;
+  unsigned long long port;
 
-  port = (unsigned short) getConfigurationInt("TCP6",
-                                             "PORT");
-  if (port == 0) { /* try lookup in services */
+  if (-1 == GC_get_configuration_value_number(cfg,
+                                             "TCP6",
+                                             "PORT",
+                                             1,
+                                             65535,
+                                             2086,
+                                             &port)) {
     if ((pse = getservbyname("gnunet", "tcp6")))
       port = htons(pse->s_port);
+    else
+      port = 0;
   }
-  return port;
+  return (unsigned short) port;
 }
 
 /**
- * A (core) Session is to be associated with a transport session. The
- * transport service may want to know in order to call back on the
- * core if the connection is being closed. Associate can also be
- * called to test if it would be possible to associate the session
- * later, in this case the argument session is NULL. This can be used
- * to test if the connection must be closed by the core or if the core
- * can assume that it is going to be self-managed (if associate
- * returns OK and session was NULL, the transport layer is responsible
- * for eventually freeing resources associated with the tesession). If
- * session is not NULL, the core takes responsbility for eventually
- * calling disconnect.
- *
- * @param tsession the session handle passed along
- *   from the call to receive that was made by the transport
- *   layer
- * @return OK if the session could be associated,
- *         SYSERR if not.
- */
-static int tcp6Associate(TSession * tsession) {
-  TCP6Session * tcp6Session;
-
-  GNUNET_ASSERT(tsession != NULL);
-  tcp6Session = (TCP6Session*) tsession->internal;
-  MUTEX_LOCK(&tcp6Session->lock);
-  tcp6Session->users++;
-  MUTEX_UNLOCK(&tcp6Session->lock);
-  return OK;
-}
-
-/**
- * The socket of session i has data waiting, process!
- *
- * This function may only be called if the tcp6lock is
- * already held by the caller.
- */
-static int readAndProcess(int i) {
-  TSession * tsession;
-  TCP6Session * tcp6Session;
-  unsigned int len;
-  int ret;
-  TCP6P2P_PACKET * pack;
-  P2P_PACKET * mp;
-
-  tsession = tsessions[i];
-  if (SYSERR == tcp6Associate(tsession))
-    return SYSERR;
-  tcp6Session = tsession->internal;
-  if (tcp6Session->rsize == tcp6Session->pos) {
-    /* read buffer too small, grow */
-    GROW(tcp6Session->rbuff,
-        tcp6Session->rsize,
-        tcp6Session->rsize * 2);
-  }
-  ret = READ(tcp6Session->sock,
-            &tcp6Session->rbuff[tcp6Session->pos],
-            tcp6Session->rsize - tcp6Session->pos);
-  cronTime(&tcp6Session->lastUse);
-  if (ret == 0) {
-    tcp6Disconnect(tsession);
-#if DEBUG_TCP6
-    LOG(LOG_DEBUG,
-       "READ on socket %d returned 0 bytes, closing connection\n",
-       tcpSession->sock);
-#endif
-    return SYSERR; /* other side closed connection */
-  }
-  if (ret < 0) {
-    if ( (errno == EINTR) ||
-        (errno == EAGAIN) ) {
-#if DEBUG_TCP
-      LOG_STRERROR(LOG_DEBUG, "read");
-#endif
-      tcp6Disconnect(tsession);
-      return OK;
-    }
-#if DEBUG_TCP
-    LOG_STRERROR(LOG_INFO, "read");
-#endif
-    tcp6Disconnect(tsession);
-    return SYSERR;
-  }
-  incrementBytesReceived(ret);
-  tcp6Session->pos += ret;
-
-  while (tcp6Session->pos > 2) {
-    len = ntohs(((TCP6P2P_PACKET*)&tcp6Session->rbuff[0])->size) + 
sizeof(TCP6P2P_PACKET);
-    if (len > tcp6Session->rsize) /* if MTU larger than expected, grow! */
-      GROW(tcp6Session->rbuff,
-          tcp6Session->rsize,
-          len);
-#if DEBUG_TCP6
-    LOG(LOG_DEBUG,
-       "Read %d bytes on socket %d, expecting %d for full message\n",
-       tcp6Session->pos,
-       tcp6Session->sock,
-       len);
-#endif
-    if (tcp6Session->pos < len) {
-      tcp6Disconnect(tsession);
-      return OK;
-    }
-
-    /* complete message received, let's check what it is */
-    if (YES == tcp6Session->expectingWelcome) {
-      TCP6Welcome * welcome;
-#if DEBUG_TCP6
-      EncName hex;
-#endif
-
-      welcome = (TCP6Welcome*) &tcp6Session->rbuff[0];
-      if ( (ntohs(welcome->header.reserved) != 0) ||
-          (ntohs(welcome->header.size) != sizeof(TCP6Welcome) - 
sizeof(TCP6P2P_PACKET)) ) {
-       LOG(LOG_WARNING,
-           _("Expected welcome message on tcp connection, got garbage. 
Closing.\n"));
-       tcp6Disconnect(tsession);
-       return SYSERR;
-      }
-      tcp6Session->expectingWelcome = NO;
-      tcp6Session->sender = welcome->clientIdentity;
-#if DEBUG_TCP6
-      IFLOG(LOG_DEBUG,
-           hash2enc(&tcp6Session->sender.hashPubKey,
-                    &enc));
-      LOG(LOG_DEBUG,
-         "tcp6 welcome message from %s received\n",
-         &enc);
-#endif
-      memmove(&tcp6Session->rbuff[0],
-             &tcp6Session->rbuff[sizeof(TCP6Welcome)],
-             tcp6Session->pos - sizeof(TCP6Welcome));
-      tcp6Session->pos -= sizeof(TCP6Welcome);
-      len = ntohs(((TCP6P2P_PACKET*)&tcp6Session->rbuff[0])->size) + 
sizeof(TCP6P2P_PACKET);
-    }
-    if ( (tcp6Session->pos < 2) ||
-        (tcp6Session->pos < len) ) {
-      tcp6Disconnect(tsession);
-      return OK;
-    }
-
-    pack = (TCP6P2P_PACKET*)&tcp6Session->rbuff[0];
-    /* send msg to core! */
-    if (len <= sizeof(TCP6P2P_PACKET)) {
-      LOG(LOG_WARNING,
-         _("Received malformed message from tcp6-peer connection. Closing 
connection.\n"));
-      tcp6Disconnect(tsession);
-      return SYSERR;
-    }
-    mp      = MALLOC(sizeof(P2P_PACKET));
-    mp->msg = MALLOC(len - sizeof(TCP6P2P_PACKET));
-    memcpy(mp->msg,
-          &pack[1],
-          len - sizeof(TCP6P2P_PACKET));
-    mp->sender   = tcp6Session->sender;
-    mp->size     = len - sizeof(TCP6P2P_PACKET);
-    mp->tsession = tsession;
-#if DEBUG_TCP6
-    LOG(LOG_DEBUG,
-       "tcp6 transport received %d bytes, forwarding to core\n",
-       mp->size);
-#endif
-    coreAPI->receive(mp);
-
-    if (tcp6Session->pos < len) {
-      BREAK();
-      tcp6Disconnect(tsession);
-      return SYSERR;
-    }
-    /* finally, shrink buffer adequately */
-    memmove(&tcp6Session->rbuff[0],
-           &tcp6Session->rbuff[len],
-           tcp6Session->pos - len);
-    tcp6Session->pos -= len;   
-    if ( (tcp6Session->pos + 1024 < tcp6Session->rsize) &&
-        (tcp6Session->rsize > 4 * 1024) ) {
-      /* read buffer far too large, shrink! */
-      GROW(tcp6Session->rbuff,
-          tcp6Session->rsize,
-          tcp6Session->pos + 1024);
-    }
-  }
-  tcp6Disconnect(tsession);
-  return OK;
-}
-
-/**
- * Add a new session to the array watched by the select thread.  Grows
- * the array if needed.  If the caller wants to do anything useful
- * with the return value, it must have the lock on tcp6lock before
- * calling.  It is ok to call this function without holding tcp6lock if
- * the return value is ignored.
- */
-static unsigned int addTSession(TSession * tsession) {
-  unsigned int i;
-
-  MUTEX_LOCK(&tcp6lock);
-  if (tsessionCount == tsessionArrayLength)
-    GROW(tsessions,
-        tsessionArrayLength,
-        tsessionArrayLength * 2);
-  i = tsessionCount;
-  tsessions[tsessionCount++] = tsession;
-  MUTEX_UNLOCK(&tcp6lock);
-  return i;
-}
-
-/**
- * Create a new session for an inbound connection on the given
- * socket. Adds the session to the array of sessions watched
- * by the select thread.
- */
-static void createNewSession(int sock) {
-  TSession * tsession;
-  TCP6Session * tcp6Session;
-
-  tcp6Session = MALLOC(sizeof(TCP6Session));
-  tcp6Session->pos = 0;
-  tcp6Session->rsize = 2 * 1024 + sizeof(TCP6P2P_PACKET);
-  tcp6Session->rbuff = MALLOC(tcp6Session->rsize);
-  tcp6Session->wpos = 0;
-  tcp6Session->wbuff = NULL;
-  tcp6Session->sock = sock;
-  /* fill in placeholder identity to mark that we
-     are waiting for the welcome message */
-  tcp6Session->sender = *(coreAPI->myIdentity);
-  tcp6Session->expectingWelcome = YES;
-  MUTEX_CREATE_RECURSIVE(&tcp6Session->lock);
-  tcp6Session->users = 1; /* us only, core has not seen this tsession! */
-  cronTime(&tcp6Session->lastUse);
-  tsession = MALLOC(sizeof(TSession));
-  tsession->ttype = TCP6_PROTOCOL_NUMBER;
-  tsession->internal = tcp6Session;
-  addTSession(tsession);
-}                                      
-
-/**
- * Main method for the thread listening on the tcp6 socket and all tcp6
- * connections. Whenever a message is received, it is forwarded to the
- * core. This thread waits for activity on any of the TCP6 connections
- * and processes deferred (async) writes and buffers reads until an
- * entire message has been received.
- */
-static void * tcp6ListenMain() {
-  struct sockaddr_in6 clientAddr;
-  fd_set readSet;
-  fd_set errorSet;
-  fd_set writeSet;
-  struct stat buf;
-  socklen_t lenOfIncomingAddr;
-  int i;
-  int max;
-  int ret;
-
-  if (tcp6_sock != -1)
-    if (0 != LISTEN(tcp6_sock, 5))
-      LOG_STRERROR(LOG_ERROR, "listen");
-  SEMAPHORE_UP(serverSignal); /* we are there! */
-  MUTEX_LOCK(&tcp6lock);
-  while (tcp6_shutdown == NO) {
-    FD_ZERO(&readSet);
-    FD_ZERO(&errorSet);
-    FD_ZERO(&writeSet);
-    if (tcp6_sock != -1) {
-      if (isSocketValid(tcp6_sock)) {
-       FD_SET(tcp6_sock, &readSet);
-      } else {
-       LOG_STRERROR(LOG_ERROR, "isSocketValid");
-       tcp6_sock = -1; /* prevent us from error'ing all the time */
-      }
-    } else
-      LOG(LOG_DEBUG,
-         "TCP6 server socket not open!\n");
-    if (tcp6_pipe[0] != -1) {
-      if (-1 != FSTAT(tcp6_pipe[0], &buf)) {
-       FD_SET(tcp6_pipe[0], &readSet);
-      } else {
-       LOG_STRERROR(LOG_ERROR, "fstat");
-       tcp6_pipe[0] = -1; /* prevent us from error'ing all the time */ 
-      }
-    }
-    max = tcp6_pipe[0];
-    if (tcp6_sock > tcp6_pipe[0])
-      max = tcp6_sock;
-    for (i=0;i<tsessionCount;i++) {
-      TCP6Session * tcp6Session = tsessions[i]->internal;
-      int sock = tcp6Session->sock;
-      if (sock != -1) {
-       if (isSocketValid(sock)) {
-         FD_SET(sock, &readSet);
-         FD_SET(sock, &errorSet);
-         if (tcp6Session->wpos > 0)
-           FD_SET(sock, &writeSet); /* do we have a pending write request? */
-       } else {
-         LOG_STRERROR(LOG_ERROR, "isSocketValid");
-         destroySession(i);
-       }
-      } else {
-       BREAK();
-       destroySession(i);
-      }
-      if (sock > max)
-       max = sock;
-    }
-    MUTEX_UNLOCK(&tcp6lock);
-    ret = SELECT(max+1, &readSet, &writeSet, &errorSet, NULL);
-    MUTEX_LOCK(&tcp6lock);
-    if ( (ret == -1) &&
-        ( (errno == EAGAIN) || (errno == EINTR) ) )
-      continue;
-    if (ret == -1) {
-      if (errno == EBADF) {
-       LOG_STRERROR(LOG_ERROR, "select");
-      } else {
-       DIE_STRERROR("select");
-      }
-    }
-    if (tcp6_sock != -1) {
-      if (FD_ISSET(tcp6_sock, &readSet)) {
-       int sock;
-       
-       lenOfIncomingAddr = sizeof(clientAddr);
-       sock = ACCEPT(tcp6_sock,
-                     (struct sockaddr *)&clientAddr,
-                     &lenOfIncomingAddr);
-       if (sock != -1) {       
-         /* verify clientAddr for eligibility here (ipcheck-style,
-            user should be able to specify who is allowed to connect,
-            otherwise we just close and reject the communication! */   
-         GNUNET_ASSERT(sizeof(struct in6_addr) == sizeof(IP6addr));
-         if (YES == isBlacklisted((IP6addr*)&clientAddr.sin6_addr)) {
-           char inet6[INET6_ADDRSTRLEN];
-           LOG(LOG_INFO,
-               _("%s: Rejected connection from blacklisted address %s.\n"),
-               "TCP6",
-               inet_ntop(AF_INET6,
-                         &clientAddr,
-                         inet6,
-                         INET6_ADDRSTRLEN));
-           SHUTDOWN(sock, 2);
-           closefile(sock);
-         } else
-           createNewSession(sock);
-       } else {
-         LOG_STRERROR(LOG_INFO, "accept");
-       }
-      }
-    }
-    if (FD_ISSET(tcp6_pipe[0], &readSet)) {
-      /* allow reading multiple signals in one go in case we get many
-        in one shot... */
-
-#define MAXSIG_BUF 128
-      char buf[MAXSIG_BUF];
-      /* just a signal to refresh sets, eat and continue */
-      if (0 >= READ(tcp6_pipe[0],
-                   &buf[0],
-                   MAXSIG_BUF)) {
-       LOG_STRERROR(LOG_WARNING, "read");
-      }
-    }
-    for (i=0;i<tsessionCount;i++) {
-      TCP6Session * tcp6Session = tsessions[i]->internal;
-      int sock = tcp6Session->sock;
-      if (FD_ISSET(sock, &readSet)) {
-       if (SYSERR == readAndProcess(i)) {
-         destroySession(i);
-         i--;
-         continue;
-       }
-      }
-      if (FD_ISSET(sock, &writeSet)) {
-       size_t ret;
-       int success;
-
-try_again_1:
-       success = SEND_NONBLOCKING(sock,
-                                  tcp6Session->wbuff,
-                                  tcp6Session->wpos,
-                                  &ret);
-       if (success == SYSERR) {
-         LOG_STRERROR(LOG_WARNING, "send");
-         destroySession(i);
-         i--;
-         continue;
-        } else if (success == NO) {
-         /* this should only happen under Win9x because
-            of a bug in the socket implementation (KB177346).
-            Let's sleep and try again. */
-         gnunet_util_sleep(20);
-         goto try_again_1;
-        }
-        if (ret == 0) {
-          /* send only returns 0 on error (other side closed connection),
-          * so close the session */
-         destroySession(i);
-         i--;
-         continue;
-       }
-       if ((unsigned int)ret == tcp6Session->wpos) {
-         FREENONNULL(tcp6Session->wbuff);
-         tcp6Session->wbuff = NULL;
-         tcp6Session->wpos  = 0;
-         tcp6Session->wsize = 0;
-       } else {
-         memmove(tcp6Session->wbuff,
-                 &tcp6Session->wbuff[ret],
-                 tcp6Session->wpos - ret);
-         tcp6Session->wpos -= ret;
-       }
-      }
-      if (FD_ISSET(sock, &errorSet)) {
-       destroySession(i);
-       i--;
-       continue;
-      }
-      if ( ( tcp6Session->users == 1) &&
-          (cronTime(NULL) > tcp6Session->lastUse + TCP6_TIMEOUT) ) {
-       destroySession(i);
-       i--;
-       continue;
-      }
-    }
-  }
-  /* shutdown... */
-  if (tcp6_sock != -1) {
-    closefile(tcp6_sock);
-    tcp6_sock = -1;
-  }
-  /* close all sessions */
-  while (tsessionCount > 0)
-    destroySession(0);
-  MUTEX_UNLOCK(&tcp6lock);
-  SEMAPHORE_UP(serverSignal); /* we are there! */
-  return NULL;
-} /* end of tcp6 listen main */
-
-/**
- * Send a message (already encapsulated if needed) via the
- * tcp6 socket (or enqueue if sending now would block).
- *
- * @param tcp6Session the session to use for sending
- * @param mp the message to send
- * @param ssize the size of the message
- * @return OK if message send or queued, NO if queue is full and
- * message was dropped, SYSERR on error
- */
-static int tcp6DirectSend(TCP6Session * tcp6Session,
-                         void * mp,
-                         unsigned int ssize) {
-  size_t ret;
-  int success;
-
-  if (tcp6_shutdown == YES)
-    return SYSERR;
-  if (tcp6Session->sock == -1) {
-#if DEBUG_TCP6
-    LOG(LOG_INFO,
-       "tcp6DirectSend called, but socket is closed\n");
-#endif
-    return SYSERR;
-  }
-  if (ssize == 0) {
-    BREAK();
-    return SYSERR;
-  }
-  MUTEX_LOCK(&tcp6lock);
-  if (tcp6Session->wpos > 0) {
-    MUTEX_UNLOCK(&tcp6lock);
-    return NO;
-  }
-  success = SEND_NONBLOCKING(tcp6Session->sock,
-                            mp,
-                            ssize,
-                            &ret);
-  if (success == SYSERR) {
-    LOG_STRERROR(LOG_INFO, "send");
-    MUTEX_UNLOCK(&tcp6lock);
-    return SYSERR;
-  }
-  if (success == NO)
-    ret = 0;
-
-  if (ret < ssize) { /* partial send */
-    if (tcp6Session->wsize < ssize - ret) {
-      GROW(tcp6Session->wbuff,
-          tcp6Session->wsize,
-          ssize - ret);
-    }
-    memcpy(tcp6Session->wbuff,
-          mp + ret,
-          ssize - ret);
-    tcp6Session->wpos = ssize - ret;
-    signalSelect(); /* select set changed! */
-  }
-  MUTEX_UNLOCK(&tcp6lock);
-  cronTime(&tcp6Session->lastUse);
-  incrementBytesSent(ssize);
-  return OK;
-}
-
-
-/**
- * Send a message (already encapsulated if needed) via the
- * tcp6 socket.  Block if required.
- *
- * @param tcp6Session the session to use for sending
- * @param mp the message to send
- * @param ssize the size of the message
- * @return OK if message send or queued, NO if queue is full and
- * message was dropped, SYSERR on error
- */
-static int tcp6DirectSendReliable(TCP6Session * tcp6Session,
-                                 void * mp,
-                                 unsigned int ssize) {
-  int ok;
-
-  if (tcp6Session->sock == -1) {
-#if DEBUG_TCP6
-    LOG(LOG_INFO,
-       "tcp6DirectSendReliable called, but socket is closed\n");
-#endif
-    return SYSERR;
-  }
-  if (ssize == 0) {
-    BREAK();
-    return SYSERR;
-  }
-  MUTEX_LOCK(&tcp6lock);
-  if (tcp6Session->wpos > 0) {
-    unsigned int old = tcp6Session->wpos;
-    /* reliable: grow send-buffer above limit! */
-    GROW(tcp6Session->wbuff,
-        tcp6Session->wsize,
-        tcp6Session->wpos + ssize);
-    tcp6Session->wpos += ssize;
-    memcpy(&tcp6Session->wbuff[old],
-          mp,
-          ssize);
-    ok = OK;
-  } else {
-    ok = tcp6DirectSend(tcp6Session,
-                       mp,
-                       ssize);
-  }
-  MUTEX_UNLOCK(&tcp6lock);
-  return ok;
-}
-
-/**
- * Send a message to the specified remote node.
- *
- * @param tsession the P2P_hello_MESSAGE identifying the remote node
- * @param msg the message
- * @param size the size of the message
- * @return SYSERR on error, OK on success, NO if queue is full
- */
-static int tcp6SendReliable(TSession * tsession,
-                          const void * msg,
-                          const unsigned int size) {
-  TCP6P2P_PACKET * mp;
-  int ok;
-
-  if (size >= MAX_BUFFER_SIZE)
-    return SYSERR;
-  if (tcp6_shutdown == YES)
-    return SYSERR;
-  if (size == 0) {
-    BREAK();
-    return SYSERR;
-  }
-  if (((TCP6Session*)tsession->internal)->sock == -1)
-    return SYSERR; /* other side closed connection */
-  mp = MALLOC(sizeof(TCP6P2P_PACKET) + size);
-  memcpy(&mp[1],
-        msg,
-        size);
-  mp->size = htons(size);
-  mp->reserved = 0;
-  ok = tcp6DirectSendReliable(tsession->internal,
-                             mp,
-                             size + sizeof(TCP6P2P_PACKET));
-  FREE(mp);
-  return ok;
-}
-
-
-/**
  * Verify that a hello-Message is correct (a node
  * is reachable at that address). Since the reply
  * will be asynchronous, a method must be called on
@@ -925,7 +132,8 @@
        (ntohs(helo->header.size) != P2P_hello_MESSAGE_size(helo)) ||
        (ntohs(helo->header.type) != p2p_PROTO_hello) ||
        (ntohs(helo->protocol) != TCP6_PROTOCOL_NUMBER) ||
-       (YES == isBlacklisted(&haddr->ip)) )
+       (YES == isBlacklisted(&haddr->ip,
+                            sizeof(IP6addr))) )
     return SYSERR; /* obviously invalid */
   else
     return OK;
@@ -945,17 +153,21 @@
 
   port = getGNUnetTCP6Port();
   if (0 == port) {
-    LOG(LOG_DEBUG,
-       "TCP6 port is 0, will only send using TCP6\n");
+    GE_LOG(ectx,
+          GE_DEBUG,
+          "TCP6 port is 0, will only send using TCP6\n");
     return NULL; /* TCP6 transport is configured SEND-only! */
   }
   msg = (P2P_hello_MESSAGE *) MALLOC(sizeof(P2P_hello_MESSAGE) + 
sizeof(Host6Address));
   haddr = (Host6Address*) &msg[1];
 
-  if (SYSERR == getPublicIP6Address(&haddr->ip)) {
+  if (SYSERR == getPublicIP6Address(cfg,
+                                   ectx,
+                                   &haddr->ip)) {
     FREE(msg);
-    LOG(LOG_WARNING,
-       _("Could not determine my public IPv6 address.\n"));
+    GE_LOG(ectx,
+          GE_WARNING | GE_USER | GE_BULK,
+          _("Could not determine my public IPv6 address.\n"));
     return NULL;
   }
   haddr->port = htons(port);
@@ -977,18 +189,15 @@
                       TSession ** tsessionPtr) {
   int i;
   Host6Address * haddr;
-  TCP6Welcome welcome;
   int sock;
-  TSession * tsession;
-  TCP6Session * tcp6Session;
   char hostname[INET6_ADDRSTRLEN];
   struct addrinfo hints, *res, *res0;
   int rtn;
+  struct SocketHandle * s;
 
-  if (tcp6_shutdown == YES)
+  if (selector == NULL)
     return SYSERR;
   haddr = (Host6Address*) &helo[1];
-
   memset(&hints, 0, sizeof(hints));
   hints.ai_family = PF_INET6;
   hints.ai_socktype = SOCK_STREAM;
@@ -998,21 +207,23 @@
            INET6_ADDRSTRLEN);
   rtn = getaddrinfo(hostname, NULL, &hints, &res0);
   if (rtn != 0) {
-    LOG(LOG_WARNING,   
-       _("`%s': unknown service: %s\n"),
-       __FUNCTION__,
-       gai_strerror(rtn));
+    GE_LOG(ectx,
+          GE_WARNING | GE_ADMIN | GE_BULK,     
+          _("`%s': unknown service: %s\n"),
+          __FUNCTION__,
+          gai_strerror(rtn));
     return SYSERR;
   }
 
 #if DEBUG_TCP6
-  LOG(LOG_DEBUG,
-      "Creating TCP6 connection to %s:%d\n",
-      inet_ntop(AF_INET6,
-               haddr,
-               &hostname,
-               INET6_ADDRSTRLEN),
-      ntohs(haddr->port));
+  GE_LOG(ectx,
+        GE_DEBUG,
+        "Creating TCP6 connection to %s:%d\n",
+        inet_ntop(AF_INET6,
+                  haddr,
+                  &hostname,
+                  INET6_ADDRSTRLEN),
+        ntohs(haddr->port));
 #endif
 
   sock = -1;
@@ -1022,122 +233,46 @@
     sock = SOCKET(res->ai_family,
                  res->ai_socktype,
                  res->ai_protocol);
-    if (sock < 0)
+    if (sock < 0) {
+      GE_LOG_STRERROR(ectx,
+                     GE_WARNING | GE_ADMIN | GE_BULK,
+                     "socket");
       continue;
-    if (0 != setBlocking(sock, NO)) {
-      closefile(sock);
-      LOG_STRERROR(LOG_FAILURE, "setBlocking");
+    }
+    s = socket_create(ectx,
+                     coreAPI->load_monitor,
+                     sock);
+    if (-1 == socket_set_blocking(s, NO)) {
+      socket_destroy(s);
+      freeaddrinfo(res0);
       return SYSERR;
     }
-    ((struct sockaddr_in6*)(res->ai_addr))->sin6_port
-      = haddr->port;
+    ((struct sockaddr_in6*)(res->ai_addr))->sin6_port = haddr->port;
     i = CONNECT(sock,
                res->ai_addr,
                res->ai_addrlen);
     if ( (i < 0) &&
         (errno != EINPROGRESS) ) {
-      LOG_STRERROR(LOG_WARNING, "connect");
-      closefile(sock);
+      GE_LOG_STRERROR(ectx,
+                     GE_WARNING | GE_ADMIN | GE_BULK,
+                     "connect");
+      CLOSE(sock);
       sock = -1;
       continue;
     }
     break;
   }
   freeaddrinfo(res0);
-  if (sock == -1) {
-    LOG_STRERROR(LOG_FAILURE, "socket");
+  if (sock == -1) 
     return SYSERR;
-  }
-  if (0 != setBlocking(sock, NO)) {
-    LOG_STRERROR(LOG_FAILURE, "setBlocking");
-    closefile(sock);
-    return SYSERR;
-  }
-  tcp6Session = MALLOC(sizeof(TCP6Session));
-  tcp6Session->sock = sock;
-  tcp6Session->wpos = 0;
-  tcp6Session->wbuff = NULL;
-  tcp6Session->rsize = 2 * 1024 + sizeof(TCP6P2P_PACKET);
-  tcp6Session->rbuff = MALLOC(tcp6Session->rsize);
-  tsession = MALLOC(sizeof(TSession));
-  tsession->internal = tcp6Session;
-  tsession->ttype = tcp6API.protocolNumber;
-  MUTEX_CREATE_RECURSIVE(&tcp6Session->lock);
-  tcp6Session->users = 2; /* caller + us */
-  tcp6Session->pos = 0;
-  cronTime(&tcp6Session->lastUse);
-  memcpy(&tcp6Session->sender,
-        &helo->senderIdentity,
-        sizeof(PeerIdentity));
-  tcp6Session->expectingWelcome = NO;
-  MUTEX_LOCK(&tcp6lock);
-  i = addTSession(tsession);
-
-  /* send our node identity to the other side to fully establish the
-     connection! */
-  welcome.header.size = htons(sizeof(TCP6Welcome) - sizeof(TCP6P2P_PACKET));
-  welcome.header.reserved = htons(0);
-  memcpy(&welcome.clientIdentity,
-        coreAPI->myIdentity,
-        sizeof(PeerIdentity));
-  if (SYSERR == tcp6DirectSend(tcp6Session,
-                              &welcome,
-                              sizeof(TCP6Welcome))) {
-    destroySession(i);
-    tcp6Disconnect(tsession);
-    MUTEX_UNLOCK(&tcp6lock);
-    return SYSERR;
-  }
-  MUTEX_UNLOCK(&tcp6lock);
-  signalSelect();
-
-  *tsessionPtr = tsession;
-  return OK;
+  
+  return tcpConnectHelper(helo,
+                         s,
+                         tcp6API.protocolNumber,
+                         tsessionPtr);
 }
 
 /**
- * Send a message to the specified remote node.
- *
- * @param tsession the P2P_hello_MESSAGE identifying the remote node
- * @param msg the message
- * @param size the size of the message
- * @return SYSERR on error, OK on success
- */
-static int tcp6Send(TSession * tsession,
-                   const void * msg,
-                   const unsigned int size) {
-  TCP6P2P_PACKET * mp;
-  int ok;
-
-  if (size >= MAX_BUFFER_SIZE)
-    return SYSERR;
-  if (tcp6_shutdown == YES)
-    return SYSERR;
-  if (size == 0) {
-    BREAK();
-    return SYSERR;
-  }
-  if (((TCP6Session*)tsession->internal)->sock == -1)
-    return SYSERR; /* other side closed connection */
-  mp = MALLOC(sizeof(TCP6P2P_PACKET) + size);
-  memcpy(&mp[1],
-        msg,
-        size);
-  mp->size = htons(size);
-  mp->reserved = 0;
-  if (((TCP6Session*)tsession->internal)->wpos + size < TARGET_BUFFER_SIZE)
-    ok = tcp6DirectSendReliable(tsession->internal,
-                               mp,
-                               size + sizeof(TCP6P2P_PACKET));
-  else
-    ok = tcp6DirectSend(tsession->internal,
-                       mp,
-                       size + sizeof(TCP6P2P_PACKET));
-  FREE(mp);
-  return ok;
-}
-
-/**
  * Start the server process to receive inbound traffic.
  * @return OK on success, SYSERR if the operation failed
  */
@@ -1145,132 +280,99 @@
   struct sockaddr_in6 serverAddr;
   const int on = 1;
   unsigned short port;
+  int s;
 
-  if (serverSignal != NULL) {
-    BREAK();
+  if (selector != NULL) {
+    GE_BREAK(ectx, 0);
     return SYSERR;
   }
-  serverSignal = SEMAPHORE_NEW(0);
-  tcp6_shutdown = NO;
-
-  if (0 != PIPE(tcp6_pipe)) {
-    LOG_STRERROR(LOG_ERROR, "pipe");
+  port = getGNUnetTCP6Port();
+  if (port == 0)
+    return OK; /* read-only TCP6 */
+  s = SOCKET(PF_INET6,
+            SOCK_STREAM,
+            0);
+  if (s < 0) {
+    GE_LOG_STRERROR(ectx,
+                   GE_ERROR | GE_ADMIN | GE_BULK,
+                   "socket");
     return SYSERR;
   }
-  setBlocking(tcp6_pipe[1], NO);
-
-  port = getGNUnetTCP6Port();
-  if (port != 0) { /* if port == 0, this is a read-only
-                     business! */
-    tcp6_sock = SOCKET(PF_INET6,
-                      SOCK_STREAM,
-                      0);
-    if (tcp6_sock < 0) {
-      LOG_STRERROR(LOG_FAILURE, "socket");
-      closefile(tcp6_pipe[0]);
-      closefile(tcp6_pipe[1]);
-      SEMAPHORE_FREE(serverSignal);
-      serverSignal = NULL;
-      tcp6_shutdown = YES;
-      return SYSERR;
-    }
-    if (SETSOCKOPT(tcp6_sock,
-                  SOL_SOCKET,
-                  SO_REUSEADDR,
-                  &on,
-                  sizeof(on)) < 0 )
-      DIE_STRERROR("setsockopt");
-    memset((char *) &serverAddr,
-          0,
-          sizeof(serverAddr));
-    serverAddr.sin6_family   = AF_INET6;
-    serverAddr.sin6_flowinfo = 0;
-    serverAddr.sin6_addr     = in6addr_any;
-    serverAddr.sin6_port     = htons(getGNUnetTCP6Port());
-#if DEBUG_TCP6
-    LOG(LOG_INFO,
-       "starting tcp6 peer server on port %d\n",
-       ntohs(serverAddr.sin6_port));
-#endif
-    if (BIND(tcp6_sock,
-            (struct sockaddr *) &serverAddr,
-            sizeof(serverAddr)) < 0) {
-      LOG_STRERROR(LOG_ERROR, "bind");
-      LOG(LOG_ERROR,
-         _("Failed to start transport service on port %d.\n"),
-         getGNUnetTCP6Port());
-      closefile(tcp6_sock);
-      tcp6_sock = -1;
-      SEMAPHORE_FREE(serverSignal);
-      serverSignal = NULL;
-      return SYSERR;
-    }
-  } else
-    tcp6_sock = -1;
-  if (0 == PTHREAD_CREATE(&listenThread,
-                         (PThreadMain) &tcp6ListenMain,
-                         NULL,
-                         4092)) {
-    SEMAPHORE_DOWN(serverSignal); /* wait for server to be up */
-  } else {
-    LOG_STRERROR(LOG_ERROR,
-                "pthread_create");
-    closefile(tcp6_sock);
-    SEMAPHORE_FREE(serverSignal);
-    serverSignal = NULL;
+  if (SETSOCKOPT(s,
+                SOL_SOCKET,
+                SO_REUSEADDR,
+                &on,
+                sizeof(on)) < 0 )
+    GE_DIE_STRERROR(ectx,
+                   GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
+                   "setsockopt");
+  memset((char *) &serverAddr,
+        0,
+        sizeof(serverAddr));
+  serverAddr.sin6_family   = AF_INET6;
+  serverAddr.sin6_flowinfo = 0;
+  serverAddr.sin6_addr     = in6addr_any;
+  serverAddr.sin6_port     = htons(getGNUnetTCP6Port());
+  if (BIND(s,
+          (struct sockaddr *) &serverAddr,
+          sizeof(serverAddr)) < 0) {
+    GE_LOG_STRERROR(ectx,
+                   GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
+                   "bind");
+    GE_LOG(ectx,
+          GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
+          _("Failed to start transport service on port %d.\n"),
+          getGNUnetTCP6Port());
+    if (0 != CLOSE(s))
+      GE_LOG_STRERROR(ectx,
+                     GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
+                     "close");
     return SYSERR;
   }
+  selector = select_create(ectx,
+                          coreAPI->load_monitor,
+                          s,
+                          sizeof(IPaddr),
+                          TCP6_TIMEOUT,
+                          &select_message_handler,
+                          NULL,
+                          &select_accept_handler,
+                          &isBlacklisted,
+                          &select_close_handler,
+                          NULL,
+                          0 /* memory quota */ );
   return OK;
 }
 
 /**
- * Shutdown the server process (stop receiving inbound
- * traffic). Maybe restarted later!
- */
-static int stopTransportServer() {
-  void * unused;
-  int haveThread;
-
-  if (tcp6_shutdown == YES)
-    return OK;
-  tcp6_shutdown = YES;
-  signalSelect();
-  if (serverSignal != NULL) {
-    haveThread = YES;
-    SEMAPHORE_DOWN(serverSignal);
-    SEMAPHORE_FREE(serverSignal);
-  } else
-    haveThread = NO;
-  serverSignal = NULL;
-  closefile(tcp6_pipe[1]);
-  closefile(tcp6_pipe[0]);
-  if (tcp6_sock != -1) {
-    closefile(tcp6_sock);
-    tcp6_sock = -1;
-  }
-  if (haveThread == YES)
-    PTHREAD_JOIN(&listenThread, &unused);
-  return OK;
-}
-
-/**
  * Reload the configuration. Should never fail (keep old
  * configuration on error, syslog errors!)
  */
-static void reloadConfiguration(void) {
+static int reloadConfiguration(void * ctx,
+                              struct GC_Configuration * cfg, 
+                              struct GE_Context * ectx,
+                              const char * section,
+                              const char * option) {
   char * ch;
 
-  MUTEX_LOCK(&tcp6lock);
+  if (0 != strcmp(section, "TCP6"))
+    return OK; /* fast path */
+  MUTEX_LOCK(tcplock);
   FREENONNULL(filteredNetworks_);
-  ch = getConfigurationString("TCP6",
-                             "BLACKLIST");
-  if (ch == NULL)
-    filteredNetworks_ = parseRoutes6("");
+  if (0 != GC_get_configuration_value_string(cfg,
+                                            "TCP",
+                                            "BLACKLIST",
+                                            NULL,
+                                            &ch)) 
+    filteredNetworks_ = parse_ipv6_network_specification(ectx,
+                                                        "");
   else {
-    filteredNetworks_ = parseRoutes6(ch);
+    filteredNetworks_ = parse_ipv6_network_specification(ectx,
+                                                        ch);
     FREE(ch);
   }
-  MUTEX_UNLOCK(&tcp6lock);
+  MUTEX_UNLOCK(tcplock);
+  return OK;
 }
 
 /**
@@ -1302,44 +404,50 @@
  * via a global and returns the udp transport API.
  */
 TransportAPI * inittransport_tcp6(CoreAPIForTransport * core) {
-  MUTEX_CREATE_RECURSIVE(&tcp6lock);
-  reloadConfiguration();
-  tsessionCount = 0;
-  tsessionArrayLength = 0;
-  GROW(tsessions,
-       tsessionArrayLength,
-       32);
+  ectx = core->ectx;
+  cfg = core->cfg;
+  tcplock = MUTEX_CREATE(YES);
+  if (0 != GC_attach_change_listener(cfg,
+                                    &reloadConfiguration,
+                                    NULL)) {
+    MUTEX_DESTROY(tcplock);
+    tcplock = NULL;
+    return NULL;
+  }
   coreAPI = core;
-  tcp6API.protocolNumber       = TCP6_PROTOCOL_NUMBER;
+  stats = coreAPI->requestService("stats");
+  if (stats != NULL) {
+    stat_bytesReceived
+      = stats->create(gettext_noop("# bytes received via TCP6"));
+    stat_bytesSent
+      = stats->create(gettext_noop("# bytes sent via TCP6"));
+    stat_bytesDropped
+      = stats->create(gettext_noop("# bytes dropped by TCP6 (outgoing)"));
+  }
+ tcp6API.protocolNumber       = TCP6_PROTOCOL_NUMBER;
   tcp6API.mtu                  = 0;
   tcp6API.cost                 = 19950; /* about equal to udp6 */
   tcp6API.verifyHelo           = &verifyHelo;
-  tcp6API.createhello           = &createhello;
+  tcp6API.createhello          = &createhello;
   tcp6API.connect              = &tcp6Connect;
-  tcp6API.associate            = &tcp6Associate;
-  tcp6API.send                 = &tcp6Send;
-  tcp6API.sendReliable         = &tcp6SendReliable;
-  tcp6API.disconnect           = &tcp6Disconnect;
+  tcp6API.associate            = &tcpAssociate;
+  tcp6API.send                 = &tcpSend;
+  tcp6API.disconnect           = &tcpDisconnect;
   tcp6API.startTransportServer = &startTransportServer;
   tcp6API.stopTransportServer  = &stopTransportServer;
-  tcp6API.reloadConfiguration  = &reloadConfiguration;
   tcp6API.addressToString      = &addressToString;
 
   return &tcp6API;
 }
 
 void donetransport_tcp6() {
-  int i;
-
-  for (i=0;i<tsessionCount;i++)
-    LOG(LOG_DEBUG,
-       "tsessions array still contains %p\n",
-       tsessions[i]);
-  GROW(tsessions,
-       tsessionArrayLength,
-       0);
+  GC_detach_change_listener(cfg,
+                           &reloadConfiguration,
+                           NULL);
+  coreAPI->releaseService(stats);
+  stats = NULL;
   FREENONNULL(filteredNetworks_);
-  MUTEX_DESTROY(&tcp6lock);
+  MUTEX_DESTROY(tcplock);
 }
 
 /* end of tcp6.c */

Added: GNUnet/src/transports/tcp_helper.c
===================================================================
--- GNUnet/src/transports/tcp_helper.c  2006-07-28 22:31:20 UTC (rev 3148)
+++ GNUnet/src/transports/tcp_helper.c  2006-07-29 00:22:42 UTC (rev 3149)
@@ -0,0 +1,365 @@
+/*
+     This file is part of GNUnet
+     (C) 2002, 2003, 2004, 2005, 2006 Christian Grothoff (and other 
contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 2, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file transports/tcp_helper.c
+ * @brief common functions for the TCP services
+ * @author Christian Grothoff
+ */
+
+typedef int (*BlacklistedTester)(const void * addr,
+                                unsigned int addr_len);
+
+/**
+ * Initial handshake message. Note that the beginning
+ * must match the CS_MESSAGE_HEADER since we are using tcpio.
+ */
+typedef struct {
+  MESSAGE_HEADER header;
+
+  /**
+   * Identity of the node connecting (TCP client)
+   */
+  PeerIdentity clientIdentity;
+} TCPWelcome;
+
+/**
+ * Transport Session handle.
+ */
+typedef struct {
+  /**
+   * the tcp socket (used to identify this connection with selector)
+   */
+  struct SocketHandle * sock;
+
+  /**
+   * number of users of this session (reference count)
+   */
+  int users;
+
+  /**
+   * mutex for synchronized access to 'users'
+   */
+  struct MUTEX * lock;
+
+  /**
+   * To whom are we talking to (set to our identity
+   * if we are still waiting for the welcome message)
+   */
+  PeerIdentity sender;
+
+  /**
+   * Are we still expecting the welcome? (YES/NO)
+   */
+  int expectingWelcome;
+
+} TCPSession;
+
+/* *********** globals ************* */
+
+/**
+ * apis (our advertised API and the core api )
+ */
+static CoreAPIForTransport * coreAPI;
+
+static Stats_ServiceAPI * stats;
+
+static int stat_bytesReceived;
+
+static int stat_bytesSent;
+
+static int stat_bytesDropped;
+
+static struct SelectHandle * selector;
+
+static struct GE_Context * ectx;
+
+static struct MUTEX * tcplock;
+
+/**
+ * Disconnect from a remote node. May only be called
+ * on sessions that were aquired by the caller first.
+ * For the core, aquiration means to call associate or
+ * connect. The number of disconnects must match the
+ * number of calls to connect+associate.
+ *
+ * @param tsession the session that is closed
+ * @return OK on success, SYSERR if the operation failed
+ */
+static int tcpDisconnect(TSession * tsession) {
+  TCPSession * tcpsession = tsession->internal;
+
+  GE_ASSERT(ectx, tcpsession != NULL);
+  MUTEX_LOCK(tcpsession->lock);
+  tcpsession->users--;
+  if (tcpsession->users > 0) {
+    MUTEX_UNLOCK(tcpsession->lock);
+    return OK;
+  }  
+  select_disconnect(selector,
+                   tcpsession->sock);
+  MUTEX_UNLOCK(tcpsession->lock);
+  MUTEX_DESTROY(tcpsession->lock);
+  FREE(tcpsession);  
+  FREE(tsession);
+  return OK;
+}
+
+/**
+ * A (core) Session is to be associated with a transport session. The
+ * transport service may want to know in order to call back on the
+ * core if the connection is being closed. Associate can also be
+ * called to test if it would be possible to associate the session
+ * later, in this case the argument session is NULL. This can be used
+ * to test if the connection must be closed by the core or if the core
+ * can assume that it is going to be self-managed (if associate
+ * returns OK and session was NULL, the transport layer is responsible
+ * for eventually freeing resources associated with the tesession). If
+ * session is not NULL, the core takes responsbility for eventually
+ * calling disconnect.
+ *
+ * @param tsession the session handle passed along
+ *   from the call to receive that was made by the transport
+ *   layer
+ * @return OK if the session could be associated,
+ *         SYSERR if not.
+ */
+static int tcpAssociate(TSession * tsession) {
+  TCPSession * tcpSession;
+
+  GE_ASSERT(ectx, tsession != NULL);
+  tcpSession = tsession->internal;
+  MUTEX_LOCK(tcpSession->lock);
+  tcpSession->users++;
+  MUTEX_UNLOCK(tcpSession->lock);
+  return OK;
+}
+
+/**
+ * The socket of session has data waiting, process!
+ *
+ * This function may only be called if the tcplock is
+ * already held by the caller.
+ */
+static int select_message_handler(void * mh_cls,
+                                 struct SelectHandle * sh,
+                                 struct SocketHandle * sock,
+                                 void * sock_ctx,
+                                 const MESSAGE_HEADER * msg) {
+  TSession * tsession = sock_ctx;
+  TCPSession * tcpSession;
+  unsigned int len;
+  P2P_PACKET * mp;
+  const TCPWelcome * welcome;
+
+  if (SYSERR == tcpAssociate(tsession))
+    return SYSERR;
+  len = ntohs(msg->size);
+  if (stats != NULL)
+    stats->change(stat_bytesReceived,
+                 len);
+  tcpSession = tsession->internal;
+  if (YES == tcpSession->expectingWelcome) {    
+    welcome = (const TCPWelcome*) msg;
+    if ( (ntohs(welcome->header.type) != 0) ||
+        (len != sizeof(TCPWelcome)) ) {
+      tcpDisconnect(tsession);
+      return SYSERR;    
+    }
+    tcpSession->expectingWelcome = NO;
+    tcpSession->sender = welcome->clientIdentity;
+  } else {
+    /* send msg to core! */
+    if (len <= sizeof(MESSAGE_HEADER)) {
+      GE_LOG(ectx,
+            GE_WARNING | GE_USER | GE_BULK,
+            _("Received malformed message from tcp-peer connection. 
Closing.\n"));
+      tcpDisconnect(tsession);
+      return SYSERR;
+    }
+    mp      = MALLOC(sizeof(P2P_PACKET));
+    mp->msg = MALLOC(len - sizeof(MESSAGE_HEADER));
+    memcpy(mp->msg,
+          &msg[1],
+          len - sizeof(MESSAGE_HEADER));
+    mp->sender   = tcpSession->sender;
+    mp->size     = len - sizeof(MESSAGE_HEADER);
+    mp->tsession = tsession;
+    coreAPI->receive(mp);
+  }
+  tcpDisconnect(tsession);
+  return OK;
+}
+
+
+/**
+ * Create a new session for an inbound connection on the given
+ * socket. Adds the session to the array of sessions watched
+ * by the select thread.
+ */
+static void * select_accept_handler(void * ah_cls,
+                                   struct SelectHandle * sh,
+                                   struct SocketHandle * sock,
+                                   const void * addr,
+                                   unsigned int addr_len) {
+  BlacklistedTester blt = ah_cls;
+  TSession * tsession;
+  TCPSession * tcpSession;
+
+  if (NO != blt(addr, addr_len))
+    return NULL;
+  tcpSession = MALLOC(sizeof(TCPSession));
+  tcpSession->sock = sock;
+  /* fill in placeholder identity to mark that we
+     are waiting for the welcome message */
+  tcpSession->sender = *(coreAPI->myIdentity);
+  tcpSession->expectingWelcome = YES;
+  tcpSession->lock = MUTEX_CREATE(YES);
+  tcpSession->users = 1; /* us only, core has not seen this tsession! */
+  tsession = MALLOC(sizeof(TSession));
+  tsession->ttype = TCP_PROTOCOL_NUMBER;
+  tsession->internal = tcpSession;
+
+  return tsession;
+}                                      
+
+static void select_close_handler(void * ch_cls,
+                                struct SelectHandle * sh,
+                                struct SocketHandle * sock,
+                                void * sock_ctx) {
+  TSession * tsession = sock_ctx;
+  tcpDisconnect(tsession);
+}
+
+/**
+ * Send a message to the specified remote node.
+ *
+ * @param tsession the handle identifying the remote node
+ * @param msg the message
+ * @param size the size of the message
+ * @return SYSERR on error, OK on success
+ */
+static int tcpSend(TSession * tsession,
+                  const void * msg,
+                  const unsigned int size,
+                  int important) {
+  TCPSession * tcpSession;
+  MESSAGE_HEADER * mp;
+  int ok;
+
+  tcpSession = tsession->internal;
+  if (size >= MAX_BUFFER_SIZE - sizeof(MESSAGE_HEADER)) {
+    GE_BREAK(ectx, 0);
+    return SYSERR; /* too big */
+  }
+  if (selector == NULL) {
+    if (stats != NULL)
+      stats->change(stat_bytesDropped,
+                   size);
+    return SYSERR;
+  }
+  if (size == 0) {
+    GE_BREAK(ectx, 0);
+    return SYSERR;
+  }
+  if (tcpSession->sock == NULL) {
+    if (stats != NULL)
+      stats->change(stat_bytesDropped,
+                   size);
+    return SYSERR; /* other side closed connection */
+  }
+  mp = MALLOC(sizeof(MESSAGE_HEADER) + size);
+  mp->size = htons(size + sizeof(MESSAGE_HEADER));
+  mp->type = 0;
+  memcpy(&mp[1],
+        msg,
+        size);
+  ok = select_write(selector,
+                   tcpSession->sock,
+                   mp,
+                   NO,
+                   important);
+  FREE(mp);
+  return ok;
+}
+
+/**
+ * Establish a connection to a remote node.
+ *
+ * @param helo the hello-Message for the target node
+ * @param tsessionPtr the session handle that is set
+ * @return OK on success, SYSERR if the operation failed
+ */
+static int tcpConnectHelper(const P2P_hello_MESSAGE * helo,
+                           struct SocketHandle * s,
+                           unsigned int protocolNumber,
+                           TSession ** tsessionPtr) {
+  TCPWelcome welcome;
+  TSession * tsession;
+  TCPSession * tcpSession;
+
+  tcpSession = MALLOC(sizeof(TCPSession));
+  tcpSession->sock = s;
+  tsession = MALLOC(sizeof(TSession));
+  tsession->internal = tcpSession;
+  tsession->ttype = protocolNumber;
+  tcpSession->lock = MUTEX_CREATE(YES);
+  tcpSession->users = 2; /* caller + us */
+  tcpSession->sender = helo->senderIdentity;
+  tcpSession->expectingWelcome = NO;
+  MUTEX_LOCK(tcplock);
+  select_connect(selector,
+                tcpSession->sock,
+                tsession);
+
+  /* send our node identity to the other side to fully establish the
+     connection! */
+  welcome.header.size
+    = htons(sizeof(TCPWelcome));
+  welcome.header.type
+    = htons(0);
+  welcome.clientIdentity
+    = *(coreAPI->myIdentity);
+  if (SYSERR == tcpSend(tsession,
+                       &welcome.header,
+                       sizeof(TCPWelcome),
+                       YES)) {
+    tcpDisconnect(tsession);
+    MUTEX_UNLOCK(tcplock);
+    return SYSERR;
+  }
+  MUTEX_UNLOCK(tcplock);
+  *tsessionPtr = tsession;
+  return OK;
+}
+
+/**
+ * Shutdown the server process (stop receiving inbound
+ * traffic). Maybe restarted later!
+ */
+static int stopTransportServer() {
+  if (selector != NULL) {
+    select_destroy(selector);
+    selector = NULL;
+  }
+  return OK;
+}
+
+/* end of tcp_helper.c */


Property changes on: GNUnet/src/transports/tcp_helper.c
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: GNUnet/src/transports/udp.c
===================================================================
--- GNUnet/src/transports/udp.c 2006-07-28 22:31:20 UTC (rev 3148)
+++ GNUnet/src/transports/udp.c 2006-07-29 00:22:42 UTC (rev 3149)
@@ -66,14 +66,9 @@
   /**
    * size of the message, in bytes, including this header.
    */
-  unsigned short size;
+  MESSAGE_HEADER header;
 
   /**
-   * Currently always 0.
-   */
-  unsigned short reserved;
-
-  /**
    * What is the identity of the sender (hash of public key)
    */
   PeerIdentity sender;
@@ -104,7 +99,7 @@
 /**
  * thread that listens for inbound messages
  */
-static struct PTHREAD * dispatchThread;
+static struct SelectHandle * selector;
 
 /**
  * the socket that we receive all data from
@@ -112,14 +107,6 @@
 static struct SocketHandle * udp_sock;
 
 /**
- * Semaphore for communication with the
- * udp server thread.
- */
-static struct SEMAPHORE * serverSignal;
-
-static int udp_shutdown = YES;
-
-/**
  * configuration
  */
 static struct CIDRNetwork * filteredNetworks_;
@@ -145,7 +132,7 @@
   unsigned long long port;
   
   if (-1 == GC_get_configuration_value_number(cfg,
-                                             "TCP",
+                                             "UDP",
                                              "PORT",
                                              1,
                                              65535,
@@ -162,7 +149,7 @@
 /**
  * Allocate and bind a server socket for the UDP transport.
  */
-static struct SocketHandle * passivesock(unsigned short port) {
+static int listensock(unsigned short port) {
   struct sockaddr_in sin;
   int sock;
   const int on = 1;
@@ -203,9 +190,7 @@
     }
   } /* do not bind if port == 0, then we use
        send-only! */
-  return socket_create(ectx, 
-                      load_monitor,
-                      sock);
+  return sock;
 }
 
 /**
@@ -222,186 +207,65 @@
 }
 
 /**
- * Listen on the given socket and distribute the packets to the UDP
- * handler.
+ * The socket of session has data waiting, process!
+ *
+ * This function may only be called if the tcplock is
+ * already held by the caller.
  */
-static void * listenAndDistribute(void * unused) {
-  struct sockaddr_in incoming;
-  socklen_t addrlen = sizeof(incoming);
-  size_t size;
+static int select_message_handler(void * mh_cls,
+                                 struct SelectHandle * sh,
+                                 struct SocketHandle * sock,
+                                 void * sock_ctx,
+                                 const MESSAGE_HEADER * msg) {
+  unsigned int len;
   P2P_PACKET * mp;
-  UDPMessage udpm;
-  IPaddr ipaddr;
-  int error;
-  int pending;
-  int ret;
-  fd_set readSet;
-  fd_set errorSet;
-  fd_set writeSet;
-  int max;
-#if DEBUG_UDP
-  EncName enc;
-#endif
+  const UDPMessage * um;
 
-  SEMAPHORE_UP(serverSignal);
-  while (udp_shutdown == NO) {
-    FD_ZERO(&readSet);
-    FD_ZERO(&writeSet);
-    FD_ZERO(&errorSet);
-    max = 0;
-    socket_add_to_select_set(udp_sock, &readSet, &max);
-    ret = SELECT(max + 1,
-                &readSet,
-                &writeSet,
-                &errorSet,
-                NULL);
-    if (ret == -1) {
-      if (udp_shutdown == YES)
-       break;
-      if (errno == EINTR)
-       continue;
-      GE_DIE_STRERROR(ectx, 
-                     GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
-                     "select");
-    }
-    if (! socket_test_select_set(udp_sock, &readSet))
-      continue;
-    pending = 0;
-    /* @todo FIXME in PlibC */
-#ifdef MINGW
-    error = ioctlsocket(socket_get_os_socket(udp_sock),
-                       FIONREAD,
-                       &pending);
-#else
-    error = ioctl(socket_get_os_socket(udp_sock),
-                 FIONREAD,
-                 &pending);
-#endif
-    if (error != 0) {
-      GE_LOG_STRERROR(ectx,
-                     GE_ERROR | GE_ADMIN | GE_BULK,
-                     "ioctl");
-      continue;
-    }
-    if (pending <= 0) {
-      GE_LOG(ectx,
-            GE_WARNING | GE_ADMIN | GE_BULK,
-            _("UDP: select returned, but ioctl reports %d bytes available!\n"),
-            pending);
-      if (pending == 0) {
-       /* maybe empty UDP packet was sent (see report on bug-gnunet,
-          5/11/6; read 0 bytes from UDP just to kill potential empty packet! */
-       memset(&incoming,
-              0, 
-              sizeof(struct sockaddr_in));
-       socket_recv(udp_sock,
-                   NC_Nonblocking,
-                   NULL,
-                   0,
-                   &size);
-      }
-      continue;
-    }   
-    if (pending >= 65536) {
-      GE_BREAK(ectx, 0);
-      continue;
-    }   
-    mp = MALLOC(sizeof(P2P_PACKET));
-    mp->msg = MALLOC(pending);    
-    memset(&incoming,
-          0,
-          sizeof(struct sockaddr_in));
-    if (udp_shutdown == YES) {
-      FREE(mp->msg);
-      FREE(mp);
-      break;
-    }
-    if (YES != socket_recv_from(udp_sock,
-                               NC_Blocking,
-                               mp->msg,
-                               pending,
-                               &size,
-                               (struct sockaddr * )&incoming,
-                               &addrlen) ||
-       (udp_shutdown == YES) ) {
-      FREE(mp->msg);
-      FREE(mp);
-      if (udp_shutdown == NO) {
-       if ( (errno == EINTR) ||
-            (errno == EAGAIN) ||
-            (errno == ECONNREFUSED) ) 
-         continue;     
-      }
-      break; /* die/shutdown */
-    }
+  len = ntohs(msg->size);
+  if (len <= sizeof(UDPMessage)) {
+    GE_LOG(ectx,
+          GE_WARNING | GE_USER | GE_BULK,
+          _("Received malformed message from udp-peer connection. 
Closing.\n"));
+    return SYSERR;
+  }
+  um = (const UDPMessage*) msg;
+  mp      = MALLOC(sizeof(P2P_PACKET));
+  mp->msg = MALLOC(len - sizeof(UDPMessage));
+  memcpy(mp->msg,
+        &um[1],
+        len - sizeof(UDPMessage));
+  mp->sender = um->sender;
+  mp->size   = len - sizeof(UDPMessage);
+  mp->tsession = NULL;
+  coreAPI->receive(mp);
+  if (stats != NULL)
     stats->change(stat_bytesReceived,
-                 size);
+                 len);
+  return OK;
+}
 
-    if ((unsigned int)size <= sizeof(UDPMessage)) {
-      GE_LOG(ectx,
-            GE_INFO | GE_BULK | GE_USER,
-            _("Received invalid UDP message from %u.%u.%u.%u:%u, dropping.\n"),
-            PRIP(ntohl(*(int*)&incoming.sin_addr)),
-            ntohs(incoming.sin_port));
-      FREE(mp->msg);
-      FREE(mp);
-      continue;
-    }
-    memcpy(&udpm,
-          &((char*)mp->msg)[size - sizeof(UDPMessage)],
-          sizeof(UDPMessage));
 
-#if DEBUG_UDP
-    GE_IFLOG(ectx,
-            GE_DEBUG | GE_USER | GE_BULK,
-            hash2enc(&udpm.sender.hashPubKey,
-                  &enc));
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK,
-          "received %d bytes via UDP from %u.%u.%u.%u:%u (%s)\n",
-          size,
-          PRIP(ntohl(*(int*)&incoming.sin_addr)),
-          ntohs(incoming.sin_port),
-          &enc);
-#endif
-    /* quick test of the packet, if failed, repeat! */
-    if (size != ntohs(udpm.size)) {
-      GE_LOG(ectx,
-            GE_WARNING | GE_USER | GE_BULK,
-            _("Packet received from %u.%u.%u.%u:%u (UDP) failed format 
check.\n"),
-            PRIP(ntohl(*(int*)&incoming.sin_addr)),
-            ntohs(incoming.sin_port));
-      FREE(mp->msg);
-      FREE(mp);
-      continue;
-    }
-    GE_ASSERT(ectx, sizeof(struct in_addr) == sizeof(IPaddr));
-    memcpy(&ipaddr,
-          &incoming.sin_addr,
-          sizeof(struct in_addr));
-    if (YES == isBlacklisted(ipaddr)) {
-      GE_LOG(ectx,
-            GE_WARNING | GE_USER | GE_BULK,
-            _("%s: Rejected connection from blacklisted "
-              "address %u.%u.%u.%u.\n"),
-            "UDP",
-            PRIP(ntohl(*(int*)&incoming.sin_addr)));
-      FREE(mp->msg);
-      FREE(mp);
-      continue;
-    }
-    /* message ok, fill in mp and pass to core */
-    mp->tsession = NULL;
-    mp->size     = ntohs(udpm.size) - sizeof(UDPMessage);
-    mp->sender   = udpm.sender;
-    coreAPI->receive(mp);
-  }
-  /* shutdown */
-  SEMAPHORE_UP(serverSignal);
-  return NULL;
+static void * select_accept_handler(void * ah_cls,
+                                   struct SelectHandle * sh,
+                                   struct SocketHandle * sock,
+                                   const void * addr,
+                                   unsigned int addr_len) {
+  static int nonnullpointer;
+  return &nonnullpointer;
 }
 
+/**
+ * Select has been forced to close a connection.
+ * Free the associated context.
+ */
+static void select_close_handler(void * ch_cls,
+                                struct SelectHandle * sh,
+                                struct SocketHandle * sock,
+                                void * sock_ctx) {
+  /* do nothing */
+}
 
+
 /* *************** API implementation *************** */
 
 /**
@@ -444,8 +308,8 @@
   P2P_hello_MESSAGE * msg;
   HostAddress * haddr;
 
-  if ( ( (udp_shutdown == YES) && (getGNUnetUDPPort() == 0) ) ||
-       ( (udp_shutdown == NO) && (port == 0) ) )
+  if ( ( (selector == NULL) && (getGNUnetUDPPort() == 0) ) ||
+       ( (selector != NULL) && (port == 0) ) )
     return NULL; /* UDP transport configured send-only */
 
   msg = MALLOC(sizeof(P2P_hello_MESSAGE) + sizeof(HostAddress));
@@ -466,7 +330,7 @@
         "UDP uses IP address %u.%u.%u.%u.\n",
         PRIP(ntohl(*(int*)&haddr->senderIP)));
 #endif
-  if (udp_shutdown == YES)
+  if (selector == NULL)
     haddr->senderPort      = htons(getGNUnetUDPPort());
   else
     haddr->senderPort      = htons(port);
@@ -541,7 +405,7 @@
   int ssize;
   size_t sent;
 
-  if (udp_shutdown == YES)
+  if (udp_sock == NULL)
     return SYSERR;
   if (size == 0) {
     GE_BREAK(ectx, 0);
@@ -558,8 +422,8 @@
   haddr = (HostAddress*) &helo[1];
   ssize = size + sizeof(UDPMessage);
   msg     = MALLOC(ssize);
-  mp.size = htons(ssize);
-  mp.reserved = 0;
+  mp.header.size = htons(ssize);
+  mp.header.type = 0;
   mp.sender = *(coreAPI->myIdentity);
   memcpy(&msg[size],
         &mp,
@@ -588,11 +452,12 @@
                            msg,
                            ssize,
                            &sent,
-                           (struct sockaddr*) &sin,
+                           (const char *) &sin,
                            sizeof(sin))) {
     ok = OK;
-    stats->change(stat_bytesSent,
-                 sent);
+    if (stats != NULL)
+      stats->change(stat_bytesSent,
+                   sent);
   } else {
     GE_LOG(ectx,
           GE_WARNING | GE_ADMIN | GE_BULK,
@@ -601,8 +466,9 @@
           PRIP(ntohl(*(int*)&sin.sin_addr)),
           ntohs(sin.sin_port),
           STRERROR(errno));
-    stats->change(stat_bytesDropped,
-                 ssize);
+    if (stats != NULL)
+      stats->change(stat_bytesDropped,
+                   ssize);
   }
   FREE(msg);
   return ok;
@@ -629,22 +495,42 @@
  * @return OK on success, SYSERR if the operation failed
  */
 static int startTransportServer(void) {
+  int sock;
+
+  GE_ASSERT(ectx, selector == NULL);
    /* initialize UDP network */
   port = getGNUnetUDPPort();
-  udp_sock = passivesock(port);
   if (port != 0) {
-    udp_shutdown = NO;
-    serverSignal = SEMAPHORE_CREATE(0);
-    dispatchThread = PTHREAD_CREATE(&listenAndDistribute,
-                                   NULL,
-                                   5 * 1024);
-    if (dispatchThread == NULL) {
-      SEMAPHORE_DESTROY(serverSignal);
-      serverSignal = NULL;
+    sock = listensock(port);
+    if (sock == -1)
       return SYSERR;
-    }
-    SEMAPHORE_DOWN(serverSignal, YES);
+    selector = select_create(ectx,
+                            load_monitor,
+                            sock,
+                            sizeof(IPaddr),
+                            0, /* timeout */
+                            &select_message_handler,
+                            NULL,
+                            &select_accept_handler,
+                            NULL,
+                            &select_close_handler,
+                            NULL,
+                            0 /* memory quota */ );
+    if (selector == NULL)
+      return SYSERR;
   }
+  sock = SOCKET(PF_INET, SOCK_DGRAM, UDP_PROTOCOL_NUMBER);
+  if (sock == -1) {
+    GE_LOG_STRERROR(ectx,
+                   GE_ERROR | GE_ADMIN | GE_BULK,
+                   "socket");
+    select_destroy(selector);
+    selector = NULL;
+    return SYSERR;
+  }
+  udp_sock = socket_create(ectx,
+                          load_monitor,
+                          sock);
   return OK;
 }
 
@@ -654,39 +540,10 @@
  */
 static int stopTransportServer() {
   GE_ASSERT(ectx, udp_sock != NULL);
-  if (udp_shutdown == NO) {
-    /* stop the thread, first set shutdown
-       to YES, then ensure that the thread
-       actually sees the flag by sending
-       a dummy message of 1 char */
-    udp_shutdown = YES;
-    if (serverSignal != NULL) {
-      char msg = '\0';
-      struct sockaddr_in sin;
-      void * unused;
-      int mySock;
-
-      mySock = SOCKET(PF_INET, SOCK_DGRAM, UDP_PROTOCOL_NUMBER);
-      if (mySock < 0)
-       GE_DIE_STRERROR(ectx,
-                       GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
-                       "socket");
-      /* send to loopback */
-      sin.sin_family = AF_INET;
-      sin.sin_port = htons(port);
-      *(int*)&sin.sin_addr = htonl(0x7F000001); /* 127.0.0.1 = localhost */
-      SENDTO(mySock,
-            &msg,
-            sizeof(msg),
-            0,
-            (struct sockaddr*) &sin,
-            sizeof(sin));
-      PTHREAD_STOP_SLEEP(dispatchThread);
-      SEMAPHORE_DOWN(serverSignal, YES);
-      SEMAPHORE_DESTROY(serverSignal);
-      PTHREAD_JOIN(dispatchThread, &unused);
-    }
-  }
+  if (selector != NULL) {
+    select_destroy(selector);
+    selector = NULL;
+  }  
   socket_destroy(udp_sock);
   udp_sock = NULL;
   return OK;





reply via email to

[Prev in Thread] Current Thread [Next in Thread]