gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r32125 - in gnunet/src: core fs include


From: gnunet
Subject: [GNUnet-SVN] r32125 - in gnunet/src: core fs include
Date: Thu, 30 Jan 2014 20:22:23 +0100

Author: grothoff
Date: 2014-01-30 20:22:23 +0100 (Thu, 30 Jan 2014)
New Revision: 32125

Modified:
   gnunet/src/core/core.h
   gnunet/src/core/core_api.c
   gnunet/src/core/gnunet-service-core_clients.c
   gnunet/src/core/gnunet-service-core_sessions.c
   gnunet/src/core/gnunet-service-core_sessions.h
   gnunet/src/fs/gnunet-service-fs_cp.c
   gnunet/src/include/gnunet_core_service.h
Log:
-towards fixing #3295 (core traffic prioritization)

Modified: gnunet/src/core/core.h
===================================================================
--- gnunet/src/core/core.h      2014-01-30 18:24:13 UTC (rev 32124)
+++ gnunet/src/core/core.h      2014-01-30 19:22:23 UTC (rev 32125)
@@ -286,7 +286,7 @@
   /**
    * Always 0.
    */
-  uint64_t reserved GNUNET_PACKED;
+  uint32_t reserved GNUNET_PACKED;
 
 };
 

Modified: gnunet/src/core/core_api.c
===================================================================
--- gnunet/src/core/core_api.c  2014-01-30 18:24:13 UTC (rev 32124)
+++ gnunet/src/core/core_api.c  2014-01-30 19:22:23 UTC (rev 32125)
@@ -700,7 +700,8 @@
  * @param ignore_currently_down transmit message even if not initialized?
  */
 static void
-trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
+trigger_next_request (struct GNUNET_CORE_Handle *h,
+                      int ignore_currently_down)
 {
   uint16_t msize;
 
@@ -742,7 +743,8 @@
  * @param msg the message received from the core service
  */
 static void
-main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+main_notify_handler (void *cls,
+                     const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_CORE_Handle *h = cls;
   const struct InitReplyMessage *m;
@@ -1265,7 +1267,7 @@
  * @param handle connection to core service
  * @param cork is corking allowed for this transmission?
  * @param priority how important is the message?
- * @param maxdelay how long can the message wait?
+ * @param maxdelay how long can the message wait? Only effective if @a cork is #GNUNET_YES
  * @param target who should receive the message, never NULL (can be this peer's identity for loopback)
  * @param notify_size how many bytes of buffer space does @a notify want?
  * @param notify function to call when buffer space is available;
@@ -1278,7 +1280,8 @@
  *         if NULL is returned, @a notify will NOT be called.
  */
 struct GNUNET_CORE_TransmitHandle *
-GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
+GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
+                                   int cork,
                                    enum GNUNET_CORE_Priority priority,
                                    struct GNUNET_TIME_Relative maxdelay,
                                    const struct GNUNET_PeerIdentity *target,

Modified: gnunet/src/core/gnunet-service-core_clients.c
===================================================================
--- gnunet/src/core/gnunet-service-core_clients.c       2014-01-30 18:24:13 UTC (rev 32124)
+++ gnunet/src/core/gnunet-service-core_clients.c       2014-01-30 19:22:23 UTC (rev 32125)
@@ -419,6 +419,11 @@
   struct GSC_ClientActiveRequest *car;
 
   /**
+   * How important is this message.
+   */
+  enum GNUNET_CORE_Priority priority;
+
+  /**
    * Is corking allowed (set only once we have the real message).
    */
   int cork;
@@ -454,7 +459,7 @@
   msize -= sizeof (struct SendMessage);
   GNUNET_break (0 == ntohl (sm->reserved));
   c = find_client (client);
-  if (c == NULL)
+  if (NULL == c)
   {
     /* client did not send INIT first! */
     GNUNET_break (0);
@@ -482,10 +487,13 @@
                                                        &sm->peer,
                                                        tc.car));
   tc.cork = ntohl (sm->cork);
+  tc.priority = (enum GNUNET_CORE_Priority) ntohl (sm->priority);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Client asked for transmission of %u bytes to `%s' %s\n", msize,
+              "Client asked for transmission of %u bytes to `%s' %s\n",
+              msize,
               GNUNET_i2s (&sm->peer), tc.cork ? "now" : "");
-  GNUNET_SERVER_mst_receive (client_mst, &tc, (const char *) &sm[1], msize,
+  GNUNET_SERVER_mst_receive (client_mst, &tc,
+                             (const char *) &sm[1], msize,
                              GNUNET_YES, GNUNET_NO);
   if (0 !=
       memcmp (&tc.car->target, &GSC_my_identity,
@@ -541,7 +549,8 @@
   else
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Delivering message of type %u to %s\n", ntohs (message->type),
+                "Delivering message of type %u to %s\n",
+                ntohs (message->type),
                 GNUNET_i2s (&car->target));
     GSC_CLIENTS_deliver_message (&car->target, message,
                                 ntohs (message->size),
@@ -549,7 +558,10 @@
     GSC_CLIENTS_deliver_message (&car->target, message,
                                 sizeof (struct GNUNET_MessageHeader),
                                 GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
-    GSC_SESSIONS_transmit (car, message, tc->cork);
+    GSC_SESSIONS_transmit (car,
+                           message,
+                           tc->cork,
+                           tc->priority);
   }
   return GNUNET_OK;
 }

Modified: gnunet/src/core/gnunet-service-core_sessions.c
===================================================================
--- gnunet/src/core/gnunet-service-core_sessions.c      2014-01-30 18:24:13 UTC (rev 32124)
+++ gnunet/src/core/gnunet-service-core_sessions.c      2014-01-30 19:22:23 UTC (rev 32125)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2009-2013 Christian Grothoff (and other contributing authors)
+     (C) 2009-2014 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -74,6 +74,11 @@
    */
   size_t size;
 
+  /**
+   * How important is this message.
+   */
+  enum GNUNET_CORE_Priority priority;
+
 };
 
 
@@ -145,7 +150,7 @@
 
 
 /**
- * Map of peer identities to 'struct Session'.
+ * Map of peer identities to `struct Session`.
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
 
@@ -180,7 +185,8 @@
   session = find_session (pid);
   if (NULL == session)
     return;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying session for peer `%4s'\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Destroying session for peer `%4s'\n",
               GNUNET_i2s (&session->peer));
   if (GNUNET_SCHEDULER_NO_TASK != session->cork_task)
   {
@@ -218,11 +224,12 @@
  * Transmit our current typemap message to the other peer.
  * (Done periodically in case an update got lost).
  *
- * @param cls the 'struct Session*'
+ * @param cls the `struct Session *`
  * @param tc unused
  */
 static void
-transmit_typemap_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmit_typemap_task (void *cls,
+                       const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Session *session = cls;
   struct GNUNET_MessageHeader *hdr;
@@ -263,7 +270,8 @@
 {
   struct Session *session;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating session for peer `%4s'\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Creating session for peer `%4s'\n",
               GNUNET_i2s (peer));
   session = GNUNET_new (struct Session);
   session->tmap = GSC_TYPEMAP_create ();
@@ -286,13 +294,14 @@
 /**
  * Notify the given client about the session (client is new).
  *
- * @param cls the 'struct GSC_Client'
+ * @param cls the `struct GSC_Client`
  * @param key peer identity
- * @param value the 'struct Session'
- * @return GNUNET_OK (continue to iterate)
+ * @param value the `struct Session`
+ * @return #GNUNET_OK (continue to iterate)
  */
 static int
-notify_client_about_session (void *cls, const struct GNUNET_PeerIdentity * key,
+notify_client_about_session (void *cls,
+                             const struct GNUNET_PeerIdentity *key,
                              void *value)
 {
   struct GSC_Client *client = cls;
@@ -334,8 +343,8 @@
  *
  * @param car request to queue; this handle is then shared between
  *         the caller (CLIENTS subsystem) and SESSIONS and must not
- *         be released by either until either 'GNUNET_SESSIONS_dequeue',
- *         'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed'
+ *         be released by either until either #GSC_SESSIONS_dequeue(),
+ *         #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed()
  *         have been invoked on it
  */
 void
@@ -344,7 +353,7 @@
   struct Session *session;
 
   session = find_session (&car->target);
-  if (session == NULL)
+  if (NULL == session)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Dropped client request for transmission (am disconnected)\n");
@@ -423,27 +432,40 @@
 
 
 /**
- * Solicit messages for transmission.
+ * Solicit messages for transmission, starting with those of the highest
+ * priority.
  *
  * @param session session to solict messages for
+ * @param msize how many bytes do we have already
  */
 static void
-solicit_messages (struct Session *session)
+solicit_messages (struct Session *session,
+                  size_t msize)
 {
   struct GSC_ClientActiveRequest *car;
   struct GSC_ClientActiveRequest *nxt;
   size_t so_size;
+  enum GNUNET_CORE_Priority pmax;
 
   discard_expired_requests (session);
-  so_size = 0;
+  so_size = msize;
+  pmax = GNUNET_CORE_PRIO_BACKGROUND;
+  for (car = session->active_client_request_head; NULL != car; car = car->next)
+  {
+    if (GNUNET_YES == car->was_solicited)
+      continue;
+    pmax = GNUNET_MAX (pmax, car->priority);
+  }
   nxt = session->active_client_request_head;
   while (NULL != (car = nxt))
   {
     nxt = car->next;
+    if (car->priority < pmax)
+      continue;
     if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
       break;
     so_size += car->msize;
-    if (car->was_solicited == GNUNET_YES)
+    if (GNUNET_YES == car->was_solicited)
       continue;
     car->was_solicited = GNUNET_YES;
     GSC_CLIENTS_solicit_request (car);
@@ -455,11 +477,12 @@
  * Some messages were delayed (corked), but the timeout has now expired.
  * Send them now.
  *
- * @param cls 'struct Session' with the messages to transmit now
+ * @param cls `struct Session` with the messages to transmit now
  * @param tc scheduler context (unused)
  */
 static void
-pop_cork_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+pop_cork_task (void *cls,
+               const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Session *session = cls;
 
@@ -470,7 +493,8 @@
 
 /**
  * Try to perform a transmission on the given session. Will solicit
- * additional messages if the 'sme' queue is not full enough.
+ * additional messages if the 'sme' queue is not full enough or has
+ * only low-priority messages.
  *
  * @param session session to transmit messages from
  */
@@ -481,33 +505,58 @@
   size_t msize;
   struct GNUNET_TIME_Absolute now;
   struct GNUNET_TIME_Absolute min_deadline;
+  enum GNUNET_CORE_Priority maxp;
+  enum GNUNET_CORE_Priority maxpc;
+  struct GSC_ClientActiveRequest *car;
 
   if (GNUNET_YES != session->ready_to_transmit)
     return;
   msize = 0;
   min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
   /* check 'ready' messages */
+  maxp = GNUNET_CORE_PRIO_BACKGROUND;
   pos = session->sme_head;
   while ((NULL != pos) &&
          (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE))
   {
     GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
     msize += pos->size;
+    maxp = GNUNET_MAX (maxp, pos->priority);
     min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline);
     pos = pos->next;
   }
+  if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL)
+  {
+    maxpc = GNUNET_CORE_PRIO_BACKGROUND;
+    for (car = session->active_client_request_head; NULL != car; car = car->next)
+    {
+      if (GNUNET_YES == car->was_solicited)
+        continue;
+      maxpc = GNUNET_MAX (maxpc, car->priority);
+    }
+    if (maxpc > maxp)
+    {
+      /* we have messages waiting for solicitation that have a higher
+         priority than those that we already accepted; solicit the
+         high-priority messages first */
+      solicit_messages (session, 0);
+      return;
+    }
+  }
+
   now = GNUNET_TIME_absolute_get ();
-  if ((msize == 0) ||
+  if ((0 == msize) ||
       ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) &&
        (min_deadline.abs_value_us > now.abs_value_us)))
   {
     /* not enough ready yet, try to solicit more */
-    solicit_messages (session);
+    solicit_messages (session,
+                      msize);
     if (msize > 0)
     {
       /* if there is data to send, just not yet, make sure we do transmit
        * it once the deadline is reached */
-      if (session->cork_task != GNUNET_SCHEDULER_NO_TASK)
+      if (GNUNET_SCHEDULER_NO_TASK != session->cork_task)
         GNUNET_SCHEDULER_cancel (session->cork_task);
       session->cork_task =
           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
@@ -540,7 +589,8 @@
       total_msgs = 1;
       total_bytes = used;
     }
-    GNUNET_STATISTICS_set (GSC_stats, "# avg payload per encrypted message",
+    GNUNET_STATISTICS_set (GSC_stats,
+                           "# avg payload per encrypted message",
                            total_bytes / total_msgs, GNUNET_NO);
     /* now actually transmit... */
     session->ready_to_transmit = GNUNET_NO;
@@ -554,11 +604,13 @@
  *
  * @param cls the message
  * @param key neighbour's identity
- * @param value 'struct Neighbour' of the target
- * @return always GNUNET_OK
+ * @param value `struct Neighbour` of the target
+ * @return always #GNUNET_OK
  */
 static int
-do_send_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
+do_send_message (void *cls,
+                 const struct GNUNET_PeerIdentity *key,
+                 void *value)
 {
   const struct GNUNET_MessageHeader *hdr = cls;
   struct Session *session = value;
@@ -569,7 +621,8 @@
   m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size);
   memcpy (&m[1], hdr, size);
   m->size = size;
-  GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, m);
+  m->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
+  GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, m);
   try_transmission (session);
   return GNUNET_OK;
 }
@@ -585,7 +638,8 @@
 {
   if (NULL == sessions)
     return;
-  GNUNET_CONTAINER_multipeermap_iterate (sessions, &do_send_message,
+  GNUNET_CONTAINER_multipeermap_iterate (sessions,
+                                         &do_send_message,
                                          (void *) msg);
 }
 
@@ -617,13 +671,17 @@
  *            this handle will now be 'owned' by the SESSIONS subsystem
  * @param msg message to transmit
  * @param cork is corking allowed?
+ * @param priority how important is this message
  */
 void
 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
-                       const struct GNUNET_MessageHeader *msg, int cork)
+                       const struct GNUNET_MessageHeader *msg,
+                       int cork,
+                       enum GNUNET_CORE_Priority priority)
 {
   struct Session *session;
   struct SessionMessageEntry *sme;
+  struct SessionMessageEntry *pos;
   size_t msize;
 
   session = find_session (&car->target);
@@ -633,10 +691,23 @@
   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize);
   memcpy (&sme[1], msg, msize);
   sme->size = msize;
+  sme->priority = priority;
   if (GNUNET_YES == cork)
     sme->deadline =
         GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
-  GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, sme);
+  pos = session->sme_head;
+  while ( (NULL != pos) &&
+          (pos->priority > sme->priority) )
+    pos = pos->next;
+  if (NULL == pos)
+    GNUNET_CONTAINER_DLL_insert_tail (session->sme_head,
+                                      session->sme_tail,
+                                      sme);
+  else
+    GNUNET_CONTAINER_DLL_insert_after (session->sme_head,
+                                       session->sme_tail,
+                                       pos->prev,
+                                       sme);
   try_transmission (session);
 }
 
@@ -644,14 +715,16 @@
 /**
  * Helper function for GSC_SESSIONS_handle_client_iterate_peers.
  *
- * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies
+ * @param cls the `struct GNUNET_SERVER_TransmitContext` to queue replies
  * @param key identity of the connected peer
- * @param value the 'struct Neighbour' for the peer
+ * @param value the `struct Neighbour` for the peer
  * @return GNUNET_OK (continue to iterate)
  */
 #include "core.h"
 static int
-queue_connect_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
+queue_connect_message (void *cls,
+                       const struct GNUNET_PeerIdentity *key,
+                       void *value)
 {
   struct GNUNET_SERVER_TransmitContext *tc = cls;
   struct Session *session = value;
@@ -768,11 +841,13 @@
  *
  * @param cls NULL
  * @param key identity of the connected peer
- * @param value the 'struct Session' for the peer
- * @return GNUNET_OK (continue to iterate)
+ * @param value the `struct Session` for the peer
+ * @return #GNUNET_OK (continue to iterate)
  */
 static int
-free_session_helper (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
+free_session_helper (void *cls,
+                     const struct GNUNET_PeerIdentity *key,
+                     void *value)
 {
   struct Session *session = value;
 

Modified: gnunet/src/core/gnunet-service-core_sessions.h
===================================================================
--- gnunet/src/core/gnunet-service-core_sessions.h      2014-01-30 18:24:13 UTC (rev 32124)
+++ gnunet/src/core/gnunet-service-core_sessions.h      2014-01-30 19:22:23 UTC (rev 32125)
@@ -92,10 +92,13 @@
  *            ownership does not change (dequeue will be called soon).
  * @param msg message to transmit
  * @param cork is corking allowed?
+ * @param priority how important is this message
  */
 void
 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
-                       const struct GNUNET_MessageHeader *msg, int cork);
+                       const struct GNUNET_MessageHeader *msg,
+                       int cork,
+                       enum GNUNET_CORE_Priority priority);
 
 
 /**

Modified: gnunet/src/fs/gnunet-service-fs_cp.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.c        2014-01-30 18:24:13 UTC (rev 32124)
+++ gnunet/src/fs/gnunet-service-fs_cp.c        2014-01-30 19:22:23 UTC (rev 32125)
@@ -268,7 +268,7 @@
 
   /**
    * Set to 1 if we're currently in the process of calling
-   * #GNUNET_CORE_notify_transmit_ready() (so while cth is
+   * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
    * NULL, we should not call notify_transmit_ready for this
    * handle right now).
    */

Modified: gnunet/src/include/gnunet_core_service.h
===================================================================
--- gnunet/src/include/gnunet_core_service.h    2014-01-30 18:24:13 UTC (rev 32124)
+++ gnunet/src/include/gnunet_core_service.h    2014-01-30 19:22:23 UTC (rev 32125)
@@ -53,25 +53,26 @@
 {
 
   /**
-   * Highest priority, control traffic (i.e. NSE, Core/Mesh KX).
+   * Lowest priority, i.e. background traffic (i.e. fs)
    */
-  GNUNET_CORE_PRIO_CRITICAL_CONTROL = 0,
+  GNUNET_CORE_PRIO_BACKGROUND = 0,
 
   /**
-   * Urgent traffic (local peer, i.e. conversation).
+   * Normal traffic (i.e. mesh/dv relay, DHT)
    */
-  GNUNET_CORE_PRIO_URGENT = 1,
+  GNUNET_CORE_PRIO_BEST_EFFORT = 1,
 
   /**
-   * Normal traffic (i.e. mesh/dv relay, DHT)
+   * Urgent traffic (local peer, i.e. conversation).
    */
-  GNUNET_CORE_PRIO_BEST_EFFORT = 2,
+  GNUNET_CORE_PRIO_URGENT = 2,
 
   /**
-   * Background traffic (i.e. fs)
+   * Highest priority, control traffic (i.e. NSE, Core/Mesh KX).
    */
-  GNUNET_CORE_PRIO_BACKGROUND = 3
+  GNUNET_CORE_PRIO_CRITICAL_CONTROL = 3
 
+
 };
 
 
@@ -250,7 +251,7 @@
  * @param handle connection to core service
  * @param cork is corking allowed for this transmission?
  * @param priority how important is the message?
- * @param maxdelay how long can the message wait?
+ * @param maxdelay how long can the message wait? Only effective if @a cork is #GNUNET_YES
  * @param target who should receive the message, never NULL (can be this peer's identity for loopback)
  * @param notify_size how many bytes of buffer space does notify want?
  * @param notify function to call when buffer space is available;




reply via email to

[Prev in Thread] Current Thread [Next in Thread]