gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] 03/03: wallet-core: towards event-based waiting in r


From: gnunet
Subject: [taler-wallet-core] 03/03: wallet-core: towards event-based waiting in runIntegrationTestV2
Date: Fri, 30 Jun 2023 23:01:54 +0200

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

commit 7a18e12a175856b3d17d2bb70ec549004c281ff5
Author: Florian Dold <florian@dold.me>
AuthorDate: Fri Jun 30 23:01:48 2023 +0200

    wallet-core: towards event-based waiting in runIntegrationTestV2
---
 .../taler-wallet-core/src/internal-wallet-state.ts |  8 ++++
 .../src/operations/pay-peer-pull-credit.ts         | 48 +++++++++++++---------
 .../taler-wallet-core/src/operations/testing.ts    | 44 ++++++++++++++++++++
 packages/taler-wallet-core/src/wallet.ts           | 48 +++++++++++++++++++---
 4 files changed, 124 insertions(+), 24 deletions(-)

diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts 
b/packages/taler-wallet-core/src/internal-wallet-state.ts
index d97703dc1..a2ca34a86 100644
--- a/packages/taler-wallet-core/src/internal-wallet-state.ts
+++ b/packages/taler-wallet-core/src/internal-wallet-state.ts
@@ -184,6 +184,8 @@ export interface InternalWalletState {
   merchantOps: MerchantOperations;
   refreshOps: RefreshOperations;
 
+  isTaskLoopRunning: boolean;
+
   getTransactionState(
     ws: InternalWalletState,
     tx: GetReadOnlyAccess<typeof WalletStoresV1>,
@@ -218,4 +220,10 @@ export interface InternalWalletState {
   runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>;
 
   runUntilDone(req?: { maxRetries?: number }): Promise<void>;
+
+  /**
+   * Ensure that a task loop is currently running.
+   * Starts one if no task loop is running.
+   */
+  ensureTaskLoopRunning(): void;
 }
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts 
b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
index 4c00ed592..c7e13754f 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
@@ -746,31 +746,37 @@ export async function initiatePeerPullPayment(
     undefined,
   );
 
-  await ws.db
+  const transitionInfo = await ws.db
     .mktx((x) => [x.peerPullPaymentInitiations, x.contractTerms])
     .runReadWrite(async (tx) => {
-      await tx.peerPullPaymentInitiations.put({
-        amount: req.partialContractTerms.amount,
-        contractTermsHash: hContractTerms,
-        exchangeBaseUrl: exchangeBaseUrl,
-        pursePriv: pursePair.priv,
-        pursePub: pursePair.pub,
-        mergePriv: mergePair.priv,
-        mergePub: mergePair.pub,
-        status: PeerPullPaymentInitiationStatus.PendingCreatePurse,
-        contractTerms: contractTerms,
-        mergeTimestamp,
-        contractEncNonce,
-        mergeReserveRowId: mergeReserveRowId,
-        contractPriv: contractKeyPair.priv,
-        contractPub: contractKeyPair.pub,
-        withdrawalGroupId,
-        estimatedAmountEffective: wi.withdrawalAmountEffective,
-      });
+      const ppi: PeerPullPaymentInitiationRecord = {
+          amount: req.partialContractTerms.amount,
+          contractTermsHash: hContractTerms,
+          exchangeBaseUrl: exchangeBaseUrl,
+          pursePriv: pursePair.priv,
+          pursePub: pursePair.pub,
+          mergePriv: mergePair.priv,
+          mergePub: mergePair.pub,
+          status: PeerPullPaymentInitiationStatus.PendingCreatePurse,
+          contractTerms: contractTerms,
+          mergeTimestamp,
+          contractEncNonce,
+          mergeReserveRowId: mergeReserveRowId,
+          contractPriv: contractKeyPair.priv,
+          contractPub: contractKeyPair.pub,
+          withdrawalGroupId,
+          estimatedAmountEffective: wi.withdrawalAmountEffective,
+      }
+      await tx.peerPullPaymentInitiations.put(ppi);
+      const oldTxState: TransactionState = {
+        major: TransactionMajorState.None,
+      };
+      const newTxState = computePeerPullCreditTransactionState(ppi);
       await tx.contractTerms.put({
         contractTermsRaw: contractTerms,
         h: hContractTerms,
       });
+      return { oldTxState, newTxState };
     });
 
   const transactionId = constructTransactionIdentifier({
@@ -781,6 +787,10 @@ export async function initiatePeerPullPayment(
   // The pending-incoming balance has changed.
   ws.notify({ type: NotificationType.BalanceChange });
 
+  notifyTransition(ws, transactionId, transitionInfo);
+
+  ws.workAvailable.trigger();
+
   return {
     talerUri: stringifyTalerUri({
       type: TalerUriAction.PayPull,
diff --git a/packages/taler-wallet-core/src/operations/testing.ts 
b/packages/taler-wallet-core/src/operations/testing.ts
index 77e218cd7..2a3584a0a 100644
--- a/packages/taler-wallet-core/src/operations/testing.ts
+++ b/packages/taler-wallet-core/src/operations/testing.ts
@@ -27,6 +27,8 @@ import {
   NotificationType,
   stringToBytes,
   TestPayResult,
+  TransactionMajorState,
+  TransactionMinorState,
   WithdrawTestBalanceRequest,
 } from "@gnu-taler/taler-util";
 import {
@@ -66,6 +68,7 @@ import {
 } from "./pay-peer-push-credit.js";
 import { initiatePeerPushDebit } from "./pay-peer-push-debit.js";
 import { OpenedPromise, openPromise } from "../index.js";
+import { getTransactionById } from "./transactions.js";
 
 const logger = new Logger("operations/testing.ts");
 
@@ -459,10 +462,45 @@ async function waitUntilDone(ws: InternalWalletState): 
Promise<void> {
   });
 }
 
+async function waitUntilPendingReady(
+  ws: InternalWalletState,
+  transactionId: string,
+): Promise<void> {
+  logger.info(`starting waiting for ${transactionId} to be in pending(ready)`);
+  ws.ensureTaskLoopRunning();
+  let p: OpenedPromise<void> | undefined = undefined;
+  ws.addNotificationListener((notif) => {
+    if (!p) {
+      return;
+    }
+    if (notif.type === NotificationType.TransactionStateTransition) {
+      p.resolve();
+    }
+  });
+  while (1) {
+    p = openPromise();
+    const tx = await getTransactionById(ws, {
+      transactionId,
+    });
+    if (
+      tx.txState.major == TransactionMajorState.Pending &&
+      tx.txState.minor === TransactionMinorState.Ready
+    ) {
+      break;
+    }
+    // Wait until transaction state changed
+    await p.promise;
+  }
+  logger.info(`done waiting for ${transactionId} to be in pending(ready)`);
+  // FIXME: Remove listener!
+}
+
 export async function runIntegrationTest2(
   ws: InternalWalletState,
   args: IntegrationTestV2Args,
 ): Promise<void> {
+  // FIXME: Make sure that a task look is running, since we're
+  // waiting for notifications.
   logger.info("running test with arguments", args);
 
   const exchangeInfo = await updateExchangeFromUrl(ws, args.exchangeBaseUrl);
@@ -565,6 +603,8 @@ export async function runIntegrationTest2(
     },
   });
 
+  await waitUntilPendingReady(ws, peerPushInit.transactionId);
+
   const peerPushCredit = await preparePeerPushCredit(ws, {
     talerUri: peerPushInit.talerUri,
   });
@@ -586,6 +626,8 @@ export async function runIntegrationTest2(
     },
   });
 
+  await waitUntilPendingReady(ws, peerPullInit.transactionId);
+
   const peerPullInc = await preparePeerPullDebit(ws, {
     talerUri: peerPullInit.talerUri,
   });
@@ -594,6 +636,8 @@ export async function runIntegrationTest2(
     peerPullPaymentIncomingId: peerPullInc.peerPullPaymentIncomingId,
   });
 
+  await ws.runUntilDone();
+
   logger.trace("integration test: all done!");
 }
 
diff --git a/packages/taler-wallet-core/src/wallet.ts 
b/packages/taler-wallet-core/src/wallet.ts
index 583dc33d4..1b355a32c 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -287,9 +287,7 @@ import {
   GetReadOnlyAccess,
   GetReadWriteAccess,
 } from "./util/query.js";
-import {
-  TaskIdentifiers,
-} from "./operations/common.js";
+import { TaskIdentifiers } from "./operations/common.js";
 import { TimerAPI, TimerGroup } from "./util/timer.js";
 import {
   WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@@ -404,6 +402,12 @@ async function runTaskLoop(
   opts: RetryLoopOpts = {},
 ): Promise<TaskLoopResult> {
   logger.info(`running task loop opts=${j2s(opts)}`);
+  if (ws.isTaskLoopRunning) {
+    logger.warn(
+      "task loop already running, nesting the wallet-core task loop is 
deprecated and should be avoided",
+    );
+  }
+  ws.isTaskLoopRunning = true;
   let retriesExceeded = false;
   for (let iteration = 0; !ws.stopped; iteration++) {
     const pending = await getPendingOperations(ws);
@@ -434,6 +438,14 @@ async function runTaskLoop(
 
     if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) {
       logger.warn(`stopping, as no pending operations have lifeness`);
+      ws.isTaskLoopRunning = false;
+      return {
+        retriesExceeded,
+      };
+    }
+
+    if (ws.stopped) {
+      ws.isTaskLoopRunning = false;
       return {
         retriesExceeded,
       };
@@ -468,16 +480,24 @@ async function runTaskLoop(
         }
         await runTaskWithErrorReporting(ws, p.id, async () => {
           logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
+          ws.isTaskLoopRunning = false;
           return await callOperationHandler(ws, p);
         });
         ws.notify({
           type: NotificationType.PendingOperationProcessed,
           id: p.id,
         });
+        if (ws.stopped) {
+          ws.isTaskLoopRunning = false;
+          return {
+            retriesExceeded,
+          };
+        }
       }
     }
   }
-  logger.trace("exiting wallet retry loop");
+  logger.trace("exiting wallet task loop");
+  ws.isTaskLoopRunning = false;
   return {
     retriesExceeded,
   };
@@ -1575,7 +1595,9 @@ export async function handleCoreApiRequest(
     };
   } catch (e: any) {
     const err = getErrorDetailFromException(e);
-    logger.info(`finished wallet core request ${operation} with error: 
${j2s(err)}`);
+    logger.info(
+      `finished wallet core request ${operation} with error: ${j2s(err)}`,
+    );
     return {
       type: "error",
       operation,
@@ -1737,6 +1759,8 @@ class InternalWalletStateImpl implements 
InternalWalletState {
    */
   private resourceLocks: Set<string> = new Set();
 
+  isTaskLoopRunning: boolean = false;
+
   config: Readonly<WalletConfig>;
 
   constructor(
@@ -1948,6 +1972,20 @@ class InternalWalletStateImpl implements 
InternalWalletState {
       }
     }
   }
+
+  ensureTaskLoopRunning(): void {
+    if (this.isTaskLoopRunning) {
+      return;
+    }
+    runTaskLoop(this)
+      .catch((e) => {
+        logger.error("error running task loop");
+        logger.error(`err: ${e}`);
+      })
+      .then(() => {
+        logger.info("done running task loop");
+      });
+  }
 }
 
 /**

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]