gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3686 - in GNUnet: . src/include src/transports


From: grothoff
Subject: [GNUnet-SVN] r3686 - in GNUnet: . src/include src/transports
Date: Fri, 10 Nov 2006 18:35:06 -0800 (PST)

Author: grothoff
Date: 2006-11-10 18:34:57 -0800 (Fri, 10 Nov 2006)
New Revision: 3686

Added:
   GNUnet/src/transports/tcp_old.c
Modified:
   GNUnet/UPDATING
   GNUnet/src/include/gnunet_protocols.h
   GNUnet/src/include/gnunet_transport.h
   GNUnet/src/transports/Makefile.am
   GNUnet/src/transports/udp.c
   GNUnet/src/transports/udp6.c
Log:
adding legacy TCP support

Modified: GNUnet/UPDATING
===================================================================
--- GNUnet/UPDATING     2006-11-11 02:26:01 UTC (rev 3685)
+++ GNUnet/UPDATING     2006-11-11 02:34:57 UTC (rev 3686)
@@ -8,7 +8,27 @@
 data-preserving migration may not be possible.  For SVN versions,
 you're on your own.
 
+0.7.0e to 0.7.1pre0
+===================
 
+There are now two TCP transports, one using the 0.7.1 protocol and one
+using the 0.7.0e protocol to allow the 0.7.1 network to connect with
+the 0.7.0 network.  The old TCP transport is hardwired to use port
+2089, so you may have to open some new holes in your firewall (you can
+only change the port by editing the source).  The new TCP transport
+will run at whatever port is specified in the configuration.
+
+Run 
+
+# gnunet-update
+# gnunet-setup -d
+$ gnunet-setup 
+
+to update the configuration and run the setup tool to configure
+new options.
+
+
+
 0.7.0pre6, 0.7.0, 0.7.0{a,b,c,d} to 0.7.0e
 ==========================================
 

Modified: GNUnet/src/include/gnunet_protocols.h
===================================================================
--- GNUnet/src/include/gnunet_protocols.h       2006-11-11 02:26:01 UTC (rev 
3685)
+++ GNUnet/src/include/gnunet_protocols.h       2006-11-11 02:34:57 UTC (rev 
3686)
@@ -63,31 +63,41 @@
 #define NAT_PROTOCOL_NUMBER 1
 
 /**
- * protocol number of TCP. Do NEVER change, also used in other context!
+ * protocol number of TCP. 
  */
-#define TCP_PROTOCOL_NUMBER 6
+#define TCP_PROTOCOL_NUMBER 2
 
 /**
- * protocol number for HTTP (80 is too big, so 8 will have to do)
+ * protocol number of UDP. 
  */
-#define HTTP_PROTOCOL_NUMBER 8
+#define UDP_PROTOCOL_NUMBER 3
 
 /**
  * Protocol number for TCP on IPv6 (TCP+6)
  */
-#define TCP6_PROTOCOL_NUMBER 12
+#define TCP6_PROTOCOL_NUMBER 4
 
 /**
- * protocol number of UDP. Do NEVER change, also used in other context!
+ * Protocol number for UDP on IPv6 (UDP+6)
  */
-#define UDP_PROTOCOL_NUMBER 17
+#define UDP6_PROTOCOL_NUMBER 5
 
 /**
- * Protocol number for UDP on IPv6 (UDP+6)
+ * protocol number of TCP.
  */
-#define UDP6_PROTOCOL_NUMBER 23
+#define TCP_OLD_PROTOCOL_NUMBER 6
 
 /**
+ * protocol number for HTTP (80 is too big, so 8 will have to do)
+ */
+#define HTTP_PROTOCOL_NUMBER 8
+
+/**
+ * protocol number of UDP. Do NEVER change, also used in other context!
+ */
+#define UDP_OLD_PROTOCOL_NUMBER 17
+
+/**
  * protocol number for SMTP
  */
 #define SMTP_PROTOCOL_NUMBER 25

