[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33778 - gnunet/src/transport
From: |
gnunet |
Subject: |
[GNUnet-SVN] r33778 - gnunet/src/transport |
Date: |
Mon, 23 Jun 2014 13:26:59 +0200 |
Author: grothoff
Date: 2014-06-23 13:26:59 +0200 (Mon, 23 Jun 2014)
New Revision: 33778
Modified:
gnunet/src/transport/plugin_transport_tcp.c
gnunet/src/transport/plugin_transport_udp.c
gnunet/src/transport/plugin_transport_unix.c
Log:
-towards having the monitoring API supported by TCP
Modified: gnunet/src/transport/plugin_transport_tcp.c
===================================================================
--- gnunet/src/transport/plugin_transport_tcp.c 2014-06-23 11:11:40 UTC (rev
33777)
+++ gnunet/src/transport/plugin_transport_tcp.c 2014-06-23 11:26:59 UTC (rev
33778)
@@ -272,6 +272,11 @@
struct GNUNET_SERVER_TransmitHandle *transmit_handle;
/**
+ * Address of the other peer.
+ */
+ struct GNUNET_HELLO_Address *address;
+
+ /**
* ID of task used to delay receiving more to throttle sender.
*/
GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
@@ -281,26 +286,28 @@
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
- struct GNUNET_HELLO_Address *address;
-
/**
- * Address of the other peer (either based on our 'connect'
- * call or on our 'accept' call).
- *
- * struct IPv4TcpAddress or struct IPv6TcpAddress
+ * When will this session time out?
*/
- //void *addr;
+ struct GNUNET_TIME_Absolute timeout;
+
/**
- * Length of @e addr.
- */
- //size_t addrlen;
- /**
* Last activity on this connection. Used to select preferred
* connection.
*/
struct GNUNET_TIME_Absolute last_activity;
/**
+ * Number of bytes waiting for transmission to this peer.
+ */
+ unsigned long long bytes_in_queue;
+
+ /**
+ * Number of messages waiting for transmission to this peer.
+ */
+ unsigned int msgs_in_queue;
+
+ /**
* Are we still expecting the welcome message? (#GNUNET_YES/#GNUNET_NO)
*/
int expecting_welcome;
@@ -378,6 +385,16 @@
struct GNUNET_RESOLVER_RequestHandle *ext_dns;
/**
+ * Function to call about session status changes.
+ */
+ GNUNET_TRANSPORT_SessionInfoCallback sic;
+
+ /**
+ * Closure for @e sic.
+ */
+ void *sic_cls;
+
+ /**
* How many more TCP sessions are we allowed to open right now?
*/
unsigned long long max_connections;
@@ -410,7 +427,41 @@
};
+
/**
+ * If a session monitor is attached, notify it about the new
+ * session state.
+ *
+ * @param plugin our plugin
+ * @param session session that changed state
+ * @param state new state of the session
+ */
+static void
+notify_session_monitor (struct Plugin *plugin,
+ struct Session *session,
+ enum GNUNET_TRANSPORT_SessionState state)
+{
+ struct GNUNET_TRANSPORT_SessionInfo info;
+
+ if (NULL == plugin->sic)
+ return;
+ memset (&info, 0, sizeof (info));
+ info.state = state;
+ info.is_inbound = GNUNET_SYSERR; /* hard to say */
+ info.num_msg_pending = session->msgs_in_queue;
+ info.num_bytes_pending = session->bytes_in_queue;
+ /* info.receive_delay remains zero as this is not supported by UDP
+ (cannot selectively not receive from 'some' peer while continuing
+ to receive from others) */
+ info.session_timeout = session->timeout;
+ info.address = session->address;
+ plugin->sic (plugin->sic_cls,
+ session,
+ &info);
+}
+
+
+/**
* Function called for a quick conversion of the binary address to
* a numeric address. Note that the caller must not free the
* address and that the next call to this function is allowed
@@ -780,6 +831,7 @@
{
GNUNET_SCHEDULER_cancel (session->timeout_task);
session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ session->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
}
if (GNUNET_YES
@@ -825,16 +877,26 @@
: "Could not deliver message to `%4s', notifying.\n",
GNUNET_i2s (&session->target));
GNUNET_STATISTICS_update (session->plugin->env->stats,
- gettext_noop ("# bytes currently in TCP buffers"),
- -(int64_t) pm->message_size, GNUNET_NO);
- GNUNET_STATISTICS_update (session->plugin->env->stats, gettext_noop
- ("# bytes discarded by TCP (disconnect)"), pm->message_size, GNUNET_NO);
- GNUNET_CONTAINER_DLL_remove(session->pending_messages_head,
- session->pending_messages_tail, pm);
+ gettext_noop ("# bytes currently in TCP
buffers"),
+ -(int64_t) pm->message_size, GNUNET_NO);
+ GNUNET_STATISTICS_update (session->plugin->env->stats,
+ gettext_noop ("# bytes discarded by TCP
(disconnect)"),
+ pm->message_size,
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
+ session->pending_messages_tail,
+ pm);
+ GNUNET_assert (0 < session->msgs_in_queue);
+ session->msgs_in_queue--;
+ GNUNET_assert (pm->message_size <= session->bytes_in_queue);
+ session->bytes_in_queue -= pm->message_size;
if (NULL != pm->transmit_cont)
- pm->transmit_cont (pm->transmit_cont_cls, &session->target,
GNUNET_SYSERR,
- pm->message_size, 0);
- GNUNET_free(pm);
+ pm->transmit_cont (pm->transmit_cont_cls,
+ &session->target,
+ GNUNET_SYSERR,
+ pm->message_size,
+ 0);
+ GNUNET_free (pm);
}
if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK )
{
@@ -881,10 +943,25 @@
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Session *s = cls;
+ struct GNUNET_TIME_Relative left;
s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ left = GNUNET_TIME_absolute_get_remaining (s->timeout);
+ if (0 != left.rel_value_us)
+ {
+ /* not actually our turn yet, but let's at least update
+ the monitor, it may think we're about to die ... */
+ notify_session_monitor (s->plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
+ &session_timeout,
+ s);
+ return;
+ }
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
- "Session %p was idle for %s, disconnecting\n", s,
+ "Session %p was idle for %s, disconnecting\n",
+ s,
GNUNET_STRINGS_relative_time_to_string
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
GNUNET_YES));
/* call session destroy function */
@@ -901,14 +978,10 @@
reschedule_session_timeout (struct Session *s)
{
GNUNET_assert(GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
- GNUNET_SCHEDULER_cancel (s->timeout_task);
- s->timeout_task = GNUNET_SCHEDULER_add_delayed (
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, s);
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
- "Timeout rescheduled for session %p set to %s\n", s,
- GNUNET_STRINGS_relative_time_to_string
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, GNUNET_YES));
+ s->timeout = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
}
+
/**
* Create a new session. Also queues a welcome message.
*
@@ -959,8 +1032,11 @@
GNUNET_STATISTICS_update (plugin->env->stats,
gettext_noop ("# bytes currently in TCP buffers"), pm->message_size,
GNUNET_NO);
- GNUNET_CONTAINER_DLL_insert(session->pending_messages_head,
- session->pending_messages_tail, pm);
+ GNUNET_CONTAINER_DLL_insert (session->pending_messages_head,
+ session->pending_messages_tail,
+ pm);
+ session->msgs_in_queue++;
+ session->bytes_in_queue += pm->message_size;
if (GNUNET_YES != is_nat)
{
GNUNET_STATISTICS_update (plugin->env->stats,
@@ -968,8 +1044,10 @@
}
plugin->env->register_quota_notification (plugin->env->cls,
&address->peer, PLUGIN_NAME, session);
- session->timeout_task = GNUNET_SCHEDULER_add_delayed (
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session);
+ session->timeout = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ session->timeout_task = GNUNET_SCHEDULER_add_delayed
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ session);
return session;
}
@@ -1012,9 +1090,9 @@
plugin = session->plugin;
if (NULL == buf)
{
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Timeout trying to transmit to peer `%4s', discarding message
queue.\n",
- GNUNET_i2s (&session->target));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout trying to transmit to peer `%4s', discarding message
queue.\n",
+ GNUNET_i2s (&session->target));
/* timeout; cancel all messages that have already expired */
hd = NULL;
tl = NULL;
@@ -1023,13 +1101,19 @@
while ((NULL != (pos = session->pending_messages_head))
&& (pos->timeout.abs_value_us <= now.abs_value_us))
{
- GNUNET_CONTAINER_DLL_remove(session->pending_messages_head,
- session->pending_messages_tail, pos);
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Failed to transmit %u byte message to `%4s'.\n", pos->message_size,
- GNUNET_i2s (&session->target));
+ GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
+ session->pending_messages_tail,
+ pos);
+ GNUNET_assert (0 < session->msgs_in_queue);
+ session->msgs_in_queue--;
+ GNUNET_assert (pos->message_size <= session->bytes_in_queue);
+ session->bytes_in_queue -= pos->message_size;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to transmit %u byte message to `%4s'.\n",
+ pos->message_size,
+ GNUNET_i2s (&session->target));
ret += pos->message_size;
- GNUNET_CONTAINER_DLL_insert_after(hd, tl, tl, pos);
+ GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
}
/* do this call before callbacks (so that if callbacks destroy
* session, they have a chance to cancel actions done by this
@@ -1040,10 +1124,12 @@
* the callbacks may abort the session */
while (NULL != (pos = hd))
{
- GNUNET_CONTAINER_DLL_remove(hd, tl, pos);
- if (pos->transmit_cont != NULL )
- pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_SYSERR,
- pos->message_size, 0);
+ GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+ if (pos->transmit_cont != NULL)
+ pos->transmit_cont (pos->transmit_cont_cls,
+ &pid,
+ GNUNET_SYSERR,
+ pos->message_size, 0);
GNUNET_free(pos);
}
GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1062,10 +1148,16 @@
{
if (ret + pos->message_size > size)
break;
- GNUNET_CONTAINER_DLL_remove(session->pending_messages_head,
- session->pending_messages_tail, pos);
+ GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
+ session->pending_messages_tail,
+ pos);
+ GNUNET_assert (0 < session->msgs_in_queue);
+ session->msgs_in_queue--;
+ GNUNET_assert (pos->message_size <= session->bytes_in_queue);
+ session->bytes_in_queue -= pos->message_size;
GNUNET_assert(size >= pos->message_size);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Transmitting message of type %u size %u\n",
+ LOG(GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting message of type %u size %u\n",
ntohs (((struct GNUNET_MessageHeader * ) pos->msg)->type),
pos->message_size);
/* FIXME: this memcpy can be up to 7% of our total runtime */
@@ -1073,7 +1165,7 @@
cbuf += pos->message_size;
ret += pos->message_size;
size -= pos->message_size;
- GNUNET_CONTAINER_DLL_insert_tail(hd, tl, pos);
+ GNUNET_CONTAINER_DLL_insert_tail (hd, tl, pos);
}
/* schedule 'continuation' before callbacks so that callbacks that
* cancel everything don't cause us to use a session that no longer
@@ -1085,10 +1177,13 @@
* we should not use 'session' after this point */
while (NULL != (pos = hd))
{
- GNUNET_CONTAINER_DLL_remove(hd, tl, pos);
- if (pos->transmit_cont != NULL )
- pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_OK,
- pos->message_size, pos->message_size); /* FIXME: include TCP
overhead */
+ GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+ if (pos->transmit_cont != NULL)
+ pos->transmit_cont (pos->transmit_cont_cls,
+ &pid,
+ GNUNET_OK,
+ pos->message_size,
+ pos->message_size); /* FIXME: include TCP overhead */
GNUNET_free(pos);
}
GNUNET_assert(hd == NULL);
@@ -1253,11 +1348,12 @@
"Asked to transmit %u bytes to `%s', added message to list.\n",
msgbuf_size, GNUNET_i2s (&session->target));
- if (GNUNET_YES
- == GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap,
- &session->target, session))
+ if (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap,
+ &session->target,
+ session))
{
- GNUNET_assert(NULL != session->client);
+ GNUNET_assert (NULL != session->client);
GNUNET_SERVER_client_set_timeout (session->client,
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1265,9 +1361,11 @@
GNUNET_NO);
/* append pm to pending_messages list */
- GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head,
- session->pending_messages_tail, pm);
-
+ GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
+ session->pending_messages_tail,
+ pm);
+ session->msgs_in_queue++;
+ session->bytes_in_queue += pm->message_size;
process_pending_messages (session);
return msgbuf_size;
}
@@ -1283,17 +1381,25 @@
GNUNET_NO);
/* append pm to pending_messages list */
- GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head,
- session->pending_messages_tail, pm);
+ GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
+ session->pending_messages_tail,
+ pm);
+ session->msgs_in_queue++;
+ session->bytes_in_queue += pm->message_size;
return msgbuf_size;
}
else
{
- LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid session %p\n", session);
+ LOG(GNUNET_ERROR_TYPE_ERROR,
+ "Invalid session %p\n", session);
if (NULL != cont)
- cont (cont_cls, &session->target, GNUNET_SYSERR, pm->message_size, 0);
- GNUNET_break(0);
- GNUNET_free(pm);
+ cont (cont_cls,
+ &session->target,
+ GNUNET_SYSERR,
+ pm->message_size,
+ 0);
+ GNUNET_break (0);
+ GNUNET_free (pm);
return GNUNET_SYSERR; /* session does not exist here */
}
}
@@ -2346,9 +2452,10 @@
size_t ret;
tcp_probe_ctx->transmit_handle = NULL;
- GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail,
- tcp_probe_ctx);
- if (buf == NULL )
+ GNUNET_CONTAINER_DLL_remove (plugin->probe_head,
+ plugin->probe_tail,
+ tcp_probe_ctx);
+ if (buf == NULL)
{
GNUNET_CONNECTION_destroy (tcp_probe_ctx->sock);
GNUNET_free(tcp_probe_ctx);
@@ -2402,8 +2509,9 @@
sizeof(struct GNUNET_PeerIdentity));
tcp_probe_ctx->plugin = plugin;
tcp_probe_ctx->sock = sock;
- GNUNET_CONTAINER_DLL_insert(plugin->probe_head, plugin->probe_tail,
- tcp_probe_ctx);
+ GNUNET_CONTAINER_DLL_insert (plugin->probe_head,
+ plugin->probe_tail,
+ tcp_probe_ctx);
tcp_probe_ctx->transmit_handle = GNUNET_CONNECTION_notify_transmit_ready (
sock, ntohs (tcp_probe_ctx->message.header.size),
GNUNET_TIME_UNIT_FOREVER_REL, ¬ify_send_probe, tcp_probe_ctx);
@@ -2433,6 +2541,62 @@
/**
+ * Return information about the given session to the
+ * monitor callback.
+ *
+ * @param cls the `struct Plugin` with the monitor callback (`sic`)
+ * @param peer peer we send information about
+ * @param value our `struct Session` to send information about
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+send_session_info_iter (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ void *value)
+{
+ struct Plugin *plugin = cls;
+ struct Session *session = value;
+
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Begin monitoring sessions of a plugin. There can only
+ * be one active monitor per plugin (i.e. if there are
+ * multiple monitors, the transport service needs to
+ * multiplex the generated events over all of them).
+ *
+ * @param cls closure of the plugin
+ * @param sic callback to invoke, NULL to disable monitor;
+ * plugin will being by iterating over all active
+ * sessions immediately and then enter monitor mode
+ * @param sic_cls closure for @a sic
+ */
+static void
+tcp_plugin_setup_monitor (void *cls,
+ GNUNET_TRANSPORT_SessionInfoCallback sic,
+ void *sic_cls)
+{
+ struct Plugin *plugin = cls;
+
+ plugin->sic = sic;
+ plugin->sic_cls = sic_cls;
+ if (NULL != sic)
+ {
+ GNUNET_CONTAINER_multipeermap_iterate (plugin->sessionmap,
+ &send_session_info_iter,
+ plugin);
+ /* signal end of first iteration */
+ sic (sic_cls, NULL, NULL);
+ }
+}
+
+
+/**
* Entry point for the plugin.
*
* @param cls closure, the 'struct GNUNET_TRANSPORT_PluginEnvironment*'
@@ -2569,6 +2733,7 @@
api->get_network = &tcp_get_network;
api->update_session_timeout = &tcp_plugin_update_session_timeout;
api->update_inbound_delay = &tcp_plugin_update_inbound_delay;
+ api->setup_monitor = &tcp_plugin_setup_monitor;
plugin->service = service;
if (NULL != service)
{
@@ -2678,8 +2843,9 @@
GNUNET_NAT_unregister (plugin->nat);
while (NULL != (tcp_probe = plugin->probe_head))
{
- GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail,
- tcp_probe);
+ GNUNET_CONTAINER_DLL_remove (plugin->probe_head,
+ plugin->probe_tail,
+ tcp_probe);
GNUNET_CONNECTION_destroy (tcp_probe->sock);
GNUNET_free(tcp_probe);
}
Modified: gnunet/src/transport/plugin_transport_udp.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp.c 2014-06-23 11:11:40 UTC (rev
33777)
+++ gnunet/src/transport/plugin_transport_udp.c 2014-06-23 11:26:59 UTC (rev
33778)
@@ -1573,6 +1573,7 @@
s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
+ s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
&session_timeout, s);
return s;
Modified: gnunet/src/transport/plugin_transport_unix.c
===================================================================
--- gnunet/src/transport/plugin_transport_unix.c 2014-06-23 11:11:40 UTC
(rev 33777)
+++ gnunet/src/transport/plugin_transport_unix.c 2014-06-23 11:26:59 UTC
(rev 33778)
@@ -886,6 +886,7 @@
session->target = address->peer;
session->address = GNUNET_HELLO_address_copy (address);
session->plugin = plugin;
+ session->timeout = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
session->timeout_task = GNUNET_SCHEDULER_add_delayed
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
&session_timeout,
session);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33778 - gnunet/src/transport,
gnunet <=