gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: implement long-polling in fakebank


From: gnunet
Subject: [taler-exchange] branch master updated: implement long-polling in fakebank
Date: Thu, 12 Aug 2021 19:07:31 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new 777dd74b implement long-polling in fakebank
777dd74b is described below

commit 777dd74b16064c91068e617a6fd39b2800fc0588
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Thu Aug 12 19:07:28 2021 +0200

    implement long-polling in fakebank
---
 src/bank-lib/fakebank.c | 716 ++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 636 insertions(+), 80 deletions(-)

diff --git a/src/bank-lib/fakebank.c b/src/bank-lib/fakebank.c
index ecb5934e..580012b0 100644
--- a/src/bank-lib/fakebank.c
+++ b/src/bank-lib/fakebank.c
@@ -21,11 +21,12 @@
  * @brief library that fakes being a Taler bank for testcases
  * @author Christian Grothoff <christian@grothoff.org>
  */
-// TODO: support long polling
 // TODO: support adding WAD transfers
 
 #include "platform.h"
 #include <pthread.h>
+#include <poll.h>
+#include <sys/eventfd.h>
 #include "taler_fakebank_lib.h"
 #include "taler_bank_service.h"
 #include "taler_mhd_lib.h"
@@ -43,6 +44,73 @@
  */
 #define MAX_URL_LEN 64
 
+/**
+ * Per account information.
+ */
+struct Account;
+
+
+/**
+ * Types of long polling activities.
+ */
+enum LongPollType
+{
+  /**
+   * Transfer TO the exchange.
+   */
+  LP_CREDIT,
+
+  /**
+   * Transfer FROM the exchange.
+   */
+  LP_DEBIT
+
+};
+
+/**
+ * Client waiting for activity on this account.
+ */
+struct LongPoller
+{
+
+  /**
+   * Kept in a DLL.
+   */
+  struct LongPoller *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct LongPoller *prev;
+
+  /**
+   * Account this long poller is waiting on.
+   */
+  struct Account *account;
+
+  /**
+   * Entry in the heap for this long poller.
+   */
+  struct GNUNET_CONTAINER_HeapNode *hn;
+
+  /**
+   * Client that is waiting for transactions.
+   */
+  struct MHD_Connection *conn;
+
+  /**
+   * When will this long poller time out?
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * What does the @e connection wait for?
+   */
+  enum LongPollType type;
+
+};
+
+
 /**
  * Details about a transcation we (as the simulated bank) received.
  */
@@ -74,6 +142,16 @@ struct Account
    */
   struct Transaction *out_tail;
 
+  /**
+   * Kept in a DLL.
+   */
+  struct LongPoller *lp_head;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct LongPoller *lp_tail;
+
   /**
    * Account name (string, not payto!)
    */
@@ -256,6 +334,23 @@ struct TALER_FAKEBANK_Handle
    */
   struct GNUNET_SCHEDULER_Task *mhd_task;
 
+  /**
+   * Task for expiring long-polling connections,
+   * unless we are using a thread pool (then NULL).
+   */
+  struct GNUNET_SCHEDULER_Task *lp_task;
+
+  /**
+   * Task for expiring long-polling connections, unless we are using the
+   * GNUnet scheduler (then NULL).
+   */
+  pthread_t lp_thread;
+
+  /**
+   * MIN-heap of long pollers, sorted by timeout.
+   */
+  struct GNUNET_CONTAINER_Heap *lp_heap;
+
   /**
    * Hashmap of reserve public keys to
    * `struct Transaction` with that reserve public
@@ -319,6 +414,17 @@ struct TALER_FAKEBANK_Handle
    */
   uint16_t port;
 
