gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] branch master updated: wallet-core: support long-polling for peer push credit


From: gnunet
Subject: [taler-wallet-core] branch master updated: wallet-core: support long-polling for peer push credit
Date: Mon, 20 Feb 2023 21:26:17 +0100

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

The following commit(s) were added to refs/heads/master by this push:
     new a49959d2c wallet-core: support long-polling for peer push credit
a49959d2c is described below

commit a49959d2c8bf82575c5d232217a33d91e7b008e8
Author: Florian Dold <florian@dold.me>
AuthorDate: Mon Feb 20 21:26:08 2023 +0100

    wallet-core: support long-polling for peer push credit
---
 packages/taler-wallet-core/src/db.ts               |  12 +-
 .../taler-wallet-core/src/operations/common.ts     |  41 +++++
 .../taler-wallet-core/src/operations/pay-peer.ts   | 182 ++++++++++++++++-----
 .../taler-wallet-core/src/operations/pending.ts    |   3 +-
 .../taler-wallet-core/src/operations/withdraw.ts   |  56 +++----
 5 files changed, 216 insertions(+), 78 deletions(-)

diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index cbf49c4ca..29e97cd90 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -1774,6 +1774,16 @@ export interface PeerPushPaymentInitiationRecord {
   status: PeerPushPaymentInitiationStatus;
 }
 
+export enum PeerPullPaymentInitiationStatus {
+  Initial = 10 /* ACTIVE_START */,
+  /**
+   * Purse created, waiting for the other party to accept the
+   * invoice and deposit money into it.
+   */
+  PurseCreated = 11 /* ACTIVE_START + 1 */,
+  PurseDeposited = 50 /* DORMANT_START */,
+}
+
 export interface PeerPullPaymentInitiationRecord {
   /**
    * What exchange are we using for the payment request?
@@ -1817,7 +1827,7 @@ export interface PeerPullPaymentInitiationRecord {
   /**
    * Status of the peer pull payment initiation.
    */
-  status: OperationStatus;
+  status: PeerPullPaymentInitiationStatus;
 
   withdrawalGroupId: string | undefined;
 }
diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts
index e5eda074c..3905eaf3e 100644
--- a/packages/taler-wallet-core/src/operations/common.ts
+++ b/packages/taler-wallet-core/src/operations/common.ts
@@ -21,11 +21,13 @@ import {
   AgeRestriction,
   AmountJson,
   Amounts,
+  CancellationToken,
   CoinRefreshRequest,
   CoinStatus,
   ExchangeEntryStatus,
   ExchangeListItem,
   ExchangeTosStatus,
+  getErrorDetailFromException,
   j2s,
   Logger,
   OperationErrorInfo,
@@ -453,3 +455,42 @@ export function makeExchangeListItem(
     lastUpdateErrorInfo,
   };
 }
