[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33776 - gnunet/src/transport
From: |
gnunet |
Subject: |
[GNUnet-SVN] r33776 - gnunet/src/transport |
Date: |
Mon, 23 Jun 2014 13:08:12 +0200 |
Author: grothoff
Date: 2014-06-23 13:08:12 +0200 (Mon, 23 Jun 2014)
New Revision: 33776
Modified:
gnunet/src/transport/plugin_transport_udp.c
Log:
adding monitoring support to UDP plugin, plus some doxygen/indentation fixes
Modified: gnunet/src/transport/plugin_transport_udp.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp.c 2014-06-23 10:53:10 UTC (rev
33775)
+++ gnunet/src/transport/plugin_transport_udp.c 2014-06-23 11:08:12 UTC (rev
33776)
@@ -185,13 +185,11 @@
/**
* Number of bytes waiting for transmission to this peer.
- * FIXME: not updated yet!
*/
unsigned long long bytes_in_queue;
/**
* Number of messages waiting for transmission to this peer.
- * FIXME: not updated yet!
*/
unsigned int msgs_in_queue;
@@ -1183,14 +1181,21 @@
/**
- * FIXME.
+ * Remove a message from the transmission queue.
+ *
+ * @param plugin the UDP plugin
+ * @param udpw message wrapper to queue
*/
static void
dequeue (struct Plugin *plugin,
struct UDP_MessageWrapper *udpw)
{
+ struct Session *session = udpw->session;
+
if (plugin->bytes_in_buffer < udpw->msg_size)
- GNUNET_break(0);
+ {
+ GNUNET_break (0);
+ }
else
{
GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1203,13 +1208,22 @@
"# UDP, total, msgs in buffers",
-1, GNUNET_NO);
if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress))
- GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head,
- plugin->ipv4_queue_tail, udpw);
+ GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
+ plugin->ipv4_queue_tail,
+ udpw);
else if (udpw->session->address->address_length == sizeof(struct
IPv6UdpAddress))
- GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head,
- plugin->ipv6_queue_tail, udpw);
+ GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
+ plugin->ipv6_queue_tail,
+ udpw);
else
+ {
GNUNET_break (0);
+ return;
+ }
+ GNUNET_assert (session->msgs_in_queue > 0);
+ session->msgs_in_queue--;
+ GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
+ session->bytes_in_queue -= udpw->msg_size;
}
@@ -1240,9 +1254,7 @@
dummy.cont = NULL;
dummy.cont_cls = NULL;
dummy.session = s;
-
call_continuation (&dummy, result);
-
/* Remove leftover fragments from queue */
if (s->address->address_length == sizeof(struct IPv6UdpAddress))
{
@@ -1273,12 +1285,15 @@
udpw = tmp;
}
}
-
+ notify_session_monitor (s->plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
/* Destroy fragmentation context */
- GNUNET_FRAGMENT_context_destroy (fc->frag, &s->last_expected_msg_delay,
- &s->last_expected_ack_delay);
+ GNUNET_FRAGMENT_context_destroy (fc->frag,
+ &s->last_expected_msg_delay,
+ &s->last_expected_ack_delay);
s->frag_ctx = NULL;
- GNUNET_free(fc);
+ GNUNET_free (fc);
}
@@ -1363,7 +1378,6 @@
GNUNET_free (d_ctx);
}
}
-
next = plugin->ipv4_queue_head;
while (NULL != (udpw = next))
{
@@ -1386,6 +1400,9 @@
GNUNET_free(udpw);
}
}
+ notify_session_monitor (s->plugin,
+ s,
+ GNUNET_TRANSPORT_SS_DOWN);
plugin->env->session_end (plugin->env->cls,
s->address,
s);
@@ -1414,7 +1431,9 @@
GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
GNUNET_NO);
if (s->rc > 0)
+ {
s->in_destroy = GNUNET_YES;
+ }
else
{
GNUNET_HELLO_address_free (s->address);
@@ -1881,30 +1900,45 @@
/**
- * FIXME.
+ * Enqueue a message for transmission.
+ *
+ * @param plugin the UDP plugin
+ * @param udpw message wrapper to queue
*/
static void
enqueue (struct Plugin *plugin,
struct UDP_MessageWrapper *udpw)
{
+ struct Session *session = udpw->session;
+
if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
- GNUNET_break(0);
+ {
+ GNUNET_break (0);
+ }
else
{
GNUNET_STATISTICS_update (plugin->env->stats,
"# UDP, total, bytes in buffers", udpw->msg_size, GNUNET_NO);
plugin->bytes_in_buffer += udpw->msg_size;
}
- GNUNET_STATISTICS_update (plugin->env->stats, "# UDP, total, msgs in
buffers",
- 1, GNUNET_NO);
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# UDP, total, msgs in buffers",
+ 1, GNUNET_NO);
if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
- plugin->ipv4_queue_tail, udpw);
+ plugin->ipv4_queue_tail,
+ udpw);
else if (udpw->session->address->address_length == sizeof (struct
IPv6UdpAddress))
- GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head,
- plugin->ipv6_queue_tail, udpw);
+ GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
+ plugin->ipv6_queue_tail,
+ udpw);
else
+ {
GNUNET_break (0);
+ return;
+ }
+ session->msgs_in_queue++;
+ session->bytes_in_queue += udpw->msg_size;
}
@@ -1949,7 +1983,9 @@
struct UDP_MessageWrapper * udpw;
size_t msg_len = ntohs (msg->size);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes\n", msg_len);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Enqueuing fragment with %u bytes\n",
+ msg_len);
frag_ctx->fragments_used++;
udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
udpw->session = frag_ctx->session;
@@ -2073,8 +2109,9 @@
GNUNET_STATISTICS_update (plugin->env->stats,
"# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, unfragmented msgs, bytes payload, attempt", udpw->payload_size,
- GNUNET_NO);
+ "# UDP, unfragmented msgs, bytes payload,
attempt",
+ udpw->payload_size,
+ GNUNET_NO);
}
else
{
@@ -2092,17 +2129,30 @@
frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without
UDP overhead */
frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
- UDP_MTU, &plugin->tracker, s->last_expected_msg_delay,
- s->last_expected_ack_delay, &udp->header, &enqueue_fragment, frag_ctx);
+ UDP_MTU,
+ &plugin->tracker,
+
s->last_expected_msg_delay,
+
s->last_expected_ack_delay,
+ &udp->header,
+ &enqueue_fragment,
+ frag_ctx);
s->frag_ctx = frag_ctx;
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, fragmented msgs, messages, pending", 1, GNUNET_NO);
+ "# UDP, fragmented msgs, messages, pending",
+ 1,
+ GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, fragmented msgs, messages, attempt", 1, GNUNET_NO);
+ "# UDP, fragmented msgs, messages, attempt",
+ 1,
+ GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, fragmented msgs, bytes payload, attempt",
- frag_ctx->payload_size, GNUNET_NO);
+ "# UDP, fragmented msgs, bytes payload, attempt",
+ frag_ctx->payload_size,
+ GNUNET_NO);
}
+ notify_session_monitor (s->plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
schedule_select (plugin);
return udpmlen;
}
@@ -2392,6 +2442,9 @@
udp_ack->sender = *rc->plugin->env->my_identity;
memcpy (&udp_ack[1], msg, ntohs (msg->size));
enqueue (rc->plugin, udpw);
+ notify_session_monitor (s->plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
schedule_select (rc->plugin);
}
@@ -2658,7 +2711,9 @@
struct GNUNET_TIME_Relative remaining;
struct Session *session;
struct Plugin *plugin;
+ int removed;
+ removed = GNUNET_NO;
udpw = head;
while (NULL != udpw)
{
@@ -2673,52 +2728,74 @@
{
case UMT_MSG_UNFRAGMENTED:
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, bytes, sent, timeout", udpw->msg_size, GNUNET_NO);
+ "# UDP, total, bytes, sent, timeout",
+ udpw->msg_size,
+ GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, messages, sent, timeout", 1, GNUNET_NO);
+ "# UDP, total, messages, sent, timeout",
+ 1,
+ GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, unfragmented msgs, messages, sent, timeout", 1, GNUNET_NO);
+ "# UDP, unfragmented msgs, messages, sent,
timeout",
+ 1,
+ GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, unfragmented msgs, bytes, sent, timeout",
- udpw->payload_size, GNUNET_NO);
+ "# UDP, unfragmented msgs, bytes, sent,
timeout",
+ udpw->payload_size,
+ GNUNET_NO);
/* Not fragmented message */
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Message for peer `%s' with size %u timed out\n",
- GNUNET_i2s (&udpw->session->target), udpw->payload_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message for peer `%s' with size %u timed out\n",
+ GNUNET_i2s (&udpw->session->target),
+ udpw->payload_size);
call_continuation (udpw, GNUNET_SYSERR);
/* Remove message */
+ removed = GNUNET_YES;
dequeue (plugin, udpw);
GNUNET_free(udpw);
break;
case UMT_MSG_FRAGMENTED:
/* Fragmented message */
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, bytes, sent, timeout", udpw->frag_ctx->on_wire_size,
- GNUNET_NO);
+ "# UDP, total, bytes, sent, timeout",
+ udpw->frag_ctx->on_wire_size,
+ GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, messages, sent, timeout", 1, GNUNET_NO);
+ "# UDP, total, messages, sent, timeout",
+ 1,
+ GNUNET_NO);
call_continuation (udpw, GNUNET_SYSERR);
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Fragment for message for peer `%s' with size %u timed out\n",
- GNUNET_i2s (&udpw->session->target), udpw->frag_ctx->payload_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Fragment for message for peer `%s' with size %u timed out\n",
+ GNUNET_i2s (&udpw->session->target),
+ udpw->frag_ctx->payload_size);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, fragmented msgs, messages, sent, timeout", 1, GNUNET_NO);
+ "# UDP, fragmented msgs, messages, sent,
timeout",
+ 1,
+ GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, fragmented msgs, bytes, sent, timeout",
- udpw->frag_ctx->payload_size, GNUNET_NO);
+ "# UDP, fragmented msgs, bytes, sent,
timeout",
+ udpw->frag_ctx->payload_size,
+ GNUNET_NO);
/* Remove fragmented message due to timeout */
fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
break;
case UMT_MSG_ACK:
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, bytes, sent, timeout", udpw->msg_size, GNUNET_NO);
+ "# UDP, total, bytes, sent, timeout",
+ udpw->msg_size,
+ GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, messages, sent, timeout", 1, GNUNET_NO);
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "ACK Message for peer `%s' with size %u timed out\n",
- GNUNET_i2s (&udpw->session->target), udpw->payload_size);
+ "# UDP, total, messages, sent, timeout",
+ 1,
+ GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "ACK Message for peer `%s' with size %u timed out\n",
+ GNUNET_i2s (&udpw->session->target),
+ udpw->payload_size);
call_continuation (udpw, GNUNET_SYSERR);
+ removed = GNUNET_YES;
dequeue (plugin, udpw);
GNUNET_free(udpw);
break;
@@ -2735,32 +2812,38 @@
udpw = NULL;
}
GNUNET_STATISTICS_update (plugin->env->stats,
- "# messages dismissed due to timeout", 1, GNUNET_NO);
+ "# messages discarded due to timeout",
+ 1,
+ GNUNET_NO);
}
else
{
/* Message did not time out, check flow delay */
- remaining = GNUNET_TIME_absolute_get_remaining (
- udpw->session->flow_delay_from_other_peer);
+ remaining = GNUNET_TIME_absolute_get_remaining
(udpw->session->flow_delay_from_other_peer);
if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
{
/* this message is not delayed */
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Message for peer `%s' (%u bytes) is not delayed \n",
- GNUNET_i2s (&udpw->session->target), udpw->payload_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message for peer `%s' (%u bytes) is not delayed \n",
+ GNUNET_i2s (&udpw->session->target),
+ udpw->payload_size);
break; /* Found message to send, break */
}
else
{
/* Message is delayed, try next */
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Message for peer `%s' (%u bytes) is delayed for %s\n",
- GNUNET_i2s (&udpw->session->target), udpw->payload_size,
- GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message for peer `%s' (%u bytes) is delayed for %s\n",
+ GNUNET_i2s (&udpw->session->target), udpw->payload_size,
+ GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES));
udpw = udpw->next;
}
}
}
+ if (GNUNET_YES == removed)
+ notify_session_monitor (session->plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
return udpw;
}
@@ -2868,13 +2951,18 @@
{
call_continuation (udpw, GNUNET_OK);
dequeue (plugin, udpw);
+ notify_session_monitor (plugin,
+ udpw->session,
+ GNUNET_TRANSPORT_SS_UP);
GNUNET_free (udpw);
return GNUNET_SYSERR;
}
- sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, a,
- slen);
-
+ sent = GNUNET_NETWORK_socket_sendto (sock,
+ udpw->msg_buf,
+ udpw->msg_size,
+ a,
+ slen);
if (GNUNET_SYSERR == sent)
{
/* Failure */
@@ -2902,9 +2990,10 @@
call_continuation (udpw, GNUNET_OK);
}
dequeue (plugin, udpw);
+ notify_session_monitor (plugin,
+ udpw->session,
+ GNUNET_TRANSPORT_SS_UP);
GNUNET_free(udpw);
- udpw = NULL;
-
return sent;
}
@@ -3146,8 +3235,10 @@
}
else
{
- LOG(GNUNET_ERROR_TYPE_ERROR, "Failed to bind UDP socket to %s: %s\n",
- GNUNET_a2s (server_addr, addrlen), STRERROR (eno));
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to bind UDP socket to %s: %s\n"),
+ GNUNET_a2s (server_addr, addrlen),
+ STRERROR (eno));
}
}
@@ -3185,10 +3276,15 @@
}
schedule_select (plugin);
- plugin->nat = GNUNET_NAT_register (plugin->env->cfg, GNUNET_NO, plugin->port,
+ plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
+ GNUNET_NO,
+ plugin->port,
sockets_created,
- (const struct sockaddr **) addrs,
addrlens,
- &udp_nat_port_map_callback, NULL, plugin);
+ (const struct sockaddr **) addrs,
+ addrlens,
+ &udp_nat_port_map_callback,
+ NULL,
+ plugin);
return sockets_created;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33776 - gnunet/src/transport,
gnunet <=