gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: clean up aggregator logic, make it more robust against invariant failures


From: gnunet
Subject: [taler-exchange] branch master updated: clean up aggregator logic, make it more robust against invariant failures
Date: Sat, 14 Mar 2020 22:56:18 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new ce44b4a0 clean up aggregator logic, make it more robust against invariant failures
ce44b4a0 is described below

commit ce44b4a02849a4f9f3e9cf3fd2e76da4ab2b0e64
Author: Christian Grothoff <address@hidden>
AuthorDate: Sat Mar 14 22:56:14 2020 +0100

    clean up aggregator logic, make it more robust against invariant failures
---
 doc/prebuilt                             |   2 +-
 src/exchange/taler-exchange-aggregator.c | 221 ++++++++++++++++---------------
 2 files changed, 116 insertions(+), 107 deletions(-)

diff --git a/doc/prebuilt b/doc/prebuilt
index 934a6a18..ca53235c 160000
--- a/doc/prebuilt
+++ b/doc/prebuilt
@@ -1 +1 @@
-Subproject commit 934a6a18301e81c4fd1b3a8cda2dc13dca4741cc
+Subproject commit ca53235ccfa0458ebf11c204888ca370e20ec3f5
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 59db4dae..c3b94b3d 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -16,7 +16,7 @@
 
 /**
  * @file taler-exchange-aggregator.c
- * @brief Process that aggregates outgoing transactions and executes them
+ * @brief Process that aggregates outgoing transactions and prepares their execution
  * @author Christian Grothoff
  */
 #include "platform.h"
@@ -70,7 +70,7 @@ struct AggregationUnit
   /**
    * Row ID of the transaction that started it all.
    */
-  unsigned long long row_id;
+  uint64_t row_id;
 
   /**
    * The current time (which triggered the aggregation and
@@ -100,10 +100,9 @@ struct AggregationUnit
   struct TALER_BANK_PrepareHandle *ph;
 
   /**
-   * Array of #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT row_ids from the
-   * aggregation.
+   * Array of row_ids from the aggregation.
    */
-  unsigned long long *additional_rows;
+  uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
 
   /**
    * Offset specifying how many @e additional_rows are in use.
@@ -123,11 +122,6 @@ struct AggregationUnit
 };
 
 
-/**
- * Which currency is used by this exchange?
- */
-static char *exchange_currency_string;
-
 /**
  * What is the smallest unit we support for wire transfers?
  * We will need to round down to a multiple of this amount.
@@ -162,10 +156,23 @@ static struct GNUNET_SCHEDULER_Task *task;
 static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
 
 /**
- * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
- * on serious errors.
+ * Value to return from main(). 0 on success, non-zero on errors.
  */
-static int global_ret;
+static enum
+{
+  GR_SUCCESS = 0,
+  GR_DATABASE_SESSION_FAIL = 1,
+  GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2,
+  GR_DATABASE_READY_DEPOSIT_HARD_FAIL = 3,
+  GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL = 4,
+  GR_DATABASE_TINY_MARK_HARD_FAIL = 5,
+  GR_DATABASE_PREPARE_HARD_FAIL = 6,
+  GR_DATABASE_PREPARE_COMMIT_HARD_FAIL = 7,
+  GR_INVARIANT_FAILURE = 8,
+  GR_CONFIGURATION_INVALID = 9,
+  GR_CMD_LINE_UTF8_ERROR = 9,
+  GR_CMD_LINE_OPTIONS_WRONG = 10,
+} global_ret;
 
 /**
  * #GNUNET_YES if we are in test mode and should exit when idle.
@@ -192,7 +199,6 @@ static void
 cleanup_au (struct AggregationUnit *au)
 {
   GNUNET_assert (NULL != au);
-  GNUNET_free_non_null (au->additional_rows);
   if (NULL != au->wire)
     json_decref (au->wire);
   memset (au,
@@ -230,7 +236,7 @@ shutdown_task (void *cls)
  * @return #GNUNET_OK on success
  */
 static int