+  /**
+   * Event FD to signal @a lp_thread a change in
+   * @a lp_heap.
+   */
+  int lp_event;
+
+  /**
+   * Set to true once we are shutting down.
+   */
+  bool in_shutdown;
+
 #if EPOLL_SUPPORT
   /**
    * Boxed @e mhd_fd.
@@ -333,6 +439,145 @@ struct TALER_FAKEBANK_Handle
 };
 
 
+/**
+ * Special address "con_cls" can point to to indicate that the handler has
+ * been called more than once already (was previously suspended).
+ */
+static int special_ptr;
+
+
+/**
+ * Task run whenever HTTP server operations are pending.
+ *
+ * @param cls the `struct TALER_FAKEBANK_Handle`
+ */
+static void
+run_mhd (void *cls);
+
+
+/**
+ * Trigger the @a lp. Frees associated resources,
+ * except the entry of @a lp in the timeout heap.
+ * Must be called while the ``big lock`` is held.
+ *
+ * @param[in] lp long poller to trigger
+ * @param[in,out] h fakebank handle
+ */
+static void
+lp_trigger (struct LongPoller *lp,
+            struct TALER_FAKEBANK_Handle *h)
+{
+  struct Account *acc = lp->account;
+
+  GNUNET_CONTAINER_DLL_remove (acc->lp_head,
+                               acc->lp_tail,
+                               lp);
+  MHD_resume_connection (lp->conn);
+  GNUNET_free (lp);
+  if (NULL != h->mhd_task)
+    GNUNET_SCHEDULER_cancel (h->mhd_task);
+  h->mhd_task =
+    GNUNET_SCHEDULER_add_now (&run_mhd,
+                              h);
+}
+
+
+/**
+ * Thread that is run to wake up connections that have hit their timeout. Runs
+ * until in_shutdown is set to true. Must be send signals via lp_event on
+ * shutdown and/or whenever the heap changes to an earlier timeout.
+ *
+ * @param cls a `struct TALER_FAKEBANK_Handle *`
+ * @return NULL
+ */
+static void *
+lp_expiration_thread (void *cls)
+{
+  struct TALER_FAKEBANK_Handle *h = cls;
+
+  GNUNET_assert (0 ==
+                 pthread_mutex_lock (&h->big_lock));
+  while (! h->in_shutdown)
+  {
+    struct LongPoller *lp;
+    int timeout_ms;
+
+    lp = GNUNET_CONTAINER_heap_peek (h->lp_heap);
+    while ( (NULL != lp) &&
+            GNUNET_TIME_absolute_is_past (lp->timeout))
+    {
+      GNUNET_assert (lp ==
+                     GNUNET_CONTAINER_heap_remove_root (h->lp_heap));
+      GNUNET_assert (0 ==
+                     pthread_mutex_lock (&h->big_lock));
+      lp_trigger (lp,
+                  h);
+      GNUNET_assert (0 ==
+                     pthread_mutex_unlock (&h->big_lock));
+      lp = GNUNET_CONTAINER_heap_peek (h->lp_heap);
+    }
+    if (NULL != lp)
+    {
+      struct GNUNET_TIME_Relative rem;
+      unsigned long long left_ms;
+
+      rem = GNUNET_TIME_absolute_get_remaining (lp->timeout);
+      left_ms = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us;
+      if (left_ms > INT_MAX)
+        timeout_ms = INT_MAX;
+      else
+        timeout_ms = (int) left_ms;
+    }
+    else
+    {
+      timeout_ms = -1; /* infinity */
+    }
+    GNUNET_assert (0 ==
+                   pthread_mutex_unlock (&h->big_lock));
+    {
+      struct pollfd p = {
+        .fd = h->lp_event,
+        .events = POLLIN
+      };
+      int ret;
+
+      ret = poll (&p,
+                  1,
+                  timeout_ms);
+      if (-1 == ret)
+      {
+        GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                             "poll");
+      }
+      else if (1 == ret)
+      {
+        /* clear event */
+        uint64_t ev;
+        ssize_t iret;
+
+        iret = read (h->lp_event,
+                     &ev,
+                     sizeof (ev));
+        if (-1 == iret)
+        {
+          GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                               "read");
+        }
+        else
+        {
+          GNUNET_break (sizeof (uint64_t) == iret);
+        }
+      }
+    }
+    GNUNET_assert (0 ==
+                   pthread_mutex_lock (&h->big_lock));
+  }
+  GNUNET_assert (0 ==
+                 pthread_mutex_unlock (&h->big_lock));
+  return NULL;
+}
+
+
 /**
  * Lookup account with @a name, and if it does not exist, create it.
  *
@@ -626,6 +871,36 @@ post_transaction (struct TALER_FAKEBANK_Handle *h,
                                   ca->in_tail,
                                   old);
   }
+  {
+    struct LongPoller *nxt;
+
+    for (struct LongPoller *lp = debit_acc->lp_head;
+         NULL != lp;
+         lp = nxt)
+    {
+      nxt = lp->next;
+      if (LP_DEBIT == lp->type)
+      {
+        GNUNET_assert (lp ==
+                       GNUNET_CONTAINER_heap_remove_node (lp->hn));
+        lp_trigger (lp,
+                    h);
+      }
+    }
+    for (struct LongPoller *lp = credit_acc->lp_head;
+         NULL != lp;
+         lp = nxt)
+    {
+      nxt = lp->next;
+      if (LP_CREDIT == lp->type)
+      {
+        GNUNET_assert (lp ==
+                       GNUNET_CONTAINER_heap_remove_node (lp->hn));
+        lp_trigger (lp,
+                    h);
+      }
+    }
+  }
   GNUNET_assert (0 ==
                  pthread_mutex_unlock (&h->big_lock));
   if ( (NULL != old) &&
@@ -884,6 +1159,7 @@ free_account (void *cls,
 {
   struct Account *account = val;
 
+  GNUNET_assert (NULL == account->lp_head);
   GNUNET_free (account->account_name);
   GNUNET_free (account);
   return GNUNET_OK;
@@ -898,6 +1174,11 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h)
     GNUNET_SCHEDULER_cancel (h->mhd_task);
     h->mhd_task = NULL;
   }
+  if (NULL != h->lp_task)
+  {
+    GNUNET_SCHEDULER_cancel (h->lp_task);
+    h->lp_task = NULL;
+  }
 #if EPOLL_SUPPORT
   if (NULL != h->mhd_rfd)
   {
@@ -910,6 +1191,39 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h)
     MHD_stop_daemon (h->mhd_bank);
     h->mhd_bank = NULL;
   }
+  if (-1 != h->lp_event)
+  {
+    uint64_t val = 1;
+    void *ret;
+    struct LongPoller *lp;
+
+    GNUNET_assert (0 ==
+                   pthread_mutex_lock (&h->big_lock));
+    h->in_shutdown = true;
+    while (NULL != (lp = GNUNET_CONTAINER_heap_remove_root (h->lp_heap)))
+      lp_trigger (lp,
+                  h);
+    GNUNET_break (sizeof (val) ==
+                  write (h->lp_event,
+                         &val,
+                         sizeof (val)));
+    GNUNET_assert (0 ==
+                   pthread_mutex_unlock (&h->big_lock));
+    GNUNET_break (0 ==
+                  pthread_join (h->lp_thread,
+                                &ret));
+    GNUNET_break (NULL == ret);
+    GNUNET_break (0 == close (h->lp_event));
+    h->lp_event = -1;
+  }
+  else
+  {
+    struct LongPoller *lp;
+
+    while (NULL != (lp = GNUNET_CONTAINER_heap_remove_root (h->lp_heap)))
+      lp_trigger (lp,
+                  h);
+  }
   if (NULL != h->accounts)
   {
     GNUNET_CONTAINER_multihashmap_iterate (h->accounts,
@@ -919,6 +1233,7 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h)
   }
   GNUNET_CONTAINER_multihashmap_destroy (h->uuid_map);
   GNUNET_CONTAINER_multipeermap_destroy (h->rpubs);
+  GNUNET_CONTAINER_heap_destroy (h->lp_heap);
   GNUNET_assert (0 ==
                  pthread_mutex_destroy (&h->big_lock));
   GNUNET_assert (0 ==
@@ -960,6 +1275,10 @@ handle_mhd_completion_callback (void *cls,
   (void) cls;
   (void) connection;
   (void) toe;
+  if (NULL == *con_cls)
+    return;
+  if (&special_ptr == *con_cls)
+    return;
   GNUNET_JSON_post_parser_cleanup (*con_cls);
   *con_cls = NULL;
 }
@@ -988,7 +1307,6 @@ handle_admin_add_incoming (struct TALER_FAKEBANK_Handle *h,
   json_t *json;
   uint64_t row_id;
   struct GNUNET_TIME_Absolute timestamp;
-  enum GNUNET_GenericReturnValue ret;
 
   pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX,
                                 connection,
@@ -1017,6 +1335,7 @@ handle_admin_add_incoming (struct TALER_FAKEBANK_Handle *h,
     struct TALER_Amount amount;
     struct TALER_ReservePublicKeyP reserve_pub;
     char *debit;
+    enum GNUNET_GenericReturnValue ret;
     struct GNUNET_JSON_Specification spec[] = {
       GNUNET_JSON_spec_fixed_auto ("reserve_pub",
                                    &reserve_pub),
@@ -1029,14 +1348,13 @@ handle_admin_add_incoming (struct TALER_FAKEBANK_Handle *h,
     };
 
     if (GNUNET_OK !=
-        GNUNET_JSON_parse (json,
-                           spec,
-                           NULL, NULL))
+        (ret = TALER_MHD_parse_json_data (connection,
+                                          json,
+                                          spec)))
     {
-      GNUNET_break (0);
+      GNUNET_break_op (0);
       json_decref (json);
-      /* We're fakebank, no need for nice error handling */
-      return MHD_NO;
+      return (GNUNET_NO == ret) ? MHD_YES : MHD_NO;
     }
     if (0 != strcasecmp (amount.currency,
                          h->currency))
