gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r26423 - gnunet/src/dv


From: gnunet
Subject: [GNUnet-SVN] r26423 - gnunet/src/dv
Date: Thu, 14 Mar 2013 14:46:14 +0100

Author: grothoff
Date: 2013-03-14 14:46:14 +0100 (Thu, 14 Mar 2013)
New Revision: 26423

Modified:
   gnunet/src/dv/Makefile.am
   gnunet/src/dv/gnunet-service-dv.c
Log:
-more DV hacking

Modified: gnunet/src/dv/Makefile.am
===================================================================
--- gnunet/src/dv/Makefile.am   2013-03-14 12:49:54 UTC (rev 26422)
+++ gnunet/src/dv/Makefile.am   2013-03-14 13:46:14 UTC (rev 26423)
@@ -38,6 +38,7 @@
 gnunet_service_dv_SOURCES = \
  gnunet-service-dv.c dv.h
 gnunet_service_dv_LDADD = \
+  $(top_builddir)/src/consensus/libgnunetconsensus.la \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
   $(top_builddir)/src/core/libgnunetcore.la \
   $(top_builddir)/src/util/libgnunetutil.la \

Modified: gnunet/src/dv/gnunet-service-dv.c
===================================================================
--- gnunet/src/dv/gnunet-service-dv.c   2013-03-14 12:49:54 UTC (rev 26422)
+++ gnunet/src/dv/gnunet-service-dv.c   2013-03-14 13:46:14 UTC (rev 26423)
@@ -43,6 +43,11 @@
 #define GNUNET_DV_CONSENSUS_FREQUENCY 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5))
 
 /**
+ * Maximum number of messages we queue per peer.
+ */
+#define MAX_QUEUE_SIZE 16
+
+/**
  * The default fisheye depth, from how many hops away will
  * we keep peers?
  */
@@ -101,6 +106,11 @@
    */
   struct GNUNET_PeerIdentity target;
 
+  /**
+   * The (actual) sender of the message.
+   */
+  struct GNUNET_PeerIdentity sender;
+
 };
 
 GNUNET_NETWORK_STRUCT_END
@@ -190,6 +200,12 @@
   struct GNUNET_CONSENSUS_Handle *consensus;
 
   /**
+   * ID of the task we use to (periodically) update our consensus
+   * with this peer.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier consensus_task;
+
+  /**
    * At what offset are we, with respect to inserting our own routes
    * into the consensus?
    */
@@ -201,6 +217,11 @@
    */
   unsigned int consensus_insertion_distance;
 
+  /**
+   * Number of messages currently in the 'pm_XXXX'-DLL.
+   */
+  unsigned int pm_queue_size;
+
 };
 
 
@@ -272,12 +293,6 @@
 static struct ConsensusSet consensi[DEFAULT_FISHEYE_DEPTH - 1];
 
 /**
- * ID of the task we use to (periodically) update our consensus
- * with other peers.
- */
-static GNUNET_SCHEDULER_Task consensus_task;
-
-/**
  * Handle to the core service api.
  */
 static struct GNUNET_CORE_Handle *core_api;
@@ -319,12 +334,7 @@
  */
 struct GNUNET_STATISTICS_Handle *stats;
 
-/**
- * How far out to keep peers we learn about.
- */
-static unsigned long long fisheye_depth;
 