-parse_wirewatch_config ()
+parse_wirewatch_config (void)
 {
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_string (cfg,
@@ -254,33 +260,11 @@ parse_wirewatch_config ()
                                "AGGREGATOR_IDLE_SLEEP_INTERVAL");
     return GNUNET_SYSERR;
   }
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_string (cfg,
-                                             "taler",
-                                             "CURRENCY",
-                                             &exchange_currency_string))
-  {
-    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
-                               "taler",
-                               "CURRENCY");
-    return GNUNET_SYSERR;
-  }
-  if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Currency `%s' longer than the allowed limit of %u characters.",
-                exchange_currency_string,
-                (unsigned int) TALER_CURRENCY_LEN);
-    return GNUNET_SYSERR;
-  }
-
   if ( (GNUNET_OK !=
         TALER_config_get_amount (cfg,
                                  "taler",
                                  "CURRENCY_ROUND_UNIT",
                                  &currency_round_unit)) ||
-       (0 != strcasecmp (exchange_currency_string,
-                         currency_round_unit.currency)) ||
        ( (0 != currency_round_unit.fraction) &&
          (0 != currency_round_unit.value) ) )
   {
@@ -396,19 +380,29 @@ deposit_cb (void *cls,
   }
   if (GNUNET_NO == au->have_refund)
   {
+    struct TALER_Amount ntotal;
+
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Non-refunded transaction, subtracting deposit fee %s\n",
                 TALER_amount2s (deposit_fee));
     if (GNUNET_SYSERR ==
-        TALER_amount_subtract (&au->total_amount,
+        TALER_amount_subtract (&ntotal,
                                amount_with_fee,
                                deposit_fee))
     {
+      /* This should never happen, issue a warning, but continue processing
+         with an amount of zero, lest we hang here for good. */
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Fatally malformed record at row %llu over %s\n",
+                  "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n",
                   (unsigned long long) row_id,
                   TALER_amount2s (amount_with_fee));
-      return GNUNET_DB_STATUS_HARD_ERROR;
+      GNUNET_assert (GNUNET_OK ==
+                     TALER_amount_get_zero (au->total_amount.currency,
+                                            &au->total_amount));
+    }
+    else
+    {
+      au->total_amount = ntotal;
     }
   }
 
@@ -440,13 +434,16 @@ deposit_cb (void *cls,
 
     url = TALER_JSON_wire_to_payto (au->wire);
     au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url);
+    if (NULL == au->wa)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "No exchange account configured for `%s', please fix your setup to continue!\n",
+                  url);
+      GNUNET_free (url);
+      return GNUNET_DB_STATUS_HARD_ERROR;
+    }
     GNUNET_free (url);
   }
-  if (NULL == au->wa)
-  {
-    GNUNET_break (0);
-    return GNUNET_DB_STATUS_HARD_ERROR;
-  }
 
   /* make sure we have current fees */
   au->execution_time = GNUNET_TIME_absolute_get ();
@@ -462,7 +459,8 @@ deposit_cb (void *cls,
     if (NULL == af)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Could not get or persist wire fees. Aborting run.\n");
+                  "Could not get or persist wire fees for %s. Aborting run.\n",
+                  GNUNET_STRINGS_absolute_time_to_string (au->execution_time));
       return GNUNET_DB_STATUS_HARD_ERROR;
     }
     au->wire_fee = af->wire_fee;
@@ -549,17 +547,6 @@ aggregate_cb (void *cls,
               "Adding transaction amount %s from row %llu to aggregation\n",
               TALER_amount2s (amount_with_fee),
               (unsigned long long) row_id);
-  if (GNUNET_OK !=
-      TALER_amount_add (&au->total_amount,
-                        &au->total_amount,
-                        amount_with_fee))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Overflow or currency incompatibility during aggregation at %llu\n",
-                (unsigned long long) row_id);
-    /* Skip this one, but keep going! */
-    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-  }
   au->have_refund = GNUNET_NO;
   qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
                                           au->session,