@@ -1141,6 +1459,7 @@ handle_transfer (struct TALER_FAKEBANK_Handle *h,
     char *credit;
     const char *base_url;
     struct TALER_Amount amount;
+    enum GNUNET_GenericReturnValue ret;
     struct GNUNET_JSON_Specification spec[] = {
       GNUNET_JSON_spec_fixed_auto ("request_uid",
                                    &uuid),
@@ -1157,14 +1476,13 @@ handle_transfer (struct TALER_FAKEBANK_Handle *h,
     };
 
     if (GNUNET_OK !=
-        GNUNET_JSON_parse (json,
-                           spec,
-                           NULL, NULL))
+        (ret = TALER_MHD_parse_json_data (connection,
+                                          json,
+                                          spec)))
     {
-      GNUNET_break (0);
+      GNUNET_break_op (0);
       json_decref (json);
-      /* We are fakebank, no need for nice error handling */
-      return MHD_NO;
+      return (GNUNET_NO == ret) ? MHD_YES : MHD_NO;
     }
     {
       int ret;
@@ -1223,20 +1541,17 @@ handle_transfer (struct TALER_FAKEBANK_Handle *h,
  *
  * @param h the fakebank handle
  * @param connection the connection
- * @param con_cls place to store state, not used
  * @return MHD result code
  */
 static MHD_RESULT
 handle_home_page (struct TALER_FAKEBANK_Handle *h,
-                  struct MHD_Connection *connection,
-                  void **con_cls)
+                  struct MHD_Connection *connection)
 {
   MHD_RESULT ret;
   struct MHD_Response *resp;
 #define HELLOMSG "Hello, Fakebank!"
 
   (void) h;
-  (void) con_cls;
   resp = MHD_create_response_from_buffer (
     strlen (HELLOMSG),
     HELLOMSG,
@@ -1292,9 +1607,11 @@ struct HistoryArgs
  * @param h bank handle to work on
  * @param connection MHD connection.
  * @param[out] ha will contain the parsed values.
- * @return #GNUNET_OK only if the parsing succeeds.
+ * @return #GNUNET_OK only if the parsing succeeds,
+ *         #GNUNET_SYSERR if it failed,
+ *         #GNUNET_NO if it failed and an error was returned
  */
-static int
+static enum GNUNET_GenericReturnValue
 parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
                            struct MHD_Connection *connection,
                            struct HistoryArgs *ha)
@@ -1305,6 +1622,7 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
   unsigned long long lp_timeout;
   unsigned long long sval;
   long long d;
+  char dummy;
 
   start = MHD_lookup_connection_value (connection,
                                        MHD_GET_ARGUMENT_KIND,
@@ -1319,23 +1637,60 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
   lp_timeout = 0;
   if ( (NULL == delta) ||
        (1 != sscanf (delta,
-                     "%lld",
-                     &d)) ||
-       ( (NULL != long_poll_ms) &&
-         (1 != sscanf (long_poll_ms,
-                       "%llu",
-                       &lp_timeout)) ) ||
-       ( (NULL != start) &&
-         (1 != sscanf (start,
-                       "%llu",
-                       &sval)) ) )
+                     "%lld%c",
+                     &d,
+                     &dummy)) )
   {
     /* Fail if one of the above failed.  */
     /* Invalid request, given that this is fakebank we impolitely
      * just kill the connection instead of returning a nice error.
      */
-    GNUNET_break (0);
-    return GNUNET_NO;
+    GNUNET_break_op (0);
+    return (MHD_YES ==
+            TALER_MHD_reply_with_error (connection,
+                                        MHD_HTTP_BAD_REQUEST,
+                                        TALER_EC_GENERIC_PARAMETER_MALFORMED,
+                                        "delta"))
+      ? GNUNET_NO
+      : GNUNET_SYSERR;
+  }
+  if ( (NULL != long_poll_ms) &&
+       (1 != sscanf (long_poll_ms,
+                     "%llu%c",
+                     &lp_timeout,
+                     &dummy)) )
+  {
+    /* Fail if one of the above failed.  */
+    /* Invalid request, given that this is fakebank we impolitely
+     * just kill the connection instead of returning a nice error.
+     */
+    GNUNET_break_op (0);
+    return (MHD_YES ==
+            TALER_MHD_reply_with_error (connection,
+                                        MHD_HTTP_BAD_REQUEST,
+                                        TALER_EC_GENERIC_PARAMETER_MALFORMED,
+                                        "long_poll_ms"))
+      ? GNUNET_NO
+      : GNUNET_SYSERR;
+  }
+  if ( (NULL != start) &&
+       (1 != sscanf (start,
+                     "%llu%c",
+                     &sval,
+                     &dummy)) )
+  {
+    /* Fail if one of the above failed.  */
+    /* Invalid request, given that this is fakebank we impolitely
+     * just kill the connection instead of returning a nice error.
+     */
+    GNUNET_break_op (0);
+    return (MHD_YES ==
+            TALER_MHD_reply_with_error (connection,
+                                        MHD_HTTP_BAD_REQUEST,
+                                        TALER_EC_GENERIC_PARAMETER_MALFORMED,
+                                        "start"))
+      ? GNUNET_NO
+      : GNUNET_SYSERR;
   }
   if (NULL == start)
     ha->start_idx = (d > 0) ? 0 : h->serial_counter;
@@ -1344,8 +1699,14 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
   ha->delta = (int64_t) d;
   if (0 == ha->delta)
   {
-    GNUNET_break (0);
-    return GNUNET_NO;
+    GNUNET_break_op (0);
+    return (MHD_YES ==
+            TALER_MHD_reply_with_error (connection,
+                                        MHD_HTTP_BAD_REQUEST,
+                                        TALER_EC_GENERIC_PARAMETER_MALFORMED,
+                                        "delta"))
+      ? GNUNET_NO
+      : GNUNET_SYSERR;
   }
   ha->lp_timeout
     = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
@@ -1358,34 +1719,147 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
 }
 
 
