gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] 02/02: wallet-core: fix peer-(push,pull)-debit withd


From: gnunet
Subject: [taler-wallet-core] 02/02: wallet-core: fix peer-(push,pull)-debit withdrawal states
Date: Mon, 19 Jun 2023 16:03:11 +0200

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

commit 54f0c82999833132baf83995526025ac56d6fe06
Author: Florian Dold <florian@dold.me>
AuthorDate: Mon Jun 19 16:03:06 2023 +0200

    wallet-core: fix peer-(push,pull)-debit withdrawal states
---
 packages/taler-harness/src/harness/harness.ts      |  16 +-
 packages/taler-harness/src/harness/helpers.ts      |   5 +-
 packages/taler-wallet-core/src/db.ts               |   2 +-
 .../taler-wallet-core/src/operations/common.ts     |  13 +
 .../src/operations/pay-peer-pull-credit.ts         | 256 +++++++++++++-------
 .../src/operations/pay-peer-pull-debit.ts          |  11 +-
 .../src/operations/pay-peer-push-credit.ts         | 267 ++++++++++++++-------
 .../src/operations/pay-peer-push-debit.ts          |  25 +-
 .../src/operations/transactions.ts                 |  12 +-
 .../taler-wallet-core/src/operations/withdraw.ts   | 191 +++++++++++----
 10 files changed, 555 insertions(+), 243 deletions(-)

diff --git a/packages/taler-harness/src/harness/harness.ts 
b/packages/taler-harness/src/harness/harness.ts
index b0f411a8c..a2ff451d8 100644
--- a/packages/taler-harness/src/harness/harness.ts
+++ b/packages/taler-harness/src/harness/harness.ts
@@ -2179,6 +2179,20 @@ export class WalletService {
     return unixPath;
   }
 