-
 /**
  * Get distance information from 'atsi'.
  *
@@ -400,7 +410,7 @@
  */
 static void
 send_data_to_plugin (const struct GNUNET_MessageHeader *message, 
-                    struct GNUNET_PeerIdentity *distant_neighbor, 
+                    const struct GNUNET_PeerIdentity *distant_neighbor, 
                     uint32_t distance)
 {
   struct GNUNET_DV_ReceivedMessage *received_msg;
@@ -487,7 +497,7 @@
  * @param uid plugin-chosen UID for the message
  */
 static void
-send_ack_to_plugin (struct GNUNET_PeerIdentity *target, 
+send_ack_to_plugin (const struct GNUNET_PeerIdentity *target, 
                    uint32_t uid)
 {
   struct GNUNET_DV_AckMessage ack_msg;
@@ -581,6 +591,7 @@
   while ( (NULL != (pending = dn->pm_head)) &&
          (size >= off + (msize = ntohs (pending->msg->size))))
   {
+    dn->pm_queue_size--;
     GNUNET_CONTAINER_DLL_remove (dn->pm_head,
                                 dn->pm_tail,
                                  pending);
@@ -604,6 +615,59 @@
 
 
 /**
+ * Forward the given payload to the given target.
+ *
+ * @param target where to send the message
+ * @param distance expected (remaining) distance to the target
+ * @param sender original sender of the message
+ * @param payload payload of the message
+ */
+static void
+forward_payload (struct DirectNeighbor *target,
+                uint32_t distance,
+                const struct GNUNET_PeerIdentity *sender,
+                const struct GNUNET_MessageHeader *payload)
+{
+  struct PendingMessage *pm;
+  struct RouteMessage *rm;
+  size_t msize;
+
+  if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) &&
+       (0 != memcmp (sender,
+                    &my_identity,
+                    sizeof (struct GNUNET_PeerIdentity))) )
+    return;
+  msize = sizeof (struct RouteMessage) + ntohs (payload->size);
+  if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+  pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
+  rm = (struct RouteMessage *) &pm[1];
+  rm->header.size = htons ((uint16_t) msize);
+  rm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_ROUTE);
+  rm->distance = htonl (distance);
+  rm->target = target->peer;
+  rm->sender = *sender;
+  memcpy (&rm[1], payload, ntohs (payload->size));
+  GNUNET_CONTAINER_DLL_insert_tail (target->pm_head,
+                                   target->pm_tail,
+                                   pm);
+  target->pm_queue_size++;
+  if (NULL == target->cth)
+    target->cth = GNUNET_CORE_notify_transmit_ready (core_api,
+                                                    GNUNET_YES /* cork */,
+                                                    0 /* priority */,
+                                                    
GNUNET_TIME_UNIT_FOREVER_REL,
+                                                    &target->peer,
+                                                    msize,                     
                 
+                                                    &core_transmit_notify, 
target);
+}
+
+
+/**
  * Find a free slot for storing a 'route' in the 'consensi'
  * set at the given distance.
  *
@@ -741,6 +805,7 @@
  * @param message the message
  * @param atsi transport ATS information (latency, distance, etc.)
  * @param atsi_count number of entries in atsi
+ * @return GNUNET_OK on success, GNUNET_SYSERR if the other peer violated the 
protocol
  */
 static int
 handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer,
@@ -748,7 +813,62 @@
                         const struct GNUNET_ATS_Information *atsi,
                         unsigned int atsi_count)
 {
-  GNUNET_break (0); // FIXME
+  const struct RouteMessage *rm;
+  const struct GNUNET_MessageHeader *payload;
+  struct Route *route;
+
+  if (ntohs (message->size) < sizeof (struct RouteMessage) + sizeof (struct 
GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  rm = (const struct RouteMessage *) message;
+  payload = (const struct GNUNET_MessageHeader *) &rm[1];
+  if (ntohs (message->size) != sizeof (struct RouteMessage) + ntohs 
(payload->size))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (0 == memcmp (&rm->target,
+                  &my_identity,
+                  sizeof (struct GNUNET_PeerIdentity)))
+  {
+    /* message is for me, check reverse route! */
+    route = GNUNET_CONTAINER_multihashmap_get (all_routes,
+                                              &rm->sender.hashPubKey);
+    if (NULL == route)
+    {
+      /* don't have reverse route, drop */
+      GNUNET_STATISTICS_update (stats,
+                               "# message discarded (no reverse route)",
+                               1, GNUNET_NO);
+      return GNUNET_OK;
+    }
+    send_data_to_plugin (payload,
+                        &rm->sender,
+                        route->target.distance);
+    return GNUNET_OK;
+  }
+  route = GNUNET_CONTAINER_multihashmap_get (all_routes,
+                                            &rm->target.hashPubKey);
+  if (NULL == route)
+  {
+    GNUNET_STATISTICS_update (stats,
+                             "# messages discarded (no route)",
+                             1, GNUNET_NO);
+    return GNUNET_OK;
+  }
+  if (route->target.distance > ntohl (rm->distance) + 1)
+  {
+    GNUNET_STATISTICS_update (stats,
+                             "# messages discarded (target too far)",
+                             1, GNUNET_NO);
+    return GNUNET_OK;
+  }
+  forward_payload (route->next_hop,
+                  route->target.distance,
+                  &rm->sender,
+                  payload);
   return GNUNET_OK;  
 }
 
@@ -765,7 +885,43 @@
 handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
                         const struct GNUNET_MessageHeader *message)
 {
-  GNUNET_break (0); // FIXME
+  struct Route *route;
+  const struct GNUNET_DV_SendMessage *msg;
+  const struct GNUNET_MessageHeader *payload;
+
+  if (ntohs (message->size) < sizeof (struct GNUNET_DV_SendMessage) + sizeof 
(struct GNUNET_MessageHeader))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  msg = (const struct GNUNET_DV_SendMessage *) message;
+  payload = (const struct GNUNET_MessageHeader *) &msg[1];
+  if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs 
(payload->size))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  route = GNUNET_CONTAINER_multihashmap_get (all_routes,
+                                            &msg->target.hashPubKey);
+  if (NULL == route)
+  {
+    /* got disconnected, send ACK anyway? 
+       FIXME: What we really want is an 'NACK' here... */
+    GNUNET_STATISTICS_update (stats,
+                             "# local messages discarded (no route)",
+                             1, GNUNET_NO);
+    send_ack_to_plugin (&msg->target, htonl (msg->uid));
+    GNUNET_SERVER_receive_done (client, GNUNET_OK);
+    return;
+  }
+  // FIXME: flow control (send ACK only once message has left the queue...)
+  send_ack_to_plugin (&msg->target, htonl (msg->uid));
+  forward_payload (route->next_hop,
+                  route->target.distance,
+                  &my_identity,
+                  payload);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -876,6 +1032,7 @@
 
   while (NULL != (pending = neighbor->pm_head))
   {
+    neighbor->pm_queue_size--;
     GNUNET_CONTAINER_DLL_remove (neighbor->pm_head,
                                 neighbor->pm_tail,
                                 pending);    
@@ -889,6 +1046,16 @@
     GNUNET_CORE_notify_transmit_ready_cancel (neighbor->cth);
     neighbor->cth = NULL;
   }
+  if (GNUNET_SCHEDULER_NO_TASK != neighbor->consensus_task)
+  {
+    GNUNET_SCHEDULER_cancel (neighbor->consensus_task);
+    neighbor->consensus_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  if (NULL != neighbor->consensus)
+  {
+    GNUNET_CONSENSUS_destroy (neighbor->consensus);
+    neighbor->consensus = NULL;
+  }
   GNUNET_assert (GNUNET_YES ==
                 GNUNET_CONTAINER_multihashmap_remove (direct_neighbors, 
                                                       
&neighbor->peer.hashPubKey,




reply via email to

[Prev in Thread] Current Thread [Next in Thread]