+/**
+ * Task run when a long poller is about to time out.
+ * Only used in single-threaded mode.
+ *
+ * @param cls a `struct TALER_FAKEBANK_Handle *`
+ */
+static void
+lp_timeout (void *cls)
+{
+  struct TALER_FAKEBANK_Handle *h = cls;
+  struct LongPoller *lp;
+
+  h->lp_task = NULL;
+  while (NULL != (lp = GNUNET_CONTAINER_heap_peek (h->lp_heap)))
+  {
+    if (GNUNET_TIME_absolute_is_future (lp->timeout))
+      break;
+    GNUNET_assert (lp ==
+                   GNUNET_CONTAINER_heap_remove_root (h->lp_heap));
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Timeout reached for long poller %p\n",
+                lp->conn);
+    lp_trigger (lp,
+                h);
+  }
+  if (NULL == lp)
+    return;
+  h->lp_task = GNUNET_SCHEDULER_add_at (lp->timeout,
+                                        &lp_timeout,
+                                        h);
+}
+
+
+/**
+ * Reschedule the timeout task of @a h for time @a t.
+ *
+ * @param h fakebank handle
+ * @param t when will the next connection timeout expire
+ */
+static void
+reschedule_lp_timeout (struct TALER_FAKEBANK_Handle *h,
+                       struct GNUNET_TIME_Absolute t)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Scheduling timeout task for %s\n",
+              GNUNET_STRINGS_absolute_time_to_string (t));
+  if (-1 != h->lp_event)
+  {
+    uint64_t num = 1;
+
+    GNUNET_break (sizeof (num) ==
+                  write (h->lp_event,
+                         &num,
+                         sizeof (num)));
+  }
+  else
+  {
+    if (NULL != h->lp_task)
+      GNUNET_SCHEDULER_cancel (h->lp_task);
+    h->lp_task = GNUNET_SCHEDULER_add_at (t,
+                                          &lp_timeout,
+                                          h);
+  }
+}
+
+
+/**
+ * Start long-polling for @a connection and @a acc
+ * for transfers in @a dir. Must be called with the
+ * "big lock" held.
+ *
+ * @param[in,out] h fakebank handle
+ * @param[in,out] connection to suspend
+ * @param[in,out] acc account affected
+ * @param lp_timeout how long to suspend
+ * @param dir direction of transfers to watch for
+ */
+static void
+start_lp (struct TALER_FAKEBANK_Handle *h,
+          struct MHD_Connection *connection,
+          struct Account *acc,
+          struct GNUNET_TIME_Relative lp_timeout,
+          enum LongPollType dir)
+{
+  struct LongPoller *lp;
+  bool toc;
+
+  lp = GNUNET_new (struct LongPoller);
+  lp->account = acc;
+  lp->conn = connection;
+  lp->timeout = GNUNET_TIME_relative_to_absolute (lp_timeout);
+  lp->type = dir;
+  lp->hn = GNUNET_CONTAINER_heap_insert (h->lp_heap,
+                                         lp,
+                                         lp->timeout.abs_value_us);
+  toc = (lp ==
+         GNUNET_CONTAINER_heap_peek (h->lp_heap));
+  GNUNET_CONTAINER_DLL_insert (acc->lp_head,
+                               acc->lp_tail,
+                               lp);
+  MHD_suspend_connection (connection);
+  if (toc)
+    reschedule_lp_timeout (h,
+                           lp->timeout);
+
+}
+
+
 /**
  * Handle incoming HTTP request for /history/outgoing
  *
  * @param h the fakebank handle
  * @param connection the connection
  * @param account which account the request is about
- * @return MHD result code
+ * @param con_cls closure for request (NULL or &special_ptr)
  */
 static MHD_RESULT
 handle_debit_history (struct TALER_FAKEBANK_Handle *h,
                       struct MHD_Connection *connection,
-                      const char *account)
+                      const char *account,
+                      void **con_cls)
 {
   struct HistoryArgs ha;
   struct Account *acc;
   struct Transaction *pos;
   json_t *history;
   char *debit_payto;
+  enum GNUNET_GenericReturnValue ret;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Handling /history/outgoing connection %p\n",
+              connection);
   if (GNUNET_OK !=
-      parse_history_common_args (h,
-                                 connection,
-                                 &ha))
+      (ret = parse_history_common_args (h,
+                                        connection,
+                                        &ha)))
   {
-    GNUNET_break (0);
-    return MHD_NO;
+    return (GNUNET_SYSERR == ret) ? MHD_NO : MHD_YES;
   }
