gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r37350 - in gnunet/src: include nse testbed-logger


From: gnunet
Subject: [GNUnet-SVN] r37350 - in gnunet/src: include nse testbed-logger
Date: Fri, 24 Jun 2016 16:34:07 +0200

Author: grothoff
Date: 2016-06-24 16:34:07 +0200 (Fri, 24 Jun 2016)
New Revision: 37350

Modified:
   gnunet/src/include/gnunet_testbed_logger_service.h
   gnunet/src/nse/gnunet-service-nse.c
   gnunet/src/testbed-logger/test_testbed_logger_api.c
   gnunet/src/testbed-logger/testbed_logger_api.c
Log:
adapting testbed-logger to MQ API

Modified: gnunet/src/include/gnunet_testbed_logger_service.h
===================================================================
--- gnunet/src/include/gnunet_testbed_logger_service.h  2016-06-24 14:05:58 UTC (rev 37349)
+++ gnunet/src/include/gnunet_testbed_logger_service.h  2016-06-24 14:34:07 UTC (rev 37350)
@@ -40,7 +40,7 @@
 #endif
 #endif
 
-#include "gnunet_configuration_lib.h"
+#include "gnunet_util_lib.h"
 
 /**
  * Opaque handle for the logging service
@@ -70,7 +70,7 @@
 
 /**
  * Functions of this type are called to notify a successful transmission of the
- * message to the logger service
+ * message to the logger service.
  *
  * @param cls the closure given to GNUNET_TESTBED_LOGGER_send()
  * @param size the amount of data sent
@@ -87,7 +87,7 @@
  *
  * @param h the logger handle
  * @param data the data to send;
- * @param size how many bytes of data to send
+ * @param size how many bytes of @a data to send
  */
 void
 GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
@@ -99,13 +99,11 @@
  * Flush the buffered data to the logger service
  *
  * @param h the logger handle
- * @param timeout how long to wait before calling the flust completion callback
  * @param cb the callback to call after the data is flushed
- * @param cb_cls the closure for the above callback
+ * @param cb_cls the closure for @a cb
  */
 void
 GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h,
-                             struct GNUNET_TIME_Relative timeout,
                              GNUNET_TESTBED_LOGGER_FlushCompletion cb,
                              void *cb_cls);
 

Modified: gnunet/src/nse/gnunet-service-nse.c
===================================================================
--- gnunet/src/nse/gnunet-service-nse.c 2016-06-24 14:05:58 UTC (rev 37349)
+++ gnunet/src/nse/gnunet-service-nse.c 2016-06-24 14:34:07 UTC (rev 37350)
@@ -1386,9 +1386,9 @@
   }
   if (NULL != lh)
   {
-    struct GNUNET_TIME_Relative timeout;
-    timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
-    GNUNET_TESTBED_LOGGER_flush (lh, timeout, &flush_comp_cb, NULL);
+    GNUNET_TESTBED_LOGGER_flush (lh,
+                                 &flush_comp_cb,
+                                 NULL);
   }
   if (NULL != histogram)
   {

Modified: gnunet/src/testbed-logger/test_testbed_logger_api.c
===================================================================
--- gnunet/src/testbed-logger/test_testbed_logger_api.c 2016-06-24 14:05:58 UTC (rev 37349)
+++ gnunet/src/testbed-logger/test_testbed_logger_api.c 2016-06-24 14:34:07 UTC (rev 37350)
@@ -209,7 +209,6 @@
   if (0 == i++)
     return;
   GNUNET_TESTBED_LOGGER_flush (h,
-                              GNUNET_TIME_UNIT_FOREVER_REL,
                                &flush_comp,
                                &write_task);
 }

Modified: gnunet/src/testbed-logger/testbed_logger_api.c
===================================================================
--- gnunet/src/testbed-logger/testbed_logger_api.c      2016-06-24 14:05:58 UTC (rev 37349)
+++ gnunet/src/testbed-logger/testbed_logger_api.c      2016-06-24 14:34:07 UTC (rev 37350)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2008--2013 GNUnet e.V.
+      Copyright (C) 2008--2013, 2016 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -22,6 +22,7 @@
  * @file testbed-logger/testbed_logger_api.c
  * @brief Client-side routines for communicating with the tesbted logger service
  * @author Sree Harsha Totakura <address@hidden>
+ * @author Christian Grothoff
  */
 
 #include "platform.h"
@@ -34,59 +35,13 @@
 #define LOG(kind, ...)                          \
   GNUNET_log_from (kind, "testbed-logger-api", __VA_ARGS__)
 
