gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r36211 - gnunet/src/rps


From: gnunet
Subject: [GNUnet-SVN] r36211 - gnunet/src/rps
Date: Wed, 5 Aug 2015 23:47:36 +0200

Author: ch3
Date: 2015-08-05 23:47:36 +0200 (Wed, 05 Aug 2015)
New Revision: 36211

Modified:
   gnunet/src/rps/gnunet-service-rps.c
Log:
-keep track of messages passed to mq

Modified: gnunet/src/rps/gnunet-service-rps.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps.c 2015-08-05 21:47:34 UTC (rev 36210)
+++ gnunet/src/rps/gnunet-service-rps.c 2015-08-05 21:47:36 UTC (rev 36211)
@@ -183,6 +183,34 @@
 
 
 /**
+ * List containing all messages that are yet to be send
+ */
+struct PendingMessage
+{
+  /**
+   * DLL next, prev
+   */
+  struct PendingMessage *next;
+  struct PendingMessage *prev;
+
+  /**
+   * The envelope to the corresponding message
+   */
+  struct GNUNET_MQ_Envelope *ev;
+
+  /**
+   * The corresponding context
+   */
+  struct PeerContext *peer_ctx;
+
+  /**
+   * The message type
+   */
+  const char *type;
+};
+
+
+/**
  * Struct used to keep track of other peer's status
  *
  * This is stored in a multipeermap.
@@ -232,6 +260,12 @@
   uint32_t peer_flags;
 
   /**
+   * DLL with all messages that are yet to be sent
+   */
+  struct PendingMessage *pending_messages_head;
+  struct PendingMessage *pending_messages_tail;
+
+  /**
    * This is pobably followed by 'statistical' data (when we first saw
    * him, how did we get his ID, how many pushes (in a timeinterval),
    * ...)
@@ -1148,6 +1182,45 @@
 
 
 /**
+ * @brief Add an envelope to a message passed to mq to list of pending messages
+ *
+ * @param peer peer the message was sent to
+ * @param ev envelope to the message
+ * @param type type of the message to be sent
+ */
+static struct PendingMessage *
+insert_pending_message (const struct GNUNET_PeerIdentity *peer,
+                        struct GNUNET_MQ_Envelope *ev,
+                        const char *type)
+{
+  struct PendingMessage *pending_msg;
+  struct PeerContext *peer_ctx;
+
+  peer_ctx = get_peer_ctx (peer);
+  pending_msg = GNUNET_new (struct PendingMessage);
+  pending_msg->ev = ev;
+  pending_msg->peer_ctx = peer_ctx;
+  pending_msg->type = type;
+  GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
+                               peer_ctx->pending_messages_tail,
+                               pending_msg);
+  return pending_msg;
+}
+
+static void
+remove_pending_message (struct PendingMessage *pending_msg)
+{
+  struct PeerContext *peer_ctx;
+
+  peer_ctx = pending_msg->peer_ctx;
+  GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
+                               peer_ctx->pending_messages_tail,
+                               pending_msg);
+  GNUNET_free (pending_msg);
+}
+
+
+/**
  * @brief This is called once a message is sent.
  *
  * @param cls type of the message that was sent
@@ -1155,10 +1228,11 @@
 static void
 mq_notify_sent_cb (void *cls)
 {
-  const char *type = cls;
+  struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "%s was sent.\n",
-      type);
+      pending_msg->type);
+  remove_pending_message (pending_msg);
 }
 
 
@@ -1178,6 +1252,7 @@
   struct GNUNET_MQ_Handle *mq;
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
+  struct PendingMessage *pending_msg;
 
   /* Compute actual size */
   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
@@ -1208,9 +1283,10 @@
   memcpy (&out_msg[1], peer_ids,
          send_size * sizeof (struct GNUNET_PeerIdentity));
 
+  pending_msg = insert_pending_message (peer_id, ev, "PULL REPLY");
   GNUNET_MQ_notify_sent (ev,
       mq_notify_sent_cb,
-      "PULL REPLY");
+      pending_msg);
   GNUNET_MQ_send (mq, ev);
 }
 