-
+  if (&special_ptr == *con_cls)
+    ha.lp_timeout = GNUNET_TIME_UNIT_ZERO;
   acc = lookup_account (h,
                         account);
   GNUNET_asprintf (&debit_payto,
@@ -1430,16 +1904,29 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
     if ( (NULL == t) ||
          overflow)
     {
+      GNUNET_free (debit_payto);
+      if (GNUNET_TIME_relative_is_zero (ha.lp_timeout) &&
+          (0 < ha.delta))
+      {
+        GNUNET_assert (0 ==
+                       pthread_mutex_unlock (&h->big_lock));
+        return TALER_MHD_REPLY_JSON_PACK (
+          connection,
+          MHD_HTTP_OK,
+          GNUNET_JSON_pack_array_steal (
+            "outgoing_transactions",
+            history));
+      }
+      *con_cls = &special_ptr;
+      start_lp (h,
+                connection,
+                acc,
+                ha.lp_timeout,
+                LP_DEBIT);
       GNUNET_assert (0 ==
                      pthread_mutex_unlock (&h->big_lock));
-      GNUNET_free (debit_payto);
-      /* FIXME: suspend for long-polling instead */
-      return TALER_MHD_REPLY_JSON_PACK (
-        connection,
-        MHD_HTTP_OK,
-        GNUNET_JSON_pack_array_steal (
-          "outgoing_transactions",
-          history));
+      json_decref (history);
+      return MHD_YES;
     }
     if (t->debit_account != acc)
     {
@@ -1524,6 +2011,21 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
     if (0 < ha.delta)
       pos = pos->next_out;
   }
+  if ( (0 == json_array_size (history)) &&
+       (! GNUNET_TIME_relative_is_zero (ha.lp_timeout)) &&
+       (0 < ha.delta))
+  {
+    *con_cls = &special_ptr;
+    start_lp (h,
+              connection,
+              acc,
+              ha.lp_timeout,
+              LP_DEBIT);
+    GNUNET_assert (0 ==
+                   pthread_mutex_unlock (&h->big_lock));
+    json_decref (history);
+    return MHD_YES;
+  }
   GNUNET_assert (0 ==
                  pthread_mutex_unlock (&h->big_lock));
   GNUNET_free (debit_payto);
@@ -1546,22 +2048,29 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
 static MHD_RESULT
 handle_credit_history (struct TALER_FAKEBANK_Handle *h,
                        struct MHD_Connection *connection,
-                       const char *account)
+                       const char *account,
+                       void **con_cls)
 {
   struct HistoryArgs ha;
   struct Account *acc;
   const struct Transaction *pos;
   json_t *history;
   char *credit_payto;
+  enum GNUNET_GenericReturnValue ret;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Handling /history/incoming connection %p\n",
+              connection);
   if (GNUNET_OK !=
-      parse_history_common_args (h,
-                                 connection,
-                                 &ha))
+      (ret = parse_history_common_args (h,
+                                        connection,
+                                        &ha)))
   {
-    GNUNET_break (0);
-    return MHD_NO;
+    return (GNUNET_SYSERR == ret) ? MHD_NO : MHD_YES;
   }
