[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r1555 - in GNUnet/src: applications/fs applications/fs/modu
From: |
grothoff |
Subject: |
[GNUnet-SVN] r1555 - in GNUnet/src: applications/fs applications/fs/module applications/gap applications/pingpong applications/session applications/transport include server transports |
Date: |
Sat, 16 Jul 2005 08:51:39 -0700 (PDT) |
Author: grothoff
Date: 2005-07-16 08:51:25 -0700 (Sat, 16 Jul 2005)
New Revision: 1555
Modified:
GNUnet/src/applications/fs/ecrs_core.c
GNUnet/src/applications/fs/module/fs.c
GNUnet/src/applications/gap/gap.c
GNUnet/src/applications/pingpong/pingpong.c
GNUnet/src/applications/session/connect.c
GNUnet/src/applications/transport/transport.c
GNUnet/src/include/ecrs_core.h
GNUnet/src/server/connection.c
GNUnet/src/transports/tcp.c
GNUnet/src/transports/udp.c
Log:
code cleanup plus additional assertions
Modified: GNUnet/src/applications/fs/ecrs_core.c
===================================================================
--- GNUnet/src/applications/fs/ecrs_core.c 2005-07-16 13:31:41 UTC (rev
1554)
+++ GNUnet/src/applications/fs/ecrs_core.c 2005-07-16 15:51:25 UTC (rev
1555)
@@ -1,5 +1,6 @@
/*
This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other
contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -104,6 +105,8 @@
/**
* Get the query that will be used to query for
* a certain block of data.
+ *
+ * @param db the block in plaintext
*/
void fileBlockGetQuery(const DBlock * db,
unsigned int len,
@@ -156,13 +159,16 @@
unsigned int type;
type = getTypeOfBlock(size, data);
- if (type == ANY_BLOCK)
+ if (type == ANY_BLOCK) {
+ BREAK();
return SYSERR;
+ }
switch (type) {
case D_BLOCK:
- fileBlockGetKey(data,
- size,
- query);
+ /* CHK: hash of content == query */
+ hash(&data[1],
+ size - sizeof(DBlock),
+ query);
return OK;
case S_BLOCK: {
const SBlock * sb;
Modified: GNUnet/src/applications/fs/module/fs.c
===================================================================
--- GNUnet/src/applications/fs/module/fs.c 2005-07-16 13:31:41 UTC (rev
1554)
+++ GNUnet/src/applications/fs/module/fs.c 2005-07-16 15:51:25 UTC (rev
1555)
@@ -87,7 +87,7 @@
/**
* Store an item in the datastore.
*
- * @param key the key of the item
+ * @param query the unique identifier of the item
* @param value the value to store
* @param prio how much does our routing code value
* this datum?
@@ -96,7 +96,7 @@
* SYSERR if the value is malformed
*/
static int gapPut(void * closure,
- const HashCode512 * key,
+ const HashCode512 * query,
const DataContainer * value,
unsigned int prio) {
Datastore_Value * dv;
@@ -119,7 +119,7 @@
if ( (OK != getQueryFor(size - sizeof(Datastore_Value),
(DBlock*)&gw[1],
&hc)) ||
- (! equalsHashCode512(&hc, key)) ) {
+ (! equalsHashCode512(&hc, query)) ) {
LOG(LOG_ERROR,
"Type: %u\n",
ntohl(*(unsigned int*) &gw[1]));
@@ -149,37 +149,37 @@
ntohl(dv->size) - sizeof(Datastore_Value),
(DBlock*) &dv[1],
0,
- key)) {
+ query)) {
BREAK();
FREE(dv);
return SYSERR;
}
- processResponse(key, dv);
+ processResponse(query, dv);
IFLOG(LOG_DEBUG,
- hash2enc(key,
+ hash2enc(query,
&enc));
LOG(LOG_DEBUG,
"FS received GAP-PUT request (query: %s)\n",
&enc);
- ret = datastore->putUpdate(key,
+ ret = datastore->putUpdate(query,
dv);
FREE(dv);
return ret;
}
-static int get_result_callback(const HashCode512 * key,
+static int get_result_callback(const HashCode512 * query,
const DataContainer * value,
DHT_GET_CLS * cls) {
EncName enc;
IFLOG(LOG_DEBUG,
- hash2enc(key,
+ hash2enc(query,
&enc));
LOG(LOG_DEBUG,
- "Found reply to query %s.\n",
+ "Found reply to query '%s'.\n",
&enc);
gapPut(NULL,
- key,
+ query,
value,
cls->prio);
return OK;
@@ -944,6 +944,17 @@
static Blockstore dsGap;
static Blockstore dsDht;
+
+ GNUNET_ASSERT(sizeof(CHK) == 128);
+ GNUNET_ASSERT(sizeof(DBlock) == 4);
+ GNUNET_ASSERT(sizeof(IBlock) == 132);
+ GNUNET_ASSERT(sizeof(FileIdentifier) == 136);
+ GNUNET_ASSERT(sizeof(KBlock) == 524);
+ GNUNET_ASSERT(sizeof(SBlock) == 732);
+ GNUNET_ASSERT(sizeof(NBlock) == 716);
+ GNUNET_ASSERT(sizeof(KNBlock) == 1244);
+
+
hash("GNUNET_FS",
strlen("GNUNET_FS"),
&dht_table);
Modified: GNUnet/src/applications/gap/gap.c
===================================================================
--- GNUnet/src/applications/gap/gap.c 2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/applications/gap/gap.c 2005-07-16 15:51:25 UTC (rev 1555)
@@ -1,5 +1,6 @@
/*
This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other
contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -794,9 +795,12 @@
EncName encq;
EncName encp;
- if (equalsHashCode512(&id->hashPubKey,
- &qr->noTarget.hashPubKey))
+ if ( (equalsHashCode512(&id->hashPubKey,
+ &qr->noTarget.hashPubKey)) ||
+ (equalsHashCode512(&id->hashPubKey,
+ &qr->msg->returnTo.hashPubKey)) )
return;
+
if (getBit(qr, getIndex(id)) == 1) {
IFLOG(LOG_DEBUG,
hash2enc(&id->hashPubKey,
@@ -998,7 +1002,8 @@
/**
* Call useContent "later" and then free the pmsg.
*/
-static void useContentLater(GAP_REPLY * pmsg) {
+static void useContentLater(void * data) {
+ GAP_REPLY * pmsg = data;
useContent(NULL,
pmsg);
FREE(pmsg);
@@ -1059,7 +1064,7 @@
size - sizeof(GAP_REPLY));
/* delay reply, delay longer if we are busy (makes it harder
to predict / analyze, too). */
- addCronJob((CronJob)&useContentLater,
+ addCronJob(&useContentLater,
randomi(TTL_DECREMENT),
0,
pmsg);
@@ -2001,9 +2006,11 @@
memcpy(qmsg, msg, ntohs(msg->size));
if (equalsHashCode512(&qmsg->returnTo.hashPubKey,
&coreAPI->myIdentity->hashPubKey)) {
- /* A to B, B sends back to A without (!) source rewriting,
+ /* A to B, B sends to C without source rewriting,
+ C sends back to A again without source rewriting;
+ (or B directly back to A; also should not happen)
in this case, A must just drop; however, this
- should never happen. */
+ should not happen (peers should check). */
BREAK();
FREE(qmsg);
return OK;
@@ -2088,6 +2095,9 @@
static GAP_ServiceAPI api;
unsigned int i;
+ GNUNET_ASSERT(sizeof(GAP_REPLY) == 68);
+ GNUNET_ASSERT(sizeof(GAP_QUERY) == 144);
+
coreAPI = capi;
GROW(rewards,
rewardSize,
Modified: GNUnet/src/applications/pingpong/pingpong.c
===================================================================
--- GNUnet/src/applications/pingpong/pingpong.c 2005-07-16 13:31:41 UTC (rev
1554)
+++ GNUnet/src/applications/pingpong/pingpong.c 2005-07-16 15:51:25 UTC (rev
1555)
@@ -448,6 +448,7 @@
provide_module_pingpong(CoreAPIForApplication * capi) {
static Pingpong_ServiceAPI ret;
+ GNUNET_ASSERT(sizeof(PINGPONG_Message) == 72);
coreAPI = capi;
identity = capi->requestService("identity");
if (identity == NULL) {
Modified: GNUnet/src/applications/session/connect.c
===================================================================
--- GNUnet/src/applications/session/connect.c 2005-07-16 13:31:41 UTC (rev
1554)
+++ GNUnet/src/applications/session/connect.c 2005-07-16 15:51:25 UTC (rev
1555)
@@ -695,6 +695,7 @@
provide_module_session(CoreAPIForApplication * capi) {
static Session_ServiceAPI ret;
+ GNUNET_ASSERT(sizeof(SKEY_Message) == 520)
coreAPI = capi;
identity = capi->requestService("identity");
if (identity == NULL) {
Modified: GNUnet/src/applications/transport/transport.c
===================================================================
--- GNUnet/src/applications/transport/transport.c 2005-07-16 13:31:41 UTC
(rev 1554)
+++ GNUnet/src/applications/transport/transport.c 2005-07-16 15:51:25 UTC
(rev 1555)
@@ -611,6 +611,7 @@
void * lib;
EncName myself;
+ GNUNET_ASSERT(sizeof(HELO_Message) == 600);
identity = capi->requestService("identity");
if (identity == NULL) {
BREAK();
Modified: GNUnet/src/include/ecrs_core.h
===================================================================
--- GNUnet/src/include/ecrs_core.h 2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/include/ecrs_core.h 2005-07-16 15:51:25 UTC (rev 1555)
@@ -1,5 +1,6 @@
/*
This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other
contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/server/connection.c 2005-07-16 15:51:25 UTC (rev 1555)
@@ -823,97 +823,15 @@
}
/**
- * Send a buffer; assumes that access is already synchronized. This
- * message solves the knapsack problem, assembles the message
- * (callback to build parts from knapsack, callbacks for padding,
- * random noise padding, crc, encryption) and finally hands the
- * message to the transport service.
+ * Check that the send frequency for this
+ * buffer is not too high.
*
- * @param be connection of the buffer that is to be transmitted
+ * @return OK if sending a message now is acceptable
*/
-static void sendBuffer(BufferEntry * be) {
- unsigned int i;
- unsigned int j;
- unsigned int p;
- unsigned int rsi;
- SendCallbackList * pos;
- P2P_Message * p2pHdr;
- int priority;
- int * perm;
- char * tmpMsg;
- char * plaintextMsg;
- void * encryptedMsg;
- cron_t expired;
- int headpos;
- int tailpos;
- int approxProb;
- int remainingBufferSize;
- unsigned int totalMessageSize;
- int ret;
-
- ENTRY();
- /* fast ways out */
- if (be == NULL) {
- BREAK();
- return;
- }
- if (be->status != STAT_UP)
- return; /* status is not up, cannot send! */
- if (be->sendBufferSize == 0)
- return; /* nothing to send */
-
- if (be->inSendBuffer == YES)
- return; /* must not recurse! */
- be->inSendBuffer = YES;
-
- /* recompute max send frequency */
- if (be->max_bpm <= 0)
+static int checkSendFrequency(BufferEntry * be) {
+ if (be->max_bpm == 0)
be->max_bpm = 1;
- if (be->session.tsession == NULL) {
- be->session.tsession
- = transport->connectFreely(&be->session.sender,
- YES);
- if (be->session.tsession == NULL) {
- be->inSendBuffer = NO;
- return;
- }
- be->session.mtu
- = transport->getMTU(be->session.tsession->ttype);
- if (be->session.mtu > 0) {
- /* MTU change may require new fragmentation! */
- SendEntry ** entries;
- SendEntry * entry;
-
- entries = be->sendBuffer;
- i = 0;
- ret = be->sendBufferSize;
- /* assumes entries are sorted by priority! */
- while (i < ret) {
- entry = entries[i];
- if (entry->len > be->session.mtu - sizeof(P2P_Message)) {
- entries[i] = entries[--ret];
- fragmentation->fragment(&be->session.sender,
- be->session.mtu - sizeof(P2P_Message),
- entry->pri,
- entry->transmissionTime,
- entry->len,
- entry->callback,
- entry->closure);
- FREE(entry);
- }
- i++;
- }
- if (ret != be->sendBufferSize)
- GROW(be->sendBuffer,
- be->sendBufferSize,
- ret);
- }
- }
- if (be->sendBufferSize == 0)
- return; /* nothing to send */
-
-
if (be->session.mtu == 0) {
be->MAX_SEND_FREQUENCY = /* ms per message */
EXPECTED_MTU
@@ -929,81 +847,85 @@
solutions for any MIN_SAMPLE_TIME! */
if (be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
be->MAX_SEND_FREQUENCY = MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
-
+
if ( (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) &&
(be->sendBufferSize < MAX_SEND_BUFFER_SIZE/4) ) {
#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"Send frequency too high (CPU load), send deferred.\n");
#endif
- be->inSendBuffer = NO;
- return; /* frequency too high, wait */
+ return NO; /* frequency too high, wait */
}
+ return OK;
+}
- /* test if receiver has enough bandwidth available! */
- updateCurBPS(be);
-#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "receiver window available: %lld bytes (MTU: %u)\n",
- be->available_send_window,
- be->session.mtu);
-#endif
+/**
+ * Select a subset of the messages for sending.
+ *
+ * @param *priority is set to the achieved message priority
+ * @return total number of bytes of messages selected
+ */
+static unsigned int selectMessagesToSend(BufferEntry * be,
+ unsigned int * priority) {
+ unsigned int totalMessageSize;
+ SendEntry * entry;
+ int i;
+ int j;
+ int approxProb;
+
+ totalMessageSize = 0;
+ (*priority) = 0;
-
if (be->session.mtu == 0) {
- SendEntry ** entries;
-
- entries = be->sendBuffer;
totalMessageSize = sizeof(P2P_Message);
- priority = 0;
i = 0;
/* assumes entries are sorted by priority! */
while (i < be->sendBufferSize) {
- if ( (totalMessageSize + entries[i]->len < MAX_BUFFER_SIZE) &&
- (entries[i]->pri >= EXTREME_PRIORITY) ) {
- entries[i]->knapsackSolution = YES;
- priority += entries[i]->pri;
- totalMessageSize += entries[i]->len;
+ entry = be->sendBuffer[i];
+ if ( (totalMessageSize + entry->len < MAX_BUFFER_SIZE) &&
+ (entry->pri >= EXTREME_PRIORITY) ) {
+ entry->knapsackSolution = YES;
+ (*priority) += entry->pri;
+ totalMessageSize += entry->len;
} else {
- entries[i]->knapsackSolution = NO;
+ entry->knapsackSolution = NO;
break;
}
i++;
}
if ( (i == 0) &&
- (entries[i]->len <= be->available_send_window) )
- return; /* always wait for the highest-priority
+ (be->sendBuffer[i]->len <= be->available_send_window) )
+ return 0; /* always wait for the highest-priority
message (otherwise large messages may
starve! */
while ( (i < be->sendBufferSize) &&
(be->available_send_window > totalMessageSize) ) {
- if ( (entries[i]->len + totalMessageSize <=
+ entry = be->sendBuffer[i];
+ if ( (entry->len + totalMessageSize <=
be->available_send_window) &&
- (totalMessageSize + entries[i]->len < MAX_BUFFER_SIZE) ) {
- entries[i]->knapsackSolution = YES;
- totalMessageSize += entries[i]->len;
- priority += entries[i]->pri;
+ (totalMessageSize + entry->len < MAX_BUFFER_SIZE) ) {
+ entry->knapsackSolution = YES;
+ totalMessageSize += entry->len;
+ (*priority) += entry->pri;
} else {
- entries[i]->knapsackSolution = NO;
+ entry->knapsackSolution = NO;
if (totalMessageSize == sizeof(P2P_Message)) {
/* if the highest-priority message does not yet
fit, wait for send window to grow so that
we can get it out (otherwise we would starve
high-priority, large messages) */
- be->inSendBuffer = NO;
- return;
+ return 0;
}
}
i++;
}
if ( (totalMessageSize == sizeof(P2P_Message)) ||
- ( (priority < EXTREME_PRIORITY) &&
+ ( ((*priority) < EXTREME_PRIORITY) &&
((totalMessageSize / sizeof(P2P_Message)) < 4) &&
(randomi(16) != 0) ) ) {
/* randomization necessary to ensure we eventually send
a small message if there is nothing else to do! */
- be->inSendBuffer = NO;
- return;
+ return 0;
}
} else { /* if (be->session.mtu == 0) */
/* solve knapsack problem, compute accumulated priority */
@@ -1015,23 +937,29 @@
approxProb *= 2; /* now value between 0 [always approx] and 100 [never
approx] */
/* control CPU load probabilistically! */
if (randomi(1+approxProb) == 0) {
- priority = approximateKnapsack(be,
- be->session.mtu - sizeof(P2P_Message));
+ (*priority) = approximateKnapsack(be,
+ be->session.mtu -
sizeof(P2P_Message));
#if DEBUG_COLLECT_PRIO == YES
FPRINTF(prioFile, "%llu 0 %d\n", cronTime(NULL), priority);
#endif
} else {
- priority = solveKnapsack(be,
- be->session.mtu - sizeof(P2P_Message));
+ (*priority) = solveKnapsack(be,
+ be->session.mtu - sizeof(P2P_Message));
#if DEBUG_COLLECT_PRIO == YES
- FPRINTF(prioFile, "%llu 1 %d\n", cronTime(NULL), priority);
+ FPRINTF(prioFile,
+ "%llu 1 %d\n",
+ cronTime(NULL),
+ priority);
#endif
}
} else { /* never approximate < 50% CPU load */
- priority = solveKnapsack(be,
- be->session.mtu - sizeof(P2P_Message));
+ (*priority) = solveKnapsack(be,
+ be->session.mtu - sizeof(P2P_Message));
#if DEBUG_COLLECT_PRIO == YES
- FPRINTF(prioFile, "%llu 2 %d\n", cronTime(NULL), priority);
+ FPRINTF(prioFile,
+ "%llu 2 %d\n",
+ cronTime(NULL),
+ priority);
#endif
}
j = 0;
@@ -1052,113 +980,171 @@
j,
be->sendBuffer[j]->len,
be->sendBuffer[j]->pri);
- be->inSendBuffer = NO;
- return;
+ return 0;
}
if (be->available_send_window < be->session.mtu) {
/* if we have a very high priority, we may
want to ignore bandwidth availability (e.g. for HANGUP,
which has EXTREME_PRIORITY) */
- if (priority < EXTREME_PRIORITY) {
+ if ((*priority) < EXTREME_PRIORITY) {
#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"bandwidth limits prevent sending (send window %u too small).\n",
be->available_send_window);
#endif
- be->inSendBuffer = NO;
- return; /* can not send, BPS available is too small */
+ return 0; /* can not send, BPS available is too small */
}
}
totalMessageSize = be->session.mtu;
} /* end MTU > 0 */
+ return totalMessageSize;
+}
- expired = cronTime(NULL) - SECONDS_PINGATTEMPT * cronSECONDS;
+/**
+ * Expire old messages from SendBuffer (to avoid
+ * running out of memory).
+ */
+static void expireSendBufferEntries(BufferEntry * be) {
+ int msgCap;
+ int i;
+ SendEntry * entry;
+ cron_t expired;
+ int l;
+ unsigned int freeSlots;
+ int j;
+
/* if it's more than one connection "lifetime" old, always kill it! */
-
- /* check if we (sender) have enough bandwidth available */
- if (SYSERR == outgoingCheck(priority)) {
- int msgCap;
-
- cronTime(&be->lastSendAttempt);
+ expired = cronTime(&be->lastSendAttempt) - SECONDS_PINGATTEMPT * cronSECONDS;
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "policy prevents sending message (priority too low: %d)\n",
- priority);
+ LOG(LOG_DEBUG,
+ "policy prevents sending message (priority too low: %d)\n",
+ priority);
#endif
+
+ l = getCPULoad();
+ /* cleanup queue */
+ if (l >= 50) {
+ msgCap = EXPECTED_MTU / sizeof(HashCode512);
+ } else {
+ if (l <= 0)
+ l = 1;
+ msgCap = EXPECTED_MTU / sizeof(HashCode512)
+ + (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU / sizeof(HashCode512)) / l;
+ }
+ if (be->max_bpm > 2) {
+ msgCap += 2 * (int) log((double)be->max_bpm);
+ if (msgCap >= MAX_SEND_BUFFER_SIZE-1)
+ msgCap = MAX_SEND_BUFFER_SIZE-2;
+ /* try to make sure that there
+ is always room... */
+ }
- /* cleanup queue */
- if (getCPULoad() > 50)
- msgCap = 4;
- else
- msgCap = 54 - getCPULoad();
- if (be->max_bpm > 2)
- msgCap += 2 * (int) log((double)be->max_bpm);
- /* allow at least msgCap msgs in buffer */
- for (i=0;i<be->sendBufferSize;i++) {
- SendEntry * entry = be->sendBuffer[i];
+ freeSlots = 0;
+ /* allow at least msgCap msgs in buffer */
+ for (i=0;i<be->sendBufferSize;i++)
+ if (be->sendBuffer[i] == NULL)
+ freeSlots++;
- if (be->sendBufferSize <= msgCap)
- break;
- if ( entry->transmissionTime < expired) {
+ for (i=0;i<be->sendBufferSize;i++) {
+ entry = be->sendBuffer[i];
+ if (entry == NULL)
+ continue;
+ if (be->sendBufferSize <= msgCap + freeSlots)
+ break;
+ if (entry->transmissionTime < expired) {
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "expiring message, expired %ds ago, queue size is %u (bandwidth
stressed)\n",
- (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
- be->sendBufferSize);
+ LOG(LOG_DEBUG,
+ "expiring message, expired %ds ago, queue size is %u (bandwidth
stressed)\n",
+ (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
+ be->sendBufferSize);
#endif
- if (stats != NULL) {
- stats->change(stat_messagesDropped, 1);
- stats->change(stat_sizeMessagesDropped, entry->len);
- }
- FREE(entry->closure);
- FREE(entry);
- be->sendBuffer[i] = be->sendBuffer[be->sendBufferSize-1];
- GROW(be->sendBuffer,
- be->sendBufferSize,
- be->sendBufferSize-1);
- i--; /* go again for this slot */
+ if (stats != NULL) {
+ stats->change(stat_messagesDropped, 1);
+ stats->change(stat_sizeMessagesDropped, entry->len);
}
+ FREENONNULL(entry->closure);
+ FREE(entry);
+ be->sendBuffer[i] = NULL;
+ freeSlots++;
}
- be->inSendBuffer = NO;
- return; /* deferr further */
- }
+ }
- GNUNET_ASSERT(totalMessageSize > sizeof(P2P_Message));
+ /* cleanup/compact sendBuffer */
+ j = 0;
+ for (i=0;i<be->sendBufferSize;i++)
+ if (be->sendBuffer[i] != NULL)
+ be->sendBuffer[j++] = be->sendBuffer[i];
+ GROW(be->sendBuffer,
+ be->sendBufferSize,
+ j);
+}
- /* first, trigger callbacks on selected entries */
+/**
+ * For each SendEntry of the BE that has
+ * been selected by the knapsack solver,
+ * call the callback and make sure that the
+ * bytes are ready in entry->closure for
+ * transmission.<p>
+ *
+ * If the preparation fails for an entry,
+ * free it.
+ * @return number of prepared entries
+ */
+static unsigned int prepareSelectedMessages(BufferEntry * be) {
+ unsigned int ret;
+ int i;
+ char * tmpMsg;
+ SendEntry * entry;
+
+ ret = 0;
+
for (i=0;i<be->sendBufferSize;i++) {
- SendEntry * entry = be->sendBuffer[i];
+ entry = be->sendBuffer[i];
tmpMsg = MALLOC(entry->len);
- if ( (entry->knapsackSolution == YES) &&
- (entry->callback != NULL) ) {
- if (OK == entry->callback(tmpMsg,
- entry->closure,
- entry->len)) {
- entry->callback = NULL;
- entry->closure = tmpMsg;
+ if (entry->knapsackSolution == YES) {
+ if (entry->callback != NULL) {
+ if (OK == entry->callback(tmpMsg,
+ entry->closure,
+ entry->len)) {
+ entry->callback = NULL;
+ entry->closure = tmpMsg;
+ ret++;
+ } else {
+ FREE(tmpMsg);
+ entry->callback = NULL;
+ entry->closure = NULL;
+ FREE(entry);
+ be->sendBuffer[i] = NULL;
+ }
} else {
- FREE(tmpMsg);
- entry->callback = NULL;
- entry->closure = NULL;
- FREE(entry);
- be->sendBuffer[i] = NULL;
+ ret++;
}
}
}
+ return ret;
+}
+/**
+ * Compute a random permuation of the send buffer
+ * entry such that the selected messages obey
+ * the SE flags.
+ */
+static int * permuteSendBuffer(BufferEntry * be) {
+ int * perm;
+ int headpos;
+ int tailpos;
+ int i;
+ int j;
+
perm = permute(WEAK, be->sendBufferSize);
- /* change permutation such that SE_FLAGS
- are obeyed */
headpos = 0;
tailpos = be->sendBufferSize-1;
- remainingBufferSize = be->sendBufferSize;
for (i=0;i<be->sendBufferSize;i++) {
if (be->sendBuffer[perm[i]] == NULL)
continue;
if (be->sendBuffer[perm[i]]->knapsackSolution == YES) {
- remainingBufferSize--;
switch (be->sendBuffer[perm[i]]->flags & SE_PLACEMENT_FLAG) {
case SE_FLAG_NONE:
break;
@@ -1176,7 +1162,163 @@
}
}
}
+ return perm;
+}
+/**
+ * Free entries in send buffer that were
+ * selected as the knapsack solution or
+ * that are dead (callback and closure NULL).
+ */
+static void freeSelectedEntries(BufferEntry * be) {
+ int i;
+ SendEntry * entry;
+
+ for (i=0;i<be->sendBufferSize;i++) {
+ entry = be->sendBuffer[i];
+ if (entry == NULL)
+ continue;
+ if (entry->knapsackSolution == YES) {
+ GNUNET_ASSERT(entry->callback == NULL);
+ FREENONNULL(entry->closure);
+ FREE(entry);
+ be->sendBuffer[i] = NULL;
+ } else if ( (entry->callback == NULL) &&
+ (entry->closure == NULL) ) {
+ FREE(entry);
+ be->sendBuffer[i] = NULL;
+ }
+ }
+}
+
+/**
+ * Try to make sure that the transport service for the given buffer is
+ * connected. If the transport service changes, this function also
+ * ensures that the pending messages are properly fragmented (if
+ * needed).
+ *
+ * @return OK on success, NO on error
+ */
+static int ensureTransportConnected(BufferEntry * be) {
+ SendEntry ** entries;
+ SendEntry * entry;
+ int i;
+ int ret;
+
+ if (be->session.tsession == NULL) {
+ be->session.tsession
+ = transport->connectFreely(&be->session.sender,
+ YES);
+ if (be->session.tsession == NULL)
+ return NO;
+ be->session.mtu
+ = transport->getMTU(be->session.tsession->ttype);
+ if (be->session.mtu > 0) {
+ /* MTU change may require new fragmentation! */
+ entries = be->sendBuffer;
+ i = 0;
+ ret = be->sendBufferSize;
+ /* assumes entries are sorted by priority! */
+ while (i < ret) {
+ entry = entries[i];
+ if (entry->len > be->session.mtu - sizeof(P2P_Message)) {
+ entries[i] = entries[--ret];
+ fragmentation->fragment(&be->session.sender,
+ be->session.mtu - sizeof(P2P_Message),
+ entry->pri,
+ entry->transmissionTime,
+ entry->len,
+ entry->callback,
+ entry->closure);
+ FREE(entry);
+ }
+ i++;
+ }
+ if (ret != be->sendBufferSize)
+ GROW(be->sendBuffer,
+ be->sendBufferSize,
+ ret);
+ }
+ }
+ return OK;
+}
+
+
+/**
+ * Send a buffer; assumes that access is already synchronized. This
+ * message solves the knapsack problem, assembles the message
+ * (callback to build parts from knapsack, callbacks for padding,
+ * random noise padding, crc, encryption) and finally hands the
+ * message to the transport service.
+ *
+ * @param be connection of the buffer that is to be transmitted
+ */
+static void sendBuffer(BufferEntry * be) {
+ unsigned int i;
+ unsigned int j;
+ unsigned int p;
+ unsigned int rsi;
+ SendCallbackList * pos;
+ P2P_Message * p2pHdr;
+ int priority;
+ int * perm;
+ char * plaintextMsg;
+ void * encryptedMsg;
+ unsigned int totalMessageSize;
+ int ret;
+
+ ENTRY();
+ /* fast ways out */
+ if (be == NULL) {
+ BREAK();
+ return;
+ }
+ if ( (be->status != STAT_UP) ||
+ (be->sendBufferSize == 0) ||
+ (be->inSendBuffer == YES) )
+ return; /* must not run */
+ be->inSendBuffer = YES;
+
+ if ( (OK != ensureTransportConnected(be)) ||
+ (be->sendBufferSize == 0) ||
+ (OK != checkSendFrequency(be)) ) {
+ be->inSendBuffer = NO;
+ return;
+ }
+
+ /* test if receiver has enough bandwidth available! */
+ updateCurBPS(be);
+#if DEBUG_CONNECTION
+ LOG(LOG_DEBUG,
+ "receiver window available: %lld bytes (MTU: %u)\n",
+ be->available_send_window,
+ be->session.mtu);
+#endif
+
+
+ totalMessageSize = selectMessagesToSend(be,
+ &priority);
+ if (totalMessageSize == 0) {
+ expireSendBufferEntries(be);
+ be->inSendBuffer = NO;
+ return; /* deferr further */
+ }
+ totalMessageSize += sizeof(P2P_Message);
+
+ /* check if we (sender) have enough bandwidth available
+ if so, trigger callbacks on selected entries; if either
+ fails, return (but clean up garbage) */
+ if ( (SYSERR == outgoingCheck(priority)) ||
+ (0 == prepareSelectedMessages(be)) ) {
+ expireSendBufferEntries(be);
+ be->inSendBuffer = NO;
+ return; /* deferr further */
+ }
+
+ /* get permutation of SendBuffer Entries
+ such that SE_FLAGS are obeyed */
+ perm = permuteSendBuffer(be);
+
/* build message (start with sequence number) */
plaintextMsg = MALLOC(totalMessageSize);
p2pHdr = (P2P_Message*) plaintextMsg;
@@ -1199,36 +1341,6 @@
entry->closure,
entry->len);
p += entry->len;
- } else {
- int msgCap;
- int l = getCPULoad();
- if (l >= 50) {
- msgCap = EXPECTED_MTU / sizeof(HashCode512);
- } else {
- if (l <= 0)
- l = 1;
- msgCap = EXPECTED_MTU / sizeof(HashCode512)
- + (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU / sizeof(HashCode512)) / l;
- }
- if (be->max_bpm > 2) {
- msgCap += 2 * (int) log((double)be->max_bpm);
- if (msgCap >= MAX_SEND_BUFFER_SIZE-1)
- msgCap = MAX_SEND_BUFFER_SIZE-2; /* try to make sure that there
- is always room... */
- }
- if ( (remainingBufferSize > msgCap) &&
- (entry->transmissionTime < expired) ) {
-#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "expiring message, expired %ds ago, queue size is %u (other
messages went through)\n",
- (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
- remainingBufferSize);
-#endif
- FREENONNULL(entry->closure);
- FREE(entry);
- be->sendBuffer[perm[i]] = NULL;
- remainingBufferSize--;
- }
}
}
FREE(perm);
@@ -1317,21 +1429,7 @@
j += plen;
}
}
- for (i=0;i<be->sendBufferSize;i++) {
- SendEntry * entry = be->sendBuffer[i];
- if (entry == NULL)
- continue;
- if (entry->knapsackSolution == YES) {
- GNUNET_ASSERT(entry->callback == NULL);
- FREENONNULL(entry->closure);
- FREE(entry);
- be->sendBuffer[i] = NULL;
- } else if ( (entry->callback == NULL) &&
- (entry->closure == NULL) ) {
- FREE(entry);
- be->sendBuffer[i] = NULL;
- }
- }
+ freeSelectedEntries(be);
}
if ( (ret == SYSERR) &&
(be->session.tsession != NULL) ) {
@@ -1341,15 +1439,7 @@
FREE(encryptedMsg);
FREE(plaintextMsg);
-
- /* cleanup/compact sendBuffer */
- j = 0;
- for (i=0;i<be->sendBufferSize;i++)
- if (be->sendBuffer[i] != NULL)
- be->sendBuffer[j++] = be->sendBuffer[i];
- GROW(be->sendBuffer,
- be->sendBufferSize,
- j);
+ expireSendBufferEntries(be);
be->inSendBuffer = NO;
}
@@ -2262,7 +2352,6 @@
*/
void confirmSessionUp(const PeerIdentity * peer) {
BufferEntry * be;
- EncName enc;
MUTEX_LOCK(&lock);
be = lookForHost(peer);
@@ -2270,28 +2359,12 @@
cronTime(&be->isAlive);
identity->whitelistHost(peer);
if ( ( (be->status & STAT_SKEY_SENT) > 0) &&
- ( (be->status & STAT_SKEY_RECEIVED) > 0) ) {
- if (be->session.tsession == NULL)
- be->session.tsession
- = transport->connectFreely(&be->session.sender,
- YES);
- if (be->session.tsession != NULL) {
- be->session.mtu
- = transport->getMTU(be->session.tsession->ttype);
- if (be->status != STAT_UP) {
- be->status = STAT_UP;
- be->lastSequenceNumberReceived = 0;
- be->lastSequenceNumberSend = 1;
- }
- } else {
- IFLOG(LOG_WARNING,
- hash2enc(&be->session.sender.hashPubKey,
- &enc));
- LOG(LOG_WARNING,
- _("Session with peer '%s' confirmed, "
- "but I cannot connect! (bug?)\n"),
- &enc);
- }
+ ( (be->status & STAT_SKEY_RECEIVED) > 0) &&
+ (OK == ensureTransportConnected(be)) &&
+ (be->status != STAT_UP) ) {
+ be->status = STAT_UP;
+ be->lastSequenceNumberReceived = 0;
+ be->lastSequenceNumberSend = 1;
}
}
MUTEX_UNLOCK(&lock);
@@ -2532,6 +2605,7 @@
void initConnection() {
GNUNET_ASSERT(P2P_MESSAGE_OVERHEAD
== sizeof(P2P_Message));
+ GNUNET_ASSERT(sizeof(HANGUP_Message) == 68);
ENTRY();
scl_nextHead
= NULL;
Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/transports/tcp.c 2005-07-16 15:51:25 UTC (rev 1555)
@@ -1406,6 +1406,8 @@
* via a global and returns the udp transport API.
*/
TransportAPI * inittransport_tcp(CoreAPIForTransport * core) {
+ GNUNET_ASSERT(sizeof(HostAddress) == 8);
+ GNUNET_ASSERT(sizeof(TCPMessagePack) == 4);
GNUNET_ASSERT(sizeof(TCPWelcome) == 68);
MUTEX_CREATE_RECURSIVE(&tcplock);
reloadConfiguration();
Modified: GNUnet/src/transports/udp.c
===================================================================
--- GNUnet/src/transports/udp.c 2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/transports/udp.c 2005-07-16 15:51:25 UTC (rev 1555)
@@ -633,6 +633,7 @@
TransportAPI * inittransport_udp(CoreAPIForTransport * core) {
int mtu;
+ GNUNET_ASSERT(sizeof(HostAddress) == 8);
GNUNET_ASSERT(sizeof(UDPMessage) == 68);
coreAPI = core;
stats = coreAPI->requestService("stats");
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r1555 - in GNUnet/src: applications/fs applications/fs/module applications/gap applications/pingpong applications/session applications/transport include server transports,
grothoff <=