-/**
- * Debug logging
- */
-#define LOG_DEBUG(...)                          \
-  LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
 
-#ifdef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD
-#undef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD
-#endif
-
 /**
- * Threshold after which exponential backoff should not increase (15 s).
- */
-#define GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
-
-/**
  * The size of the buffer we fill before sending out the message
  */
-#define BUFFER_SIZE GNUNET_SERVER_MAX_MESSAGE_SIZE
+#define BUFFER_SIZE (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_MessageHeader))
 
 /**
- * The message queue for sending messages to the controller service
- */
-struct MessageQueue
-{
-  /**
-   * next pointer for DLL
-   */
-  struct MessageQueue *next;
-
-  /**
-   * prev pointer for DLL
-   */
-  struct MessageQueue *prev;
-
-  /**
-   * The message to be sent
-   */
-  struct GNUNET_MessageHeader *msg;
-
-  /**
-   * Completion callback
-   */
-  GNUNET_TESTBED_LOGGER_FlushCompletion cb;
-
-  /**
-   * callback closure
-   */
-  void *cb_cls;
-};
-
-
-/**
  * Connection handle for the logger service
  */
 struct GNUNET_TESTBED_LOGGER_Handle
@@ -94,30 +49,15 @@
   /**
    * Client connection
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
-   * The transport handle
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
-
-  /**
-   * DLL head for the message queue
-   */
-  struct MessageQueue *mq_head;
-
-  /**
-   * DLL tail for the message queue
-   */
-  struct MessageQueue *mq_tail;
-
-  /**
    * Flush completion callback
    */
   GNUNET_TESTBED_LOGGER_FlushCompletion cb;
 
   /**
-   * Closure for the above callback
+   * Closure for @e cb
    */
   void *cb_cls;
 
@@ -124,12 +64,12 @@
   /**
    * Local buffer for data to be transmitted
    */
-  void *buf;
+  char buf[BUFFER_SIZE];
 
   /**
-   * The size of the local buffer
+   * How many bytes in @a buf are in use?
    */
-  size_t bs;
+  size_t buse;
 
   /**
    * Number of bytes wrote since last flush
@@ -144,29 +84,16 @@
   /**
    * Task to call the flush completion callback
    */
-  struct GNUNET_SCHEDULER_Task * flush_completion_task;
+  struct GNUNET_SCHEDULER_Task *flush_completion_task;
 
   /**
-   * Task to be executed when flushing takes too long
+   * Number of entries in the MQ.
    */
-  struct GNUNET_SCHEDULER_Task * timeout_flush_task;
+  unsigned int mq_len;
 };
 
 
 /**
- * Cancels the flush timeout task
- *
- * @param h handle to the logger
- */
-static void
-cancel_timeout_flush (struct GNUNET_TESTBED_LOGGER_Handle *h)
-{
-  GNUNET_SCHEDULER_cancel (h->timeout_flush_task);
-  h->timeout_flush_task = NULL;
-}
-
-
-/**
  * Task to call the flush completion notification
  *
  * @param cls the logger handle
@@ -186,8 +113,6 @@
   h->cb = NULL;
   cb_cls = h->cb_cls;
   h->cb_cls = NULL;
-  if (NULL != h->timeout_flush_task)
-    cancel_timeout_flush (h);
   if (NULL != cb)
     cb (cb_cls, bw);
 }
@@ -203,97 +128,39 @@
 {
   if (NULL != h->flush_completion_task)
     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
-  h->flush_completion_task = GNUNET_SCHEDULER_add_now (&call_flush_completion, h);
+  h->flush_completion_task
+    = GNUNET_SCHEDULER_add_now (&call_flush_completion,
+                                h);
 }
 
 
 /**
- * Function called to notify a client about the connection begin ready to queue
- * more data.  "buf" will be NULL and "size" zero if the connection was closed
- * for writing in the meantime.
+ * Send the buffered data to the service
  *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param h the logger handle
  */
-static size_t
-transmit_ready_notify (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
-  struct MessageQueue *mq;
+static void
+dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h);
 