@@ -1923,11 +1999,12 @@
  * @param peer_id the peer to send the pull request to.
  */
 static void
-send_pull_request (struct GNUNET_PeerIdentity *peer_id)
+send_pull_request (const struct GNUNET_PeerIdentity *peer_id)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_MQ_Handle *mq;
   struct PeerContext *peer_ctx;
+  struct PendingMessage *pending_msg;
 
   peer_ctx = get_peer_ctx (peer_id);
   GNUNET_assert (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING));
@@ -1939,9 +2016,10 @@
 
   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
   mq = get_mq (peer_id);
+  pending_msg = insert_pending_message (peer_id, ev, "PULL REQUEST");
   GNUNET_MQ_notify_sent (ev,
       mq_notify_sent_cb,
-      "PULL REQUEST");
+      pending_msg);
   GNUNET_MQ_send (mq, ev);
 }
 
@@ -1952,10 +2030,11 @@
  * @param peer_id the peer to send the push to.
  */
 static void
-send_push (struct GNUNET_PeerIdentity *peer_id)
+send_push (const struct GNUNET_PeerIdentity *peer_id)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_MQ_Handle *mq;
+  struct PendingMessage *pending_msg;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Going to send PUSH to peer %s.\n",
@@ -1963,9 +2042,10 @@
 
   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
   mq = get_mq (peer_id);
+  pending_msg = insert_pending_message (peer_id, ev, "PUSH");
   GNUNET_MQ_notify_sent (ev,
       mq_notify_sent_cb,
-      "PUSH");
+      pending_msg);
   GNUNET_MQ_send (mq, ev);
 }
 
@@ -2542,7 +2622,7 @@
        GNUNET_i2s (&peer_ctx->peer_id));
 
   /* Remove it from the sampler used for the Brahms protocol */
-    RPS_sampler_reinitialise_by_value (prot_sampler, key);
+  RPS_sampler_reinitialise_by_value (prot_sampler, key);
 
   /* If operations are still scheduled for this peer cancel those */
   if (0 != peer_ctx->num_outstanding_ops)
@@ -2585,6 +2665,18 @@
   if (GNUNET_YES == in_arr (pull_list, pull_list_size, key))
     rem_from_list (&pull_list, &pull_list_size, key);
 
+  /* Cancle messages that have not been sent yet */
+  while (NULL != peer_ctx->pending_messages_head)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Removing unsent %s\n",
+        peer_ctx->pending_messages_head->type);
+    /* We are not able to cancel messages as #GNUNET_CADET_mq_create () does 
not
+     * set a #GNUNET_MQ_CancelImpl */
+    /* GNUNET_MQ_send_cancel (peer_ctx->pending_messages_head->ev); */
+    remove_pending_message (peer_ctx->pending_messages_head);
+  }
+
   /* If there is still a mq destroy it */
   if (NULL != peer_ctx->mq)
   {
@@ -2636,19 +2728,20 @@
 
   if ( (0 == RPS_sampler_count_id (prot_sampler, peer)) &&
        (GNUNET_NO  == GNUNET_CONTAINER_multipeermap_contains (view, peer)) &&
-       (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) 
&&
        (GNUNET_NO  == in_arr (push_list, push_list_size, peer)) &&
-       (GNUNET_NO  == in_arr (pull_list, pull_list_size, peer)) )
+       (GNUNET_NO  == in_arr (pull_list, pull_list_size, peer)) &&
+       (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) 
)
   {
     peer_ctx = get_peer_ctx (peer);
 
     if ( (NULL == peer_ctx->recv_channel) &&
+         (NULL == peer_ctx->pending_messages_head) &&
          (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) )
     {
       #ifdef ENABLE_MALICIOUS
       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
         peer_remove_cb (NULL, peer, peer_ctx);
-      #else
+      #else /* ENABLE_MALICIOUS */
       peer_remove_cb (NULL, peer, peer_ctx);
       #endif /* ENABLE_MALICIOUS */
     }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]