gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] 02/02: wallet-core: fix deposit tx states, long-poll on kyc


From: gnunet
Subject: [taler-wallet-core] 02/02: wallet-core: fix deposit tx states, long-poll on kyc
Date: Mon, 26 Jun 2023 12:48:26 +0200

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

commit fca893038dc61267c9861186041e129c88b46da8
Author: Florian Dold <florian@dold.me>
AuthorDate: Mon Jun 26 12:48:20 2023 +0200

    wallet-core: fix deposit tx states, long-poll on kyc
---
 packages/taler-wallet-core/src/db.ts               |  26 +-
 .../taler-wallet-core/src/operations/balance.ts    |   1 -
 .../taler-wallet-core/src/operations/deposits.ts   | 732 +++++++++++++--------
 .../src/operations/pay-peer-push-credit.ts         |   2 +-
 .../taler-wallet-core/src/operations/pending.ts    |   6 +-
 .../taler-wallet-core/src/operations/withdraw.ts   |  12 +-
 6 files changed, 463 insertions(+), 316 deletions(-)

diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index 9d0efbc6a..ab2e95c23 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -910,14 +910,6 @@ export enum RefreshOperationStatus {
   Failed = 51 /* DORMANT_START + 1 */,
 }
 
-export enum DepositGroupOperationStatus {
-  Pending = 10,
-  AbortingWithRefresh = 11,
-
-  Finished = 50,
-  Failed = 51,
-}
-
 /**
  * Status of a single element of a deposit group.
  */
