gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-cashless2ecash] branch master updated: fix: transfer


From: gnunet
Subject: [taler-cashless2ecash] branch master updated: fix: transfer
Date: Sat, 18 May 2024 09:25:29 +0200

This is an automated email from the git hooks/post-receive script.

joel-haeberli pushed a commit to branch master
in repository cashless2ecash.

The following commit(s) were added to refs/heads/master by this push:
     new 5bef918  fix: transfer
5bef918 is described below

commit 5bef9183bf0fe7fd329e934b9e73b9c758ac8098
Author: Joel-Haeberli <haebu@rubigen.ch>
AuthorDate: Sat May 18 09:25:15 2024 +0200

    fix: transfer
---
 c2ec/api-wire-gateway.go                  |  2 +-
 c2ec/c2ec-config.yaml                     |  2 +-
 c2ec/db-postgres.go                       | 33 ++++++++++++++++++++++++++++++-
 c2ec/db/test_c2ec_simulation.sql          | 19 ------------------
 c2ec/db/test_c2ec_simulation_rollback.sql | 20 -------------------
 c2ec/exponential-backoff.go               |  2 +-
 c2ec/logger.go                            |  4 ++++
 c2ec/main.go                              |  4 ++--
 c2ec/proc-attestor.go                     | 24 +++++++++++-----------
 c2ec/proc-retrier.go                      | 19 +++++++++++-------
 c2ec/proc-transfer.go                     | 15 +++++++-------
 c2ec/wallee-client.go                     |  4 ++++
 12 files changed, 76 insertions(+), 72 deletions(-)

diff --git a/c2ec/api-wire-gateway.go b/c2ec/api-wire-gateway.go
index 4bb6abb..1f250a3 100644
--- a/c2ec/api-wire-gateway.go
+++ b/c2ec/api-wire-gateway.go
@@ -3,7 +3,7 @@ package main
 import (
        "errors"
        "log"
-       http "net/http"
+       "net/http"
        "strconv"
        "time"
 )
diff --git a/c2ec/c2ec-config.yaml b/c2ec/c2ec-config.yaml
index 1e98218..dc9e79e 100644
--- a/c2ec/c2ec-config.yaml
+++ b/c2ec/c2ec-config.yaml
@@ -8,7 +8,7 @@ c2ec:
   fail-on-missing-attestors: false # forced if prod=true
   credit-account: "payto://IBAN/CH50030202099498" # this account must be 
specified at the providers backends as well
   currency: "CHF"
-  max-retries: 3
+  max-retries: 100
   retry-delay-ms: 1000
   wire-gateway:
     username: "wire"
diff --git a/c2ec/db-postgres.go b/c2ec/db-postgres.go
index 384022d..a92df66 100644
--- a/c2ec/db-postgres.go
+++ b/c2ec/db-postgres.go
@@ -140,6 +140,9 @@ const PS_GET_TRANSFERS_DESC_MAX = "SELECT * FROM " + 
TRANSFER_TABLE_NAME +
        " OFFSET ((SELECT COUNT(*) FROM " + TRANSFER_TABLE_NAME +
        " WHERE " + TRANSFER_FIELD_NAME_STATUS + "=0)-1)" // TODO Timestamp 
based offset (-time since request)
 
+const PS_GET_TRANSFERS_BY_STATUS = "SELECT * FROM " + TRANSFER_TABLE_NAME +
+       " WHERE " + TRANSFER_FIELD_NAME_STATUS + "=$1"
+
 // Postgres implementation of the C2ECDatabase
 type C2ECPostgres struct {
        C2ECDatabase
@@ -686,7 +689,6 @@ func (db *C2ECPostgres) GetTransferById(requestUid []byte) 
(*Transfer, error) {
                LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_ID)
                return transfer, nil
        }
-
 }
 
 func (db *C2ECPostgres) AddTransfer(
@@ -808,6 +810,35 @@ func (db *C2ECPostgres) GetTransfers(start int, delta int) 
([]*Transfer, error)
        }
 }
 
