gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r19627 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r19627 - gnunet/src/transport
Date: Thu, 2 Feb 2012 13:47:26 +0100

Author: wachs
Date: 2012-02-02 13:47:26 +0100 (Thu, 02 Feb 2012)
New Revision: 19627

Modified:
   gnunet/src/transport/plugin_transport_udp_new.c
Log:
- working with fragmentation now fine tuning


Modified: gnunet/src/transport/plugin_transport_udp_new.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp_new.c     2012-02-02 11:04:15 UTC 
(rev 19626)
+++ gnunet/src/transport/plugin_transport_udp_new.c     2012-02-02 12:47:26 UTC 
(rev 19627)
@@ -108,8 +108,10 @@
 
   struct GNUNET_ATS_Information ats;
 
-  struct FragmentationContext * head;
-  struct FragmentationContext * tail;
+  struct FragmentationContext * frag_ctx;
+
+//  struct FragmentationContext * head;
+//  struct FragmentationContext * tail;
 };
 
 
@@ -201,9 +203,6 @@
    * Length of 'src_addr'
    */
   size_t addr_len;
-
-  struct GNUNET_PeerIdentity id;
-
 };
 
 
@@ -257,7 +256,7 @@
    */
   void *cont_cls;
 
-  struct FragmentationContext *frag;
+  struct FragmentationContext *frag_ctx;
 
 };
 
@@ -526,6 +525,7 @@
 {
   struct Plugin *plugin = cls;
   struct Session *s = value;
+  struct UDPMessageWrapper *udpw;
 
 #if DEBUG_UDP
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -534,22 +534,35 @@
          GNUNET_i2s (&s->target),
          GNUNET_a2s (s->sock_addr, s->addrlen));
 #endif
-  struct FragmentationContext *fctx = s->head;
-  while (fctx != NULL)
+  plugin->env->session_end (plugin->env->cls, &s->target, s);
+
+  while (s->frag_ctx != NULL)
   {
-    GNUNET_FRAGMENT_context_destroy(fctx->frag);
-    GNUNET_CONTAINER_DLL_remove(s->head, s->tail, fctx);
-    GNUNET_free (fctx);
-    fctx = s->head;
+    GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
+    GNUNET_free (s->frag_ctx);
+    s->frag_ctx = NULL;
   }
 
-  plugin->env->session_end (plugin->env->cls, &s->target, s);
+  udpw = plugin->msg_head;
+  while (udpw != NULL)
+  {
+    if (udpw->session == s)
+    {
+      GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw);
 
+      if (udpw->cont != NULL)
+        udpw->cont (udpw->cont_cls, &s->target, GNUNET_SYSERR);
+      GNUNET_free (udpw);
+    }
+    udpw = plugin->msg_head;
+  }
+
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
                                                        &s->target.hashPubKey,
                                                        s));
 
+
   GNUNET_free (s);
   return GNUNET_OK;
 }
@@ -576,6 +589,8 @@
   /* Clean up sessions */
   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, 
&target->hashPubKey, &disconnect_and_free_it, plugin);
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "FREEED SESSIONS from peer `%s'\n", GNUNET_i2s (target));
 }
 
 static struct Session *
@@ -650,7 +665,7 @@
 
   socklen_t s_addrlen = s->addrlen;
 
-#if VERBOSE
+#if VERBOSE_UDP
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
       udp_address_to_string (NULL, (void *) address->address, 
address->address_length),
       GNUNET_a2s (s->sock_addr, s->addrlen));
@@ -719,13 +734,13 @@
   struct SessionCompareContext cctx;
   cctx.addr = address;
   cctx.res = NULL;
-#if DEBUG_UDP
+#if VERBOSE_UDP
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer 
`%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, 
address->address, address->address_length));
 #endif
   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, 
&address->peer.hashPubKey, session_cmp_it, &cctx);
   if (cctx.res != NULL)
   {
-#if DEBUG_UDP
+#if VERBOSE_UDP
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", 
cctx.res);
 #endif
     return cctx.res;
@@ -737,7 +752,7 @@
       address->address,
       address->address_length,
       NULL, NULL);
-#if DEBUG_UDP
+#if VERBOSE
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Creating new session %p for peer `%s' address `%s'\n",
               s,
@@ -771,11 +786,10 @@
 
   size_t msg_len = ntohs (msg->size);
 
-#if DEBUG_UDP
+#if VERBOSE_UDP
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes 
%u\n", msg_len , sizeof (struct UDPMessageWrapper));
 #endif
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Enqueueing fragment with %u bytes 
%u\n", msg_len , sizeof (struct UDPMessageWrapper));
 
