[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r2324 - in GNUnet/src: applications/fragmentation applicati
From: |
grothoff |
Subject: |
[GNUnet-SVN] r2324 - in GNUnet/src: applications/fragmentation applications/topology_default server |
Date: |
Thu, 15 Dec 2005 01:23:55 -0800 (PST) |
Author: grothoff
Date: 2005-12-15 01:23:51 -0800 (Thu, 15 Dec 2005)
New Revision: 2324
Modified:
GNUnet/src/applications/fragmentation/fragmentation.c
GNUnet/src/applications/topology_default/topology.c
GNUnet/src/server/connection.c
Log:
trying to fix bug 985
Modified: GNUnet/src/applications/fragmentation/fragmentation.c
===================================================================
--- GNUnet/src/applications/fragmentation/fragmentation.c 2005-12-15 00:56:15 UTC (rev 2323)
+++ GNUnet/src/applications/fragmentation/fragmentation.c 2005-12-15 09:23:51 UTC (rev 2324)
@@ -442,8 +442,9 @@
* such a great idea (i.e. would just waste bandwidth).
*/
static int fragmentBMC(void * buf,
- FragmentBMC * ctx,
+ void * cls,
unsigned short len) {
+ FragmentBMC * ctx = cls;
static int idGen = 0;
P2P_fragmentation_MESSAGE * frag;
unsigned int pos;
@@ -535,7 +536,7 @@
}
xlen = mtu - sizeof(P2P_fragmentation_MESSAGE);
coreAPI->unicastCallback(peer,
- (BuildMessageCallback) &fragmentBMC,
+ &fragmentBMC,
fbmc,
mtu,
prio * xlen / len, /* compute new prio */
Modified: GNUnet/src/applications/topology_default/topology.c
===================================================================
--- GNUnet/src/applications/topology_default/topology.c 2005-12-15 00:56:15 UTC (rev 2323)
+++ GNUnet/src/applications/topology_default/topology.c 2005-12-15 09:23:51 UTC (rev 2324)
@@ -215,7 +215,8 @@
*
* @param hostId the peer that gave a sign of live
*/
-static void notifyPONG(PeerIdentity * hostId) {
+static void notifyPONG(void * cls) {
+ PeerIdentity * hostId = cls;
#if DEBUG_TOPOLOGY
EncName enc;
@@ -230,7 +231,7 @@
}
/**
- * Check the liveness of the ping and possibly ping it.
+ * Check the liveness of the peer and possibly ping it.
*/
static void checkNeedForPing(const PeerIdentity * peer,
void * unused) {
@@ -264,7 +265,7 @@
#endif
if (OK != pingpong->ping(peer,
NO,
- (CronJob)&notifyPONG,
+ &notifyPONG,
hi))
FREE(hi);
}
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2005-12-15 00:56:15 UTC (rev 2323)
+++ GNUnet/src/server/connection.c 2005-12-15 09:23:51 UTC (rev 2324)
@@ -87,18 +87,6 @@
#define SECONDS_PINGATTEMPT 120
/**
- * How big do we estimate should the send buffer be? (can grow bigger
- * if we have many requests in a short time, but if it is larger than
- * this, we start do discard expired entries.) Computed as "MTU /
- * querySize" plus a bit with the goal to be able to have at least
- * enough small entries to fill a message completely *and* to have
- * some room to manouver.
- */
-#define TARGET_SBUF_SIZE 40
-
-unsigned int MAX_SEND_FREQUENCY = 50 * cronMILLIS;
-
-/**
* High priority message that needs to go through fast,
* but not if policies would be disregarded.
*/
@@ -114,9 +102,9 @@
#define MAX_BUF_FACT 2
/**
- * Expected MTU for a connection (1500 for Ethernet)
+ * Expected MTU for a streaming connection.
*/
-#define EXPECTED_MTU 1500
+#define EXPECTED_MTU 32768
/**
* How many ping/pong messages to we want to transmit
@@ -134,7 +122,7 @@
/**
* What is the minimum number of bytes per minute that
* we allocate PER peer? (5 minutes inactivity timeout,
- * 1500 MTU, 8 MSGs => 8 * 1500 / 5 = 2400 bpm [ 40 bps])
+ * 32768 MTU, 8 MSGs => 8 * 32768 / 5 = ~50000 bpm [ ~800 bps])
*/
#define MIN_BPM_PER_PEER (TARGET_MSG_SID * EXPECTED_MTU * 60 / SECONDS_INACTIVE_DROP)
@@ -146,9 +134,10 @@
#define MIN_SAMPLE_TIME (MINIMUM_SAMPLE_COUNT * cronMINUTES * EXPECTED_MTU / MIN_BPM_PER_PEER)
/**
- * Hard limit on the send buffer size
+ * Hard limit on the send buffer size (per connection, in bytes),
+ * Must be larger than EXPECTED_MTU.
*/
-#define MAX_SEND_BUFFER_SIZE 256
+#define MAX_SEND_BUFFER_SIZE (EXPECTED_MTU * 8)
/**
* Status constants
@@ -874,8 +863,7 @@
if (be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
be->MAX_SEND_FREQUENCY = MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
- if ( (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) &&
- (be->sendBufferSize < MAX_SEND_BUFFER_SIZE/4) ) {
+ if (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) {
#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"Send frequency too high (CPU load), send deferred.\n");
@@ -1040,12 +1028,13 @@
* running out of memory).
*/
static void expireSendBufferEntries(BufferEntry * be) {
- int msgCap;
+ unsigned long long msgCap;
int i;
SendEntry * entry;
cron_t expired;
+ cron_t next_expired;
int l;
- unsigned int freeSlots;
+ unsigned long long usedBytes;
int j;
/* if it's more than one connection "lifetime" old, always kill it! */
@@ -1057,51 +1046,51 @@
l = getCPULoad();
/* cleanup queue */
- if (l >= 50) {
- msgCap = EXPECTED_MTU / sizeof(HashCode512);
- } else {
+ msgCap = be->max_bpm; /* have minute of msgs, but at least one MTU */
+ if (msgCap < EXPECTED_MTU)
+ msgCap = EXPECTED_MTU;
+ if (l < 50) { /* afford more if CPU load is low */
if (l <= 0)
l = 1;
- msgCap = EXPECTED_MTU / sizeof(HashCode512)
- + (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU / sizeof(HashCode512)) / l;
+ msgCap += (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU) / l;
}
- if (be->max_bpm > 2) {
- msgCap += 2 * (int) log((double)be->max_bpm);
- if (msgCap >= MAX_SEND_BUFFER_SIZE-1)
- msgCap = MAX_SEND_BUFFER_SIZE-2;
- /* try to make sure that there
- is always room... */
- }
- freeSlots = 0;
- /* allow at least msgCap msgs in buffer */
+ usedBytes = 0;
+ /* allow at least msgCap bytes in buffer */
for (i=0;i<be->sendBufferSize;i++)
- if (be->sendBuffer[i] == NULL)
- freeSlots++;
+ if (be->sendBuffer[i] != NULL)
+ usedBytes += be->sendBuffer[i]->len;
- for (i=0;i<be->sendBufferSize;i++) {
- entry = be->sendBuffer[i];
- if (entry == NULL)
- continue;
- if (be->sendBufferSize <= msgCap + freeSlots)
- break;
- if (entry->transmissionTime < expired) {
+ while (usedBytes > msgCap) {
+ next_expired = cronTime(NULL) +7 * cronDAYS; /* 'infinity' */
+ for (i=0;i<be->sendBufferSize;i++) {
+ entry = be->sendBuffer[i];
+ if (entry == NULL)
+ continue;
+ if (usedBytes <= msgCap)
+ break;
+ if (entry->transmissionTime <= expired) {
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "expiring message, expired %ds ago, queue size is %u (bandwidth stressed)\n",
- (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
- be->sendBufferSize);
+ LOG(LOG_DEBUG,
+ "expiring message, expired %ds ago, queue size is %llu (bandwidth stressed)\n",
+ (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
+ usedBytes);
#endif
- if (stats != NULL) {
- stats->change(stat_messagesDropped, 1);
- stats->change(stat_sizeMessagesDropped, entry->len);
+ if (stats != NULL) {
+ stats->change(stat_messagesDropped, 1);
+ stats->change(stat_sizeMessagesDropped, entry->len);
+ }
+ FREENONNULL(entry->closure);
+ usedBytes -= entry->len;
+ FREE(entry);
+ be->sendBuffer[i] = NULL;
+ } else if (entry->transmissionTime < next_expired) {
+ next_expired = entry->transmissionTime; /* compute min! */
}
- FREENONNULL(entry->closure);
- FREE(entry);
- be->sendBuffer[i] = NULL;
- freeSlots++;
}
+ expired = next_expired;
}
+ GNUNET_ASSERT(usedBytes <= msgCap);
/* cleanup/compact sendBuffer */
j = 0;
@@ -1526,6 +1515,7 @@
float apri;
unsigned int i;
SendEntry ** ne;
+ unsigned long long queueSize;
ENTRY();
if ( (se == NULL) || (se->len == 0) ) {
@@ -1569,16 +1559,26 @@
FREE(se);
return;
}
- if (be->sendBufferSize >= MAX_SEND_BUFFER_SIZE) {
+ queueSize = 0;
+ for (i=0;i<be->sendBufferSize;i++)
+ queueSize += be->sendBuffer[i]->len;
+
+ if (queueSize >= MAX_SEND_BUFFER_SIZE) {
/* first, try to remedy! */
sendBuffer(be);
/* did it work? */
- if (be->sendBufferSize >= MAX_SEND_BUFFER_SIZE) {
+
+ queueSize = 0;
+ for (i=0;i<be->sendBufferSize;i++)
+ queueSize += be->sendBuffer[i]->len;
+
+ if (queueSize >= MAX_SEND_BUFFER_SIZE) {
/* we need to enforce some hard limit here, otherwise we may take
FAR too much memory (200 MB easily) */
#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
- "sendBufferSize >= %d, refusing to queue message.\n",
+ "queueSize (%llu) >= %d, refusing to queue message.\n",
+ queueSize,
MAX_SEND_BUFFER_SIZE);
#endif
FREE(se->closure);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r2324 - in GNUnet/src: applications/fragmentation applications/topology_default server,
grothoff <=