+func (db *C2ECPostgres) GetTransfersByState(status int) ([]*Transfer, error) {
+
+       if rows, err := db.pool.Query(
+               db.ctx,
+               PS_GET_TRANSFERS_BY_STATUS,
+               status,
+       ); err != nil {
+               LogError("postgres", err)
+               if rows != nil {
+                       rows.Close()
+               }
+               return nil, err
+       } else {
+
+               defer rows.Close()
+
+               transfers, err := pgx.CollectRows(rows, 
pgx.RowToAddrOfStructByName[Transfer])
+               if err != nil {
+                       LogError("postgres", err)
+                       return nil, err
+               }
+
+               // this will fill up the logs...
+               // LogInfo("postgres", "query="+PS_GET_TRANSFERS_BY_STATUS)
+               // LogInfo("postgres", "size of transfer 
list="+strconv.Itoa(len(transfers)))
+               return removeNulls(transfers), nil
+       }
+}
+
 // Sets up a listener for the given channel.
 // Notifications will be sent through the out channel.
 func (db *C2ECPostgres) NewListener(
diff --git a/c2ec/db/test_c2ec_simulation.sql b/c2ec/db/test_c2ec_simulation.sql
deleted file mode 100644
index 4492798..0000000
--- a/c2ec/db/test_c2ec_simulation.sql
+++ /dev/null
@@ -1,19 +0,0 @@
-BEGIN;
-
-SET search_path TO c2ec;
-
-DROP TABLE IF EXISTS p_id;
-
-INSERT INTO provider (name, payto_target_type, backend_base_url, 
backend_credentials) 
-    VALUES ('Simulation', 'void', 'will be simulated', 'no creds');
-
-SELECT provider_id INTO p_id FROM provider WHERE name = 'Simulation';
-
-INSERT INTO terminal (access_token, description, provider_id) 
-    VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', 'this is a simulated 
terminal', (SELECT * FROM p_id));
-
-DROP TABLE IF EXISTS p_id;
-
-COMMIT;
-
-SELECT * FROM provider;
diff --git a/c2ec/db/test_c2ec_simulation_rollback.sql 
b/c2ec/db/test_c2ec_simulation_rollback.sql
deleted file mode 100644
index ab31112..0000000
--- a/c2ec/db/test_c2ec_simulation_rollback.sql
+++ /dev/null
@@ -1,20 +0,0 @@
-BEGIN;
-
-SET search_path TO c2ec;
-
-DROP TABLE IF EXISTS p_r_id;
-DROP TABLE IF EXISTS t_r_id;
-
-SELECT provider_id INTO p_r_id FROM provider WHERE name = 'Simulation';
-SELECT terminal_id INTO t_r_id FROM terminal WHERE provider_id = (SELECT * 
FROM p_r_id);
-
-DELETE FROM withdrawal WHERE terminal_id = (SELECT * FROM t_r_id);
-DELETE FROM terminal WHERE provider_id = (SELECT * FROM p_r_id);
-DELETE FROM provider WHERE provider_id = (SELECT * FROM p_r_id);
-
-DROP TABLE IF EXISTS p_r_id;
-DROP TABLE IF EXISTS t_r_id;
-
-COMMIT;
-
-SELECT * FROM provider;
\ No newline at end of file
diff --git a/c2ec/exponential-backoff.go b/c2ec/exponential-backoff.go
index b503ee6..074fb7a 100644
--- a/c2ec/exponential-backoff.go
+++ b/c2ec/exponential-backoff.go
@@ -61,7 +61,7 @@ func randomizeBackoff(backoff int64) int64 {
                if subtracted < 0 {
                        return 0
                }
-               return backoff - randomizedThreshold
+               return subtracted
        }
        return backoff + randomizedThreshold
 }