-  h->th = NULL;
-  mq = h->mq_head;
-  GNUNET_assert (NULL != mq);
-  if ((0 == size) && (NULL == buf))     /* Timeout */
-  {
-    LOG_DEBUG ("Message sending timed out -- retrying\n");
-    h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
-    h->th =
-        GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                             ntohs (mq->msg->size),
-                                             h->retry_backoff, GNUNET_YES,
-                                             &transmit_ready_notify, h);
-    return 0;
-  }
-  h->retry_backoff = GNUNET_TIME_UNIT_ZERO;
-  GNUNET_assert (ntohs (mq->msg->size) <= size);
-  size = ntohs (mq->msg->size);
-  memcpy (buf, mq->msg, size);
-  LOG_DEBUG ("Message of type: %u and size: %u sent\n",
-             ntohs (mq->msg->type), size);
-  GNUNET_free (mq->msg);
-  GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
-  GNUNET_free (mq);
-  h->bwrote += (size - sizeof (struct GNUNET_MessageHeader));
-  mq = h->mq_head;
-  if (NULL != mq)
-  {
-    h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
-    h->th =
-        GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                             ntohs (mq->msg->size),
-                                             h->retry_backoff, GNUNET_YES,
-                                             &transmit_ready_notify, h);
-    return size;
-  }
-  if (NULL != h->cb)
-    trigger_flush_notification (h);       /* Call the flush completion callback */
-  return size;
-}
 
-
 /**
- * Queues a message in send queue of the logger handle
+ * MQ successfully sent a message.
  *
- * @param h the logger handle
- * @param msg the message to queue
+ * @param cls our handle
  */
 static void
-queue_message (struct GNUNET_TESTBED_LOGGER_Handle *h,
-               struct GNUNET_MessageHeader *msg)
+notify_sent (void *cls)
 {
-  struct MessageQueue *mq;
-  uint16_t type;
-  uint16_t size;
+  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
 
-  type = ntohs (msg->type);
-  size = ntohs (msg->size);
-  mq = GNUNET_new (struct MessageQueue);
-  mq->msg = msg;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Queueing message of type %u, size %u for sending\n", type,
-       ntohs (msg->size));
-  GNUNET_CONTAINER_DLL_insert_tail (h->mq_head, h->mq_tail, mq);
-  if (NULL == h->th)
+  h->mq_len--;
+  if ( (0 == h->mq_len) &&
+       (NULL != h->cb) )
   {
-    h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
-    h->th =
-        GNUNET_CLIENT_notify_transmit_ready (h->client, size,
-                                             h->retry_backoff, GNUNET_YES,
-                                             &transmit_ready_notify,
-                                             h);
+    if (0 == h->buse)
+      trigger_flush_notification (h);
+    else
+      dispatch_buffer (h);
   }
 }
 
@@ -307,20 +174,44 @@
 dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h)
 {
   struct GNUNET_MessageHeader *msg;
-  size_t msize;
+  struct GNUNET_MQ_Envelope *env;
 
-  msize = sizeof (struct GNUNET_MessageHeader) + h->bs;
-  msg = GNUNET_realloc (h->buf, msize);
-  h->buf = NULL;
-  memmove (&msg[1], msg, h->bs);
-  h->bs = 0;
-  msg->type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG);
-  msg->size = htons (msize);
-  queue_message (h, msg);
+  env = GNUNET_MQ_msg_extra (msg,
+                             h->buse,
+                             GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG);
+  memcpy (&msg[1],
+          h->buf,
+          h->buse);
+  h->bwrote += h->buse;
+  h->buse = 0;
+  h->mq_len++;
+  GNUNET_MQ_notify_sent (env,
+                         &notify_sent,
+                         h);
+  GNUNET_MQ_send (h->mq,
+                  env);
 }
 
 
 /**
+ * We got disconnected from the logger.  Stop logging.
+  *
+ * @param cls the `struct GNUNET_TESTBED_LOGGER_Handle`
+ * @param error error code
+  */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
+
+  GNUNET_break (0);
+  GNUNET_MQ_destroy (h->mq);
+  h->mq = NULL;
+}
+
+
+/**
  * Connect to the testbed logger service
  *
  * @param cfg configuration to use
@@ -331,13 +222,18 @@
 GNUNET_TESTBED_LOGGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct GNUNET_TESTBED_LOGGER_Handle *h;
-  struct GNUNET_CLIENT_Connection *client;
 
-  client = GNUNET_CLIENT_connect ("testbed-logger", cfg);
-  if (NULL == client)
+  h = GNUNET_new (struct GNUNET_TESTBED_LOGGER_Handle);
+  h->mq = GNUNET_CLIENT_connecT (cfg,
+                                 "testbed-logger",
+                                 NULL,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
+  {
+    GNUNET_free (h);
     return NULL;
-  h = GNUNET_new (struct GNUNET_TESTBED_LOGGER_Handle);
-  h->client = client;
+  }
   return h;
 }
 
@@ -350,23 +246,20 @@
 void
 GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h)
 {
-  struct MessageQueue *mq;
-  unsigned int lost;
-
   if (NULL != h->flush_completion_task)
+  {
     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
-  lost = 0;
-  while (NULL != (mq = h->mq_head))
+    h->flush_completion_task = NULL;
+  }
+  if (0 != h->mq_len)
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Disconnect lost %u logger message[s]\n",
+         h->mq_len);
+  if (NULL != h->mq)
   {
-    GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
-    GNUNET_free (mq->msg);
-    GNUNET_free (mq);
-    lost++;
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
   }
-  if (0 != lost)
-    LOG (GNUNET_ERROR_TYPE_WARNING, "Cleaning up %u unsent logger message[s]\n",
-         lost);
-  GNUNET_CLIENT_disconnect (h->client);
   GNUNET_free (h);
 }
 
@@ -378,86 +271,48 @@
  *
  * @param h the logger handle
  * @param data the data to send;
- * @param size how many bytes of data to send
+ * @param size how many bytes of @a data to send
  */
 void
 GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