@@ -580,22 +567,43 @@ aggregate_cb (void *cls,
                 TALER_amount2s (deposit_fee));
     if (GNUNET_SYSERR ==
         TALER_amount_subtract (&delta,
-                               &au->total_amount,
+                               amount_with_fee,
                                deposit_fee))
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Fatally malformed record at %llu over amount %s\n",
+                  "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n",
                   (unsigned long long) row_id,
                   TALER_amount2s (&au->total_amount));
-      return GNUNET_DB_STATUS_HARD_ERROR;
     }
-    au->total_amount = delta;
+    else
+    {
+      GNUNET_assert (GNUNET_OK ==
+                     TALER_amount_get_zero (au->total_amount.currency,
+                                            &delta));
+    }
+  }
+  else
+  {
+    delta = *amount_with_fee;
+  }
+
+  {
+    struct TALER_Amount tmp;
+
+    if (GNUNET_OK !=
+        TALER_amount_add (&tmp,
+                          &au->total_amount,
+                          &delta))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Overflow or currency incompatibility during aggregation at %llu\n",
+                  (unsigned long long) row_id);
+      /* Skip this one, but keep going! */
+      return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+    }
+    au->total_amount = tmp;
   }
 
-  if (NULL == au->additional_rows)
-    au->additional_rows = GNUNET_new_array (
-      TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT,
-      unsigned long long);
   /* "append" to our list of rows */
   au->additional_rows[au->rows_offset++] = row_id;
   /* insert into aggregation tracking table */
@@ -659,22 +667,16 @@ run_aggregation (void *cls)
   struct AggregationUnit au_active;
   struct TALER_EXCHANGEDB_Session *session;
   enum GNUNET_DB_QueryStatus qs;
-  const struct GNUNET_SCHEDULER_TaskContext *tc;
-  void *buf;
-  size_t buf_size;
 
   (void) cls;
   task = NULL;
-  tc = GNUNET_SCHEDULER_get_task_context ();
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Checking for ready deposits to aggregate\n");
   if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Failed to obtain database session!\n");
-    global_ret = GNUNET_SYSERR;
+    global_ret = GR_DATABASE_SESSION_FAIL;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
@@ -684,7 +686,7 @@ run_aggregation (void *cls)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Failed to start database transaction!\n");
-    global_ret = GNUNET_SYSERR;
+    global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
@@ -705,7 +707,7 @@ run_aggregation (void *cls)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Failed to execute deposit iteration!\n");
-      global_ret = GNUNET_SYSERR;
+      global_ret = GR_DATABASE_READY_DEPOSIT_HARD_FAIL;
       GNUNET_SCHEDULER_shutdown ();
       return;
     }
@@ -754,7 +756,7 @@ run_aggregation (void *cls)
     cleanup_au (&au_active);
     db_plugin->rollback (db_plugin->cls,
                          session);
-    global_ret = GNUNET_SYSERR;
+    global_ret = GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
@@ -803,7 +805,7 @@ run_aggregation (void *cls)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Failed to start database transaction!\n");
-      global_ret = GNUNET_SYSERR;
+      global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
       cleanup_au (&au_active);
       GNUNET_SCHEDULER_shutdown ();
       return;
@@ -841,6 +843,7 @@ run_aggregation (void *cls)
       db_plugin->rollback (db_plugin->cls,
                            session);
       cleanup_au (&au_active);
+      global_ret = GR_DATABASE_TINY_MARK_HARD_FAIL;
       GNUNET_SCHEDULER_shutdown ();
       return;
     }
@@ -864,30 +867,35 @@ run_aggregation (void *cls)
                 TALER_B2S (&au_active.merchant_pub));
     GNUNET_free (amount_s);
   }