+  if (&special_ptr == *con_cls)
+    ha.lp_timeout = GNUNET_TIME_UNIT_ZERO;
+  *con_cls = &special_ptr;
   acc = lookup_account (h,
                         account);
   history = json_array ();
@@ -1601,15 +2110,28 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
     if ( (NULL == t) ||
          overflow)
     {
+      GNUNET_free (credit_payto);
+      if (GNUNET_TIME_relative_is_zero (ha.lp_timeout) &&
+          (0 < ha.delta))
+      {
+        GNUNET_assert (0 ==
+                       pthread_mutex_unlock (&h->big_lock));
+        return TALER_MHD_REPLY_JSON_PACK (connection,
+                                          MHD_HTTP_OK,
+                                          GNUNET_JSON_pack_array_steal (
+                                            "incoming_transactions",
+                                            history));
+      }
+      *con_cls = &special_ptr;
+      start_lp (h,
+                connection,
+                acc,
+                ha.lp_timeout,
+                LP_CREDIT);
       GNUNET_assert (0 ==
                      pthread_mutex_unlock (&h->big_lock));
-      GNUNET_free (credit_payto);
-      /* FIXME: suspend for long-polling instead */
-      return TALER_MHD_REPLY_JSON_PACK (connection,
-                                        MHD_HTTP_OK,
-                                        GNUNET_JSON_pack_array_steal (
-                                          "incoming_transactions",
-                                          history));
+      json_decref (history);
+      return MHD_YES;
     }
     if (skip)
     {
@@ -1681,6 +2203,21 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
     if (0 < ha.delta)
       pos = pos->next_in;
   }