@@ -1653,11 +1645,15 @@ export interface BackupProviderRecord {
 }
 
 export enum DepositOperationStatus {
-  Pending = 10,
+  PendingDeposit = 10,
   Aborting = 11,
+  PendingTrack = 12,
+  PendingKyc = 13,
 
-  Suspended = 20,
+  SuspendedDeposit = 20,
   SuspendedAborting = 21,
+  SuspendedTrack = 22,
+  SuspendedKyc = 23,
 
   Finished = 50,
   Failed = 51,
@@ -1737,12 +1733,22 @@ export interface DepositGroupRecord {
    */
   abortRefreshGroupId?: string;
 
+  kycInfo?: DepositKycInfo;
+
   // FIXME: Do we need this and should it be in this object store?
   trackingState?: {
     [signature: string]: DepositTrackingInfo;
   };
 }
 
+export interface DepositKycInfo {
+  kycUrl: string;
+  requirementRow: number;
+  paytoHash: string;
+  exchangeBaseUrl: string;
+}
+
+
 /**
  * Record for a deposits that the wallet observed
  * as a result of double spending, but which is not
diff --git a/packages/taler-wallet-core/src/operations/balance.ts b/packages/taler-wallet-core/src/operations/balance.ts
index 59c49deaa..3ab6649d7 100644
--- a/packages/taler-wallet-core/src/operations/balance.ts
+++ b/packages/taler-wallet-core/src/operations/balance.ts
@@ -108,7 +108,6 @@ function computeRefreshGroupAvailableAmount(r: RefreshGroupRecord): AmountJson {
 export async function getBalancesInsideTransaction(
   ws: InternalWalletState,
   tx: GetReadOnlyAccess<{
-    coins: typeof WalletStoresV1.coins;
     coinAvailability: typeof WalletStoresV1.coinAvailability;
     refreshGroups: typeof WalletStoresV1.refreshGroups;
     withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts
index 8fd49858f..b771fc009 100644
--- a/packages/taler-wallet-core/src/operations/deposits.ts
+++ b/packages/taler-wallet-core/src/operations/deposits.ts
@@ -1,6 +1,6 @@
 /*
  This file is part of GNU Taler
- (C) 2021 Taler Systems S.A.
+ (C) 2021-2023 Taler Systems S.A.
 
  GNU Taler is free software; you can redistribute it and/or modify it under the
  terms of the GNU General Public License as published by the Free Software
@@ -83,6 +83,7 @@ import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
 import {
   constructTaskIdentifier,
   OperationAttemptResult,
+  runLongpollAsync,
   spendCoins,
   TombstoneTag,
 } from "./common.js";
@@ -100,6 +101,7 @@ import {
   stopLongpolling,
 } from "./transactions.js";
 import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
+import { assertUnreachable } from "../util/assertUnreachable.js";
 
 /**
  * Logger.
@@ -114,51 +116,36 @@ export function computeDepositTransactionStatus(
   dg: DepositGroupRecord,
 ): TransactionState {
   switch (dg.operationStatus) {
-    case DepositOperationStatus.Finished: {
+    case DepositOperationStatus.Finished:
       return {
         major: TransactionMajorState.Done,
       };
-    }
-    // FIXME: We should actually use separate pending states for this!
-    case DepositOperationStatus.Pending: {
-      const numTotal = dg.payCoinSelection.coinPubs.length;
-      let numDeposited = 0;
-      let numKycRequired = 0;
-      let numWired = 0;
-      for (let i = 0; i < numTotal; i++) {
-        if (dg.depositedPerCoin[i]) {
-          numDeposited++;
-        }
-        switch (dg.transactionPerCoin[i]) {
-          case DepositElementStatus.KycRequired:
-            numKycRequired++;
-            break;
-          case DepositElementStatus.Wired:
-            numWired++;
-            break;
-        }
-      }
-
-      if (numKycRequired > 0) {
-        return {
-          major: TransactionMajorState.Pending,
-          minor: TransactionMinorState.KycRequired,
-        };
-      }
-
-      if (numDeposited == numTotal) {
-        return {
-          major: TransactionMajorState.Pending,
-          minor: TransactionMinorState.Track,
-        };
-      }
-
+    case DepositOperationStatus.PendingDeposit:
       return {
         major: TransactionMajorState.Pending,
         minor: TransactionMinorState.Deposit,
       };
-    }
-    case DepositOperationStatus.Suspended:
+    case DepositOperationStatus.PendingKyc:
+      return {
+        major: TransactionMajorState.Pending,
+        minor: TransactionMinorState.KycRequired,
+      };
+    case DepositOperationStatus.PendingTrack:
+      return {
+        major: TransactionMajorState.Pending,
+        minor: TransactionMinorState.Track,
+      };
+    case DepositOperationStatus.SuspendedKyc:
+      return {
+        major: TransactionMajorState.Suspended,
+        minor: TransactionMinorState.KycRequired,
+      };
+    case DepositOperationStatus.SuspendedTrack:
+      return {
+        major: TransactionMajorState.Suspended,
+        minor: TransactionMinorState.Track,
+      };
+    case DepositOperationStatus.SuspendedDeposit:
       return {
         major: TransactionMajorState.Suspended,
       };
@@ -179,7 +166,7 @@ export function computeDepositTransactionStatus(
         major: TransactionMajorState.SuspendedAborting,
       };
     default:
-      throw Error(`unexpected deposit group state (${dg.operationStatus})`);
+      assertUnreachable(dg.operationStatus);
   }
 }
 
@@ -187,39 +174,11 @@ export function computeDepositTransactionActions(
   dg: DepositGroupRecord,
 ): TransactionAction[] {
   switch (dg.operationStatus) {
-    case DepositOperationStatus.Finished: {
+    case DepositOperationStatus.Finished:
       return [TransactionAction.Delete];
-    }
-    case DepositOperationStatus.Pending: {
-      const numTotal = dg.payCoinSelection.coinPubs.length;
-      let numDeposited = 0;
-      let numKycRequired = 0;
-      let numWired = 0;
-      for (let i = 0; i < numTotal; i++) {
-        if (dg.depositedPerCoin[i]) {
-          numDeposited++;
-        }
-        switch (dg.transactionPerCoin[i]) {
-          case DepositElementStatus.KycRequired:
-            numKycRequired++;
-            break;
-          case DepositElementStatus.Wired:
-            numWired++;
-            break;
-        }
-      }
-
-      if (numKycRequired > 0) {
-        return [TransactionAction.Suspend, TransactionAction.Fail];
-      }
-
-      if (numDeposited == numTotal) {
-        return [TransactionAction.Suspend, TransactionAction.Fail];
-      }
-
+    case DepositOperationStatus.PendingDeposit:
       return [TransactionAction.Suspend, TransactionAction.Abort];
-    }
-    case DepositOperationStatus.Suspended:
+    case DepositOperationStatus.SuspendedDeposit:
       return [TransactionAction.Resume];
     case DepositOperationStatus.Aborting:
       return [TransactionAction.Fail, TransactionAction.Suspend];
@@ -229,8 +188,16 @@ export function computeDepositTransactionActions(
       return [TransactionAction.Delete];
     case DepositOperationStatus.SuspendedAborting:
       return [TransactionAction.Resume, TransactionAction.Fail];
+    case DepositOperationStatus.PendingKyc:
+      return [TransactionAction.Suspend, TransactionAction.Fail];
+    case DepositOperationStatus.PendingTrack:
+      return [TransactionAction.Suspend, TransactionAction.Abort];
+    case DepositOperationStatus.SuspendedKyc:
+      return [TransactionAction.Resume, TransactionAction.Fail];
+    case DepositOperationStatus.SuspendedTrack:
+      return [TransactionAction.Resume, TransactionAction.Abort];
     default:
-      throw Error(`unexpected deposit group state (${dg.operationStatus})`);
+      assertUnreachable(dg.operationStatus);
   }
 }
 
@@ -260,15 +227,15 @@ export async function suspendDepositGroup(
       switch (dg.operationStatus) {
         case DepositOperationStatus.Finished:
           return undefined;
-        case DepositOperationStatus.Pending: {
-          dg.operationStatus = DepositOperationStatus.Suspended;
+        case DepositOperationStatus.PendingDeposit: {
+          dg.operationStatus = DepositOperationStatus.SuspendedDeposit;
           await tx.depositGroups.put(dg);
           return {
             oldTxState: oldState,
             newTxState: computeDepositTransactionStatus(dg),
           };
         }
-        case DepositOperationStatus.Suspended:
+        case DepositOperationStatus.SuspendedDeposit:
           return undefined;
       }
       return undefined;
@@ -299,11 +266,11 @@ export async function resumeDepositGroup(
       switch (dg.operationStatus) {
         case DepositOperationStatus.Finished:
           return;
-        case DepositOperationStatus.Pending: {
+        case DepositOperationStatus.PendingDeposit: {
           return;
         }
-        case DepositOperationStatus.Suspended:
-          dg.operationStatus = DepositOperationStatus.Pending;
+        case DepositOperationStatus.SuspendedDeposit:
+          dg.operationStatus = DepositOperationStatus.PendingDeposit;
           await tx.depositGroups.put(dg);
           return {
             oldTxState: oldState,
@@ -342,7 +309,7 @@ export async function abortDepositGroup(
       switch (dg.operationStatus) {
         case DepositOperationStatus.Finished:
           return undefined;
-        case DepositOperationStatus.Pending: {
+        case DepositOperationStatus.PendingDeposit: {
           dg.operationStatus = DepositOperationStatus.Aborting;
           await tx.depositGroups.put(dg);
           return {
@@ -350,7 +317,7 @@ export async function abortDepositGroup(
             newTxState: computeDepositTransactionStatus(dg),
           };
         }
-        case DepositOperationStatus.Suspended:
+        case DepositOperationStatus.SuspendedDeposit:
           // FIXME: Can we abort a suspended transaction?!
           return undefined;
       }
@@ -633,251 +600,431 @@ async function refundDepositGroup(
   return OperationAttemptResult.pendingEmpty();
 }
 
-/**
- * Process a deposit group that is not in its final state yet.
- */
-export async function processDepositGroup(
+async function processDepositGroupAborting(
   ws: InternalWalletState,
-  depositGroupId: string,
-  options: {
-    cancellationToken?: CancellationToken;
-  } = {},
+  depositGroup: DepositGroupRecord,
 ): Promise<OperationAttemptResult> {
-  const depositGroup = await ws.db
-    .mktx((x) => [x.depositGroups])
-    .runReadOnly(async (tx) => {
-      return tx.depositGroups.get(depositGroupId);
-    });
-  if (!depositGroup) {
-    logger.warn(`deposit group ${depositGroupId} not found`);
-    return OperationAttemptResult.finishedEmpty();
-  }
-  if (depositGroup.timestampFinished) {
-    logger.trace(`deposit group ${depositGroupId} already finished`);
-    return OperationAttemptResult.finishedEmpty();
+  logger.info("processing deposit tx in 'aborting'");
+  const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
+  if (!abortRefreshGroupId) {
+    logger.info("refunding deposit group");
+    return refundDepositGroup(ws, depositGroup);
   }
+  logger.info("waiting for refresh");
+  return waitForRefreshOnDepositGroup(ws, depositGroup);
+}
 
+async function processDepositGroupPendingKyc(
+  ws: InternalWalletState,
+  depositGroup: DepositGroupRecord,
+): Promise<OperationAttemptResult> {
+  const { depositGroupId } = depositGroup;
   const transactionId = constructTransactionIdentifier({
     tag: TransactionType.Deposit,
     depositGroupId,
   });
+  const retryTag = constructTaskIdentifier({
+    tag: PendingTaskType.Deposit,
+    depositGroupId,
+  });
 
-  const txStateOld = computeDepositTransactionStatus(depositGroup);
+  const kycInfo = depositGroup.kycInfo;
+  const userType = "individual";
 
-  if (depositGroup.operationStatus === DepositOperationStatus.Pending) {
-    const contractData = extractContractData(
-      depositGroup.contractTermsRaw,
-      depositGroup.contractTermsHash,
-      "",
-    );
+  if (!kycInfo) {
+    throw Error("invalid DB state, in pending(kyc), but no kycInfo present");
+  }
 
-    // Check for cancellation before expensive operations.
-    options.cancellationToken?.throwIfCancelled();
-    // FIXME: Cache these!
-    const depositPermissions = await generateDepositPermissions(
-      ws,
-      depositGroup.payCoinSelection,
-      contractData,
+  runLongpollAsync(ws, retryTag, async (ct) => {
+    const url = new URL(
+      `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
+      kycInfo.exchangeBaseUrl,
     );
-
-    for (let i = 0; i < depositPermissions.length; i++) {
-      const perm = depositPermissions[i];
-
-      let didDeposit: boolean = false;
-
-      if (!depositGroup.depositedPerCoin[i]) {
-        const requestBody: ExchangeDepositRequest = {
-          contribution: Amounts.stringify(perm.contribution),
-          merchant_payto_uri: depositGroup.wire.payto_uri,
-          wire_salt: depositGroup.wire.salt,
-          h_contract_terms: depositGroup.contractTermsHash,
-          ub_sig: perm.ub_sig,
-          timestamp: depositGroup.contractTermsRaw.timestamp,
-          wire_transfer_deadline:
-            depositGroup.contractTermsRaw.wire_transfer_deadline,
-          refund_deadline: depositGroup.contractTermsRaw.refund_deadline,
-          coin_sig: perm.coin_sig,
-          denom_pub_hash: perm.h_denom,
-          merchant_pub: depositGroup.merchantPub,
-          h_age_commitment: perm.h_age_commitment,
-        };
-        // Check for cancellation before making network request.
-        options.cancellationToken?.throwIfCancelled();
-        const url = new URL(
-          `coins/${perm.coin_pub}/deposit`,
-          perm.exchange_url,
-        );
-        logger.info(`depositing to ${url}`);
-        const httpResp = await ws.http.fetch(url.href, {
-          method: "POST",
-          body: requestBody,
-          cancellationToken: options.cancellationToken,
+    url.searchParams.set("timeout_ms", "10000");
+    logger.info(`kyc url ${url.href}`);
+    const kycStatusRes = await ws.http.fetch(url.href, {
+      method: "GET",
+      cancellationToken: ct,
+    });
+    if (
+      kycStatusRes.status === HttpStatusCode.Ok ||
+      //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+      // remove after the exchange is fixed or clarified
+      kycStatusRes.status === HttpStatusCode.NoContent
+    ) {
+      const transitionInfo = await ws.db
+        .mktx((x) => [x.depositGroups])
+        .runReadWrite(async (tx) => {
+          const newDg = await tx.depositGroups.get(depositGroupId);
+          if (!newDg) {
+            return;
+          }
+          if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) {
+            return;
+          }
+          const oldTxState = computeDepositTransactionStatus(newDg);
+          newDg.operationStatus = DepositOperationStatus.PendingTrack;
+          const newTxState = computeDepositTransactionStatus(newDg);
+          await tx.depositGroups.put(newDg);
+          return { oldTxState, newTxState };
         });
-        await readSuccessResponseJsonOrThrow(
-          httpResp,
-          codecForDepositSuccess(),
-        );
-        didDeposit = true;
-      }
+      notifyTransition(ws, transactionId, transitionInfo);
+      return { ready: true };
+    } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+      // FIXME: Do we have to update the URL here?
+      return { ready: false };
+    } else {
+      throw Error(
+        `unexpected response from kyc-check (${kycStatusRes.status})`,
+      );
+    }
+  });
+  return OperationAttemptResult.longpoll();
+}
 
-      let updatedTxStatus: DepositElementStatus | undefined = undefined;
+/**
+ * Tracking information from the exchange indicated that
+ * KYC is required.  We need to check the KYC info
+ * and transition the transaction to the KYC required state.
+ */
+async function transitionToKycRequired(
+  ws: InternalWalletState,
+  depositGroup: DepositGroupRecord,
+  kycInfo: KycPendingInfo,
+  exchangeUrl: string,
+): Promise<OperationAttemptResult> {
+  const { depositGroupId } = depositGroup;
+  const userType = "individual";
 
-      let newWiredCoin:
-        | {
-            id: string;
-            value: DepositTrackingInfo;
-          }
-        | undefined;
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.Deposit,
+    depositGroupId,
+  });
 
-      if (depositGroup.transactionPerCoin[i] !== DepositElementStatus.Wired) {
-        const track = await trackDeposit(ws, depositGroup, perm);
+  const url = new URL(
+    `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
+    exchangeUrl,
+  );
+  logger.info(`kyc url ${url.href}`);
+  const kycStatusReq = await ws.http.fetch(url.href, {
+    method: "GET",
+  });
+  if (kycStatusReq.status === HttpStatusCode.Ok) {
+    logger.warn("kyc requested, but already fulfilled");
+    return OperationAttemptResult.finishedEmpty();
+  } else if (kycStatusReq.status === HttpStatusCode.Accepted) {
+    const kycStatus = await kycStatusReq.json();
+    logger.info(`kyc status: ${j2s(kycStatus)}`);
+    const transitionInfo = await ws.db
+      .mktx((x) => [x.depositGroups])
+      .runReadWrite(async (tx) => {
+        const dg = await tx.depositGroups.get(depositGroupId);
+        if (!dg) {
+          return undefined;
+        }
+        if (dg.operationStatus !== DepositOperationStatus.PendingTrack) {
+          return undefined;
+        }
+        const oldTxState = computeDepositTransactionStatus(dg);
+        dg.kycInfo = {
+          exchangeBaseUrl: exchangeUrl,
+          kycUrl: kycStatus.kyc_url,
+          paytoHash: kycInfo.paytoHash,
+          requirementRow: kycInfo.requirementRow,
+        };
+        await tx.depositGroups.put(dg);
+        const newTxState = computeDepositTransactionStatus(dg);
+        return { oldTxState, newTxState };
+      });
+    notifyTransition(ws, transactionId, transitionInfo);
+    return OperationAttemptResult.finishedEmpty();
+  } else {
+    throw Error(`unexpected response from kyc-check (${kycStatusReq.status})`);
+  }
+}
 
-        if (track.type === "accepted") {
-          if (!track.kyc_ok && track.requirement_row !== undefined) {
-            updatedTxStatus = DepositElementStatus.KycRequired;
-            const { requirement_row: requirementRow } = track;
-            const paytoHash = encodeCrock(
-              hashTruncate32(stringToBytes(depositGroup.wire.payto_uri + "\0")),
-            );
-            await checkDepositKycStatus(
-              ws,
-              perm.exchange_url,
-              { paytoHash, requirementRow },
-              "individual",
-            );
-          } else {
-            updatedTxStatus = DepositElementStatus.Accepted;
-          }
-        } else if (track.type === "wired") {
-          updatedTxStatus = DepositElementStatus.Wired;
+async function processDepositGroupPendingTrack(
+  ws: InternalWalletState,
+  depositGroup: DepositGroupRecord,
+  cancellationToken?: CancellationToken,
+): Promise<OperationAttemptResult> {
+  const { depositGroupId } = depositGroup;
+  for (let i = 0; i < depositGroup.depositedPerCoin.length; i++) {
+    const coinPub = depositGroup.payCoinSelection.coinPubs[i];
+    // FIXME: Make the URL part of the coin selection?
+    const exchangeBaseUrl = await ws.db
+      .mktx((x) => [x.coins])
+      .runReadWrite(async (tx) => {
+        const coinRecord = await tx.coins.get(coinPub);
+        checkDbInvariant(!!coinRecord);
+        return coinRecord.exchangeBaseUrl;
+      });
 
-          const payto = parsePaytoUri(depositGroup.wire.payto_uri);
-          if (!payto) {
-            throw Error(`unparsable payto: ${depositGroup.wire.payto_uri}`);
-          }
+    let updatedTxStatus: DepositElementStatus | undefined = undefined;
+    let newWiredCoin:
+      | {
+          id: string;
+          value: DepositTrackingInfo;
+        }
+      | undefined;
+
+    if (depositGroup.transactionPerCoin[i] !== DepositElementStatus.Wired) {
+      const track = await trackDeposit(
+        ws,
+        depositGroup,
+        coinPub,
+        exchangeBaseUrl,
+      );
 
-          const fee = await getExchangeWireFee(
-            ws,
-            payto.targetType,
-            perm.exchange_url,
-            track.execution_time,
+      if (track.type === "accepted") {
+        if (!track.kyc_ok && track.requirement_row !== undefined) {
+          const paytoHash = encodeCrock(
+            hashTruncate32(stringToBytes(depositGroup.wire.payto_uri + "\0")),
           );
-          const raw = Amounts.parseOrThrow(track.coin_contribution);
-          const wireFee = Amounts.parseOrThrow(fee.wireFee);
-
-          newWiredCoin = {
-            value: {
-              amountRaw: Amounts.stringify(raw),
-              wireFee: Amounts.stringify(wireFee),
-              exchangePub: track.exchange_pub,
-              timestampExecuted: track.execution_time,
-              wireTransferId: track.wtid,
-            },
-            id: track.exchange_sig,
+          const { requirement_row: requirementRow } = track;
+          const kycInfo: KycPendingInfo = {
+            paytoHash,
+            requirementRow,
           };
+          return transitionToKycRequired(
+            ws,
+            depositGroup,
+            kycInfo,
+            exchangeBaseUrl,
+          );
         } else {
-          updatedTxStatus = DepositElementStatus.Unknown;
+          updatedTxStatus = DepositElementStatus.Accepted;
         }
+      } else if (track.type === "wired") {
+        updatedTxStatus = DepositElementStatus.Wired;
+
+        const payto = parsePaytoUri(depositGroup.wire.payto_uri);
+        if (!payto) {
+          throw Error(`unparsable payto: ${depositGroup.wire.payto_uri}`);
+        }
+
+        const fee = await getExchangeWireFee(
+          ws,
+          payto.targetType,
+          exchangeBaseUrl,
+          track.execution_time,
+        );
+        const raw = Amounts.parseOrThrow(track.coin_contribution);
+        const wireFee = Amounts.parseOrThrow(fee.wireFee);
+
+        newWiredCoin = {
+          value: {
+            amountRaw: Amounts.stringify(raw),
+            wireFee: Amounts.stringify(wireFee),
+            exchangePub: track.exchange_pub,
+            timestampExecuted: track.execution_time,
+            wireTransferId: track.wtid,
+          },
+          id: track.exchange_sig,
+        };
+      } else {
+        updatedTxStatus = DepositElementStatus.Unknown;
       }
+    }
 
-      if (updatedTxStatus !== undefined || didDeposit) {
-        await ws.db
-          .mktx((x) => [x.depositGroups])
-          .runReadWrite(async (tx) => {
-            const dg = await tx.depositGroups.get(depositGroupId);
-            if (!dg) {
-              return;
-            }
-            if (didDeposit) {
-              dg.depositedPerCoin[i] = didDeposit;
-            }
-            if (updatedTxStatus !== undefined) {
-              dg.transactionPerCoin[i] = updatedTxStatus;
-            }
-            if (newWiredCoin) {
-              /**
-               * FIXME: if there is a new wire information from the exchange
-               * it should add up to the previous tracking states.
-               *
-               * This may loose information by overriding prev state.
-               *
-               * And: add checks to integration tests
-               */
-              if (!dg.trackingState) {
-                dg.trackingState = {};
-              }
-
-              dg.trackingState[newWiredCoin.id] = newWiredCoin.value;
+    if (updatedTxStatus !== undefined) {
+      await ws.db
+        .mktx((x) => [x.depositGroups])
+        .runReadWrite(async (tx) => {
+          const dg = await tx.depositGroups.get(depositGroupId);
+          if (!dg) {
+            return;
+          }
+          if (updatedTxStatus !== undefined) {
+            dg.transactionPerCoin[i] = updatedTxStatus;
+          }
+          if (newWiredCoin) {
+            /**
+             * FIXME: if there is a new wire information from the exchange
+             * it should add up to the previous tracking states.
+             *
+             * This may loose information by overriding prev state.
+             *
+             * And: add checks to integration tests
+             */
+            if (!dg.trackingState) {
+              dg.trackingState = {};
             }
-            await tx.depositGroups.put(dg);
-          });
+
+            dg.trackingState[newWiredCoin.id] = newWiredCoin.value;
+          }
+          await tx.depositGroups.put(dg);
+        });
+    }
+  }
+
+  let allWired = true;
+
+  const transitionInfo = await ws.db
+    .mktx((x) => [x.depositGroups])
+    .runReadWrite(async (tx) => {
+      const dg = await tx.depositGroups.get(depositGroupId);
+      if (!dg) {
+        return undefined;
+      }
+      const oldTxState = computeDepositTransactionStatus(dg);
+      for (let i = 0; i < depositGroup.depositedPerCoin.length; i++) {
+        if (depositGroup.transactionPerCoin[i] !== DepositElementStatus.Wired) {
+          allWired = false;
+          break;
+        }
+      }
+      if (allWired) {
+        dg.timestampFinished = TalerPreciseTimestamp.now();
+        dg.operationStatus = DepositOperationStatus.Finished;
+        await tx.depositGroups.put(dg);
       }
+      const newTxState = computeDepositTransactionStatus(dg);
+      return { oldTxState, newTxState };
+    });
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.Deposit,
+    depositGroupId,
+  });
+  notifyTransition(ws, transactionId, transitionInfo);
+  if (allWired) {
+    return OperationAttemptResult.finishedEmpty();
+  } else {
+    // FIXME: Use long-polling.
+    return OperationAttemptResult.pendingEmpty();
+  }
+}
+
+async function processDepositGroupPendingDeposit(
+  ws: InternalWalletState,
+  depositGroup: DepositGroupRecord,
+  cancellationToken?: CancellationToken,
+): Promise<OperationAttemptResult> {
+  logger.info("processing deposit group in pending(deposit)");
+  const depositGroupId = depositGroup.depositGroupId;
+  const contractData = extractContractData(
+    depositGroup.contractTermsRaw,
+    depositGroup.contractTermsHash,
+    "",
+  );
+
+  const transactionId = constructTransactionIdentifier({
+    tag: TransactionType.Deposit,
+    depositGroupId,
+  });
+
+  // Check for cancellation before expensive operations.
+  cancellationToken?.throwIfCancelled();
+
+  // FIXME: Cache these!
+  const depositPermissions = await generateDepositPermissions(
+    ws,
+    depositGroup.payCoinSelection,
+    contractData,
+  );
+
+  for (let i = 0; i < depositPermissions.length; i++) {
+    const perm = depositPermissions[i];
+
+    if (depositGroup.depositedPerCoin[i]) {
+      continue;
     }
 
-    const txStatusNew = await ws.db
+    const requestBody: ExchangeDepositRequest = {
+      contribution: Amounts.stringify(perm.contribution),
+      merchant_payto_uri: depositGroup.wire.payto_uri,
+      wire_salt: depositGroup.wire.salt,
+      h_contract_terms: depositGroup.contractTermsHash,
+      ub_sig: perm.ub_sig,
+      timestamp: depositGroup.contractTermsRaw.timestamp,
+      wire_transfer_deadline:
+        depositGroup.contractTermsRaw.wire_transfer_deadline,
+      refund_deadline: depositGroup.contractTermsRaw.refund_deadline,
+      coin_sig: perm.coin_sig,
+      denom_pub_hash: perm.h_denom,
+      merchant_pub: depositGroup.merchantPub,
+      h_age_commitment: perm.h_age_commitment,
+    };
+    // Check for cancellation before making network request.
+    cancellationToken?.throwIfCancelled();
+    const url = new URL(`coins/${perm.coin_pub}/deposit`, perm.exchange_url);
+    logger.info(`depositing to ${url}`);
+    const httpResp = await ws.http.fetch(url.href, {
+      method: "POST",
+      body: requestBody,
+      cancellationToken: cancellationToken,
+    });
+    await readSuccessResponseJsonOrThrow(httpResp, codecForDepositSuccess());
+
+    await ws.db
       .mktx((x) => [x.depositGroups])
       .runReadWrite(async (tx) => {
         const dg = await tx.depositGroups.get(depositGroupId);
         if (!dg) {
-          return undefined;
-        }
-        let allDepositedAndWired = true;
-        for (let i = 0; i < depositGroup.depositedPerCoin.length; i++) {
-          if (
-            !depositGroup.depositedPerCoin[i] ||
-            depositGroup.transactionPerCoin[i] !== DepositElementStatus.Wired
-          ) {
-            allDepositedAndWired = false;
-            break;
-          }
-        }
-        if (allDepositedAndWired) {
-          dg.timestampFinished = TalerPreciseTimestamp.now();
-          dg.operationStatus = DepositOperationStatus.Finished;
-          await tx.depositGroups.put(dg);
+          return;
         }
-        return computeDepositTransactionStatus(dg);
+        dg.depositedPerCoin[i] = true;
+        await tx.depositGroups.put(dg);
       });
+  }
 
-    if (!txStatusNew) {
-      // Doesn't exist anymore!
-      return OperationAttemptResult.finishedEmpty();
-    }
+  const transitionInfo = await ws.db
+    .mktx((x) => [x.depositGroups])
+    .runReadWrite(async (tx) => {
+      const dg = await tx.depositGroups.get(depositGroupId);
+      if (!dg) {
+        return undefined;
+      }
+      const oldTxState = computeDepositTransactionStatus(dg);
+      dg.operationStatus = DepositOperationStatus.PendingTrack;
+      await tx.depositGroups.put(dg);
+      const newTxState = computeDepositTransactionStatus(dg);
+      return { oldTxState, newTxState };
+    });
 
-    // Notify if state transitioned
-    if (
-      txStateOld.major !== txStatusNew.major ||
-      txStateOld.minor !== txStatusNew.minor
-    ) {
-      ws.notify({
-        type: NotificationType.TransactionStateTransition,
-        transactionId,
-        oldTxState: txStateOld,
-        newTxState: txStatusNew,
-      });
-    }
+  notifyTransition(ws, transactionId, transitionInfo);
+  return OperationAttemptResult.finishedEmpty();
+}
 
-    // FIXME: consider other cases like aborting, suspend, ...
-    if (
-      txStatusNew.major === TransactionMajorState.Pending ||
-      txStatusNew.major === TransactionMajorState.Aborting
-    ) {
-      return OperationAttemptResult.pendingEmpty();
-    } else {
-      return OperationAttemptResult.finishedEmpty();
-    }
+/**
+ * Process a deposit group that is not in its final state yet.
+ */
+export async function processDepositGroup(
+  ws: InternalWalletState,
+  depositGroupId: string,
+  options: {
+    cancellationToken?: CancellationToken;
+  } = {},
+): Promise<OperationAttemptResult> {
+  const depositGroup = await ws.db
+    .mktx((x) => [x.depositGroups])
+    .runReadOnly(async (tx) => {
+      return tx.depositGroups.get(depositGroupId);
+    });
+  if (!depositGroup) {
+    logger.warn(`deposit group ${depositGroupId} not found`);
+    return OperationAttemptResult.finishedEmpty();
   }
 
-  if (depositGroup.operationStatus === DepositOperationStatus.Aborting) {
-    logger.info("processing deposit tx in 'aborting'");
-    const abortRefreshGroupId = depositGroup.abortRefreshGroupId;
-    if (!abortRefreshGroupId) {
-      logger.info("refunding deposit group");
-      return refundDepositGroup(ws, depositGroup);
-    }
-    logger.info("waiting for refresh");
-    return waitForRefreshOnDepositGroup(ws, depositGroup);
+  switch (depositGroup.operationStatus) {
+    case DepositOperationStatus.PendingTrack:
+      return processDepositGroupPendingTrack(
+        ws,
+        depositGroup,
+        options.cancellationToken,
+      );
+    case DepositOperationStatus.PendingKyc:
+      return processDepositGroupPendingKyc(ws, depositGroup);
+    case DepositOperationStatus.PendingDeposit:
+      return processDepositGroupPendingDeposit(
+        ws,
+        depositGroup,
+        options.cancellationToken,
+      );
+    case DepositOperationStatus.Aborting:
+      return processDepositGroupAborting(ws, depositGroup);
   }
+
   return OperationAttemptResult.finishedEmpty();
 }
 
@@ -928,16 +1075,17 @@ async function getExchangeWireFee(
 async function trackDeposit(
   ws: InternalWalletState,
   depositGroup: DepositGroupRecord,
-  dp: CoinDepositPermission,
+  coinPub: string,
+  exchangeUrl: string,
 ): Promise<TrackTransaction> {
   const wireHash = depositGroup.contractTermsRaw.h_wire;
 
   const url = new URL(
-    `deposits/${wireHash}/${depositGroup.merchantPub}/${depositGroup.contractTermsHash}/${dp.coin_pub}`,
-    dp.exchange_url,
+    `deposits/${wireHash}/${depositGroup.merchantPub}/${depositGroup.contractTermsHash}/${coinPub}`,
+    exchangeUrl,
   );
   const sigResp = await ws.cryptoApi.signTrackTransaction({
-    coinPub: dp.coin_pub,
+    coinPub,
     contractTermsHash: depositGroup.contractTermsHash,
     merchantPriv: depositGroup.merchantPriv,
     merchantPub: depositGroup.merchantPub,
@@ -1224,7 +1372,7 @@ export async function createDepositGroup(
       payto_uri: req.depositPaytoUri,
       salt: wireSalt,
     },
-    operationStatus: DepositOperationStatus.Pending,
+    operationStatus: DepositOperationStatus.PendingDeposit,
   };
 
   const transactionId = constructTransactionIdentifier({
@@ -1263,6 +1411,10 @@ export async function createDepositGroup(
     newTxState,
   });
 
+  ws.notify({
+    type: NotificationType.BalanceChange,
+  });
+
   return {
     depositGroupId,
     transactionId,
@@ -1332,7 +1484,7 @@ export async function getCounterpartyEffectiveDepositAmount(
  * Get the fee amount that will be charged when trying to deposit the
  * specified amount using the selected coins and the wire method.
  */
-export async function getTotalFeesForDepositAmount(
+async function getTotalFeesForDepositAmount(
   ws: InternalWalletState,
   wireType: string,
   total: AmountJson,
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
index b2f641b80..58a1c2b3c 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
@@ -427,7 +427,7 @@ async function handlePendingMerge(
     const respJson = await mergeHttpResp.json();
     const kycPending = codecForWalletKycUuid().decode(respJson);
     logger.info(`kyc uuid response: ${j2s(kycPending)}`);
-    processPeerPushCreditKycRequired(ws, peerInc, kycPending);
+    return processPeerPushCreditKycRequired(ws, peerInc, kycPending);
   }
 
   logger.trace(`merge request: ${j2s(mergeReq)}`);
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index e7e7ffcfc..e3f7d0fab 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -32,8 +32,8 @@ import {
   PeerPushPaymentIncomingStatus,
   PeerPullPaymentInitiationStatus,
   WithdrawalGroupStatus,
-  DepositGroupOperationStatus,
   TipRecordStatus,
+  DepositOperationStatus,
 } from "../db.js";
 import {
   PendingOperationsResponse,
@@ -198,8 +198,8 @@ async function gatherDepositPending(
 ): Promise<void> {
   const dgs = await tx.depositGroups.indexes.byStatus.getAll(
     GlobalIDB.KeyRange.bound(
-      DepositGroupOperationStatus.Pending,
-      DepositGroupOperationStatus.AbortingWithRefresh,
+      DepositOperationStatus.PendingDeposit,
+      DepositOperationStatus.PendingKyc,
     ),
   );
   for (const dg of dgs) {
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index 28f4eeebb..8eb7f6457 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -1108,14 +1108,6 @@ async function processPlanchetVerifyAndStoreCoin(
 
   wgContext.planchetsFinished.add(planchet.coinPub);
 
-  // We create the notification here, as the async transaction below
-  // allows other planchet withdrawals to change wgContext.planchetsFinished
-  const notification: WalletNotification = {
-    type: NotificationType.CoinWithdrawn,
-    numTotal: wgContext.numPlanchets,
-    numWithdrawn: wgContext.planchetsFinished.size,
-  };
-
   // Check if this is the first time that the whole
   // withdrawal succeeded.  If so, mark the withdrawal
   // group as finished.
@@ -1138,9 +1130,7 @@ async function processPlanchetVerifyAndStoreCoin(
       return true;
     });
 
-  if (firstSuccess) {
-    ws.notify(notification);
-  }
+  ws.notify({ type: NotificationType.BalanceChange });
 }
 
 /**

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]