[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3399 - in GNUnet: . src/include src/server src/transports src/util/network
From: grothoff
Subject: [GNUnet-SVN] r3399 - in GNUnet: . src/include src/server src/transports src/util/network
Date: Wed, 13 Sep 2006 23:15:27 -0700 (PDT)
Author: grothoff
Date: 2006-09-13 23:15:21 -0700 (Wed, 13 Sep 2006)
New Revision: 3399
Modified:
GNUnet/src/include/gnunet_util_network.h
GNUnet/src/server/tcpserver.c
GNUnet/src/transports/tcp.c
GNUnet/src/transports/tcp6.c
GNUnet/src/transports/udp.c
GNUnet/src/transports/udp6.c
GNUnet/src/transports/udp_helper.c
GNUnet/src/util/network/select.c
GNUnet/todo
Log:
working on udp tbench
Modified: GNUnet/src/include/gnunet_util_network.h
===================================================================
--- GNUnet/src/include/gnunet_util_network.h 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/include/gnunet_util_network.h 2006-09-14 06:15:21 UTC (rev 3399)
@@ -410,6 +410,7 @@
* @return NULL on error
*/
struct SelectHandle * select_create(const char * desc,
+ int is_udp,
struct GE_Context * ectx,
struct LoadMonitor * mon,
int sock,
Modified: GNUnet/src/server/tcpserver.c
===================================================================
--- GNUnet/src/server/tcpserver.c 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/server/tcpserver.c 2006-09-14 06:15:21 UTC (rev 3399)
@@ -319,6 +319,7 @@
return SYSERR;
}
selector = select_create("tcpserver",
+ NO,
ectx,
NULL,
listenerFD,
Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/tcp.c 2006-09-14 06:15:21 UTC (rev 3399)
@@ -330,10 +330,11 @@
return SYSERR;
}
selector = select_create("tcp",
+ NO,
ectx,
coreAPI->load_monitor,
s,
- sizeof(IPaddr),
+ sizeof(struct sockaddr_in),
TCP_TIMEOUT,
&select_message_handler,
NULL,
Modified: GNUnet/src/transports/tcp6.c
===================================================================
--- GNUnet/src/transports/tcp6.c 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/tcp6.c 2006-09-14 06:15:21 UTC (rev 3399)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
+ (C) 2003, 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -339,6 +339,7 @@
return SYSERR;
}
selector = select_create("tcp6",
+ NO,
ectx,
coreAPI->load_monitor,
s,
Modified: GNUnet/src/transports/udp.c
===================================================================
--- GNUnet/src/transports/udp.c 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/udp.c 2006-09-14 06:15:21 UTC (rev 3399)
@@ -98,41 +98,46 @@
const int on = 1;
sock = SOCKET(PF_INET, SOCK_DGRAM, UDP_PROTOCOL_NUMBER);
- if (sock < 0)
+ if (sock < 0) {
GE_DIE_STRERROR(ectx,
GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
"socket");
+ return -1;
+ }
if ( SETSOCKOPT(sock,
SOL_SOCKET,
SO_REUSEADDR,
&on,
- sizeof(on)) < 0 )
+ sizeof(on)) < 0 ) {
GE_DIE_STRERROR(ectx,
GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
"setsockopt");
- if (port != 0) {
- memset(&sin,
- 0,
- sizeof(sin));
- sin.sin_family = AF_INET;
- sin.sin_addr.s_addr = INADDR_ANY;
- sin.sin_port = htons(port);
- if (BIND(sock,
- (struct sockaddr *)&sin,
- sizeof(sin)) < 0) {
- GE_LOG_STRERROR(ectx,
- GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
- "bind");
- GE_LOG(ectx,
- GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
- _("Failed to bind to UDP port %d.\n"),
- port);
- GE_DIE_STRERROR(ectx,
- GE_FATAL | GE_USER | GE_IMMEDIATE,
- "bind");
- }
- } /* do not bind if port == 0, then we use
- send-only! */
+ return -1;
+ }
+ GE_ASSERT(NULL, port != 0);
+ memset(&sin,
+ 0,
+ sizeof(sin));
+ sin.sin_family = AF_INET;
+ sin.sin_addr.s_addr = INADDR_ANY;
+ sin.sin_port = htons(port);
+ if (BIND(sock,
+ (struct sockaddr *)&sin,
+ sizeof(sin)) < 0) {
+ GE_LOG_STRERROR(ectx,
+ GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
+ "bind");
+ GE_LOG(ectx,
+ GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
+ _("Failed to bind to UDP port %d.\n"),
+ port);
+ GE_DIE_STRERROR(ectx,
+ GE_FATAL | GE_USER | GE_IMMEDIATE,
+ "bind");
+ return -1;
+ }
+ /* do not bind if port == 0, then we use
+ send-only! */
return sock;
}
@@ -339,15 +344,16 @@
if (sock == -1)
return SYSERR;
selector = select_create("udp",
+ YES,
ectx,
load_monitor,
sock,
- sizeof(IPaddr),
+ sizeof(struct sockaddr_in),
0, /* timeout */
&select_message_handler,
NULL,
&select_accept_handler,
- NULL,
+ &isBlacklisted,
&select_close_handler,
NULL,
0 /* memory quota */ );
Modified: GNUnet/src/transports/udp6.c
===================================================================
--- GNUnet/src/transports/udp6.c 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/udp6.c 2006-09-14 06:15:21 UTC (rev 3399)
@@ -337,15 +337,16 @@
if (sock == -1)
return SYSERR;
selector = select_create("udp6",
+ YES,
ectx,
load_monitor,
sock,
- sizeof(IPaddr),
+ sizeof(struct sockaddr_in6),
0, /* timeout */
&select_message_handler,
NULL,
&select_accept_handler,
- NULL,
+ &isBlacklisted,
&select_close_handler,
NULL,
0 /* memory quota */ );
Modified: GNUnet/src/transports/udp_helper.c
===================================================================
--- GNUnet/src/transports/udp_helper.c 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/transports/udp_helper.c 2006-09-14 06:15:21 UTC (rev 3399)
@@ -24,6 +24,9 @@
* @author Christian Grothoff
*/
+typedef int (*BlacklistedTester)(const void * addr,
+ unsigned int addr_len);
+
/**
* Message-Packet header.
*/
@@ -115,6 +118,10 @@
const void * addr,
unsigned int addr_len) {
static int nonnullpointer;
+ BlacklistedTester blt = ah_cls;
+ if (NO != blt(addr,
+ addr_len))
+ return NULL;
return &nonnullpointer;
}
Modified: GNUnet/src/util/network/select.c
===================================================================
--- GNUnet/src/util/network/select.c 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/src/util/network/select.c 2006-09-14 06:15:21 UTC (rev 3399)
@@ -104,17 +104,37 @@
*/
struct SocketHandle * listen_sock;
+ struct GE_Context * ectx;
+
+ struct LoadMonitor * load_monitor;
+
/**
+ * Array of currently active TCP sessions.
+ */
+ Session ** sessions;
+
+ SelectMessageHandler mh;
+
+ SelectAcceptHandler ah;
+
+ SelectCloseHandler ch;
+
+ void * mh_cls;
+
+ void * ah_cls;
+
+ void * ch_cls;
+
+ cron_t timeout;
+
+ /**
* tcp_pipe is used to signal the thread that is
* blocked in a select call that the set of sockets to listen
* to has changed.
*/
int signal_pipe[2];
- /**
- * Array of currently active TCP sessions.
- */
- Session ** sessions;
+ int is_udp;
unsigned int sessionCount;
@@ -122,26 +142,8 @@
int shutdown;
- struct GE_Context * ectx;
-
- struct LoadMonitor * load_monitor;
-
unsigned int max_addr_len;
- cron_t timeout;
-
- SelectMessageHandler mh;
-
- void * mh_cls;
-
- SelectAcceptHandler ah;
-
- void * ah_cls;
-
- SelectCloseHandler ch;
-
- void * ch_cls;
-
unsigned int memory_quota;
} SelectHandle;
@@ -413,6 +415,7 @@
void * sctx;
SocketHandle * sock;
Session * session;
+ size_t size;
clientAddr = MALLOC(sh->max_addr_len);
MUTEX_LOCK(sh->lock);
@@ -491,63 +494,149 @@
"select");
}
}
- if ( (sh->listen_sock != NULL) &&
- (FD_ISSET(sh->listen_sock->handle, &readSet)) ) {
+ if (sh->is_udp == NO) {
+ if ( (sh->listen_sock != NULL) &&
+ (FD_ISSET(sh->listen_sock->handle, &readSet)) ) {
+ lenOfIncomingAddr = sh->max_addr_len;
+ memset(clientAddr,
+ 0,
+ lenOfIncomingAddr);
+ s = ACCEPT(sh->listen_sock->handle,
+ (struct sockaddr *) clientAddr,
+ &lenOfIncomingAddr);
+ if (s == -1) {
+ GE_LOG(sh->ectx,
+ GE_WARNING | GE_ADMIN | GE_BULK,
+ "Select %s failed to accept!\n",
+ sh->description);
+ GE_LOG_STRERROR(sh->ectx,
+ GE_WARNING | GE_ADMIN | GE_BULK,
+ "accept");
+ break;
+ } else {
+#if DEBUG_SELECT
+ GE_LOG(sh->ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Select %p is accepting connection: %d\n",
+ sh,
+ s);
+#endif
+ sock = socket_create(sh->ectx,
+ sh->load_monitor,
+ s);
+ sctx = sh->ah(sh->ah_cls,
+ sh,
+ sock,
+ clientAddr,
+ lenOfIncomingAddr);
+#if DEBUG_SELECT
+ GE_LOG(sh->ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Select %p is accepting connection: %p\n",
+ sh,
+ sctx);
+#endif
+ if (sctx == NULL) {
+ socket_destroy(sock);
+ } else {
+ session = MALLOC(sizeof(Session));
+ memset(session, 0, sizeof(Session));
+ session->sock = sock;
+ session->sock_ctx = sctx;
+ session->lastUse = get_time();
+ if (sh->sessionArrayLength == sh->sessionCount)
+ GROW(sh->sessions,
+ sh->sessionArrayLength,
+ sh->sessionArrayLength + 4);
+ sh->sessions[sh->sessionCount++] = session;
+ }
+ }
+ }
+ } else { /* is_udp == YES */
+ int pending;
+ int udp_sock;
+ int error;
+
+ udp_sock = sh->listen_sock->handle;
lenOfIncomingAddr = sh->max_addr_len;
memset(clientAddr,
0,
lenOfIncomingAddr);
- s = ACCEPT(sh->listen_sock->handle,
- (struct sockaddr *) clientAddr,
- &lenOfIncomingAddr);
- if (s == -1) {
- GE_LOG(sh->ectx,
- GE_WARNING | GE_ADMIN | GE_BULK,
- "Select %s failed to accept!\n",
- sh->description);
+ pending = 0;
+ /* @todo FIXME in PlibC */
+#ifdef MINGW
+ error = ioctlsocket(udp_sock,
+ FIONREAD,
+ &pending);
+#else
+ error = ioctl(udp_sock,
+ FIONREAD,
+ &pending);
+#endif
+ if (error != 0) {
GE_LOG_STRERROR(sh->ectx,
- GE_WARNING | GE_ADMIN | GE_BULK,
- "accept");
- break;
+ GE_ERROR | GE_ADMIN | GE_BULK,
+ "ioctl");
+ pending = 65535; /* max */
+ }
+ GE_ASSERT(sh->ectx, pending >= 0);
+ if (pending == 0) {
+ /* maybe empty UDP packet was sent (see report on bug-gnunet,
+ 5/11/6; read 0 bytes from UDP just to kill potential empty packet! */
+ socket_recv_from(sh->listen_sock,
+ NC_Blocking,
+ NULL,
+ 0,
+ &size,
+ clientAddr,
+ &lenOfIncomingAddr);
+ } else if (pending >= 65536) {
+ GE_BREAK(sh->ectx, 0);
+ socket_close(sh->listen_sock);
} else {
-#if DEBUG_SELECT
- GE_LOG(sh->ectx,
- GE_DEBUG | GE_DEVELOPER | GE_BULK,
- "Select %p is accepting connection: %d\n",
- sh,
- s);
-#endif
- sock = socket_create(sh->ectx,
- sh->load_monitor,
- s);
- sctx = sh->ah(sh->ah_cls,
- sh,
- sock,
- clientAddr,
- lenOfIncomingAddr);
-#if DEBUG_SELECT
- GE_LOG(sh->ectx,
- GE_DEBUG | GE_DEVELOPER | GE_BULK,
- "Select %p is accepting connection: %p\n",
- sh,
- sctx);
-#endif
- if (sctx == NULL) {
- socket_destroy(sock);
+ char * msg;
+
+ msg = MALLOC(pending);
+ size = 0;
+ if (YES != socket_recv_from(sh->listen_sock,
+ NC_Blocking,
+ msg,
+ pending,
+ &size,
+ clientAddr,
+ &lenOfIncomingAddr)) {
+ socket_close(sh->listen_sock);
} else {
- session = MALLOC(sizeof(Session));
- memset(session, 0, sizeof(Session));
- session->sock = sock;
- session->sock_ctx = sctx;
- session->lastUse = get_time();
- if (sh->sessionArrayLength == sh->sessionCount)
- GROW(sh->sessions,
- sh->sessionArrayLength,
- sh->sessionArrayLength + 4);
- sh->sessions[sh->sessionCount++] = session;
+ /* validate msg format! */
+ const MESSAGE_HEADER * hdr;
+
+ hdr = (const MESSAGE_HEADER*) msg;
+ if ( (size == pending) &&
+ (size >= sizeof(MESSAGE_HEADER)) &&
+ (ntohs(hdr->size) == size) ) {
+ void * sctx;
+
+ sctx = sh->ah(sh->ah_cls,
+ sh,
+ NULL,
+ clientAddr,
+ lenOfIncomingAddr);
+ if (sctx != NULL) {
+ sh->mh(sh->mh_cls,
+ sh,
+ NULL,
+ sctx,
+ hdr);
+ sh->ch(sh->ch_cls,
+ sh,
+ NULL,
+ sctx);
+ }
+ }
}
- }
- }
+ FREE(msg);
+ }
+ } /* end UDP processing */
if (FD_ISSET(sh->signal_pipe[0], &readSet)) {
/* allow reading multiple signals in one go in case we get many
in one shot... */
@@ -638,6 +727,7 @@
* @return NULL on error
*/
SelectHandle * select_create(const char * description,
+ int is_udp,
struct GE_Context * ectx,
struct LoadMonitor * mon,
int sock,
@@ -652,16 +742,18 @@
unsigned int memory_quota) {
SelectHandle * sh;
- if ( (0 != LISTEN(sock, 5)) &&
- (errno != EOPNOTSUPP) ) { /* udp: not supported */
+ if ( (is_udp == NO) &&
+ (0 != LISTEN(sock, 5)) ) {
GE_LOG_STRERROR(ectx,
GE_ERROR | GE_USER | GE_IMMEDIATE,
"listen");
- return NULL;
+ return NULL;
}
+ GE_ASSERT(ectx, description != NULL);
sh = MALLOC(sizeof(SelectHandle));
+ memset(sh, 0, sizeof(SelectHandle));
+ sh->is_udp = is_udp;
sh->description = description;
- memset(sh, 0, sizeof(SelectHandle));
if (0 != PIPE(sh->signal_pipe)) {
GE_LOG_STRERROR(ectx,
GE_ERROR | GE_USER | GE_IMMEDIATE,
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-09-14 05:09:41 UTC (rev 3398)
+++ GNUnet/todo 2006-09-14 06:15:21 UTC (rev 3399)
@@ -18,6 +18,7 @@
+ loggers: SMTP logger
+ use new loggers in for CS error reporting
* make testcases compile & pass again:
+ + tbench_udp (bugs in udp transport?)
+ gap -- does not yet compile
+ fs/namespace -- does not yet compile
+ fs/fsui -- downloadtest does not yet compile
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3399 - in GNUnet: . src/include src/server src/transports src/util/network,
grothoff <=