+  if ( (0 == json_array_size (history)) &&
+       (! GNUNET_TIME_relative_is_zero (ha.lp_timeout)) &&
+       (0 < ha.delta))
+  {
+    *con_cls = &special_ptr;
+    start_lp (h,
+              connection,
+              acc,
+              ha.lp_timeout,
+              LP_CREDIT);
+    GNUNET_assert (0 ==
+                   pthread_mutex_unlock (&h->big_lock));
+    json_decref (history);
+    return MHD_YES;
+  }
   GNUNET_assert (0 ==
                  pthread_mutex_unlock (&h->big_lock));
   GNUNET_free (credit_payto);
@@ -1702,7 +2239,7 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
  * @param account which account should process the request
  * @param upload_data request data
  * @param upload_data_size size of @a upload_data in bytes
- * @param con_cls closure for request (a `struct Buffer *`)
+ * @param con_cls closure
  * @return MHD result code
  */
 static MHD_RESULT
@@ -1727,18 +2264,19 @@ serve (struct TALER_FAKEBANK_Handle *h,
          (NULL != account) )
       return handle_credit_history (h,
                                     connection,
-                                    account);
+                                    account,
+                                    con_cls);
     if ( (0 == strcmp (url,
                        "/history/outgoing")) &&
          (NULL != account) )
       return handle_debit_history (h,
                                    connection,
-                                   account);
+                                   account,
+                                   con_cls);
     if (0 == strcmp (url,
                      "/"))
       return handle_home_page (h,
-                               connection,
-                               con_cls);
+                               connection);
   }
   else if (0 == strcasecmp (method,
                             MHD_HTTP_METHOD_POST))