+
+export interface LongpollResult {
+  ready: boolean;
+}
+
+export function runLongpollAsync(
+  ws: InternalWalletState,
+  retryTag: string,
+  reqFn: (ct: CancellationToken) => Promise<LongpollResult>,
+): void {
+  const asyncFn = async () => {
+    if (ws.stopped) {
+      logger.trace("not long-polling reserve, wallet already stopped");
+      await storeOperationPending(ws, retryTag);
+      return;
+    }
+    const cts = CancellationToken.create();
+    let res: { ready: boolean } | undefined = undefined;
+    try {
+      ws.activeLongpoll[retryTag] = {
+        cancel: () => {
+          logger.trace("cancel of reserve longpoll requested");
+          cts.cancel();
+        },
+      };
+      res = await reqFn(cts.token);
+    } catch (e) {
+      await storeOperationError(ws, retryTag, getErrorDetailFromException(e));
+      return;
+    } finally {
+      delete ws.activeLongpoll[retryTag];
+    }
+    if (!res.ready) {
+      await storeOperationPending(ws, retryTag);
+    }
+    ws.latch.trigger();
+  };
+  asyncFn();
+}
diff --git a/packages/taler-wallet-core/src/operations/pay-peer.ts b/packages/taler-wallet-core/src/operations/pay-peer.ts
index 4f65ec7ea..4dcc06076 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer.ts
@@ -69,12 +69,17 @@ import {
   TransactionType,
   UnblindedSignature,
   WalletAccountMergeFlags,
+  codecOptional,
+  codecForTimestamp,
+  CancellationToken,
 } from "@gnu-taler/taler-util";
 import { SpendCoinDetails } from "../crypto/cryptoImplementation.js";
 import {
   DenominationRecord,
   OperationStatus,
   PeerPullPaymentIncomingStatus,
+  PeerPullPaymentInitiationRecord,
+  PeerPullPaymentInitiationStatus,
   PeerPushPaymentCoinSelection,
   PeerPushPaymentIncomingRecord,
   PeerPushPaymentIncomingStatus,
@@ -86,12 +91,19 @@ import {
 import { TalerError } from "@gnu-taler/taler-util";
 import { InternalWalletState } from "../internal-wallet-state.js";
 import {
+  LongpollResult,
   makeTransactionId,
   resetOperationTimeout,
+  runLongpollAsync,
   runOperationWithErrorReporting,
   spendCoins,
+  storeOperationPending,
 } from "../operations/common.js";
-import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
+import {
+  readSuccessResponseJsonOrErrorCode,
+  readSuccessResponseJsonOrThrow,
+  throwUnexpectedRequestError,
+} from "@gnu-taler/taler-util/http";
 import { checkDbInvariant } from "../util/invariants.js";
 import {
   constructTaskIdentifier,
@@ -622,11 +634,13 @@ export async function initiatePeerPushPayment(
 
 interface ExchangePurseStatus {
   balance: AmountString;
+  deposit_timestamp?: TalerProtocolTimestamp;
 }
 
 export const codecForExchangePurseStatus = (): Codec<ExchangePurseStatus> =>
   buildCodecForObject<ExchangePurseStatus>()
     .property("balance", codecForAmountString())
+    .property("deposit_timestamp", codecOptional(codecForTimestamp))
     .build("ExchangePurseStatus");
 
 export async function preparePeerPushCredit(
@@ -1255,6 +1269,87 @@ export async function preparePeerPullCredit(
   };
 }
 
+export async function queryPurseForPeerPullCredit(
+  ws: InternalWalletState,
+  pullIni: PeerPullPaymentInitiationRecord,
+  cancellationToken: CancellationToken,
+): Promise<LongpollResult> {
+  const purseDepositUrl = new URL(
+    `purses/${pullIni.pursePub}/merge`,
+    pullIni.exchangeBaseUrl,
+  );
+  purseDepositUrl.searchParams.set("timeout_ms", "30000");
+  logger.info(`querying purse status via ${purseDepositUrl.href}`);
+  const resp = await ws.http.get(purseDepositUrl.href, {
+    timeout: { d_ms: 60000 },
+    cancellationToken,
+  });
+
+  logger.info(`purse status code: HTTP ${resp.status}`);
+
+  const result = await readSuccessResponseJsonOrErrorCode(
+    resp,
+    codecForExchangePurseStatus(),
+  );
+
+  if (result.isError) {
+    logger.info(`got purse status error, EC=${result.talerErrorResponse.code}`);
+    if (resp.status === 404) {
+      return { ready: false };
+    } else {
+      throwUnexpectedRequestError(resp, result.talerErrorResponse);
+    }
+  }
+
+  if (!result.response.deposit_timestamp) {
+    logger.info("purse not ready yet (no deposit)");
+    return { ready: false };
+  }
+
+  const reserve = await ws.db
+    .mktx((x) => [x.reserves])
+    .runReadOnly(async (tx) => {
+      return await tx.reserves.get(pullIni.mergeReserveRowId);
+    });
+
+  if (!reserve) {
+    throw Error("reserve for peer pull credit not found in wallet DB");
+  }
+
+  await internalCreateWithdrawalGroup(ws, {
+    amount: Amounts.parseOrThrow(pullIni.amount),
+    wgInfo: {
+      withdrawalType: WithdrawalRecordType.PeerPullCredit,
+      contractTerms: pullIni.contractTerms,
+      contractPriv: pullIni.contractPriv,
+    },
+    forcedWithdrawalGroupId: pullIni.withdrawalGroupId,
+    exchangeBaseUrl: pullIni.exchangeBaseUrl,
+    reserveStatus: WithdrawalGroupStatus.QueryingStatus,
+    reserveKeyPair: {
+      priv: reserve.reservePriv,
+      pub: reserve.reservePub,
+    },
+  });
+
+  await ws.db
+    .mktx((x) => [x.peerPullPaymentInitiations])
+    .runReadWrite(async (tx) => {
+      const finPi = await tx.peerPullPaymentInitiations.get(pullIni.pursePub);
+      if (!finPi) {
+        logger.warn("peerPullPaymentInitiation not found anymore");
+        return;
+      }
+      if (finPi.status === PeerPullPaymentInitiationStatus.PurseCreated) {
+        finPi.status = PeerPullPaymentInitiationStatus.PurseDeposited;
+      }
+      await tx.peerPullPaymentInitiations.put(finPi);
+    });
+  return {
+    ready: true,
+  };
+}
+
 export async function processPeerPullCredit(
   ws: InternalWalletState,
   pursePub: string,
@@ -1268,28 +1363,52 @@ export async function processPeerPullCredit(
     throw Error("peer pull payment initiation not found in database");
   }
 
-  if (pullIni.status === OperationStatus.Finished) {
-    logger.warn(
-      "peer pull payment initiation is already finished, retrying withdrawal",
-    );
+  const retryTag = constructTaskIdentifier({
+    tag: PendingTaskType.PeerPullInitiation,
+    pursePub,
+  });
 
-    const withdrawalGroupId = pullIni.withdrawalGroupId;
+  switch (pullIni.status) {
+    case PeerPullPaymentInitiationStatus.PurseDeposited: {
+      // We implement this case so that the "retry" action on a peer-pull-credit transaction
+      // also retries the withdrawal task.
 
-    if (withdrawalGroupId) {
-      const taskId = constructTaskIdentifier({
-        tag: PendingTaskType.Withdraw,
-        withdrawalGroupId,
-      });
-      stopLongpolling(ws, taskId);
-      await resetOperationTimeout(ws, taskId);
-      await runOperationWithErrorReporting(ws, taskId, () =>
-        processWithdrawalGroup(ws, withdrawalGroupId),
+      logger.warn(
+        "peer pull payment initiation is already finished, retrying withdrawal",
       );
+
+      const withdrawalGroupId = pullIni.withdrawalGroupId;
+
+      if (withdrawalGroupId) {
+        const taskId = constructTaskIdentifier({
+          tag: PendingTaskType.Withdraw,
+          withdrawalGroupId,
+        });
+        stopLongpolling(ws, taskId);
+        await resetOperationTimeout(ws, taskId);
+        await runOperationWithErrorReporting(ws, taskId, () =>
+          processWithdrawalGroup(ws, withdrawalGroupId),
+        );
+      }
+      return {
+        type: OperationAttemptResultType.Finished,
+        result: undefined,
+      };
     }
-    return {
-      type: OperationAttemptResultType.Finished,
-      result: undefined,
-    };
+    case PeerPullPaymentInitiationStatus.PurseCreated:
+      runLongpollAsync(ws, retryTag, async (cancellationToken) =>
+        queryPurseForPeerPullCredit(ws, pullIni, cancellationToken),
+      );
+      logger.trace(
+        "returning early from processPeerPullCredit for long-polling in background",
+      );
+      return {
+        type: OperationAttemptResultType.Longpoll,
+      };
+    case PeerPullPaymentInitiationStatus.Initial:
+      break;
+    default:
+      throw Error(`unknown PeerPullPaymentInitiationStatus ${pullIni.status}`);
   }
 
   const mergeReserve = await ws.db
@@ -1370,7 +1489,7 @@ export async function processPeerPullCredit(
       if (!pi2) {
         return;
       }
-      pi2.status = OperationStatus.Finished;
+      pi2.status = PeerPullPaymentInitiationStatus.PurseCreated;
       await tx.peerPullPaymentInitiations.put(pi2);
     });
 
@@ -1518,7 +1637,7 @@ export async function initiatePeerPullPayment(
         pursePub: pursePair.pub,
         mergePriv: mergePair.priv,
         mergePub: mergePair.pub,
-        status: OperationStatus.Pending,
+        status: PeerPullPaymentInitiationStatus.Initial,
         contractTerms: contractTerms,
         mergeTimestamp,
         mergeReserveRowId: mergeReserveRowId,
@@ -1545,27 +1664,6 @@ export async function initiatePeerPullPayment(
     return processPeerPullCredit(ws, pursePair.pub);
   });
 
-  // FIXME: Why do we create this only here?
-  // What if the previous operation didn't succeed?
-  // We actually should create it once we know the
-  // money arrived (via long-polling).
-
-  await internalCreateWithdrawalGroup(ws, {
-    amount: instructedAmount,
-    wgInfo: {
-      withdrawalType: WithdrawalRecordType.PeerPullCredit,
-      contractTerms,
-      contractPriv: contractKeyPair.priv,
-    },
-    forcedWithdrawalGroupId: withdrawalGroupId,
-    exchangeBaseUrl: exchangeBaseUrl,
-    reserveStatus: WithdrawalGroupStatus.QueryingStatus,
-    reserveKeyPair: {
-      priv: mergeReserveInfo.reservePriv,
-      pub: mergeReserveInfo.reservePub,
-    },
-  });
-
   return {
     talerUri: constructPayPullUri({
       exchangeBaseUrl: exchangeBaseUrl,
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index 2e3a5c9dc..458448b31 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -31,6 +31,7 @@ import {
   PeerPushPaymentInitiationStatus,
   PeerPullPaymentIncomingStatus,
   PeerPushPaymentIncomingStatus,
+  PeerPullPaymentInitiationStatus,
 } from "../db.js";
 import {
   PendingOperationsResponse,
@@ -363,7 +364,7 @@ async function gatherPeerPullInitiationPending(
   resp: PendingOperationsResponse,
 ): Promise<void> {
   await tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => {
-    if (pi.status === OperationStatus.Finished) {
+    if (pi.status === PeerPullPaymentInitiationStatus.PurseDeposited) {
       return;
     }
     const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index 5729b8458..aba2948cd 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -90,6 +90,7 @@ import { InternalWalletState } from "../internal-wallet-state.js";
 import {
   makeCoinAvailable,
   makeExchangeListItem,
+  runLongpollAsync,
   runOperationWithErrorReporting,
 } from "../operations/common.js";
 import { walletCoreDebugFlags } from "../util/debugFlags.js";
@@ -1022,8 +1023,7 @@ export interface WithdrawalGroupContext {
 export async function processWithdrawalGroup(
   ws: InternalWalletState,
   withdrawalGroupId: string,
-  options: {
-  } = {},
+  options: {} = {},
 ): Promise<OperationAttemptResult> {
   logger.trace("processing withdrawal group", withdrawalGroupId);
   const withdrawalGroup = await ws.db
@@ -1053,38 +1053,9 @@ export async function processWithdrawalGroup(
         forceNow: true,
       });
     case WithdrawalGroupStatus.QueryingStatus: {
-      const doQueryAsync = async () => {
-        if (ws.stopped) {
-          logger.trace("not long-polling reserve, wallet already stopped");
-          await storeOperationPending(ws, retryTag);
-          return;
-        }
-        const cts = CancellationToken.create();
-        let res: { ready: boolean } | undefined = undefined;
-        try {
-          ws.activeLongpoll[retryTag] = {
-            cancel: () => {
-              logger.trace("cancel of reserve longpoll requested");
-              cts.cancel();
-            },
-          };
-          res = await queryReserve(ws, withdrawalGroupId, cts.token);
-        } catch (e) {
-          await storeOperationError(
-            ws,
-            retryTag,
-            getErrorDetailFromException(e),
-          );
-          return;
-        } finally {
-          delete ws.activeLongpoll[retryTag];
-        }
-        if (!res.ready) {
-          await storeOperationPending(ws, retryTag);
-        }
-        ws.latch.trigger();
-      };
-      doQueryAsync();
+      runLongpollAsync(ws, retryTag, (ct) => {
+        return queryReserve(ws, withdrawalGroupId, ct);
+      });
       logger.trace(
         "returning early from withdrawal for long-polling in background",
       );
@@ -1832,6 +1803,14 @@ async function processReserveBankStatus(
   }
 }
 
+/**
+ * Create a withdrawal group.
+ *
+ * If a forcedWithdrawalGroupId is given and a
+ * withdrawal group with this ID already exists,
+ * the existing one is returned.  No conflict checking
+ * of the other arguments is done in that case.
+ */
 export async function internalCreateWithdrawalGroup(
   ws: InternalWalletState,
   args: {
@@ -1856,6 +1835,15 @@ export async function internalCreateWithdrawalGroup(
 
   if (args.forcedWithdrawalGroupId) {
     withdrawalGroupId = args.forcedWithdrawalGroupId;
+    const wgId = withdrawalGroupId;
+    const existingWg = await ws.db
+      .mktx((x) => [x.withdrawalGroups])
+      .runReadOnly(async (tx) => {
+        return tx.withdrawalGroups.get(wgId);
+      });
+    if (existingWg) {
+      return existingWg;
+    }
   } else {
     withdrawalGroupId = encodeCrock(getRandomBytes(32));
   }

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]