+  get dbPath() {
+    return path.join(
+      this.globalState.testDir,
+      `walletdb-${this.opts.name}.json`,
+    );
+  }
+
+  async stop(): Promise<void> {
+    if (this.walletProc) {
+      this.walletProc.proc.kill("SIGTERM");
+      await this.walletProc.wait();
+    }
+  }
+
   async start(): Promise<void> {
     let dbPath: string;
     if (this.opts.useInMemoryDb) {
@@ -2190,7 +2204,7 @@ export class WalletService {
       );
     }
     const unixPath = this.socketPath;
-    this.globalState.spawnService(
+    this.walletProc = this.globalState.spawnService(
       "taler-wallet-cli",
       [
         "--wallet-db",
diff --git a/packages/taler-harness/src/harness/helpers.ts 
b/packages/taler-harness/src/harness/helpers.ts
index b13fa9cf4..6f70b9455 100644
--- a/packages/taler-harness/src/harness/helpers.ts
+++ b/packages/taler-harness/src/harness/helpers.ts
@@ -331,6 +331,7 @@ export async function createSimpleTestkudosEnvironmentV2(
 export interface CreateWalletArgs {
   handleNotification?(wn: WalletNotification): void;
   name: string;
+  persistent?: boolean;
 }
 
 export async function createWalletDaemonWithClient(
@@ -338,8 +339,8 @@ export async function createWalletDaemonWithClient(
   args: CreateWalletArgs,
 ): Promise<{ walletClient: WalletClient; walletService: WalletService }> {
   const walletService = new WalletService(t, {
-    name: "wallet",
-    useInMemoryDb: true,
+    name: args.name,
+    useInMemoryDb: !args.persistent,
   });
   await walletService.start();
   await walletService.pingUntilAvailable();
diff --git a/packages/taler-wallet-core/src/db.ts 
b/packages/taler-wallet-core/src/db.ts
index 9905fa370..005b23985 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -1881,7 +1881,7 @@ export enum PeerPullPaymentInitiationStatus {
   SuspendedWithdrawing = 33,
   SuspendedAbortingDeletePurse = 34,
 
-  DonePurseDeposited = 50 /* DORMANT_START */,
+  Done = 50 /* DORMANT_START */,
   Failed = 51,
   Aborted = 52,
 }
diff --git a/packages/taler-wallet-core/src/operations/common.ts 
b/packages/taler-wallet-core/src/operations/common.ts
index a64f78b03..ad18767c4 100644
--- a/packages/taler-wallet-core/src/operations/common.ts
+++ b/packages/taler-wallet-core/src/operations/common.ts
@@ -474,3 +474,16 @@ export function constructTombstone(p: ParsedTombstone): 
TombstoneIdStr {
       return `tmb:${p.tag}:${p.refundGroupId}` as TombstoneIdStr;
   }
 }
+
+/**
+ * Uniform interface for a particular wallet transaction.
+ */
+export interface TransactionManager {
+  get taskId(): TaskId;
+  get transactionId(): TransactionIdStr;
+  fail(): Promise<void>;
+  abort(): Promise<void>;
+  suspend(): Promise<void>;
+  resume(): Promise<void>;
+  process(): Promise<OperationAttemptResult>;
+}
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts 
b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
index 447ffce8f..48b81d6c2 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
@@ -91,7 +91,7 @@ import {
 
 const logger = new Logger("pay-peer-pull-credit.ts");
 
-export async function queryPurseForPeerPullCredit(
+async function queryPurseForPeerPullCredit(
   ws: InternalWalletState,
   pullIni: PeerPullPaymentInitiationRecord,
   cancellationToken: CancellationToken,
@@ -102,7 +102,7 @@ export async function queryPurseForPeerPullCredit(
   );
   purseDepositUrl.searchParams.set("timeout_ms", "30000");
   logger.info(`querying purse status via ${purseDepositUrl.href}`);
-  const resp = await ws.http.get(purseDepositUrl.href, {
+  const resp = await ws.http.fetch(purseDepositUrl.href, {
     timeout: { d_ms: 60000 },
     cancellationToken,
   });
@@ -153,8 +153,11 @@ export async function queryPurseForPeerPullCredit(
       pub: reserve.reservePub,
     },
   });
-
-  await ws.db
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.PeerPullCredit,
+    pursePub: pullIni.pursePub,
+  });
+  const transitionInfo = await ws.db
     .mktx((x) => [x.peerPullPaymentInitiations])
     .runReadWrite(async (tx) => {
       const finPi = await tx.peerPullPaymentInitiations.get(pullIni.pursePub);
@@ -162,11 +165,15 @@ export async function queryPurseForPeerPullCredit(
         logger.warn("peerPullPaymentInitiation not found anymore");
         return;
       }
+      const oldTxState = computePeerPullCreditTransactionState(finPi);
       if (finPi.status === PeerPullPaymentInitiationStatus.PendingReady) {
-        finPi.status = PeerPullPaymentInitiationStatus.DonePurseDeposited;
+        finPi.status = PeerPullPaymentInitiationStatus.PendingWithdrawing;
       }
       await tx.peerPullPaymentInitiations.put(finPi);
+      const newTxState = computePeerPullCreditTransactionState(finPi);
+      return { oldTxState, newTxState };
     });
+  notifyTransition(ws, transactionId, transitionInfo);
   return {
     ready: true,
   };
@@ -293,91 +300,68 @@ async function processPeerPullCreditAbortingDeletePurse(
   return OperationAttemptResult.pendingEmpty();
 }
 
-export async function processPeerPullCredit(
+async function handlePeerPullCreditWithdrawing(
   ws: InternalWalletState,
-  pursePub: string,
+  pullIni: PeerPullPaymentInitiationRecord,
 ): Promise<OperationAttemptResult> {
-  const pullIni = await ws.db
-    .mktx((x) => [x.peerPullPaymentInitiations])
-    .runReadOnly(async (tx) => {
-      return tx.peerPullPaymentInitiations.get(pursePub);
-    });
-  if (!pullIni) {
-    throw Error("peer pull payment initiation not found in database");
+  if (!pullIni.withdrawalGroupId) {
+    throw Error("invalid db state (withdrawing, but no withdrawal group ID");
   }
-
-  const retryTag = constructTaskIdentifier({
-    tag: PendingTaskType.PeerPullCredit,
-    pursePub,
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.PeerPullCredit,
+    pursePub: pullIni.pursePub,
   });
-
-  // We're already running!
-  if (ws.activeLongpoll[retryTag]) {
-    logger.info("peer-pull-credit already in long-polling, returning!");
-    return {
-      type: OperationAttemptResultType.Longpoll,
-    };
-  }
-
-  logger.trace(`processing ${retryTag}, status=${pullIni.status}`);
-
-  switch (pullIni.status) {
-    case PeerPullPaymentInitiationStatus.DonePurseDeposited: {
-      // We implement this case so that the "retry" action on a 
peer-pull-credit transaction
-      // also retries the withdrawal task.
-
-      logger.warn(
-        "peer pull payment initiation is already finished, retrying 
withdrawal",
+  const wgId = pullIni.withdrawalGroupId;
+  let finished: boolean = false;
+  const transitionInfo = await ws.db
+    .mktx((x) => [x.peerPullPaymentInitiations, x.withdrawalGroups])
+    .runReadWrite(async (tx) => {
+      const ppi = await tx.peerPullPaymentInitiations.get(
+        pullIni.pursePub,
       );
-
-      const withdrawalGroupId = pullIni.withdrawalGroupId;
-
-      if (withdrawalGroupId) {
-        const taskId = constructTaskIdentifier({
-          tag: PendingTaskType.Withdraw,
-          withdrawalGroupId,
-        });
-        stopLongpolling(ws, taskId);
-        await resetOperationTimeout(ws, taskId);
-        await runOperationWithErrorReporting(ws, taskId, () =>
-          processWithdrawalGroup(ws, withdrawalGroupId),
-        );
+      if (!ppi) {
+        finished = true;
+        return;
       }
+      if (ppi.status !== PeerPullPaymentInitiationStatus.PendingWithdrawing) {
+        finished = true;
+        return;
+      }
+      const oldTxState = computePeerPullCreditTransactionState(ppi);
+      const wg = await tx.withdrawalGroups.get(wgId);
+      if (!wg) {
+        // FIXME: Fail the operation instead?
+        return undefined;
+      }
+      switch (wg.status) {
+        case WithdrawalGroupStatus.Finished:
+          finished = true;
+          ppi.status = PeerPullPaymentInitiationStatus.Done;
+          break;
+        // FIXME: Also handle other final states!
+      }
+      await tx.peerPullPaymentInitiations.put(ppi);
+      const newTxState = computePeerPullCreditTransactionState(ppi);
       return {
-        type: OperationAttemptResultType.Finished,
-        result: undefined,
-      };
-    }
-    case PeerPullPaymentInitiationStatus.PendingReady:
-      runLongpollAsync(ws, retryTag, async (cancellationToken) =>
-        queryPurseForPeerPullCredit(ws, pullIni, cancellationToken),
-      );
-      logger.trace(
-        "returning early from processPeerPullCredit for long-polling in 
background",
-      );
-      return {
-        type: OperationAttemptResultType.Longpoll,
+        oldTxState,
+        newTxState,
       };
-    case PeerPullPaymentInitiationStatus.PendingMergeKycRequired: {
-      if (!pullIni.kycInfo) {
-        throw Error("invalid state, kycInfo required");
-      }
-      return await longpollKycStatus(
-        ws,
-        pursePub,
-        pullIni.exchangeBaseUrl,
-        pullIni.kycInfo,
-        "individual",
-      );
-    }
-    case PeerPullPaymentInitiationStatus.PendingCreatePurse:
-      break;
-    case PeerPullPaymentInitiationStatus.AbortingDeletePurse:
-      return await processPeerPullCreditAbortingDeletePurse(ws, pullIni);
-    default:
-      throw Error(`unknown PeerPullPaymentInitiationStatus ${pullIni.status}`);
+    });
+  notifyTransition(ws, transactionId, transitionInfo);
+  if (finished) {
+    return OperationAttemptResult.finishedEmpty();
+  } else {
+    // FIXME: Return indicator that we depend on the other operation!
+    return OperationAttemptResult.pendingEmpty();
   }
+}
 
+async function handlePeerPullCreditCreatePurse(
+  ws: InternalWalletState,
+  pullIni: PeerPullPaymentInitiationRecord,
+): Promise<OperationAttemptResult> {
+  const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount));
+  const pursePub = pullIni.pursePub;
   const mergeReserve = await ws.db
     .mktx((x) => [x.reserves])
     .runReadOnly(async (tx) => {
@@ -388,8 +372,6 @@ export async function processPeerPullCredit(
     throw Error("merge reserve for peer pull payment not found in database");
   }
 
-  const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount));
-
   const reservePayto = talerPaytoFromExchangeReserve(
     pullIni.exchangeBaseUrl,
     mergeReserve.reservePub,
@@ -474,6 +456,104 @@ export async function processPeerPullCredit(
   };
 }
 
+export async function processPeerPullCredit(
+  ws: InternalWalletState,
+  pursePub: string,
+): Promise<OperationAttemptResult> {
+  const pullIni = await ws.db
+    .mktx((x) => [x.peerPullPaymentInitiations])
+    .runReadOnly(async (tx) => {
+      return tx.peerPullPaymentInitiations.get(pursePub);
+    });
+  if (!pullIni) {
+    throw Error("peer pull payment initiation not found in database");
+  }
+
+  const retryTag = constructTaskIdentifier({
+    tag: PendingTaskType.PeerPullCredit,
+    pursePub,
+  });
+
+  // We're already running!
+  if (ws.activeLongpoll[retryTag]) {
+    logger.info("peer-pull-credit already in long-polling, returning!");
+    return {
+      type: OperationAttemptResultType.Longpoll,
+    };
+  }
+
+  logger.trace(`processing ${retryTag}, status=${pullIni.status}`);
+
+  switch (pullIni.status) {
+    case PeerPullPaymentInitiationStatus.Done: {
+      // We implement this case so that the "retry" action on a 
peer-pull-credit transaction
+      // also retries the withdrawal task.
+
+      logger.warn(
+        "peer pull payment initiation is already finished, retrying 
withdrawal",
+      );
+
+      const withdrawalGroupId = pullIni.withdrawalGroupId;
+
+      if (withdrawalGroupId) {
+        const taskId = constructTaskIdentifier({
+          tag: PendingTaskType.Withdraw,
+          withdrawalGroupId,
+        });
+        stopLongpolling(ws, taskId);
+        await resetOperationTimeout(ws, taskId);
+        await runOperationWithErrorReporting(ws, taskId, () =>
+          processWithdrawalGroup(ws, withdrawalGroupId),
+        );
+      }
+      return {
+        type: OperationAttemptResultType.Finished,
+        result: undefined,
+      };
+    }
+    case PeerPullPaymentInitiationStatus.PendingReady:
+      runLongpollAsync(ws, retryTag, async (cancellationToken) =>
+        queryPurseForPeerPullCredit(ws, pullIni, cancellationToken),
+      );
+      logger.trace(
+        "returning early from processPeerPullCredit for long-polling in 
background",
+      );
+      return {
+        type: OperationAttemptResultType.Longpoll,
+      };
+    case PeerPullPaymentInitiationStatus.PendingMergeKycRequired: {
+      if (!pullIni.kycInfo) {
+        throw Error("invalid state, kycInfo required");
+      }
+      return await longpollKycStatus(
+        ws,
+        pursePub,
+        pullIni.exchangeBaseUrl,
+        pullIni.kycInfo,
+        "individual",
+      );
+    }
+    case PeerPullPaymentInitiationStatus.PendingCreatePurse:
+      return handlePeerPullCreditCreatePurse(ws, pullIni);
+    case PeerPullPaymentInitiationStatus.AbortingDeletePurse:
+      return await processPeerPullCreditAbortingDeletePurse(ws, pullIni);
+    case PeerPullPaymentInitiationStatus.PendingWithdrawing:
+      return handlePeerPullCreditWithdrawing(ws, pullIni);
+    case PeerPullPaymentInitiationStatus.Aborted:
+    case PeerPullPaymentInitiationStatus.Failed:
+    case PeerPullPaymentInitiationStatus.SuspendedAbortingDeletePurse:
+    case PeerPullPaymentInitiationStatus.SuspendedCreatePurse:
+    case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired:
+    case PeerPullPaymentInitiationStatus.SuspendedReady:
+    case PeerPullPaymentInitiationStatus.SuspendedWithdrawing:
+      break;
+    default:
+      assertUnreachable(pullIni.status);
+  }
+
+  return OperationAttemptResult.finishedEmpty();
+}
+
 async function processPeerPullCreditKycRequired(
   ws: InternalWalletState,
   peerIni: PeerPullPaymentInitiationRecord,
@@ -789,7 +869,7 @@ export async function suspendPeerPullCreditTransaction(
           newStatus =
             PeerPullPaymentInitiationStatus.SuspendedAbortingDeletePurse;
           break;
-        case PeerPullPaymentInitiationStatus.DonePurseDeposited:
+        case PeerPullPaymentInitiationStatus.Done:
         case PeerPullPaymentInitiationStatus.SuspendedCreatePurse:
         case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired:
         case PeerPullPaymentInitiationStatus.SuspendedReady:
@@ -848,7 +928,7 @@ export async function abortPeerPullCreditTransaction(
         case PeerPullPaymentInitiationStatus.PendingReady:
           newStatus = PeerPullPaymentInitiationStatus.AbortingDeletePurse;
           break;
-        case PeerPullPaymentInitiationStatus.DonePurseDeposited:
+        case PeerPullPaymentInitiationStatus.Done:
         case PeerPullPaymentInitiationStatus.SuspendedCreatePurse:
         case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired:
         case PeerPullPaymentInitiationStatus.SuspendedReady:
@@ -903,7 +983,7 @@ export async function failPeerPullCreditTransaction(
         case PeerPullPaymentInitiationStatus.PendingMergeKycRequired:
         case PeerPullPaymentInitiationStatus.PendingWithdrawing:
         case PeerPullPaymentInitiationStatus.PendingReady:
-        case PeerPullPaymentInitiationStatus.DonePurseDeposited:
+        case PeerPullPaymentInitiationStatus.Done:
         case PeerPullPaymentInitiationStatus.SuspendedCreatePurse:
         case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired:
         case PeerPullPaymentInitiationStatus.SuspendedReady:
@@ -961,7 +1041,7 @@ export async function resumePeerPullCreditTransaction(
         case PeerPullPaymentInitiationStatus.PendingWithdrawing:
         case PeerPullPaymentInitiationStatus.PendingReady:
         case PeerPullPaymentInitiationStatus.AbortingDeletePurse:
-        case PeerPullPaymentInitiationStatus.DonePurseDeposited:
+        case PeerPullPaymentInitiationStatus.Done:
         case PeerPullPaymentInitiationStatus.Failed:
         case PeerPullPaymentInitiationStatus.Aborted:
           break;
@@ -1018,7 +1098,7 @@ export function computePeerPullCreditTransactionState(
         major: TransactionMajorState.Pending,
         minor: TransactionMinorState.Ready,
       };
-    case PeerPullPaymentInitiationStatus.DonePurseDeposited:
+    case PeerPullPaymentInitiationStatus.Done:
       return {
         major: TransactionMajorState.Done,
       };
@@ -1078,7 +1158,7 @@ export function computePeerPullCreditTransactionActions(
       return [TransactionAction.Abort, TransactionAction.Suspend];
     case PeerPullPaymentInitiationStatus.PendingReady:
       return [TransactionAction.Abort, TransactionAction.Suspend];
-    case PeerPullPaymentInitiationStatus.DonePurseDeposited:
+    case PeerPullPaymentInitiationStatus.Done:
       return [TransactionAction.Delete];
     case PeerPullPaymentInitiationStatus.PendingWithdrawing:
       return [TransactionAction.Abort, TransactionAction.Suspend];
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts 
b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
index 280ad567f..0595a9e67 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
@@ -113,11 +113,18 @@ async function handlePurseCreationConflict(
   }
 
   const repair: PeerCoinRepair = {
-    coinPubs: sel.coinPubs,
-    contribs: sel.contributions.map((x) => Amounts.parseOrThrow(x)),
+    coinPubs: [],
+    contribs: [],
     exchangeBaseUrl: peerPullInc.exchangeBaseUrl,
   };
 
+  for (let i = 0; i < sel.coinPubs.length; i++) {
+    if (sel.coinPubs[i] != brokenCoinPub) {
+      repair.coinPubs.push(sel.coinPubs[i]);
+      repair.contribs.push(Amounts.parseOrThrow(sel.contributions[i]));
+    }
+  }
+
   const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair });
 
   if (coinSelRes.type == "failure") {
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts 
b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
index 1a79c7b87..9b563b37e 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
@@ -15,76 +15,74 @@
  */
 
 import {
-  PreparePeerPushCredit,
-  PreparePeerPushCreditResponse,
-  parsePayPushUri,
-  codecForPeerContractTerms,
-  TransactionType,
-  encodeCrock,
-  eddsaGetPublic,
-  decodeCrock,
-  codecForExchangeGetContractResponse,
-  getRandomBytes,
-  ContractTermsUtil,
-  Amounts,
-  TalerPreciseTimestamp,
   AcceptPeerPushPaymentResponse,
+  Amounts,
   ConfirmPeerPushCreditRequest,
+  ContractTermsUtil,
   ExchangePurseMergeRequest,
   HttpStatusCode,
+  Logger,
   PeerContractTerms,
+  PreparePeerPushCredit,
+  PreparePeerPushCreditResponse,
+  TalerErrorCode,
+  TalerPreciseTimestamp,
   TalerProtocolTimestamp,
-  WalletAccountMergeFlags,
-  codecForAny,
-  codecForWalletKycUuid,
-  j2s,
-  Logger,
-  ExchangePurseDeposits,
   TransactionAction,
   TransactionMajorState,
   TransactionMinorState,
   TransactionState,
-  TalerError,
-  TalerErrorCode,
+  TransactionType,
+  WalletAccountMergeFlags,
   WalletKycUuid,
+  codecForAny,
+  codecForExchangeGetContractResponse,
+  codecForPeerContractTerms,
+  codecForWalletKycUuid,
+  decodeCrock,
+  eddsaGetPublic,
+  encodeCrock,
+  getRandomBytes,
+  j2s,
   makeErrorDetail,
+  parsePayPushUri,
 } from "@gnu-taler/taler-util";
 import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
 import {
   InternalWalletState,
   KycPendingInfo,
   KycUserType,
-  PeerPullDebitRecordStatus,
   PeerPushPaymentIncomingRecord,
   PeerPushPaymentIncomingStatus,
   PendingTaskType,
   WithdrawalGroupStatus,
   WithdrawalRecordType,
 } from "../index.js";
+import { assertUnreachable } from "../util/assertUnreachable.js";
+import { checkDbInvariant } from "../util/invariants.js";
+import {
+  OperationAttemptResult,
+  OperationAttemptResultType,
+  constructTaskIdentifier,
+} from "../util/retries.js";
+import { runLongpollAsync } from "./common.js";
 import { updateExchangeFromUrl } from "./exchanges.js";
 import {
   codecForExchangePurseStatus,
   getMergeReserveInfo,
-  queryCoinInfosForSelection,
   talerPaytoFromExchangeReserve,
 } from "./pay-peer-common.js";
 import {
+  TransitionInfo,
   constructTransactionIdentifier,
   notifyTransition,
   stopLongpolling,
 } from "./transactions.js";
 import {
   getExchangeWithdrawalInfo,
-  internalCreateWithdrawalGroup,
+  internalPerformCreateWithdrawalGroup,
+  internalPrepareCreateWithdrawalGroup,
 } from "./withdraw.js";
-import { checkDbInvariant } from "../util/invariants.js";
-import {
-  OperationAttemptResult,
-  OperationAttemptResultType,
-  constructTaskIdentifier,
-} from "../util/retries.js";
-import { assertUnreachable } from "../util/assertUnreachable.js";
-import { runLongpollAsync } from "./common.js";
 
 const logger = new Logger("pay-peer-push-credit.ts");
 
@@ -148,7 +146,7 @@ export async function preparePeerPushCredit(
 
   const getContractUrl = new URL(`contracts/${contractPub}`, exchangeBaseUrl);
 
-  const contractHttpResp = await ws.http.get(getContractUrl.href);
+  const contractHttpResp = await ws.http.fetch(getContractUrl.href);
 
   const contractResp = await readSuccessResponseJsonOrThrow(
     contractHttpResp,
@@ -375,51 +373,19 @@ async function processPeerPushCreditKycRequired(
   }
 }
 
-export async function processPeerPushCredit(
+async function handlePendingMerge(
   ws: InternalWalletState,
-  peerPushPaymentIncomingId: string,
+  peerInc: PeerPushPaymentIncomingRecord,
+  contractTerms: PeerContractTerms,
 ): Promise<OperationAttemptResult> {
-  let peerInc: PeerPushPaymentIncomingRecord | undefined;
-  let contractTerms: PeerContractTerms | undefined;
-  await ws.db
-    .mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming])
-    .runReadWrite(async (tx) => {
-      peerInc = await 
tx.peerPushPaymentIncoming.get(peerPushPaymentIncomingId);
-      if (!peerInc) {
-        return;
-      }
-      const ctRec = await tx.contractTerms.get(peerInc.contractTermsHash);
-      if (ctRec) {
-        contractTerms = ctRec.contractTermsRaw;
-      }
-      await tx.peerPushPaymentIncoming.put(peerInc);
-    });
-
-  if (!peerInc) {
-    throw Error(
-      `can't accept unknown incoming p2p push payment 
(${peerPushPaymentIncomingId})`,
-    );
-  }
-
-  checkDbInvariant(!!contractTerms);
+  const { peerPushPaymentIncomingId } = peerInc;
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.PeerPushCredit,
+    peerPushPaymentIncomingId,
+  });
 
   const amount = Amounts.parseOrThrow(contractTerms.amount);
 
-  if (
-    peerInc.status === PeerPushPaymentIncomingStatus.PendingMergeKycRequired
-  ) {
-    if (!peerInc.kycInfo) {
-      throw Error("invalid state, kycInfo required");
-    }
-    return await longpollKycStatus(
-      ws,
-      peerPushPaymentIncomingId,
-      peerInc.exchangeBaseUrl,
-      peerInc.kycInfo,
-      "individual",
-    );
-  }
-
   const mergeReserveInfo = await getMergeReserveInfo(ws, {
     exchangeBaseUrl: peerInc.exchangeBaseUrl,
   });
@@ -475,7 +441,7 @@ export async function processPeerPushCredit(
   );
   logger.trace(`merge response: ${j2s(res)}`);
 
-  await internalCreateWithdrawalGroup(ws, {
+  const withdrawalGroupPrep = await internalPrepareCreateWithdrawalGroup(ws, {
     amount,
     wgInfo: {
       withdrawalType: WithdrawalRecordType.PeerPushCredit,
@@ -490,23 +456,51 @@ export async function processPeerPushCredit(
     },
   });
 
-  await ws.db
-    .mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming])
+  const txRes = await ws.db
+    .mktx((x) => [
+      x.contractTerms,
+      x.peerPushPaymentIncoming,
+      x.withdrawalGroups,
+      x.reserves,
+      x.exchanges,
+      x.exchangeDetails,
+      x.exchangeTrust,
+    ])
     .runReadWrite(async (tx) => {
       const peerInc = await tx.peerPushPaymentIncoming.get(
         peerPushPaymentIncomingId,
       );
       if (!peerInc) {
-        return;
+        return undefined;
       }
-      if (
-        peerInc.status === PeerPushPaymentIncomingStatus.PendingMerge ||
-        peerInc.status === 
PeerPushPaymentIncomingStatus.PendingMergeKycRequired
-      ) {
-        peerInc.status = PeerPushPaymentIncomingStatus.Done;
+      let withdrawalTransition: TransitionInfo | undefined;
+      const oldTxState = computePeerPushCreditTransactionState(peerInc);
+      switch (peerInc.status) {
+        case PeerPushPaymentIncomingStatus.PendingMerge:
+        case PeerPushPaymentIncomingStatus.PendingMergeKycRequired: {
+          peerInc.status = PeerPushPaymentIncomingStatus.PendingWithdrawing;
+          const wgRes = await internalPerformCreateWithdrawalGroup(
+            ws,
+            tx,
+            withdrawalGroupPrep,
+          );
+          peerInc.withdrawalGroupId = wgRes.withdrawalGroup.withdrawalGroupId;
+          break;
+        }
       }
       await tx.peerPushPaymentIncoming.put(peerInc);
+      const newTxState = computePeerPushCreditTransactionState(peerInc);
+      return {
+        peerPushCreditTransition: { oldTxState, newTxState },
+        withdrawalTransition,
+      };
     });
+  notifyTransition(
+    ws,
+    withdrawalGroupPrep.transactionId,
+    txRes?.withdrawalTransition,
+  );
+  notifyTransition(ws, transactionId, txRes?.peerPushCreditTransition);
 
   return {
     type: OperationAttemptResultType.Finished,
@@ -514,6 +508,115 @@ export async function processPeerPushCredit(
   };
 }
 
+async function handlePendingWithdrawing(
+  ws: InternalWalletState,
+  peerInc: PeerPushPaymentIncomingRecord,
+): Promise<OperationAttemptResult> {
+  if (!peerInc.withdrawalGroupId) {
+    throw Error("invalid db state (withdrawing, but no withdrawal group ID");
+  }
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.PeerPushCredit,
+    peerPushPaymentIncomingId: peerInc.peerPushPaymentIncomingId,
+  });
+  const wgId = peerInc.withdrawalGroupId;
+  let finished: boolean = false;
+  const transitionInfo = await ws.db
+    .mktx((x) => [x.peerPushPaymentIncoming, x.withdrawalGroups])
+    .runReadWrite(async (tx) => {
+      const ppi = await tx.peerPushPaymentIncoming.get(
+        peerInc.peerPushPaymentIncomingId,
+      );
+      if (!ppi) {
+        finished = true;
+        return;
+      }
+      if (ppi.status !== PeerPushPaymentIncomingStatus.PendingWithdrawing) {
+        finished = true;
+        return;
+      }
+      const oldTxState = computePeerPushCreditTransactionState(ppi);
+      const wg = await tx.withdrawalGroups.get(wgId);
+      if (!wg) {
+        // FIXME: Fail the operation instead?
+        return undefined;
+      }
+      switch (wg.status) {
+        case WithdrawalGroupStatus.Finished:
+          finished = true;
+          ppi.status = PeerPushPaymentIncomingStatus.Done;
+          break;
+        // FIXME: Also handle other final states!
+      }
+      await tx.peerPushPaymentIncoming.put(ppi);
+      const newTxState = computePeerPushCreditTransactionState(ppi);
+      return {
+        oldTxState,
+        newTxState,
+      };
+    });
+  notifyTransition(ws, transactionId, transitionInfo);
+  if (finished) {
+    return OperationAttemptResult.finishedEmpty();
+  } else {
+    // FIXME: Return indicator that we depend on the other operation!
+    return OperationAttemptResult.pendingEmpty();
+  }
+}
+
+export async function processPeerPushCredit(
+  ws: InternalWalletState,
+  peerPushPaymentIncomingId: string,
+): Promise<OperationAttemptResult> {
+  let peerInc: PeerPushPaymentIncomingRecord | undefined;
+  let contractTerms: PeerContractTerms | undefined;
+  await ws.db
+    .mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming])
+    .runReadWrite(async (tx) => {
+      peerInc = await 
tx.peerPushPaymentIncoming.get(peerPushPaymentIncomingId);
+      if (!peerInc) {
+        return;
+      }
+      const ctRec = await tx.contractTerms.get(peerInc.contractTermsHash);
+      if (ctRec) {
+        contractTerms = ctRec.contractTermsRaw;
+      }
+      await tx.peerPushPaymentIncoming.put(peerInc);
+    });
+
+  checkDbInvariant(!!contractTerms);
+
+  if (!peerInc) {
+    throw Error(
+      `can't accept unknown incoming p2p push payment 
(${peerPushPaymentIncomingId})`,
+    );
+  }
+
+  switch (peerInc.status) {
+    case PeerPushPaymentIncomingStatus.PendingMergeKycRequired: {
+      if (!peerInc.kycInfo) {
+        throw Error("invalid state, kycInfo required");
+      }
+      return await longpollKycStatus(
+        ws,
+        peerPushPaymentIncomingId,
+        peerInc.exchangeBaseUrl,
+        peerInc.kycInfo,
+        "individual",
+      );
+    }
+
+    case PeerPushPaymentIncomingStatus.PendingMerge:
+      return handlePendingMerge(ws, peerInc, contractTerms);
+
+    case PeerPushPaymentIncomingStatus.PendingWithdrawing:
+      return handlePendingWithdrawing(ws, peerInc);
+
+    default:
+      return OperationAttemptResult.finishedEmpty();
+  }
+}
+
 export async function confirmPeerPushCredit(
   ws: InternalWalletState,
   req: ConfirmPeerPushCreditRequest,
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts 
b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
index 33d317c6f..fc7e868dc 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
@@ -125,15 +125,21 @@ async function handlePurseCreationConflict(
   }
 
   const instructedAmount = Amounts.parseOrThrow(peerPushInitiation.amount);
+  const sel = peerPushInitiation.coinSel;
 
   const repair: PeerCoinRepair = {
-    coinPubs: peerPushInitiation.coinSel.coinPubs,
-    contribs: peerPushInitiation.coinSel.contributions.map((x) =>
-      Amounts.parseOrThrow(x),
-    ),
+    coinPubs: [],
+    contribs: [],
     exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl,
   };
 
+  for (let i = 0; i < sel.coinPubs.length; i++) {
+    if (sel.coinPubs[i] != brokenCoinPub) {
+      repair.coinPubs.push(sel.coinPubs[i]);
+      repair.contribs.push(Amounts.parseOrThrow(sel.contributions[i]));
+    }
+  }
+
   const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair });
 
   if (coinSelRes.type == "failure") {
@@ -244,9 +250,10 @@ async function processPeerPushDebitCreateReserve(
     body: reqBody,
   });
 
-  const resp = await httpResp.json();
-
-  logger.info(`resp: ${j2s(resp)}`);
+  {
+    const resp = await httpResp.json();
+    logger.info(`resp: ${j2s(resp)}`);
+  }
 
   switch (httpResp.status) {
     case HttpStatusCode.Ok:
@@ -258,10 +265,10 @@ async function processPeerPushDebitCreateReserve(
     }
     case HttpStatusCode.Conflict: {
       // Handle double-spending
-      return handlePurseCreationConflict(ws, peerPushInitiation, resp);
+      return handlePurseCreationConflict(ws, peerPushInitiation, httpResp);
     }
     default: {
-      const errResp = await readTalerErrorResponse(resp);
+      const errResp = await readTalerErrorResponse(httpResp);
       return {
         type: OperationAttemptResultType.Error,
         errorDetail: errResp,
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts 
b/packages/taler-wallet-core/src/operations/transactions.ts
index b3bc0ebfc..b6dc2e8bd 100644
--- a/packages/taler-wallet-core/src/operations/transactions.ts
+++ b/packages/taler-wallet-core/src/operations/transactions.ts
@@ -1887,19 +1887,19 @@ export interface TransitionInfo {
 export function notifyTransition(
   ws: InternalWalletState,
   transactionId: string,
-  ti: TransitionInfo | undefined,
+  transitionInfo: TransitionInfo | undefined,
 ): void {
   if (
-    ti &&
+    transitionInfo &&
     !(
-      ti.oldTxState.major === ti.newTxState.major &&
-      ti.oldTxState.minor === ti.newTxState.minor
+      transitionInfo.oldTxState.major === transitionInfo.newTxState.major &&
+      transitionInfo.oldTxState.minor === transitionInfo.newTxState.minor
     )
   ) {
     ws.notify({
       type: NotificationType.TransactionStateTransition,
-      oldTxState: ti.oldTxState,
-      newTxState: ti.newTxState,
+      oldTxState: transitionInfo.oldTxState,
+      newTxState: transitionInfo.newTxState,
       transactionId,
     });
   }
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts 
b/packages/taler-wallet-core/src/operations/withdraw.ts
index 26149bd06..88389fd99 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -109,7 +109,11 @@ import {
   checkLogicInvariant,
   InvariantViolatedError,
 } from "../util/invariants.js";
-import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
+import {
+  DbAccess,
+  GetReadOnlyAccess,
+  GetReadWriteAccess,
+} from "../util/query.js";
 import {
   OperationAttemptResult,
   OperationAttemptResultType,
@@ -130,8 +134,13 @@ import {
   selectForcedWithdrawalDenominations,
   selectWithdrawalDenominations,
 } from "../util/coinSelection.js";
-import { PendingTaskType, isWithdrawableDenom } from "../index.js";
 import {
+  ExchangeDetailsRecord,
+  PendingTaskType,
+  isWithdrawableDenom,
+} from "../index.js";
+import {
+  TransitionInfo,
   constructTransactionIdentifier,
   notifyTransition,
   stopLongpolling,
@@ -2202,15 +2211,19 @@ async function processReserveBankStatus(
   }
 }
 
-/**
- * Create a withdrawal group.
- *
- * If a forcedWithdrawalGroupId is given and a
- * withdrawal group with this ID already exists,
- * the existing one is returned.  No conflict checking
- * of the other arguments is done in that case.
- */
-export async function internalCreateWithdrawalGroup(
+export interface PrepareCreateWithdrawalGroupResult {
+  withdrawalGroup: WithdrawalGroupRecord;
+  transactionId: string;
+  creationInfo?: {
+    isTrusted: boolean;
+    isAudited: boolean;
+    amount: AmountJson;
+    canonExchange: string;
+    exchangeDetails: ExchangeDetailsRecord;
+  };
+}
+
+export async function internalPrepareCreateWithdrawalGroup(
   ws: InternalWalletState,
   args: {
     reserveStatus: WithdrawalGroupStatus;
@@ -2222,7 +2235,7 @@ export async function internalCreateWithdrawalGroup(
     restrictAge?: number;
     wgInfo: WgInfo;
   },
-): Promise<WithdrawalGroupRecord> {
+): Promise<PrepareCreateWithdrawalGroupResult> {
   const reserveKeyPair =
     args.reserveKeyPair ?? (await ws.cryptoApi.createEddsaKeypair({}));
   const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now());
@@ -2240,18 +2253,18 @@ export async function internalCreateWithdrawalGroup(
       .runReadOnly(async (tx) => {
         return tx.withdrawalGroups.get(wgId);
       });
+
     if (existingWg) {
-      return existingWg;
+      const transactionId = constructTransactionIdentifier({
+        tag: TransactionType.Withdrawal,
+        withdrawalGroupId: existingWg.withdrawalGroupId,
+      });
+      return { withdrawalGroup: existingWg, transactionId };
     }
   } else {
     withdrawalGroupId = encodeCrock(getRandomBytes(32));
   }
 
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.Withdrawal,
-    withdrawalGroupId,
-  });
-
   await updateWithdrawalDenoms(ws, canonExchange);
   const denoms = await getCandidateWithdrawalDenoms(ws, canonExchange);
 
@@ -2302,8 +2315,112 @@ export async function internalCreateWithdrawalGroup(
     ws,
     exchangeInfo.exchange,
   );
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.Withdrawal,
+    withdrawalGroupId: withdrawalGroup.withdrawalGroupId,
+  });
 
-  const transitionInfo = await ws.db
+  return {
+    withdrawalGroup,
+    transactionId,
+    creationInfo: {
+      isAudited,
+      isTrusted,
+      canonExchange,
+      amount,
+      exchangeDetails,
+    },
+  };
+}
+
+export interface PerformCreateWithdrawalGroupResult {
+  withdrawalGroup: WithdrawalGroupRecord;
+  transitionInfo: TransitionInfo | undefined;
+}
+
+export async function internalPerformCreateWithdrawalGroup(
+  ws: InternalWalletState,
+  tx: GetReadWriteAccess<{
+    withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
+    reserves: typeof WalletStoresV1.reserves;
+    exchanges: typeof WalletStoresV1.exchanges;
+    exchangeTrust: typeof WalletStoresV1.exchangeTrust;
+  }>,
+  prep: PrepareCreateWithdrawalGroupResult,
+): Promise<PerformCreateWithdrawalGroupResult> {
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.Withdrawal,
+    withdrawalGroupId: prep.withdrawalGroup.withdrawalGroupId,
+  });
+  const { withdrawalGroup } = prep;
+  if (!prep.creationInfo) {
+    return { withdrawalGroup, transitionInfo: undefined };
+  }
+  const { isAudited, isTrusted, amount, canonExchange, exchangeDetails } =
+    prep.creationInfo;
+
+  await tx.withdrawalGroups.add(withdrawalGroup);
+  await tx.reserves.put({
+    reservePub: withdrawalGroup.reservePub,
+    reservePriv: withdrawalGroup.reservePriv,
+  });
+
+  const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
+  if (exchange) {
+    exchange.lastWithdrawal = TalerPreciseTimestamp.now();
+    await tx.exchanges.put(exchange);
+  }
+
+  if (!isAudited && !isTrusted) {
+    await tx.exchangeTrust.put({
+      currency: amount.currency,
+      exchangeBaseUrl: canonExchange,
+      exchangeMasterPub: exchangeDetails.masterPublicKey,
+      uids: [encodeCrock(getRandomBytes(32))],
+    });
+  }
+
+  const oldTxState = {
+    major: TransactionMajorState.None,
+    minor: undefined,
+  };
+  const newTxState = computeWithdrawalTransactionStatus(withdrawalGroup);
+  const transitionInfo = {
+    oldTxState,
+    newTxState,
+  };
+  notifyTransition(ws, transactionId, transitionInfo);
+
+  return { withdrawalGroup, transitionInfo };
+}
+
+/**
+ * Create a withdrawal group.
+ *
+ * If a forcedWithdrawalGroupId is given and a
+ * withdrawal group with this ID already exists,
+ * the existing one is returned.  No conflict checking
+ * of the other arguments is done in that case.
+ */
+export async function internalCreateWithdrawalGroup(
+  ws: InternalWalletState,
+  args: {
+    reserveStatus: WithdrawalGroupStatus;
+    amount: AmountJson;
+    exchangeBaseUrl: string;
+    forcedWithdrawalGroupId?: string;
+    forcedDenomSel?: ForcedDenomSel;
+    reserveKeyPair?: EddsaKeypair;
+    restrictAge?: number;
+    wgInfo: WgInfo;
+  },
+): Promise<WithdrawalGroupRecord> {
+  const prep = await internalPrepareCreateWithdrawalGroup(ws, args);
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.Withdrawal,
+    withdrawalGroupId: prep.withdrawalGroup.withdrawalGroupId,
+  });
+  const res = await ws.db
     .mktx((x) => [
       x.withdrawalGroups,
       x.reserves,
@@ -2312,40 +2429,10 @@ export async function internalCreateWithdrawalGroup(
       x.exchangeTrust,
     ])
     .runReadWrite(async (tx) => {
-      await tx.withdrawalGroups.add(withdrawalGroup);
-      await tx.reserves.put({
-        reservePub: withdrawalGroup.reservePub,
-        reservePriv: withdrawalGroup.reservePriv,
-      });
-
-      const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
-      if (exchange) {
-        exchange.lastWithdrawal = TalerPreciseTimestamp.now();
-        await tx.exchanges.put(exchange);
-      }
-
-      if (!isAudited && !isTrusted) {
-        await tx.exchangeTrust.put({
-          currency: amount.currency,
-          exchangeBaseUrl: canonExchange,
-          exchangeMasterPub: exchangeDetails.masterPublicKey,
-          uids: [encodeCrock(getRandomBytes(32))],
-        });
-      }
-
-      const oldTxState = {
-        major: TransactionMajorState.None,
-      };
-      const newTxState = computeWithdrawalTransactionStatus(withdrawalGroup);
-      return {
-        oldTxState,
-        newTxState,
-      };
+      return await internalPerformCreateWithdrawalGroup(ws, tx, prep);
     });
-
-  notifyTransition(ws, transactionId, transitionInfo);
-
-  return withdrawalGroup;
+  notifyTransition(ws, transactionId, res.transitionInfo);
+  return res.withdrawalGroup;
 }
 
 export async function acceptWithdrawalFromUri(

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]