Modified: GNUnet/src/include/gnunet_transport.h
===================================================================
--- GNUnet/src/include/gnunet_transport.h       2006-11-11 02:26:01 UTC (rev 
3685)
+++ GNUnet/src/include/gnunet_transport.h       2006-11-11 02:34:57 UTC (rev 
3686)
@@ -273,7 +273,7 @@
    */
   int (*send)(TSession * tsession,
              const void * msg,
-             const unsigned int size,
+             unsigned int size,
              int important);
 
   /**

Modified: GNUnet/src/transports/Makefile.am
===================================================================
--- GNUnet/src/transports/Makefile.am   2006-11-11 02:26:01 UTC (rev 3685)
+++ GNUnet/src/transports/Makefile.am   2006-11-11 02:34:57 UTC (rev 3686)
@@ -27,6 +27,7 @@
 plugin_LTLIBRARIES = \
  libgnunettransport_tcp.la \
  libgnunettransport_udp.la \
+ libgnunettransport_tcp_old.la \
  libgnunettransport_nat.la \
  $(v6transports) # libgnunettransport_http.la 
 
@@ -77,3 +78,13 @@
 libgnunettransport_udp6_la_LDFLAGS = \
  -export-dynamic -avoid-version -module 
 
+
+
+libgnunettransport_tcp_old_la_SOURCES = tcp_old.c
+libgnunettransport_tcp_old_la_LIBADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ libip.la
+libgnunettransport_tcp_old_la_LDFLAGS = \
+ -export-dynamic -avoid-version -module 
+
+

Added: GNUnet/src/transports/tcp_old.c
===================================================================
--- GNUnet/src/transports/tcp_old.c     2006-11-11 02:26:01 UTC (rev 3685)
+++ GNUnet/src/transports/tcp_old.c     2006-11-11 02:34:57 UTC (rev 3686)
@@ -0,0 +1,1284 @@
+/*
+     This file is part of GNUnet
+     (C) 2002, 2003, 2004, 2005 Christian Grothoff (and other contributing 
authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 2, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file transports/tcp_old.c
+ * @brief Implementation of the TCP transport service (0.7.0 compatible)
+ * @author Christian Grothoff
+ */
+
+#include "gnunet_util.h"
+#include "gnunet_protocols.h"
+#include "gnunet_transport.h"
+#include "gnunet_stats_service.h"
+#include "platform.h"
+#include "ip.h"
+
+#define DEBUG_TCP NO
+
+/**
+ * after how much time of the core not being associated with a tcp
+ * connection anymore do we close it?
+ */
+#define TCP_TIMEOUT (30 * cronSECONDS)
+
+#define TARGET_BUFFER_SIZE 4092
+
+/**
+ * Host-Address in a TCP network.
+ */
+typedef struct {
+  /**
+   * claimed IP of the sender, network byte order
+   */
+  IPaddr ip;
+
+  /**
+   * claimed port of the sender, network byte order
+   */
+  unsigned short port;
+
+  /**
+   * reserved (set to 0 for signature verification)
+   */
+  unsigned short reserved;
+
+} HostAddress;
+
+/**
+ * TCP Message-Packet header.
+ */
+typedef struct {
+  /**
+   * size of the message, in bytes, excluding this header;
+   * max 65535; we do NOT want to make this field an int
+   * because then a malicious peer could cause us to allocate
+   * lots of memory -- this bounds it by 64k/peer.
+   * Field is in network byte order.
+   */
+  unsigned short size;
+
+  /**
+   * For alignment, always 0.
+   */
+  unsigned short reserved;
+
+  /**
+   * This struct is followed by MESSAGE_PARTs - until size is reached
+   * There is no "end of message".
+   */
+} TCPP2P_PACKET;
+
+/**
+ * Initial handshake message. Note that the beginning
+ * must match the CS_MESSAGE_HEADER since we are using tcpio.
+ */
+typedef struct {
+  TCPP2P_PACKET header;
+
+  /**
+   * Identity of the node connecting (TCP client)
+   */
+  PeerIdentity clientIdentity;
+} TCPWelcome;
+
+/**
+ * Transport Session handle.
+ */
+typedef struct {
+  /**
+   * the tcp socket
+   */
+  int sock;
+
+  /**
+   * number of users of this session
+   */
+  int users;
+
+  /**
+   * Last time this connection was used
+   */
+  cron_t lastUse;
+
+  /**
+   * mutex for synchronized access to 'users'
+   */
+  struct MUTEX * lock;
+
+  /**
+   * To whom are we talking to (set to our identity
+   * if we are still waiting for the welcome message)
+   */
+  PeerIdentity sender;
+
+  /**
+   * Are we still expecting the welcome? (YES/NO)
+   */
+  int expectingWelcome;
+
+  /**
+   * Current read position in the buffer.
+   */
+  unsigned int pos;
+
+  /**
+   * Current size of the read buffer.
+   */
+  unsigned int rsize;
+
+  /**
+   * The read buffer.
+   */
+  char * rbuff;
+
+  /**
+   * Position in the write buffer
+   */
+  unsigned int wpos;
+
+  /**
+   * The write buffer.
+   */
+  char * wbuff;
+
+  /**
+   * Size of the write buffer
+   */
+  unsigned int wsize;
+
+} TCPSession;
+
+/* *********** globals ************* */
+
+/**
+ * apis (our advertised API and the core api )
+ */
+static CoreAPIForTransport * coreAPI;
+static TransportAPI tcpAPI;
+
+static Stats_ServiceAPI * stats;
+
+static int stat_bytesReceived;
+
+static int stat_bytesSent;
+
+static int stat_bytesDropped;
+
+/**
+ * one thread for listening for new connections,
+ * and for reading on all open sockets
+ */
+static struct PTHREAD * listenThread;
+
+/**
+ * sock is the tcp socket that we listen on for new inbound
+ * connections.
+ */
+static int tcp_sock;
+
+/**
+ * tcp_pipe is used to signal the thread that is
+ * blocked in a select call that the set of sockets to listen
+ * to has changed.
+ */
+static int tcp_pipe[2];
+
+/**
+ * Array of currently active TCP sessions.
+ */
+static TSession ** tsessions = NULL;
+static unsigned int tsessionCount;
+static unsigned int tsessionArrayLength;
+
+/* configuration */
+static struct CIDRNetwork * filteredNetworks_;
+
+/**
+ * Lock for access to mutable state of the module,
+ * that is the configuration and the tsessions array.
+ * Note that we ONLY need to synchronize access to
+ * the tsessions array when adding or removing sessions,
+ * since removing is done only by one thread and we just
+ * need to avoid another thread adding an element at the
+ * same point in time. We do not need to synchronize at
+ * every access point since adding new elements does not
+ * prevent the select thread from operating and removing
+ * is done by the only therad that reads from the array.
+ */
+static struct MUTEX * tcplock;
+
+/**
+ * Semaphore used by the server-thread to signal that
+ * the server has been started -- and later again to
+ * signal that the server has been stopped.
+ */
+static struct SEMAPHORE * serverSignal = NULL;
+static int tcp_shutdown = YES;
+
+/* ******************** helper functions *********************** */
+
+/**
+ * Write to the pipe to wake up the select thread (the set of
+ * files to watch has changed).
+ */
+static void signalSelect() {
+  char i = 0;
+
+  WRITE(tcp_pipe[1],
+       &i,
+       sizeof(char));
+}
+
+/**
+ * Disconnect from a remote node. May only be called
+ * on sessions that were aquired by the caller first.
+ * For the core, aquiration means to call associate or
+ * connect. The number of disconnects must match the
+ * number of calls to connect+associate.
+ *
+ * @param tsession the session that is closed
+ * @return OK on success, SYSERR if the operation failed
+ */
+static int tcpDisconnect(TSession * tsession) {
+  if (tsession->internal != NULL) {
+    TCPSession * tcpsession = tsession->internal;
+
+    MUTEX_LOCK(tcpsession->lock);
+    tcpsession->users--;
+    if (tcpsession->users > 0) {
+      MUTEX_UNLOCK(tcpsession->lock);
+      return OK;
+    }
+    MUTEX_UNLOCK(tcpsession->lock);
+    MUTEX_DESTROY(tcpsession->lock);
+    FREE(tcpsession->rbuff);
+    FREENONNULL(tcpsession->wbuff);
+    tcpsession->wbuff = NULL;
+    FREE(tcpsession);
+  }
+  FREE(tsession);
+  return OK;
+}
+
+/**
+ * Remove a session, either the other side closed the connection
+ * or we have otherwise reason to believe that it should better
+ * be killed. Destroy session closes the session as far as the
+ * TCP layer is concerned, but since the core may still have
+ * references to it, tcpDisconnect may not instantly free all
+ * the associated resources. <p>
+ *
+ * destroySession may only be called if the tcplock is already
+ * held.
+ *
+ * @param i index to the session handle
+ */
+static void destroySession(int i) {
+  TCPSession * tcpSession;
+
+  tcpSession = tsessions[i]->internal;
+  if (tcpSession->sock != -1)
+    SHUTDOWN(tcpSession->sock, SHUT_RDWR);
+  CLOSE(tcpSession->sock);
+  tcpSession->sock = -1;
+  tcpDisconnect(tsessions[i]);
+  tsessions[i] = tsessions[--tsessionCount];
+  tsessions[tsessionCount] = NULL;
+}
+
+/**
+ * Get the GNUnet UDP port from the configuration,
+ * or from /etc/services if it is not specified in
+ * the config file.
+ */
+static unsigned short getGNUnetTCPPort() {
+  return 2089; /* so far unused port */
+}
+
+/**
+ * A (core) Session is to be associated with a transport session. The
+ * transport service may want to know in order to call back on the
+ * core if the connection is being closed. Associate can also be
+ * called to test if it would be possible to associate the session
+ * later, in this case the argument session is NULL. This can be used
+ * to test if the connection must be closed by the core or if the core
+ * can assume that it is going to be self-managed (if associate
+ * returns OK and session was NULL, the transport layer is responsible
+ * for eventually freeing resources associated with the tesession). If
+ * session is not NULL, the core takes responsbility for eventually
+ * calling disconnect.
+ *
+ * @param tsession the session handle passed along
+ *   from the call to receive that was made by the transport
+ *   layer
+ * @return OK if the session could be associated,
+ *         SYSERR if not.
+ */
+static int tcpAssociate(TSession * tsession) {
+  TCPSession * tcpSession;
+
+  if (tsession == NULL) {
+    return SYSERR;
+  }
+  tcpSession = (TCPSession*) tsession->internal;
+  MUTEX_LOCK(tcpSession->lock);
+  tcpSession->users++;
+  MUTEX_UNLOCK(tcpSession->lock);
+  return OK;
+}
+
+/**
+ * The socket of session i has data waiting, process!
+ *
+ * This function may only be called if the tcplock is
+ * already held by the caller.
+ */
+static int readAndProcess(int i) {
+  TSession * tsession;
+  TCPSession * tcpSession;
+  unsigned int len;
+  int ret;
+  TCPP2P_PACKET * pack;
+  P2P_PACKET * mp;
+
+  tsession = tsessions[i];
+  if (SYSERR == tcpAssociate(tsession))
+    return SYSERR;
+  tcpSession = tsession->internal;
+  if (tcpSession->rsize == tcpSession->pos) {
+    /* read buffer too small, grow */
+    GROW(tcpSession->rbuff,
+        tcpSession->rsize,
+        tcpSession->rsize * 2);
+  }
+  ret = READ(tcpSession->sock,
+            &tcpSession->rbuff[tcpSession->pos],
+            tcpSession->rsize - tcpSession->pos);
+  if ( (ret > 0) &&
+       (stats != NULL) )
+    stats->change(stat_bytesReceived,
+                 ret);
+  tcpSession->lastUse = get_time();
+  if (ret == 0) {
+    tcpDisconnect(tsession);
+    return SYSERR; /* other side closed connection */
+  }
+  if (ret < 0) {
+    if ( (errno == EINTR) ||
+        (errno == EAGAIN) ) {
+      tcpDisconnect(tsession);
+      return OK;
+    }
+    tcpDisconnect(tsession);
+    return SYSERR;
+  }
+  tcpSession->pos += ret;
+
+  while (tcpSession->pos > 2) {
+    len = ntohs(((TCPP2P_PACKET*)&tcpSession->rbuff[0])->size)
+      + sizeof(TCPP2P_PACKET);
+    if (len > tcpSession->rsize) /* if message larger than read buffer, grow! 
*/
+      GROW(tcpSession->rbuff,
+          tcpSession->rsize,
+          len);
+    if (tcpSession->pos < len) {
+      tcpDisconnect(tsession);
+      return OK;
+    }
+
+    /* complete message received, let's check what it is */
+    if (YES == tcpSession->expectingWelcome) {
+      TCPWelcome * welcome;
+#if DEBUG_TCP
+      EncName enc;
+#endif
+
+      welcome = (TCPWelcome*) &tcpSession->rbuff[0];
+      if ( (ntohs(welcome->header.reserved) != 0) ||
+          (ntohs(welcome->header.size)
+           != sizeof(TCPWelcome) - sizeof(TCPP2P_PACKET)) ) {
+       tcpDisconnect(tsession);
+       return SYSERR;
+      }
+      tcpSession->expectingWelcome = NO;
+      tcpSession->sender = welcome->clientIdentity;
+      memmove(&tcpSession->rbuff[0],
+             &tcpSession->rbuff[sizeof(TCPWelcome)],
+             tcpSession->pos - sizeof(TCPWelcome));
+      tcpSession->pos -= sizeof(TCPWelcome);
+      len = ntohs(((TCPP2P_PACKET*)&tcpSession->rbuff[0])->size)
+       + sizeof(TCPP2P_PACKET);
+    }
+    if ( (tcpSession->pos < 2) ||
+        (tcpSession->pos < len) ) {
+      tcpDisconnect(tsession);
+      return OK;
+    }
+
+    pack = (TCPP2P_PACKET*)&tcpSession->rbuff[0];
+    /* send msg to core! */
+    if (len <= sizeof(TCPP2P_PACKET)) {
+      tcpDisconnect(tsession);
+      return SYSERR;
+    }
+    mp      = MALLOC(sizeof(P2P_PACKET));
+    mp->msg = MALLOC(len - sizeof(TCPP2P_PACKET));
+    memcpy(mp->msg,
+          &pack[1],
+          len - sizeof(TCPP2P_PACKET));
+    mp->sender   = tcpSession->sender;
+    mp->size     = len - sizeof(TCPP2P_PACKET);
+    mp->tsession = tsession;
+    coreAPI->receive(mp);
+    /* finally, shrink buffer adequately */
+    memmove(&tcpSession->rbuff[0],
+           &tcpSession->rbuff[len],
+           tcpSession->pos - len);
+    tcpSession->pos -= len;
+    if ( (tcpSession->pos + 1024 < tcpSession->rsize) &&
+        (tcpSession->rsize > 4 * 1024) ) {
+      /* read buffer far too large, shrink! */
+      GROW(tcpSession->rbuff,
+          tcpSession->rsize,
+          tcpSession->pos + 1024);
+    }
+  }
+  tcpDisconnect(tsession);
+  return OK;
+}
+
+/**
+ * Add a new session to the array watched by the select thread.  Grows
+ * the array if needed.  If the caller wants to do anything useful
+ * with the return value, it must have the lock on tcplock before
+ * calling.  It is ok to call this function without holding tcplock if
+ * the return value is ignored.
+ */
+static unsigned int addTSession(TSession * tsession) {
+  unsigned int i;
+
+  MUTEX_LOCK(tcplock);
+  if (tsessionCount == tsessionArrayLength)
+    GROW(tsessions,
+        tsessionArrayLength,
+        tsessionArrayLength * 2);
+  i = tsessionCount;
+  tsessions[tsessionCount++] = tsession;
+  MUTEX_UNLOCK(tcplock);
+  return i;
+}
+
+/**
+ * Create a new session for an inbound connection on the given
+ * socket. Adds the session to the array of sessions watched
+ * by the select thread.
+ */
+static void createNewSession(int sock) {
+  TSession * tsession;
+  TCPSession * tcpSession;
+
+  tcpSession = MALLOC(sizeof(TCPSession));
+  tcpSession->pos = 0;
+  tcpSession->rsize = 2 * 1024 + sizeof(TCPP2P_PACKET);
+  tcpSession->rbuff = MALLOC(tcpSession->rsize);
+  tcpSession->wpos = 0;
+  tcpSession->wbuff = NULL;
+  tcpSession->wsize = 0;
+  tcpSession->sock = sock;
+  /* fill in placeholder identity to mark that we
+     are waiting for the welcome message */
+  tcpSession->sender = *(coreAPI->myIdentity);
+  tcpSession->expectingWelcome = YES;
+  tcpSession->lock = MUTEX_CREATE(YES);
+  tcpSession->users = 1; /* us only, core has not seen this tsession! */
+  tcpSession->lastUse = get_time();
+  tsession = MALLOC(sizeof(TSession));
+  tsession->ttype = TCP_PROTOCOL_NUMBER;
+  tsession->internal = tcpSession;
+  addTSession(tsession);
+}                                      
+
+
+/**
+ * Depending on doBlock, enable or disable the nonblocking mode
+ * of socket s.
+ *
+ * @param doBlock use YES to change the socket to blocking, NO to non-blocking
+ * @return Upon successful completion, it returns zero, otherwise -1
+ */
+static int setBlocking(int s, int doBlock) {
+#if MINGW
+  u_long l = !doBlock;
+  if (ioctlsocket(s, FIONBIO, &l) == SOCKET_ERROR) {
+    SetErrnoFromWinsockError(WSAGetLastError());
+
+    return -1;
+  } else {
+    /* store the blocking mode */
+    __win_SetHandleBlockingMode(s, doBlock);
+    return 0;
+  }
+#else
+  int flags = fcntl(s, F_GETFL);
+  if (doBlock)
+    flags &= ~O_NONBLOCK;
+  else
+    flags |= O_NONBLOCK;
+
+  return fcntl(s,
+              F_SETFL,
+              flags);
+#endif
+}
+
+/**
+ * Do a NONBLOCKING write on the given socket.
+ * Write at most max bytes from buf.
+ * Interrupts are ignored (cause a re-try).
+ *
+ * The caller must be prepared to the fact that this function
+ * may fail with EWOULDBLOCK in any case (Win32).
+ *
+ * @param s socket
+ * @param buf buffer to send
+ * @param max maximum number of bytes to send
+ * @param sent number of bytes actually sent
+ * @return SYSERR on error, YES on success or
+ *         NO if the operation would have blocked.
+ */
+static int SEND_NONBLOCKING(int s,
+                    const void * buf,
+                    size_t max,
+                    size_t * sent) {
+  int flags;
+
+  setBlocking(s, NO);
+
+#ifdef SOMEBSD
+    flags = MSG_DONTWAIT;
+#elif SOLARIS
+    flags = MSG_DONTWAIT;
+#elif OSX
+    /* As braindead as Win32? */
+    flags = 0;
+#elif CYGWIN
+       flags = MSG_NOSIGNAL;
+#elif LINUX
+       flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+#else
+    /* pray */
+       flags = 0;
+#endif
+
+  do {
+    *sent = (size_t) SEND(s,
+                         buf,
+                         max,
+                         flags);
+
+  } while ( (*sent == -1) &&
+           (errno == EINTR) );
+
+  setBlocking(s, YES);
+
+  if (*sent == SYSERR && (errno == EWOULDBLOCK || errno == EAGAIN))
+    return NO;
+  else if ( (*sent < 0) || (*sent > max) )
+    return SYSERR;
+
+  return YES;
+}
+
+/**
+ * Check if socket is valid
+ * @return 1 if valid, 0 otherwise
+ */
+static int isSocketValid(int s)
+{
+#ifndef MINGW
+  struct stat buf;
+  return -1 != fstat(s, &buf);
+#else
+  long l;
+  return ioctlsocket(s, FIONREAD, &l) != SOCKET_ERROR;
+#endif
+}
+
+/**
+ * Main method for the thread listening on the tcp socket and all tcp
+ * connections. Whenever a message is received, it is forwarded to the
+ * core. This thread waits for activity on any of the TCP connections
+ * and processes deferred (async) writes and buffers reads until an
+ * entire message has been received.
+ */
+static void * tcpListenMain() {
+  struct sockaddr_in clientAddr;
+  fd_set readSet;
+  fd_set errorSet;
+  fd_set writeSet;
+  struct stat buf;
+  socklen_t lenOfIncomingAddr;
+  int i;
+  int max;
+  int ret;
+
+  if (tcp_sock != -1)
+    LISTEN(tcp_sock, 5);
+  SEMAPHORE_UP(serverSignal); /* we are there! */
+  MUTEX_LOCK(tcplock);
+  while (tcp_shutdown == NO) {
+    FD_ZERO(&readSet);
+    FD_ZERO(&errorSet);
+    FD_ZERO(&writeSet);
+    if (tcp_sock != -1) {
+      if (isSocketValid(tcp_sock)) {
+       FD_SET(tcp_sock, &readSet);
+      } else { 
+       tcp_sock = -1; /* prevent us from error'ing all the time */
+      }
+    }
+    if (tcp_pipe[0] != -1) {
+      if (-1 != FSTAT(tcp_pipe[0], &buf)) {
+       FD_SET(tcp_pipe[0], &readSet);
+      } else {
+       tcp_pipe[0] = -1; /* prevent us from error'ing all the time */  
+      }
+    }
+    max = tcp_pipe[0];
+    if (tcp_sock > tcp_pipe[0])
+      max = tcp_sock;
+    for (i=0;i<tsessionCount;i++) {
+      TCPSession * tcpSession = tsessions[i]->internal;
+      int sock = tcpSession->sock;
+      if (sock != -1) {
+       if (isSocketValid(sock)) {
+         FD_SET(sock, &readSet);
+         FD_SET(sock, &errorSet);
+         if (tcpSession->wpos > 0)
+           FD_SET(sock, &writeSet); /* do we have a pending write request? */
+       } else {
+         destroySession(i);
+       }
+      } else {
+       destroySession(i);
+      }
+      if (sock > max)
+       max = sock;
+    }
+    MUTEX_UNLOCK(tcplock);
+    ret = SELECT(max+1, &readSet, &writeSet, &errorSet, NULL);
+    MUTEX_LOCK(tcplock);
+    if ( (ret == -1) &&
+        ( (errno == EAGAIN) || (errno == EINTR) ) )
+      continue;
+    if (tcp_sock != -1) {
+      if (FD_ISSET(tcp_sock, &readSet)) {
+       int sock;
+       
+       lenOfIncomingAddr = sizeof(clientAddr);
+       sock = ACCEPT(tcp_sock,
+                     (struct sockaddr *)&clientAddr,
+                     &lenOfIncomingAddr);
+       if (sock != -1) {       
+         /* verify clientAddr for eligibility here (ipcheck-style,
+            user should be able to specify who is allowed to connect,
+            otherwise we just close and reject the communication! */
+
+         IPaddr ipaddr;
+         memcpy(&ipaddr,
+                &clientAddr.sin_addr,
+                sizeof(struct in_addr));
+
+         createNewSession(sock);         
+       }
+      }
+    }
+    if (FD_ISSET(tcp_pipe[0], &readSet)) {
+      /* allow reading multiple signals in one go in case we get many
+        in one shot... */
+#define MAXSIG_BUF 128
+      char buf[MAXSIG_BUF];
+      /* just a signal to refresh sets, eat and continue */
+      if (0 >= READ(tcp_pipe[0],
+                   &buf[0],
+                   MAXSIG_BUF)) {
+      }
+    }
+    for (i=0;i<tsessionCount;i++) {
+      TCPSession * tcpSession = tsessions[i]->internal;
+      int sock = tcpSession->sock;
+      if (FD_ISSET(sock, &readSet)) {
+       if (SYSERR == readAndProcess(i)) {
+         destroySession(i);
+         i--;
+         continue;
+       }
+      }
+      if (FD_ISSET(sock, &writeSet)) {
+       size_t ret;
+       int success;
+
+try_again_1:
+       success = SEND_NONBLOCKING(sock,
+                                  tcpSession->wbuff,
+                                  tcpSession->wpos,
+                                  &ret);
+       if ( (success == SYSERR) || (ret == (size_t) -1) ) {
+         destroySession(i);
+         i--;
+         continue;
+        } else if (success == NO) {
+         /* this should only happen under Win9x because
+            of a bug in the socket implementation (KB177346).
+            Let's sleep and try again. */
+         PTHREAD_SLEEP(20);
+         goto try_again_1;
+        }
+       if (stats != NULL)
+         stats->change(stat_bytesSent,
+                       ret);
+
+       if (ret == 0) {
+          /* send only returns 0 on error (other side closed connection),
+          * so close the session */
+         destroySession(i);
+         i--;
+         continue;
+       }
+       if (ret == tcpSession->wpos) {
+         FREENONNULL(tcpSession->wbuff);
+         tcpSession->wbuff = NULL;
+         tcpSession->wpos  = 0;
+         tcpSession->wsize = 0;
+       } else {
+         memmove(tcpSession->wbuff,
+                 &tcpSession->wbuff[ret],
+                 tcpSession->wpos - ret);
+         tcpSession->wpos -= ret;
+       }
+      }
+      if (FD_ISSET(sock, &errorSet)) {
+       destroySession(i);
+       i--;
+       continue;
+      }
+      if ( ( tcpSession->users == 1) &&
+          (get_time() > tcpSession->lastUse + TCP_TIMEOUT) ) {
+       destroySession(i);
+       i--;
+       continue;
+      }
+    }
+  }
+  /* shutdown... */
+  if (tcp_sock != -1) {
+    CLOSE(tcp_sock);
+    tcp_sock = -1;
+  }
+  /* close all sessions */
+  while (tsessionCount > 0)
+    destroySession(0);
+  MUTEX_UNLOCK(tcplock);
+  SEMAPHORE_UP(serverSignal); /* we are there! */
+  return NULL;
+} /* end of tcp listen main */
+
+/**
+ * Send a message (already encapsulated if needed) via the
+ * tcp socket (or enqueue if sending now would block).
+ *
+ * @param tcpSession the session to use for sending
+ * @param mp the message to send
+ * @param ssize the size of the message
+ * @return OK if message send or queued, NO if queue is full and
+ * message was dropped, SYSERR on error
+ */
+static int tcpDirectSend(TCPSession * tcpSession,
+                        void * mp,
+                        unsigned int ssize) {
+  size_t ret;
+  int success;
+
+  if (tcp_shutdown == YES) {
+    return SYSERR;
+  }
+  if (tcpSession->sock == -1) {
+    return SYSERR;
+  }
+  if (ssize == 0) {
+    return SYSERR;
+  }
+  MUTEX_LOCK(tcplock);
+  if (tcpSession->wpos > 0) {
+    /* select already pending... */
+    if (stats != NULL)
+      stats->change(stat_bytesDropped,
+                   ssize);
+    MUTEX_UNLOCK(tcplock);
+    return NO;
+  }
+  success = SEND_NONBLOCKING(tcpSession->sock,
+                            mp,
+                            ssize,
+                            &ret);
+  if (success == SYSERR) {
+    MUTEX_UNLOCK(tcplock);
+    return SYSERR;
+  }
+  if (success == NO)
+    ret = 0;
+  if (stats != NULL)
+    stats->change(stat_bytesSent,
+                 ret);
+
+  if (ret < ssize) {/* partial send */
+    if (tcpSession->wsize < ssize - ret) {
+      GROW(tcpSession->wbuff,
+          tcpSession->wsize,
+          ssize - ret);
+    }
+    memcpy(tcpSession->wbuff,
+          mp + ret,
+          ssize - ret);
+    tcpSession->wpos = ssize - ret;
+    signalSelect(); /* select set changed! */
+  }
+  tcpSession->lastUse = get_time();
+  MUTEX_UNLOCK(tcplock);
+  return OK;
+}
+
+/**
+ * Send a message (already encapsulated if needed) via the
+ * tcp socket.  Block if required.
+ *
+ * @param tcpSession the session to use for sending
+ * @param mp the message to send
+ * @param ssize the size of the message
+ * @return OK if message send or queued, NO if queue is full and
+ * message was dropped, SYSERR on error
+ */
+static int tcpDirectSendReliable(TCPSession * tcpSession,
+                                void * mp,
+                                unsigned int ssize) {
+  int ok;
+
+  if (tcp_shutdown == YES) {
+    return SYSERR;
+  }
+  if (tcpSession->sock == -1) {
+    return SYSERR;
+  }
+  if (ssize == 0) {
+    return SYSERR;
+  }
+  MUTEX_LOCK(tcplock);
+  if (tcpSession->wpos > 0) {
+    unsigned int old = tcpSession->wpos;
+    GROW(tcpSession->wbuff,
+        tcpSession->wsize,
+        tcpSession->wpos + ssize);
+    tcpSession->wpos += ssize;
+    memcpy(&tcpSession->wbuff[old],
+          mp,
+          ssize);
+    ok = OK;
+  } else {
+    ok = tcpDirectSend(tcpSession,
+                      mp,
+                      ssize);
+  }
+  MUTEX_UNLOCK(tcplock);
+  return ok;
+}
+
+/**
+ * Verify that a Hello-Message is correct (a node
+ * is reachable at that address). Since the reply
+ * will be asynchronous, a method must be called on
+ * success.
+ * @param helo the Hello message to verify
+ *        (the signature/crc have been verified before)
+ * @return OK on success, SYSERR on error
+ */
+static int verifyHelo(const P2P_hello_MESSAGE * helo) {
+  HostAddress * haddr;
+
+  haddr = (HostAddress*) &helo[1];
+  if ( (ntohs(helo->senderAddressSize) != sizeof(HostAddress)) ||
+       (ntohs(helo->header.size) != P2P_hello_MESSAGE_size(helo)) ||
+       (ntohs(helo->header.type) != p2p_PROTO_hello) ||
+       (ntohs(helo->protocol) != TCP_PROTOCOL_NUMBER) )
+    return SYSERR; /* obviously invalid */
+  else
+    return OK;
+}
+
+/**
+ * Create a hello-Message for the current node. The hello is
+ * created without signature and without a timestamp. The
+ * GNUnet core will sign the message and add an expiration time.
+ *
+ * @return hello on success, NULL on error
+ */
+static P2P_hello_MESSAGE * createhello() {
+  P2P_hello_MESSAGE * msg;
+  HostAddress * haddr;
+  unsigned short port;
+
+  port = getGNUnetTCPPort();
+  if (0 == port) {
+    static int once = 0;
+    if (once == 0) {
+      once = 1;
+    }
+    return NULL; /* TCP transport is configured SEND-only! */
+  }
+  msg = (P2P_hello_MESSAGE *) MALLOC(sizeof(P2P_hello_MESSAGE) + 
sizeof(HostAddress));
+  haddr = (HostAddress*) &msg[1];
+
+  if (SYSERR == getPublicIPAddress(coreAPI->cfg,
+                                  coreAPI->ectx,
+                                  &haddr->ip)) {
+    FREE(msg);
+    return NULL;
+  }
+  haddr->port = htons(port);
+  haddr->reserved = htons(0);
+  msg->senderAddressSize = htons(sizeof(HostAddress));
+  msg->protocol = htons(TCP_PROTOCOL_NUMBER);
+  msg->MTU = htonl(tcpAPI.mtu);
+  return msg;
+}
+
+/**
+ * Establish a connection to a remote node.
+ *
+ * @param helo the hello-Message for the target node
+ * @param tsessionPtr the session handle that is set
+ * @return OK on success, SYSERR if the operation failed
+ */
+static int tcpConnect(const P2P_hello_MESSAGE * helo,
+                     TSession ** tsessionPtr) {
+  int i;
+  HostAddress * haddr;
+  TCPWelcome welcome;
+  int sock;
+  TSession * tsession;
+  TCPSession * tcpSession;
+  struct sockaddr_in soaddr;
+
+  if (tcp_shutdown == YES)
+    return SYSERR;
+  haddr = (HostAddress*) &helo[1];
+  sock = SOCKET(PF_INET,
+               SOCK_STREAM,
+               6); /* 6: TCP */
+  if (sock == -1) {
+    return SYSERR;
+  }
+  if (0 != setBlocking(sock, NO)) {
+    CLOSE(sock);
+    return SYSERR;
+  }
+  memset(&soaddr,
+        0,
+        sizeof(soaddr));
+  soaddr.sin_family = AF_INET;
+
+  memcpy(&soaddr.sin_addr,
+        &haddr->ip,
+        sizeof(IPaddr));
+  soaddr.sin_port = haddr->port;
+  i = CONNECT(sock,
+             (struct sockaddr*)&soaddr,
+             sizeof(soaddr));
+  if ( (i < 0) &&
+       (errno != EINPROGRESS) ) {
+    CLOSE(sock);
+    return SYSERR;
+  }
+  if (0 != setBlocking(sock, NO)) {
+    CLOSE(sock);
+    return SYSERR;
+  }
+  tcpSession = MALLOC(sizeof(TCPSession));
+  tcpSession->sock = sock;
+  tcpSession->wpos = 0;
+  tcpSession->wbuff = NULL;
+  tcpSession->wsize = 0;
+  tcpSession->rsize = 2 * 1024 + sizeof(TCPP2P_PACKET);
+  tcpSession->rbuff = MALLOC(tcpSession->rsize);
+  tsession = MALLOC(sizeof(TSession));
+  tsession->internal = tcpSession;
+  tsession->ttype = tcpAPI.protocolNumber;
+  tcpSession->lock = MUTEX_CREATE(YES);
+  tcpSession->users = 2; /* caller + us */
+  tcpSession->pos = 0;
+  tcpSession->lastUse = get_time();
+  tcpSession->sender = helo->senderIdentity;
+  tcpSession->expectingWelcome = NO;
+  MUTEX_LOCK(tcplock);
+  i = addTSession(tsession);
+
+  /* send our node identity to the other side to fully establish the
+     connection! */
+  welcome.header.size
+    = htons(sizeof(TCPWelcome) - sizeof(TCPP2P_PACKET));
+  welcome.header.reserved
+    = htons(0);
+  welcome.clientIdentity
+    = *(coreAPI->myIdentity);
+  if (SYSERR == tcpDirectSend(tcpSession,
+                             &welcome,
+                             sizeof(TCPWelcome))) {
+    destroySession(i);
+    tcpDisconnect(tsession);
+    MUTEX_UNLOCK(tcplock);
+    return SYSERR;
+  }
+  MUTEX_UNLOCK(tcplock);
+  signalSelect();
+
+  *tsessionPtr = tsession;
+  return OK;
+}
+
+/**
+ * Send a message to the specified remote node.
+ *
+ * @param tsession the P2P_hello_MESSAGE identifying the remote node
+ * @param msg the message
+ * @param size the size of the message
+ * @return SYSERR on error, OK on success
+ */
+static int tcpSend(TSession * tsession,
+                  const void * msg,
+                  unsigned int size,
+                  int important) {
+  TCPP2P_PACKET * mp;
+  int ok;
+
+  if (size >= MAX_BUFFER_SIZE) {
+    return SYSERR;
+  }
+
+  if (tcp_shutdown == YES) {
+    if (stats != NULL)
+      stats->change(stat_bytesDropped,
+                   size);
+    return SYSERR;
+  }
+  if (size == 0) {
+    return SYSERR;
+  }
+  if (((TCPSession*)tsession->internal)->sock == -1) {
+    if (stats != NULL)
+      stats->change(stat_bytesDropped,
+                   size);
+    return SYSERR; /* other side closed connection */
+  }
+  mp = MALLOC(sizeof(TCPP2P_PACKET) + size);
+  memcpy(&mp[1],
+        msg,
+        size);
+  mp->size = htons(size);
+  mp->reserved = 0;
+  /* if we would have less than TARGET_BUFFER_SIZE in buffers,
+     do reliable send */
+  if (((TCPSession*)tsession->internal)->wpos + size < TARGET_BUFFER_SIZE)
+    ok = tcpDirectSendReliable(tsession->internal,
+                              mp,
+                              size + sizeof(TCPP2P_PACKET));
+  else
+    ok = tcpDirectSend(tsession->internal,
+                      mp,
+                      size + sizeof(TCPP2P_PACKET));
+  FREE(mp);
+  return ok;
+}
+
+/**
+ * Start the server process to receive inbound traffic.
+ * @return OK on success, SYSERR if the operation failed
+ */
+static int startTransportServer(void) {
+  struct sockaddr_in serverAddr;
+  const int on = 1;
+  unsigned short port;
+
+  if (serverSignal != NULL) {
+    return SYSERR;
+  }
+  serverSignal = SEMAPHORE_CREATE(0);
+  tcp_shutdown = NO;
+
+  if (0 != PIPE(tcp_pipe)) 
+    return SYSERR;  
+  setBlocking(tcp_pipe[1], NO);
+
+  port = getGNUnetTCPPort();
+  if (port != 0) { /* if port == 0, this is a read-only
+                     business! */
+    tcp_sock = SOCKET(PF_INET,
+                     SOCK_STREAM,
+                     0);
+    if (tcp_sock < 0) {
+      CLOSE(tcp_pipe[0]);
+      CLOSE(tcp_pipe[1]);
+      SEMAPHORE_DESTROY(serverSignal);
+      serverSignal = NULL;
+      tcp_shutdown = YES;
+      return SYSERR;
+    }
+    SETSOCKOPT(tcp_sock,
+              SOL_SOCKET,
+              SO_REUSEADDR,
+              &on,
+              sizeof(on));
+    memset((char *) &serverAddr,
+          0,
+          sizeof(serverAddr));
+    serverAddr.sin_family      = AF_INET;
+    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+    serverAddr.sin_port        = htons(getGNUnetTCPPort());
+    if (BIND(tcp_sock,
+            (struct sockaddr *) &serverAddr,
+            sizeof(serverAddr)) < 0) {
+      CLOSE(tcp_sock);
+      tcp_sock = -1;
+      SEMAPHORE_DESTROY(serverSignal);
+      serverSignal = NULL;
+      return SYSERR;
+    }
+  } else
+    tcp_sock = -1;
+  listenThread = PTHREAD_CREATE((PThreadMain) &tcpListenMain,
+                               NULL,
+                               4092);
+  SEMAPHORE_DOWN(serverSignal, YES); /* wait for server to be up */
+  return OK;
+}
+
+/**
+ * Shutdown the server process (stop receiving inbound
+ * traffic). Maybe restarted later!
+ */
+static int stopTransportServer() {
+  void * unused;
+  int haveThread;
+
+  if (tcp_shutdown == YES)
+    return OK;
+  tcp_shutdown = YES;
+  signalSelect();
+  if (serverSignal != NULL) {
+    haveThread = YES;
+    SEMAPHORE_DOWN(serverSignal, YES);
+    SEMAPHORE_DESTROY(serverSignal);
+  } else
+    haveThread = NO;
+  serverSignal = NULL;
+  CLOSE(tcp_pipe[1]);
+  CLOSE(tcp_pipe[0]);
+  if (tcp_sock != -1) {
+    CLOSE(tcp_sock);
+    tcp_sock = -1;
+  }
+  if (haveThread == YES)
+    PTHREAD_JOIN(listenThread, &unused);
+  return OK;
+}
+
+/**
+ * Convert TCP address to a string.
+ */
+static char * addressToString(const P2P_hello_MESSAGE * helo) {
+  char * ret;
+  HostAddress * haddr;
+  size_t n;
+
+  haddr = (HostAddress*) &helo[1];
+  n = 4*4+6+6;
+  ret = MALLOC(n);
+  SNPRINTF(ret,
+          n,
+          "%u.%u.%u.%u:%u (TCP)",
+          PRIP(ntohl(*(int*)&haddr->ip.addr)),
+          ntohs(haddr->port));
+  return ret;
+}
+
+
+/* ******************** public API ******************** */
+
+/**
+ * The exported method. Makes the core api available
+ * via a global and returns the udp transport API.
+ */
+TransportAPI * inittransport_tcp(CoreAPIForTransport * core) {
+  tcplock = MUTEX_CREATE(YES);
+  tsessionCount = 0;
+  tsessionArrayLength = 0;
+  GROW(tsessions,
+       tsessionArrayLength,
+       32);
+  coreAPI = core;
+  stats = coreAPI->requestService("stats");
+  if (stats != NULL) {
+    stat_bytesReceived
+      = stats->create(gettext_noop("# bytes received via TCP"));
+    stat_bytesSent
+      = stats->create(gettext_noop("# bytes sent via TCP"));
+    stat_bytesDropped
+      = stats->create(gettext_noop("# bytes dropped by TCP (outgoing)"));
+  }
+  tcpAPI.protocolNumber       = TCP_PROTOCOL_NUMBER;
+  tcpAPI.mtu                  = 0;
+  tcpAPI.cost                 = 20000; /* about equal to udp */
+  tcpAPI.verifyHelo           = &verifyHelo;
+  tcpAPI.createhello           = &createhello;
+  tcpAPI.connect              = &tcpConnect;
+  tcpAPI.associate            = &tcpAssociate;
+  tcpAPI.send                 = &tcpSend;
+  tcpAPI.disconnect           = &tcpDisconnect;
+  tcpAPI.startTransportServer = &startTransportServer;
+  tcpAPI.stopTransportServer  = &stopTransportServer;
+  tcpAPI.addressToString      = &addressToString;
+
+  return &tcpAPI;
+}
+
+void donetransport_tcp() {
+  int i;
+
+  coreAPI->releaseService(stats);
+  stats = NULL;
+  for (i=tsessionCount-1;i>=0;i--)
+    destroySession(i);
+  GROW(tsessions,
+       tsessionArrayLength,
+       0);
+  FREENONNULL(filteredNetworks_);
+  MUTEX_DESTROY(tcplock);
+}
+
+/* end of tcp.c */


