[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r17811 - gnunet/src/transport
From: gnunet
Subject: [GNUnet-SVN] r17811 - gnunet/src/transport
Date: Thu, 27 Oct 2011 15:01:11 +0200
Author: wachs
Date: 2011-10-27 15:01:11 +0200 (Thu, 27 Oct 2011)
New Revision: 17811
Modified:
gnunet/src/transport/gnunet-service-transport_neighbours_fsm.c
Log:
more changes:
including connection switching
Modified: gnunet/src/transport/gnunet-service-transport_neighbours_fsm.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport_neighbours_fsm.c	2011-10-27 12:17:14 UTC (rev 17810)
+++ gnunet/src/transport/gnunet-service-transport_neighbours_fsm.c	2011-10-27 13:01:11 UTC (rev 17811)
@@ -365,60 +365,41 @@
return GNUNET_NO;
}
-static int
-change (struct NeighbourMapEntry * n, int state, int line)
+static const char *
+print_state (int state)
{
- char * old = NULL;
- char * new = NULL;
-
- switch (n->state) {
- case S_CONNECTED:
- old = "S_CONNECTED";
- break;
- case S_CONNECT_RECV:
- old = "S_CONNECT_RECV";
- break;
- case S_CONNECT_RECV_ACK_SENT:
- old = "S_CONNECT_RECV_ACK_SENT";
- break;
- case S_CONNECT_SENT:
- old = "S_CONNECT_SENT";
- break;
- case S_DISCONNECT:
- old = "S_DISCONNECT";
- break;
- case S_NOT_CONNECTED:
- old = "S_NOT_CONNECTED";
- break;
- default:
- GNUNET_break (0);
- break;
- }
-
switch (state) {
case S_CONNECTED:
- new = "S_CONNECTED";
+ return "S_CONNECTED";
break;
case S_CONNECT_RECV:
- new = "S_CONNECT_RECV";
+ return "S_CONNECT_RECV";
break;
case S_CONNECT_RECV_ACK_SENT:
- new = "S_CONNECT_RECV_ACK_SENT";
+ return"S_CONNECT_RECV_ACK_SENT";
break;
case S_CONNECT_SENT:
- new = "S_CONNECT_SENT";
+ return "S_CONNECT_SENT";
break;
case S_DISCONNECT:
- new = "S_DISCONNECT";
+ return "S_DISCONNECT";
break;
case S_NOT_CONNECTED:
- new = "S_NOT_CONNECTED";
+ return "S_NOT_CONNECTED";
break;
default:
GNUNET_break (0);
break;
}
+ return NULL;
+}
+static int
+change (struct NeighbourMapEntry * n, int state, int line)
+{
+ char * old = strdup(print_state(n->state));
+ char * new = strdup(print_state(state));
+
/* allowed transitions */
int allowed = GNUNET_NO;
switch (n->state) {
@@ -471,18 +452,21 @@
"Illegal state transition from `%s' to `%s' in line %u \n",
old, new, line);
GNUNET_break (0);
+ GNUNET_free (old);
+ GNUNET_free (new);
return GNUNET_SYSERR;
}
n->state = state;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "State for neighbour `%s' %X changed
from `%s' to `%s' in line %u\n",
GNUNET_i2s (&n->id), n, old, new, line);
+ GNUNET_free (old);
+ GNUNET_free (new);
return GNUNET_OK;
}
static ssize_t
-send_with_plugin (void *cls,
- const struct GNUNET_PeerIdentity * target,
+send_with_plugin ( const struct GNUNET_PeerIdentity * target,
const char *msgbuf,
size_t msgbuf_size,
uint32_t priority,
@@ -584,7 +568,6 @@
struct MessageQueue *mq;
struct GNUNET_TIME_Relative timeout;
ssize_t ret;
- struct GNUNET_TRANSPORT_PluginFunctions *papi;
if (n->is_active != NULL)
{
@@ -609,8 +592,7 @@
if (NULL == mq)
return; /* no more messages */
- papi = GST_plugins_find (n->plugin_name);
- if (papi == NULL)
+ if (GST_plugins_find (n->plugin_name) == NULL)
{
GNUNET_break (0);
return;
@@ -628,13 +610,13 @@
n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "try_transmission_to_peer\n");
- papi = GST_plugins_find (n->plugin_name);
- ret =
- papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
- 0 /* priority -- remove from plugin API? */ ,
- timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
- &transmit_send_continuation, mq);
+
+ ret = send_with_plugin (&n->id,
+ mq->message_buf, mq->message_buf_size, 0,
+ timeout,
+ n->session, n->plugin_name, n->addr, n->addrlen,
+ GNUNET_YES,
+ &transmit_send_continuation, mq);
if (ret == -1)
{
/* failure, but 'send' would not call continuation in this case,
@@ -654,7 +636,6 @@
static void
transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "transmission_task\n");
struct NeighbourMapEntry *n = cls;
GNUNET_assert (NULL != lookup_neighbour(&n->id));
n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
@@ -679,7 +660,19 @@
neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
}
+static void
+send_disconnect_cont (void *cls,
+ const struct GNUNET_PeerIdentity * target,
+ int result)
+{
+ struct NeighbourMapEntry *n = cls;
+ if (result == GNUNET_OK)
+ change_state (n, S_DISCONNECT);
+ else
+ change_state (n, S_NOT_CONNECTED);
+}
+
static int
send_disconnect (struct NeighbourMapEntry *n)
{
@@ -703,10 +696,10 @@
&disconnect_msg.purpose,
&disconnect_msg.signature));
- ret = send_with_plugin(NULL, &n->id,
+ ret = send_with_plugin(&n->id,
(const char *) &disconnect_msg, sizeof (disconnect_msg),
UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name,
n->addr, n->addrlen,
- GNUNET_YES, NULL, n);
+ GNUNET_YES, &send_disconnect_cont, n);
if (ret == GNUNET_SYSERR)
return GNUNET_SYSERR;
@@ -738,13 +731,11 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Sent DISCONNECT_MSG to `%s'\n",
GNUNET_i2s (&n->id));
- change_state (n, S_DISCONNECT);
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Could not send DISCONNECT_MSG to
`%s'\n",
GNUNET_i2s (&n->id));
- change_state (n, S_NOT_CONNECTED);
}
}
@@ -851,7 +842,7 @@
m.size = htons (sizeof (struct GNUNET_MessageHeader));
m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
- send_with_plugin(NULL, &n->id, (const void *) &m,
+ send_with_plugin(&n->id, (const void *) &m,
sizeof (m),
UINT32_MAX /* priority */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
@@ -921,6 +912,8 @@
struct NeighbourMapEntry *n = cls;
GNUNET_assert (n != NULL);
+ GNUNET_assert (!is_connected(n));
+
if (GNUNET_YES == n->in_disconnect)
return; /* neighbour is going away */
if (GNUNET_YES != success)
@@ -934,7 +927,6 @@
n->addrlen),
n->session);
#endif
- change_state(n, S_NOT_CONNECTED);
GNUNET_ATS_address_destroyed (GST_ats,
&n->id,
@@ -949,7 +941,53 @@
change_state(n, S_CONNECT_SENT);
}
+
/**
+ * We tried to switch addresses with a peer already connected. If it failed,
+ * we should tell ATS to not use this address anymore (until it is re-validated).
+ *
+ * @param cls the 'struct NeighbourMapEntry'
+ * @param success GNUNET_OK on success
+ */
+static void
+send_switch_address_continuation (void *cls,
+ const struct GNUNET_PeerIdentity * target,
+ int success)
+
+{
+ struct NeighbourMapEntry *n = cls;
+
+ GNUNET_assert (n != NULL);
+ if (GNUNET_YES == n->in_disconnect)
+ return; /* neighbour is going away */
+
+ GNUNET_assert (n->state == S_CONNECTED);
+ if (GNUNET_YES != success)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s'
address '%s' session %X, asking ATS for new address \n",
+ GNUNET_i2s (&n->id), n->plugin_name,
+ (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s
(n->plugin_name,
+ n->addr,
+ n->addrlen),
+ n->session);
+#endif
+ change_state(n, S_NOT_CONNECTED);
+
+ GNUNET_ATS_address_destroyed (GST_ats,
+ &n->id,
+ n->plugin_name,
+ n->addr,
+ n->addrlen,
+ NULL);
+
+ GNUNET_ATS_suggest_address(GST_ats, &n->id);
+ return;
+ }
+}
+
+/**
* We tried to send a SESSION_CONNECT message to another peer. If this
* succeeded, we change the state. If it failed, we should tell
* ATS to not use this address anymore (until it is re-validated).
@@ -965,11 +1003,6 @@
{
struct NeighbourMapEntry *n = cls;
- //FIMXE comeplete this
- GNUNET_break (0);
-return;
-
-
GNUNET_assert (n != NULL);
if (GNUNET_YES == n->in_disconnect)
return; /* neighbour is going away */
@@ -996,7 +1029,7 @@
GNUNET_ATS_suggest_address(GST_ats, &n->id);
return;
}
- change_state(n, S_CONNECT_SENT);
+ //change_state(n, S_CONNECT_SENT);
}
/**
@@ -1026,14 +1059,6 @@
size_t msg_len;
size_t ret;
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session
%X\n",
- GNUNET_i2s (peer), plugin_name,
- (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
- address,
- address_len),
- session);
-
GNUNET_assert (neighbours != NULL);
n = lookup_neighbour (peer);
if (NULL == n)
@@ -1046,16 +1071,17 @@
return GNUNET_NO;
}
-
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session
%X\n",
- GNUNET_i2s (peer), plugin_name,
+#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "ATS tells us to switch to plugin `%s' address '%s' session %X
for %s peer `%s'\n",
+ plugin_name,
(address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
address,
address_len),
- session);
-#endif
+ session, (is_connected(n) ? "CONNECTED" : "NOT CONNECTED"),
+ GNUNET_i2s (peer));
+
GNUNET_free_non_null (n->addr);
n->addr = GNUNET_malloc (address_len);
memcpy (n->addr, address, address_len);
@@ -1079,7 +1105,7 @@
connect_msg.timestamp =
GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
- ret =send_with_plugin (NULL, peer, (const char *) &connect_msg, msg_len,
0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len,
GNUNET_YES, &send_connect_continuation, n);
+ ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, 0,
GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len,
GNUNET_YES, &send_connect_continuation, n);
if (ret == GNUNET_SYSERR)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1090,7 +1116,9 @@
address_len),
session);
}
+ return GNUNET_NO;
}
+ /* We received a CONNECT message and asked ATS for an address */
else if (n->state == S_CONNECT_RECV)
{
msg_len = sizeof (struct SessionConnectMessage);
@@ -1100,22 +1128,41 @@
connect_msg.reserved = htonl (0);
connect_msg.timestamp = GNUNET_TIME_absolute_hton
(GNUNET_TIME_absolute_get ());
- ret = send_with_plugin(NULL, &n->id, (const void *) &connect_msg, msg_len,
0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len,
GNUNET_YES, &send_connect_ack_continuation, n);
+ ret = send_with_plugin(&n->id, (const void *) &connect_msg, msg_len, 0,
GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len,
GNUNET_YES, &send_connect_ack_continuation, n);
if (ret == GNUNET_SYSERR)
{
change_state (n, S_NOT_CONNECTED);
GNUNET_break (0);
- return GNUNET_NO;
}
+ return GNUNET_NO;
}
- else
+ /* connected peer is switching addresses */
+ else if (n->state == S_CONNECTED)
{
- GNUNET_break (0);
+ msg_len = sizeof (struct SessionConnectMessage);
+ connect_msg.header.size = htons (msg_len);
+ connect_msg.header.type =
+ htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
+ connect_msg.reserved = htonl (0);
+ connect_msg.timestamp =
+ GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
+
+ ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, 0,
GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len,
GNUNET_YES, &send_switch_address_continuation, n);
+ if (ret == GNUNET_SYSERR)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s'
address '%s' session %X\n",
+ GNUNET_i2s (peer), plugin_name,
+ (address_len == 0) ? "<inbound>" : GST_plugins_a2s
(plugin_name,
+ address,
+
address_len),
+ session);
+ }
+ return GNUNET_NO;
}
-
-
-
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Invalid connection state to switch
addresses ");
+ GNUNET_break (0);
return GNUNET_NO;
}
@@ -1222,10 +1269,11 @@
GNUNET_assert (neighbours != NULL);
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Session %X to peer `%s' ended \n",
session, GNUNET_i2s (peer));
-#endif
+
n = lookup_neighbour (peer);
if (NULL == n)
return;
@@ -1246,6 +1294,7 @@
GNUNET_SCHEDULER_add_delayed
(GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
&neighbour_timeout_task, n);
/* try QUICKLY to re-establish a connection, reduce timeout! */
+// change_state (n, S_NOT_CONNECTED);
GNUNET_ATS_suggest_address (GST_ats, peer);
}
@@ -1267,7 +1316,7 @@
{
struct NeighbourMapEntry *n;
struct MessageQueue *mq;
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_send %u\n", __LINE__);
+
GNUNET_assert (neighbours != NULL);
n = lookup_neighbour (target);
@@ -1291,7 +1340,7 @@
cont (cont_cls, GNUNET_SYSERR);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_send %u %X %s\n",
__LINE__ , n->session, GST_plugins_a2s(n->plugin_name, n->addr, n->addrlen));
+
if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0))
{
GNUNET_STATISTICS_update (GST_stats,
@@ -1308,7 +1357,7 @@
cont (cont_cls, GNUNET_SYSERR);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_send %u\n", __LINE__);
+
GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
GNUNET_STATISTICS_update (GST_stats,
gettext_noop
@@ -1324,8 +1373,6 @@
mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_send %u\n", __LINE__);
-
if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
(NULL == n->is_active))
n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
@@ -1642,47 +1689,6 @@
GST_neighbours_force_disconnect (peer);
}
-static void neighbour_connected (struct NeighbourMapEntry *n,
- const struct GNUNET_ATS_Information *ats,
- uint32_t ats_count, int send_connect_ack)
-{
- struct GNUNET_MessageHeader msg;
- size_t msg_len;
- int ret;
-
- if (is_connected(n))
- return;
-
- change_state (n, S_CONNECTED);
- n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
-
&neighbour_keepalive_task,
- n);
-
- if (send_connect_ack)
- {
- /* send CONNECT_ACK (SYN_ACK)*/
- msg_len = sizeof (msg);
- msg.size = htons (msg_len);
- msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
-
- ret = send_with_plugin (NULL, &n->id, (const char *) &msg, msg_len, 0,
- GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name, n->addr,
n->addrlen, GNUNET_YES, NULL, NULL);
- if (ret == GNUNET_SYSERR)
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s'
address '%s' session %X\n",
- GNUNET_i2s (&n->id), n->plugin_name,
- (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s
(n->plugin_name,
- n->addr,
- n->addrlen),
- n->session);
- }
- neighbours_connected++;
- GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
- GNUNET_NO);
- connect_notify_cb (callback_cls, &n->id, ats, ats_count);
-}
-
-
/**
* We received a 'SESSION_CONNECT_ACK' message from the other peer.
* Consider switching to it.
@@ -1707,8 +1713,12 @@
uint32_t ats_count)
{
const struct SessionConnectMessage *scm;
+ struct GNUNET_MessageHeader msg;
struct GNUNET_TIME_Absolute ts;
struct NeighbourMapEntry *n;
+ size_t msg_len;
+ size_t ret;
+
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"GST_neighbours_handle_connect_ack SYN/ACK\n");
if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
@@ -1743,7 +1753,33 @@
plugin_name, sender_address, sender_address_len,
session, ats, ats_count);
- neighbour_connected (n, ats, ats_count, GNUNET_YES);
+ change_state (n, S_CONNECTED);
+ n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
+
&neighbour_keepalive_task,
+ n);
+ /* send ACK (ACK)*/
+ msg_len = sizeof (msg);
+ msg.size = htons (msg_len);
+ msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
+
+ ret = send_with_plugin (&n->id, (const char *) &msg, msg_len, 0,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ n->session, n->plugin_name, n->addr, n->addrlen,
+ GNUNET_YES, NULL, NULL);
+
+ if (ret == GNUNET_SYSERR)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to send SESSION_ACK to `%4s' using plugin `%s' address
'%s' session %X\n",
+ GNUNET_i2s (&n->id), n->plugin_name,
+ (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s
(n->plugin_name,
+ n->addr,
+ n->addrlen),
+ n->session);
+
+ neighbours_connected++;
+ GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
+ GNUNET_NO);
+ connect_notify_cb (callback_cls, &n->id, ats, ats_count);
}
void
@@ -1773,7 +1809,8 @@
if (n->state != S_CONNECT_RECV)
{
- send_disconnect(n);
+ send_disconnect (n);
+ change_state (n, S_DISCONNECT);
GNUNET_break (0);
return;
}
@@ -1793,7 +1830,15 @@
plugin_name, sender_address, sender_address_len,
session, ats, ats_count);
- neighbour_connected (n, ats, ats_count, GNUNET_NO);
+ change_state (n, S_CONNECTED);
+ n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
+
&neighbour_keepalive_task,
+ n);
+
+ neighbours_connected++;
+ GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
+ GNUNET_NO);
+ connect_notify_cb (callback_cls, &n->id, ats, ats_count);
}
struct BlackListCheckContext
@@ -1855,7 +1900,7 @@
GNUNET_free (bcc);
- if (n->state > S_NOT_CONNECTED)
+ if (n->state != S_NOT_CONNECTED)
return;
change_state (n, S_CONNECT_RECV);
@@ -1887,6 +1932,7 @@
uint32_t ats_count)
{
const struct SessionConnectMessage *scm;
+ struct NeighbourMapEntry * n;
struct BlackListCheckContext * bcc = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"GST_neighbours_handle_connect SYN\n");
@@ -1900,7 +1946,18 @@
scm = (const struct SessionConnectMessage *) message;
GNUNET_break_op (ntohl (scm->reserved) == 0);
+ n = lookup_neighbour(peer);
+ if (n != NULL)
+ {
+ /* connected peer switches addresses */
+ if (is_connected(n))
+ {
+ GNUNET_ATS_address_update(GST_ats, peer, plugin_name, sender_address,
sender_address_len, session, ats, ats_count);
+ return;
+ }
+ }
+ /* we are not connected to this peer */
/* do blacklist check*/
bcc = GNUNET_malloc (sizeof (struct BlackListCheckContext) +
sizeof (struct GNUNET_ATS_Information) * ats_count +
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r17811 - gnunet/src/transport,
gnunet <=