-                             const void *data, size_t size)
+                             const void *data,
+                             size_t size)
 {
-  size_t fit_size;
-
-  GNUNET_assert (0 != size);
-  GNUNET_assert (NULL != data);
-  GNUNET_assert (size <= (BUFFER_SIZE - sizeof (struct GNUNET_MessageHeader)));
-  fit_size = sizeof (struct GNUNET_MessageHeader) + h->bs + size;
-  if ( BUFFER_SIZE < fit_size )
-    dispatch_buffer (h);
-  if (NULL == h->buf)
+  if (NULL == h->mq)
+    return;
+  while (0 != size)
   {
-    h->buf = GNUNET_malloc (size);
-    h->bs = size;
-    memcpy (h->buf, data, size);
-    goto dispatch_ready;
+    size_t fit_size = GNUNET_MIN (size,
+                                  BUFFER_SIZE - h->buse);
+    memcpy (&h->buf[h->buse],
+            data,
+            fit_size);
+    h->buse += fit_size;
+    data += fit_size;
+    size -= fit_size;
+    if (0 != size)
+      dispatch_buffer (h);
   }
-  h->buf = GNUNET_realloc (h->buf, h->bs + size);
-  memcpy (h->buf + h->bs, data, size);
-  h->bs += size;
-
- dispatch_ready:
-  if (BUFFER_SIZE == fit_size)
-    dispatch_buffer (h);
 }
 
 
 /**
- * Task to be executed when flushing our local buffer takes longer than timeout
- * given to GNUNET_TESTBED_LOGGER_flush().  The flush completion callback will
- * be called with 0 as the amount of data sent.
- *
- * @param cls the logger handle
- */
-static void
-timeout_flush (void *cls)
-{
-  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
-  GNUNET_TESTBED_LOGGER_FlushCompletion cb;
-  void *cb_cls;
-
-  h->timeout_flush_task = NULL;
-  cb = h->cb;
-  h->cb = NULL;
-  cb_cls = h->cb_cls;
-  h->cb_cls = NULL;
-  if (NULL != h->flush_completion_task)
-  {
-    GNUNET_SCHEDULER_cancel (h->flush_completion_task);
-    h->flush_completion_task = NULL;
-  }
-  if (NULL != cb)
-    cb (cb_cls, 0);
-}
-
-
-/**
  * Flush the buffered data to the logger service
  *
  * @param h the logger handle
- * @param timeout how long to wait before calling the flust completion callback
  * @param cb the callback to call after the data is flushed
  * @param cb_cls the closure for the above callback
  */
 void
 GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h,
-                             struct GNUNET_TIME_Relative timeout,
                              GNUNET_TESTBED_LOGGER_FlushCompletion cb,
                              void *cb_cls)
 {
+  GNUNET_assert (NULL == h->cb);
   h->cb = cb;
   h->cb_cls = cb_cls;
-  GNUNET_assert (NULL == h->timeout_flush_task);
-  h->timeout_flush_task =
-      GNUNET_SCHEDULER_add_delayed (timeout, &timeout_flush, h);
-  if (NULL == h->buf)
+  if ( (NULL == h->mq) ||
+       (NULL == h->buf) )
   {
     trigger_flush_notification (h);
     return;
@@ -481,8 +336,6 @@
     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
     h->flush_completion_task = NULL;
   }
-  if (NULL != h->timeout_flush_task)
-    cancel_timeout_flush (h);
   h->cb = NULL;
   h->cb_cls = NULL;
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]