[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3142 - in GNUnet/src: include transports util util/network
From: |
grothoff |
Subject: |
[GNUnet-SVN] r3142 - in GNUnet/src: include transports util util/network util/network_client |
Date: |
Fri, 28 Jul 2006 01:30:10 -0700 (PDT) |
Author: grothoff
Date: 2006-07-28 01:29:51 -0700 (Fri, 28 Jul 2006)
New Revision: 3142
Added:
GNUnet/src/util/network/select.c
GNUnet/src/util/network_client/
GNUnet/src/util/network_client/Makefile.am
GNUnet/src/util/network_client/daemon.c
GNUnet/src/util/network_client/tcpio.c
GNUnet/src/util/network_client/tcpiotest.c
Removed:
GNUnet/src/util/network/daemon.c
GNUnet/src/util/network/tcpio.c
GNUnet/src/util/network/tcpiotest.c
Modified:
GNUnet/src/include/gnunet_protocols.h
GNUnet/src/include/gnunet_util_network.h
GNUnet/src/transports/Makefile.am
GNUnet/src/transports/tcp.c
GNUnet/src/util/Makefile.am
GNUnet/src/util/README
GNUnet/src/util/network/Makefile.am
GNUnet/src/util/network/io.c
Log:
improved select abstraction
Modified: GNUnet/src/include/gnunet_protocols.h
===================================================================
--- GNUnet/src/include/gnunet_protocols.h 2006-07-27 08:02:54 UTC (rev
3141)
+++ GNUnet/src/include/gnunet_protocols.h 2006-07-28 08:29:51 UTC (rev
3142)
@@ -414,6 +414,8 @@
*/
#define ONDEMAND_BLOCK 0xFFFFFFFF
+
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: GNUnet/src/include/gnunet_util_network.h
===================================================================
--- GNUnet/src/include/gnunet_util_network.h 2006-07-27 08:02:54 UTC (rev
3141)
+++ GNUnet/src/include/gnunet_util_network.h 2006-07-28 08:29:51 UTC (rev
3142)
@@ -36,8 +36,6 @@
#include "gnunet_util_string.h"
#include "gnunet_util_os.h"
#include "gnunet_util_threads.h"
-#include <sys/socket.h>
-#include <sys/select.h>
#ifdef __cplusplus
extern "C" {
@@ -115,6 +113,24 @@
} MESSAGE_HEADER;
/**
+ * Client-server communication: simple return value
+ */
+typedef struct {
+
+ /**
+ * The CS header (values: sizeof(CS_returnvalue_MESSAGE) + error-size,
CS_PROTO_RETURN_VALUE)
+ */
+ MESSAGE_HEADER header;
+
+ /**
+ * The return value (network byte order)
+ */
+ int return_value;
+
+} RETURN_VALUE_MESSAGE;
+
+
+/**
* @brief an IPv4 address
*/
typedef struct {
@@ -144,10 +160,56 @@
*/
struct CIDR6Network;
-struct ClientServerConnection;
-
+/**
+ * @brief handle for a system socket
+ */
struct SocketHandle;
+/**
+ * @brief handle for a select manager
+ */
+struct SelectHandle;
+
+/**
+ * @brief callback for handling messages received by select
+ *
+ * @param sock socket on which the message was received
+ * (should ONLY be used to queue reply using select methods)
+ * @return OK if message was valid, SYSERR if corresponding
+ * socket should be closed
+ */
+typedef int (*SelectMessageHandler)(void * mh_cls,
+ struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ void * sock_ctx,
+ const MESSAGE_HEADER * msg);
+
+/**
+ * We've accepted a connection, check that
+ * the connection is valid and create the
+ * corresponding sock_ctx for the new
+ * connection.
+ *
+ * @param addr the address of the other side as reported by OS
+ * @param addr_len the size of the address
+ * @return NULL to reject connection, otherwise value of sock_ctx
+ * for the new connection
+ */
+typedef void * (*SelectAcceptHandler)(void * ah_cls,
+ struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ const void * addr,
+ unsigned int addr_len);
+
+/**
+ * Select has been forced to close a connection.
+ * Free the associated context.
+ */
+typedef void (*SelectCloseHandler)(void * ch_cls,
+ struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ void * sock_ctx);
+
/* *********************** endianess conversion ************* */
/**
@@ -230,171 +292,6 @@
const char * hostname,
IPaddr * ip);
-/* ***************** high-level GNUnet client-server connections *********** */
-
-/**
- * Get a connection with gnunetd.
- */
-struct ClientServerConnection *
-daemon_connection_create(struct GE_Context * ectx,
- struct GC_Configuration * cfg);
-
-/**
- * Initialize a GNUnet server socket.
- * @param sock the open socket
- * @param result the SOCKET (filled in)
- * @return OK (always successful)
- */
-struct ClientServerConnection *
-client_connection_create(struct GE_Context * ectx,
- struct GC_Configuration * cfg,
- struct SocketHandle * sock);
-
-/**
- * Close a GNUnet TCP socket for now (use to temporarily close
- * a TCP connection that will probably not be used for a long
- * time; the socket will still be auto-reopened by the
- * readFromSocket/writeToSocket methods if it is a client-socket).
- *
- * Also, you must still call connection_destroy to free all
- * resources associated with the connection.
- */
-void connection_close_temporarily(struct ClientServerConnection * sock);
-
-/**
- * Destroy connection between gnunetd and clients.
- * Also closes the connection if it is still active.
- */
-void connection_destroy(struct ClientServerConnection * con);
-
-/**
- * Check if a socket is open. Will ALWAYS return 'true' for a valid
- * client socket (even if the connection is closed), but will return
- * false for a closed server socket.
- *
- * @return 1 if open, 0 if closed
- */
-int connection_test_open(struct ClientServerConnection * sock);
-
-/**
- * Check a socket, open and connect if it is closed and it is a
- * client-socket.
- *
- * @return OK if the socket is now open, SYSERR if not
- */
-int connection_ensure_connected(struct ClientServerConnection * sock);
-
-/**
- * Read from a GNUnet client-server connection.
- *
- * @param sock the socket
- * @param buffer the buffer to write data to
- * if NULL == *buffer, *buffer is allocated (caller frees)
- * @return OK if the read was successful, SYSERR if the socket
- * was closed by the other side (if the socket is a
- * client socket and is used again, the next
- * read/write call will automatically attempt
- * to re-establish the connection).
- */
-int connection_read(struct ClientServerConnection * sock,
- MESSAGE_HEADER ** buffer);
-
-/**
- * Write to a GNUnet TCP socket.
- *
- * @param sock the socket to write to
- * @param buffer the buffer to write
- * @return OK if the write was sucessful,
- * NO if it would block and isBlocking was NO,
- * SYSERR if the write failed (error will be logged)
- */
-int connection_write(struct ClientServerConnection * sock,
- const MESSAGE_HEADER * buffer);
-
-/**
- * Obtain a simple return value from the connection.
- * Note that the protocol will automatically communicate
- * errors and pass those to the error context used when
- * the socket was created. In that case, read_result
- * will return SYSERR for the corresponding communication.
- *
- * @param sock the TCP socket
- * @param ret the return value from TCP
- * @return SYSERR on error, OK if the return value was
- * read successfully
- */
-int connection_read_result(struct ClientServerConnection * sock,
- int * ret);
-
-/**
- * Send a simple return value to the other side.
- *
- * @param sock the TCP socket
- * @param ret the return value to send via TCP
- * @return SYSERR on error, OK if the return value was
- * send successfully
- */
-int connection_write_result(struct ClientServerConnection * sock,
- int ret);
-
-/**
- * Send a return value that indicates
- * a serious error to the other side.
- *
- * @param sock the TCP socket
- * @param mask GE_MASK
- * @param date date string
- * @param msg message string
- * @return SYSERR on error, OK if the error code was send
- * successfully
- */
-int connection_write_error(struct ClientServerConnection * sock,
- GE_KIND mask,
- const char * date,
- const char * msg);
-
-/**
- * Stop gnunetd
- *
- * Note that returning an error does NOT mean that
- * gnunetd will continue to run (it may have been
- * shutdown by something else in the meantime or
- * crashed). Call connection_test_running() frequently
- * to check the status of gnunetd.
- *
- * Furthermore, note that this WILL potentially kill
- * gnunetd processes on remote machines that cannot
- * be restarted with startGNUnetDaemon!
- *
- * This function does NOT need the PID and will also
- * kill daemonized gnunetd's.
- *
- * @return OK successfully stopped, SYSERR: error
- */
-int connection_request_shutdown(struct ClientServerConnection * sock);
-
-/**
- * Checks if gnunetd is running
- *
- * Uses CS_PROTO_traffic_COUNT query to determine if gnunetd is
- * running.
- *
- * @return OK if gnunetd is running, SYSERR if not
- */
-int connection_test_running(struct GE_Context * ectx,
- struct GC_Configuration * cfg);
-
-/**
- * Wait until the gnunet daemon is
- * running.
- *
- * @param timeout how long to wait at most in ms
- * @return OK if gnunetd is now running
- */
-int connection_wait_for_running(struct GE_Context * ectx,
- struct GC_Configuration * cfg,
- cron_t timeout);
-
/* ********************* low-level socket operations **************** */
/**
@@ -407,17 +304,11 @@
struct LoadMonitor * mon,
int osSocket);
+/**
+ * Destroy the socket (also closes it).
+ */
void socket_destroy(struct SocketHandle * s);
-void socket_add_to_select_set(struct SocketHandle * s,
- fd_set * set,
- int * max);
-
-int socket_test_select_set(struct SocketHandle * sock,
- fd_set * set);
-
-int socket_get_os_socket(struct SocketHandle * sock);
-
/**
* Depending on doBlock, enable or disable the nonblocking mode
* of socket s.
@@ -459,8 +350,8 @@
void * buf,
size_t max,
size_t * read,
- struct sockaddr * from,
- socklen_t * fromlen);
+ char * from,
+ unsigned int * fromlen);
/**
* Do a write on the given socket.
@@ -484,8 +375,8 @@
const void * buf,
size_t max,
size_t * sent,
- const struct sockaddr * dst,
- socklen_t dstlen);
+ const char * dst,
+ unsigned int dstlen);
/**
* Check if socket is valid
@@ -494,6 +385,75 @@
int socket_test_valid(struct SocketHandle * s);
+/* ********************* select operations **************** */
+
+
+/**
+ * Start a select thread that will accept connections
+ * from the given socket and pass messages read to the
+ * given message handler.
+ *
+ * @param sock the listen socket
+ * @param max_addr_len maximum expected length of addresses for
+ * connections accepted on the given socket
+ * @param timeout after how long should inactive connections be
+ * closed? Use 0 for no timeout
+ * @param mon maybe NULL
+ * @param memory_quota amount of memory available for
+ * queueing messages (in bytes)
+ * @return NULL on error
+ */
+struct SelectHandle * select_create(struct GE_Context * ectx,
+ struct LoadMonitor * mon,
+ int sock,
+ unsigned int max_addr_len,
+ cron_t timeout,
+ SelectMessageHandler mh,
+ void * mh_cls,
+ SelectAcceptHandler ah,
+ void * ah_cls,
+ SelectCloseHandler ch,
+ void * ch_cls,
+ unsigned int memory_quota);
+
+/**
+ * Terminate the select thread, close the socket and
+ * all associated connections.
+ */
+void select_destroy(struct SelectHandle * sh);
+
+/**
+ * Queue the given message with the select thread.
+ *
+ * @param mayBlock if YES, blocks this thread until message
+ * has been sent
+ * @param force message is important, queue even if
+ * there is not enough space
+ * @return OK if the message was sent or queued
+ * NO if there was not enough memory to queue it,
+ * SYSERR if the sock does not belong with this select
+ */
+int select_write(struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ const MESSAGE_HEADER * msg,
+ int mayBlock,
+ int force);
+
+/**
+ * Add another (already connected) socket to the set of
+ * sockets managed by the select.
+ */
+int select_connect(struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ void * sock_ctx);
+
+/**
+ * Close the associated socket and remove it from the
+ * set of sockets managed by select.
+ */
+int select_disconnect(struct SelectHandle * sh,
+ struct SocketHandle * sock);
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: GNUnet/src/transports/Makefile.am
===================================================================
--- GNUnet/src/transports/Makefile.am 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/transports/Makefile.am 2006-07-28 08:29:51 UTC (rev 3142)
@@ -15,7 +15,7 @@
libip.la
if !MINGW
-# smtptransport = libgnunettransport_smtp.la
+ smtptransport = libgnunettransport_smtp.la
endif
libip_la_SOURCES = \
Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/transports/tcp.c 2006-07-28 08:29:51 UTC (rev 3142)
@@ -134,36 +134,6 @@
*/
int expectingWelcome;
- /**
- * Current read position in the buffer.
- */
- unsigned int pos;
-
- /**
- * Current size of the read buffer.
- */
- unsigned int rsize;
-
- /**
- * The read buffer.
- */
- char * rbuff;
-
- /**
- * Position in the write buffer
- */
- unsigned int wpos;
-
- /**
- * The write buffer.
- */
- char * wbuff;
-
- /**
- * Size of the write buffer
- */
- unsigned int wsize;
-
} TCPSession;
/* *********** globals ************* */
@@ -183,66 +153,15 @@
static int stat_bytesDropped;
-/**
- * one thread for listening for new connections,
- * and for reading on all open sockets
- */
-static struct PTHREAD * listenThread;
-
-/**
- * sock is the tcp socket that we listen on for new inbound
- * connections.
- */
-static struct SocketHandle * tcp_sock;
-
-/**
- * tcp_pipe is used to signal the thread that is
- * blocked in a select call that the set of sockets to listen
- * to has changed.
- */
-static int tcp_pipe[2];
-
-/**
- * Array of currently active TCP sessions.
- */
-static TSession ** tsessions = NULL;
-
-static unsigned int tsessionCount;
-
-static unsigned int tsessionArrayLength;
-
/* configuration */
static struct CIDRNetwork * filteredNetworks_;
-/**
- * Lock for access to mutable state of the module,
- * that is the configuration and the tsessions array.
- * Note that we ONLY need to synchronize access to
- * the tsessions array when adding or removing sessions,
- * since removing is done only by one thread and we just
- * need to avoid another thread adding an element at the
- * same point in time. We do not need to synchronize at
- * every access point since adding new elements does not
- * prevent the select thread from operating and removing
- * is done by the only therad that reads from the array.
- */
-static struct MUTEX * tcplock;
+static struct SelectHandle * selector;
-/**
- * Semaphore used by the server-thread to signal that
- * the server has been started -- and later again to
- * signal that the server has been stopped.
- */
-static struct SEMAPHORE * serverSignal;
-
-static int tcp_shutdown = YES;
-
static struct GE_Context * ectx;
static struct GC_Configuration * cfg;
-static struct LoadMonitor * load_monitor;
-
/* ******************** helper functions *********************** */
/**
@@ -259,23 +178,6 @@
}
/**
- * Write to the pipe to wake up the select thread (the set of
- * files to watch has changed).
- */
-static void signalSelect() {
- char i = 0;
- int ret;
-
- ret = WRITE(tcp_pipe[1],
- &i,
- sizeof(char));
- if (ret != sizeof(char))
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_ADMIN | GE_BULK,
- "write");
-}
-
-/**
* Disconnect from a remote node. May only be called
* on sessions that were aquired by the caller first.
* For the core, aquiration means to call associate or
@@ -604,254 +506,6 @@
}
/**
- * Main method for the thread listening on the tcp socket and all tcp
- * connections. Whenever a message is received, it is forwarded to the
- * core. This thread waits for activity on any of the TCP connections
- * and processes deferred (async) writes and buffers reads until an
- * entire message has been received.
- */
-static void * tcpListenMain(void * unused) {
- struct sockaddr_in clientAddr;
- fd_set readSet;
- fd_set errorSet;
- fd_set writeSet;
- struct stat buf;
- socklen_t lenOfIncomingAddr;
- int i;
- int max;
- int ret;
-
- SEMAPHORE_UP(serverSignal); /* we are there! */
- MUTEX_LOCK(tcplock);
- while (tcp_shutdown == NO) {
- FD_ZERO(&readSet);
- FD_ZERO(&errorSet);
- FD_ZERO(&writeSet);
- if (tcp_pipe[0] != -1) {
- if (-1 != FSTAT(tcp_pipe[0], &buf)) {
- FD_SET(tcp_pipe[0],
- &readSet);
- } else {
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
- "fstat");
- tcp_pipe[0] = -1; /* prevent us from error'ing all the time */
- }
- }
- max = tcp_pipe[0];
- if (tcp_sock != NULL) {
- if (socket_test_valid(tcp_sock)) {
- socket_add_to_select_set(tcp_sock, &readSet, &max);
- } else {
- socket_destroy(tcp_sock);
- tcp_sock = NULL; /* prevent us from error'ing all the time */
- }
- }
-#if DEBUG_TCP
- else
- GE_LOG(ectx,
- GE_USER | GE_WARNING | GE_BULK,
- _("TCP server socket not open!\n"));
-#endif
- for (i=0;i<tsessionCount;i++) {
- TCPSession * tcpSession = tsessions[i]->internal;
- struct SocketHandle * sock = tcpSession->sock;
- if (sock != NULL) {
- if (socket_test_valid(sock)) {
- socket_add_to_select_set(sock, &readSet, &max);
- socket_add_to_select_set(sock, &errorSet, &max);
- if (tcpSession->wpos > 0)
- socket_add_to_select_set(sock, &writeSet, &max); /* do we have a
pending write request? */
- } else {
- destroySession(i);
- }
- } else {
- GE_BREAK(ectx, 0); /* sock in tsessions array should never be -1 */
- destroySession(i);
- }
- }
- MUTEX_UNLOCK(tcplock);
- ret = SELECT(max+1,
- &readSet,
- &writeSet,
- &errorSet,
- NULL);
- MUTEX_LOCK(tcplock);
- if ( (ret == -1) &&
- ( (errno == EAGAIN) || (errno == EINTR) ) )
- continue;
- if (ret == -1) {
- if (errno == EBADF) {
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
- "select");
- } else {
- GE_DIE_STRERROR(ectx,
- GE_FATAL | GE_ADMIN | GE_USER | GE_IMMEDIATE,
- "select");
- }
- }
- if (tcp_sock != NULL) {
- if (socket_test_select_set(tcp_sock, &readSet)) {
- int sock;
-
- lenOfIncomingAddr = sizeof(clientAddr);
- sock = ACCEPT(socket_get_os_socket(tcp_sock),
- (struct sockaddr *)&clientAddr,
- &lenOfIncomingAddr);
- if (sock != -1) {
- /* verify clientAddr for eligibility here (ipcheck-style,
- user should be able to specify who is allowed to connect,
- otherwise we just close and reject the communication! */
-
- IPaddr ipaddr;
- GE_ASSERT(ectx,
- sizeof(struct in_addr) == sizeof(IPaddr));
- memcpy(&ipaddr,
- &clientAddr.sin_addr,
- sizeof(struct in_addr));
-
- if (YES == isBlacklisted(ipaddr)) {
- GE_LOG(ectx,
- GE_INFO | GE_USER | GE_ADMIN | GE_REQUEST,
- _("%s: Rejected connection from blacklisted "
- "address %u.%u.%u.%u.\n"),
- "TCP",
- PRIP(ntohl(*(int*)&clientAddr.sin_addr)));
- if (0 != SHUTDOWN(sock, 2))
- GE_LOG_STRERROR(ectx,
- GE_USER | GE_ADMIN | GE_WARNING | GE_BULK,
- "shutdown");
- if (0 != CLOSE(sock))
- GE_LOG_STRERROR(ectx,
- GE_USER | GE_ADMIN | GE_WARNING | GE_BULK,
- "close");
- } else {
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_INFO,
- "Accepted connection from %u.%u.%u.%u.\n",
- PRIP(ntohl(*(int*)&clientAddr.sin_addr)));
-#endif
- createNewSession(sock);
- }
- } else {
- GE_LOG_STRERROR(ectx,
- GE_WARNING | GE_ADMIN | GE_BULK,
- "accept");
- }
- }
- }
- if (FD_ISSET(tcp_pipe[0], &readSet)) {
- /* allow reading multiple signals in one go in case we get many
- in one shot... */
-#define MAXSIG_BUF 128
- char buf[MAXSIG_BUF];
- /* just a signal to refresh sets, eat and continue */
- if (0 >= READ(tcp_pipe[0],
- &buf[0],
- MAXSIG_BUF)) {
- GE_LOG_STRERROR(ectx,
- GE_WARNING | GE_USER | GE_BULK,
- "read");
- }
- }
- for (i=0;i<tsessionCount;i++) {
- TCPSession * tcpSession = tsessions[i]->internal;
- struct SocketHandle * sock = tcpSession->sock;
- if (socket_test_select_set(sock, &readSet)) {
- if (SYSERR == readAndProcess(i)) {
- destroySession(i);
- i--;
- continue;
- }
- }
- if (socket_test_select_set(sock, &writeSet)) {
- size_t ret;
- int success;
-
-try_again_1:
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "TCP: trying to send %u bytes\n",
- tcpSession->wpos);
-#endif
- success = socket_send(sock,
- NC_Nonblocking,
- tcpSession->wbuff,
- tcpSession->wpos,
- &ret);
- if (success == SYSERR) {
- GE_LOG_STRERROR(ectx,
- GE_WARNING | GE_USER | GE_ADMIN | GE_BULK,
- "send");
- destroySession(i);
- i--;
- continue;
- } else if (success == NO) {
- /* this should only happen under Win9x because
- of a bug in the socket implementation (KB177346).
- Let's sleep and try again. */
- PTHREAD_SLEEP(20 * cronMILLIS);
- goto try_again_1;
- }
- if (stats != NULL)
- stats->change(stat_bytesSent,
- ret);
-
-#if DEBUG_TCP
- GE_LOG(ectx,
- GE_DEBUG | GE_USER | GE_BULK,
- "TCP: transmitted %u bytes\n",
- ret);
-#endif
- if (ret == 0) {
- /* send only returns 0 on error (other side closed connection),
- * so close the session */
- destroySession(i);
- i--;
- continue;
- }
- if (ret == tcpSession->wpos) {
- FREENONNULL(tcpSession->wbuff);
- tcpSession->wbuff = NULL;
- tcpSession->wpos = 0;
- tcpSession->wsize = 0;
- } else {
- memmove(tcpSession->wbuff,
- &tcpSession->wbuff[ret],
- tcpSession->wpos - ret);
- tcpSession->wpos -= ret;
- }
- }
- if (socket_test_select_set(sock, &errorSet)) {
- destroySession(i);
- i--;
- continue;
- }
- if ( ( tcpSession->users == 1) &&
- (get_time() > tcpSession->lastUse + TCP_TIMEOUT) ) {
- destroySession(i);
- i--;
- continue;
- }
- }
- }
- /* shutdown... */
- if (tcp_sock != NULL) {
- socket_destroy(tcp_sock);
- tcp_sock = NULL;
- }
- /* close all sessions */
- while (tsessionCount > 0)
- destroySession(0);
- MUTEX_UNLOCK(tcplock);
- SEMAPHORE_UP(serverSignal); /* we are there! */
- return NULL;
-} /* end of tcp listen main */
-
-/**
* Send a message (already encapsulated if needed) via the
* tcp socket (or enqueue if sending now would block).
*
@@ -1425,20 +1079,8 @@
s);
} else
tcp_sock = NULL;
- listenThread = PTHREAD_CREATE(&tcpListenMain,
- NULL,
- 5 * 1024);
- if (listenThread == NULL) {
- GE_LOG_STRERROR(ectx,
- GE_ERROR | GE_IMMEDIATE | GE_ADMIN,
- "pthread_create");
- socket_destroy(tcp_sock);
- tcp_sock = NULL;
- SEMAPHORE_DESTROY(serverSignal);
- serverSignal = NULL;
- return SYSERR;
- }
- SEMAPHORE_DOWN(serverSignal, YES); /* wait for server to be up */
+
+ /* FIXME: call network/select code! */
return OK;
}
Modified: GNUnet/src/util/Makefile.am
===================================================================
--- GNUnet/src/util/Makefile.am 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/Makefile.am 2006-07-28 08:29:51 UTC (rev 3142)
@@ -3,7 +3,7 @@
getopt disk threads \
os network . \
config_impl cron crypto \
- containers loggers
+ containers loggers network_client
INCLUDES = -I$(top_srcdir)/src/include
Modified: GNUnet/src/util/README
===================================================================
--- GNUnet/src/util/README 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/README 2006-07-28 08:29:51 UTC (rev 3142)
@@ -25,6 +25,8 @@
=> linked to gnunetutil_containers.so (also requires
libgnunetutil_crypto)
util/loggers: specific logging implementations (depends on gnunetutil.so)
=> linked to gnunetutil_logging.so
+util/network_client: library for synchronous communiation of clients with
gnunetd (depends on gnunetutil.so)
+ => linked to gnunetutil_network_client.so
Most GNUnet libraries and plugins will only need to (directly) link
against gnunetutil.so. Some may also require crypto or containers.
Modified: GNUnet/src/util/network/Makefile.am
===================================================================
--- GNUnet/src/util/network/Makefile.am 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/Makefile.am 2006-07-28 08:29:51 UTC (rev 3142)
@@ -6,11 +6,10 @@
libnetwork.la
libnetwork_la_SOURCES = \
- daemon.c \
endian.c \
io.c \
ipcheck.c \
- tcpio.c
+ select.c
check_PROGRAMS = \
tcpiotest
Deleted: GNUnet/src/util/network/daemon.c
===================================================================
--- GNUnet/src/util/network/daemon.c 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/daemon.c 2006-07-28 08:29:51 UTC (rev 3142)
@@ -1,101 +0,0 @@
-/*
- This file is part of GNUnet.
- (C) 2005, 2006 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file src/util/network/daemon.c
- * @brief code for client-gnunetd interaction (stop, check running)
- * @author Christian Grothoff
- */
-
-#include "platform.h"
-#include "gnunet_util_network.h"
-#include "gnunet_protocols.h"
-#include "gnunet_util_threads.h"
-
-int connection_test_running(struct GE_Context * ectx,
- struct GC_Configuration * cfg) {
- struct ClientServerConnection * sock;
- MESSAGE_HEADER csHdr;
- int ret;
-
- sock = daemon_connection_create(ectx, cfg);
- if (sock == NULL)
- return SYSERR;
- csHdr.size
- = htons(sizeof(MESSAGE_HEADER));
- csHdr.type
- = htons(CS_PROTO_traffic_COUNT);
- if (SYSERR == connection_write(sock,
- &csHdr)) {
- connection_destroy(sock);
- return SYSERR;
- }
- if (SYSERR == connection_read_result(sock,
- &ret)) {
- connection_destroy(sock);
- return SYSERR;
- }
- connection_destroy(sock);
- return OK;
-}
-
-int connection_request_shutdown(struct ClientServerConnection * sock) {
- MESSAGE_HEADER csHdr;
- int ret;
-
- csHdr.size
- = htons(sizeof(MESSAGE_HEADER));
- csHdr.type
- = htons(CS_PROTO_SHUTDOWN_REQUEST);
- if (SYSERR == connection_write(sock,
- &csHdr)) {
- connection_close_temporarily(sock);
- return SYSERR;
- }
- if (SYSERR == connection_read_result(sock,
- &ret)) {
- connection_close_temporarily(sock);
- return SYSERR;
- }
- return ret;
-}
-
-/**
- * Wait until the gnunet daemon is
- * running.
- *
- * @param timeout how long to wait at most
- * @return OK if gnunetd is now running
- */
-int connection_wait_for_running(struct GE_Context * ectx,
- struct GC_Configuration * cfg,
- cron_t timeout) {
- timeout += get_time();
- while (OK != connection_test_running(ectx,
- cfg)) {
- PTHREAD_SLEEP(100 * cronMILLIS);
- if (timeout < get_time())
- return connection_test_running(ectx,
- cfg);
- }
- return OK;
-}
-
-/* end of daemon.c */
Modified: GNUnet/src/util/network/io.c
===================================================================
--- GNUnet/src/util/network/io.c 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/io.c 2006-07-28 08:29:51 UTC (rev 3142)
@@ -28,36 +28,8 @@
#include "gnunet_util_network.h"
#include "platform.h"
+#include "network.h"
-typedef struct SocketHandle {
-
- struct LoadMonitor * mon;
-
- struct GE_Context * ectx;
-
- int handle;
-
-} SocketHandle;
-
-
-void socket_add_to_select_set(struct SocketHandle * s,
- fd_set * set,
- int * max) {
- FD_SET(s->handle,
- set);
- if (*max < s->handle)
- *max = s->handle;
-}
-
-int socket_test_select_set(struct SocketHandle * sock,
- fd_set * set) {
- return FD_ISSET(sock->handle, set);
-}
-
-int socket_get_os_socket(struct SocketHandle * sock) {
- return sock->handle;
-}
-
struct SocketHandle *
socket_create(struct GE_Context * ectx,
struct LoadMonitor * mon,
@@ -190,8 +162,8 @@
void * buf,
size_t max,
size_t * read,
- struct sockaddr * from,
- socklen_t * fromlen) {
+ char * from,
+ unsigned int * fromlen) {
int flags;
size_t pos;
size_t ret;
@@ -221,7 +193,7 @@
&((char*)buf)[pos],
max - pos,
flags,
- from,
+ (struct sockaddr*) from,
fromlen);
if ( (ret == (size_t) -1) &&
(errno == EINTR) &&
@@ -321,8 +293,8 @@
const void * buf,
size_t max,
size_t * sent,
- const struct sockaddr * dst,
- socklen_t dstlen) {
+ const char * dst,
+ unsigned int dstlen) {
int flags;
size_t pos;
size_t ret;
@@ -353,7 +325,7 @@
&((char*)buf)[pos],
max - pos,
flags,
- dst,
+ (const struct sockaddr*) dst,
dstlen);
if ( (ret == (size_t) -1) &&
(errno == EINTR) &&
Added: GNUnet/src/util/network/select.c
===================================================================
--- GNUnet/src/util/network/select.c 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/select.c 2006-07-28 08:29:51 UTC (rev 3142)
@@ -0,0 +1,740 @@
+/*
+ This file is part of GNUnet.
+ (C) 2003, 2006 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file util/network/select.c
+ * @brief (network) input/output operations
+ * @author Christian Grothoff
+ *
+ * TODO: memory management (pool allocation!)
+ */
+
+#include "gnunet_util_network.h"
+#include "platform.h"
+#include "network.h"
+
+/**
+ * Select Session handle.
+ */
+typedef struct {
+
+ /**
+ * the socket
+ */
+ struct SocketHandle * sock;
+
+ /**
+ * Client connection context.
+ */
+ void * sock_ctx;
+
+ cron_t lastUse;
+
+ /**
+ * Current read position in the buffer.
+ */
+ unsigned int pos;
+
+ /**
+ * Current size of the read buffer.
+ */
+ unsigned int rsize;
+
+ /**
+ * The read buffer.
+ */
+ char * rbuff;
+
+ /**
+ * Position in the write buffer
+ */
+ unsigned int wpos;
+
+ /**
+ * The write buffer.
+ */
+ char * wbuff;
+
+ /**
+ * Size of the write buffer
+ */
+ unsigned int wsize;
+
+} Session;
+
+typedef struct SelectHandle {
+
+ /**
+ * mutex for synchronized access
+ */
+ struct MUTEX * lock;
+
+ /**
+ * one thread for listening for new connections,
+ * and for reading on all open sockets
+ */
+ struct PTHREAD * thread;
+
+ /**
+ * sock is the tcp socket that we listen on for new inbound
+ * connections.
+ */
+ struct SocketHandle * listen_sock;
+
+ /**
+ * tcp_pipe is used to signal the thread that is
+ * blocked in a select call that the set of sockets to listen
+ * to has changed.
+ */
+ int signal_pipe[2];
+
+ /**
+ * Array of currently active TCP sessions.
+ */
+ Session ** sessions;
+
+ unsigned int sessionCount;
+
+ unsigned int sessionArrayLength;
+
+ int shutdown;
+
+ struct GE_Context * ectx;
+
+ struct LoadMonitor * load_monitor;
+
+ unsigned int max_addr_len;
+
+ cron_t timeout;
+
+ SelectMessageHandler mh;
+
+ void * mh_cls;
+
+ SelectAcceptHandler ah;
+
+ void * ah_cls;
+
+ SelectCloseHandler ch;
+
+ void * ch_cls;
+
+ unsigned int memory_quota;
+
+} SelectHandle;
+
+static void add_to_select_set(struct SocketHandle * s,
+ fd_set * set,
+ int * max) {
+ FD_SET(s->handle,
+ set);
+ if (*max < s->handle)
+ *max = s->handle;
+}
+
+/**
+ * Write to the pipe to wake up the select thread (the set of
+ * files to watch has changed).
+ */
+static void signalSelect(SelectHandle * sh) {
+ static char i = '\0';
+ int ret;
+
+ ret = WRITE(sh->signal_pipe[1],
+ &i,
+ sizeof(char));
+ if (ret != sizeof(char))
+ GE_LOG_STRERROR(sh->ectx,
+ GE_ERROR | GE_ADMIN | GE_BULK,
+ "write");
+}
+
+/**
+ * Destroy the given session by closing the socket,
+ * releasing the buffers and removing it from the
+ * select set.
+ *
+ * This function may only be called if the tcplock is
+ * already held by the caller.
+ */
+static void destroySession(SelectHandle * sh,
+ Session * s) {
+ int i;
+
+ sh->ch(sh->ch_cls,
+ sh,
+ s->sock,
+ s->sock_ctx);
+ socket_destroy(s->sock);
+ GROW(s->rbuff,
+ s->rsize,
+ 0);
+ GROW(s->wbuff,
+ s->wsize,
+ 0);
+ for (i=0;i<sh->sessionCount;i++) {
+ if (sh->sessions[i] == s) {
+ sh->sessions[i] = sh->sessions[sh->sessionCount-1];
+ sh->sessionCount--;
+ break;
+ }
+ }
+ FREE(s);
+ if (sh->sessionCount * 2 < sh->sessionArrayLength)
+ GROW(sh->sessions,
+ sh->sessionArrayLength,
+ sh->sessionCount);
+}
+
+/**
+ * The socket of a session has data waiting, read and
+ * process!
+ *
+ * This function may only be called if the lock is
+ * already held by the caller.
+ * @return OK for success, SYSERR if session was destroyed
+ */
+static int readAndProcess(SelectHandle * sh,
+ Session * session) {
+ const MESSAGE_HEADER * pack;
+ int ret;
+ size_t recvd;
+ unsigned short len;
+
+ if (session->rsize == session->pos) {
+ /* read buffer too small, grow */
+ GROW(session->rbuff,
+ session->rsize,
+ session->rsize + 1024);
+ }
+ ret = socket_recv(session->sock,
+ NC_Blocking | NC_IgnoreInt,
+ &session->rbuff[session->pos],
+ session->rsize - session->pos,
+ &recvd);
+ if (ret != OK) {
+ destroySession(sh, session);
+ return SYSERR; /* other side closed connection */
+ }
+ session->pos += recvd;
+ while (sh->shutdown == NO) {
+ pack = (const MESSAGE_HEADER*) &session->rbuff[0];
+ len = ntohs(pack->size);
+ /* check minimum size */
+ if (len < sizeof(MESSAGE_HEADER)) {
+ GE_LOG(sh->ectx,
+ GE_WARNING | GE_USER | GE_BULK,
+ _("Received malformed message (too small) from connection.
Closing.\n"));
+ destroySession(sh, session);
+ return SYSERR;
+ }
+ if (len > session->rsize) /* if message larger than read buffer, grow! */
+ GROW(session->rbuff,
+ session->rsize,
+ len);
+
+ /* do we have the entire message? */
+ if (session->pos < len)
+ break; /* wait for more */
+
+ sh->mh(sh->mh_cls,
+ sh,
+ session->sock,
+ session->sock_ctx,
+ pack);
+ /* shrink buffer adequately */
+ memmove(&session->rbuff[0],
+ &session->rbuff[len],
+ session->pos - len);
+ session->pos -= len;
+ }
+ session->lastUse = get_time();
+ return OK;
+}
+
+/**
+ * The socket of a session has data waiting that can be
+ * transmitted, do it!
+ *
+ * This function may only be called if the lock is
+ * already held by the caller.
+ * @return OK for success, SYSERR if session was destroyed
+ */
+static int writeAndProcess(SelectHandle * sh,
+ Session * session) {
+ SocketHandle * sock;
+ int ret;
+ size_t size;
+
+ sock = session->sock;
+ while (sh->shutdown == NO) {
+ ret = socket_send(sock,
+ NC_Nonblocking,
+ session->wbuff,
+ session->wpos,
+ &size);
+ if (ret == SYSERR) {
+ GE_LOG_STRERROR(sh->ectx,
+ GE_WARNING | GE_USER | GE_ADMIN | GE_BULK,
+ "send");
+ destroySession(sh, session);
+ return SYSERR;
+ }
+ if (ret == OK) {
+ if (size == 0) {
+ /* send only returns 0 on error (happens if
+ other side closed connection), so close
+ the session */
+ destroySession(sh, session);
+ return SYSERR;
+ }
+ if (size == session->wpos) {
+ session->wpos = 0;
+ GROW(session->wbuff,
+ session->wsize,
+ 0);
+ break;
+ }
+ memmove(session->wbuff,
+ &session->wbuff[size],
+ session->wpos - size);
+ session->wpos -= size;
+ break;
+ }
+ GE_ASSERT(sh->ectx, ret == NO);
+ /* this should only happen under Win9x because
+ of a bug in the socket implementation (KB177346).
+ Let's sleep and try again. */
+ PTHREAD_SLEEP(20 * cronMILLIS);
+ }
+ session->lastUse = get_time();
+ return OK;
+}
+
+/**
+ * Thread that selects until it is signaled to shut down.
+ */
+static void * selectThread(void * ctx) {
+ struct SelectHandle * sh = ctx;
+ char * clientAddr;
+ fd_set readSet;
+ fd_set errorSet;
+ fd_set writeSet;
+ struct stat buf;
+ socklen_t lenOfIncomingAddr;
+ int i;
+ int max;
+ int ret;
+ int s;
+ void * sctx;
+ SocketHandle * sock;
+ Session * session;
+
+ clientAddr = MALLOC(sh->max_addr_len);
+ MUTEX_LOCK(sh->lock);
+ while (sh->shutdown == NO) {
+ FD_ZERO(&readSet);
+ FD_ZERO(&errorSet);
+ FD_ZERO(&writeSet);
+ if (sh->signal_pipe[0] != -1) {
+ if (-1 == FSTAT(sh->signal_pipe[0], &buf)) {
+ GE_LOG_STRERROR(sh->ectx,
+ GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+ "fstat");
+ sh->signal_pipe[0] = -1; /* prevent us from error'ing all the time */
+ } else {
+ FD_SET(sh->signal_pipe[0],
+ &readSet);
+ }
+ }
+ max = sh->signal_pipe[0];
+ if (sh->listen_sock != NULL) {
+ if (! socket_test_valid(sh->listen_sock)) {
+ socket_destroy(sh->listen_sock);
+ GE_LOG(sh->ectx,
+ GE_USER | GE_ERROR | GE_BULK,
+ _("select listen socket not valid!\n"));
+ sh->listen_sock = NULL; /* prevent us from error'ing all the time */
+ } else {
+ add_to_select_set(sh->listen_sock, &readSet, &max);
+ }
+ }
+ for (i=0;i<sh->sessionCount;i++) {
+ Session * session = sh->sessions[i];
+ struct SocketHandle * sock = session->sock;
+
+ if (! socket_test_valid(sock)) {
+ destroySession(sh, session);
+ } else {
+ add_to_select_set(sock, &readSet, &max);
+ add_to_select_set(sock, &errorSet, &max);
+ if (session->wpos > 0)
+ add_to_select_set(sock, &writeSet, &max); /* do we have a pending
write request? */
+ }
+ }
+ MUTEX_UNLOCK(sh->lock);
+ ret = SELECT(max+1,
+ &readSet,
+ &writeSet,
+ &errorSet,
+ NULL);
+ MUTEX_LOCK(sh->lock);
+ if ( (ret == -1) &&
+ ( (errno == EAGAIN) || (errno == EINTR) ) )
+ continue;
+ if (ret == -1) {
+ if (errno == EBADF) {
+ GE_LOG_STRERROR(sh->ectx,
+ GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+ "select");
+ } else {
+ GE_DIE_STRERROR(sh->ectx,
+ GE_FATAL | GE_ADMIN | GE_USER | GE_IMMEDIATE,
+ "select");
+ }
+ }
+ if ( (sh->listen_sock != NULL) &&
+ (FD_ISSET(sh->listen_sock->handle, &readSet)) ) {
+ lenOfIncomingAddr = sh->max_addr_len;
+ memset(clientAddr,
+ 0,
+ lenOfIncomingAddr);
+ s = ACCEPT(sh->listen_sock->handle,
+ (struct sockaddr *) clientAddr,
+ &lenOfIncomingAddr);
+ if (s == -1) {
+ GE_LOG_STRERROR(sh->ectx,
+ GE_WARNING | GE_ADMIN | GE_BULK,
+ "accept");
+ } else {
+ sock = socket_create(sh->ectx,
+ sh->load_monitor,
+ s);
+ sctx = sh->ah(sh->ah_cls,
+ sh,
+ sock,
+ clientAddr,
+ lenOfIncomingAddr);
+ if (sctx == NULL) {
+ socket_destroy(sock);
+ } else {
+ session = MALLOC(sizeof(Session));
+ memset(session, 0, sizeof(Session));
+ session->sock = sock;
+ session->sock_ctx = sctx;
+ session->lastUse = get_time();
+ if (sh->sessionArrayLength == sh->sessionCount)
+ GROW(sh->sessions,
+ sh->sessionArrayLength,
+ sh->sessionArrayLength + 4);
+ sh->sessions[sh->sessionCount++] = session;
+ }
+ }
+ }
+ if (FD_ISSET(sh->signal_pipe[0], &readSet)) {
+ /* allow reading multiple signals in one go in case we get many
+ in one shot... */
+#define MAXSIG_BUF 128
+ char buf[MAXSIG_BUF];
+ /* just a signal to refresh sets, eat and continue */
+ if (0 >= READ(sh->signal_pipe[0],
+ &buf[0],
+ MAXSIG_BUF)) {
+ GE_LOG_STRERROR(sh->ectx,
+ GE_WARNING | GE_USER | GE_BULK,
+ "read");
+ }
+ }
+ for (i=0;i<sh->sessionCount;i++) {
+ session = sh->sessions[i];
+ sock = session->sock;
+ if ( (FD_ISSET(sock->handle, &readSet)) &&
+ (SYSERR == readAndProcess(sh,
+ session)) ) {
+ i--;
+ continue;
+ }
+ if ( (FD_ISSET(sock->handle, &writeSet)) &&
+ (SYSERR == writeAndProcess(sh,
+ session)) ) {
+ i--;
+ continue;
+ }
+ if (FD_ISSET(sock->handle, &errorSet)) {
+ destroySession(sh,
+ session);
+ i--;
+ continue;
+ }
+ if ( (sh->timeout != 0) &&
+ (get_time() > session->lastUse + sh->timeout) ) {
+ destroySession(sh, session);
+ i--;
+ continue;
+ }
+ }
+ }
+ MUTEX_UNLOCK(sh->lock);
+ FREE(clientAddr);
+ return NULL;
+}
+
+static int makeNonblocking(struct GE_Context * ectx,
+ int handle) {
+#if MINGW
+ u_long l = 1;
+ if (ioctlsocket(handle,
+ FIONBIO,
+ &l) == SOCKET_ERROR) {
+ SetErrnoFromWinsockError(WSAGetLastError());
+ return SYSERR;
+ } else {
+ /* store the blocking mode */
+ __win_SetHandleBlockingMode(handle, 0);
+ }
+#else
+ int flags = fcntl(handle, F_GETFL);
+ flags |= O_NONBLOCK;
+ if (-1 == fcntl(handle,
+ F_SETFL,
+ flags)) {
+ GE_LOG_STRERROR(ectx,
+ GE_WARNING | GE_USER | GE_ADMIN | GE_IMMEDIATE,
+ "fcntl");
+ return SYSERR;
+ }
+#endif
+ return OK;
+}
+
+/**
+ * Start a select thread that will accept connections
+ * from the given socket and pass messages read to the
+ * given message handler.
+ *
+ * @param sock the listen socket
+ * @param max_addr_len maximum expected length of addresses for
+ * connections accepted on the given socket
+ * @param mon maybe NULL
+ * @param memory_quota amount of memory available for
+ * queueing messages (in bytes)
+ * @return NULL on error
+ */
+SelectHandle * select_create(struct GE_Context * ectx,
+ struct LoadMonitor * mon,
+ int sock,
+ unsigned int max_addr_len,
+ cron_t timeout,
+ SelectMessageHandler mh,
+ void * mh_cls,
+ SelectAcceptHandler ah,
+ void * ah_cls,
+ SelectCloseHandler ch,
+ void * ch_cls,
+ unsigned int memory_quota) {
+ SelectHandle * sh;
+
+ if (0 != LISTEN(sock, 5)) {
+ GE_LOG_STRERROR(ectx,
+ GE_ERROR | GE_USER | GE_IMMEDIATE,
+ "listen");
+ return NULL;
+ }
+ sh = MALLOC(sizeof(SelectHandle));
+ memset(sh, 0, sizeof(SelectHandle));
+ if (0 != PIPE(sh->signal_pipe)) {
+ GE_LOG_STRERROR(ectx,
+ GE_ERROR | GE_USER | GE_IMMEDIATE,
+ "pipe");
+ FREE(sh);
+ return NULL;
+ }
+ if (OK != makeNonblocking(sh->ectx,
+ sh->signal_pipe[0])) {
+ if ( (0 != CLOSE(sh->signal_pipe[0])) ||
+ (0 != CLOSE(sh->signal_pipe[1])) )
+ GE_LOG_STRERROR(ectx,
+ GE_ERROR | GE_IMMEDIATE | GE_ADMIN,
+ "close");
+ FREE(sh);
+ return NULL;
+ }
+ sh->shutdown = NO;
+ sh->ectx = ectx;
+ sh->load_monitor = mon;
+ sh->max_addr_len = max_addr_len;
+ sh->mh = mh;
+ sh->mh_cls = mh_cls;
+ sh->ah = ah;
+ sh->ah_cls = ah_cls;
+ sh->ch = ch;
+ sh->ch_cls = ch_cls;
+ sh->memory_quota = memory_quota;
+ sh->timeout = timeout;
+ sh->lock = MUTEX_CREATE(NO);
+ sh->listen_sock = socket_create(ectx,
+ mon,
+ sock);
+ sh->thread = PTHREAD_CREATE(&selectThread,
+ sh,
+ 4 * 1024);
+ if (sh->thread == NULL) {
+ GE_LOG_STRERROR(ectx,
+ GE_ERROR | GE_IMMEDIATE | GE_ADMIN,
+ "pthread_create");
+ socket_destroy(sh->listen_sock);
+ if ( (0 != CLOSE(sh->signal_pipe[0])) ||
+ (0 != CLOSE(sh->signal_pipe[1])) )
+ GE_LOG_STRERROR(ectx,
+ GE_ERROR | GE_IMMEDIATE | GE_ADMIN,
+ "close");
+ MUTEX_DESTROY(sh->lock);
+ FREE(sh);
+ return NULL;
+ }
+ return sh;
+}
+
+/**
+ * Terminate the select thread, close the socket and
+ * all associated connections.
+ */
+void select_destroy(struct SelectHandle * sh) {
+ void * unused;
+
+ sh->shutdown = YES;
+ signalSelect(sh);
+ PTHREAD_STOP_SLEEP(sh->thread);
+ PTHREAD_JOIN(sh->thread, &unused);
+ while (sh->sessionCount > 0)
+ destroySession(sh, sh->sessions[0]);
+ GROW(sh->sessions,
+ sh->sessionArrayLength,
+ 0);
+ MUTEX_DESTROY(sh->lock);
+ if (0 != CLOSE(sh->signal_pipe[1]))
+ GE_LOG_STRERROR(sh->ectx,
+ GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
+ "close");
+ if (0 != CLOSE(sh->signal_pipe[0]))
+ GE_LOG_STRERROR(sh->ectx,
+ GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
+ "close");
+ if (sh->listen_sock != NULL)
+ socket_destroy(sh->listen_sock);
+ FREE(sh);
+}
+
+/**
+ * Queue the given message with the select thread.
+ *
+ * @param mayBlock if YES, blocks this thread until message
+ * has been sent
+ * @param force message is important, queue even if
+ * there is not enough space
+ * @return OK if the message was sent or queued,
+ * NO if there was not enough memory to queue it,
+ * SYSERR if the sock does not belong with this select
+ */
+int select_write(struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ const MESSAGE_HEADER * msg,
+ int mayBlock,
+ int force) {
+ Session * session;
+ int i;
+ unsigned short len;
+ int fresh_write;
+
+ session = NULL;
+ len = ntohs(msg->size);
+ MUTEX_LOCK(sh->lock);
+ for (i=0;i<sh->sessionCount;i++)
+ if (sh->sessions[i]->sock == sock) {
+ session = sh->sessions[i];
+ break;
+ }
+ if (session == NULL) {
+ MUTEX_UNLOCK(sh->lock);
+ return SYSERR;
+ }
+ fresh_write = (session->wsize == 0);
+ GROW(session->wbuff,
+ session->wsize,
+ session->wsize + len);
+ memcpy(&session->wbuff[session->wpos],
+ msg,
+ len);
+ session->wpos += len;
+ MUTEX_UNLOCK(sh->lock);
+ if (fresh_write)
+ signalSelect(sh);
+ return OK;
+}
+
+/**
+ * Add another (already connected) socket to the set of
+ * sockets managed by the select.
+ */
+int select_connect(struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ void * sock_ctx) {
+ Session * session;
+
+ session = MALLOC(sizeof(Session));
+ memset(session, 0, sizeof(Session));
+ session->sock = sock;
+ session->sock_ctx = sock_ctx;
+ session->lastUse = get_time();
+ MUTEX_LOCK(sh->lock);
+ if (sh->sessionArrayLength == sh->sessionCount)
+ GROW(sh->sessions,
+ sh->sessionArrayLength,
+ sh->sessionArrayLength + 4);
+ sh->sessions[sh->sessionCount++] = session;
+ MUTEX_UNLOCK(sh->lock);
+ signalSelect(sh);
+ return OK;
+}
+
+/**
+ * Close the associated socket and remove it from the
+ * set of sockets managed by select.
+ */
+int select_disconnect(struct SelectHandle * sh,
+ struct SocketHandle * sock) {
+ Session * session;
+ int i;
+
+ MUTEX_LOCK(sh->lock);
+ for (i=0;i<sh->sessionCount;i++)
+ if (sh->sessions[i]->sock == sock) {
+ session = sh->sessions[i];
+ break;
+ }
+ if (session == NULL) {
+ MUTEX_UNLOCK(sh->lock);
+ return SYSERR;
+ }
+ destroySession(sh, session);
+ MUTEX_UNLOCK(sh->lock);
+ signalSelect(sh);
+ return OK;
+}
Property changes on: GNUnet/src/util/network/select.c
___________________________________________________________________
Name: svn:eol-style
+ native
Deleted: GNUnet/src/util/network/tcpio.c
===================================================================
--- GNUnet/src/util/network/tcpio.c 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/tcpio.c 2006-07-28 08:29:51 UTC (rev 3142)
@@ -1,459 +0,0 @@
-/*
- This file is part of GNUnet.
- (C) 2001, 2002, 2006 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file util/network/tcpio.c
- * @brief code for synchronized access to TCP streams
- * @author Christian Grothoff
- *
- * Generic TCP code for reliable, mostly blocking, record-oriented TCP
- * connections. GNUnet uses the "tcpio" code for trusted client-server
- * (e.g. gnunet-gtk to gnunetd via loopback) communications. Note
- * that an unblocking write is also provided since if both client and
- * server use blocking IO, both may block on a write and cause a
- * mutual inter-process deadlock.
- *
- * Since we do not want other peers (!) to be able to block a peer by
- * not reading from the TCP stream, the peer-to-peer TCP transport
- * uses unreliable, buffered, non-blocking, record-oriented TCP code
- * with a select call to reduce the number of threads which is
- * provided in transports/tcp.c.
- */
-
-#include "gnunet_util_network.h"
-#include "gnunet_util_os.h"
-#include "gnunet_util_config.h"
-#include "gnunet_protocols.h"
-#include "platform.h"
-
-#define DEBUG_TCPIO NO
-
-/**
- * Struct to refer to a GNUnet TCP connection.
- * This is more than just a socket because if the server
- * drops the connection, the client automatically tries
- * to reconnect (and for that needs connection information).
- */
-typedef struct ClientServerConnection {
-
- /**
- * the socket handle, NULL if not life
- */
- struct SocketHandle * sock;
-
- struct MUTEX * readlock;
-
- struct MUTEX * writelock;
-
- struct GE_Context * ectx;
-
- struct GC_Configuration * cfg;
-
- /**
- * If this is gnunetd's server socket, then we cannot
- * automatically reconnect after closing the connection
- * (since it is an "accept" that gives the socket).<p>
- *
- * If this is NO, we should query the configuration and
- * automagically try to reconnect.
- */
- int isServerSocket;
-
-} ClientServerConnection;
-
-
-/**
- * Return the port-number (in host byte order)
- * @return 0 on error
- */
-static unsigned short getGNUnetPort(struct GE_Context * ectx,
- struct GC_Configuration * cfg) {
- unsigned long long port;
-
- port = 2087;
- if (-1 == GC_get_configuration_value_number(cfg,
- "NETWORK",
- "PORT",
- 1,
- 65535,
- 2087,
- &port)) {
- GE_LOG(ectx,
- GE_ERROR | GE_USER | GE_BULK,
- _("Could not find valid value for PORT in section NETWORK."));
- return 0;
- }
- return (unsigned short) port;
-}
-
-/**
- * Configuration: get the GNUnetd host where the client
- * should connect to (via TCP)
- *
- * @return the name of the host, NULL on error
- */
-static char * getGNUnetdHost(struct GE_Context * ectx,
- struct GC_Configuration * cfg) {
- char * res;
-
- res = NULL;
- if (-1 == GC_get_configuration_value_string(cfg,
- "NETWORK",
- "HOST",
- "localhost",
- &res)) {
- GE_LOG(ectx,
- GE_ERROR | GE_USER | GE_BULK,
- _("Could not find valid value for HOST in section NETWORK."));
- return NULL;
- }
- return res;
-}
-
-struct ClientServerConnection *
-client_connection_create(struct GE_Context * ectx,
- struct GC_Configuration * cfg,
- struct SocketHandle * sock) {
- ClientServerConnection * result;
-
- result = MALLOC(sizeof(ClientServerConnection));
- result->sock = sock;
- result->readlock = MUTEX_CREATE(NO);
- result->writelock = MUTEX_CREATE(NO);
- result->ectx = ectx;
- result->cfg = cfg;
- result->isServerSocket = YES;
- return result;
-}
-
-
-/**
- * Get a GNUnet TCP socket that is connected to gnunetd.
- */
-struct ClientServerConnection *
-daemon_connection_create(struct GE_Context * ectx,
- struct GC_Configuration * cfg) {
- ClientServerConnection * result;
-
- result = MALLOC(sizeof(ClientServerConnection));
- result->sock = NULL;
- result->readlock = MUTEX_CREATE(NO);
- result->writelock = MUTEX_CREATE(NO);
- result->ectx = ectx;
- result->cfg = cfg;
- result->isServerSocket = NO;
- return result;
-}
-
-void connection_close_temporarily(struct ClientServerConnection * sock) {
- if (sock->sock != NULL) {
- socket_destroy(sock->sock);
- sock->sock = NULL;
- }
-}
-
-void connection_destroy(struct ClientServerConnection * sock) {
- connection_close_temporarily(sock);
- MUTEX_DESTROY(sock->readlock);
- MUTEX_DESTROY(sock->writelock);
- FREE(sock);
-}
-
-int connection_test_open(struct ClientServerConnection * sock) {
- return (sock->sock != NULL);
-}
-
-/**
- * Check a socket, open and connect if it is closed and it is a client-socket.
- */
-int connection_ensure_connected(struct ClientServerConnection * sock) {
- struct sockaddr_in soaddr;
- fd_set rset;
- fd_set wset;
- fd_set eset;
- struct timeval timeout;
- int ret;
- int osock;
- unsigned short port;
- char * host;
- IPaddr ip;
-
- if (sock->sock != NULL)
- return OK;
- port = getGNUnetPort(sock->ectx,
- sock->cfg);
- if (port == 0)
- return SYSERR;
- host = getGNUnetdHost(sock->ectx,
- sock->cfg);
- if (host == NULL)
- return SYSERR;
- if (SYSERR == get_host_by_name(sock->ectx,
- host,
- &ip)) {
- FREE(host);
- return SYSERR;
- }
- osock = SOCKET(PF_INET, SOCK_STREAM, 6); /* 6: TCP */
- if (osock == -1) {
- GE_LOG_STRERROR(sock->ectx,
- GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
- "socket");
- FREE(host);
- return SYSERR;
- }
- sock->sock = socket_create(sock->ectx,
- NULL,
- osock);
- socket_set_blocking(sock->sock, NO);
- soaddr.sin_family = AF_INET;
- GE_ASSERT(sock->ectx,
- sizeof(struct in_addr) == sizeof(IPaddr));
- memcpy(&soaddr.sin_addr,
- &ip,
- sizeof(struct in_addr));
- soaddr.sin_port = htons(port);
- ret = CONNECT(osock,
- (struct sockaddr*)&soaddr,
- sizeof(soaddr));
- if ( (ret < 0) &&
- (errno != EINPROGRESS) ) {
- GE_LOG(sock->ectx,
- GE_WARNING | GE_USER | GE_BULK,
- _("Cannot connect to %s:u: %s\n"),
- host,
- port,
- STRERROR(errno));
- socket_destroy(sock->sock);
- FREE(host);
- return SYSERR;
- }
- /* we call select() first with a timeout of 5s to
- avoid blocking on a later write indefinitely;
- Important if a local firewall decides to just drop
- the TCP handshake...*/
- FD_ZERO(&rset);
- FD_ZERO(&wset);
- FD_ZERO(&eset);
- FD_SET(osock, &wset);
- timeout.tv_sec = 5;
- timeout.tv_usec = 0;
- ret = SELECT(osock + 1,
- &rset,
- &wset,
- &eset,
- &timeout);
- if ( (ret == -1) ||
- (! FD_ISSET(osock,
- &wset)) ) {
- GE_LOG(sock->ectx,
- GE_WARNING | GE_USER | GE_BULK,
- _("Cannot connect to %s:u: %s\n"),
- host,
- port,
- STRERROR(errno));
- socket_destroy(sock->sock);
- FREE(host);
- return SYSERR;
- }
- FREE(host);
- socket_set_blocking(sock->sock, YES);
- return OK;
-}
-
-/**
- * Write to a GNUnet TCP socket. Will also potentially complete the
- * sending of a previous non-blocking writeToSocket call.
- *
- * @param sock the socket to write to
- * @param buffer the buffer to write
- * @return OK if the write was sucessful, otherwise SYSERR.
- */
-int connection_write(struct ClientServerConnection * sock,
- const MESSAGE_HEADER * buffer) {
- size_t size;
- size_t sent;
- int res;
-
- if (SYSERR == connection_ensure_connected(sock))
- return SYSERR;
- size = ntohs(buffer->size);
- MUTEX_LOCK(sock->writelock);
- res = socket_send(sock->sock,
- NC_Complete,
- buffer,
- size,
- &sent);
- if ( (res != YES) ||
- (sent != size) ) {
- connection_close_temporarily(sock);
- MUTEX_UNLOCK(sock->writelock);
- return SYSERR;
- }
- MUTEX_UNLOCK(sock->writelock);
- return OK;
-}
-
-int connection_read(struct ClientServerConnection * sock,
- MESSAGE_HEADER ** buffer) {
- int res;
- unsigned int pos;
- char * buf;
- unsigned short size;
-
- if (OK != connection_ensure_connected(sock))
- return SYSERR;
-
- MUTEX_LOCK(sock->readlock);
- pos = 0;
- res = 0;
- if ( (OK != socket_recv(sock->sock,
- NC_Complete,
- &size,
- sizeof(unsigned short),
- &pos)) ||
- (pos != sizeof(unsigned short)) ) {
- connection_close_temporarily(sock);
- MUTEX_UNLOCK(sock->readlock);
- return SYSERR;
- }
- size = ntohs(size);
- if (size < sizeof(MESSAGE_HEADER)) {
- connection_close_temporarily(sock);
- MUTEX_UNLOCK(sock->readlock);
- return SYSERR; /* invalid header */
- }
-
- buf = MALLOC(size);
- if ( (OK != socket_recv(sock->sock,
- NC_Complete,
- &buf[pos],
- size - pos,
- &pos)) ||
- (pos != sizeof(unsigned short) + size) ) {
- connection_close_temporarily(sock);
- FREE(buf);
- MUTEX_UNLOCK(sock->readlock);
- return SYSERR;
- }
-#if DEBUG_TCPIO
- LOG(LOG_DEBUG,
- "Successfully received %d bytes from TCP socket.\n",
- size);
-#endif
- MUTEX_UNLOCK(sock->readlock);
- *buffer = (MESSAGE_HEADER*) buf;
- (*buffer)->size = htons(size);
- return OK; /* success */
-}
-
-
-
-/**
- * CS communication: simple return value
- */
-typedef struct {
-
- /**
- * The CS header (values: sizeof(CS_returnvalue_MESSAGE) + error-size,
CS_PROTO_RETURN_VALUE)
- */
- MESSAGE_HEADER header;
-
- /**
- * The return value (network byte order)
- */
- int return_value;
-
-} RETURN_VALUE_MESSAGE;
-
-/**
- * Obtain a return value from a remote call from TCP.
- *
- * @param sock the TCP socket
- * @param ret the return value from TCP
- * @return SYSERR on error, OK if the return value was read
- * successfully
- */
-int connection_read_result(struct ClientServerConnection * sock,
- int * ret) {
- RETURN_VALUE_MESSAGE * rv;
-
- rv = NULL;
- if (SYSERR == connection_read(sock,
- (MESSAGE_HEADER **) &rv))
- return SYSERR;
- if ( (ntohs(rv->header.size) != sizeof(RETURN_VALUE_MESSAGE)) ||
- (ntohs(rv->header.type) != CS_PROTO_RETURN_VALUE) ) {
- GE_LOG(sock->ectx,
- GE_WARNING | GE_DEVELOPER | GE_BULK,
- _("`%s' failed, reply invalid!\n"),
- __FUNCTION__);
- FREE(rv);
- return SYSERR;
- }
- *ret = ntohl(rv->return_value);
- FREE(rv);
- return OK;
-}
-
-/**
- * Send a return value to the caller of a remote call via
- * TCP.
- * @param sock the TCP socket
- * @param ret the return value to send via TCP
- * @return SYSERR on error, OK if the return value was
- * send successfully
- */
-int connection_write_result(struct ClientServerConnection * sock,
- int ret) {
- RETURN_VALUE_MESSAGE rv;
-
- rv.header.size
- = htons(sizeof(RETURN_VALUE_MESSAGE));
- rv.header.type
- = htons(CS_PROTO_RETURN_VALUE);
- rv.return_value
- = htonl(ret);
- return connection_write(sock,
- &rv.header);
-}
-
-/**
- * Send a return value that indicates
- * a serious error to the other side.
- *
- * @param sock the TCP socket
- * @param mask GE_MASK
- * @param date date string
- * @param msg message string
- * @return SYSERR on error, OK if the error code was send
- * successfully
- */
-int connection_write_error(struct ClientServerConnection * sock,
- GE_KIND mask,
- const char * date,
- const char * msg) {
- return SYSERR; /* not implemented! */
-}
-
-
-
-
-/* end of tcpio.c */
Deleted: GNUnet/src/util/network/tcpiotest.c
===================================================================
--- GNUnet/src/util/network/tcpiotest.c 2006-07-27 08:02:54 UTC (rev 3141)
+++ GNUnet/src/util/network/tcpiotest.c 2006-07-28 08:29:51 UTC (rev 3142)
@@ -1,261 +0,0 @@
-/**
- * @file test/tcpiotest.c
- * @brief testcase for util/tcpiotest.c
- */
-
-#include "gnunet_util.h"
-#include "platform.h"
-
-static int openServerSocket() {
- int listenerFD;
- int listenerPort;
- struct sockaddr_in serverAddr;
- const int on = 1;
-
- listenerPort = getGNUnetPort();
- /* create the socket */
- while ( (listenerFD = SOCKET(PF_INET, SOCK_STREAM, 0)) < 0) {
- LOG(LOG_ERROR,
- "ERROR opening socket (%s). "
- "No client service started. "
- "Trying again in 30 seconds.\n",
- STRERROR(errno));
- sleep(30);
- }
-
- /* fill in the inet address structure */
- memset((char *) &serverAddr,
- 0,
- sizeof(serverAddr));
- serverAddr.sin_family = AF_INET;
- serverAddr.sin_addr.s_addr=htonl(INADDR_ANY);
- serverAddr.sin_port=htons(listenerPort);
-
- if ( SETSOCKOPT(listenerFD,
- SOL_SOCKET,
- SO_REUSEADDR,
- &on, sizeof(on)) < 0 )
- perror("setsockopt");
-
- /* bind the socket */
- if (BIND (listenerFD,
- (struct sockaddr *) &serverAddr,
- sizeof(serverAddr)) < 0) {
- LOG(LOG_ERROR,
- "ERROR (%s) binding the TCP listener to port %d. "
- "Test failed. Is gnunetd running?\n",
- STRERROR(errno),
- listenerPort);
- return -1;
- }
-
- /* start listening for new connections */
- if (0 != LISTEN(listenerFD, 5)) {
- LOG(LOG_ERROR,
- " listen failed: %s\n",
- STRERROR(errno));
- return -1;
- }
- return listenerFD;
-}
-
-static int doAccept(int serverSocket) {
- int incomingFD;
- int lenOfIncomingAddr;
- struct sockaddr_in clientAddr;
-
- incomingFD = -1;
- while (incomingFD < 0) {
- lenOfIncomingAddr = sizeof(clientAddr);
- incomingFD = ACCEPT(serverSocket,
- (struct sockaddr *)&clientAddr,
- &lenOfIncomingAddr);
- if (incomingFD < 0) {
- LOG(LOG_ERROR,
- "ERROR accepting new connection (%s).\n",
- STRERROR(errno));
- continue;
- }
- }
- return incomingFD;
-}
-
-/**
- * Perform option parsing from the command line.
- */
-static int parseCommandLine(int argc,
- char * argv[]) {
- char c;
-
- while (1) {
- int option_index = 0;
- static struct GNoption long_options[] = {
- { "config", 1, 0, 'c' },
- { 0,0,0,0 }
- };
-
- c = GNgetopt_long(argc,
- argv,
- "c:",
- long_options,
- &option_index);
-
- if (c == -1)
- break; /* No more flags to process */
-
- switch(c) {
- case 'c':
- FREENONNULL(setConfigurationString("FILES",
- "gnunet.conf",
- GNoptarg));
- break;
- } /* end of parsing commandline */
- }
- FREENONNULL(setConfigurationString("GNUNETD",
- "LOGFILE",
- NULL));
- FREENONNULL(setConfigurationString("GNUNETD",
- "LOGLEVEL",
- "DEBUG"));
- return OK;
-}
-
-static int testTransmission(GNUNET_TCP_SOCKET * a,
- GNUNET_TCP_SOCKET * b) {
- CS_MESSAGE_HEADER * hdr;
- CS_MESSAGE_HEADER * buf;
- int i;
- int j;
-
- hdr = MALLOC(1024);
- for (i=0;i<1024-sizeof(CS_MESSAGE_HEADER);i+=7) {
- fprintf(stderr, ".");
- for (j=0;j<i;j++)
- ((char*)&hdr[1])[j] = (char)i+j;
- hdr->size = htons(i+sizeof(CS_MESSAGE_HEADER));
- hdr->type = 0;
- if (OK != writeToSocket(a, hdr)) {
- FREE(hdr);
- return 1;
- }
- buf = NULL;
- if (OK != readFromSocket(b, &buf)) {
- FREE(hdr);
- return 2;
- }
- if (0 != memcmp(buf, hdr, i+sizeof(CS_MESSAGE_HEADER))) {
- FREE(buf);
- FREE(hdr);
- return 4;
- }
- FREE(buf);
- }
- FREE(hdr);
- return 0;
-}
-
-static int testNonblocking(GNUNET_TCP_SOCKET * a,
- GNUNET_TCP_SOCKET * b) {
- CS_MESSAGE_HEADER * hdr;
- CS_MESSAGE_HEADER * buf;
- int i;
- int cnt;
-
- hdr = MALLOC(1024);
- for (i=0;i<1024-sizeof(CS_MESSAGE_HEADER);i+=11)
- ((char*)&hdr[1])[i] = (char)i;
- hdr->size = htons(64+sizeof(CS_MESSAGE_HEADER));
- hdr->type = 0;
- while (OK == writeToSocketNonBlocking(a,
- hdr))
- hdr->type++;
- i = 0;
- cnt = hdr->type;
- /* printf("Reading %u messages.\n", cnt); */
- if (cnt < 2)
- return 8; /* could not write ANY data non-blocking!? */
- for (i=0;i<cnt;i++) {
- hdr->type = i;
- buf = NULL;
- if (OK != readFromSocket(b, &buf)) {
- FREE(hdr);
- return 16;
- }
- if (0 != memcmp(buf, hdr, 64+sizeof(CS_MESSAGE_HEADER))) {
- printf("Failure in message %u. Headers: %d ? %d\n",
- i,
- buf->type,
- hdr->type);
- FREE(buf);
- FREE(hdr);
- return 32;
- }
- FREE(buf);
- if (i == cnt - 2) {
- /* printf("Blocking write to flush last non-blocking message.\n"); */
- hdr->type = cnt;
- if (OK != writeToSocket(a,
- hdr)) {
- FREE(hdr);
- return 64;
- }
- }
- }
- hdr->type = i;
- buf = NULL;
- if (OK != readFromSocket(b, &buf)) {
- FREE(hdr);
- return 128;
- }
- if (0 != memcmp(buf, hdr, 64+sizeof(CS_MESSAGE_HEADER))) {
- FREE(buf);
- FREE(hdr);
- return 256;
- }
- FREE(buf);
- FREE(hdr);
- return 0;
-}
-
-int main(int argc, char * argv[]){
- int i;
- int ret;
- int serverSocket;
- GNUNET_TCP_SOCKET * clientSocket;
- GNUNET_TCP_SOCKET acceptSocket;
-
- ret = 0;
- initUtil(argc, argv, &parseCommandLine);
- serverSocket = openServerSocket();
- clientSocket = getClientSocket();
- if (serverSocket == -1) {
- releaseClientSocket(clientSocket);
- doneUtil();
- return 1;
- }
- for (i=0;i<2;i++) {
- if (OK == checkSocket(clientSocket)) {
- if (OK == initGNUnetServerSocket(doAccept(serverSocket),
- &acceptSocket)) {
- ret = ret | testTransmission(clientSocket, &acceptSocket);
- ret = ret | testTransmission(&acceptSocket, clientSocket);
- ret = ret | testNonblocking(clientSocket, &acceptSocket);
- ret = ret | testNonblocking(&acceptSocket, clientSocket);
- closeSocketTemporarily(clientSocket);
- destroySocket(&acceptSocket);
- fprintf(stderr, "\n");
- } else {
- fprintf(stderr, "initGNUnetServerSocket failed.\n");
- ret = -1;
- }
- } else {
- fprintf(stderr, "checkSocket faild.\n");
- ret = -1;
- }
- }
- releaseClientSocket(clientSocket);
- doneUtil();
- if (ret > 0)
- fprintf(stderr, "Error %d\n", ret);
- return ret;
-}
Copied: GNUnet/src/util/network_client/Makefile.am (from rev 3130,
GNUnet/src/util/network/Makefile.am)
===================================================================
--- GNUnet/src/util/network/Makefile.am 2006-07-24 23:05:12 UTC (rev 3130)
+++ GNUnet/src/util/network_client/Makefile.am 2006-07-28 08:29:51 UTC (rev
3142)
@@ -0,0 +1,22 @@
+INCLUDES = -I$(top_srcdir)/src/include
+
+SUBDIRS = .
+
+lib_LTLIBRARIES = \
+ libgnunetutil_network_client.la
+
+libgnunetutil_network_client_la_SOURCES = \
+ daemon.c \
+ tcpio.c
+
+check_PROGRAMS = \
+ tcpiotest
+
+TESTS = $(check_PROGRAMS)
+
+tcpiotest_SOURCES = \
+ tcpiotest.c
+tcpiotest_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/util/network_client/libgnunetutil_network_client.la
+
Copied: GNUnet/src/util/network_client/daemon.c (from rev 3130,
GNUnet/src/util/network/daemon.c)
Copied: GNUnet/src/util/network_client/tcpio.c (from rev 3130,
GNUnet/src/util/network/tcpio.c)
===================================================================
--- GNUnet/src/util/network/tcpio.c 2006-07-24 23:05:12 UTC (rev 3130)
+++ GNUnet/src/util/network_client/tcpio.c 2006-07-28 08:29:51 UTC (rev
3142)
@@ -0,0 +1,409 @@
+/*
+ This file is part of GNUnet.
+ (C) 2001, 2002, 2006 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file util/network/tcpio.c
+ * @brief code for synchronized access to TCP streams
+ * @author Christian Grothoff
+ *
+ * Generic TCP code for reliable, mostly blocking, record-oriented TCP
+ * connections. GNUnet uses the "tcpio" code for trusted client-server
+ * (e.g. gnunet-gtk to gnunetd via loopback) communications. Note
+ * that an unblocking write is also provided since if both client and
+ * server use blocking IO, both may block on a write and cause a
+ * mutual inter-process deadlock.
+ *
+ * Since we do not want other peers (!) to be able to block a peer by
+ * not reading from the TCP stream, the peer-to-peer TCP transport
+ * uses unreliable, buffered, non-blocking, record-oriented TCP code
+ * with a select call to reduce the number of threads which is
+ * provided in transports/tcp.c.
+ */
+
+#include "gnunet_util_network.h"
+#include "gnunet_util_os.h"
+#include "gnunet_util_config.h"
+#include "gnunet_protocols.h"
+#include "platform.h"
+
+#define DEBUG_TCPIO NO
+
+/**
+ * Struct to refer to a GNUnet TCP connection.
+ * This is more than just a socket because if the server
+ * drops the connection, the client automatically tries
+ * to reconnect (and for that needs connection information).
+ */
+typedef struct ClientServerConnection {
+
+ /**
+ * the socket handle, NULL if not life
+ */
+ struct SocketHandle * sock;
+
+ struct MUTEX * readlock;
+
+ struct MUTEX * writelock;
+
+ struct GE_Context * ectx;
+
+ struct GC_Configuration * cfg;
+
+} ClientServerConnection;
+
+
+/**
+ * Return the port-number (in host byte order)
+ * @return 0 on error
+ */
+static unsigned short getGNUnetPort(struct GE_Context * ectx,
+ struct GC_Configuration * cfg) {
+ unsigned long long port;
+
+ port = 2087;
+ if (-1 == GC_get_configuration_value_number(cfg,
+ "NETWORK",
+ "PORT",
+ 1,
+ 65535,
+ 2087,
+ &port)) {
+ GE_LOG(ectx,
+ GE_ERROR | GE_USER | GE_BULK,
+ _("Could not find valid value for PORT in section NETWORK."));
+ return 0;
+ }
+ return (unsigned short) port;
+}
+
+/**
+ * Configuration: get the GNUnetd host where the client
+ * should connect to (via TCP)
+ *
+ * @return the name of the host, NULL on error
+ */
+static char * getGNUnetdHost(struct GE_Context * ectx,
+ struct GC_Configuration * cfg) {
+ char * res;
+
+ res = NULL;
+ if (-1 == GC_get_configuration_value_string(cfg,
+ "NETWORK",
+ "HOST",
+ "localhost",
+ &res)) {
+ GE_LOG(ectx,
+ GE_ERROR | GE_USER | GE_BULK,
+ _("Could not find valid value for HOST in section NETWORK."));
+ return NULL;
+ }
+ return res;
+}
+
+struct ClientServerConnection *
+client_connection_create(struct GE_Context * ectx,
+ struct GC_Configuration * cfg,
+ struct SocketHandle * sock) {
+ ClientServerConnection * result;
+
+ result = MALLOC(sizeof(ClientServerConnection));
+ result->sock = sock;
+ result->readlock = MUTEX_CREATE(NO);
+ result->writelock = MUTEX_CREATE(NO);
+ result->ectx = ectx;
+ result->cfg = cfg;
+ result->isServerSocket = YES;
+ return result;
+}
+
+void connection_close_temporarily(struct ClientServerConnection * sock) {
+ if (sock->sock != NULL) {
+ socket_destroy(sock->sock);
+ sock->sock = NULL;
+ }
+}
+
+void connection_destroy(struct ClientServerConnection * sock) {
+ connection_close_temporarily(sock);
+ MUTEX_DESTROY(sock->readlock);
+ MUTEX_DESTROY(sock->writelock);
+ FREE(sock);
+}
+
+int connection_test_open(struct ClientServerConnection * sock) {
+ return (sock->sock != NULL);
+}
+
+/**
+ * Check a socket, open and connect if it is closed and it is a client-socket.
+ */
+int connection_ensure_connected(struct ClientServerConnection * sock) {
+ struct sockaddr_in soaddr;
+ fd_set rset;
+ fd_set wset;
+ fd_set eset;
+ struct timeval timeout;
+ int ret;
+ int osock;
+ unsigned short port;
+ char * host;
+ IPaddr ip;
+
+ if (sock->sock != NULL)
+ return OK;
+ port = getGNUnetPort(sock->ectx,
+ sock->cfg);
+ if (port == 0)
+ return SYSERR;
+ host = getGNUnetdHost(sock->ectx,
+ sock->cfg);
+ if (host == NULL)
+ return SYSERR;
+ if (SYSERR == get_host_by_name(sock->ectx,
+ host,
+ &ip)) {
+ FREE(host);
+ return SYSERR;
+ }
+ osock = SOCKET(PF_INET, SOCK_STREAM, 6); /* 6: TCP */
+ if (osock == -1) {
+ GE_LOG_STRERROR(sock->ectx,
+ GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
+ "socket");
+ FREE(host);
+ return SYSERR;
+ }
+ sock->sock = socket_create(sock->ectx,
+ NULL,
+ osock);
+ socket_set_blocking(sock->sock, NO);
+ soaddr.sin_family = AF_INET;
+ GE_ASSERT(sock->ectx,
+ sizeof(struct in_addr) == sizeof(IPaddr));
+ memcpy(&soaddr.sin_addr,
+ &ip,
+ sizeof(struct in_addr));
+ soaddr.sin_port = htons(port);
+ ret = CONNECT(osock,
+ (struct sockaddr*)&soaddr,
+ sizeof(soaddr));
+ if ( (ret < 0) &&
+ (errno != EINPROGRESS) ) {
+ GE_LOG(sock->ectx,
+ GE_WARNING | GE_USER | GE_BULK,
+ _("Cannot connect to %s:u: %s\n"),
+ host,
+ port,
+ STRERROR(errno));
+ socket_destroy(sock->sock);
+ FREE(host);
+ return SYSERR;
+ }
+ /* we call select() first with a timeout of 5s to
+ avoid blocking on a later write indefinitely;
+ Important if a local firewall decides to just drop
+ the TCP handshake...*/
+ FD_ZERO(&rset);
+ FD_ZERO(&wset);
+ FD_ZERO(&eset);
+ FD_SET(osock, &wset);
+ timeout.tv_sec = 5;
+ timeout.tv_usec = 0;
+ ret = SELECT(osock + 1,
+ &rset,
+ &wset,
+ &eset,
+ &timeout);
+ if ( (ret == -1) ||
+ (! FD_ISSET(osock,
+ &wset)) ) {
+ GE_LOG(sock->ectx,
+ GE_WARNING | GE_USER | GE_BULK,
+ _("Cannot connect to %s:u: %s\n"),
+ host,
+ port,
+ STRERROR(errno));
+ socket_destroy(sock->sock);
+ FREE(host);
+ return SYSERR;
+ }
+ FREE(host);
+ socket_set_blocking(sock->sock, YES);
+ return OK;
+}
+
+/**
+ * Write to a GNUnet TCP socket. Will also potentially complete the
+ * sending of a previous non-blocking writeToSocket call.
+ *
+ * @param sock the socket to write to
+ * @param buffer the buffer to write
+ * @return OK if the write was sucessful, otherwise SYSERR.
+ */
+int connection_write(struct ClientServerConnection * sock,
+ const MESSAGE_HEADER * buffer) {
+ size_t size;
+ size_t sent;
+ int res;
+
+ if (SYSERR == connection_ensure_connected(sock))
+ return SYSERR;
+ size = ntohs(buffer->size);
+ MUTEX_LOCK(sock->writelock);
+ res = socket_send(sock->sock,
+ NC_Complete,
+ buffer,
+ size,
+ &sent);
+ if ( (res != YES) ||
+ (sent != size) ) {
+ connection_close_temporarily(sock);
+ MUTEX_UNLOCK(sock->writelock);
+ return SYSERR;
+ }
+ MUTEX_UNLOCK(sock->writelock);
+ return OK;
+}
+
+int connection_read(struct ClientServerConnection * sock,
+ MESSAGE_HEADER ** buffer) {
+ int res;
+ unsigned int pos;
+ char * buf;
+ unsigned short size;
+
+ if (OK != connection_ensure_connected(sock))
+ return SYSERR;
+
+ MUTEX_LOCK(sock->readlock);
+ pos = 0;
+ res = 0;
+ if ( (OK != socket_recv(sock->sock,
+ NC_Complete,
+ &size,
+ sizeof(unsigned short),
+ &pos)) ||
+ (pos != sizeof(unsigned short)) ) {
+ connection_close_temporarily(sock);
+ MUTEX_UNLOCK(sock->readlock);
+ return SYSERR;
+ }
+ size = ntohs(size);
+ if (size < sizeof(MESSAGE_HEADER)) {
+ connection_close_temporarily(sock);
+ MUTEX_UNLOCK(sock->readlock);
+ return SYSERR; /* invalid header */
+ }
+
+ buf = MALLOC(size);
+ if ( (OK != socket_recv(sock->sock,
+ NC_Complete,
+ &buf[pos],
+ size - pos,
+ &pos)) ||
+ (pos != sizeof(unsigned short) + size) ) {
+ connection_close_temporarily(sock);
+ FREE(buf);
+ MUTEX_UNLOCK(sock->readlock);
+ return SYSERR;
+ }
+#if DEBUG_TCPIO
+ LOG(LOG_DEBUG,
+ "Successfully received %d bytes from TCP socket.\n",
+ size);
+#endif
+ MUTEX_UNLOCK(sock->readlock);
+ *buffer = (MESSAGE_HEADER*) buf;
+ (*buffer)->size = htons(size);
+ return OK; /* success */
+}
+
+/**
+ * Obtain a return value from a remote call from TCP.
+ *
+ * @param sock the TCP socket
+ * @param ret the return value from TCP
+ * @return SYSERR on error, OK if the return value was read
+ * successfully
+ */
+int connection_read_result(struct ClientServerConnection * sock,
+ int * ret) {
+ RETURN_VALUE_MESSAGE * rv;
+
+ rv = NULL;
+ if (SYSERR == connection_read(sock,
+ (MESSAGE_HEADER **) &rv))
+ return SYSERR;
+ if ( (ntohs(rv->header.size) != sizeof(RETURN_VALUE_MESSAGE)) ||
+ (ntohs(rv->header.type) != CS_PROTO_RETURN_VALUE) ) {
+ GE_LOG(sock->ectx,
+ GE_WARNING | GE_DEVELOPER | GE_BULK,
+ _("`%s' failed, reply invalid!\n"),
+ __FUNCTION__);
+ FREE(rv);
+ return SYSERR;
+ }
+ *ret = ntohl(rv->return_value);
+ FREE(rv);
+ return OK;
+}
+
+/**
+ * Send a return value to the caller of a remote call via
+ * TCP.
+ * @param sock the TCP socket
+ * @param ret the return value to send via TCP
+ * @return SYSERR on error, OK if the return value was
+ * send successfully
+ */
+int connection_write_result(struct ClientServerConnection * sock,
+ int ret) {
+ RETURN_VALUE_MESSAGE rv;
+
+ rv.header.size
+ = htons(sizeof(RETURN_VALUE_MESSAGE));
+ rv.header.type
+ = htons(CS_PROTO_RETURN_VALUE);
+ rv.return_value
+ = htonl(ret);
+ return connection_write(sock,
+ &rv.header);
+}
+
+/**
+ * Send a return value that indicates
+ * a serious error to the other side.
+ *
+ * @param sock the TCP socket
+ * @param mask GE_MASK
+ * @param date date string
+ * @param msg message string
+ * @return SYSERR on error, OK if the error code was send
+ * successfully
+ */
+int connection_write_error(struct ClientServerConnection * sock,
+ GE_KIND mask,
+ const char * date,
+ const char * msg) {
+ return SYSERR; /* not implemented! */
+}
+
+
+/* end of tcpio.c */
Copied: GNUnet/src/util/network_client/tcpiotest.c (from rev 3130,
GNUnet/src/util/network/tcpiotest.c)
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3142 - in GNUnet/src: include transports util util/network util/network_client,
grothoff <=