gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

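[GNUnet-SVN] r1555 - in GNUnet/src: applications/fs applications/fs/module applications/gap applications/pingpong applications/session applications/transport include server transports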


From: grothoff
Subject: [GNUnet-SVN] r1555 - in GNUnet/src: applications/fs applications/fs/module applications/gap applications/pingpong applications/session applications/transport include server transports
Date: Sat, 16 Jul 2005 08:51:39 -0700 (PDT)

Author: grothoff
Date: 2005-07-16 08:51:25 -0700 (Sat, 16 Jul 2005)
New Revision: 1555

Modified:
   GNUnet/src/applications/fs/ecrs_core.c
   GNUnet/src/applications/fs/module/fs.c
   GNUnet/src/applications/gap/gap.c
   GNUnet/src/applications/pingpong/pingpong.c
   GNUnet/src/applications/session/connect.c
   GNUnet/src/applications/transport/transport.c
   GNUnet/src/include/ecrs_core.h
   GNUnet/src/server/connection.c
   GNUnet/src/transports/tcp.c
   GNUnet/src/transports/udp.c
Log:
code cleanup plus additional assertions

Modified: GNUnet/src/applications/fs/ecrs_core.c
===================================================================
--- GNUnet/src/applications/fs/ecrs_core.c      2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/applications/fs/ecrs_core.c      2005-07-16 15:51:25 UTC (rev 1555)
@@ -1,5 +1,6 @@
 /*
      This file is part of GNUnet
+     (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -104,6 +105,8 @@
 /**
  * Get the query that will be used to query for
  * a certain block of data.
+ *
+ * @param db the block in plaintext
  */
 void fileBlockGetQuery(const DBlock * db,
                       unsigned int len,
@@ -156,13 +159,16 @@
   unsigned int type;
 
   type = getTypeOfBlock(size, data);
-  if (type == ANY_BLOCK)
+  if (type == ANY_BLOCK) {
+    BREAK();
     return SYSERR;
+  }
   switch (type) {
   case D_BLOCK:
-    fileBlockGetKey(data,
-                   size,
-                   query);
+    /* CHK: hash of content == query */
+    hash(&data[1],
+        size - sizeof(DBlock),
+        query);
     return OK;
   case S_BLOCK: {
     const SBlock * sb;

Modified: GNUnet/src/applications/fs/module/fs.c
===================================================================
--- GNUnet/src/applications/fs/module/fs.c      2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/applications/fs/module/fs.c      2005-07-16 15:51:25 UTC (rev 1555)
@@ -87,7 +87,7 @@
 /**
  * Store an item in the datastore.
  *
- * @param key the key of the item
+ * @param query the unique identifier of the item
  * @param value the value to store
  * @param prio how much does our routing code value
  *        this datum?
@@ -96,7 +96,7 @@
  *         SYSERR if the value is malformed
  */
 static int gapPut(void * closure,
-                 const HashCode512 * key,
+                 const HashCode512 * query,
                  const DataContainer * value,
                  unsigned int prio) {
   Datastore_Value * dv;
@@ -119,7 +119,7 @@
   if ( (OK != getQueryFor(size - sizeof(Datastore_Value),
                          (DBlock*)&gw[1],
                          &hc)) ||
-       (! equalsHashCode512(&hc, key)) ) {
+       (! equalsHashCode512(&hc, query)) ) {
     LOG(LOG_ERROR,
        "Type: %u\n",
        ntohl(*(unsigned int*) &gw[1]));
@@ -149,37 +149,37 @@
                               ntohl(dv->size) - sizeof(Datastore_Value),
                               (DBlock*) &dv[1],
                               0,
-                              key)) {
+                              query)) {
     BREAK();
     FREE(dv);
     return SYSERR;
   }
-  processResponse(key, dv);
+  processResponse(query, dv);
   IFLOG(LOG_DEBUG,
-       hash2enc(key,
+       hash2enc(query,
                 &enc));
   LOG(LOG_DEBUG,
       "FS received GAP-PUT request (query: %s)\n",
       &enc);
-  ret = datastore->putUpdate(key,
+  ret = datastore->putUpdate(query,
                             dv);
   FREE(dv);
   return ret;
 }
 
-static int get_result_callback(const HashCode512 * key,
+static int get_result_callback(const HashCode512 * query,
                               const DataContainer * value,
                               DHT_GET_CLS * cls) {
   EncName enc;
 
   IFLOG(LOG_DEBUG,
-       hash2enc(key,
+       hash2enc(query,
                 &enc));
   LOG(LOG_DEBUG,
-      "Found reply to query %s.\n",
+      "Found reply to query '%s'.\n",
       &enc);
   gapPut(NULL,
-        key,
+        query,
         value,
         cls->prio);
   return OK;
@@ -944,6 +944,17 @@
   static Blockstore dsGap;
   static Blockstore dsDht;
 
+
+  GNUNET_ASSERT(sizeof(CHK) == 128);
+  GNUNET_ASSERT(sizeof(DBlock) == 4);
+  GNUNET_ASSERT(sizeof(IBlock) == 132);
+  GNUNET_ASSERT(sizeof(FileIdentifier) == 136);
+  GNUNET_ASSERT(sizeof(KBlock) == 524);
+  GNUNET_ASSERT(sizeof(SBlock) == 732);
+  GNUNET_ASSERT(sizeof(NBlock) == 716);
+  GNUNET_ASSERT(sizeof(KNBlock) == 1244);
+
+
   hash("GNUNET_FS",
        strlen("GNUNET_FS"),
        &dht_table);

Modified: GNUnet/src/applications/gap/gap.c
===================================================================
--- GNUnet/src/applications/gap/gap.c   2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/applications/gap/gap.c   2005-07-16 15:51:25 UTC (rev 1555)
@@ -1,5 +1,6 @@
 /*
       This file is part of GNUnet
+     (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -794,9 +795,12 @@
   EncName encq;
   EncName encp;
 
-  if (equalsHashCode512(&id->hashPubKey,
-                       &qr->noTarget.hashPubKey))
+  if ( (equalsHashCode512(&id->hashPubKey,
+                         &qr->noTarget.hashPubKey)) ||
+       (equalsHashCode512(&id->hashPubKey,
+                         &qr->msg->returnTo.hashPubKey)) )
     return;
+  
   if (getBit(qr, getIndex(id)) == 1) {
     IFLOG(LOG_DEBUG,
          hash2enc(&id->hashPubKey,
@@ -998,7 +1002,8 @@
 /**
  * Call useContent "later" and then free the pmsg.
  */
-static void useContentLater(GAP_REPLY * pmsg) {
+static void useContentLater(void * data) {
+  GAP_REPLY * pmsg = data;
   useContent(NULL,
             pmsg);
   FREE(pmsg);
@@ -1059,7 +1064,7 @@
         size - sizeof(GAP_REPLY));
   /* delay reply, delay longer if we are busy (makes it harder
      to predict / analyze, too). */
-  addCronJob((CronJob)&useContentLater,
+  addCronJob(&useContentLater,
             randomi(TTL_DECREMENT),
             0,
             pmsg);
@@ -2001,9 +2006,11 @@
   memcpy(qmsg, msg, ntohs(msg->size));
   if (equalsHashCode512(&qmsg->returnTo.hashPubKey,
                        &coreAPI->myIdentity->hashPubKey)) {
-    /* A to B, B sends back to A without (!) source rewriting,
+    /* A to B, B sends to C without source rewriting,
+       C sends back to A again without source rewriting;
+       (or B directly back to A; also should not happen)
        in this case, A must just drop; however, this
-       should never happen. */
+       should not happen (peers should check). */
     BREAK();
     FREE(qmsg);
     return OK;
@@ -2088,6 +2095,9 @@
   static GAP_ServiceAPI api;
   unsigned int i;
 
+  GNUNET_ASSERT(sizeof(GAP_REPLY) == 68);
+  GNUNET_ASSERT(sizeof(GAP_QUERY) == 144);
+  
   coreAPI = capi;
   GROW(rewards,
        rewardSize,

Modified: GNUnet/src/applications/pingpong/pingpong.c
===================================================================
--- GNUnet/src/applications/pingpong/pingpong.c 2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/applications/pingpong/pingpong.c 2005-07-16 15:51:25 UTC (rev 1555)
@@ -448,6 +448,7 @@
 provide_module_pingpong(CoreAPIForApplication * capi) {
   static Pingpong_ServiceAPI ret;
 
+  GNUNET_ASSERT(sizeof(PINGPONG_Message) == 72);
   coreAPI = capi;
   identity = capi->requestService("identity");
   if (identity == NULL) {

Modified: GNUnet/src/applications/session/connect.c
===================================================================
--- GNUnet/src/applications/session/connect.c   2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/applications/session/connect.c   2005-07-16 15:51:25 UTC (rev 1555)
@@ -695,6 +695,7 @@
 provide_module_session(CoreAPIForApplication * capi) {
   static Session_ServiceAPI ret;
 
+  GNUNET_ASSERT(sizeof(SKEY_Message) == 520);
   coreAPI = capi;
   identity = capi->requestService("identity");
   if (identity == NULL) {

Modified: GNUnet/src/applications/transport/transport.c
===================================================================
--- GNUnet/src/applications/transport/transport.c       2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/applications/transport/transport.c       2005-07-16 15:51:25 UTC (rev 1555)
@@ -611,6 +611,7 @@
   void * lib;
   EncName myself;
 
+  GNUNET_ASSERT(sizeof(HELO_Message) == 600);
   identity = capi->requestService("identity");
   if (identity == NULL) {
     BREAK();

Modified: GNUnet/src/include/ecrs_core.h
===================================================================
--- GNUnet/src/include/ecrs_core.h      2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/include/ecrs_core.h      2005-07-16 15:51:25 UTC (rev 1555)
@@ -1,5 +1,6 @@
 /*
      This file is part of GNUnet
+     (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/server/connection.c      2005-07-16 15:51:25 UTC (rev 1555)
@@ -823,97 +823,15 @@
 }
 
 /**
- * Send a buffer; assumes that access is already synchronized.  This
- * message solves the knapsack problem, assembles the message
- * (callback to build parts from knapsack, callbacks for padding,
- * random noise padding, crc, encryption) and finally hands the
- * message to the transport service.
+ * Check that the send frequency for this
+ * buffer is not too high.
  *
- * @param be connection of the buffer that is to be transmitted
+ * @return OK if sending a message now is acceptable
  */
-static void sendBuffer(BufferEntry * be) {
-  unsigned int i;
-  unsigned int j;
-  unsigned int p;
-  unsigned int rsi;
-  SendCallbackList * pos;
-  P2P_Message * p2pHdr;
-  int priority;
-  int * perm;
-  char * tmpMsg;
-  char * plaintextMsg;
-  void * encryptedMsg;
-  cron_t expired;
-  int headpos;
-  int tailpos;
-  int approxProb;
-  int remainingBufferSize;
-  unsigned int totalMessageSize;
-  int ret;
-
-  ENTRY();
-  /* fast ways out */
-  if (be == NULL) {
-    BREAK();
-    return;
-  }
-  if (be->status != STAT_UP)
-    return; /* status is not up, cannot send! */
-  if (be->sendBufferSize == 0)
-    return; /* nothing to send */
-
-  if (be->inSendBuffer == YES)
-    return; /* must not recurse! */
-  be->inSendBuffer = YES;
-
-  /* recompute max send frequency */
-  if (be->max_bpm <= 0)
+static int checkSendFrequency(BufferEntry * be) {
+  if (be->max_bpm == 0)
     be->max_bpm = 1;
 
-  if (be->session.tsession == NULL) {
-    be->session.tsession
-      = transport->connectFreely(&be->session.sender,
-                                YES);
-    if (be->session.tsession == NULL) {
-      be->inSendBuffer = NO;
-      return;
-    }
-    be->session.mtu
-      = transport->getMTU(be->session.tsession->ttype);
-    if (be->session.mtu > 0) {
-      /* MTU change may require new fragmentation! */
-      SendEntry ** entries;
-      SendEntry * entry;
-      
-      entries = be->sendBuffer;
-      i = 0;
-      ret = be->sendBufferSize;
-      /* assumes entries are sorted by priority! */
-      while (i < ret) {
-       entry = entries[i];
-       if (entry->len > be->session.mtu - sizeof(P2P_Message)) {
-         entries[i] = entries[--ret];
-         fragmentation->fragment(&be->session.sender,
-                                 be->session.mtu - sizeof(P2P_Message),
-                                 entry->pri,
-                                 entry->transmissionTime,
-                                 entry->len,
-                                 entry->callback,
-                                 entry->closure);
-         FREE(entry);
-       }
-       i++;
-      } 
-      if (ret != be->sendBufferSize)
-       GROW(be->sendBuffer,
-            be->sendBufferSize,
-            ret);
-    }
-  }
-  if (be->sendBufferSize == 0)
-    return; /* nothing to send */
-
-
   if (be->session.mtu == 0) {
     be->MAX_SEND_FREQUENCY = /* ms per message */
       EXPECTED_MTU
@@ -929,81 +847,85 @@
      solutions for any MIN_SAMPLE_TIME! */
   if (be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
     be->MAX_SEND_FREQUENCY = MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
-
+  
   if ( (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) &&
        (be->sendBufferSize < MAX_SEND_BUFFER_SIZE/4) ) {
 #if DEBUG_CONNECTION
     LOG(LOG_DEBUG,
        "Send frequency too high (CPU load), send deferred.\n");
 #endif
-    be->inSendBuffer = NO;
-    return; /* frequency too high, wait */
+    return NO; /* frequency too high, wait */
   }
+  return OK;
+}
 
-  /* test if receiver has enough bandwidth available!  */
-  updateCurBPS(be);
-#if DEBUG_CONNECTION
-  LOG(LOG_DEBUG,
-      "receiver window available: %lld bytes (MTU: %u)\n",
-      be->available_send_window,
-      be->session.mtu);
-#endif
+/**
+ * Select a subset of the messages for sending.
+ *
+ * @param *priority is set to the achieved message priority
+ * @return total number of bytes of messages selected
+ */
+static unsigned int selectMessagesToSend(BufferEntry * be,
+                                        unsigned int * priority) {
+  unsigned int totalMessageSize;
+  SendEntry * entry;
+  int i;
+  int j;
+  int approxProb;
+  
+  totalMessageSize = 0;
+  (*priority) = 0;
 
-
   if (be->session.mtu == 0) {
-    SendEntry ** entries;
-
-    entries = be->sendBuffer;
     totalMessageSize = sizeof(P2P_Message);
-    priority = 0;
     i = 0;
     /* assumes entries are sorted by priority! */
     while (i < be->sendBufferSize) {
-      if ( (totalMessageSize + entries[i]->len < MAX_BUFFER_SIZE) &&
-          (entries[i]->pri >= EXTREME_PRIORITY) ) {
-       entries[i]->knapsackSolution = YES;
-       priority += entries[i]->pri;
-       totalMessageSize += entries[i]->len;
+      entry = be->sendBuffer[i];
+      if ( (totalMessageSize + entry->len < MAX_BUFFER_SIZE) &&
+          (entry->pri >= EXTREME_PRIORITY) ) {
+       entry->knapsackSolution = YES;
+       (*priority) += entry->pri;
+       totalMessageSize += entry->len;
       } else {
-       entries[i]->knapsackSolution = NO;
+       entry->knapsackSolution = NO;
        break;
       }
       i++;
     }
     if ( (i == 0) &&
-        (entries[i]->len <= be->available_send_window) )
-      return; /* always wait for the highest-priority
+        (be->sendBuffer[i]->len <= be->available_send_window) )
+      return 0; /* always wait for the highest-priority
                 message (otherwise large messages may
                 starve! */
     while ( (i < be->sendBufferSize) &&
            (be->available_send_window > totalMessageSize) ) {
-      if ( (entries[i]->len + totalMessageSize <=
+      entry = be->sendBuffer[i];
+      if ( (entry->len + totalMessageSize <=
            be->available_send_window) &&
-          (totalMessageSize + entries[i]->len < MAX_BUFFER_SIZE) ) {
-       entries[i]->knapsackSolution = YES;
-       totalMessageSize += entries[i]->len;
-       priority += entries[i]->pri;
+          (totalMessageSize + entry->len < MAX_BUFFER_SIZE) ) {
+       entry->knapsackSolution = YES;
+       totalMessageSize += entry->len;
+       (*priority) += entry->pri;
       } else {
-       entries[i]->knapsackSolution = NO;
+       entry->knapsackSolution = NO;
        if (totalMessageSize == sizeof(P2P_Message)) {  
          /* if the highest-priority message does not yet
             fit, wait for send window to grow so that
             we can get it out (otherwise we would starve
             high-priority, large messages) */
-         be->inSendBuffer = NO;    
-         return;
+         return 0;
        }
       }
       i++;
     }
     if ( (totalMessageSize == sizeof(P2P_Message)) ||
-        ( (priority < EXTREME_PRIORITY) &&
+        ( ((*priority) < EXTREME_PRIORITY) &&
           ((totalMessageSize / sizeof(P2P_Message)) < 4) &&
           (randomi(16) != 0) ) ) {
       /* randomization necessary to ensure we eventually send
         a small message if there is nothing else to do! */
-      be->inSendBuffer = NO;
-      return;
+      return 0;
     }
   } else { /* if (be->session.mtu == 0) */
     /* solve knapsack problem, compute accumulated priority */
@@ -1015,23 +937,29 @@
       approxProb *= 2; /* now value between 0 [always approx] and 100 [never approx] */
       /* control CPU load probabilistically! */
       if (randomi(1+approxProb) == 0) {
-       priority = approximateKnapsack(be,
-                                      be->session.mtu - sizeof(P2P_Message));
+       (*priority) = approximateKnapsack(be,
+                                         be->session.mtu - sizeof(P2P_Message));
 #if DEBUG_COLLECT_PRIO == YES
        FPRINTF(prioFile, "%llu 0 %d\n", cronTime(NULL), priority);
 #endif
       } else {
-       priority = solveKnapsack(be,
-                                be->session.mtu - sizeof(P2P_Message));
+       (*priority) = solveKnapsack(be,
+                                   be->session.mtu - sizeof(P2P_Message));
 #if DEBUG_COLLECT_PRIO == YES
-       FPRINTF(prioFile, "%llu 1 %d\n", cronTime(NULL), priority);
+       FPRINTF(prioFile, 
+               "%llu 1 %d\n", 
+               cronTime(NULL),
+               priority);
 #endif
       }
     } else { /* never approximate < 50% CPU load */
-      priority = solveKnapsack(be,
-                              be->session.mtu - sizeof(P2P_Message));
+      (*priority) = solveKnapsack(be,
+                                 be->session.mtu - sizeof(P2P_Message));
 #if DEBUG_COLLECT_PRIO == YES
-      FPRINTF(prioFile, "%llu 2 %d\n", cronTime(NULL), priority);
+      FPRINTF(prioFile,
+             "%llu 2 %d\n", 
+             cronTime(NULL), 
+             priority);
 #endif
     }
     j = 0;
@@ -1052,113 +980,171 @@
            j,
            be->sendBuffer[j]->len,
            be->sendBuffer[j]->pri);
-      be->inSendBuffer = NO;
-      return;
+      return 0;
     }
 
     if (be->available_send_window < be->session.mtu) {
       /* if we have a very high priority, we may
         want to ignore bandwidth availability (e.g. for HANGUP,
         which  has EXTREME_PRIORITY) */
-      if (priority < EXTREME_PRIORITY) {
+      if ((*priority) < EXTREME_PRIORITY) {
 #if DEBUG_CONNECTION
        LOG(LOG_DEBUG,
            "bandwidth limits prevent sending (send window %u too small).\n",
            be->available_send_window);
 #endif
-       be->inSendBuffer = NO;
-       return; /* can not send, BPS available is too small */
+       return 0; /* can not send, BPS available is too small */
       }
     }
     totalMessageSize = be->session.mtu;
   } /* end MTU > 0 */
+  return totalMessageSize;
+}
 
-  expired = cronTime(NULL) - SECONDS_PINGATTEMPT * cronSECONDS;
+/**
+ * Expire old messages from SendBuffer (to avoid 
+ * running out of memory).
+ */
+static void expireSendBufferEntries(BufferEntry * be) {
+  int msgCap;
+  int i;
+  SendEntry * entry;
+  cron_t expired;
+  int l;
+  unsigned int freeSlots;
+  int j;
+
   /* if it's more than one connection "lifetime" old, always kill it! */
-
-  /* check if we (sender) have enough bandwidth available */
-  if (SYSERR == outgoingCheck(priority)) {
-    int msgCap;
-
-    cronTime(&be->lastSendAttempt);
+  expired = cronTime(&be->lastSendAttempt) - SECONDS_PINGATTEMPT * cronSECONDS;
 #if DEBUG_CONNECTION
-    LOG(LOG_DEBUG,
-       "policy prevents sending message (priority too low: %d)\n",
-       priority);
+  LOG(LOG_DEBUG,
+      "policy prevents sending message (priority too low: %d)\n",
+      priority);
 #endif
+  
+  l = getCPULoad();
+  /* cleanup queue */
+  if (l >= 50) {
+    msgCap = EXPECTED_MTU / sizeof(HashCode512);
+  } else {
+    if (l <= 0)
+      l = 1;
+    msgCap = EXPECTED_MTU / sizeof(HashCode512)
+      + (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU / sizeof(HashCode512)) / l;
+  }
+  if (be->max_bpm > 2) {
+    msgCap += 2 * (int) log((double)be->max_bpm);
+    if (msgCap >= MAX_SEND_BUFFER_SIZE-1)
+      msgCap = MAX_SEND_BUFFER_SIZE-2; 
+    /* try to make sure that there
+       is always room... */
+  }
 
-    /* cleanup queue */
-    if (getCPULoad() > 50)
-      msgCap = 4;
-    else
-      msgCap = 54 - getCPULoad();
-    if (be->max_bpm > 2)
-      msgCap += 2 * (int) log((double)be->max_bpm);
-    /* allow at least msgCap msgs in buffer */
-    for (i=0;i<be->sendBufferSize;i++) {       
-      SendEntry * entry = be->sendBuffer[i];
+  freeSlots = 0;
+  /* allow at least msgCap msgs in buffer */
+  for (i=0;i<be->sendBufferSize;i++) 
+    if (be->sendBuffer[i] == NULL)
+      freeSlots++;
 
-      if (be->sendBufferSize <= msgCap)
-       break;
-      if ( entry->transmissionTime < expired) {
+  for (i=0;i<be->sendBufferSize;i++) {         
+    entry = be->sendBuffer[i];
+    if (entry == NULL)
+      continue;   
+    if (be->sendBufferSize <= msgCap + freeSlots)
+      break;
+    if (entry->transmissionTime < expired) {
 #if DEBUG_CONNECTION
-       LOG(LOG_DEBUG,
-           "expiring message, expired %ds ago, queue size is %u (bandwidth stressed)\n",
-           (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
-           be->sendBufferSize);
+      LOG(LOG_DEBUG,
+         "expiring message, expired %ds ago, queue size is %u (bandwidth stressed)\n",
+         (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
+         be->sendBufferSize);
 #endif
-       if (stats != NULL) {
-         stats->change(stat_messagesDropped, 1);
-         stats->change(stat_sizeMessagesDropped, entry->len);
-       }
-       FREE(entry->closure);
-       FREE(entry);
-       be->sendBuffer[i] = be->sendBuffer[be->sendBufferSize-1];
-       GROW(be->sendBuffer,
-            be->sendBufferSize,
-            be->sendBufferSize-1);
-       i--; /* go again for this slot */
+      if (stats != NULL) {
+       stats->change(stat_messagesDropped, 1);
+       stats->change(stat_sizeMessagesDropped, entry->len);
       }
+      FREENONNULL(entry->closure);
+      FREE(entry);
+      be->sendBuffer[i] = NULL;
+      freeSlots++;
     }
-    be->inSendBuffer = NO;
-    return; /* deferr further */
-  }
+  } 
 
-  GNUNET_ASSERT(totalMessageSize > sizeof(P2P_Message));
+  /* cleanup/compact sendBuffer */
+  j = 0;
+  for (i=0;i<be->sendBufferSize;i++)
+    if (be->sendBuffer[i] != NULL) 
+      be->sendBuffer[j++] = be->sendBuffer[i];    
+  GROW(be->sendBuffer,
+       be->sendBufferSize,
+       j);  
+}
 
-  /* first, trigger callbacks on selected entries */
+/**
+ * For each SendEntry of the BE that has
+ * been selected by the knapsack solver,
+ * call the callback and make sure that the
+ * bytes are ready in entry->closure for
+ * transmission.<p>
+ *
+ * If the preparation fails for an entry,
+ * free it.
+ * @return number of prepared entries
+ */
+static unsigned int prepareSelectedMessages(BufferEntry * be) {
+  unsigned int ret;
+  int i;
+  char * tmpMsg;
+  SendEntry * entry;
+
+  ret = 0;
+
   for (i=0;i<be->sendBufferSize;i++) {
-    SendEntry * entry = be->sendBuffer[i];
+    entry = be->sendBuffer[i];
     tmpMsg = MALLOC(entry->len);
 
-    if ( (entry->knapsackSolution == YES) &&
-        (entry->callback != NULL) ) {
-      if (OK == entry->callback(tmpMsg,
-                               entry->closure,
-                               entry->len)) {
-       entry->callback = NULL;
-       entry->closure = tmpMsg;
+    if (entry->knapsackSolution == YES) {
+      if (entry->callback != NULL) {
+       if (OK == entry->callback(tmpMsg,
+                                 entry->closure,
+                                 entry->len)) {
+         entry->callback = NULL;
+         entry->closure = tmpMsg;
+         ret++;
+       } else {
+         FREE(tmpMsg);
+         entry->callback = NULL;
+         entry->closure = NULL;
+         FREE(entry);
+         be->sendBuffer[i] = NULL;
+       }
       } else {
-       FREE(tmpMsg);
-       entry->callback = NULL;
-       entry->closure = NULL;
-       FREE(entry);
-       be->sendBuffer[i] = NULL;
+       ret++;
       }
     }
   }
+  return ret;
+}
 
+/**
+ * Compute a random permutation of the send buffer
+ * entries such that the selected messages obey
+ * the SE flags.
+ */
+static int * permuteSendBuffer(BufferEntry * be) {
+  int * perm;
+  int headpos;
+  int tailpos;
+  int i;
+  int j;
+
   perm = permute(WEAK, be->sendBufferSize);
-  /* change permutation such that SE_FLAGS
-     are obeyed */
   headpos = 0;
   tailpos = be->sendBufferSize-1;
-  remainingBufferSize = be->sendBufferSize;
   for (i=0;i<be->sendBufferSize;i++) {
     if (be->sendBuffer[perm[i]] == NULL)
       continue;        
     if (be->sendBuffer[perm[i]]->knapsackSolution == YES) {
-      remainingBufferSize--;
       switch (be->sendBuffer[perm[i]]->flags & SE_PLACEMENT_FLAG) {
       case SE_FLAG_NONE:
        break;
@@ -1176,7 +1162,163 @@
       }
     }
   }
+  return perm;
+}
 
+/**
+ * Free entries in send buffer that were
+ * selected as the knapsack solution or
+ * that are dead (callback and closure NULL).
+ */
+static void freeSelectedEntries(BufferEntry * be) {
+  int i;
+  SendEntry * entry;
+
+  for (i=0;i<be->sendBufferSize;i++) {
+    entry = be->sendBuffer[i];
+    if (entry == NULL)
+      continue;
+    if (entry->knapsackSolution == YES) {
+      GNUNET_ASSERT(entry->callback == NULL);
+      FREENONNULL(entry->closure);
+      FREE(entry);
+      be->sendBuffer[i] = NULL;
+    } else if ( (entry->callback == NULL) &&
+               (entry->closure == NULL) ) {
+      FREE(entry);
+      be->sendBuffer[i] = NULL;
+    }      
+  }
+}
+
+/**
+ * Try to make sure that the transport service for the given buffer is
+ * connected.  If the transport service changes, this function also
+ * ensures that the pending messages are properly fragmented (if
+ * needed).
+ *
+ * @return OK on success, NO on error
+ */
+static int ensureTransportConnected(BufferEntry * be) {
+  SendEntry ** entries;
+  SendEntry * entry;
+  int i;
+  int ret;
+      
+  if (be->session.tsession == NULL) {
+    be->session.tsession
+      = transport->connectFreely(&be->session.sender,
+                                YES);
+    if (be->session.tsession == NULL)
+      return NO;
+    be->session.mtu
+      = transport->getMTU(be->session.tsession->ttype);
+    if (be->session.mtu > 0) {
+      /* MTU change may require new fragmentation! */
+      entries = be->sendBuffer;
+      i = 0;
+      ret = be->sendBufferSize;
+      /* assumes entries are sorted by priority! */
+      while (i < ret) {
+       entry = entries[i];
+       if (entry->len > be->session.mtu - sizeof(P2P_Message)) {
+         entries[i] = entries[--ret];
+         fragmentation->fragment(&be->session.sender,
+                                 be->session.mtu - sizeof(P2P_Message),
+                                 entry->pri,
+                                 entry->transmissionTime,
+                                 entry->len,
+                                 entry->callback,
+                                 entry->closure);
+         FREE(entry);
+       }
+       i++;
+      } 
+      if (ret != be->sendBufferSize)
+       GROW(be->sendBuffer,
+            be->sendBufferSize,
+            ret);
+    }
+  }
+  return OK;
+}
+
+
+/**
+ * Send a buffer; assumes that access is already synchronized.  This
+ * message solves the knapsack problem, assembles the message
+ * (callback to build parts from knapsack, callbacks for padding,
+ * random noise padding, crc, encryption) and finally hands the
+ * message to the transport service.
+ *
+ * @param be connection of the buffer that is to be transmitted
+ */
+static void sendBuffer(BufferEntry * be) {
+  unsigned int i;
+  unsigned int j;
+  unsigned int p;
+  unsigned int rsi;
+  SendCallbackList * pos;
+  P2P_Message * p2pHdr;
+  int priority;
+  int * perm;
+  char * plaintextMsg;
+  void * encryptedMsg;
+  unsigned int totalMessageSize;
+  int ret;
+
+  ENTRY();
+  /* fast ways out */
+  if (be == NULL) {
+    BREAK();
+    return;
+  }
+  if ( (be->status != STAT_UP) ||
+       (be->sendBufferSize == 0) ||
+       (be->inSendBuffer == YES) )
+    return; /* must not run */
+  be->inSendBuffer = YES;
+
+  if ( (OK != ensureTransportConnected(be)) ||
+       (be->sendBufferSize == 0) ||
+       (OK != checkSendFrequency(be)) ) {
+    be->inSendBuffer = NO;
+    return;
+  }
+
+  /* test if receiver has enough bandwidth available!  */
+  updateCurBPS(be);
+#if DEBUG_CONNECTION
+  LOG(LOG_DEBUG,
+      "receiver window available: %lld bytes (MTU: %u)\n",
+      be->available_send_window,
+      be->session.mtu);
+#endif
+
+  
+  totalMessageSize = selectMessagesToSend(be,
+                                         &priority);
+  if (totalMessageSize == 0) {
+    expireSendBufferEntries(be);
+    be->inSendBuffer = NO;
+    return; /* defer further */
+  }
+  totalMessageSize += sizeof(P2P_Message);
+
+  /* check if we (sender) have enough bandwidth available 
+     if so, trigger callbacks on selected entries; if either
+     fails, return (but clean up garbage) */
+  if ( (SYSERR == outgoingCheck(priority)) ||
+       (0 == prepareSelectedMessages(be)) ) {
+    expireSendBufferEntries(be);
+    be->inSendBuffer = NO;
+    return; /* defer further */
+  }
+
+  /* get permutation of SendBuffer Entries
+     such that SE_FLAGS are obeyed */
+  perm = permuteSendBuffer(be);
+
   /* build message (start with sequence number) */
   plaintextMsg = MALLOC(totalMessageSize);
   p2pHdr = (P2P_Message*) plaintextMsg;
@@ -1199,36 +1341,6 @@
             entry->closure,
             entry->len);
       p += entry->len;
-    } else {
-      int msgCap;
-      int l = getCPULoad();
-      if (l >= 50) {
-       msgCap = EXPECTED_MTU / sizeof(HashCode512);
-      } else {
-       if (l <= 0)
-         l = 1;
-       msgCap = EXPECTED_MTU / sizeof(HashCode512)
-         + (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU / sizeof(HashCode512)) / l;
-      }
-      if (be->max_bpm > 2) {
-       msgCap += 2 * (int) log((double)be->max_bpm);
-       if (msgCap >= MAX_SEND_BUFFER_SIZE-1)
-         msgCap = MAX_SEND_BUFFER_SIZE-2; /* try to make sure that there
-                                             is always room... */
-      }
-      if ( (remainingBufferSize > msgCap) &&
-          (entry->transmissionTime < expired) ) {
-#if DEBUG_CONNECTION
-       LOG(LOG_DEBUG,
-           "expiring message, expired %ds ago, queue size is %u (other messages went through)\n",
-           (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
-           remainingBufferSize);
-#endif
-       FREENONNULL(entry->closure);
-       FREE(entry);
-       be->sendBuffer[perm[i]] = NULL;
-       remainingBufferSize--;
-      }
     }
   }
   FREE(perm);
@@ -1317,21 +1429,7 @@
        j += plen;
       }
     }
-    for (i=0;i<be->sendBufferSize;i++) {
-      SendEntry * entry = be->sendBuffer[i];
-      if (entry == NULL)
-       continue;
-      if (entry->knapsackSolution == YES) {
-       GNUNET_ASSERT(entry->callback == NULL);
-       FREENONNULL(entry->closure);
-       FREE(entry);
-       be->sendBuffer[i] = NULL;
-      } else if ( (entry->callback == NULL) &&
-                 (entry->closure == NULL) ) {
-       FREE(entry);
-       be->sendBuffer[i] = NULL;
-      }      
-    }
+    freeSelectedEntries(be);
   }
   if ( (ret == SYSERR) &&
        (be->session.tsession != NULL) ) {
@@ -1341,15 +1439,7 @@
 
   FREE(encryptedMsg);
   FREE(plaintextMsg);
-
-  /* cleanup/compact sendBuffer */
-  j = 0;
-  for (i=0;i<be->sendBufferSize;i++)
-    if (be->sendBuffer[i] != NULL) 
-      be->sendBuffer[j++] = be->sendBuffer[i];    
-  GROW(be->sendBuffer,
-       be->sendBufferSize,
-       j);
+  expireSendBufferEntries(be);
   be->inSendBuffer = NO;
 }
 
@@ -2262,7 +2352,6 @@
  */
 void confirmSessionUp(const PeerIdentity * peer) {
   BufferEntry * be;
-  EncName enc;
 
   MUTEX_LOCK(&lock);
   be = lookForHost(peer);
@@ -2270,28 +2359,12 @@
     cronTime(&be->isAlive);
     identity->whitelistHost(peer);
     if ( ( (be->status & STAT_SKEY_SENT) > 0) &&
-        ( (be->status & STAT_SKEY_RECEIVED) > 0) ) {
-      if (be->session.tsession == NULL) 
-       be->session.tsession
-         = transport->connectFreely(&be->session.sender,
-                                    YES);      
-      if (be->session.tsession != NULL) {
-       be->session.mtu
-         = transport->getMTU(be->session.tsession->ttype);
-       if (be->status != STAT_UP) {
-         be->status = STAT_UP;
-         be->lastSequenceNumberReceived = 0;
-         be->lastSequenceNumberSend = 1;
-       }
-      } else {
-       IFLOG(LOG_WARNING,
-             hash2enc(&be->session.sender.hashPubKey,
-                      &enc));
-       LOG(LOG_WARNING,
-           _("Session with peer '%s' confirmed, "
-             "but I cannot connect! (bug?)\n"),
-           &enc);
-      }
+        ( (be->status & STAT_SKEY_RECEIVED) > 0) &&
+        (OK == ensureTransportConnected(be)) &&
+        (be->status != STAT_UP) ) {
+      be->status = STAT_UP;
+      be->lastSequenceNumberReceived = 0;
+      be->lastSequenceNumberSend = 1;  
     }
   }
   MUTEX_UNLOCK(&lock);
@@ -2532,6 +2605,7 @@
 void initConnection() {
   GNUNET_ASSERT(P2P_MESSAGE_OVERHEAD
                == sizeof(P2P_Message));
+  GNUNET_ASSERT(sizeof(HANGUP_Message) == 68);
   ENTRY();
   scl_nextHead
     = NULL;

Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/transports/tcp.c 2005-07-16 15:51:25 UTC (rev 1555)
@@ -1406,6 +1406,8 @@
  * via a global and returns the udp transport API.
  */
 TransportAPI * inittransport_tcp(CoreAPIForTransport * core) {
+  GNUNET_ASSERT(sizeof(HostAddress) == 8);
+  GNUNET_ASSERT(sizeof(TCPMessagePack) == 4);
   GNUNET_ASSERT(sizeof(TCPWelcome) == 68);
   MUTEX_CREATE_RECURSIVE(&tcplock);
   reloadConfiguration();

Modified: GNUnet/src/transports/udp.c
===================================================================
--- GNUnet/src/transports/udp.c 2005-07-16 13:31:41 UTC (rev 1554)
+++ GNUnet/src/transports/udp.c 2005-07-16 15:51:25 UTC (rev 1555)
@@ -633,6 +633,7 @@
 TransportAPI * inittransport_udp(CoreAPIForTransport * core) {
   int mtu;
 
+  GNUNET_ASSERT(sizeof(HostAddress) == 8);
   GNUNET_ASSERT(sizeof(UDPMessage) == 68);
   coreAPI = core;
   stats = coreAPI->requestService("stats");





reply via email to

[Prev in Thread] Current Thread [Next in Thread]