diff --git a/c2ec/logger.go b/c2ec/logger.go
index 89dd7e8..86b5d85 100644
--- a/c2ec/logger.go
+++ b/c2ec/logger.go
@@ -37,6 +37,10 @@ func LogInfo(src string, msg string) {
 
 func logAppendError(src string, level LogLevel, err error) {
 
+       if err == nil {
+               fmt.Println("wanted to log from " + src + " but err was nil")
+               return
+       }
        logAppend(src, level, err.Error())
 }
 
diff --git a/c2ec/main.go b/c2ec/main.go
index 2287c30..e47cf1c 100644
--- a/c2ec/main.go
+++ b/c2ec/main.go
@@ -87,7 +87,7 @@ func main() {
        transferCtx, transferCancel := context.WithCancel(context.Background())
        defer transferCancel()
        transferErrs := make(chan error)
-       RunRefunder(transferCtx, transferErrs)
+       RunTransferrer(transferCtx, transferErrs)
        LogInfo("main", "refunder is running")
 
        router := http.NewServeMux()
@@ -127,7 +127,7 @@ func main() {
                case <-transferCtx.Done():
                        transferCancel() // first run old cancellation function
                        transferCtx, transferCancel = 
context.WithCancel(context.Background())
-                       RunRefunder(transferCtx, transferErrs)
+                       RunTransferrer(transferCtx, transferErrs)
                case attestationError := <-attestorErrs:
                        LogError("main-from-proc-attestor", attestationError)
                case retryError := <-retryErrs:
diff --git a/c2ec/proc-attestor.go b/c2ec/proc-attestor.go
index 98d2b63..c535607 100644
--- a/c2ec/proc-attestor.go
+++ b/c2ec/proc-attestor.go
@@ -30,7 +30,7 @@ func RunAttestor(
 
 func attestationCallback(notification *Notification, errs chan error) {
 
-       LogInfo("attestor", fmt.Sprintf("retrieved information on channel=%s 
with payload=%s", notification.Channel, notification.Payload))
+       LogInfo("proc-attestor", fmt.Sprintf("retrieved information on 
channel=%s with payload=%s", notification.Channel, notification.Payload))
 
        // The payload is formatted like: 
"{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}"
        // the validation is strict. This means, that the dispatcher emits an 
error
@@ -60,7 +60,7 @@ func attestationCallback(notification *Notification, errs 
chan error) {
 
        transaction, err := client.GetTransaction(providerTransactionId)
        if err != nil {
-               LogError("attestor", err)
+               LogError("proc-attestor", err)
                prepareRetryOrAbort(withdrawalRowId, errs)
                return
        }
@@ -80,7 +80,7 @@ func finaliseOrSetRetry(
 
        if transaction == nil {
                err := errors.New("transaction was nil. will set retry or 
abort")
-               LogError("attestor", err)
+               LogError("proc-attestor", err)
                errs <- err
                prepareRetryOrAbort(withdrawalRowId, errs)
                return
@@ -94,7 +94,7 @@ func finaliseOrSetRetry(
 
                        err := DB.FinaliseWithdrawal(withdrawalRowId, 
CONFIRMED, completionProof)
                        if err != nil {
-                               LogError("attestor", err)
+                               LogError("proc-attestor", err)
                                prepareRetryOrAbort(withdrawalRowId, errs)
                        }
                } else {
@@ -104,7 +104,7 @@ func finaliseOrSetRetry(
                        if transaction.AbortWithdrawal() {
                                err := DB.FinaliseWithdrawal(withdrawalRowId, 
ABORTED, completionProof)
                                if err != nil {
-                                       LogError("attestor", err)
+                                       LogError("proc-attestor", err)
                                        prepareRetryOrAbort(withdrawalRowId, 
errs)
                                        return
                                }
@@ -130,26 +130,24 @@ func prepareRetryOrAbort(
 
        withdrawal, err := DB.GetWithdrawalById(withdrawalRowId)
        if err != nil {
-               LogError("attestor", err)
+               LogError("proc-attestor", err)
                errs <- err
                return
        }
 
-       // TODO retry will not work like this at the moment
-       execRetry := ShouldStartRetry(time.Unix(*withdrawal.LastRetryTs, 0), 
int(withdrawal.RetryCounter), MAX_BACKOFF_MS)
-       if !execRetry {
-               LogInfo("attestor", fmt.Sprintf("max retries for withdrawal 
with id=%d was reached. withdrawal is aborted.", withdrawal.WithdrawalRowId))
+       if withdrawal.RetryCounter >= CONFIG.Server.MaxRetries {
+
+               LogInfo("proc-attestor", fmt.Sprintf("max retries for 
withdrawal with id=%d was reached. withdrawal is aborted.", 
withdrawal.WithdrawalRowId))
                err := DB.FinaliseWithdrawal(withdrawalRowId, ABORTED, 
make([]byte, 0))
                if err != nil {
-                       LogError("attestor", err)
+                       LogError("proc-attestor", err)
                }
        } else {
 
                lastRetryTs := time.Now().Unix()
                err := DB.SetLastRetry(withdrawalRowId, lastRetryTs)
                if err != nil {
-                       LogError("attestor", err)
+                       LogError("proc-attestor", err)
                }
        }
-
 }
diff --git a/c2ec/proc-retrier.go b/c2ec/proc-retrier.go
index f00313e..24bd6b0 100644
--- a/c2ec/proc-retrier.go
+++ b/c2ec/proc-retrier.go
@@ -18,44 +18,49 @@ func RunRetrier(ctx context.Context, errs chan error) {
                make(chan *Notification, RETRY_CHANNEL_BUFFER_SIZE),
                errs,
        )
+
+       go func() {
+               for {
+                       time.Sleep(time.Duration(CONFIG.Server.RetryDelayMs) * 
time.Millisecond)
+
+               }
+       }()
 }
 
 func retryCallback(n *Notification, errs chan error) {
 
        withdrawalId, err := strconv.Atoi(n.Payload)
        if err != nil {
-               LogError("retrier", err)
+               LogError("proc-retrier", err)
                errs <- err
                return
        }
 
        withdrawal, err := DB.GetWithdrawalById(withdrawalId)
        if err != nil {
-               LogError("retrier", err)
+               LogError("proc-retrier", err)
                errs <- err
                return
        }
 
        provider, err := DB.GetProviderByTerminal(withdrawal.TerminalId)
        if err != nil {
-               LogError("retrier", err)
+               LogError("proc-retrier", err)
                errs <- err
                return
        }
 
        err = DB.SetRetryCounter(withdrawalId, int(withdrawal.RetryCounter)+1)
        if err != nil {
-               LogError("retrier", err)
+               LogError("proc-retrier", err)
                errs <- err
                return
        }
 
-       time.Sleep(time.Duration(CONFIG.Server.RetryDelayMs) * time.Millisecond)
-
        client := PROVIDER_CLIENTS[provider.Name]
        transaction, err := 
client.GetTransaction(*withdrawal.ProviderTransactionId)
        if err != nil {
-               LogError("retrier", err)
+               LogError("proc-retrier", err)
                errs <- err
                return
        }
diff --git a/c2ec/proc-transfer.go b/c2ec/proc-transfer.go
index 770bfa4..2b17a8a 100644
--- a/c2ec/proc-transfer.go
+++ b/c2ec/proc-transfer.go
@@ -20,7 +20,7 @@ const TRANSFER_STATUS_FAILED = -1
 const MAX_TRANSFER_BACKOFF_MS = 24 * 60 * 60 * 1000 // 1 day
 
 // Sets up and runs an attestor in the background. This must be called at 
startup.
-func RunRefunder(
+func RunTransferrer(
        ctx context.Context,
        errs chan error,
 ) {
@@ -34,7 +34,6 @@ func RunRefunder(
        )
 
        go func() {
-
                for {
                        time.Sleep(REFUND_RETRY_INTERVAL_SECONDS * time.Second)
                        executePendingTransfers(errs)
@@ -44,7 +43,7 @@ func RunRefunder(
 
 func transferCallback(notification *Notification, errs chan error) {
 
-       LogInfo("refunder", fmt.Sprintf("retrieved information on channel=%s 
with payload=%s", notification.Channel, notification.Payload))
+       LogInfo("proc-transfer", fmt.Sprintf("retrieved information on 
channel=%s with payload=%s", notification.Channel, notification.Payload))
 
        transferRequestUidBase64 := notification.Payload
        if transferRequestUidBase64 == "" {
@@ -60,7 +59,7 @@ func transferCallback(notification *Notification, errs chan 
error) {
 
        transfer, err := DB.GetTransferById(transferRequestUid)
        if err != nil {
-               LogError("refunder", err)
+               LogError("proc-transfer", err)
                transferFailed(transfer, errs)
                errs <- err
        }
@@ -74,7 +73,7 @@ func transferCallback(notification *Notification, errs chan 
error) {
 
        provider, err := 
DB.GetTerminalProviderByPaytoTargetType(paytoTargetType)
        if err != nil {
-               LogError("refunder", err)
+               LogError("proc-transfer", err)
                transferFailed(transfer, errs)
                errs <- err
        }
@@ -86,7 +85,7 @@ func transferCallback(notification *Notification, errs chan 
error) {
 
        err = client.Refund(tid)
        if err != nil {
-               LogError("refunder", err)
+               LogError("proc-transfer", err)
                transferFailed(transfer, errs)
                return
        }
@@ -107,6 +106,7 @@ func executePendingTransfers(errs chan error) {
        transfers, err := DB.GetTransfersByState(TRANSFER_STATUS_RETRY)
        if err != nil {
                LogError("proc-transfer", err)
+               errs <- err
                return
        }
 
@@ -115,6 +115,7 @@ func executePendingTransfers(errs chan error) {
                shouldRetry := ShouldStartRetry(time.Unix(t.TransferTs, 0), 
int(t.Retries), MAX_TRANSFER_BACKOFF_MS)
                if !shouldRetry {
                        LogInfo("proc-transfer", fmt.Sprintf("not retrying 
transfer %d, because backoff not yet exceeded", t.RowId))
+                       continue
                }
 
                paytoTargetType, tid, err := 
ParsePaytoWalleeTransaction(t.CreditAccount)
@@ -140,11 +141,11 @@ func executePendingTransfers(errs chan error) {
                err = client.Refund(tid)
                if err != nil {
                        LogError("proc-transfer", err)
+                       transferFailed(t, errs)
                        errs <- err
                        continue
                }
        }
-       close(errs)
 }
 
 func transferFailed(
diff --git a/c2ec/wallee-client.go b/c2ec/wallee-client.go
index a459028..95a9292 100644
--- a/c2ec/wallee-client.go
+++ b/c2ec/wallee-client.go
@@ -147,6 +147,10 @@ func (w *WalleeClient) GetTransaction(transactionId 
string) (ProviderTransaction
 
 func (sc *WalleeClient) FormatPayto(w *Withdrawal) string {
 
+       if w == nil || w.ProviderTransactionId == nil {
+               LogError("wallee-client", errors.New("withdrawal or provider 
transaction identifier was nil"))
+               return ""
+       }
        return fmt.Sprintf("payto://wallee-transaction/%s", 
*w.ProviderTransactionId)
 }
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]