[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r2604 - GNUnet/src/server
From: durner
Subject: [GNUnet-SVN] r2604 - GNUnet/src/server
Date: Sat, 1 Apr 2006 07:53:11 -0800 (PST)
Author: durner
Date: 2006-04-01 07:53:05 -0800 (Sat, 01 Apr 2006)
New Revision: 2604
Modified:
GNUnet/src/server/connection.c
Log:
reformat
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2006-04-01 14:18:46 UTC (rev 2603)
+++ GNUnet/src/server/connection.c 2006-04-01 15:53:05 UTC (rev 2604)
@@ -197,7 +197,7 @@
#endif
#if DEBUG_COLLECT_PRIO == YES
-FILE * prioFile;
+FILE *prioFile;
#endif
@@ -222,14 +222,14 @@
/**
* Did we say that this is a linked list?
*/
- struct SendCallbackList__ * next;
+ struct SendCallbackList__ *next;
} SendCallbackList;
typedef struct fENHWrap {
PerNodeCallback method;
- void * arg;
+ void *arg;
} fENHWrap;
@@ -280,7 +280,7 @@
BuildMessageCallback callback;
/** argument to callback, call FREENONNULL(closure) if we
can not transmit this MessagePart. */
- void * closure;
+ void *closure;
/** YES if selected by knapsack for sending */
int knapsackSolution;
} SendEntry;
@@ -304,7 +304,7 @@
/**
* The session handle specific for the transport service.
*/
- TSession * tsession;
+ TSession *tsession;
} Session;
@@ -343,7 +343,7 @@
unsigned int sendBufferSize;
/** buffer of entries waiting to be transmitted */
- SendEntry ** sendBuffer;
+ SendEntry **sendBuffer;
/** time of the last send-attempt (to avoid
solving knapsack's too often) */
@@ -360,7 +360,7 @@
cron_t MAX_SEND_FREQUENCY;
/** a hash collision overflow chain */
- struct BufferEntry_ * overflowChain;
+ struct BufferEntry_ *overflowChain;
/* *********** outbound bandwidth limits ********** */
@@ -396,11 +396,11 @@
/* are we currently in "sendBuffer" for this
entry? */
int inSendBuffer;
-
+
} BufferEntry;
typedef struct {
- BufferEntry ** e;
+ BufferEntry **e;
unsigned int pos;
} UTL_Closure;
@@ -409,45 +409,44 @@
* @param be the buffer entry
* @param data context for callee
*/
-typedef void (*BufferEntryCallback)(BufferEntry * be,
- void * data);
+typedef void (*BufferEntryCallback) (BufferEntry * be, void *data);
/* ***************** globals ********************** */
/**
* Transport service
*/
-static Transport_ServiceAPI * transport;
+static Transport_ServiceAPI *transport;
/**
* Identity service
*/
-static Identity_ServiceAPI * identity;
+static Identity_ServiceAPI *identity;
/**
* Session service
*/
-static Session_ServiceAPI * session;
+static Session_ServiceAPI *session;
/**
* Fragmentation service
*/
-static Fragmentation_ServiceAPI * fragmentation;
+static Fragmentation_ServiceAPI *fragmentation;
/**
* Topology service
*/
-static Topology_ServiceAPI * topology;
+static Topology_ServiceAPI *topology;
/**
* Stats service (maybe NULL!)
*/
-static Stats_ServiceAPI * stats;
+static Stats_ServiceAPI *stats;
/**
* The buffer containing all current connections.
*/
-static BufferEntry ** CONNECTION_buffer_;
+static BufferEntry **CONNECTION_buffer_;
/**
* Size of the CONNECTION_buffer_
@@ -463,8 +462,8 @@
/**
* Send callbacks for making better use of noise padding...
*/
-static SendCallbackList * scl_nextHead;
-static SendCallbackList * scl_nextTail;
+static SendCallbackList *scl_nextHead;
+static SendCallbackList *scl_nextTail;
/**
* Lock for the connection module.
@@ -480,7 +479,7 @@
/**
* Registered Send-Notify handlers.
*/
-static MessagePartHandler * rsns;
+static MessagePartHandler *rsns;
/**
* Size of rsns.
@@ -502,25 +501,25 @@
/* ******************** CODE ********************* */
#if DEBUG_CONNECTION
-static void printMsg(const char *prefix, PeerIdentity *sender,
- SESSIONKEY *key, const INITVECTOR *iv, int crc) {
+static void printMsg(const char *prefix, PeerIdentity * sender,
+ SESSIONKEY * key, const INITVECTOR * iv, int crc) {
char skey[65];
char *dst;
int idx;
EncName enc;
-
+
hash2enc(&sender->hashPubKey, &enc);
-
+
dst = skey;
- for (idx=0; idx < SESSIONKEY_LEN; idx++) {
+ for(idx = 0; idx < SESSIONKEY_LEN; idx++) {
sprintf(dst, "%02x", key->key[idx]);
dst += 2;
}
*dst = 0;
-
+
LOG(LOG_DEBUG,
"%s: Sender `%s', key `%s', IV %u msg CRC %u\n",
- prefix, &enc, skey, *((int *)iv), crc);
+ prefix, &enc, skey, *((int *) iv), crc);
}
#endif
@@ -528,41 +527,26 @@
* This allocates and initializes a BufferEntry.
* @return the initialized BufferEntry
*/
-static BufferEntry * initBufferEntry() {
- BufferEntry * be;
+static BufferEntry *initBufferEntry() {
+ BufferEntry *be;
- be = (BufferEntry*) MALLOC(sizeof(BufferEntry));
+ be = (BufferEntry *) MALLOC(sizeof(BufferEntry));
memset(be, 0, sizeof(BufferEntry));
- be->isAlive
- = 0;
- be->status
- = STAT_DOWN;
- be->sendBuffer
- = NULL;
- be->sendBufferSize
- = 0;
- be->overflowChain
- = NULL;
- be->session.tsession
- = NULL;
- be->max_bpm
- = MIN_BPM_PER_PEER;
- be->available_send_window
- = be->max_bpm;
- be->recently_received
- = 0;
- be->current_connection_value
- = 0.0;
- be->idealized_limit
- = MIN_BPM_PER_PEER;
- be->max_transmitted_limit
- = MIN_BPM_PER_PEER;
- be->lastSendAttempt
- = 0; /* never */
- be->MAX_SEND_FREQUENCY
- = 50 * cronMILLIS * getCPULoad();
- be->inSendBuffer
- = NO;
+ be->isAlive = 0;
+ be->status = STAT_DOWN;
+ be->sendBuffer = NULL;
+ be->sendBufferSize = 0;
+ be->overflowChain = NULL;
+ be->session.tsession = NULL;
+ be->max_bpm = MIN_BPM_PER_PEER;
+ be->available_send_window = be->max_bpm;
+ be->recently_received = 0;
+ be->current_connection_value = 0.0;
+ be->idealized_limit = MIN_BPM_PER_PEER;
+ be->max_transmitted_limit = MIN_BPM_PER_PEER;
+ be->lastSendAttempt = 0; /* never */
+ be->MAX_SEND_FREQUENCY = 50 * cronMILLIS * getCPULoad();
+ be->inSendBuffer = NO;
cronTime(&be->last_bps_update); /* now */
return be;
}
@@ -576,14 +560,14 @@
cron_t delta;
cronTime(&now);
- if (now <= be->last_bps_update)
+ if(now <= be->last_bps_update)
return;
delta = now - be->last_bps_update;
- if (be->max_bpm * delta < cronMINUTES)
+ if(be->max_bpm * delta < cronMINUTES)
return;
be->available_send_window =
be->available_send_window + be->max_bpm * delta / cronMINUTES;
- if (be->available_send_window > (long long) be->max_bpm * MAX_BUF_FACT)
+ if(be->available_send_window > (long long) be->max_bpm * MAX_BUF_FACT)
be->available_send_window = (long long) be->max_bpm * MAX_BUF_FACT;
be->last_bps_update = now;
}
@@ -597,7 +581,7 @@
* @return gcd(a,b)
*/
static int gcd(int a, int b) {
- while (a != 0) {
+ while(a != 0) {
int t = a;
a = b % a;
b = t;
@@ -620,11 +604,10 @@
* @return the overall priority that was achieved
*/
static unsigned int
-approximateKnapsack(BufferEntry * be,
- unsigned int available) {
+approximateKnapsack(BufferEntry * be, unsigned int available) {
unsigned int i;
unsigned int count;
- SendEntry ** entries;
+ SendEntry **entries;
int max;
int left;
@@ -633,12 +616,13 @@
left = available;
max = 0;
- for (i=0;i<count;i++) {
- if (entries[i]->len <= left) {
+ for(i = 0; i < count; i++) {
+ if(entries[i]->len <= left) {
entries[i]->knapsackSolution = YES;
left -= entries[i]->len;
max += entries[i]->pri;
- } else {
+ }
+ else {
entries[i]->knapsackSolution = NO;
}
}
@@ -658,21 +642,19 @@
* @param available what is the maximum length available?
* @return the overall priority that was achieved
*/
-static unsigned int
-solveKnapsack(BufferEntry * be,
- unsigned int available) {
+static unsigned int solveKnapsack(BufferEntry * be, unsigned int available) {
unsigned int i;
int j;
int max;
- long long * v;
- int * efflen;
+ long long *v;
+ int *efflen;
cron_t startTime;
cron_t endTime;
- SendEntry ** entries;
+ SendEntry **entries;
unsigned int count;
#define VARR(i,j) v[(i)+(j)*(count+1)]
- if (available < 0) {
+ if(available < 0) {
BREAK();
return -1;
}
@@ -683,14 +665,14 @@
/* fast test: schedule everything? */
max = 0;
- for (i=0;i<count;i++)
+ for(i = 0; i < count; i++)
max += entries[i]->len;
- if (max <= available) {
+ if(max <= available) {
/* short cut: take everything! */
- for (i=0;i<count;i++)
+ for(i = 0; i < count; i++)
entries[i]->knapsackSolution = YES;
max = 0;
- for (i=0;i<count;i++)
+ for(i = 0; i < count; i++)
max += entries[i]->pri;
return max;
}
@@ -699,76 +681,75 @@
to reduce cost to O(count*available/gcd) in terms of
CPU and memory. Since gcd is almost always at least
4, this is probably a good idea (TM) :-) */
- efflen = MALLOC(sizeof(int)*count);
+ efflen = MALLOC(sizeof(int) * count);
max = available;
- for (i=0;i<count;i++) {
- if (entries[i]->len > 0)
+ for(i = 0; i < count; i++) {
+ if(entries[i]->len > 0)
max = gcd(max, entries[i]->len);
}
GNUNET_ASSERT(max != 0);
available = available / max;
- for (i=0;i<count;i++)
+ for(i = 0; i < count; i++)
efflen[i] = entries[i]->len / max;
/* dynamic programming:
VARR(i,j) stores the maximum value of any subset
of objects {1, ... i} that can fit into a knapsack
of weight j. */
- v = MALLOC(sizeof(long long) * (count+1) * (available+1));
- memset(v,
- 0,
- sizeof(long long) * (count+1) * (available+1));
- for (j=1;j<=available;j++)
- VARR(0,j) = -1;
- for (i=1;i<=count;i++) {
- for (j=0;j<=available;j++) {
+ v = MALLOC(sizeof(long long) * (count + 1) * (available + 1));
+ memset(v, 0, sizeof(long long) * (count + 1) * (available + 1));
+ for(j = 1; j <= available; j++)
+ VARR(0, j) = -1;
+ for(i = 1; i <= count; i++) {
+ for(j = 0; j <= available; j++) {
int take_val;
int leave_val;
take_val = -1;
- leave_val = VARR(i-1,j);
- if (j >= efflen[i-1]) {
- take_val = entries[i-1]->pri + VARR(i-1, j-efflen[i-1]);
- if (leave_val > take_val)
- VARR(i,j) = leave_val;
- else
- VARR(i,j) = take_val;
- } else
- VARR(i,j) = leave_val;
+ leave_val = VARR(i - 1, j);
+ if(j >= efflen[i - 1]) {
+ take_val = entries[i - 1]->pri + VARR(i - 1, j - efflen[i - 1]);
+ if(leave_val > take_val)
+ VARR(i, j) = leave_val;
+ else
+ VARR(i, j) = take_val;
+ }
+ else
+ VARR(i, j) = leave_val;
/*
- printf("i: %d j: %d (of %d) efflen: %d take: %d "
- "leave %d e[i-1]->pri %d VAR(i-1,j-eff) %lld VAR(i,j) %lld\n",
- i,
- j,
- available,
- efflen[i-1],
- take_val,
- leave_val,
- entries[i-1]->pri,
- VARR(i-1,j-efflen[i-1]),
- VARR(i,j));
- */
+ printf("i: %d j: %d (of %d) efflen: %d take: %d "
+ "leave %d e[i-1]->pri %d VAR(i-1,j-eff) %lld VAR(i,j) %lld\n",
+ i,
+ j,
+ available,
+ efflen[i-1],
+ take_val,
+ leave_val,
+ entries[i-1]->pri,
+ VARR(i-1,j-efflen[i-1]),
+ VARR(i,j));
+ */
}
}
/* find slot with max value, prefer long messages! */
max = 0;
j = -1;
- for (i=0;(int)i<=available;i++) {
- if (VARR(count, i) >= max) {
+ for(i = 0; (int) i <= available; i++) {
+ if(VARR(count, i) >= max) {
j = i;
max = VARR(count, i);
}
}
/* reconstruct selection */
- for (i=0;i<count;i++)
+ for(i = 0; i < count; i++)
entries[i]->knapsackSolution = NO;
- for (i=count;i>0;i--) {
- if (j >= efflen[i-1]) {
- if (VARR(i-1, j-efflen[i-1]) + entries[i-1]->pri == VARR(i,j)) {
- j -= efflen[i-1];
- entries[i-1]->knapsackSolution = YES;
+ for(i = count; i > 0; i--) {
+ if(j >= efflen[i - 1]) {
+ if(VARR(i - 1, j - efflen[i - 1]) + entries[i - 1]->pri == VARR(i, j)) {
+ j -= efflen[i - 1];
+ entries[i - 1]->knapsackSolution = YES;
}
}
}
@@ -792,19 +773,20 @@
int load;
unsigned int delta;
- load = getNetworkLoadUp(); /* how much free bandwidth do we have? */
- if (load >= 150) {
- return SYSERR; /* => always drop */
+ load = getNetworkLoadUp(); /* how much free bandwidth do we have? */
+ if(load >= 150) {
+ return SYSERR; /* => always drop */
}
- if (load > 100) {
- if (priority >= EXTREME_PRIORITY) {
- return OK; /* allow administrative msgs */
- } else {
- return SYSERR; /* but nothing else */
+ if(load > 100) {
+ if(priority >= EXTREME_PRIORITY) {
+ return OK; /* allow administrative msgs */
}
+ else {
+ return SYSERR; /* but nothing else */
+ }
}
- if (load <= 50) { /* everything goes */
- return OK; /* allow */
+ if(load <= 50) { /* everything goes */
+ return OK; /* allow */
}
/* Now load in [51, 100]. Between 51% and 100% load:
at 51% require priority >= 1 = (load-50)^3
@@ -812,28 +794,23 @@
at 75% require priority >= 15626 = (load-50)^3
at 100% require priority >= 125000 = (load-50)^3
(cubic function)
- */
- delta = load - 50; /* now delta is in [1,50] with 50 == 100% load */
- if (delta * delta * delta > priority ) {
+ */
+ delta = load - 50; /* now delta is in [1,50] with 50 == 100% load */
+ if(delta * delta * delta > priority) {
#if DEBUG_POLICY
LOG(LOG_DEBUG,
- "Network load is too high (%d%%, priority is %u, require %d), "
- "dropping outgoing.\n",
- load,
- priority,
- delta * delta * delta);
+ "Network load is too high (%d%%, priority is %u, require %d), "
+ "dropping outgoing.\n", load, priority, delta * delta * delta);
#endif
- return SYSERR; /* drop */
- } else {
+ return SYSERR; /* drop */
+ }
+ else {
#if DEBUG_POLICY
LOG(LOG_DEBUG,
- "Network load is ok (%d%%, priority is %u >= %d), "
- "sending outgoing.\n",
- load,
- priority,
- delta * delta * delta);
+ "Network load is ok (%d%%, priority is %u >= %d), "
+ "sending outgoing.\n", load, priority, delta * delta * delta);
#endif
- return OK; /* allow */
+ return OK; /* allow */
}
}
@@ -844,31 +821,30 @@
* @return OK if sending a message now is acceptable
*/
static int checkSendFrequency(BufferEntry * be) {
- if (be->max_bpm == 0)
+ if(be->max_bpm == 0)
be->max_bpm = 1;
- if (be->session.mtu == 0) {
- be->MAX_SEND_FREQUENCY = /* ms per message */
- EXPECTED_MTU
- / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
- / 2;
- } else {
- be->MAX_SEND_FREQUENCY = /* ms per message */
- be->session.mtu /* byte per message */
- / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
- / 2; /* some head-room */
+ if(be->session.mtu == 0) {
+ be->MAX_SEND_FREQUENCY = /* ms per message */
+ EXPECTED_MTU / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
+ /2;
}
+ else {
+ be->MAX_SEND_FREQUENCY = /* ms per message */
+ be->session.mtu /* byte per message */
+ / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
+ /2; /* some head-room */
+ }
/* Also: allow at least MINIMUM_SAMPLE_COUNT knapsack
solutions for any MIN_SAMPLE_TIME! */
- if (be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
+ if(be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
be->MAX_SEND_FREQUENCY = MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
- if (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) {
+ if(be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) {
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "Send frequency too high (CPU load), send deferred.\n");
+ LOG(LOG_DEBUG, "Send frequency too high (CPU load), send deferred.\n");
#endif
- return NO; /* frequency too high, wait */
+ return NO; /* frequency too high, wait */
}
return OK;
}
@@ -891,7 +867,7 @@
totalMessageSize = 0;
(*priority) = 0;
- for (i=be->sendBufferSize-1;i>=0;i--)
+ for(i = be->sendBufferSize - 1; i >= 0; i--)
be->sendBuffer[i]->knapsackSolution = NO;
if(be->session.mtu == 0) {
@@ -1030,50 +1006,50 @@
static void expireSendBufferEntries(BufferEntry * be) {
unsigned long long msgCap;
int i;
- SendEntry * entry;
+ SendEntry *entry;
cron_t expired;
int l;
unsigned long long usedBytes;
int j;
/* if it's more than one connection "lifetime" old, always kill it! */
- expired = cronTime(&be->lastSendAttempt) - SECONDS_PINGATTEMPT * cronSECONDS;
+ expired =
+ cronTime(&be->lastSendAttempt) - SECONDS_PINGATTEMPT * cronSECONDS;
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "policy prevents sending message\n");
+ LOG(LOG_DEBUG, "policy prevents sending message\n");
#endif
l = getCPULoad();
/* cleanup queue */
- msgCap = be->max_bpm; /* have minute of msgs, but at least one MTU */
- if (msgCap < EXPECTED_MTU)
+ msgCap = be->max_bpm; /* have minute of msgs, but at least one MTU */
+ if(msgCap < EXPECTED_MTU)
msgCap = EXPECTED_MTU;
- if (l < 50) { /* afford more if CPU load is low */
- if (l <= 0)
+ if(l < 50) { /* afford more if CPU load is low */
+ if(l <= 0)
l = 1;
msgCap += (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU) / l;
}
usedBytes = 0;
/* allow at least msgCap bytes in buffer */
- for (i=0;i<be->sendBufferSize;i++)
- if (be->sendBuffer[i] != NULL)
+ for(i = 0; i < be->sendBufferSize; i++)
+ if(be->sendBuffer[i] != NULL)
usedBytes += be->sendBuffer[i]->len;
- for (i=0;i<be->sendBufferSize;i++) {
+ for(i = 0; i < be->sendBufferSize; i++) {
entry = be->sendBuffer[i];
- if (entry == NULL)
+ if(entry == NULL)
continue;
- if (entry->transmissionTime <= expired) {
+ if(entry->transmissionTime <= expired) {
#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
- "expiring message, expired %ds ago, queue size is %llu (bandwidth stressed)\n",
- (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
- usedBytes);
+ "expiring message, expired %ds ago, queue size is %llu (bandwidth stressed)\n",
+ (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
+ usedBytes);
#endif
- if (stats != NULL) {
- stats->change(stat_messagesDropped, 1);
- stats->change(stat_sizeMessagesDropped, entry->len);
+ if(stats != NULL) {
+ stats->change(stat_messagesDropped, 1);
+ stats->change(stat_sizeMessagesDropped, entry->len);
}
FREENONNULL(entry->closure);
usedBytes -= entry->len;
@@ -1084,12 +1060,10 @@
/* cleanup/compact sendBuffer */
j = 0;
- for (i=0;i<be->sendBufferSize;i++)
- if (be->sendBuffer[i] != NULL)
+ for(i = 0; i < be->sendBufferSize; i++)
+ if(be->sendBuffer[i] != NULL)
be->sendBuffer[j++] = be->sendBuffer[i];
- GROW(be->sendBuffer,
- be->sendBufferSize,
- j);
+ GROW(be->sendBuffer, be->sendBufferSize, j);
}
/**
@@ -1106,46 +1080,42 @@
static unsigned int prepareSelectedMessages(BufferEntry * be) {
unsigned int ret;
int i;
- char * tmpMsg;
- SendEntry * entry;
+ char *tmpMsg;
+ SendEntry *entry;
ret = 0;
- for (i=0;i<be->sendBufferSize;i++) {
+ for(i = 0; i < be->sendBufferSize; i++) {
entry = be->sendBuffer[i];
- if (entry->knapsackSolution == YES) {
- if (entry->callback != NULL) {
- tmpMsg = MALLOC(entry->len);
- if (OK == entry->callback(tmpMsg,
- entry->closure,
- entry->len)) {
- entry->callback = NULL;
- entry->closure = tmpMsg;
- ret++;
- } else {
- FREE(tmpMsg);
- entry->callback = NULL;
- entry->closure = NULL;
- FREE(entry);
- be->sendBuffer[i] = NULL;
- }
- } else {
- ret++;
+ if(entry->knapsackSolution == YES) {
+ if(entry->callback != NULL) {
+ tmpMsg = MALLOC(entry->len);
+ if(OK == entry->callback(tmpMsg, entry->closure, entry->len)) {
+ entry->callback = NULL;
+ entry->closure = tmpMsg;
+ ret++;
+ }
+ else {
+ FREE(tmpMsg);
+ entry->callback = NULL;
+ entry->closure = NULL;
+ FREE(entry);
+ be->sendBuffer[i] = NULL;
+ }
}
+ else {
+ ret++;
+ }
#if 0
{
- P2P_MESSAGE_HEADER * hdr;
- EncName enc;
+ P2P_MESSAGE_HEADER *hdr;
+ EncName enc;
- hdr = (P2P_MESSAGE_HEADER*) entry->closure;
- IFLOG(LOG_DEBUG,
- hash2enc(&be->session.sender.hashPubKey,
- &enc));
- LOG(LOG_DEBUG,
- "Core selected message of type %u and size %u for sending to peer `%s'.\n",
- ntohs(hdr->type),
- ntohs(hdr->size),
- &enc);
+ hdr = (P2P_MESSAGE_HEADER *) entry->closure;
+ IFLOG(LOG_DEBUG, hash2enc(&be->session.sender.hashPubKey, &enc));
+ LOG(LOG_DEBUG,
+ "Core selected message of type %u and size %u for sending to peer `%s'.\n",
+ ntohs(hdr->type), ntohs(hdr->size), &enc);
}
#endif
}
@@ -1158,8 +1128,8 @@
* entry such that the selected messages obey
* the SE flags.
*/
-static int * permuteSendBuffer(BufferEntry * be) {
- int * perm;
+static int *permuteSendBuffer(BufferEntry * be) {
+ int *perm;
int headpos;
int tailpos;
int i;
@@ -1167,25 +1137,25 @@
perm = permute(WEAK, be->sendBufferSize);
headpos = 0;
- tailpos = be->sendBufferSize-1;
- for (i=0;i<be->sendBufferSize;i++) {
- if (be->sendBuffer[perm[i]] == NULL)
- continue;
- if (be->sendBuffer[perm[i]]->knapsackSolution == YES) {
+ tailpos = be->sendBufferSize - 1;
+ for(i = 0; i < be->sendBufferSize; i++) {
+ if(be->sendBuffer[perm[i]] == NULL)
+ continue;
+ if(be->sendBuffer[perm[i]]->knapsackSolution == YES) {
switch (be->sendBuffer[perm[i]]->flags & SE_PLACEMENT_FLAG) {
case SE_FLAG_NONE:
- break;
+ break;
case SE_FLAG_PLACE_HEAD:
- /* swap slot with whoever is head now */
- j = perm[headpos];
- perm[headpos++] = perm[i];
- perm[i] = j;
- break;
+ /* swap slot with whoever is head now */
+ j = perm[headpos];
+ perm[headpos++] = perm[i];
+ perm[i] = j;
+ break;
case SE_FLAG_PLACE_TAIL:
- /* swap slot with whoever is tail now */
- j = perm[tailpos];
- perm[tailpos--] = perm[i];
- perm[i] = j;
+ /* swap slot with whoever is tail now */
+ j = perm[tailpos];
+ perm[tailpos--] = perm[i];
+ perm[i] = j;
}
}
}
@@ -1199,18 +1169,18 @@
*/
static void freeSelectedEntries(BufferEntry * be) {
int i;
- SendEntry * entry;
+ SendEntry *entry;
- for (i=0;i<be->sendBufferSize;i++) {
+ for(i = 0; i < be->sendBufferSize; i++) {
entry = be->sendBuffer[i];
GNUNET_ASSERT(entry != NULL);
- if (entry->knapsackSolution == YES) {
+ if(entry->knapsackSolution == YES) {
GNUNET_ASSERT(entry->callback == NULL);
FREENONNULL(entry->closure);
FREE(entry);
be->sendBuffer[i] = NULL;
- } else if ( (entry->callback == NULL) &&
- (entry->closure == NULL) ) {
+ }
+ else if((entry->callback == NULL) && (entry->closure == NULL)) {
FREE(entry);
be->sendBuffer[i] = NULL;
}
@@ -1226,57 +1196,51 @@
* @return OK on success, NO on error
*/
static int ensureTransportConnected(BufferEntry * be) {
- SendEntry ** entries;
- SendEntry * entry;
+ SendEntry **entries;
+ SendEntry *entry;
int i;
int ret;
int j;
int changed;
- if (be->session.tsession == NULL) {
- be->session.tsession
- = transport->connectFreely(&be->session.sender,
- YES);
- if (be->session.tsession == NULL)
+ if(be->session.tsession == NULL) {
+ be->session.tsession = transport->connectFreely(&be->session.sender, YES);
+ if(be->session.tsession == NULL)
return NO;
- be->session.mtu
- = transport->getMTU(be->session.tsession->ttype);
- if (be->session.mtu > 0) {
+ be->session.mtu = transport->getMTU(be->session.tsession->ttype);
+ if(be->session.mtu > 0) {
/* MTU change may require new fragmentation! */
changed = YES;
- while (changed) {
- changed = NO;
- entries = be->sendBuffer;
- i = 0;
- ret = be->sendBufferSize;
- while (i < ret) {
- entry = entries[i];
- if (entry->len > be->session.mtu - sizeof(P2P_PACKET_HEADER)) {
- ret--;
- for (j=i;j<ret;j++)
- entries[j] = entries[j+1]; /* preserve ordering */
- GROW(be->sendBuffer,
- be->sendBufferSize,
- ret);
- /* calling fragment will change be->sendBuffer;
- thus we need to restart from the beginning afterwards... */
- fragmentation->fragment(&be->session.sender,
- be->session.mtu - sizeof(P2P_PACKET_HEADER),
- entry->pri,
- entry->transmissionTime,
- entry->len,
- entry->callback,
- entry->closure);
- FREE(entry);
- changed = YES;
- break;
- } else {
- i++;
- }
- } /* for all i (until change) */
- } /* while changed */
- } /* if MTU changed */
- } /* if need to reconnect */
+ while(changed) {
+ changed = NO;
+ entries = be->sendBuffer;
+ i = 0;
+ ret = be->sendBufferSize;
+ while(i < ret) {
+ entry = entries[i];
+ if(entry->len > be->session.mtu - sizeof(P2P_PACKET_HEADER)) {
+ ret--;
+ for(j = i; j < ret; j++)
+ entries[j] = entries[j + 1]; /* preserve ordering */
+ GROW(be->sendBuffer, be->sendBufferSize, ret);
+ /* calling fragment will change be->sendBuffer;
+ thus we need to restart from the beginning afterwards... */
+ fragmentation->fragment(&be->session.sender,
+ be->session.mtu -
+ sizeof(P2P_PACKET_HEADER), entry->pri,
+ entry->transmissionTime, entry->len,
+ entry->callback, entry->closure);
+ FREE(entry);
+ changed = YES;
+ break;
+ }
+ else {
+ i++;
+ }
+ } /* for all i (until change) */
+ } /* while changed */
+ } /* if MTU changed */
+ } /* if need to reconnect */
return OK;
}
@@ -1295,31 +1259,29 @@
unsigned int j;
unsigned int p;
unsigned int rsi;
- SendCallbackList * pos;
- P2P_PACKET_HEADER * p2pHdr;
+ SendCallbackList *pos;
+ P2P_PACKET_HEADER *p2pHdr;
unsigned int priority;
- int * perm;
- char * plaintextMsg;
- void * encryptedMsg;
+ int *perm;
+ char *plaintextMsg;
+ void *encryptedMsg;
unsigned int totalMessageSize;
int ret;
ENTRY();
/* fast ways out */
- if (be == NULL) {
+ if(be == NULL) {
BREAK();
return;
}
- if ( (be->status != STAT_UP) ||
- (be->sendBufferSize == 0) ||
- (be->inSendBuffer == YES) ) {
- return; /* must not run */
+ if((be->status != STAT_UP) ||
+ (be->sendBufferSize == 0) || (be->inSendBuffer == YES)) {
+ return; /* must not run */
}
be->inSendBuffer = YES;
- if ( (OK != ensureTransportConnected(be)) ||
- (be->sendBufferSize == 0) ||
- (OK != checkSendFrequency(be)) ) {
+ if((OK != ensureTransportConnected(be)) ||
+ (be->sendBufferSize == 0) || (OK != checkSendFrequency(be))) {
be->inSendBuffer = NO;
return;
}
@@ -1329,27 +1291,25 @@
#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"receiver window available: %lld bytes (MTU: %u)\n",
- be->available_send_window,
- be->session.mtu);
+ be->available_send_window, be->session.mtu);
#endif
- totalMessageSize = selectMessagesToSend(be,
- &priority);
- if (totalMessageSize == 0) {
+ totalMessageSize = selectMessagesToSend(be, &priority);
+ if(totalMessageSize == 0) {
expireSendBufferEntries(be);
be->inSendBuffer = NO;
- return; /* deferr further */
+ return; /* deferr further */
}
GNUNET_ASSERT(totalMessageSize > sizeof(P2P_PACKET_HEADER));
/* check if we (sender) have enough bandwidth available
if so, trigger callbacks on selected entries; if either
fails, return (but clean up garbage) */
- if ( (SYSERR == outgoingCheck(priority)) ||
- (0 == prepareSelectedMessages(be)) ) {
+ if((SYSERR == outgoingCheck(priority)) ||
+ (0 == prepareSelectedMessages(be))) {
expireSendBufferEntries(be);
be->inSendBuffer = NO;
- return; /* deferr further */
+ return; /* deferr further */
}
/* get permutation of SendBuffer Entries
@@ -1358,29 +1318,24 @@
/* build message (start with sequence number) */
plaintextMsg = MALLOC(totalMessageSize);
- p2pHdr = (P2P_PACKET_HEADER*) plaintextMsg;
- p2pHdr->timeStamp
- = htonl(TIME(NULL));
- p2pHdr->sequenceNumber
- = htonl(be->lastSequenceNumberSend);
- p2pHdr->bandwidth
- = htonl(be->idealized_limit);
+ p2pHdr = (P2P_PACKET_HEADER *) plaintextMsg;
+ p2pHdr->timeStamp = htonl(TIME(NULL));
+ p2pHdr->sequenceNumber = htonl(be->lastSequenceNumberSend);
+ p2pHdr->bandwidth = htonl(be->idealized_limit);
p = sizeof(P2P_PACKET_HEADER);
- for (i=0;i<be->sendBufferSize;i++) {
- SendEntry * entry = be->sendBuffer[perm[i]];
+ for(i = 0; i < be->sendBufferSize; i++) {
+ SendEntry *entry = be->sendBuffer[perm[i]];
- if (entry == NULL)
+ if(entry == NULL)
continue;
- if (entry->knapsackSolution == YES) {
+ if(entry->knapsackSolution == YES) {
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG, "Queuing msg %u with length %u\n", perm[i], entry->len);
+ LOG(LOG_DEBUG, "Queuing msg %u with length %u\n", perm[i], entry->len);
#endif
GNUNET_ASSERT(entry->callback == NULL);
GNUNET_ASSERT(p + entry->len <= totalMessageSize);
- memcpy(&plaintextMsg[p],
- entry->closure,
- entry->len);
+ memcpy(&plaintextMsg[p], entry->closure, entry->len);
p += entry->len;
}
}
@@ -1389,96 +1344,78 @@
/* still room left? try callbacks! */
pos = scl_nextHead;
- while (pos != NULL) {
- if (pos->minimumPadding + p <= totalMessageSize) {
+ while(pos != NULL) {
+ if(pos->minimumPadding + p <= totalMessageSize) {
p += pos->callback(&be->session.sender,
- &plaintextMsg[p],
- be->session.mtu - p);
+ &plaintextMsg[p], be->session.mtu - p);
}
pos = pos->next;
}
/* finally padd with noise */
- if ( (p + sizeof(P2P_MESSAGE_HEADER) <= totalMessageSize) &&
- (disable_random_padding == NO) ) {
- P2P_MESSAGE_HEADER * part;
+ if((p + sizeof(P2P_MESSAGE_HEADER) <= totalMessageSize) &&
+ (disable_random_padding == NO)) {
+ P2P_MESSAGE_HEADER *part;
unsigned short noiseLen = totalMessageSize - p;
- part = (P2P_MESSAGE_HEADER *) &plaintextMsg[p];
- part->size
- = htons(noiseLen);
- part->type
- = htons(P2P_PROTO_noise);
- for (i=p+sizeof(P2P_MESSAGE_HEADER);
- i < totalMessageSize;
- i++)
+ part = (P2P_MESSAGE_HEADER *) & plaintextMsg[p];
+ part->size = htons(noiseLen);
+ part->type = htons(P2P_PROTO_noise);
+ for(i = p + sizeof(P2P_MESSAGE_HEADER); i < totalMessageSize; i++)
plaintextMsg[i] = (char) rand();
p = totalMessageSize;
- if (stats != NULL)
- stats->change(stat_noise_sent,
- noiseLen);
+ if(stats != NULL)
+ stats->change(stat_noise_sent, noiseLen);
}
encryptedMsg = MALLOC(p);
hash(&p2pHdr->sequenceNumber,
- p - sizeof(HashCode512),
- (HashCode512*) encryptedMsg);
- ret = encryptBlock(&p2pHdr->sequenceNumber,
- p - sizeof(HashCode512),
- &be->skey_local,
- (const INITVECTOR*) encryptedMsg, /* IV */
- &((P2P_PACKET_HEADER*)encryptedMsg)->sequenceNumber);
+ p - sizeof(HashCode512), (HashCode512 *) encryptedMsg);
+ ret = encryptBlock(&p2pHdr->sequenceNumber, p - sizeof(HashCode512), &be->skey_local, (const INITVECTOR *) encryptedMsg, /* IV */
+ &((P2P_PACKET_HEADER *) encryptedMsg)->sequenceNumber);
#if DEBUG_CONNECTION
printMsg("Encrypting P2P data", &be->session.sender,
- &be->skey_local, (const INITVECTOR*) encryptedMsg,
- crc32N(&((P2P_PACKET_HEADER*)encryptedMsg)->sequenceNumber, ret));
+ &be->skey_local, (const INITVECTOR *) encryptedMsg,
+ crc32N(&((P2P_PACKET_HEADER *) encryptedMsg)->sequenceNumber,
+ ret));
#endif
- if (stats != NULL)
- stats->change(stat_encrypted,
- p - sizeof(HashCode512));
+ if(stats != NULL)
+ stats->change(stat_encrypted, p - sizeof(HashCode512));
GNUNET_ASSERT(be->session.tsession != NULL);
- ret = transport->send(be->session.tsession,
- encryptedMsg,
- p);
- if ( (ret == NO) &&
- (priority >= EXTREME_PRIORITY) ) {
- ret = transport->sendReliable(be->session.tsession,
- encryptedMsg,
- p);
+ ret = transport->send(be->session.tsession, encryptedMsg, p);
+ if((ret == NO) && (priority >= EXTREME_PRIORITY)) {
+ ret = transport->sendReliable(be->session.tsession, encryptedMsg, p);
}
- if (ret == YES) {
- if (be->available_send_window > totalMessageSize)
+ if(ret == YES) {
+ if(be->available_send_window > totalMessageSize)
be->available_send_window -= totalMessageSize;
else
- be->available_send_window = 0; /* if we overrode limits,
- reset to 0 at least... */
+ be->available_send_window = 0; /* if we overrode limits,
+ reset to 0 at least... */
be->lastSequenceNumberSend++;
- if (be->idealized_limit > be->max_transmitted_limit)
+ if(be->idealized_limit > be->max_transmitted_limit)
+ be->max_transmitted_limit = be->idealized_limit;
+ else /* age */
be->max_transmitted_limit
- = be->idealized_limit;
- else /* age */
- be->max_transmitted_limit
- = (be->idealized_limit + be->max_transmitted_limit*3)/4;
+ = (be->idealized_limit + be->max_transmitted_limit * 3) / 4;
- if (rsnSize > 0) {
+ if(rsnSize > 0) {
j = sizeof(P2P_PACKET_HEADER);
- while (j < p) {
- P2P_MESSAGE_HEADER * part = (P2P_MESSAGE_HEADER*) &plaintextMsg[j];
- unsigned short plen = htons(part->size);
- if (plen < sizeof(P2P_MESSAGE_HEADER)) {
- BREAK();
- break;
- }
- for (rsi=0;rsi<rsnSize;rsi++)
- rsns[rsi](&be->session.sender,
- part);
- j += plen;
+ while(j < p) {
+ P2P_MESSAGE_HEADER *part = (P2P_MESSAGE_HEADER *) & plaintextMsg[j];
+ unsigned short plen = htons(part->size);
+ if(plen < sizeof(P2P_MESSAGE_HEADER)) {
+ BREAK();
+ break;
+ }
+ for(rsi = 0; rsi < rsnSize; rsi++)
+ rsns[rsi] (&be->session.sender, part);
+ j += plen;
}
}
freeSelectedEntries(be);
}
- if ( (ret == SYSERR) &&
- (be->session.tsession != NULL) ) {
+ if((ret == SYSERR) && (be->session.tsession != NULL)) {
transport->disconnect(be->session.tsession);
be->session.tsession = NULL;
}
@@ -1496,79 +1433,68 @@
* @param be on which connection to transmit
* @param se what to transmit (with meta-data)
*/
-static void appendToBuffer(BufferEntry * be,
- SendEntry * se) {
+static void appendToBuffer(BufferEntry * be, SendEntry * se) {
#if DEBUG_CONNECTION
EncName enc;
#endif
float apri;
unsigned int i;
- SendEntry ** ne;
+ SendEntry **ne;
unsigned long long queueSize;
ENTRY();
- if ( (se == NULL) || (se->len == 0) ) {
+ if((se == NULL) || (se->len == 0)) {
BREAK();
FREENONNULL(se);
return;
}
- if ( (be->session.mtu != 0) &&
- (se->len > be->session.mtu - sizeof(P2P_PACKET_HEADER)) ) {
+ if((be->session.mtu != 0) &&
+ (se->len > be->session.mtu - sizeof(P2P_PACKET_HEADER))) {
/* this message is so big that it must be fragmented! */
fragmentation->fragment(&be->session.sender,
- be->session.mtu - sizeof(P2P_PACKET_HEADER),
- se->pri,
- se->transmissionTime,
- se->len,
- se->callback,
- se->closure);
+ be->session.mtu - sizeof(P2P_PACKET_HEADER),
+ se->pri,
+ se->transmissionTime,
+ se->len, se->callback, se->closure);
FREE(se);
return;
}
#if DEBUG_CONNECTION
- IFLOG(LOG_DEBUG,
- hash2enc(&be->session.sender.hashPubKey,
- &enc));
+ IFLOG(LOG_DEBUG, hash2enc(&be->session.sender.hashPubKey, &enc));
LOG(LOG_DEBUG,
- "adding message of size %d to buffer of host %s.\n",
- se->len,
- &enc);
+ "adding message of size %d to buffer of host %s.\n", se->len, &enc);
#endif
- if ( (be->sendBufferSize > 0) &&
- (be->status != STAT_UP) ) {
+ if((be->sendBufferSize > 0) && (be->status != STAT_UP)) {
/* as long as we do not have a confirmed
connection, do NOT queue messages! */
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "not connected to %s, message dropped\n",
- &enc);
+ LOG(LOG_DEBUG, "not connected to %s, message dropped\n", &enc);
#endif
FREE(se->closure);
FREE(se);
return;
}
queueSize = 0;
- for (i=0;i<be->sendBufferSize;i++)
+ for(i = 0; i < be->sendBufferSize; i++)
queueSize += be->sendBuffer[i]->len;
- if (queueSize >= MAX_SEND_BUFFER_SIZE) {
+ if(queueSize >= MAX_SEND_BUFFER_SIZE) {
/* first, try to remedy! */
sendBuffer(be);
/* did it work? */
queueSize = 0;
- for (i=0;i<be->sendBufferSize;i++)
+ for(i = 0; i < be->sendBufferSize; i++)
queueSize += be->sendBuffer[i]->len;
- if (queueSize >= MAX_SEND_BUFFER_SIZE) {
+ if(queueSize >= MAX_SEND_BUFFER_SIZE) {
/* we need to enforce some hard limit here, otherwise we may take
- FAR too much memory (200 MB easily) */
+ FAR too much memory (200 MB easily) */
#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
- "queueSize (%llu) >= %d, refusing to queue message.\n",
- queueSize,
- MAX_SEND_BUFFER_SIZE);
+ "queueSize (%llu) >= %d, refusing to queue message.\n",
+ queueSize, MAX_SEND_BUFFER_SIZE);
#endif
FREE(se->closure);
FREE(se);
@@ -1576,19 +1502,19 @@
}
}
/* grow send buffer, insertion sort! */
- ne = MALLOC( (be->sendBufferSize+1) * sizeof(SendEntry*));
+ ne = MALLOC((be->sendBufferSize + 1) * sizeof(SendEntry *));
GNUNET_ASSERT(se->len != 0);
apri = (float) se->pri / (float) se->len;
- i=0;
- while ( (i < be->sendBufferSize) &&
- ( ((float)be->sendBuffer[i]->pri /
- (float)be->sendBuffer[i]->len) >= apri) ) {
+ i = 0;
+ while((i < be->sendBufferSize) &&
+ (((float) be->sendBuffer[i]->pri /
+ (float) be->sendBuffer[i]->len) >= apri)) {
ne[i] = be->sendBuffer[i];
i++;
}
ne[i++] = se;
- while (i < be->sendBufferSize+1) {
- ne[i] = be->sendBuffer[i-1];
+ while(i < be->sendBufferSize + 1) {
+ ne[i] = be->sendBuffer[i - 1];
i++;
}
FREENONNULL(be->sendBuffer);
@@ -1604,13 +1530,13 @@
* @param hostId the ID of the peer for which the connection is returned
* @return the connection of the host in the table, NULL if not connected
*/
-static BufferEntry * lookForHost(const PeerIdentity * hostId) {
- BufferEntry * root;
+static BufferEntry *lookForHost(const PeerIdentity * hostId) {
+ BufferEntry *root;
root = CONNECTION_buffer_[computeIndex(hostId)];
- while (root != NULL) {
- if (equalsHashCode512(&hostId->hashPubKey,
- &root->session.sender.hashPubKey))
+ while(root != NULL) {
+ if(equalsHashCode512(&hostId->hashPubKey,
+ &root->session.sender.hashPubKey))
return root;
root = root->overflowChain;
}
@@ -1628,46 +1554,40 @@
* @param hostId for which peer should we get/create a connection
* @return the table entry for the host
*/
-static BufferEntry * addHost(const PeerIdentity * hostId,
- int establishSession) {
- BufferEntry * root;
- BufferEntry * prev;
+static BufferEntry *addHost(const PeerIdentity * hostId, int establishSession)
{
+ BufferEntry *root;
+ BufferEntry *prev;
#if DEBUG_CONNECTION
EncName enc;
- IFLOG(LOG_EVERYTHING,
- hash2enc(&hostId->hashPubKey,
- &enc));
- LOG(LOG_EVERYTHING,
- "Adding host `%s' to the connection table.\n",
- &enc);
+ IFLOG(LOG_EVERYTHING, hash2enc(&hostId->hashPubKey, &enc));
+ LOG(LOG_EVERYTHING, "Adding host `%s' to the connection table.\n", &enc);
#endif
ENTRY();
root = lookForHost(hostId);
- if (root == NULL) {
+ if(root == NULL) {
root = CONNECTION_buffer_[computeIndex(hostId)];
prev = NULL;
- while (NULL != root) {
+ while(NULL != root) {
/* settle for entry in the linked list that is down */
- if ( (root->status == STAT_DOWN) ||
- (equalsHashCode512(&hostId->hashPubKey,
- &root->session.sender.hashPubKey)) )
- break;
+ if((root->status == STAT_DOWN) ||
+ (equalsHashCode512(&hostId->hashPubKey,
+ &root->session.sender.hashPubKey)))
+ break;
prev = root;
root = root->overflowChain;
}
- if (root == NULL) {
+ if(root == NULL) {
root = initBufferEntry();
- if (prev == NULL)
- CONNECTION_buffer_[computeIndex(hostId)] = root;
+ if(prev == NULL)
+ CONNECTION_buffer_[computeIndex(hostId)] = root;
else
- prev->overflowChain = root;
+ prev->overflowChain = root;
}
root->session.sender = *hostId;
}
- if ( (root->status == STAT_DOWN) &&
- (establishSession == YES) ) {
+ if((root->status == STAT_DOWN) && (establishSession == YES)) {
root->lastSequenceNumberReceived = 0;
session->tryConnect(hostId);
}
@@ -1683,19 +1603,18 @@
* @param arg the second argument to the method
* @return the number of connected hosts
*/
-static int forAllConnectedHosts(BufferEntryCallback method,
- void * arg) {
+static int forAllConnectedHosts(BufferEntryCallback method, void *arg) {
unsigned int i;
int count = 0;
- BufferEntry * be;
+ BufferEntry *be;
ENTRY();
- for (i=0;i<CONNECTION_MAX_HOSTS_;i++) {
+ for(i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
be = CONNECTION_buffer_[i];
- while (be != NULL) {
- if (be->status == STAT_UP) {
- if (method != NULL)
- method(be, arg);
+ while(be != NULL) {
+ if(be->status == STAT_UP) {
+ if(method != NULL)
+ method(be, arg);
count++;
}
be = be->overflowChain;
@@ -1711,14 +1630,12 @@
* @param arg closure of type fENHWrap giving the function
* to call
*/
-static void fENHCallback(BufferEntry * be,
- void * arg) {
- fENHWrap * wrap;
+static void fENHCallback(BufferEntry * be, void *arg) {
+ fENHWrap *wrap;
- wrap = (fENHWrap*) arg;
- if (wrap->method != NULL)
- wrap->method(&be->session.sender,
- wrap->arg);
+ wrap = (fENHWrap *) arg;
+ if(wrap->method != NULL)
+ wrap->method(&be->session.sender, wrap->arg);
}
@@ -1731,14 +1648,13 @@
* @param len the length of the pre-build message
* @return OK (always successful)
*/
-static int copyCallback(void * buf,
- void * closure,
- unsigned short len) {
- if (len > 0) {
+static int copyCallback(void *buf, void *closure, unsigned short len) {
+ if(len > 0) {
memcpy(buf, closure, len);
FREE(closure);
return OK;
- } else {
+ }
+ else {
FREE(closure);
return SYSERR;
}
@@ -1759,46 +1675,30 @@
ENTRY();
#if DEBUG_CONNECTION
- IFLOG(LOG_DEBUG,
- hash2enc(&be->session.sender.hashPubKey,
- &enc));
- LOG(LOG_DEBUG,
- "Shutting down connection with `%s'\n",
- &enc);
+ IFLOG(LOG_DEBUG, hash2enc(&be->session.sender.hashPubKey, &enc));
+ LOG(LOG_DEBUG, "Shutting down connection with `%s'\n", &enc);
#endif
- if (be->status == STAT_DOWN)
- return; /* nothing to do */
- if (be->status == STAT_UP) {
- SendEntry * se;
+ if(be->status == STAT_DOWN)
+ return; /* nothing to do */
+ if(be->status == STAT_UP) {
+ SendEntry *se;
- hangup.header.type
- = htons(P2P_PROTO_hangup);
- hangup.header.size
- = htons(sizeof(P2P_hangup_MESSAGE));
+ hangup.header.type = htons(P2P_PROTO_hangup);
+ hangup.header.size = htons(sizeof(P2P_hangup_MESSAGE));
identity->getPeerIdentity(identity->getPublicPrivateKey(),
- &hangup.sender);
+ &hangup.sender);
se = MALLOC(sizeof(SendEntry));
- se->len
- = sizeof(P2P_hangup_MESSAGE);
- se->flags
- = SE_FLAG_PLACE_TAIL;
- se->pri
- = EXTREME_PRIORITY;
- se->transmissionTime
- = cronTime(NULL); /* now */
- se->callback
- = &copyCallback;
- se->closure
- = MALLOC(sizeof(P2P_hangup_MESSAGE));
- se->knapsackSolution
- = NO;
- memcpy(se->closure,
- &hangup,
- sizeof(P2P_hangup_MESSAGE));
+ se->len = sizeof(P2P_hangup_MESSAGE);
+ se->flags = SE_FLAG_PLACE_TAIL;
+ se->pri = EXTREME_PRIORITY;
+ se->transmissionTime = cronTime(NULL); /* now */
+ se->callback = &copyCallback;
+ se->closure = MALLOC(sizeof(P2P_hangup_MESSAGE));
+ se->knapsackSolution = NO;
+ memcpy(se->closure, &hangup, sizeof(P2P_hangup_MESSAGE));
appendToBuffer(be, se);
- if (stats != NULL)
- stats->change(stat_hangupSent,
- 1);
+ if(stats != NULL)
+ stats->change(stat_hangupSent, 1);
/* override send frequency and
really try hard to get the HANGUP
out! */
@@ -1809,28 +1709,24 @@
be->status = STAT_DOWN;
be->idealized_limit = MIN_BPM_PER_PEER;
be->max_transmitted_limit = MIN_BPM_PER_PEER;
- if (be->session.tsession != NULL) {
+ if(be->session.tsession != NULL) {
transport->disconnect(be->session.tsession);
be->session.tsession = NULL;
}
- for (i=0;i<be->sendBufferSize;i++) {
+ for(i = 0; i < be->sendBufferSize; i++) {
FREENONNULL(be->sendBuffer[i]->closure);
FREE(be->sendBuffer[i]);
}
- GROW(be->sendBuffer,
- be->sendBufferSize,
- 0);
+ GROW(be->sendBuffer, be->sendBufferSize, 0);
}
-/* ******** inbound bandwidth scheduling ************* */
+/* ******** inbound bandwidth scheduling ************* */
-static void gatherEntries(BufferEntry * be,
- UTL_Closure * utl) {
+static void gatherEntries(BufferEntry * be, UTL_Closure * utl) {
utl->e[utl->pos++] = be;
}
-static void resetRecentlyReceived(BufferEntry * be,
- void * unused) {
+static void resetRecentlyReceived(BufferEntry * be, void *unused) {
be->recently_received = 0;
}
@@ -1856,7 +1752,7 @@
* number of connections).
*/
static unsigned int minConnect() {
- return CONNECTION_MAX_HOSTS_/2;
+ return CONNECTION_MAX_HOSTS_ / 2;
}
/**
@@ -1870,14 +1766,14 @@
UTL_Closure utl;
static cron_t timeDifference;
cron_t now;
- BufferEntry ** entries;
- double * shares;
+ BufferEntry **entries;
+ double *shares;
double shareSum;
unsigned int u;
unsigned int minCon;
long long schedulableBandwidth;
long long decrementSB;
- long long * adjustedRR;
+ long long *adjustedRR;
int didAssign;
int firstRound;
int earlyRun;
@@ -1887,81 +1783,79 @@
cronTime(&now);
/* if this is the first round, don't bother... */
- if (lastRoundStart == 0) {
+ if(lastRoundStart == 0) {
/* no allocation the first time this function is called! */
lastRoundStart = now;
- forAllConnectedHosts(&resetRecentlyReceived,
- NULL);
+ forAllConnectedHosts(&resetRecentlyReceived, NULL);
MUTEX_UNLOCK(&lock);
return;
}
activePeerCount = forAllConnectedHosts(NULL, NULL);
- if (activePeerCount == 0) {
+ if(activePeerCount == 0) {
MUTEX_UNLOCK(&lock);
- return; /* nothing to be done here. */
+ return; /* nothing to be done here. */
}
- /* if time difference is too small, we don't have enough
+ /* if time difference is too small, we don't have enough
sample data and should NOT update the limits;
however, if we have FAR to few peers, reschedule
aggressively (since we are unlikely to get close
to the limits anyway) */
timeDifference = now - lastRoundStart;
earlyRun = 0;
- if (timeDifference < MIN_SAMPLE_TIME) {
+ if(timeDifference < MIN_SAMPLE_TIME) {
earlyRun = 1;
- if (activePeerCount > CONNECTION_MAX_HOSTS_ / 16) {
+ if(activePeerCount > CONNECTION_MAX_HOSTS_ / 16) {
MUTEX_UNLOCK(&lock);
- return; /* don't update too frequently, we need at least some
- semi-representative sampling! */
+ return; /* don't update too frequently, we need at least some
+ semi-representative sampling! */
}
}
- if (timeDifference == 0)
+ if(timeDifference == 0)
timeDifference = 1;
/* build an array containing all BEs */
- entries = MALLOC(sizeof(BufferEntry*)*activePeerCount);
+ entries = MALLOC(sizeof(BufferEntry *) * activePeerCount);
utl.pos = 0;
utl.e = entries;
- forAllConnectedHosts((BufferEntryCallback)&gatherEntries,
- &utl);
+ forAllConnectedHosts((BufferEntryCallback) & gatherEntries, &utl);
/* compute shares */
- shares = MALLOC(sizeof(double)*activePeerCount);
+ shares = MALLOC(sizeof(double) * activePeerCount);
shareSum = 0.0;
- for (u=0;u<activePeerCount;u++) {
+ for(u = 0; u < activePeerCount; u++) {
shares[u] = SHARE_DISTRIBUTION_FUNCTION(entries[u]);
- if (shares[u] < 0.0)
+ if(shares[u] < 0.0)
shares[u] = 0.0;
shareSum += shares[u];
}
/* normalize distribution */
- if (shareSum >= 0.00001) { /* avoid numeric glitches... */
- for (u=0;u<activePeerCount;u++)
- shares[u] = shares[u] / shareSum;
- } else {
- for (u=0;u<activePeerCount;u++)
+ if(shareSum >= 0.00001) { /* avoid numeric glitches... */
+ for(u = 0; u < activePeerCount; u++)
+ shares[u] = shares[u] / shareSum;
+ }
+ else {
+ for(u = 0; u < activePeerCount; u++)
shares[u] = 1 / activePeerCount;
}
/* compute how much bandwidth we can bargain with */
minCon = minConnect();
- if (minCon > activePeerCount)
+ if(minCon > activePeerCount)
minCon = activePeerCount;
- schedulableBandwidth
- = max_bpm - minCon * MIN_BPM_PER_PEER;
+ schedulableBandwidth = max_bpm - minCon * MIN_BPM_PER_PEER;
load = getNetworkLoadDown();
- if (load > 100) {
+ if(load > 100) {
/* take counter measures! */
schedulableBandwidth = schedulableBandwidth * 100 / load;
/* make sure we do not take it down too far */
- if ( (schedulableBandwidth < minCon * MIN_BPM_PER_PEER / 2) &&
- (max_bpm > minCon * MIN_BPM_PER_PEER * 2) )
+ if((schedulableBandwidth < minCon * MIN_BPM_PER_PEER / 2) &&
+ (max_bpm > minCon * MIN_BPM_PER_PEER * 2))
schedulableBandwidth = minCon * MIN_BPM_PER_PEER / 2;
}
@@ -1972,70 +1866,64 @@
and then merge the values; but for now, let's just go
hardcore and adjust all values rapidly */
GNUNET_ASSERT(timeDifference != 0);
- for (u=0;u<activePeerCount;u++) {
- adjustedRR[u] = entries[u]->recently_received * cronMINUTES / timeDifference / 2;
+ for(u = 0; u < activePeerCount; u++) {
+ adjustedRR[u] =
+ entries[u]->recently_received * cronMINUTES / timeDifference / 2;
#if DEBUG_CONNECTION
- if (adjustedRR[u] > entries[u]->idealized_limit) {
+ if(adjustedRR[u] > entries[u]->idealized_limit) {
EncName enc;
- IFLOG(LOG_INFO,
- hash2enc(&entries[u]->session.sender.hashPubKey,
- &enc));
+ IFLOG(LOG_INFO, hash2enc(&entries[u]->session.sender.hashPubKey, &enc));
LOG(LOG_INFO,
- "peer `%s' transmitted above limit: %llu bpm > %u bpm\n",
- &enc,
- adjustedRR[u],
- entries[u]->idealized_limit);
+ "peer `%s' transmitted above limit: %llu bpm > %u bpm\n",
+ &enc, adjustedRR[u], entries[u]->idealized_limit);
}
#endif
/* Check for peers grossly exceeding send limits. Be a bit
* reasonable and make the check against the max value we have
* sent to this peer (assume announcements may have got lost).
*/
- if ( (earlyRun == 0) &&
- (adjustedRR[u] > 2 * MAX_BUF_FACT *
- entries[u]->max_transmitted_limit) &&
- (adjustedRR[u] > 2 * MAX_BUF_FACT *
- entries[u]->idealized_limit) ) {
+ if((earlyRun == 0) &&
+ (adjustedRR[u] > 2 * MAX_BUF_FACT *
+ entries[u]->max_transmitted_limit) &&
+ (adjustedRR[u] > 2 * MAX_BUF_FACT * entries[u]->idealized_limit)) {
EncName enc;
entries[u]->violations++;
- entries[u]->recently_received = 0; /* "clear" slate */
- if (entries[u]->violations > 10) {
- IFLOG(LOG_INFO,
- hash2enc(&entries[u]->session.sender.hashPubKey,
- &enc));
- LOG(LOG_INFO,
- "blacklisting `%s': sent repeatedly %llu bpm "
- "(limit %u bpm, target %u bpm)\n",
- &enc,
- adjustedRR[u],
- entries[u]->max_transmitted_limit,
- entries[u]->idealized_limit);
- identity->blacklistHost(&entries[u]->session.sender,
- 1 / topology->getSaturation(),
- YES);
- shutdownConnection(entries[u]);
- activePeerCount--;
- entries[u] = entries[activePeerCount];
- shares[u] = shares[activePeerCount];
- adjustedRR[u] = adjustedRR[activePeerCount];
- u--;
- continue;
+ entries[u]->recently_received = 0; /* "clear" slate */
+ if(entries[u]->violations > 10) {
+ IFLOG(LOG_INFO,
+ hash2enc(&entries[u]->session.sender.hashPubKey, &enc));
+ LOG(LOG_INFO,
+ "blacklisting `%s': sent repeatedly %llu bpm "
+ "(limit %u bpm, target %u bpm)\n",
+ &enc,
+ adjustedRR[u],
+ entries[u]->max_transmitted_limit, entries[u]->idealized_limit);
+ identity->blacklistHost(&entries[u]->session.sender,
+ 1 / topology->getSaturation(), YES);
+ shutdownConnection(entries[u]);
+ activePeerCount--;
+ entries[u] = entries[activePeerCount];
+ shares[u] = shares[activePeerCount];
+ adjustedRR[u] = adjustedRR[activePeerCount];
+ u--;
+ continue;
}
- } else {
- if ( (earlyRun == 0) &&
- (adjustedRR[u] < entries[u]->max_transmitted_limit/2) &&
- (entries[u]->violations > 0) ) {
- /* allow very low traffic volume to
- balance out (rare) times of high
- volume */
- entries[u]->violations--;
+ }
+ else {
+ if((earlyRun == 0) &&
+ (adjustedRR[u] < entries[u]->max_transmitted_limit / 2) &&
+ (entries[u]->violations > 0)) {
+ /* allow very low traffic volume to
+ balance out (rare) times of high
+ volume */
+ entries[u]->violations--;
}
}
- if (adjustedRR[u] < MIN_BPM_PER_PEER/2)
- adjustedRR[u] = MIN_BPM_PER_PEER/2;
+ if(adjustedRR[u] < MIN_BPM_PER_PEER / 2)
+ adjustedRR[u] = MIN_BPM_PER_PEER / 2;
/* even if we received NO traffic, allow
at least MIN_BPM_PER_PEER */
}
@@ -2054,115 +1942,112 @@
didAssign = YES;
/* in the first round we cap by 2* previous utilization */
firstRound = YES;
- for (u=0;u<activePeerCount;u++)
+ for(u = 0; u < activePeerCount; u++)
entries[u]->idealized_limit = 0;
- while ( (schedulableBandwidth > CONNECTION_MAX_HOSTS_ * 100) &&
- (activePeerCount > 0) &&
- (didAssign == YES) ) {
+ while((schedulableBandwidth > CONNECTION_MAX_HOSTS_ * 100) &&
+ (activePeerCount > 0) && (didAssign == YES)) {
didAssign = NO;
decrementSB = 0;
- for (u=0;u<activePeerCount;u++) {
+ for(u = 0; u < activePeerCount; u++) {
/* always allow allocating MIN_BPM_PER_PEER */
- if ( (firstRound == NO) ||
- (entries[u]->idealized_limit < adjustedRR[u] * 2) ) {
- unsigned int share;
+ if((firstRound == NO) ||
+ (entries[u]->idealized_limit < adjustedRR[u] * 2)) {
+ unsigned int share;
- share = entries[u]->idealized_limit + (unsigned int) (shares[u] * schedulableBandwidth);
- if (share < entries[u]->idealized_limit)
- share = 0xFFFFFFFF; /* int overflow */
- if ( (share > adjustedRR[u] * 2) &&
- (firstRound == YES) )
- share = adjustedRR[u] * 2;
- if (share > entries[u]->idealized_limit) {
- decrementSB += share - entries[u]->idealized_limit;
- didAssign = YES;
- }
- if ( (share < MIN_BPM_PER_PEER) &&
- (minCon > 0) ) {
- /* use one of the minCon's to keep the connection! */
- decrementSB -= share;
- share = MIN_BPM_PER_PEER;
- minCon--;
- }
- entries[u]->idealized_limit = share;
+ share =
+ entries[u]->idealized_limit +
+ (unsigned int) (shares[u] * schedulableBandwidth);
+ if(share < entries[u]->idealized_limit)
+ share = 0xFFFFFFFF; /* int overflow */
+ if((share > adjustedRR[u] * 2) && (firstRound == YES))
+ share = adjustedRR[u] * 2;
+ if(share > entries[u]->idealized_limit) {
+ decrementSB += share - entries[u]->idealized_limit;
+ didAssign = YES;
+ }
+ if((share < MIN_BPM_PER_PEER) && (minCon > 0)) {
+ /* use one of the minCon's to keep the connection! */
+ decrementSB -= share;
+ share = MIN_BPM_PER_PEER;
+ minCon--;
+ }
+ entries[u]->idealized_limit = share;
}
}
- if (decrementSB > schedulableBandwidth) {
+ if(decrementSB > schedulableBandwidth) {
schedulableBandwidth -= decrementSB;
- } else {
+ }
+ else {
schedulableBandwidth = 0;
break;
}
- if ( (activePeerCount > 0) &&
- (didAssign == NO) ) {
- int * perm = permute(WEAK, activePeerCount);
+ if((activePeerCount > 0) && (didAssign == NO)) {
+ int *perm = permute(WEAK, activePeerCount);
/* assign also to random "worthless" (zero-share) peers */
- for (u=0;u<activePeerCount;u++) {
- unsigned int v = perm[u]; /* use perm to avoid preference to low-numbered slots */
- if ( (firstRound == NO) ||
- (entries[v]->idealized_limit < adjustedRR[u] * 2) ) {
- unsigned int share;
+ for(u = 0; u < activePeerCount; u++) {
+ unsigned int v = perm[u]; /* use perm to avoid preference to low-numbered slots */
+ if((firstRound == NO) ||
+ (entries[v]->idealized_limit < adjustedRR[u] * 2)) {
+ unsigned int share;
- share = entries[v]->idealized_limit + (unsigned int) (schedulableBandwidth);
- if (share < entries[u]->idealized_limit)
- share = 0xFFFFFFFF; /* int overflow */
- if ( (firstRound == YES) &&
- (share > adjustedRR[u] * 2) )
- share = adjustedRR[u] * 2;
- schedulableBandwidth -= share - entries[v]->idealized_limit;
- entries[v]->idealized_limit = share;
- }
+ share =
+ entries[v]->idealized_limit +
+ (unsigned int) (schedulableBandwidth);
+ if(share < entries[u]->idealized_limit)
+ share = 0xFFFFFFFF; /* int overflow */
+ if((firstRound == YES) && (share > adjustedRR[u] * 2))
+ share = adjustedRR[u] * 2;
+ schedulableBandwidth -= share - entries[v]->idealized_limit;
+ entries[v]->idealized_limit = share;
+ }
}
FREE(perm);
perm = NULL;
- if ( (schedulableBandwidth > 0) &&
- (activePeerCount > 0) ) {
- /* assign rest disregarding traffic limits */
- perm = permute(WEAK, activePeerCount);
- for (u=0;u<activePeerCount;u++) {
- unsigned int share;
+ if((schedulableBandwidth > 0) && (activePeerCount > 0)) {
+ /* assign rest disregarding traffic limits */
+ perm = permute(WEAK, activePeerCount);
+ for(u = 0; u < activePeerCount; u++) {
+ unsigned int share;
- share = entries[perm[u]]->idealized_limit + (unsigned int) (schedulableBandwidth/activePeerCount);
- if (share > entries[perm[u]]->idealized_limit) /* no int-overflow? */
- entries[perm[u]]->idealized_limit = share;
- }
- schedulableBandwidth = 0;
- FREE(perm);
- perm = NULL;
+ share =
+ entries[perm[u]]->idealized_limit +
+ (unsigned int) (schedulableBandwidth / activePeerCount);
+ if(share > entries[perm[u]]->idealized_limit) /* no int-overflow? */
+ entries[perm[u]]->idealized_limit = share;
+ }
+ schedulableBandwidth = 0;
+ FREE(perm);
+ perm = NULL;
}
- } /* didAssign == NO? */
- if (firstRound == YES) {
+ } /* didAssign == NO? */
+ if(firstRound == YES) {
/* keep some bandwidth off the market
- for new connections */
+ for new connections */
schedulableBandwidth /= 2;
}
firstRound = NO;
- } /* while bandwidth to distribute */
+ } /* while bandwidth to distribute */
/* randomly add the remaining MIN_BPM_PER_PEER to minCon peers; yes, this
will
yield some fluctuation, but some amount of fluctuation should be
good since it creates opportunities. */
- if (activePeerCount > 0)
- for (u=0;u<minCon;u++)
+ if(activePeerCount > 0)
+ for(u = 0; u < minCon; u++)
entries[weak_randomi(activePeerCount)]->idealized_limit
- += MIN_BPM_PER_PEER;
+ += MIN_BPM_PER_PEER;
/* prepare for next round */
lastRoundStart = now;
- for (u=0;u<activePeerCount;u++) {
+ for(u = 0; u < activePeerCount; u++) {
#if DEBUG_CONNECTION
EncName enc;
- IFLOG(LOG_DEBUG,
- hash2enc(&entries[u]->session.sender.hashPubKey,
- &enc));
+ IFLOG(LOG_DEBUG, hash2enc(&entries[u]->session.sender.hashPubKey, &enc));
LOG(LOG_DEBUG,
- "inbound limit for peer %u: %s set to %u bpm\n",
- u,
- &enc,
- entries[u]->idealized_limit);
+ "inbound limit for peer %u: %s set to %u bpm\n",
+ u, &enc, entries[u]->idealized_limit);
#endif
entries[u]->current_connection_value /= 2.0;
entries[u]->recently_received /= 2;
@@ -2172,21 +2057,18 @@
FREE(adjustedRR);
FREE(shares);
FREE(entries);
- for (u=0;u<CONNECTION_MAX_HOSTS_;u++) {
- BufferEntry * be = CONNECTION_buffer_[u];
- if (be == NULL)
+ for(u = 0; u < CONNECTION_MAX_HOSTS_; u++) {
+ BufferEntry *be = CONNECTION_buffer_[u];
+ if(be == NULL)
continue;
- if (be->idealized_limit < MIN_BPM_PER_PEER) {
+ if(be->idealized_limit < MIN_BPM_PER_PEER) {
#if DEBUG_CONNECTION || 1
EncName enc;
-
- IFLOG(LOG_DEBUG,
- hash2enc(&be->session.sender.hashPubKey,
- &enc));
+
+ IFLOG(LOG_DEBUG, hash2enc(&be->session.sender.hashPubKey, &enc));
LOG(LOG_DEBUG,
- "Number of connections too high, shutting down low-traffic connection
to %s (had only %u bpm)\n",
- &enc,
- be->idealized_limit);
+ "Number of connections too high, shutting down low-traffic
connection to %s (had only %u bpm)\n",
+ &enc, be->idealized_limit);
#endif
shutdownConnection(be);
}
@@ -2203,97 +2085,86 @@
*
* @param unused not used, just to make signature type nicely
*/
-static void cronDecreaseLiveness(void * unused) {
- BufferEntry * root;
- BufferEntry * prev;
- BufferEntry * tmp;
+static void cronDecreaseLiveness(void *unused) {
+ BufferEntry *root;
+ BufferEntry *prev;
+ BufferEntry *tmp;
cron_t now;
int i;
scheduleInboundTraffic();
cronTime(&now);
MUTEX_LOCK(&lock);
- for (i=0;i<CONNECTION_MAX_HOSTS_;i++) {
+ for(i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
root = CONNECTION_buffer_[i];
prev = NULL;
- while (NULL != root) {
+ while(NULL != root) {
switch (root->status) {
case STAT_DOWN:
- /* just compact linked list */
- if (prev == NULL)
- CONNECTION_buffer_[i] = root->overflowChain;
- else
- prev->overflowChain = root->overflowChain;
- tmp = root;
- root = root->overflowChain;
- FREE(tmp);
- continue; /* no need to call 'send buffer' */
+ /* just compact linked list */
+ if(prev == NULL)
+ CONNECTION_buffer_[i] = root->overflowChain;
+ else
+ prev->overflowChain = root->overflowChain;
+ tmp = root;
+ root = root->overflowChain;
+ FREE(tmp);
+ continue; /* no need to call 'send buffer' */
case STAT_UP:
- if ( (now > root->isAlive) && /* concurrency might make this false... */
- (now - root->isAlive > SECONDS_INACTIVE_DROP * cronSECONDS) ) {
- EncName enc;
-
- /* switch state form UP to DOWN: too much inactivity */
- IFLOG(LOG_DEBUG,
- hash2enc(&root->session.sender.hashPubKey,
- &enc));
- LOG(LOG_DEBUG,
- "closing connection with `%s': "
- "too much inactivity (%llu ms)\n",
- &enc,
- now - root->isAlive);
- shutdownConnection(root);
- /* the host may still be worth trying again soon: */
- identity->whitelistHost(&root->session.sender);
- }
- if ( (root->available_send_window >= 60000) &&
- (root->sendBufferSize < 4) &&
- (scl_nextHead != NULL) &&
- (getNetworkLoadUp() < 25) &&
- (getCPULoad() < 50) ) {
- /* create some traffic by force! */
- char * msgBuf;
- unsigned int mSize;
- SendCallbackList * pos;
+ if((now > root->isAlive) && /* concurrency might make this false... */
+ (now - root->isAlive > SECONDS_INACTIVE_DROP * cronSECONDS)) {
+ EncName enc;
- msgBuf = MALLOC(60000);
- pos = scl_nextHead;
- while (pos != NULL) {
- if (pos->minimumPadding <= 60000) {
- mSize = pos->callback(&root->session.sender,
- msgBuf,
- 60000);
- if (mSize > 0)
- unicast(&root->session.sender,
- (P2P_MESSAGE_HEADER*) msgBuf,
- 0,
- 5 * cronMINUTES);
- }
- pos = pos->next;
- }
- FREE(msgBuf);
- }
+ /* switch state form UP to DOWN: too much inactivity */
+ IFLOG(LOG_DEBUG, hash2enc(&root->session.sender.hashPubKey, &enc));
+ LOG(LOG_DEBUG,
+ "closing connection with `%s': "
+ "too much inactivity (%llu ms)\n", &enc, now - root->isAlive);
+ shutdownConnection(root);
+ /* the host may still be worth trying again soon: */
+ identity->whitelistHost(&root->session.sender);
+ }
+ if((root->available_send_window >= 60000) &&
+ (root->sendBufferSize < 4) &&
+ (scl_nextHead != NULL) &&
+ (getNetworkLoadUp() < 25) && (getCPULoad() < 50)) {
+ /* create some traffic by force! */
+ char *msgBuf;
+ unsigned int mSize;
+ SendCallbackList *pos;
- break;
- default: /* not up, not down - partial SETKEY exchange */
- if ( (now > root->isAlive) &&
- (now - root->isAlive > SECONDS_NOPINGPONG_DROP * cronSECONDS) ) {
- EncName enc;
- IFLOG(LOG_DEBUG,
- hash2enc(&root->session.sender.hashPubKey, &enc));
- LOG(LOG_DEBUG,
- "closing connection to %s: %s not answered.\n",
- &enc,
- (root->status == STAT_SETKEY_SENT) ? "SETKEY" : "PING");
- shutdownConnection(root);
- }
- break;
- } /* end of switch */
+ msgBuf = MALLOC(60000);
+ pos = scl_nextHead;
+ while(pos != NULL) {
+ if(pos->minimumPadding <= 60000) {
+ mSize = pos->callback(&root->session.sender, msgBuf, 60000);
+ if(mSize > 0)
+ unicast(&root->session.sender,
+ (P2P_MESSAGE_HEADER *) msgBuf, 0, 5 * cronMINUTES);
+ }
+ pos = pos->next;
+ }
+ FREE(msgBuf);
+ }
+
+ break;
+ default: /* not up, not down - partial SETKEY exchange */
+ if((now > root->isAlive) &&
+ (now - root->isAlive > SECONDS_NOPINGPONG_DROP * cronSECONDS)) {
+ EncName enc;
+ IFLOG(LOG_DEBUG, hash2enc(&root->session.sender.hashPubKey, &enc));
+ LOG(LOG_DEBUG,
+ "closing connection to %s: %s not answered.\n",
+ &enc, (root->status == STAT_SETKEY_SENT) ? "SETKEY" : "PING");
+ shutdownConnection(root);
+ }
+ break;
+ } /* end of switch */
sendBuffer(root);
prev = root;
root = root->overflowChain;
- } /* end of while */
- } /* for all buckets */
+ } /* end of while */
+ } /* for all buckets */
MUTEX_UNLOCK(&lock);
}
@@ -2311,133 +2182,111 @@
* SYSERR if it was malformed
*/
int checkHeader(const PeerIdentity * sender,
- P2P_PACKET_HEADER * msg,
- unsigned short size) {
- BufferEntry * be;
+ P2P_PACKET_HEADER * msg, unsigned short size) {
+ BufferEntry *be;
int res;
unsigned int sequenceNumber;
TIME_T stamp;
- char * tmp;
+ char *tmp;
HashCode512 hc;
EncName enc;
ENTRY();
GNUNET_ASSERT(msg != NULL);
GNUNET_ASSERT(sender != NULL);
- hash2enc(&sender->hashPubKey,
- &enc);
- if (size < sizeof(P2P_PACKET_HEADER)) {
+ hash2enc(&sender->hashPubKey, &enc);
+ if(size < sizeof(P2P_PACKET_HEADER)) {
LOG(LOG_WARNING,
- _("Message from `%s' discarded: invalid format.\n"),
- &enc);
+ _("Message from `%s' discarded: invalid format.\n"), &enc);
return SYSERR;
}
- hash2enc(&sender->hashPubKey,
- &enc);
- hash(&msg->sequenceNumber,
- size - sizeof(HashCode512),
- &hc);
- if ( equalsHashCode512(&hc,
- &msg->hash) &&
- (msg->sequenceNumber == 0) &&
- (msg->bandwidth == 0) &&
- (msg->timeStamp == 0) )
- return NO; /* plaintext */
+ hash2enc(&sender->hashPubKey, &enc);
+ hash(&msg->sequenceNumber, size - sizeof(HashCode512), &hc);
+ if(equalsHashCode512(&hc,
+ &msg->hash) &&
+ (msg->sequenceNumber == 0) &&
+ (msg->bandwidth == 0) && (msg->timeStamp == 0))
+ return NO; /* plaintext */
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "Decrypting message from host `%s'\n",
- &enc);
+ LOG(LOG_DEBUG, "Decrypting message from host `%s'\n", &enc);
#endif
MUTEX_LOCK(&lock);
be = lookForHost(sender);
- if ( (be == NULL) ||
- (be->status == STAT_DOWN) ||
- (be->status == STAT_SETKEY_SENT) ) {
+ if((be == NULL) ||
+ (be->status == STAT_DOWN) || (be->status == STAT_SETKEY_SENT)) {
LOG(LOG_INFO,
- "Decrypting message from host `%s' failed, no sessionkey (yet)!\n",
- &enc);
+ "Decrypting message from host `%s' failed, no sessionkey (yet)!\n",
+ &enc);
/* try to establish a connection, that way, we don't keep
getting bogus messages until the other one times out. */
- if ( (be == NULL) || (be->status == STAT_DOWN) )
+ if((be == NULL) || (be->status == STAT_DOWN))
addHost(sender, YES);
MUTEX_UNLOCK(&lock);
- return SYSERR; /* could not decrypt */
+ return SYSERR; /* could not decrypt */
}
tmp = MALLOC(size - sizeof(HashCode512));
- res = decryptBlock(&be->skey_remote,
- &msg->sequenceNumber,
- size - sizeof(HashCode512),
- (const INITVECTOR*) &msg->hash, /* IV */
- tmp);
- hash(tmp,
- size - sizeof(HashCode512),
- &hc);
- if ( ! ( (res != OK) &&
- equalsHashCode512(&hc,
- &msg->hash)) ) {
+ res = decryptBlock(&be->skey_remote, &msg->sequenceNumber, size - sizeof(HashCode512), (const INITVECTOR *) &msg->hash, /* IV */
+ tmp);
+ hash(tmp, size - sizeof(HashCode512), &hc);
+ if(!((res != OK) && equalsHashCode512(&hc, &msg->hash))) {
LOG(LOG_INFO,
- "Decrypting message from host `%s' failed, wrong sessionkey!\n",
- &enc);
+ "Decrypting message from host `%s' failed, wrong sessionkey!\n",
+ &enc);
#if DEBUG_CONNECTION
printMsg("Wrong sessionkey", sender,
- &be->skey_remote, (const INITVECTOR *) &msg->hash,
- crc32N(&msg->sequenceNumber, size - sizeof(HashCode512)));
+ &be->skey_remote, (const INITVECTOR *) &msg->hash,
+ crc32N(&msg->sequenceNumber, size - sizeof(HashCode512)));
#endif
addHost(sender, YES);
MUTEX_UNLOCK(&lock);
FREE(tmp);
return SYSERR;
}
- if (stats != NULL)
- stats->change(stat_decrypted,
- size - sizeof(HashCode512));
- memcpy(&msg->sequenceNumber,
- tmp,
- size - sizeof(HashCode512));
+ if(stats != NULL)
+ stats->change(stat_decrypted, size - sizeof(HashCode512));
+ memcpy(&msg->sequenceNumber, tmp, size - sizeof(HashCode512));
FREE(tmp);
res = YES;
sequenceNumber = ntohl(msg->sequenceNumber);
- if (be->lastSequenceNumberReceived >= sequenceNumber) {
+ if(be->lastSequenceNumberReceived >= sequenceNumber) {
res = SYSERR;
- if ( (be->lastSequenceNumberReceived - sequenceNumber <= 32) &&
- (be->lastSequenceNumberReceived != sequenceNumber) ) {
- unsigned int rotbit = 1 << (be->lastSequenceNumberReceived -
sequenceNumber - 1);
- if ( (be->lastPacketsBitmap & rotbit) == 0) {
- be->lastPacketsBitmap |= rotbit;
- res = OK;
+ if((be->lastSequenceNumberReceived - sequenceNumber <= 32) &&
+ (be->lastSequenceNumberReceived != sequenceNumber)) {
+ unsigned int rotbit =
+ 1 << (be->lastSequenceNumberReceived - sequenceNumber - 1);
+ if((be->lastPacketsBitmap & rotbit) == 0) {
+ be->lastPacketsBitmap |= rotbit;
+ res = OK;
}
}
- if (res == SYSERR) {
+ if(res == SYSERR) {
LOG(LOG_WARNING,
- _("Invalid sequence number"
- " %u <= %u, dropping message.\n"),
- sequenceNumber,
- be->lastSequenceNumberReceived);
+ _("Invalid sequence number"
+ " %u <= %u, dropping message.\n"),
+ sequenceNumber, be->lastSequenceNumberReceived);
MUTEX_UNLOCK(&lock);
return SYSERR;
}
- } else {
+ }
+ else {
be->lastPacketsBitmap =
be->lastPacketsBitmap
<< (sequenceNumber - be->lastSequenceNumberReceived);
be->lastSequenceNumberReceived = sequenceNumber;
}
stamp = ntohl(msg->timeStamp);
- if (stamp + 1 * cronDAYS < TIME(NULL)) {
- LOG(LOG_INFO,
- _("Message received more than one day old. Dropped.\n"));
+ if(stamp + 1 * cronDAYS < TIME(NULL)) {
+ LOG(LOG_INFO, _("Message received more than one day old. Dropped.\n"));
MUTEX_UNLOCK(&lock);
return SYSERR;
}
be->max_bpm = ntohl(msg->bandwidth);
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "Received bandwidth cap of %u bpm\n",
- be->max_bpm);
+ LOG(LOG_DEBUG, "Received bandwidth cap of %u bpm\n", be->max_bpm);
#endif
- if (be->available_send_window >= be->max_bpm) {
+ if(be->available_send_window >= be->max_bpm) {
be->available_send_window = be->max_bpm;
cronTime(&be->last_bps_update);
}
@@ -2455,25 +2304,20 @@
* @return OK on success, SYSERR on error
*/
static int handleHANGUP(const PeerIdentity * sender,
- const P2P_MESSAGE_HEADER * msg) {
- BufferEntry * be;
+ const P2P_MESSAGE_HEADER * msg) {
+ BufferEntry *be;
EncName enc;
ENTRY();
- if (ntohs(msg->size) != sizeof(P2P_hangup_MESSAGE))
+ if(ntohs(msg->size) != sizeof(P2P_hangup_MESSAGE))
return SYSERR;
- if (!hostIdentityEquals(sender,
- &((P2P_hangup_MESSAGE*)msg)->sender))
+ if(!hostIdentityEquals(sender, &((P2P_hangup_MESSAGE *) msg)->sender))
return SYSERR;
- IFLOG(LOG_INFO,
- hash2enc(&sender->hashPubKey,
- &enc));
- LOG(LOG_INFO,
- "received HANGUP from `%s'\n",
- &enc);
+ IFLOG(LOG_INFO, hash2enc(&sender->hashPubKey, &enc));
+ LOG(LOG_INFO, "received HANGUP from `%s'\n", &enc);
MUTEX_LOCK(&lock);
be = lookForHost(sender);
- if (be == NULL) {
+ if(be == NULL) {
MUTEX_UNLOCK(&lock);
return SYSERR;
}
@@ -2494,32 +2338,29 @@
* YES if it is the key for sending
*/
void assignSessionKey(const SESSIONKEY * key,
- const PeerIdentity * peer,
- TIME_T age,
- int forSending) {
- BufferEntry * be;
+ const PeerIdentity * peer, TIME_T age, int forSending) {
+ BufferEntry *be;
MUTEX_LOCK(&lock);
be = lookForHost(peer);
- if (be == NULL)
+ if(be == NULL)
be = addHost(peer, NO);
- if (be != NULL) {
+ if(be != NULL) {
cronTime(&be->isAlive);
- if (forSending == YES) {
+ if(forSending == YES) {
be->skey_local = *key;
be->skey_local_created = age;
be->status = STAT_SETKEY_SENT | (be->status & STAT_SETKEY_RECEIVED);
- } else { /* for receiving */
- if ( ((be->status & STAT_SETKEY_RECEIVED) == 0) ||
- (be->skey_remote_created < age) ) {
- if (0 != memcmp(key,
- &be->skey_remote,
- sizeof(SESSIONKEY))) {
- be->skey_remote = *key;
- be->lastSequenceNumberReceived = 0;
- }
- be->skey_remote_created = age;
- be->status |= STAT_SETKEY_RECEIVED;
+ }
+ else { /* for receiving */
+ if(((be->status & STAT_SETKEY_RECEIVED) == 0) ||
+ (be->skey_remote_created < age)) {
+ if(0 != memcmp(key, &be->skey_remote, sizeof(SESSIONKEY))) {
+ be->skey_remote = *key;
+ be->lastSequenceNumberReceived = 0;
+ }
+ be->skey_remote_created = age;
+ be->status |= STAT_SETKEY_RECEIVED;
}
}
}
@@ -2532,17 +2373,16 @@
* @param peer the other peer,
*/
void confirmSessionUp(const PeerIdentity * peer) {
- BufferEntry * be;
+ BufferEntry *be;
MUTEX_LOCK(&lock);
be = lookForHost(peer);
- if (be != NULL) {
+ if(be != NULL) {
cronTime(&be->isAlive);
identity->whitelistHost(peer);
- if ( ( (be->status & STAT_SETKEY_SENT) > 0) &&
- ( (be->status & STAT_SETKEY_RECEIVED) > 0) &&
- (OK == ensureTransportConnected(be)) &&
- (be->status != STAT_UP) ) {
+ if(((be->status & STAT_SETKEY_SENT) > 0) &&
+ ((be->status & STAT_SETKEY_RECEIVED) > 0) &&
+ (OK == ensureTransportConnected(be)) && (be->status != STAT_UP)) {
be->status = STAT_UP;
be->lastSequenceNumberReceived = 0;
be->lastSequenceNumberSend = 1;
@@ -2566,15 +2406,15 @@
* the slot
*/
int isSlotUsed(int slot) {
- BufferEntry * be;
+ BufferEntry *be;
int ret;
ret = 0;
MUTEX_LOCK(&lock);
- if ( (slot >= 0) && (slot < CONNECTION_MAX_HOSTS_) ) {
+ if((slot >= 0) && (slot < CONNECTION_MAX_HOSTS_)) {
be = CONNECTION_buffer_[slot];
- while (be != NULL) {
- if (be->status == STAT_UP)
- ret++;
+ while(be != NULL) {
+ if(be->status == STAT_UP)
+ ret++;
be = be->overflowChain;
}
}
@@ -2588,19 +2428,18 @@
* @param time updated with the time
* @return SYSERR if we are not connected to the peer at the moment
*/
-int getLastActivityOf(const PeerIdentity * peer,
- cron_t * time) {
+int getLastActivityOf(const PeerIdentity * peer, cron_t * time) {
int ret;
- BufferEntry * be;
+ BufferEntry *be;
ret = 0;
MUTEX_LOCK(&lock);
be = lookForHost(peer);
- if ( (be != NULL) &&
- (be->status == STAT_UP) ) {
+ if((be != NULL) && (be->status == STAT_UP)) {
*time = be->isAlive;
ret = OK;
- } else {
+ }
+ else {
*time = 0;
ret = SYSERR;
}
@@ -2620,26 +2459,25 @@
* OK if the sessionkey was set.
*/
int getCurrentSessionKey(const PeerIdentity * peer,
- SESSIONKEY * key,
- TIME_T * age,
- int forSending) {
+ SESSIONKEY * key, TIME_T * age, int forSending) {
int ret;
- BufferEntry * be;
+ BufferEntry *be;
ret = SYSERR;
MUTEX_LOCK(&lock);
be = lookForHost(peer);
- if (be != NULL) {
- if (forSending == YES) {
- if ((be->status & STAT_SETKEY_SENT) > 0) {
- *key = be->skey_local;
- *age = be->skey_local_created;
- ret = OK;
+ if(be != NULL) {
+ if(forSending == YES) {
+ if((be->status & STAT_SETKEY_SENT) > 0) {
+ *key = be->skey_local;
+ *age = be->skey_local_created;
+ ret = OK;
}
- } else { /* for receiving */
- if ((be->status & STAT_SETKEY_RECEIVED) > 0) {
- *key = be->skey_remote;
- *age = be->skey_remote_created;
- ret = OK;
+ }
+ else { /* for receiving */
+ if((be->status & STAT_SETKEY_RECEIVED) > 0) {
+ *key = be->skey_remote;
+ *age = be->skey_remote_created;
+ ret = OK;
}
}
}
@@ -2662,40 +2500,39 @@
* @param tsession the transport session that is for grabs
* @param sender the identity of the other node
*/
-void considerTakeover(const PeerIdentity * sender,
- TSession * tsession) {
- BufferEntry * be;
+void considerTakeover(const PeerIdentity * sender, TSession * tsession) {
+ BufferEntry *be;
ENTRY();
- if (tsession == NULL)
+ if(tsession == NULL)
return;
MUTEX_LOCK(&lock);
be = lookForHost(sender);
- if (be != NULL) {
- if (be->status != STAT_DOWN) {
+ if(be != NULL) {
+ if(be->status != STAT_DOWN) {
unsigned int cost = -1;
- if (be->session.tsession != NULL)
- cost = transport->getCost(be->session.tsession->ttype);
+ if(be->session.tsession != NULL)
+ cost = transport->getCost(be->session.tsession->ttype);
/* Question: doesn't this always do takeover in tcp/udp
- case, which have the same costs? Should it? -IW
+ case, which have the same costs? Should it? -IW
- Answer: this will always switch to TCP in the long run (if
- that is possible) since udpAssociate always
- returns SYSERR. This is intended since for long-running
- sessions, TCP is the better choice. UDP is only better for
- sending very few messages (e.g. attempting an initial exchange
- to get to know each other). See also transport paper and the
- data on throughput. - CG
- */
- if (transport->getCost(tsession->ttype) < cost) {
- if (transport->associate(tsession) == OK) {
- if (be->session.tsession != NULL)
- transport->disconnect(be->session.tsession);
- be->session.tsession = tsession;
- be->session.mtu = transport->getMTU(tsession->ttype);
- }
- } /* end if cheaper AND possible */
- } /* end if connected */
+ Answer: this will always switch to TCP in the long run (if
+ that is possible) since udpAssociate always
+ returns SYSERR. This is intended since for long-running
+ sessions, TCP is the better choice. UDP is only better for
+ sending very few messages (e.g. attempting an initial exchange
+ to get to know each other). See also transport paper and the
+ data on throughput. - CG
+ */
+ if(transport->getCost(tsession->ttype) < cost) {
+ if(transport->associate(tsession) == OK) {
+ if(be->session.tsession != NULL)
+ transport->disconnect(be->session.tsession);
+ be->session.tsession = tsession;
+ be->session.mtu = transport->getMTU(tsession->ttype);
+ }
+ } /* end if cheaper AND possible */
+ } /* end if connected */
}
MUTEX_UNLOCK(&lock);
transport->disconnect(tsession);
@@ -2713,71 +2550,65 @@
MUTEX_LOCK(&lock);
/* max_bpm may change... */
- new_max_bpm
- = 60 * getConfigurationInt("LOAD",
- "MAXNETDOWNBPSTOTAL");
- if (new_max_bpm == 0)
- new_max_bpm = 50000 * 60; /* assume 50 kbps */
- if (max_bpm != new_max_bpm) {
+ new_max_bpm = 60 * getConfigurationInt("LOAD", "MAXNETDOWNBPSTOTAL");
+ if(new_max_bpm == 0)
+ new_max_bpm = 50000 * 60; /* assume 50 kbps */
+ if(max_bpm != new_max_bpm) {
unsigned int newMAXHOSTS = 0;
max_bpm = new_max_bpm;
- newMAXHOSTS
- = max_bpm / (MIN_BPM_PER_PEER*2);
+ newMAXHOSTS = max_bpm / (MIN_BPM_PER_PEER * 2);
/* => for 1000 bps, we get 12 (rounded DOWN to 8) connections! */
- if (newMAXHOSTS < 2)
- newMAXHOSTS = 2; /* strict minimum is 2 */
- if (newMAXHOSTS > 256)
- newMAXHOSTS = 256; /* limit, before we run out of sockets! */
+ if(newMAXHOSTS < 2)
+ newMAXHOSTS = 2; /* strict minimum is 2 */
+ if(newMAXHOSTS > 256)
+ newMAXHOSTS = 256; /* limit, before we run out of sockets! */
i = 1;
- while (i <= newMAXHOSTS)
- i*=2;
- newMAXHOSTS = i/2; /* make sure it's a power of 2 */
+ while(i <= newMAXHOSTS)
+ i *= 2;
+ newMAXHOSTS = i / 2; /* make sure it's a power of 2 */
- if (newMAXHOSTS != CONNECTION_MAX_HOSTS_) {
+ if(newMAXHOSTS != CONNECTION_MAX_HOSTS_) {
/* change size of connection buffer!!! */
unsigned int olen;
- BufferEntry ** newBuffer;
+ BufferEntry **newBuffer;
olen = CONNECTION_MAX_HOSTS_;
CONNECTION_MAX_HOSTS_ = newMAXHOSTS;
setConfigurationInt("gnunetd",
- "connection-max-hosts",
- CONNECTION_MAX_HOSTS_);
- newBuffer = (BufferEntry**) MALLOC(sizeof(BufferEntry*)*newMAXHOSTS);
- for (i=0;i<CONNECTION_MAX_HOSTS_;i++)
- newBuffer[i] = NULL;
+ "connection-max-hosts", CONNECTION_MAX_HOSTS_);
+ newBuffer =
+ (BufferEntry **) MALLOC(sizeof(BufferEntry *) * newMAXHOSTS);
+ for(i = 0; i < CONNECTION_MAX_HOSTS_; i++)
+ newBuffer[i] = NULL;
/* rehash! */
- for (i=0;i<olen;i++) {
- BufferEntry * be;
+ for(i = 0; i < olen; i++) {
+ BufferEntry *be;
- be = CONNECTION_buffer_[i];
- while (be != NULL) {
- BufferEntry * next;
- unsigned int j;
+ be = CONNECTION_buffer_[i];
+ while(be != NULL) {
+ BufferEntry *next;
+ unsigned int j;
- next = be->overflowChain;
- j = computeIndex(&be->session.sender);
- be->overflowChain = newBuffer[j];
- newBuffer[j] = be;
- be = next;
- }
+ next = be->overflowChain;
+ j = computeIndex(&be->session.sender);
+ be->overflowChain = newBuffer[j];
+ newBuffer[j] = be;
+ be = next;
+ }
}
FREENONNULL(CONNECTION_buffer_);
CONNECTION_buffer_ = newBuffer;
LOG(LOG_DEBUG,
- "connection goal is %s%d peers (%llu BPS bandwidth downstream)\n",
- (olen == 0) ? "" : "now ",
- CONNECTION_MAX_HOSTS_,
- max_bpm);
+ "connection goal is %s%d peers (%llu BPS bandwidth downstream)\n",
+ (olen == 0) ? "" : "now ", CONNECTION_MAX_HOSTS_, max_bpm);
}
}
disable_random_padding = testConfigurationString("GNUNETD-EXPERIMENTAL",
- "PADDING",
- "NO");
+ "PADDING", "NO");
MUTEX_UNLOCK(&lock);
}
@@ -2785,52 +2616,42 @@
* Initialize this module.
*/
void initConnection() {
- GNUNET_ASSERT(P2P_MESSAGE_OVERHEAD
- == sizeof(P2P_PACKET_HEADER));
+ GNUNET_ASSERT(P2P_MESSAGE_OVERHEAD == sizeof(P2P_PACKET_HEADER));
GNUNET_ASSERT(sizeof(P2P_hangup_MESSAGE) == 68);
ENTRY();
- scl_nextHead
- = NULL;
- scl_nextTail
- = NULL;
+ scl_nextHead = NULL;
+ scl_nextTail = NULL;
MUTEX_CREATE_RECURSIVE(&lock);
registerConfigurationUpdateCallback(&connectionConfigChangeCallback);
CONNECTION_MAX_HOSTS_ = 0;
connectionConfigChangeCallback();
- registerp2pHandler(P2P_PROTO_hangup,
- &handleHANGUP);
- addCronJob(&cronDecreaseLiveness,
- 1 * cronSECONDS,
- 1 * cronSECONDS,
- NULL);
+ registerp2pHandler(P2P_PROTO_hangup, &handleHANGUP);
+ addCronJob(&cronDecreaseLiveness, 1 * cronSECONDS, 1 * cronSECONDS, NULL);
#if DEBUG_COLLECT_PRIO == YES
prioFile = FOPEN("/tmp/knapsack_prio.txt", "w");
#endif
transport = requestService("transport");
GNUNET_ASSERT(transport != NULL);
- identity = requestService("identity");
+ identity = requestService("identity");
GNUNET_ASSERT(identity != NULL);
- session = requestService("session");
+ session = requestService("session");
GNUNET_ASSERT(session != NULL);
fragmentation = requestService("fragmentation");
GNUNET_ASSERT(fragmentation != NULL);
topology = requestService("topology");
GNUNET_ASSERT(topology != NULL);
stats = requestService("stats");
- if (stats != NULL) {
+ if(stats != NULL) {
stat_messagesDropped
= stats->create(gettext_noop("# outgoing messages dropped"));
stat_sizeMessagesDropped
= stats->create(gettext_noop("# bytes of outgoing messages dropped"));
stat_hangupSent
= stats->create(gettext_noop("# connections closed (HANGUP sent)"));
- stat_encrypted
- = stats->create(gettext_noop("# bytes encrypted"));
- stat_decrypted
- = stats->create(gettext_noop("# bytes decrypted"));
- stat_noise_sent
- = stats->create(gettext_noop("# bytes noise sent"));
+ stat_encrypted = stats->create(gettext_noop("# bytes encrypted"));
+ stat_decrypted = stats->create(gettext_noop("# bytes decrypted"));
+ stat_noise_sent = stats->create(gettext_noop("# bytes noise sent"));
}
transport->start(&core_receive);
}
@@ -2841,23 +2662,20 @@
*/
void doneConnection() {
unsigned int i;
- BufferEntry * be;
- SendCallbackList * scl;
+ BufferEntry *be;
+ SendCallbackList *scl;
ENTRY();
transport->stop();
unregisterConfigurationUpdateCallback(&connectionConfigChangeCallback);
- delCronJob(&cronDecreaseLiveness,
- 1 * cronSECONDS,
- NULL);
- for (i=0;i<CONNECTION_MAX_HOSTS_;i++) {
- BufferEntry * prev;
+ delCronJob(&cronDecreaseLiveness, 1 * cronSECONDS, NULL);
+ for(i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
+ BufferEntry *prev;
prev = NULL;
be = CONNECTION_buffer_[i];
- while (be != NULL) {
- LOG(LOG_DEBUG,
- "Closing connection: shutdown\n");
+ while(be != NULL) {
+ LOG(LOG_DEBUG, "Closing connection: shutdown\n");
shutdownConnection(be);
prev = be;
be = be->overflowChain;
@@ -2868,7 +2686,7 @@
FREENONNULL(CONNECTION_buffer_);
CONNECTION_buffer_ = NULL;
CONNECTION_MAX_HOSTS_ = 0;
- while (scl_nextHead != NULL) {
+ while(scl_nextHead != NULL) {
scl = scl_nextHead;
scl_nextHead = scl_nextHead->next;
FREE(scl);
@@ -2900,16 +2718,14 @@
* @param arg second argument to method
* @return number of connected nodes
*/
-int forEachConnectedNode(PerNodeCallback method,
- void * arg) {
+int forEachConnectedNode(PerNodeCallback method, void *arg) {
fENHWrap wrap;
int ret;
wrap.method = method;
wrap.arg = arg;
MUTEX_LOCK(&lock);
- ret = forAllConnectedHosts(&fENHCallback,
- &wrap);
+ ret = forAllConnectedHosts(&fENHCallback, &wrap);
MUTEX_UNLOCK(&lock);
return ret;
}
@@ -2919,7 +2735,7 @@
*/
void printConnectionBuffer() {
unsigned int i;
- BufferEntry * tmp;
+ BufferEntry *tmp;
EncName hostName;
EncName skey_local;
EncName skey_remote;
@@ -2927,37 +2743,33 @@
MUTEX_LOCK(&lock);
ENTRY();
- for (i=0;i<CONNECTION_MAX_HOSTS_;i++) {
+ for(i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
tmp = CONNECTION_buffer_[i];
- while (tmp != NULL) {
- if (tmp->status != STAT_DOWN) {
+ while(tmp != NULL) {
+ if(tmp->status != STAT_DOWN) {
IFLOG(LOG_MESSAGE,
- hash2enc(&tmp->session.sender.hashPubKey,
- &hostName);
- hash2enc((HashCode512*) &tmp->skey_local,
- &skey_local);
- hash2enc((HashCode512*) &tmp->skey_remote,
- &skey_remote));
- hostName.encoding[4] = '\0';
- skey_local.encoding[4] = '\0';
- skey_remote.encoding[4] = '\0';
- ttype = 0;
- if (tmp->session.tsession != NULL)
- ttype = tmp->session.tsession->ttype;
- LOG(LOG_MESSAGE,
- "CONNECTION-TABLE: %3d-%1d-%2d-%4ds"
- " (of %ds) BPM %4llu %8ut-%3u: %s-%s-%s\n",
- i,
- tmp->status,
- ttype,
- (int) ((cronTime(NULL) - tmp->isAlive)/cronSECONDS),
- SECONDS_INACTIVE_DROP,
- tmp->recently_received,
- tmp->idealized_limit,
- tmp->sendBufferSize,
- &hostName,
- &skey_local,
- &skey_remote);
+ hash2enc(&tmp->session.sender.hashPubKey,
+ &hostName);
+ hash2enc((HashCode512 *) & tmp->skey_local,
+ &skey_local);
+ hash2enc((HashCode512 *) & tmp->skey_remote, &skey_remote));
+ hostName.encoding[4] = '\0';
+ skey_local.encoding[4] = '\0';
+ skey_remote.encoding[4] = '\0';
+ ttype = 0;
+ if(tmp->session.tsession != NULL)
+ ttype = tmp->session.tsession->ttype;
+ LOG(LOG_MESSAGE,
+ "CONNECTION-TABLE: %3d-%1d-%2d-%4ds"
+ " (of %ds) BPM %4llu %8ut-%3u: %s-%s-%s\n",
+ i,
+ tmp->status,
+ ttype,
+ (int) ((cronTime(NULL) - tmp->isAlive) / cronSECONDS),
+ SECONDS_INACTIVE_DROP,
+ tmp->recently_received,
+ tmp->idealized_limit,
+ tmp->sendBufferSize, &hostName, &skey_local, &skey_remote);
}
tmp = tmp->overflowChain;
}
@@ -2986,8 +2798,8 @@
* @return OK if the handler was registered, SYSERR on error
*/
int registerSendCallback(const unsigned int minimumPadding,
- BufferFillCallback callback) {
- SendCallbackList * scl;
+ BufferFillCallback callback) {
+ SendCallbackList *scl;
ENTRY();
scl = MALLOC(sizeof(SendCallbackList));
@@ -2995,10 +2807,11 @@
scl->callback = callback;
scl->next = NULL;
MUTEX_LOCK(&lock);
- if (scl_nextTail == NULL) {
+ if(scl_nextTail == NULL) {
scl_nextHead = scl;
scl_nextTail = scl;
- } else {
+ }
+ else {
scl_nextTail->next = scl;
scl_nextTail = scl;
}
@@ -3020,22 +2833,21 @@
* @return OK if the handler was removed, SYSERR on error
*/
int unregisterSendCallback(const unsigned int minimumPadding,
- BufferFillCallback callback) {
- SendCallbackList * pos;
- SendCallbackList * prev;
+ BufferFillCallback callback) {
+ SendCallbackList *pos;
+ SendCallbackList *prev;
prev = NULL;
MUTEX_LOCK(&lock);
pos = scl_nextHead;
- while (pos != NULL) {
- if ( (pos->callback == callback) &&
- (pos->minimumPadding == minimumPadding) ) {
- if (prev == NULL)
- scl_nextHead = pos->next;
+ while(pos != NULL) {
+ if((pos->callback == callback) && (pos->minimumPadding == minimumPadding))
{
+ if(prev == NULL)
+ scl_nextHead = pos->next;
else
- prev->next = pos->next;
- if (scl_nextTail == pos)
- scl_nextTail = prev;
+ prev->next = pos->next;
+ if(scl_nextTail == pos)
+ scl_nextTail = prev;
FREE(pos);
MUTEX_UNLOCK(&lock);
return OK;
@@ -3059,33 +2871,27 @@
* @param msg the message to transmit, should contain P2P_MESSAGE_HEADERs
* @return OK on success, SYSERR on failure, NO on temporary failure
*/
-int sendPlaintext(TSession * tsession,
- const char * msg,
- unsigned int size) {
- char * buf;
+int sendPlaintext(TSession * tsession, const char *msg, unsigned int size) {
+ char *buf;
int ret;
- P2P_PACKET_HEADER * hdr;
+ P2P_PACKET_HEADER *hdr;
GNUNET_ASSERT(tsession != NULL);
- if ( (transport->getMTU(tsession->ttype)>0) &&
- (transport->getMTU(tsession->ttype)<size + sizeof(P2P_PACKET_HEADER)) )
{
+ if((transport->getMTU(tsession->ttype) > 0) &&
+ (transport->getMTU(tsession->ttype) <
+ size + sizeof(P2P_PACKET_HEADER))) {
BREAK();
return SYSERR;
}
buf = MALLOC(size + sizeof(P2P_PACKET_HEADER));
- hdr = (P2P_PACKET_HEADER*) buf;
+ hdr = (P2P_PACKET_HEADER *) buf;
hdr->sequenceNumber = 0;
hdr->timeStamp = 0;
hdr->bandwidth = 0;
- memcpy(&buf[sizeof(P2P_PACKET_HEADER)],
- msg,
- size);
+ memcpy(&buf[sizeof(P2P_PACKET_HEADER)], msg, size);
hash(&hdr->sequenceNumber,
- size + sizeof(P2P_PACKET_HEADER) - sizeof(HashCode512),
- &hdr->hash);
- ret = transport->send(tsession,
- buf,
- size + sizeof(P2P_PACKET_HEADER));
+ size + sizeof(P2P_PACKET_HEADER) - sizeof(HashCode512), &hdr->hash);
+ ret = transport->send(tsession, buf, size + sizeof(P2P_PACKET_HEADER));
FREE(buf);
return ret;
}
@@ -3102,30 +2908,24 @@
* @param maxdelay how long can the message wait?
*/
void unicastCallback(const PeerIdentity * hostId,
- BuildMessageCallback callback,
- void * closure,
- unsigned short len,
- unsigned int importance,
- unsigned int maxdelay) {
- BufferEntry * be;
+ BuildMessageCallback callback,
+ void *closure,
+ unsigned short len,
+ unsigned int importance, unsigned int maxdelay) {
+ BufferEntry *be;
#if DEBUG_CONNECTION
EncName enc;
- IFLOG(LOG_DEBUG,
- hash2enc(&hostId->hashPubKey,
- &enc));
+ IFLOG(LOG_DEBUG, hash2enc(&hostId->hashPubKey, &enc));
LOG(LOG_DEBUG,
"%s: sending message to host %s message of size %d\n",
- __FUNCTION__,
- &enc,
- len);
+ __FUNCTION__, &enc, len);
#endif
ENTRY();
MUTEX_LOCK(&lock);
be = addHost(hostId, YES);
- if ( (be != NULL) &&
- (be->status != STAT_DOWN) ) {
- SendEntry * entry;
+ if((be != NULL) && (be->status != STAT_DOWN)) {
+ SendEntry *entry;
entry = MALLOC(sizeof(SendEntry));
entry->len = len;
@@ -3135,9 +2935,9 @@
entry->callback = callback;
entry->closure = closure;
entry->knapsackSolution = NO;
- appendToBuffer(be,
- entry);
- } else {
+ appendToBuffer(be, entry);
+ }
+ else {
FREENONNULL(closure);
}
MUTEX_UNLOCK(&lock);
@@ -3153,33 +2953,26 @@
* @param maxdelay how long can the message be delayed?
*/
void unicast(const PeerIdentity * receiver,
- const P2P_MESSAGE_HEADER * msg,
- unsigned int importance,
- unsigned int maxdelay) {
- char * closure;
+ const P2P_MESSAGE_HEADER * msg,
+ unsigned int importance, unsigned int maxdelay) {
+ char *closure;
unsigned short len;
- if (msg == NULL) {
+ if(msg == NULL) {
/* little hack for topology,
which cannot do this directly
due to cyclic dependencies! */
- if (getBandwidthAssignedTo(receiver) == 0)
+ if(getBandwidthAssignedTo(receiver) == 0)
session->tryConnect(receiver);
return;
}
len = ntohs(msg->size);
- if (len == 0)
+ if(len == 0)
return;
closure = MALLOC(len);
- memcpy(closure,
- msg,
- len);
+ memcpy(closure, msg, len);
unicastCallback(receiver,
- &copyCallback,
- closure,
- len,
- importance,
- maxdelay);
+ &copyCallback, closure, len, importance, maxdelay);
}
/**
@@ -3189,18 +2982,19 @@
* @return NO if we are not connected, YES if we are
*/
int isConnected(const PeerIdentity * hi) {
- BufferEntry * be;
+ BufferEntry *be;
MUTEX_LOCK(&lock);
be = lookForHost(hi);
MUTEX_UNLOCK(&lock);
- if (be == NULL) {
+ if(be == NULL) {
return NO;
- } else {
+ }
+ else {
return (be->status == STAT_UP);
}
}
-
+
/**
* Compute the hashtable index of a host id.
*
@@ -3208,9 +3002,9 @@
* @return the index for this peer in the connection table
*/
unsigned int computeIndex(const PeerIdentity * hostId) {
- unsigned int res = (((unsigned int)hostId->hashPubKey.bits[0]) &
- ((unsigned int)(CONNECTION_MAX_HOSTS_ - 1)));
- GNUNET_ASSERT(res < CONNECTION_MAX_HOSTS_);
+ unsigned int res = (((unsigned int) hostId->hashPubKey.bits[0]) &
+ ((unsigned int) (CONNECTION_MAX_HOSTS_ - 1)));
+ GNUNET_ASSERT(res < CONNECTION_MAX_HOSTS_);
return res;
}
@@ -3219,23 +3013,23 @@
*
* @return the lock
*/
-Mutex * getConnectionModuleLock() {
+Mutex *getConnectionModuleLock() {
return &lock;
}
unsigned int getBandwidthAssignedTo(const PeerIdentity * node) {
- BufferEntry * be;
+ BufferEntry *be;
unsigned int ret;
ENTRY();
MUTEX_LOCK(&lock);
be = lookForHost(node);
- if ( (be != NULL) &&
- (be->status == STAT_UP) ) {
+ if((be != NULL) && (be->status == STAT_UP)) {
ret = be->idealized_limit;
- if (ret == 0)
+ if(ret == 0)
ret = 1;
- } else {
+ }
+ else {
ret = 0;
}
MUTEX_UNLOCK(&lock);
@@ -3247,14 +3041,13 @@
* @param node the identity of the other peer
* @param preference how much should the traffic preference be increased?
*/
-void updateTrafficPreference(const PeerIdentity * node,
- double preference) {
- BufferEntry * be;
+void updateTrafficPreference(const PeerIdentity * node, double preference) {
+ BufferEntry *be;
ENTRY();
MUTEX_LOCK(&lock);
be = lookForHost(node);
- if (be != NULL)
+ if(be != NULL)
be->current_connection_value += preference;
MUTEX_UNLOCK(&lock);
}
@@ -3265,20 +3058,17 @@
*
* @param peer the peer to disconnect
*/
-void disconnectFromPeer(const PeerIdentity *node) {
- BufferEntry * be;
+void disconnectFromPeer(const PeerIdentity * node) {
+ BufferEntry *be;
EncName enc;
ENTRY();
MUTEX_LOCK(&lock);
be = lookForHost(node);
- if (be != NULL) {
- IFLOG(LOG_DEBUG,
- hash2enc(&node->hashPubKey,
- &enc));
+ if(be != NULL) {
+ IFLOG(LOG_DEBUG, hash2enc(&node->hashPubKey, &enc));
LOG(LOG_DEBUG,
- "Closing connection to `%s' as requested by application.\n",
- &enc);
+ "Closing connection to `%s' as requested by application.\n", &enc);
shutdownConnection(be);
}
MUTEX_UNLOCK(&lock);
@@ -3293,13 +3083,11 @@
* @return OK on success, SYSERR if there is a problem
*/
int registerSendNotify(MessagePartHandler callback) {
- if (callback == NULL)
+ if(callback == NULL)
return SYSERR;
MUTEX_LOCK(&lock);
- GROW(rsns,
- rsnSize,
- rsnSize+1);
- rsns[rsnSize-1] = callback;
+ GROW(rsns, rsnSize, rsnSize + 1);
+ rsns[rsnSize - 1] = callback;
MUTEX_UNLOCK(&lock);
return OK;
}
@@ -3315,12 +3103,10 @@
int unregisterSendNotify(MessagePartHandler callback) {
int i;
MUTEX_LOCK(&lock);
- for (i=0;i<rsnSize;i++) {
- if (rsns[i] == callback) {
- rsns[i] = rsns[rsnSize-1];
- GROW(rsns,
- rsnSize,
- rsnSize-1);
+ for(i = 0; i < rsnSize; i++) {
+ if(rsns[i] == callback) {
+ rsns[i] = rsns[rsnSize - 1];
+ GROW(rsns, rsnSize, rsnSize - 1);
MUTEX_UNLOCK(&lock);
return OK;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r2604 - GNUnet/src/server,
durner <=