-
   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
   udpw->session = frag_ctx->session;
   udpw->udp = (char *) &udpw[1];
@@ -784,7 +798,7 @@
   udpw->cont = frag_ctx->cont;
   udpw->cont_cls = frag_ctx->cont_cls;
   udpw->timeout = frag_ctx->timeout;
-  udpw->frag = frag_ctx;
+  udpw->frag_ctx = frag_ctx;
   memcpy (udpw->udp, msg, msg_len);
 
   GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw);
@@ -848,7 +862,7 @@
     return GNUNET_SYSERR;
   }
 
-  LOG (GNUNET_ERROR_TYPE_ERROR,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
        "UDP transmits %u-byte message to `%s' using address `%s'\n",
          msgbuf_size,
          GNUNET_i2s (&s->target),
@@ -870,7 +884,7 @@
     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
     udpw->cont = cont;
     udpw->cont_cls = cont_cls;
-    udpw->frag = NULL;
+    udpw->frag_ctx = NULL;
 
     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
@@ -879,8 +893,10 @@
   }
   else
   {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
          "UDP has to fragment message \n");
+    if  (s->frag_ctx != NULL)
+      return GNUNET_SYSERR;
     memcpy (&udp[1], msgbuf, msgbuf_size);
     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct 
FragmentationContext));
 
@@ -898,7 +914,7 @@
               &enqueue_fragment,
               frag_ctx);
 
-    GNUNET_CONTAINER_DLL_insert(s->head, s->tail, frag_ctx);
+    s->frag_ctx = frag_ctx;
 
   }
 
@@ -1013,7 +1029,7 @@
                 &si->sender,
                 hdr,
                 (const struct GNUNET_ATS_Information *) &ats, 2,
-                si->session,
+                NULL,
                 si->arg,
                 si->args);
   si->session->flow_delay_for_other_peer = delay;
@@ -1147,13 +1163,16 @@
     GNUNET_break (0);
     return;
   }
+  LOG (GNUNET_ERROR_TYPE_ERROR, "Sending fragment_msg_proc ms\n");
   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
                        rc->src_addr, rc->addr_len);
 }
 
 struct LookupContext
 {
-  struct DefragContext *rc;
+  const struct sockaddr * addr;
+  size_t addrlen;
+
   struct Session *res;
 };
 
@@ -1163,11 +1182,9 @@
   struct LookupContext *l_ctx = cls;
   struct Session * s = value;
 