+
   {
-    char *url;
+    void *buf;
+    size_t buf_size;
 
-    url = TALER_JSON_wire_to_payto (au_active.wire);
-    TALER_BANK_prepare_transfer (url,
-                                 &au_active.final_amount,
-                                 exchange_base_url,
-                                 &au_active.wtid,
-                                 &buf,
-                                 &buf_size);
-    GNUNET_free (url);
+    {
+      char *url;
+
+      url = TALER_JSON_wire_to_payto (au_active.wire);
+      TALER_BANK_prepare_transfer (url,
+                                   &au_active.final_amount,
+                                   exchange_base_url,
+                                   &au_active.wtid,
+                                   &buf,
+                                   &buf_size);
+      GNUNET_free (url);
+    }
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Storing %u bytes of wire prepare data\n",
+                (unsigned int) buf_size);
+    /* Commit our intention to execute the wire transfer! */
+    qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
+                                              session,
+                                              au_active.wa->method,
+                                              buf,
+                                              buf_size);
+    GNUNET_free (buf);
   }
-  GNUNET_free_non_null (au_active.additional_rows);
-  au_active.additional_rows = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Storing %u bytes of wire prepare data\n",
-              (unsigned int) buf_size);
-  /* Commit our intention to execute the wire transfer! */
-  qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
-                                            session,
-                                            au_active.wa->method,
-                                            buf,
-                                            buf_size);
-  GNUNET_free (buf);
   /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
      table constraints */
   if (qs >= 0)
@@ -918,7 +926,7 @@ run_aggregation (void *cls)
     db_plugin->rollback (db_plugin->cls,
                          session);
     /* die hard */
-    global_ret = GNUNET_SYSERR;
+    global_ret = GR_DATABASE_PREPARE_HARD_FAIL;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
@@ -940,7 +948,7 @@ run_aggregation (void *cls)
     return;
   case GNUNET_DB_STATUS_HARD_ERROR:
     GNUNET_break (0);
-    global_ret = GNUNET_SYSERR;
+    global_ret = GR_DATABASE_PREPARE_COMMIT_HARD_FAIL;
     GNUNET_SCHEDULER_shutdown ();
     return;
   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
@@ -952,7 +960,7 @@ run_aggregation (void *cls)
     return;
   default:
     GNUNET_break (0);
-    global_ret = GNUNET_SYSERR;
+    global_ret = GR_INVARIANT_FAILURE;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
@@ -981,7 +989,7 @@ run (void *cls,
   if (GNUNET_OK != parse_wirewatch_config ())
   {
     cfg = NULL;
-    global_ret = 1;
+    global_ret = GR_CONFIGURATION_INVALID;
     return;
   }
   GNUNET_assert (NULL == task);
@@ -997,7 +1005,7 @@ run (void *cls,
  *
  * @param argc number of arguments from the command line
  * @param argv command line arguments
- * @return 0 ok, 1 on error
+ * @return 0 ok, non-zero on error, see #global_ret
  */
 int
 main (int argc,
@@ -1014,9 +1022,10 @@ main (int argc,
     GNUNET_GETOPT_OPTION_END
   };
 
-  if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv,
-                                                 &argc, &argv))
-    return 2;
+  if (GNUNET_OK !=
+      GNUNET_STRINGS_get_utf8_args (argc, argv,
+                                    &argc, &argv))
+    return GR_CMD_LINE_UTF8_ERROR;
   if (GNUNET_OK !=
       GNUNET_PROGRAM_run (argc, argv,
                           "taler-exchange-aggregator",
@@ -1026,7 +1035,7 @@ main (int argc,
                           &run, NULL))
   {
     GNUNET_free ((void *) argv);
-    return 1;
+    return GR_CMD_LINE_OPTIONS_WRONG;
   }
   GNUNET_free ((void *) argv);
   return global_ret;

-- 
To stop receiving notification emails like this one, please contact
address@hidden.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]