@@ -1762,12 +2300,15 @@ serve (struct TALER_FAKEBANK_Handle *h,
                               con_cls);
   }
   /* Unexpected URL path, just close the connection. */
-  /* We're rather impolite here, but it's a testcase. */
   TALER_LOG_ERROR ("Breaking URL: %s %s\n",
                    method,
                    url);
   GNUNET_break_op (0);
-  return MHD_NO;
+  return TALER_MHD_reply_with_error (
+    connection,
+    MHD_HTTP_NOT_FOUND,
+    TALER_EC_GENERIC_ENDPOINT_UNKNOWN,
+    url);
 }
 
 
@@ -1781,7 +2322,7 @@ serve (struct TALER_FAKEBANK_Handle *h,
  * @param version HTTP version (ignored)
  * @param upload_data request data
  * @param upload_data_size size of @a upload_data in bytes
- * @param con_cls closure for request (a `struct Buffer *`)
+ * @param con_cls closure for request
  * @return MHD result code
  */
 static MHD_RESULT
@@ -1823,15 +2364,6 @@ handle_mhd_request (void *cls,
 }
 
 
-/**
- * Task run whenever HTTP server operations are pending.
- *
- * @param cls the `struct TALER_FAKEBANK_Handle`
- */
-static void
-run_mhd (void *cls);
-
-
 #if EPOLL_SUPPORT
 /**
  * Schedule MHD.  This function should be called initially when an
@@ -1982,6 +2514,7 @@ TALER_FAKEBANK_start2 (uint16_t port,
   }
   GNUNET_assert (strlen (currency) < TALER_CURRENCY_LEN);
   h = GNUNET_new (struct TALER_FAKEBANK_Handle);
+  h->lp_event = -1;
   h->port = port;
   h->ram_limit = ram_limit;
   h->serial_counter = 0;
@@ -2027,6 +2560,7 @@ TALER_FAKEBANK_start2 (uint16_t port,
     TALER_FAKEBANK_stop (h);
     return NULL;
   }
+  h->lp_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
   h->currency = GNUNET_strdup (currency);
   GNUNET_asprintf (&h->my_baseurl,
                    "http://localhost:%u/",
@@ -2061,6 +2595,28 @@ TALER_FAKEBANK_start2 (uint16_t port,
   }
   else
   {
+    h->lp_event = eventfd (0,
+                           EFD_CLOEXEC);
+    if (-1 == h->lp_event)
+    {
+      GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
+                           "eventfd");
+      TALER_FAKEBANK_stop (h);
+      return NULL;
+    }
+    if (0 !=
+        pthread_create (&h->lp_thread,
+                        NULL,
+                        &lp_expiration_thread,
+                        h))
+    {
+      GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
+                           "pthread_create");
+      GNUNET_break (0 == close (h->lp_event));
+      h->lp_event = -1;
+      TALER_FAKEBANK_stop (h);
+      return NULL;
+    }
     h->mhd_bank = MHD_start_daemon (MHD_USE_DEBUG
                                     | MHD_USE_AUTO_INTERNAL_THREAD
                                     | MHD_ALLOW_SUSPEND_RESUME

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]