-  if ((s->addrlen == l_ctx->rc->addr_len) &&
-      (0 == memcmp (s->sock_addr, l_ctx->rc->src_addr, s->addrlen)))
+  if ((s->addrlen == l_ctx->addrlen) &&
+      (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
   {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY \n");
     l_ctx->res = s;
     return GNUNET_NO;
   }
@@ -1185,6 +1202,7 @@
 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
 {
   struct DefragContext *rc = cls;
+  LOG (GNUNET_ERROR_TYPE_ERROR, "Sending ACK ms\n");
 
   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
   struct UDP_ACK_Message *udp_ack;
@@ -1193,22 +1211,19 @@
   struct Session *s;
 
   struct LookupContext l_ctx;
-  l_ctx.rc = rc;
+  l_ctx.addr = rc->src_addr;
+  l_ctx.addrlen = rc->addr_len;
   l_ctx.res = NULL;
-  GNUNET_CONTAINER_multihashmap_get_multiple(rc->plugin->sessions,
-      &rc->id.hashPubKey,
+  GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
       &lookup_session_by_addr_it,
       &l_ctx);
   s = l_ctx.res;
-  if (s != NULL)
-  {
-    if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
-      delay = s->flow_delay_for_other_peer.rel_value;
-    else
-      delay = UINT32_MAX;
-  }
 
+  GNUNET_assert (s != NULL);
 
+  if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+    delay = s->flow_delay_for_other_peer.rel_value;
+
 #if DEBUG_UDP
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Sending ACK to `%s' including delay of %u ms\n",
@@ -1221,7 +1236,7 @@
   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
   udpw->cont = NULL;
   udpw->cont_cls = NULL;
-  udpw->frag = NULL;
+  udpw->frag_ctx = NULL;
   udpw->msg_size = msize;
   udpw->session = s;
   udpw->timeout = GNUNET_TIME_absolute_get_forever();
@@ -1253,15 +1268,79 @@
   return;
 }
 
-static void read_process_ack ()
+static void read_process_ack (struct Plugin *plugin,
+    const struct GNUNET_MessageHeader *msg,
+    char *addr,
+    socklen_t fromlen)
 {
-  //const struct GNUNET_MessageHeader *ack;
-  //struct Session *peer_session;
-  //const struct UDP_ACK_Message *udp_ack;
-  //struct Session *s = NULL;
-  //struct GNUNET_TIME_Relative flow_delay;
-  //struct GNUNET_ATS_Information ats;
-  GNUNET_break_op (0);
+  const struct GNUNET_MessageHeader *ack;
+  const struct UDP_ACK_Message *udp_ack;
+  struct LookupContext l_ctx;
+  struct Session *s = NULL;
+  struct GNUNET_TIME_Relative flow_delay;
+
+  if (ntohs (msg->size) <
+      sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  udp_ack = (const struct UDP_ACK_Message *) msg;
+
+  l_ctx.addr = (const struct sockaddr *) addr;
+  l_ctx.addrlen = fromlen;
+  l_ctx.res = NULL;
+  GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
+      &lookup_session_by_addr_it,
+      &l_ctx);
+  s = l_ctx.res;
+  GNUNET_assert (s != NULL);
+
+  if (s != NULL)
+  {
+    flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
+         flow_delay.rel_value);
+
+    s->flow_delay_from_other_peer =
+        GNUNET_TIME_relative_to_absolute (flow_delay);
+  }
+
+  ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+  if (ntohs (ack->size) !=
+      ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
+  {
+#if DEBUG_UDP
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
+       (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
+       GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
+    return;
+  }
+
+#if DEBUG_UDP
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "FULL MESSAGE ACKed\n",
+       (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
+       GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
+  plugin->last_expected_delay = GNUNET_FRAGMENT_context_destroy 
(s->frag_ctx->frag);
+
+  if (s->frag_ctx->cont != NULL)
+    s->frag_ctx->cont
+    (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
+  GNUNET_free (s->frag_ctx);
+  s->frag_ctx = NULL;
+  return;
 }
 
 static void read_process_fragment (struct Plugin *plugin,
@@ -1307,7 +1386,20 @@
         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
                                       (GNUNET_CONTAINER_HeapCostType)
                                       now.abs_value);
+#if DEBUG_UDP
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for 
%u-byte fragment from `%s'\n",
+       (unsigned int) ntohs (msg->size),
+       GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
   }
+  else
+  {
+#if DEBUG_UDP
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for 
%u-byte fragment from `%s'\n",
+       (unsigned int) ntohs (msg->size),
+       GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
+  }
 
   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
   {
@@ -1375,7 +1467,7 @@
     return;
 
   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
-    read_process_ack ();
+    read_process_ack (plugin, msg, addr, fromlen);;
     return;
 
   case GNUNET_MESSAGE_TYPE_FRAGMENT:
@@ -1409,13 +1501,13 @@
 
       if (udpw->cont != NULL)
         udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
-      if (udpw->frag != NULL)
+      if (udpw->frag_ctx != NULL)
       {
 #if DEBUG_UDP
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmendted message for peer 
`%s' with size %u timed out\n",
-            GNUNET_i2s(&udpw->session->target), udpw->frag->bytes_to_send);
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' 
with size %u timed out\n",
+            GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
 #endif
-        GNUNET_FRAGMENT_context_destroy(udpw->frag->frag);
+        GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
       }
       else
       {
@@ -1462,22 +1554,22 @@
   if (GNUNET_SYSERR == sent)
   {
     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "sendto");
-    LOG (GNUNET_ERROR_TYPE_ERROR,
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
          "UDP transmitted %u-byte message to %s (%d: %s)\n",
-         (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) 
sent,
+         (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
          (sent < 0) ? STRERROR (errno) : "ok");
     if (udpw->cont != NULL)
       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "UDP transmitted %u-byte message to %s (%d: %s)\n",
-       (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) 
sent,
+       (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
        (sent < 0) ? STRERROR (errno) : "ok");
 
   /* This was just a message fragment */
-  if (udpw->frag != NULL)
+  if (udpw->frag_ctx != NULL)
   {
-    GNUNET_FRAGMENT_context_transmission_done (udpw->frag->frag);
+    GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
   }
   /* This was a complete message*/
   else
@@ -1865,7 +1957,7 @@
 {
   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
   struct Plugin *plugin = api->cls;
-
+GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "libgnunet_plugin_transport_udp_done\n ");
   stop_broadcast (plugin);
 
   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)




reply via email to

[Prev in Thread] Current Thread [Next in Thread]