[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3889 - GNUnet/src/server
From: |
grothoff |
Subject: |
[GNUnet-SVN] r3889 - GNUnet/src/server |
Date: |
Thu, 7 Dec 2006 00:43:48 -0800 (PST) |
Author: grothoff
Date: 2006-12-07 00:43:46 -0800 (Thu, 07 Dec 2006)
New Revision: 3889
Modified:
GNUnet/src/server/connection.c
Log:
bugfixes
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2006-12-07 08:43:42 UTC (rev 3888)
+++ GNUnet/src/server/connection.c 2006-12-07 08:43:46 UTC (rev 3889)
@@ -269,20 +269,41 @@
* callback).
*/
typedef struct {
- /** how long is this message part expected to be? */
+
+ /**
+ * how long is this message part expected to be?
+ */
unsigned short len;
- /** flags */
+
+ /**
+ * flags
+ */
unsigned short flags;
- /** how important is this message part? */
+
+ /**
+ * how important is this message part?
+ */
unsigned int pri;
- /** when did we intend to transmit? */
+
+ /**
+ * when do/did we intend to transmit?
+ */
cron_t transmissionTime;
- /** callback to call to create the message part */
+
+ /**
+ * callback to call to create the message part
+ */
BuildMessageCallback callback;
- /** argument to callback, call FREENONNULL(closure) if we
- can not transmit this MessagePart. */
- void *closure;
- /** YES if selected by knapsack for sending */
+
+ /**
+ * argument to callback, call FREENONNULL(closure) if we
+ * can not transmit this MessagePart.
+ */
+ void * closure;
+
+ /**
+ * YES if selected by knapsack for sending
+ */
int knapsackSolution;
} SendEntry;
@@ -314,40 +335,74 @@
* Type of the connection table.
*/
typedef struct BufferEntry_ {
- /** Session for the connection */
+ /**
+ * Session for the connection
+ */
Session session;
- /** the current session key used for encryption */
+
+ /**
+ * the current session key used for encryption
+ */
SESSIONKEY skey_local;
- /** at which time was the local sessionkey created */
+
+ /**
+ * at which time was the local sessionkey created
+ */
TIME_T skey_local_created;
- /** the current session key used for decryption */
+
+ /**
+ * the current session key used for decryption
+ */
SESSIONKEY skey_remote;
- /** at which time was the remote sessionkey created */
+
+ /**
+ * at which time was the remote sessionkey created
+ */
TIME_T skey_remote_created;
- /** is this host alive? timestamp of the time of the last-active
- point (as witnessed by some higher-level application, typically
- topology+pingpong) */
+
+ /**
+ * is this host alive? timestamp of the time of the last-active
+ * point (as witnessed by some higher-level application, typically
+ * topology+pingpong)
+ */
cron_t isAlive;
- /** Status of the connection (STAT_XXX) */
+
+ /**
+ * Status of the connection (STAT_XXX)
+ */
unsigned int status;
- /** last sequence number received on this connection (highest) */
+ /**
+ * last sequence number received on this connection (highest)
+ */
unsigned int lastSequenceNumberReceived;
- /** bit map indicating which of the 32 sequence numbers before the last
- were received (good for accepting out-of-order packets and
- estimating reliability of the connection) */
+
+ /**
+ * bit map indicating which of the 32 sequence numbers before the last
+ * were received (good for accepting out-of-order packets and
+ * estimating reliability of the connection)
+ */
unsigned int lastPacketsBitmap;
- /** last sequence number transmitted */
+
+ /**
+ * last sequence number transmitted
+ */
unsigned int lastSequenceNumberSend;
- /** number of entries in the send buffer */
+ /**
+ * number of entries in the send buffer
+ */
unsigned int sendBufferSize;
- /** buffer of entries waiting to be transmitted */
+ /**
+ * buffer of entries waiting to be transmitted
+ */
SendEntry **sendBuffer;
- /** time of the last send-attempt (to avoid
- solving knapsack's too often) */
+ /**
+ * time of the last send-attempt (to avoid
+ * solving knapsack's too often)
+ */
cron_t lastSendAttempt;
/**
@@ -360,44 +415,64 @@
*/
cron_t MAX_SEND_FREQUENCY;
- /** a hash collision overflow chain */
+ /**
+ * a hash collision overflow chain
+ */
struct BufferEntry_ *overflowChain;
/* *********** outbound bandwidth limits ********** */
- /** byte-per-minute limit for this connection */
+ /**
+ * byte-per-minute limit for this connection
+ */
unsigned int max_bpm;
- /** current bps (actually bytes per minute) for this connection
- (incremented every minute by max_bpm,
- bounded by max_bpm * secondsInactive/2;
- may get negative if we have VERY high priority
- content) */
+ /**
+ * current bps (actually bytes per minute) for this connection
+ * (incremented every minute by max_bpm, bounded by max_bpm *
+ * secondsInactive/2; may get negative if we have VERY high priority
+ * content) */
long long available_send_window;
- /** time of the last increment of available_send_window */
+ /**
+ * time of the last increment of available_send_window
+ */
cron_t last_bps_update;
/* *********** inbound bandwidth accounting ******** */
- /** how much traffic (bytes) did we receive on this connection since
- the last update-round? */
+ /**
+ * how much traffic (bytes) did we receive on this connection since
+ * the last update-round?
+ */
long long recently_received;
- /** How valueable were the messages of this peer recently? */
+ /**
+ * How valueable were the messages of this peer recently?
+ */
double current_connection_value;
- /** the highest bandwidth limit that a well-behaved peer
- must have received by now */
+ /**
+ * the highest bandwidth limit that a well-behaved peer
+ * must have received by now
+ */
unsigned int max_transmitted_limit;
- /** what is the limit that we are currently shooting for? (byte per minute) */
+ /**
+ * what is the limit that we are currently shooting for? (bytes per minute)
+ */
unsigned int idealized_limit;
+ /**
+ * How often has the other peer violated the traffic bounds
+ * recently?
+ */
unsigned int violations;
- /** are we currently in "sendBuffer" for this entry? */
+ /**
+ * are we currently in "sendBuffer" for this entry?
+ */
int inSendBuffer;
} BufferEntry;
@@ -481,6 +556,12 @@
static unsigned long long max_bpm;
/**
+ * What is the available upstream bandwidth (in bytes
+ * per minute)?
+ */
+static unsigned long long max_bpm_up;
+
+/**
* Registered Send-Notify handlers.
*/
static MessagePartHandler *rsns;
@@ -522,18 +603,23 @@
/* ******************** CODE ********************* */
#if DEBUG_CONNECTION
-static void printMsg(const char *prefix, PeerIdentity * sender,
- SESSIONKEY * key, const INITVECTOR * iv, int crc) {
+static void printMsg(const char *prefix,
+ const PeerIdentity * sender,
+ const SESSIONKEY * key,
+ const INITVECTOR * iv,
+ int crc) {
char skey[65];
char *dst;
int idx;
EncName enc;
- hash2enc(&sender->hashPubKey, &enc);
-
+ hash2enc(&sender->hashPubKey,
+ &enc);
dst = skey;
for(idx = 0; idx < SESSIONKEY_LEN; idx++) {
- sprintf(dst, "%02x", key->key[idx]);
+ sprintf(dst,
+ "%02x",
+ key->key[idx]);
dst += 2;
}
*dst = 0;
@@ -544,7 +630,7 @@
prefix,
&enc,
skey,
- *((int *) iv),
+ *((const int *) iv),
crc);
}
#endif
@@ -1767,10 +1853,9 @@
int count = 0;
BufferEntry *be;
- ENTRY();
- for(i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
+ for(i=0;i<CONNECTION_MAX_HOSTS_;i++) {
be = CONNECTION_buffer_[i];
- while(be != NULL) {
+ while (be != NULL) {
if(be->status == STAT_UP) {
if(method != NULL)
method(be, arg);
@@ -1952,8 +2037,6 @@
int load;
int * perm;
EncName enc;
- unsigned long long total_allowed_sent;
- unsigned long long total_allowed_recv;
MUTEX_LOCK(lock);
now = get_time();
@@ -2031,7 +2114,10 @@
Download);
if (load > 100) /* take counter measure */
schedulableBandwidth = schedulableBandwidth * 100 / load;
-
+#if 0
+ printf("Scheduling %llu bytes per minute\n",
+ schedulableBandwidth);
+#endif
/* compute recent activity profile of the peer */
adjustedRR = MALLOC(sizeof(long long) * activePeerCount);
GE_ASSERT(ectx,
@@ -2207,7 +2293,7 @@
share =
entries[v]->idealized_limit +
(unsigned int) (schedulableBandwidth / activePeerCount);
- if (share > entries[v]->idealized_limit) { /* no int-overflow? */
+ if (share >= entries[v]->idealized_limit) { /* no int-overflow? */
entries[v]->idealized_limit = share;
} else {
entries[v]->idealized_limit = 0xFFFF0000;
@@ -2258,8 +2344,6 @@
FREE(adjustedRR);
FREE(shares);
- total_allowed_sent = 0;
- total_allowed_recv = 0;
for (u=0;u<activePeerCount;u++) {
BufferEntry * be = entries[u];
@@ -2281,16 +2365,13 @@
be->idealized_limit = MIN_BPM_PER_PEER;
shutdownConnection(be);
} else {
- total_allowed_sent += be->max_bpm;
- total_allowed_recv += be->idealized_limit;
+#if 0
+ printf("Assigned %u bytes to peer %u\n",
+ be->idealized_limit,
+ u);
+#endif
}
}
- if (stats != NULL) {
- stats->set(stat_total_allowed_sent,
- total_allowed_sent);
- stats->set(stat_total_allowed_recv,
- total_allowed_recv);
- }
FREE(entries);
MUTEX_UNLOCK(lock);
@@ -2310,9 +2391,13 @@
BufferEntry * tmp;
cron_t now;
int i;
+ unsigned long long total_allowed_sent;
+ unsigned long long total_allowed_recv;
scheduleInboundTraffic();
now = get_time();
+ total_allowed_sent = 0;
+ total_allowed_recv = 0;
MUTEX_LOCK(lock);
for (i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
root = CONNECTION_buffer_[i];
@@ -2330,6 +2415,8 @@
FREE(tmp);
continue; /* no need to call 'send buffer' */
case STAT_UP:
+ total_allowed_sent += root->max_bpm;
+ total_allowed_recv += root->idealized_limit;
if ( (now > root->isAlive) && /* concurrency might make this false... */
(now - root->isAlive > SECONDS_INACTIVE_DROP * cronSECONDS)) {
EncName enc;
@@ -2399,6 +2486,14 @@
} /* end of while */
} /* for all buckets */
MUTEX_UNLOCK(lock);
+ if (stats != NULL) {
+ if (total_allowed_sent > max_bpm_up)
+ total_allowed_sent = max_bpm_up;
+ stats->set(stat_total_allowed_sent,
+ total_allowed_sent / 60); /* bpm to bps */
+ stats->set(stat_total_allowed_recv,
+ total_allowed_recv / 60); /* bpm to bps */
+ }
}
/**
@@ -2591,7 +2686,9 @@
* YES if it is the key for sending
*/
void assignSessionKey(const SESSIONKEY * key,
- const PeerIdentity * peer, TIME_T age, int forSending) {
+ const PeerIdentity * peer,
+ TIME_T age,
+ int forSending) {
BufferEntry *be;
MUTEX_LOCK(lock);
@@ -2820,6 +2917,14 @@
50000, /* default: 50 kbps */
&new_max_bpm))
return SYSERR;
+ GC_get_configuration_value_number(cfg,
+ "LOAD",
+ "MAXNETUPBPSTOTAL",
+ 0,
+ ((unsigned long long)-1)/60,
+ 50000, /* default: 50 kbps */
+ &max_bpm_up);
+ max_bpm_up *= 60; /* bps -> bpm */
MUTEX_LOCK(lock);
new_max_bpm = 60 * new_max_bpm;
if(max_bpm != new_max_bpm) {
@@ -2972,9 +3077,9 @@
stat_noise_sent
= stats->create(gettext_noop("# bytes noise sent"));
stat_total_allowed_sent
- = stats->create(gettext_noop("# total advertised bytes per minute received limit"));
+ = stats->create(gettext_noop("# total advertised bytes per second received limit"));
stat_total_allowed_recv
- = stats->create(gettext_noop("# total allowed bytes per minute transmission limit"));
+ = stats->create(gettext_noop("# total allowed bytes per second transmission limit"));
}
transport->start(&core_receive);
}
@@ -3250,14 +3355,16 @@
BuildMessageCallback callback,
void *closure,
unsigned short len,
- unsigned int importance, unsigned int maxdelay) {
+ unsigned int importance,
+ unsigned int maxdelay) {
BufferEntry *be;
#if DEBUG_CONNECTION
EncName enc;
IF_GELOG(ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&hostId->hashPubKey, &enc));
+ hash2enc(&hostId->hashPubKey,
+ &enc));
GE_LOG(ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
"%s: sending message to host %s message of size %d\n",
@@ -3298,7 +3405,8 @@
*/
void unicast(const PeerIdentity * receiver,
const MESSAGE_HEADER * msg,
- unsigned int importance, unsigned int maxdelay) {
+ unsigned int importance,
+ unsigned int maxdelay) {
char *closure;
unsigned short len;
@@ -3320,7 +3428,11 @@
closure = MALLOC(len);
memcpy(closure, msg, len);
unicastCallback(receiver,
- &copyCallback, closure, len, importance, maxdelay);
+ &copyCallback,
+ closure,
+ len,
+ importance,
+ maxdelay);
}
/**
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3889 - GNUnet/src/server,
grothoff <=