Property changes on: GNUnet/src/transports/tcp_old.c
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: GNUnet/src/transports/udp.c
===================================================================
--- GNUnet/src/transports/udp.c 2006-11-11 02:26:01 UTC (rev 3685)
+++ GNUnet/src/transports/udp.c 2006-11-11 02:34:57 UTC (rev 3686)
@@ -97,7 +97,7 @@
   int sock;
   const int on = 1;
 
-  sock = SOCKET(PF_INET, SOCK_DGRAM, UDP_PROTOCOL_NUMBER);
+  sock = SOCKET(PF_INET, SOCK_DGRAM, 17);
   if (sock < 0) {
     GE_DIE_STRERROR(ectx,
                    GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
@@ -357,7 +357,7 @@
     if (selector == NULL)
       return SYSERR;
   }
-  sock = SOCKET(PF_INET, SOCK_DGRAM, UDP_PROTOCOL_NUMBER);
+  sock = SOCKET(PF_INET, SOCK_DGRAM, 17);
   if (sock == -1) {
     GE_LOG_STRERROR(ectx,
                    GE_ERROR | GE_ADMIN | GE_BULK,

Modified: GNUnet/src/transports/udp6.c
===================================================================
--- GNUnet/src/transports/udp6.c        2006-11-11 02:26:01 UTC (rev 3685)
+++ GNUnet/src/transports/udp6.c        2006-11-11 02:34:57 UTC (rev 3686)
@@ -101,7 +101,7 @@
 
   sock = SOCKET(PF_INET6,
                SOCK_DGRAM,
-               UDP_PROTOCOL_NUMBER);
+               17);
   if (sock < 0)
     GE_DIE_STRERROR(ectx,
                    GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
@@ -349,7 +349,7 @@
     if (selector == NULL)
       return SYSERR;
   }
-  sock = SOCKET(PF_INET, SOCK_DGRAM, UDP_PROTOCOL_NUMBER);
+  sock = SOCKET(PF_INET, SOCK_DGRAM, 17);
   if (sock == -1) {
     GE_LOG_STRERROR(ectx,
                    GE_ERROR | GE_ADMIN | GE_BULK,





reply via email to

[Prev in Thread] Current Thread [Next in Thread]