[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3403 - in GNUnet: . src/applications/fragmentation src/inc
From: |
grothoff |
Subject: |
[GNUnet-SVN] r3403 - in GNUnet: . src/applications/fragmentation src/include src/server src/transports src/util/network |
Date: |
Thu, 14 Sep 2006 23:24:32 -0700 (PDT) |
Author: grothoff
Date: 2006-09-14 23:24:24 -0700 (Thu, 14 Sep 2006)
New Revision: 3403
Modified:
GNUnet/ChangeLog
GNUnet/src/applications/fragmentation/fragmentation.c
GNUnet/src/include/gnunet_util_network.h
GNUnet/src/server/connection.c
GNUnet/src/transports/udp.c
GNUnet/src/transports/udp6.c
GNUnet/src/transports/udp_helper.c
GNUnet/src/util/network/select.c
Log:
fixing udp transport, also making fragmented udp perform much better
Modified: GNUnet/ChangeLog
===================================================================
--- GNUnet/ChangeLog 2006-09-14 15:53:51 UTC (rev 3402)
+++ GNUnet/ChangeLog 2006-09-15 06:24:24 UTC (rev 3403)
@@ -1,3 +1,9 @@
+Thu Sep 14 23:44:17 MDT 2006
+ Breaking UDP transport protocol compatibility -- some
+ fields have to be switched around to make it work with
+ the new select code. Expect to see some warnings when
+ interacting with 0.7.0 peers.
+
Tue Sep 5 21:28:25 PDT 2006
Switched ncurses interface of gnunet-setup to use
external dialog library (new dependency!).
Modified: GNUnet/src/applications/fragmentation/fragmentation.c
===================================================================
--- GNUnet/src/applications/fragmentation/fragmentation.c 2006-09-14
15:53:51 UTC (rev 3402)
+++ GNUnet/src/applications/fragmentation/fragmentation.c 2006-09-15
06:24:24 UTC (rev 3403)
@@ -210,9 +210,11 @@
FRAGSIZE(pos));
pos = pos->link;
}
-
if (stats != NULL)
stats->change(stat_defragmented, 1);
+#if 0
+ printf("Finished defragmentation!\n");
+#endif
/* handle message! */
coreAPI->injectMessage(&pep->sender,
msg,
@@ -263,7 +265,12 @@
return SYSERR; /* wrong fragment list, try another! */
if (ntohl(packet->id) != entry->id)
return SYSERR; /* wrong fragment list, try another! */
-
+#if 0
+ printf("Received fragment %u from %u to %u\n",
+ ntohl(packet->id),
+ ntohs(packet->off),
+ ntohs(packet->off) + ntohs(packet->header.size) -
sizeof(P2P_fragmentation_MESSAGE));
+#endif
pos = entry->head;
if ( (pos != NULL) &&
(packet->len != pos->frag->len) )
Modified: GNUnet/src/include/gnunet_util_network.h
===================================================================
--- GNUnet/src/include/gnunet_util_network.h 2006-09-14 15:53:51 UTC (rev
3402)
+++ GNUnet/src/include/gnunet_util_network.h 2006-09-15 06:24:24 UTC (rev
3403)
@@ -179,10 +179,10 @@
* socket should be closed
*/
typedef int (*SelectMessageHandler)(void * mh_cls,
- struct SelectHandle * sh,
- struct SocketHandle * sock,
- void * sock_ctx,
- const MESSAGE_HEADER * msg);
+ struct SelectHandle * sh,
+ struct SocketHandle * sock,
+ void * sock_ctx,
+ const MESSAGE_HEADER * msg);
/**
* We've accepted a connection, check that
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2006-09-14 15:53:51 UTC (rev 3402)
+++ GNUnet/src/server/connection.c 2006-09-15 06:24:24 UTC (rev 3403)
@@ -1323,8 +1323,9 @@
* message to the transport service.
*
* @param be connection of the buffer that is to be transmitted
+ * @return YES if we might want to be re-run
*/
-static void sendBuffer(BufferEntry * be) {
+static int sendBuffer(BufferEntry * be) {
unsigned int i;
unsigned int j;
unsigned int p;
@@ -1340,30 +1341,28 @@
ENTRY();
/* fast ways out */
- if(be == NULL) {
+ if (be == NULL) {
GE_BREAK(ectx, 0);
- return;
+ return SYSERR;
}
if ( (be->status != STAT_UP) ||
(be->sendBufferSize == 0) ||
(be->inSendBuffer == YES) ) {
- return; /* must not run */
+ return NO; /* must not run */
}
be->inSendBuffer = YES;
if ( (OK != ensureTransportConnected(be)) ||
- (be->sendBufferSize == 0) ||
(OK != checkSendFrequency(be)) ){
be->inSendBuffer = NO;
#if 0
GE_LOG(ectx,
GE_DEBUG | GE_DEVELOPER | GE_BULK,
- "Will not try to send: %d %d %d\n",
+ "Will not try to send: %d %d\n",
(OK != ensureTransportConnected(be)),
- (be->sendBufferSize == 0),
(OK != checkSendFrequency(be)));
#endif
- return;
+ return NO;
}
/* test if receiver has enough bandwidth available! */
@@ -1380,27 +1379,27 @@
if (totalMessageSize == 0) {
expireSendBufferEntries(be);
be->inSendBuffer = NO;
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION
GE_LOG(ectx,
GE_DEBUG | GE_DEVELOPER | GE_BULK,
"No messages selected for sending (%d)\n",
be->available_send_window);
#endif
- return; /* deferr further */
+ return NO; /* deferr further */
}
GE_ASSERT(ectx, totalMessageSize > sizeof(P2P_PACKET_HEADER));
/* check if we (sender) have enough bandwidth available
if so, trigger callbacks on selected entries; if either
fails, return (but clean up garbage) */
- if ((SYSERR == outgoingCheck(priority)) ||
- (0 == prepareSelectedMessages(be))) {
+ if ( (SYSERR == outgoingCheck(priority)) ||
+ (0 == prepareSelectedMessages(be)) ) {
GE_LOG(ectx,
GE_DEBUG | GE_DEVELOPER | GE_BULK,
"Insufficient bandwidth or priority to send message\n");
expireSendBufferEntries(be);
be->inSendBuffer = NO;
- return; /* deferr further */
+ return NO; /* deferr further */
}
/* get permutation of SendBuffer Entries
@@ -1495,7 +1494,8 @@
encryptedMsg,
p,
NO);
- if((ret == NO) && (priority >= EXTREME_PRIORITY)) {
+ if ( (ret == NO) &&
+ (priority >= EXTREME_PRIORITY) ) {
ret = transport->send(be->session.tsession,
encryptedMsg,
p,
@@ -1541,6 +1541,7 @@
FREE(plaintextMsg);
expireSendBufferEntries(be);
be->inSendBuffer = NO;
+ return NO;
}
/**
@@ -1591,7 +1592,8 @@
se->len,
&enc);
#endif
- if((be->sendBufferSize > 0) && (be->status != STAT_UP)) {
+ if ( (be->sendBufferSize > 0) &&
+ (be->status != STAT_UP) ) {
/* as long as we do not have a confirmed
connection, do NOT queue messages! */
#if DEBUG_CONNECTION
@@ -2275,19 +2277,19 @@
* @param unused not used, just to make signature type nicely
*/
static void cronDecreaseLiveness(void *unused) {
- BufferEntry *root;
- BufferEntry *prev;
- BufferEntry *tmp;
+ BufferEntry * root;
+ BufferEntry * prev;
+ BufferEntry * tmp;
cron_t now;
int i;
scheduleInboundTraffic();
now = get_time();
MUTEX_LOCK(lock);
- for(i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
+ for (i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
root = CONNECTION_buffer_[i];
prev = NULL;
- while(NULL != root) {
+ while (NULL != root) {
switch (root->status) {
case STAT_DOWN:
/* just compact linked list */
@@ -2300,15 +2302,20 @@
FREE(tmp);
continue; /* no need to call 'send buffer' */
case STAT_UP:
- if((now > root->isAlive) && /* concurrency might make this false... */
- (now - root->isAlive > SECONDS_INACTIVE_DROP * cronSECONDS)) {
+ if ( (now > root->isAlive) && /* concurrency might make this false...
*/
+ (now - root->isAlive > SECONDS_INACTIVE_DROP * cronSECONDS)) {
EncName enc;
/* switch state form UP to DOWN: too much inactivity */
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
hash2enc(&root->session.sender.hashPubKey, &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "closing connection with `%s': "
- "too much inactivity (%llu ms)\n", &enc, now - root->isAlive);
+ IF_GELOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ hash2enc(&root->session.sender.hashPubKey, &enc));
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "closing connection with `%s': "
+ "too much inactivity (%llu ms)\n",
+ &enc,
+ now - root->isAlive);
shutdownConnection(root);
/* the host may still be worth trying again soon: */
identity->whitelistHost(&root->session.sender);
@@ -2325,12 +2332,12 @@
msgBuf = MALLOC(60000);
pos = scl_nextHead;
- while(pos != NULL) {
- if(pos->minimumPadding <= 60000) {
+ while (pos != NULL) {
+ if (pos->minimumPadding <= 60000) {
mSize = pos->callback(&root->session.sender,
msgBuf,
60000);
- if(mSize > 0)
+ if (mSize > 0)
unicast(&root->session.sender,
(MESSAGE_HEADER *) msgBuf,
0,
@@ -2340,16 +2347,20 @@
}
FREE(msgBuf);
}
-
break;
default: /* not up, not down - partial SETKEY exchange */
- if((now > root->isAlive) &&
- (now - root->isAlive > SECONDS_NOPINGPONG_DROP * cronSECONDS)) {
+ if ( (now > root->isAlive) &&
+ (now - root->isAlive > SECONDS_NOPINGPONG_DROP * cronSECONDS)) {
EncName enc;
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
hash2enc(&root->session.sender.hashPubKey, &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "closing connection to %s: %s not answered.\n",
- &enc, (root->status == STAT_SETKEY_SENT) ? "SETKEY" : "PING");
+ IF_GELOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ hash2enc(&root->session.sender.hashPubKey,
+ &enc));
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "closing connection to %s: %s not answered.\n",
+ &enc,
+ (root->status == STAT_SETKEY_SENT) ? "SETKEY" : "PING");
shutdownConnection(root);
}
break;
@@ -2359,7 +2370,6 @@
root = root->overflowChain;
} /* end of while */
} /* for all buckets */
-
MUTEX_UNLOCK(lock);
}
@@ -2863,10 +2873,18 @@
GE_ASSERT(ectx,
CONNECTION_MAX_HOSTS_ != 0);
registerp2pHandler(P2P_PROTO_hangup, &handleHANGUP);
+ /* note: should we see that this cron job takes
+ excessive amounts of CPU on some systems, we
+ may consider adding an OPTION to reduce the
+ frequency. However, on my system, larger
+ values significantly impact the performance
+ of the UDP transport for large (fragmented)
+ messages -- and 10ms does not cause any noticeable
+ CPU load during testing. */
cron_add_job(cron,
&cronDecreaseLiveness,
- 1 * cronSECONDS,
- 1 * cronSECONDS,
+ 10 * cronMILLIS,
+ 10 * cronMILLIS,
NULL);
#if DEBUG_COLLECT_PRIO == YES
prioFile = FOPEN("/tmp/knapsack_prio.txt", "w");
@@ -3137,7 +3155,9 @@
* @param msg the message to transmit, should contain MESSAGE_HEADERs
* @return OK on success, SYSERR on failure, NO on temporary failure
*/
-int sendPlaintext(TSession * tsession, const char *msg, unsigned int size) {
+int sendPlaintext(TSession * tsession,
+ const char *msg,
+ unsigned int size) {
char *buf;
int ret;
P2P_PACKET_HEADER *hdr;
Modified: GNUnet/src/transports/udp.c
===================================================================
--- GNUnet/src/transports/udp.c 2006-09-14 15:53:51 UTC (rev 3402)
+++ GNUnet/src/transports/udp.c 2006-09-15 06:24:24 UTC (rev 3403)
@@ -249,8 +249,7 @@
const void * message,
const unsigned int size,
int important) {
- char * msg;
- UDPMessage mp;
+ UDPMessage * mp;
P2P_hello_MESSAGE * helo;
HostAddress * haddr;
struct sockaddr_in sin; /* an Internet endpoint address */
@@ -274,14 +273,11 @@
haddr = (HostAddress*) &helo[1];
ssize = size + sizeof(UDPMessage);
- msg = MALLOC(ssize);
- mp.header.size = htons(ssize);
- mp.header.type = 0;
- mp.sender = *(coreAPI->myIdentity);
- memcpy(&msg[size],
- &mp,
- sizeof(UDPMessage));
- memcpy(msg,
+ mp = MALLOC(ssize);
+ mp->header.size = htons(ssize);
+ mp->header.type = 0;
+ mp->sender = *(coreAPI->myIdentity);
+ memcpy(&mp[1],
message,
size);
ok = SYSERR;
@@ -294,15 +290,16 @@
&haddr->senderIP,
sizeof(IPaddr));
#if DEBUG_UDP
- GE_LOG(ectx, GE_DEBUG | GE_USER | GE_BULK,
- "Sending message of %d bytes via UDP to %u.%u.%u.%u:%u.\n",
- ssize,
- PRIP(ntohl(*(int*)&sin.sin_addr)),
- ntohs(sin.sin_port));
+ GE_LOG(ectx,
+ GE_DEBUG | GE_USER | GE_BULK,
+ "Sending message of %d bytes via UDP to %u.%u.%u.%u:%u.\n",
+ ssize,
+ PRIP(ntohl(*(int*)&sin.sin_addr)),
+ ntohs(sin.sin_port));
#endif
if (YES == socket_send_to(udp_sock,
NC_Nonblocking,
- msg,
+ mp,
ssize,
&sent,
(const char *) &sin,
@@ -323,7 +320,7 @@
stats->change(stat_bytesDropped,
ssize);
}
- FREE(msg);
+ FREE(mp);
return ok;
}
Modified: GNUnet/src/transports/udp6.c
===================================================================
--- GNUnet/src/transports/udp6.c 2006-09-14 15:53:51 UTC (rev 3402)
+++ GNUnet/src/transports/udp6.c 2006-09-15 06:24:24 UTC (rev 3403)
@@ -243,8 +243,7 @@
const void * message,
const unsigned int size,
int importance) {
- char * msg;
- UDPMessage mp;
+ UDPMessage * mp;
P2P_hello_MESSAGE * helo;
Host6Address * haddr;
struct sockaddr_in6 sin; /* an Internet endpoint address */
@@ -270,14 +269,11 @@
haddr = (Host6Address*) &helo[1];
ssize = size + sizeof(UDPMessage);
- msg = MALLOC(ssize);
- mp.header.size = htons(ssize);
- mp.header.type = 0;
- mp.sender = *coreAPI->myIdentity;
- memcpy(&msg[size],
- &mp,
- sizeof(UDPMessage));
- memcpy(msg,
+ mp = MALLOC(ssize);
+ mp->header.size = htons(ssize);
+ mp->header.type = 0;
+ mp->sender = *coreAPI->myIdentity;
+ memcpy(&mp[1],
message,
size);
ok = SYSERR;
@@ -300,7 +296,7 @@
#endif
if (YES == socket_send_to(udp_sock,
NC_Nonblocking,
- msg,
+ mp,
ssize,
&ssize,
(const char*) &sin,
@@ -317,7 +313,7 @@
stats->change(stat_bytesDropped,
ssize);
}
- FREE(msg);
+ FREE(mp);
return ok;
}
Modified: GNUnet/src/transports/udp_helper.c
===================================================================
--- GNUnet/src/transports/udp_helper.c 2006-09-14 15:53:51 UTC (rev 3402)
+++ GNUnet/src/transports/udp_helper.c 2006-09-15 06:24:24 UTC (rev 3403)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other
contributing authors)
+ (C) 2001, 2002, 2003, 2004, 2005, 2006 Christian Grothoff (and other
contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -32,11 +32,6 @@
*/
typedef struct {
/**
- * this struct is *preceded* by MESSAGE_PARTs - until
- * size-sizeof(UDPMessage)!
- */
-
- /**
* size of the message, in bytes, including this header.
*/
MESSAGE_HEADER header;
Modified: GNUnet/src/util/network/select.c
===================================================================
--- GNUnet/src/util/network/select.c 2006-09-14 15:53:51 UTC (rev 3402)
+++ GNUnet/src/util/network/select.c 2006-09-15 06:24:24 UTC (rev 3403)
@@ -553,88 +553,118 @@
}
}
} else { /* is_udp == YES */
- int pending;
- int udp_sock;
- int error;
-
- udp_sock = sh->listen_sock->handle;
- lenOfIncomingAddr = sh->max_addr_len;
- memset(clientAddr,
- 0,
- lenOfIncomingAddr);
- pending = 0;
- /* @todo FIXME in PlibC */
+ if ( (sh->listen_sock != NULL) &&
+ (FD_ISSET(sh->listen_sock->handle, &readSet)) ) {
+ int pending;
+ int udp_sock;
+ int error;
+
+ udp_sock = sh->listen_sock->handle;
+ lenOfIncomingAddr = sh->max_addr_len;
+ memset(clientAddr,
+ 0,
+ lenOfIncomingAddr);
+ pending = 0;
+ /* @todo FIXME in PlibC */
#ifdef MINGW
- error = ioctlsocket(udp_sock,
- FIONREAD,
- &pending);
+ error = ioctlsocket(udp_sock,
+ FIONREAD,
+ &pending);
#else
- error = ioctl(udp_sock,
- FIONREAD,
- &pending);
+ error = ioctl(udp_sock,
+ FIONREAD,
+ &pending);
#endif
- if (error != 0) {
- GE_LOG_STRERROR(sh->ectx,
- GE_ERROR | GE_ADMIN | GE_BULK,
- "ioctl");
- pending = 65535; /* max */
- }
- GE_ASSERT(sh->ectx, pending >= 0);
- if (pending == 0) {
- /* maybe empty UDP packet was sent (see report on bug-gnunet,
- 5/11/6; read 0 bytes from UDP just to kill potential empty packet! */
- socket_recv_from(sh->listen_sock,
- NC_Blocking,
- NULL,
- 0,
- &size,
- clientAddr,
- &lenOfIncomingAddr);
- } else if (pending >= 65536) {
- GE_BREAK(sh->ectx, 0);
- socket_close(sh->listen_sock);
- } else {
- char * msg;
-
- msg = MALLOC(pending);
- size = 0;
- if (YES != socket_recv_from(sh->listen_sock,
- NC_Blocking,
- msg,
- pending,
- &size,
- clientAddr,
- &lenOfIncomingAddr)) {
+ if (error != 0) {
+ GE_LOG_STRERROR(sh->ectx,
+ GE_ERROR | GE_ADMIN | GE_BULK,
+ "ioctl");
+ pending = 65535; /* max */
+ }
+#if DEBUG_SELECT
+ GE_LOG(sh->ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Select %p is preparing to receive %u bytes\n",
+ sh,
+ pending);
+#endif
+ GE_ASSERT(sh->ectx, pending >= 0);
+ if (pending == 0) {
+ /* maybe empty UDP packet was sent (see report on bug-gnunet,
+ 5/11/6; read 0 bytes from UDP just to kill potential empty packet!
*/
+ socket_recv_from(sh->listen_sock,
+ NC_Nonblocking,
+ NULL,
+ 0,
+ &size,
+ clientAddr,
+ &lenOfIncomingAddr);
+ } else if (pending >= 65536) {
+ GE_BREAK(sh->ectx, 0);
socket_close(sh->listen_sock);
} else {
- /* validate msg format! */
- const MESSAGE_HEADER * hdr;
-
- hdr = (const MESSAGE_HEADER*) msg;
- if ( (size == pending) &&
- (size >= sizeof(MESSAGE_HEADER)) &&
- (ntohs(hdr->size) == size) ) {
- void * sctx;
+ char * msg;
+
+ msg = MALLOC(pending);
+ size = 0;
+ if (YES != socket_recv_from(sh->listen_sock,
+ NC_Blocking,
+ msg,
+ pending,
+ &size,
+ clientAddr,
+ &lenOfIncomingAddr)) {
+#if DEBUG_SELECT
+ GE_LOG(sh->ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Error in select %p -- failed to receive %u bytes\n",
+ sh,
+ pending);
+#endif
+ socket_close(sh->listen_sock);
+ } else {
+ /* validate msg format! */
+ const MESSAGE_HEADER * hdr;
- sctx = sh->ah(sh->ah_cls,
- sh,
- NULL,
- clientAddr,
- lenOfIncomingAddr);
- if (sctx != NULL) {
- sh->mh(sh->mh_cls,
- sh,
- NULL,
- sctx,
- hdr);
- sh->ch(sh->ch_cls,
- sh,
- NULL,
- sctx);
+ hdr = (const MESSAGE_HEADER*) msg;
+ if ( (size == pending) &&
+ (size >= sizeof(MESSAGE_HEADER)) &&
+ (ntohs(hdr->size) == size) ) {
+ void * sctx;
+
+ sctx = sh->ah(sh->ah_cls,
+ sh,
+ NULL,
+ clientAddr,
+ lenOfIncomingAddr);
+ if (sctx != NULL) {
+ sh->mh(sh->mh_cls,
+ sh,
+ NULL,
+ sctx,
+ hdr);
+ sh->ch(sh->ch_cls,
+ sh,
+ NULL,
+ sctx);
+ } else {
+#if DEBUG_SELECT
+ GE_LOG(sh->ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Error in select %p -- connection refused\n",
+ sh);
+#endif
+ }
+ } else {
+#if DEBUG_SELECT
+ GE_BREAK(sh->ectx, size == pending);
+ GE_BREAK(sh->ectx, size >= sizeof(MESSAGE_HEADER));
+ GE_BREAK(sh->ectx, (size >= sizeof(MESSAGE_HEADER)) &&
(ntohs(hdr->size) == size));
+#endif
}
}
+ FREE(msg);
}
- FREE(msg);
}
} /* end UDP processing */
if (FD_ISSET(sh->signal_pipe[0], &readSet)) {
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3403 - in GNUnet: . src/applications/fragmentation src/include src/server src/transports src/util/network,
grothoff <=