gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r2324 - in GNUnet/src: applications/fragmentation applications/topology_default server


From: grothoff
Subject: [GNUnet-SVN] r2324 - in GNUnet/src: applications/fragmentation applications/topology_default server
Date: Thu, 15 Dec 2005 01:23:55 -0800 (PST)

Author: grothoff
Date: 2005-12-15 01:23:51 -0800 (Thu, 15 Dec 2005)
New Revision: 2324

Modified:
   GNUnet/src/applications/fragmentation/fragmentation.c
   GNUnet/src/applications/topology_default/topology.c
   GNUnet/src/server/connection.c
Log:
trying to fix bug 985

Modified: GNUnet/src/applications/fragmentation/fragmentation.c
===================================================================
--- GNUnet/src/applications/fragmentation/fragmentation.c       2005-12-15 00:56:15 UTC (rev 2323)
+++ GNUnet/src/applications/fragmentation/fragmentation.c       2005-12-15 09:23:51 UTC (rev 2324)
@@ -442,8 +442,9 @@
  * such a great idea (i.e. would just waste bandwidth).
  */
 static int fragmentBMC(void * buf,
-                      FragmentBMC * ctx,
+                      void * cls,
                       unsigned short len) {
+  FragmentBMC * ctx = cls;
   static int idGen = 0;
   P2P_fragmentation_MESSAGE * frag;
   unsigned int pos;
@@ -535,7 +536,7 @@
   }
   xlen = mtu - sizeof(P2P_fragmentation_MESSAGE);
   coreAPI->unicastCallback(peer,
-                          (BuildMessageCallback) &fragmentBMC,
+                          &fragmentBMC,
                           fbmc,
                           mtu,
                           prio * xlen / len, /* compute new prio */

Modified: GNUnet/src/applications/topology_default/topology.c
===================================================================
--- GNUnet/src/applications/topology_default/topology.c 2005-12-15 00:56:15 UTC (rev 2323)
+++ GNUnet/src/applications/topology_default/topology.c 2005-12-15 09:23:51 UTC (rev 2324)
@@ -215,7 +215,8 @@
  *
  * @param hostId the peer that gave a sign of live
  */
-static void notifyPONG(PeerIdentity * hostId) {
+static void notifyPONG(void * cls) {
+  PeerIdentity * hostId = cls;
 #if DEBUG_TOPOLOGY
   EncName enc;
 
@@ -230,7 +231,7 @@
 }
 
 /**
- * Check the liveness of the ping and possibly ping it.
+ * Check the liveness of the peer and possibly ping it.
  */
 static void checkNeedForPing(const PeerIdentity * peer,
                             void * unused) {
@@ -264,7 +265,7 @@
 #endif
     if (OK != pingpong->ping(peer,
                             NO,
-                            (CronJob)&notifyPONG,
+                            &notifyPONG,
                             hi))
       FREE(hi);
   }

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2005-12-15 00:56:15 UTC (rev 2323)
+++ GNUnet/src/server/connection.c      2005-12-15 09:23:51 UTC (rev 2324)
@@ -87,18 +87,6 @@
 #define SECONDS_PINGATTEMPT 120
 
 /**
- * How big do we estimate should the send buffer be?  (can grow bigger
- * if we have many requests in a short time, but if it is larger than
- * this, we start do discard expired entries.)  Computed as "MTU /
- * querySize" plus a bit with the goal to be able to have at least
- * enough small entries to fill a message completely *and* to have
- * some room to manouver.
- */
-#define TARGET_SBUF_SIZE 40
-
-unsigned int MAX_SEND_FREQUENCY = 50 * cronMILLIS;
-
-/**
  * High priority message that needs to go through fast,
  * but not if policies would be disregarded.
  */
@@ -114,9 +102,9 @@
 #define MAX_BUF_FACT 2
 
 /**
- * Expected MTU for a connection (1500 for Ethernet)
+ * Expected MTU for a streaming connection.
  */
-#define EXPECTED_MTU 1500
+#define EXPECTED_MTU 32768
 
 /**
  * How many ping/pong messages to we want to transmit
@@ -134,7 +122,7 @@
 /**
  * What is the minimum number of bytes per minute that
  * we allocate PER peer? (5 minutes inactivity timeout,
- * 1500 MTU, 8 MSGs => 8 * 1500 / 5 = 2400 bpm [ 40 bps])
+ * 32768 MTU, 8 MSGs => 8 * 32768 / 5 = ~50000 bpm [ ~800 bps])
  */
 #define MIN_BPM_PER_PEER (TARGET_MSG_SID * EXPECTED_MTU * 60 / SECONDS_INACTIVE_DROP)
 
@@ -146,9 +134,10 @@
 #define MIN_SAMPLE_TIME (MINIMUM_SAMPLE_COUNT * cronMINUTES * EXPECTED_MTU / MIN_BPM_PER_PEER)
 
 /**
- * Hard limit on the send buffer size
+ * Hard limit on the send buffer size (per connection, in bytes),
+ * Must be larger than EXPECTED_MTU.
  */
-#define MAX_SEND_BUFFER_SIZE 256
+#define MAX_SEND_BUFFER_SIZE (EXPECTED_MTU * 8)
 
 /**
  * Status constants
@@ -874,8 +863,7 @@
   if (be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
     be->MAX_SEND_FREQUENCY = MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
 
-  if ( (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) &&
-       (be->sendBufferSize < MAX_SEND_BUFFER_SIZE/4) ) {
+  if (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) {
 #if DEBUG_CONNECTION
     LOG(LOG_DEBUG,
        "Send frequency too high (CPU load), send deferred.\n");
@@ -1040,12 +1028,13 @@
  * running out of memory).
  */
 static void expireSendBufferEntries(BufferEntry * be) {
-  int msgCap;
+  unsigned long long msgCap;
   int i;
   SendEntry * entry;
   cron_t expired;
+  cron_t next_expired;
   int l;
-  unsigned int freeSlots;
+  unsigned long long usedBytes;
   int j;
 
   /* if it's more than one connection "lifetime" old, always kill it! */
@@ -1057,51 +1046,51 @@
 
   l = getCPULoad();
   /* cleanup queue */
-  if (l >= 50) {
-    msgCap = EXPECTED_MTU / sizeof(HashCode512);
-  } else {
+  msgCap = be->max_bpm; /* have minute of msgs, but at least one MTU */
+  if (msgCap < EXPECTED_MTU)
+    msgCap = EXPECTED_MTU;
+  if (l < 50) { /* afford more if CPU load is low */
     if (l <= 0)
       l = 1;
-    msgCap = EXPECTED_MTU / sizeof(HashCode512)
-      + (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU / sizeof(HashCode512)) / l;
+    msgCap += (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU) / l;
   }
-  if (be->max_bpm > 2) {
-    msgCap += 2 * (int) log((double)be->max_bpm);
-    if (msgCap >= MAX_SEND_BUFFER_SIZE-1)
-      msgCap = MAX_SEND_BUFFER_SIZE-2;
-    /* try to make sure that there
-       is always room... */
-  }
 
-  freeSlots = 0;
-  /* allow at least msgCap msgs in buffer */
+  usedBytes = 0;
+  /* allow at least msgCap bytes in buffer */
   for (i=0;i<be->sendBufferSize;i++)
-    if (be->sendBuffer[i] == NULL)
-      freeSlots++;
+    if (be->sendBuffer[i] != NULL)
+      usedBytes += be->sendBuffer[i]->len;
 
-  for (i=0;i<be->sendBufferSize;i++) {         
-    entry = be->sendBuffer[i];
-    if (entry == NULL)
-      continue;
-    if (be->sendBufferSize <= msgCap + freeSlots)
-      break;
-    if (entry->transmissionTime < expired) {
+  while (usedBytes > msgCap) {
+    next_expired = cronTime(NULL) +7 * cronDAYS; /* 'infinity' */
+    for (i=0;i<be->sendBufferSize;i++) {       
+      entry = be->sendBuffer[i];
+      if (entry == NULL)
+       continue;
+      if (usedBytes <= msgCap)
+       break;
+      if (entry->transmissionTime <= expired) {
 #if DEBUG_CONNECTION
-      LOG(LOG_DEBUG,
-         "expiring message, expired %ds ago, queue size is %u (bandwidth stressed)\n",
-         (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
-         be->sendBufferSize);
+       LOG(LOG_DEBUG,
+           "expiring message, expired %ds ago, queue size is %llu (bandwidth stressed)\n",
+           (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
+           usedBytes);
 #endif
-      if (stats != NULL) {
-       stats->change(stat_messagesDropped, 1);
-       stats->change(stat_sizeMessagesDropped, entry->len);
+       if (stats != NULL) {
+         stats->change(stat_messagesDropped, 1);
+         stats->change(stat_sizeMessagesDropped, entry->len);
+       }
+       FREENONNULL(entry->closure);
+       usedBytes -= entry->len;
+       FREE(entry);
+       be->sendBuffer[i] = NULL;
+      } else if (entry->transmissionTime < next_expired) {
+       next_expired = entry->transmissionTime; /* compute min! */
       }
-      FREENONNULL(entry->closure);
-      FREE(entry);
-      be->sendBuffer[i] = NULL;
-      freeSlots++;
     }
+    expired = next_expired;
   }
+  GNUNET_ASSERT(usedBytes <= msgCap);
 
   /* cleanup/compact sendBuffer */
   j = 0;
@@ -1526,6 +1515,7 @@
   float apri;
   unsigned int i;
   SendEntry ** ne;
+  unsigned long long queueSize;
 
   ENTRY();
   if ( (se == NULL) || (se->len == 0) ) {
@@ -1569,16 +1559,26 @@
     FREE(se);
     return;
   }
-  if (be->sendBufferSize >= MAX_SEND_BUFFER_SIZE) {
+  queueSize = 0;
+  for (i=0;i<be->sendBufferSize;i++)
+    queueSize += be->sendBuffer[i]->len;
+
+  if (queueSize >= MAX_SEND_BUFFER_SIZE) {
     /* first, try to remedy! */
     sendBuffer(be);
     /* did it work? */
-    if (be->sendBufferSize >= MAX_SEND_BUFFER_SIZE) {
+
+    queueSize = 0;
+    for (i=0;i<be->sendBufferSize;i++)
+      queueSize += be->sendBuffer[i]->len;
+
+    if (queueSize >= MAX_SEND_BUFFER_SIZE) {
       /* we need to enforce some hard limit here, otherwise we may take
         FAR too much memory (200 MB easily) */
 #if DEBUG_CONNECTION
       LOG(LOG_DEBUG,
-         "sendBufferSize >= %d, refusing to queue message.\n",
+         "queueSize (%llu) >= %d, refusing to queue message.\n",
+         queueSize,
          MAX_SEND_BUFFER_SIZE);
 #endif
       FREE(se->closure);





reply via email to

[Prev in Thread] Current Thread [Next in Thread]