gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] branch master updated: implement task shepherd, many


From: gnunet
Subject: [taler-wallet-core] branch master updated: implement task shepherd, many small fixes and tweaks
Date: Thu, 15 Feb 2024 21:57:11 +0100

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

The following commit(s) were added to refs/heads/master by this push:
     new 70a803038 implement task shepherd, many small fixes and tweaks
70a803038 is described below

commit 70a803038f1cbe05dc4779bdd87376fd073421be
Author: Florian Dold <florian@dold.me>
AuthorDate: Tue Feb 13 10:53:43 2024 +0100

    implement task shepherd, many small fixes and tweaks
---
 packages/taler-harness/src/harness/harness.ts      |  11 +
 .../src/integrationtests/test-payment-fault.ts     |  82 +-
 .../src/integrationtests/test-payment-share.ts     |  33 +-
 .../src/integrationtests/test-peer-repair.ts       |   4 +-
 .../src/integrationtests/test-peer-to-peer-pull.ts |   1 -
 .../test-timetravel-autorefresh.ts                 |  19 +-
 packages/taler-util/src/time.ts                    |  23 +-
 packages/taler-util/src/wallet-types.ts            |  12 +-
 packages/taler-wallet-cli/src/index.ts             |  11 +-
 .../taler-wallet-core/src/internal-wallet-state.ts |  23 +-
 .../taler-wallet-core/src/operations/README.md     |   7 -
 .../src/operations/backup/index.ts                 | 165 ++--
 .../taler-wallet-core/src/operations/common.ts     | 466 ++---------
 .../taler-wallet-core/src/operations/deposits.ts   | 140 ++--
 .../taler-wallet-core/src/operations/exchanges.ts  | 468 ++++++++---
 .../src/operations/pay-merchant.ts                 | 552 +++++++------
 .../src/operations/pay-peer-pull-credit.ts         | 167 ++--
 .../src/operations/pay-peer-pull-debit.ts          |  33 +-
 .../src/operations/pay-peer-push-credit.ts         | 123 ++-
 .../src/operations/pay-peer-push-debit.ts          | 231 +++---
 .../taler-wallet-core/src/operations/pending.ts    | 814 --------------------
 .../taler-wallet-core/src/operations/refresh.ts    | 155 +---
 .../taler-wallet-core/src/operations/reward.ts     |   6 -
 .../taler-wallet-core/src/operations/testing.ts    |  41 +-
 .../src/operations/transactions.ts                 | 442 ++++++++---
 .../taler-wallet-core/src/operations/withdraw.ts   | 233 +++---
 packages/taler-wallet-core/src/pending-types.ts    |  10 -
 packages/taler-wallet-core/src/shepherd.ts         | 851 +++++++++++++++++++++
 .../taler-wallet-core/src/util/coinSelection.ts    |   3 +-
 .../taler-wallet-core/src/util/promiseUtils.ts     |  40 +
 packages/taler-wallet-core/src/util/query.ts       |   2 +-
 packages/taler-wallet-core/src/wallet-api-types.ts |  15 -
 packages/taler-wallet-core/src/wallet.ts           | 244 +-----
 33 files changed, 2589 insertions(+), 2838 deletions(-)

diff --git a/packages/taler-harness/src/harness/harness.ts 
b/packages/taler-harness/src/harness/harness.ts
index 48f8450fd..975d73cf8 100644
--- a/packages/taler-harness/src/harness/harness.ts
+++ b/packages/taler-harness/src/harness/harness.ts
@@ -412,6 +412,17 @@ export class GlobalTestState {
       }
     }
   }
+
+  /**
+   * Log that the test arrived a certain step.
+   *
+   * The step name should be unique across the whole
+   */
+  logStep(stepName: string): void {
+    // Now we just log, later we may report the steps that were done
+    // to easily see where the test hangs.
+    console.info(`STEP: ${stepName}`);
+  }
 }
 
 export function shouldLingerInTest(): boolean {
diff --git a/packages/taler-harness/src/integrationtests/test-payment-fault.ts 
b/packages/taler-harness/src/integrationtests/test-payment-fault.ts
index af6751ef4..cadcc9056 100644
--- a/packages/taler-harness/src/integrationtests/test-payment-fault.ts
+++ b/packages/taler-harness/src/integrationtests/test-payment-fault.ts
@@ -21,11 +21,7 @@
 /**
  * Imports.
  */
-import {
-  TalerCorebankApiClient,
-  CoreApiResponse,
-  MerchantApiClient,
-} from "@gnu-taler/taler-util";
+import { ConfirmPayResultType, MerchantApiClient } from 
"@gnu-taler/taler-util";
 import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
 import { defaultCoinConfig } from "../harness/denomStructures.js";
 import {
@@ -38,10 +34,13 @@ import {
   ExchangeService,
   GlobalTestState,
   MerchantService,
-  WalletCli,
   generateRandomPayto,
   setupDb,
 } from "../harness/harness.js";
+import {
+  createWalletDaemonWithClient,
+  withdrawViaBankV2,
+} from "../harness/helpers.js";
 
 /**
  * Run test for basic, bank-integrated withdrawal.
@@ -123,45 +122,20 @@ export async function runPaymentFaultTest(t: 
GlobalTestState) {
 
   console.log("setup done!");
 
-  const wallet = new WalletCli(t);
-
-  // Create withdrawal operation
-
-  const bankClient = new TalerCorebankApiClient(bank.corebankApiBaseUrl);
-
-  const user = await bankClient.createRandomBankUser();
-  const wop = await bankClient.createWithdrawalOperation(
-    user.username,
-    "TESTKUDOS:20",
-  );
-
-  // Hand it to the wallet
-
-  await wallet.client.call(WalletApiOperation.GetWithdrawalDetailsForUri, {
-    talerWithdrawUri: wop.taler_withdraw_uri,
-  });
-
-  await wallet.runPending();
-
-  // Withdraw
-
-  await wallet.client.call(WalletApiOperation.AcceptBankIntegratedWithdrawal, {
-    exchangeBaseUrl: faultyExchange.baseUrl,
-    talerWithdrawUri: wop.taler_withdraw_uri,
+  const { walletClient } = await createWalletDaemonWithClient(t, {
+    name: "default",
   });
-  await wallet.runPending();
 
-  // Confirm it
+  await walletClient.call(WalletApiOperation.GetBalances, {});
 
-  await bankClient.confirmWithdrawalOperation(user.username, {
-    withdrawalOperationId: wop.withdrawal_id,
+  const wres = await withdrawViaBankV2(t, {
+    walletClient,
+    bank,
+    exchange: faultyExchange,
+    amount: "TESTKUDOS:20",
   });
 
-  await wallet.runUntilDone();
-
-  // Check balance
-
-  await wallet.client.call(WalletApiOperation.GetBalances, {});
+  await wres.withdrawalFinishedCond;
 
   // Set up order.
 
@@ -181,24 +155,22 @@ export async function runPaymentFaultTest(t: 
GlobalTestState) {
 
   // Make wallet pay for the order
 
-  let apiResp: CoreApiResponse;
-
-  const prepResp = await wallet.client.call(
+  const prepResp = await walletClient.call(
     WalletApiOperation.PreparePayForUri,
     {
       talerPayUri: orderStatus.taler_pay_uri,
     },
   );
 
-  const proposalId = prepResp.proposalId;
-
-  await wallet.runPending();
-
   // Drop 3 responses from the exchange.
   let faultCount = 0;
   faultyExchange.faultProxy.addFault({
     async modifyResponse(ctx: FaultInjectionResponseContext) {
-      if (!ctx.request.requestUrl.endsWith("/deposit")) {
+      console.log(`in modifyResponse for ${ctx.request.requestUrl}`);
+      if (
+        !ctx.request.requestUrl.endsWith("/deposit") &&
+        !ctx.request.requestUrl.endsWith("/batch-deposit")
+      ) {
         return;
       }
       if (faultCount < 3) {
@@ -213,12 +185,16 @@ export async function runPaymentFaultTest(t: 
GlobalTestState) {
 
   // confirmPay won't work, as the exchange is unreachable
 
-  await wallet.client.call(WalletApiOperation.ConfirmPay, {
-    // FIXME: should be validated, don't cast!
-    proposalId: proposalId,
-  });
+  const confirmPayResp = await walletClient.call(
+    WalletApiOperation.ConfirmPay,
+    {
+      transactionId: prepResp.transactionId,
+    },
+  );
+
+  t.assertDeepEqual(confirmPayResp.type, ConfirmPayResultType.Pending);
 
-  await wallet.runUntilDone();
+  await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
 
   // Check if payment was successful.
 
diff --git a/packages/taler-harness/src/integrationtests/test-payment-share.ts 
b/packages/taler-harness/src/integrationtests/test-payment-share.ts
index ef4f8adeb..034bbc98d 100644
--- a/packages/taler-harness/src/integrationtests/test-payment-share.ts
+++ b/packages/taler-harness/src/integrationtests/test-payment-share.ts
@@ -65,6 +65,8 @@ export async function runPaymentShareTest(t: GlobalTestState) 
{
   });
   await secondWallet.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
 
+  t.logStep("setup-done");
+
   // create two orders to pay
   async function createOrder(amount: string) {
     const order = {
@@ -74,7 +76,6 @@ export async function runPaymentShareTest(t: GlobalTestState) 
{
     };
 
     const args = { order };
-    const auth = {};
 
     const orderResp = await merchantClient.createOrder({
       order: args.order,
@@ -88,6 +89,8 @@ export async function runPaymentShareTest(t: GlobalTestState) 
{
     return { id: orderResp.order_id, uri: orderStatus.taler_pay_uri };
   }
 
+  t.logStep("orders-created");
+
   /**
    * FIRST CASE, create in first wallet and pay in the second wallet
    * first wallet should not be able to continue
@@ -104,6 +107,8 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       claimFirstWallet.status === PreparePayResultType.PaymentPossible,
     );
 
+    t.logStep("w1-payment-possible");
+
     // share order from the first wallet
     const { privatePayUri } = await firstWallet.call(
       WalletApiOperation.SharePayment,
@@ -113,6 +118,8 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       },
     );
 
+    t.logStep("w1-payment-shared");
+
     // claim from the second wallet
     const claimSecondWallet = await secondWallet.call(
       WalletApiOperation.PreparePayForUri,
@@ -123,18 +130,25 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       claimSecondWallet.status === PreparePayResultType.PaymentPossible,
     );
 
+    t.logStep("w2-claimed");
+
     // pay from the second wallet
     const r2 = await secondWallet.call(WalletApiOperation.ConfirmPay, {
-      proposalId: claimSecondWallet.proposalId,
+      transactionId: claimSecondWallet.transactionId,
     });
 
+    t.assertTrue(r2.type === ConfirmPayResultType.Done);
+
+    t.logStep("w2-confirmed");
+
     // Wait for refresh to settle before we do checks
     await secondWallet.call(
       WalletApiOperation.TestingWaitTransactionsFinal,
       {},
     );
 
-    t.assertTrue(r2.type === ConfirmPayResultType.Done);
+    t.logStep("w2-refresh-settled");
+
     {
       const first = await firstWallet.call(WalletApiOperation.GetBalances, {});
       const second = await secondWallet.call(
@@ -155,11 +169,16 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       claimFirstWalletAgain.status === PreparePayResultType.AlreadyConfirmed,
     );
 
+    t.logStep("w1-prepared-again");
+
     const r1 = await firstWallet.call(WalletApiOperation.ConfirmPay, {
-      proposalId: claimFirstWallet.proposalId,
+      transactionId: claimFirstWallet.transactionId,
     });
 
     t.assertTrue(r1.type === ConfirmPayResultType.Done);
+
+    t.logStep("w1-confirmed-shared");
+
     {
       const first = await firstWallet.call(WalletApiOperation.GetBalances, {});
       const second = await secondWallet.call(
@@ -171,6 +190,8 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
     }
   }
 
+  t.logStep("first-case-done");
+
   /**
    * SECOND CASE, create in first wallet and share to the second wallet
    * pay with the first wallet, second wallet should not be able to continue
@@ -208,7 +229,7 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
 
     // pay from the second wallet
     const r2 = await firstWallet.call(WalletApiOperation.ConfirmPay, {
-      proposalId: claimFirstWallet.proposalId,
+      transactionId: claimFirstWallet.transactionId,
     });
 
     t.assertTrue(r2.type === ConfirmPayResultType.Done);
@@ -232,6 +253,8 @@ export async function runPaymentShareTest(t: 
GlobalTestState) {
       claimSecondWalletAgain.status === PreparePayResultType.AlreadyConfirmed,
     );
   }
+
+  t.logStep("second-case-done");
 }
 
 runPaymentShareTest.suites = ["wallet"];
diff --git a/packages/taler-harness/src/integrationtests/test-peer-repair.ts 
b/packages/taler-harness/src/integrationtests/test-peer-repair.ts
index a225a2057..22664bcc1 100644
--- a/packages/taler-harness/src/integrationtests/test-peer-repair.ts
+++ b/packages/taler-harness/src/integrationtests/test-peer-repair.ts
@@ -22,21 +22,19 @@ import {
   AmountString,
   Duration,
   NotificationType,
-  TalerUriAction,
   TransactionMajorState,
   TransactionMinorState,
   TransactionType,
   WalletNotification,
-  stringifyTalerUri,
 } from "@gnu-taler/taler-util";
 import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
+import * as fs from "node:fs";
 import { GlobalTestState } from "../harness/harness.js";
 import {
   createSimpleTestkudosEnvironmentV2,
   createWalletDaemonWithClient,
   withdrawViaBankV2,
 } from "../harness/helpers.js";
-import * as fs from "node:fs";
 
 export async function runPeerRepairTest(t: GlobalTestState) {
   // Set up test environment
diff --git 
a/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts 
b/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts
index e8d34e288..b61a3941b 100644
--- a/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts
+++ b/packages/taler-harness/src/integrationtests/test-peer-to-peer-pull.ts
@@ -33,7 +33,6 @@ import {
   BankServiceHandle,
   ExchangeService,
   GlobalTestState,
-  WalletCli,
   WalletClient,
 } from "../harness/harness.js";
 import {
diff --git 
a/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts 
b/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts
index def2462e0..3c47f30db 100644
--- a/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts
+++ b/packages/taler-harness/src/integrationtests/test-timetravel-autorefresh.ts
@@ -22,6 +22,7 @@ import {
   Duration,
   durationFromSpec,
   MerchantApiClient,
+  NotificationType,
   PreparePayResultType,
 } from "@gnu-taler/taler-util";
 import {
@@ -124,6 +125,12 @@ export async function runTimetravelAutorefreshTest(t: 
GlobalTestState) {
   });
   await wres.withdrawalFinishedCond;
 
+  const exchangeUpdated1Cond = walletClient.waitForNotificationCond(
+    (x) =>
+      x.type === NotificationType.ExchangeStateTransition &&
+      x.exchangeBaseUrl === exchange.baseUrl,
+  );
+
   // Travel into the future, the deposit expiration is two years
   // into the future.
   console.log("applying first time travel");
@@ -142,7 +149,8 @@ export async function runTimetravelAutorefreshTest(t: 
GlobalTestState) {
   console.log("pending operations after first time travel");
   console.log(JSON.stringify(p, undefined, 2));
 
-  await walletClient.call(WalletApiOperation.TestingWaitTasksProcessed, {});
+  // The time travel should cause exchanges to update.
+  await exchangeUpdated1Cond;
   await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
 
   const wres2 = await withdrawViaBankV2(t, {
@@ -155,6 +163,12 @@ export async function runTimetravelAutorefreshTest(t: 
GlobalTestState) {
 
   await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
 
+  const exchangeUpdated2Cond = walletClient.waitForNotificationCond(
+    (x) =>
+      x.type === NotificationType.ExchangeStateTransition &&
+      x.exchangeBaseUrl === exchange.baseUrl,
+  );
+
   // Travel into the future, the deposit expiration is two years
   // into the future.
   console.log("applying second time travel");
@@ -167,7 +181,8 @@ export async function runTimetravelAutorefreshTest(t: 
GlobalTestState) {
     },
   );
 
-  await walletClient.call(WalletApiOperation.TestingWaitTasksProcessed, {});
+  // The time travel should cause exchanges to update.
+  await exchangeUpdated2Cond;
   await walletClient.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
 
   // At this point, the original coins should've been refreshed.
diff --git a/packages/taler-util/src/time.ts b/packages/taler-util/src/time.ts
index c677d52ae..5702b2947 100644
--- a/packages/taler-util/src/time.ts
+++ b/packages/taler-util/src/time.ts
@@ -21,7 +21,7 @@
 /**
  * Imports.
  */
-import { Codec, renderContext, Context } from "./codec.js";
+import { Codec, Context, renderContext } from "./codec.js";
 
 declare const flavor_AbsoluteTime: unique symbol;
 declare const flavor_TalerProtocolTimestamp: unique symbol;
@@ -412,6 +412,10 @@ export namespace AbsoluteTime {
     return cmp(t, now()) <= 0;
   }
 
+  export function isNever(t: AbsoluteTime): boolean {
+    return t.t_ms === "never";
+  }
+
   export function fromProtocolTimestamp(
     t: TalerProtocolTimestamp,
   ): AbsoluteTime {
@@ -503,6 +507,23 @@ export namespace AbsoluteTime {
     return { t_ms: t1.t_ms + d.d_ms, [opaque_AbsoluteTime]: true };
   }
 
+  /**
+   * Get the remaining duration until {@param t1}.
+   *
+   * If {@param t1} already happened, the remaining duration
+   * is zero.
+   */
+  export function remaining(t1: AbsoluteTime): Duration {
+    if (t1.t_ms === "never") {
+      return Duration.getForever();
+    }
+    const stampNow = now();
+    if (stampNow.t_ms === "never") {
+      throw Error("invariant violated");
+    }
+    return Duration.fromMilliseconds(Math.max(0, t1.t_ms - stampNow.t_ms));
+  }
+
   export function subtractDuraction(
     t1: AbsoluteTime,
     d: Duration,
diff --git a/packages/taler-util/src/wallet-types.ts 
b/packages/taler-util/src/wallet-types.ts
index 0749df9f9..b79bfe4fe 100644
--- a/packages/taler-util/src/wallet-types.ts
+++ b/packages/taler-util/src/wallet-types.ts
@@ -71,12 +71,10 @@ import {
 } from "./taler-types.js";
 import {
   AbsoluteTime,
-  Duration,
   TalerPreciseTimestamp,
   TalerProtocolDuration,
   TalerProtocolTimestamp,
   codecForAbsoluteTime,
-  codecForDuration,
   codecForTimestamp,
 } from "./time.js";
 import {
@@ -3062,3 +3060,13 @@ export const codecForRemoveGlobalCurrencyAuditorRequest =
       .property("auditorBaseUrl", codecForString())
       .property("auditorPub", codecForString())
       .build("RemoveGlobalCurrencyAuditorRequest");
+
+export interface RetryLoopOpts {
+  /**
+   * Stop the retry loop when all lifeness-giving pending operations
+   * are done.
+   *
+   * Defaults to false.
+   */
+  stopWhenDone?: boolean;
+}
diff --git a/packages/taler-wallet-cli/src/index.ts 
b/packages/taler-wallet-cli/src/index.ts
index 91dcd2702..f81236cd4 100644
--- a/packages/taler-wallet-cli/src/index.ts
+++ b/packages/taler-wallet-cli/src/index.ts
@@ -284,9 +284,6 @@ async function createLocalWallet(
     console.error("Operation failed: " + summarizeTalerErrorDetail(ed));
     console.error("Error details:", JSON.stringify(ed, undefined, 2));
     processExit(1);
-  } finally {
-    logger.trace("operation with wallet finished, stopping");
-    logger.trace("stopped wallet");
   }
 }
 
@@ -343,6 +340,7 @@ async function withLocalWallet<T>(
   const wh = await createLocalWallet(walletCliArgs);
   const w = wh.wallet;
   const res = await f({ client: w.client, ws: w });
+  logger.info("Work done, stopping wallet.");
   w.stop();
   return res;
 }
@@ -956,7 +954,6 @@ depositCli
         },
       );
       console.log(`Created deposit ${resp.depositGroupId}`);
-      await wallet.ws.runPending();
     });
   });
 
@@ -1231,9 +1228,9 @@ advancedCli
     help: "Run pending operations.",
   })
   .action(async (args) => {
-    await withLocalWallet(args, async (wallet) => {
-      await wallet.ws.runPending();
-    });
+    logger.error(
+      "Subcommand run-pending not supported anymore.  Please use 
run-until-done or the client/server wallet.",
+    );
   });
 
 advancedCli
diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts 
b/packages/taler-wallet-core/src/internal-wallet-state.ts
index fdf04a65f..4379f20b5 100644
--- a/packages/taler-wallet-core/src/internal-wallet-state.ts
+++ b/packages/taler-wallet-core/src/internal-wallet-state.ts
@@ -31,16 +31,14 @@
  */
 import { IDBFactory } from "@gnu-taler/idb-bridge";
 import {
-  CoinRefreshRequest,
   DenominationInfo,
-  RefreshGroupId,
-  RefreshReason,
   TransactionState,
   WalletNotification,
 } from "@gnu-taler/taler-util";
 import { HttpRequestLibrary } from "@gnu-taler/taler-util/http";
 import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
 import { WalletStoresV1 } from "./db.js";
+import { TaskScheduler } from "./shepherd.js";
 import { AsyncCondition } from "./util/promiseUtils.js";
 import {
   DbAccess,
@@ -78,12 +76,6 @@ export interface RecoupOperations {
 
 export type NotificationListener = (n: WalletNotification) => void;
 
-export interface ActiveLongpollInfo {
-  [opId: string]: {
-    cancel: () => void;
-  };
-}
-
 export type CancelFn = () => void;
 
 /**
@@ -94,11 +86,6 @@ export type CancelFn = () => void;
  * as it's an opaque implementation detail.
  */
 export interface InternalWalletState {
-  /**
-   * Active longpoll operations.
-   */
-  activeLongpoll: ActiveLongpollInfo;
-
   cryptoApi: TalerCryptoInterface;
 
   timerGroup: TimerGroup;
@@ -106,13 +93,7 @@ export interface InternalWalletState {
 
   config: Readonly<WalletConfig>;
 
-  /**
-   * Asynchronous condition to interrupt the sleep of the
-   * retry loop.
-   *
-   * Used to allow processing of new work faster.
-   */
-  workAvailable: AsyncCondition;
+  taskScheduler: TaskScheduler;
 
   listeners: NotificationListener[];
 
diff --git a/packages/taler-wallet-core/src/operations/README.md 
b/packages/taler-wallet-core/src/operations/README.md
deleted file mode 100644
index a40349d37..000000000
--- a/packages/taler-wallet-core/src/operations/README.md
+++ /dev/null
@@ -1,7 +0,0 @@
-# Wallet Operations
-
-This folder contains the implementations for all wallet operations that 
operate on the wallet state.
-
-To avoid cyclic dependencies, these files must **not** reference each other. 
Instead, other operations should only be accessed via injected dependencies.
-
-Avoiding cyclic dependencies is important for module bundlers.
diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts 
b/packages/taler-wallet-core/src/operations/backup/index.ts
index 7a2771c57..e4e4e43f6 100644
--- a/packages/taler-wallet-core/src/operations/backup/index.ts
+++ b/packages/taler-wallet-core/src/operations/backup/index.ts
@@ -30,12 +30,10 @@ import {
   AttentionType,
   BackupRecovery,
   Codec,
-  DenomKeyType,
   EddsaKeyPair,
   HttpStatusCode,
   Logger,
   PreparePayResult,
-  PreparePayResultType,
   RecoveryLoadRequest,
   RecoveryMergeStrategy,
   TalerError,
@@ -61,11 +59,9 @@ import {
   encodeCrock,
   getRandomBytes,
   hash,
-  hashDenomPub,
   j2s,
   kdf,
   notEmpty,
-  rsaBlind,
   secretbox,
   secretbox_open,
   stringToBytes,
@@ -75,7 +71,6 @@ import {
   readTalerErrorResponse,
 } from "@gnu-taler/taler-util/http";
 import { gunzipSync, gzipSync } from "fflate";
-import { TalerCryptoInterface } from "../../crypto/cryptoImplementation.js";
 import {
   BackupProviderRecord,
   BackupProviderState,
@@ -84,25 +79,23 @@ import {
   ConfigRecord,
   ConfigRecordKey,
   WalletBackupConfState,
+  WalletStoresV1,
   timestampOptionalPreciseFromDb,
-  timestampPreciseFromDb,
   timestampPreciseToDb,
 } from "../../db.js";
 import { InternalWalletState } from "../../internal-wallet-state.js";
-import { assertUnreachable } from "../../util/assertUnreachable.js";
 import {
   checkDbInvariant,
   checkLogicInvariant,
 } from "../../util/invariants.js";
+import { GetReadOnlyAccess } from "../../util/query.js";
 import { addAttentionRequest, removeAttentionRequest } from "../attention.js";
 import {
+  TaskIdentifiers,
   TaskRunResult,
   TaskRunResultType,
-  TaskIdentifiers,
 } from "../common.js";
-import { checkPaymentByProposalId, preparePayForUri } from 
"../pay-merchant.js";
-import { WalletStoresV1 } from "../../db.js";
-import { GetReadOnlyAccess } from "../../util/query.js";
+import { preparePayForUri } from "../pay-merchant.js";
 
 const logger = new Logger("operations/backup.ts");
 
@@ -318,9 +311,10 @@ async function runBackupCycleForProvider(
           await tx.backupProviders.put(prov);
         });
 
-      return {
-        type: TaskRunResultType.Pending,
-      };
+      throw Error("not implemented");
+      // return {
+      //   type: TaskRunResultType.Pending,
+      // };
     }
     const result = res;
 
@@ -352,9 +346,10 @@ async function runBackupCycleForProvider(
       provider.baseUrl,
     );
 
-    return {
-      type: TaskRunResultType.Pending,
-    };
+    throw Error("not implemented");
+    // return {
+    //   type: TaskRunResultType.Pending,
+    // };
   }
 
   if (resp.status === HttpStatusCode.NoContent) {
@@ -658,30 +653,27 @@ async function runFirstBackupCycleForProvider(
   ws: InternalWalletState,
   args: BackupForProviderArgs,
 ): Promise<AddBackupProviderResponse> {
-  const resp = await runBackupCycleForProvider(ws, args);
-  switch (resp.type) {
-    case TaskRunResultType.Error:
-      throw TalerError.fromDetail(
-        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
-        resp.errorDetail as any, //FIXME create an error for backup problems
-      );
-    case TaskRunResultType.Finished:
-      return {
-        status: "ok",
-      };
-    case TaskRunResultType.Longpoll:
-      throw Error(
-        "unexpected runFirstBackupCycleForProvider result (longpoll)",
-      );
-    case TaskRunResultType.Pending:
-      return {
-        status: "payment-required",
-        talerUri: "FIXME",
-        //talerUri: resp.result.talerUri,
-      };
-    default:
-      assertUnreachable(resp);
-  }
+  throw Error("not implemented");
+  // const resp = await runBackupCycleForProvider(ws, args);
+  // switch (resp.type) {
+  //   case TaskRunResultType.Error:
+  //     throw TalerError.fromDetail(
+  //       TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+  //       resp.errorDetail as any, //FIXME create an error for backup problems
+  //     );
+  //   case TaskRunResultType.Finished:
+  //     return {
+  //       status: "ok",
+  //     };
+  //   case TaskRunResultType.Pending:
+  //     return {
+  //       status: "payment-required",
+  //       talerUri: "FIXME",
+  //       //talerUri: resp.result.talerUri,
+  //     };
+  //   default:
+  //     assertUnreachable(resp);
+  // }
 }
 
 export async function restoreFromRecoverySecret(): Promise<void> {
@@ -780,51 +772,52 @@ async function getProviderPaymentInfo(
   ws: InternalWalletState,
   provider: BackupProviderRecord,
 ): Promise<ProviderPaymentStatus> {
-  if (!provider.currentPaymentProposalId) {
-    return {
-      type: ProviderPaymentType.Unpaid,
-    };
-  }
-  const status = await checkPaymentByProposalId(
-    ws,
-    provider.currentPaymentProposalId,
-  ).catch(() => undefined);
-
-  if (!status) {
-    return {
-      type: ProviderPaymentType.Unpaid,
-    };
-  }
-
-  switch (status.status) {
-    case PreparePayResultType.InsufficientBalance:
-      return {
-        type: ProviderPaymentType.InsufficientBalance,
-        amount: status.amountRaw,
-      };
-    case PreparePayResultType.PaymentPossible:
-      return {
-        type: ProviderPaymentType.Pending,
-        talerUri: status.talerUri,
-      };
-    case PreparePayResultType.AlreadyConfirmed:
-      if (status.paid) {
-        return {
-          type: ProviderPaymentType.Paid,
-          paidUntil: AbsoluteTime.addDuration(
-            AbsoluteTime.fromProtocolTimestamp(status.contractTerms.timestamp),
-            durationFromSpec({ years: 1 }), //FIXME: take this from the 
contract term
-          ),
-        };
-      } else {
-        return {
-          type: ProviderPaymentType.Pending,
-          talerUri: status.talerUri,
-        };
-      }
-    default:
-      assertUnreachable(status);
-  }
+  throw Error("not implemented");
+  // if (!provider.currentPaymentProposalId) {
+  //   return {
+  //     type: ProviderPaymentType.Unpaid,
+  //   };
+  // }
+  // const status = await checkPaymentByProposalId(
+  //   ws,
+  //   provider.currentPaymentProposalId,
+  // ).catch(() => undefined);
+
+  // if (!status) {
+  //   return {
+  //     type: ProviderPaymentType.Unpaid,
+  //   };
+  // }
+
+  // switch (status.status) {
+  //   case PreparePayResultType.InsufficientBalance:
+  //     return {
+  //       type: ProviderPaymentType.InsufficientBalance,
+  //       amount: status.amountRaw,
+  //     };
+  //   case PreparePayResultType.PaymentPossible:
+  //     return {
+  //       type: ProviderPaymentType.Pending,
+  //       talerUri: status.talerUri,
+  //     };
+  //   case PreparePayResultType.AlreadyConfirmed:
+  //     if (status.paid) {
+  //       return {
+  //         type: ProviderPaymentType.Paid,
+  //         paidUntil: AbsoluteTime.addDuration(
+  //           
AbsoluteTime.fromProtocolTimestamp(status.contractTerms.timestamp),
+  //           durationFromSpec({ years: 1 }), //FIXME: take this from the 
contract term
+  //         ),
+  //       };
+  //     } else {
+  //       return {
+  //         type: ProviderPaymentType.Pending,
+  //         talerUri: status.talerUri,
+  //       };
+  //     }
+  //   default:
+  //     assertUnreachable(status);
+  // }
 }
 
 /**
diff --git a/packages/taler-wallet-core/src/operations/common.ts 
b/packages/taler-wallet-core/src/operations/common.ts
index 4c7c55212..92950b35b 100644
--- a/packages/taler-wallet-core/src/operations/common.ts
+++ b/packages/taler-wallet-core/src/operations/common.ts
@@ -21,7 +21,6 @@ import {
   AbsoluteTime,
   AmountJson,
   Amounts,
-  CancellationToken,
   CoinRefreshRequest,
   CoinStatus,
   Duration,
@@ -29,22 +28,15 @@ import {
   ExchangeEntryStatus,
   ExchangeTosStatus,
   ExchangeUpdateStatus,
-  getErrorDetailFromException,
-  j2s,
   Logger,
-  makeErrorDetail,
-  NotificationType,
   RefreshReason,
-  TalerError,
-  TalerErrorCode,
   TalerErrorDetail,
   TalerPreciseTimestamp,
+  TalerProtocolTimestamp,
   TombstoneIdStr,
   TransactionIdStr,
-  TransactionType,
-  WalletNotification,
+  durationMul,
 } from "@gnu-taler/taler-util";
-import { CryptoApiStoppedError } from "../crypto/workers/crypto-dispatcher.js";
 import {
   BackupProviderRecord,
   CoinRecord,
@@ -61,17 +53,16 @@ import {
   RecoupGroupRecord,
   RefreshGroupRecord,
   RewardRecord,
-  timestampPreciseToDb,
   WalletStoresV1,
   WithdrawalGroupRecord,
+  timestampPreciseToDb,
 } from "../db.js";
 import { InternalWalletState } from "../internal-wallet-state.js";
 import { PendingTaskType, TaskId } from "../pending-types.js";
 import { assertUnreachable } from "../util/assertUnreachable.js";
 import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
-import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js";
+import { GetReadWriteAccess } from "../util/query.js";
 import { createRefreshGroup } from "./refresh.js";
-import { constructTransactionIdentifier } from "./transactions.js";
 
 const logger = new Logger("operations/common.ts");
 
@@ -251,331 +242,6 @@ export async function spendCoins(
   );
 }
 
-/**
- * Convert the task ID for a task that processes a transaction int
- * the ID for the transaction.
- */
-function convertTaskToTransactionId(
-  taskId: string,
-): TransactionIdStr | undefined {
-  const parsedTaskId = parseTaskIdentifier(taskId);
-  switch (parsedTaskId.tag) {
-    case PendingTaskType.PeerPullCredit:
-      return constructTransactionIdentifier({
-        tag: TransactionType.PeerPullCredit,
-        pursePub: parsedTaskId.pursePub,
-      });
-    case PendingTaskType.PeerPullDebit:
-      return constructTransactionIdentifier({
-        tag: TransactionType.PeerPullDebit,
-        peerPullDebitId: parsedTaskId.peerPullDebitId,
-      });
-    // FIXME: This doesn't distinguish internal-withdrawal.
-    // Maybe we should have a different task type for that as well?
-    // Or maybe transaction IDs should be valid task identifiers?
-    case PendingTaskType.Withdraw:
-      return constructTransactionIdentifier({
-        tag: TransactionType.Withdrawal,
-        withdrawalGroupId: parsedTaskId.withdrawalGroupId,
-      });
-    case PendingTaskType.PeerPushCredit:
-      return constructTransactionIdentifier({
-        tag: TransactionType.PeerPushCredit,
-        peerPushCreditId: parsedTaskId.peerPushCreditId,
-      });
-    case PendingTaskType.Deposit:
-      return constructTransactionIdentifier({
-        tag: TransactionType.Deposit,
-        depositGroupId: parsedTaskId.depositGroupId,
-      });
-    case PendingTaskType.Refresh:
-      return constructTransactionIdentifier({
-        tag: TransactionType.Refresh,
-        refreshGroupId: parsedTaskId.refreshGroupId,
-      });
-    case PendingTaskType.RewardPickup:
-      return constructTransactionIdentifier({
-        tag: TransactionType.Reward,
-        walletRewardId: parsedTaskId.walletRewardId,
-      });
-    case PendingTaskType.PeerPushDebit:
-      return constructTransactionIdentifier({
-        tag: TransactionType.PeerPushDebit,
-        pursePub: parsedTaskId.pursePub,
-      });
-    case PendingTaskType.Purchase:
-      return constructTransactionIdentifier({
-        tag: TransactionType.Payment,
-        proposalId: parsedTaskId.proposalId,
-      });
-    default:
-      return undefined;
-  }
-}
-
-async function makeTransactionRetryNotification(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<typeof WalletStoresV1>,
-  pendingTaskId: string,
-  e: TalerErrorDetail | undefined,
-): Promise<WalletNotification | undefined> {
-  const txId = convertTaskToTransactionId(pendingTaskId);
-  if (!txId) {
-    return undefined;
-  }
-  const txState = await ws.getTransactionState(ws, tx, txId);
-  if (!txState) {
-    return undefined;
-  }
-  const notif: WalletNotification = {
-    type: NotificationType.TransactionStateTransition,
-    transactionId: txId,
-    oldTxState: txState,
-    newTxState: txState,
-  };
-  if (e) {
-    notif.errorInfo = {
-      code: e.code as number,
-      hint: e.hint,
-    };
-  }
-  return notif;
-}
-
-async function makeExchangeRetryNotification(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<typeof WalletStoresV1>,
-  pendingTaskId: string,
-  e: TalerErrorDetail | undefined,
-): Promise<WalletNotification | undefined> {
-  logger.info("making exchange retry notification");
-  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
-  if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
-    throw Error("invalid task identifier");
-  }
-  const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
-
-  if (!rec) {
-    logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
-    return undefined;
-  }
-
-  const notif: WalletNotification = {
-    type: NotificationType.ExchangeStateTransition,
-    exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
-    oldExchangeState: getExchangeState(rec),
-    newExchangeState: getExchangeState(rec),
-  };
-  if (e) {
-    notif.errorInfo = {
-      code: e.code as number,
-      hint: e.hint,
-    };
-  }
-  return notif;
-}
-
-/**
- * Generate an appropriate error transition notification
- * for applicable tasks.
- *
- * Namely, transition notifications are generated for:
- * - exchange update errors
- * - transactions
- */
-async function taskToRetryNotification(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<typeof WalletStoresV1>,
-  pendingTaskId: string,
-  e: TalerErrorDetail | undefined,
-): Promise<WalletNotification | undefined> {
-  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
-
-  switch (parsedTaskId.tag) {
-    case PendingTaskType.ExchangeUpdate:
-      return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
-    case PendingTaskType.PeerPullCredit:
-    case PendingTaskType.PeerPullDebit:
-    case PendingTaskType.Withdraw:
-    case PendingTaskType.PeerPushCredit:
-    case PendingTaskType.Deposit:
-    case PendingTaskType.Refresh:
-    case PendingTaskType.RewardPickup:
-    case PendingTaskType.PeerPushDebit:
-    case PendingTaskType.Purchase:
-      return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
-    case PendingTaskType.Backup:
-    case PendingTaskType.ExchangeCheckRefresh:
-    case PendingTaskType.Recoup:
-      return undefined;
-  }
-}
-
-async function storePendingTaskError(
-  ws: InternalWalletState,
-  pendingTaskId: string,
-  e: TalerErrorDetail,
-): Promise<void> {
-  logger.info(`storing pending task error for ${pendingTaskId}`);
-  const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
-    let retryRecord = await tx.operationRetries.get(pendingTaskId);
-    if (!retryRecord) {
-      retryRecord = {
-        id: pendingTaskId,
-        lastError: e,
-        retryInfo: DbRetryInfo.reset(),
-      };
-    } else {
-      retryRecord.lastError = e;
-      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
-    }
-    await tx.operationRetries.put(retryRecord);
-    return taskToRetryNotification(ws, tx, pendingTaskId, e);
-  });
-  if (maybeNotification) {
-    ws.notify(maybeNotification);
-  }
-}
-
-export async function resetPendingTaskTimeout(
-  ws: InternalWalletState,
-  pendingTaskId: string,
-): Promise<void> {
-  const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
-    let retryRecord = await tx.operationRetries.get(pendingTaskId);
-    if (retryRecord) {
-      // Note that we don't reset the lastError, it should still be visible
-      // while the retry runs.
-      retryRecord.retryInfo = DbRetryInfo.reset();
-      await tx.operationRetries.put(retryRecord);
-    }
-    return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
-  });
-  if (maybeNotification) {
-    ws.notify(maybeNotification);
-  }
-}
-
-async function storePendingTaskPending(
-  ws: InternalWalletState,
-  pendingTaskId: string,
-): Promise<void> {
-  const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
-    let retryRecord = await tx.operationRetries.get(pendingTaskId);
-    let hadError = false;
-    if (!retryRecord) {
-      retryRecord = {
-        id: pendingTaskId,
-        retryInfo: DbRetryInfo.reset(),
-      };
-    } else {
-      if (retryRecord.lastError) {
-        hadError = true;
-      }
-      delete retryRecord.lastError;
-      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
-    }
-    await tx.operationRetries.put(retryRecord);
-    if (hadError) {
-      return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
-    } else {
-      return undefined;
-    }
-  });
-  if (maybeNotification) {
-    ws.notify(maybeNotification);
-  }
-}
-
-async function storePendingTaskFinished(
-  ws: InternalWalletState,
-  pendingTaskId: string,
-): Promise<void> {
-  await ws.db
-    .mktx((x) => [x.operationRetries])
-    .runReadWrite(async (tx) => {
-      await tx.operationRetries.delete(pendingTaskId);
-    });
-}
-
-export async function runTaskWithErrorReporting(
-  ws: InternalWalletState,
-  opId: TaskId,
-  f: () => Promise<TaskRunResult>,
-): Promise<TaskRunResult> {
-  let maybeError: TalerErrorDetail | undefined;
-  try {
-    const resp = await f();
-    switch (resp.type) {
-      case TaskRunResultType.Error:
-        await storePendingTaskError(ws, opId, resp.errorDetail);
-        return resp;
-      case TaskRunResultType.Finished:
-        await storePendingTaskFinished(ws, opId);
-        return resp;
-      case TaskRunResultType.Pending:
-        await storePendingTaskPending(ws, opId);
-        return resp;
-      case TaskRunResultType.Longpoll:
-        return resp;
-    }
-  } catch (e) {
-    if (e instanceof CryptoApiStoppedError) {
-      if (ws.stopped) {
-        logger.warn("crypto API stopped during shutdown, ignoring error");
-        return {
-          type: TaskRunResultType.Error,
-          errorDetail: makeErrorDetail(
-            TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
-            {},
-            "Crypto API stopped during shutdown",
-          ),
-        };
-      }
-    }
-    if (e instanceof TalerError) {
-      logger.warn("operation processed resulted in error");
-      logger.warn(`error was: ${j2s(e.errorDetail)}`);
-      maybeError = e.errorDetail;
-      await storePendingTaskError(ws, opId, maybeError!);
-      return {
-        type: TaskRunResultType.Error,
-        errorDetail: e.errorDetail,
-      };
-    } else if (e instanceof Error) {
-      // This is a bug, as we expect pending operations to always
-      // do their own error handling and only throw 
WALLET_PENDING_OPERATION_FAILED
-      // or return something.
-      logger.error(`Uncaught exception: ${e.message}`);
-      logger.error(`Stack: ${e.stack}`);
-      maybeError = makeErrorDetail(
-        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
-        {
-          stack: e.stack,
-        },
-        `unexpected exception (message: ${e.message})`,
-      );
-      await storePendingTaskError(ws, opId, maybeError);
-      return {
-        type: TaskRunResultType.Error,
-        errorDetail: maybeError,
-      };
-    } else {
-      logger.error("Uncaught exception, value is not even an error.");
-      maybeError = makeErrorDetail(
-        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
-        {},
-        `unexpected exception (not even an error)`,
-      );
-      await storePendingTaskError(ws, opId, maybeError);
-      return {
-        type: TaskRunResultType.Error,
-        errorDetail: maybeError,
-      };
-    }
-  }
-}
-
 export enum TombstoneTag {
   DeleteWithdrawalGroup = "delete-withdrawal-group",
   DeleteReserve = "delete-reserve",
@@ -646,47 +312,6 @@ export function getExchangeState(r: ExchangeEntryRecord): 
ExchangeEntryState {
   };
 }
 
-export interface LongpollResult {
-  ready: boolean;
-}
-
-export function runLongpollAsync(
-  ws: InternalWalletState,
-  retryTag: string,
-  reqFn: (ct: CancellationToken) => Promise<LongpollResult>,
-): void {
-  const asyncFn = async () => {
-    if (ws.stopped) {
-      logger.trace("not long-polling reserve, wallet already stopped");
-      await storePendingTaskPending(ws, retryTag);
-      return;
-    }
-    const cts = CancellationToken.create();
-    let res: { ready: boolean } | undefined = undefined;
-    try {
-      ws.activeLongpoll[retryTag] = {
-        cancel: () => {
-          logger.trace("cancel of reserve longpoll requested");
-          cts.cancel();
-        },
-      };
-      res = await reqFn(cts.token);
-    } catch (e) {
-      const errDetail = getErrorDetailFromException(e);
-      logger.warn(`got error during long-polling: ${j2s(errDetail)}`);
-      await storePendingTaskError(ws, retryTag, errDetail);
-      return;
-    } finally {
-      delete ws.activeLongpoll[retryTag];
-    }
-    if (!res.ready) {
-      await storePendingTaskPending(ws, retryTag);
-    }
-    ws.workAvailable.trigger();
-  };
-  asyncFn();
-}
-
 export type ParsedTombstone =
   | {
       tag: TombstoneTag.DeleteWithdrawalGroup;
@@ -732,31 +357,53 @@ export interface TransactionManager {
 
 export enum TaskRunResultType {
   Finished = "finished",
-  Pending = "pending",
+  Backoff = "backoff",
+  Progress = "progress",
   Error = "error",
-  Longpoll = "longpoll",
+  ScheduleLater = "schedule-later",
 }
 
 export type TaskRunResult =
   | TaskRunFinishedResult
   | TaskRunErrorResult
-  | TaskRunLongpollResult
-  | TaskRunPendingResult;
+  | TaskRunBackoffResult
+  | TaskRunProgressResult
+  | TaskRunScheduleLaterResult;
 
 export namespace TaskRunResult {
+  /**
+   * Task is finished and does not need to be processed again.
+   */
   export function finished(): TaskRunResult {
     return {
       type: TaskRunResultType.Finished,
     };
   }
-  export function pending(): TaskRunResult {
+  /**
+   * Task is waiting for something, should be invoked
+   * again with exponentiall back-off until some other
+   * result is returned.
+   */
+  export function backoff(): TaskRunResult {
+    return {
+      type: TaskRunResultType.Backoff,
+    };
+  }
+  /**
+   * Task made progress and should be processed again.
+   */
+  export function progress(): TaskRunResult {
     return {
-      type: TaskRunResultType.Pending,
+      type: TaskRunResultType.Progress,
     };
   }
-  export function longpoll(): TaskRunResult {
+  /**
+   * Run the task again at a fixed time in the future.
+   */
+  export function runAgainAt(runAt: AbsoluteTime): TaskRunResult {
     return {
-      type: TaskRunResultType.Longpoll,
+      type: TaskRunResultType.ScheduleLater,
+      runAt,
     };
   }
 }
@@ -765,8 +412,17 @@ export interface TaskRunFinishedResult {
   type: TaskRunResultType.Finished;
 }
 
-export interface TaskRunPendingResult {
-  type: TaskRunResultType.Pending;
+export interface TaskRunBackoffResult {
+  type: TaskRunResultType.Backoff;
+}
+
+export interface TaskRunProgressResult {
+  type: TaskRunResultType.Progress;
+}
+
+export interface TaskRunScheduleLaterResult {
+  type: TaskRunResultType.ScheduleLater;
+  runAt: AbsoluteTime;
 }
 
 export interface TaskRunErrorResult {
@@ -774,10 +430,6 @@ export interface TaskRunErrorResult {
   errorDetail: TalerErrorDetail;
 }
 
-export interface TaskRunLongpollResult {
-  type: TaskRunResultType.Longpoll;
-}
-
 export interface DbRetryInfo {
   firstTry: DbPreciseTimestamp;
   nextRetry: DbPreciseTimestamp;
@@ -866,6 +518,24 @@ export namespace DbRetryInfo {
   }
 }
 
+/**
+ * Timestamp after which the wallet would do an auto-refresh.
+ */
+export function getAutoRefreshExecuteThreshold(d: {
+  stampExpireWithdraw: TalerProtocolTimestamp;
+  stampExpireDeposit: TalerProtocolTimestamp;
+}): AbsoluteTime {
+  const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
+    d.stampExpireWithdraw,
+  );
+  const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
+    d.stampExpireDeposit,
+  );
+  const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
+  const deltaDiv = durationMul(delta, 0.5);
+  return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
+}
+
 /**
  * Parsed representation of task identifiers.
  */
@@ -877,7 +547,6 @@ export type ParsedTaskIdentifier =
   | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string }
   | { tag: PendingTaskType.Backup; backupProviderBaseUrl: string }
   | { tag: PendingTaskType.Deposit; depositGroupId: string }
-  | { tag: PendingTaskType.ExchangeCheckRefresh; exchangeBaseUrl: string }
   | { tag: PendingTaskType.PeerPullDebit; peerPullDebitId: string }
   | { tag: PendingTaskType.PeerPullCredit; pursePub: string }
   | { tag: PendingTaskType.PeerPushCredit; peerPushCreditId: string }
@@ -900,8 +569,6 @@ export function parseTaskIdentifier(x: string): 
ParsedTaskIdentifier {
       return { tag: type, backupProviderBaseUrl: decodeURIComponent(rest[0]) };
     case PendingTaskType.Deposit:
       return { tag: type, depositGroupId: rest[0] };
-    case PendingTaskType.ExchangeCheckRefresh:
-      return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) };
     case PendingTaskType.ExchangeUpdate:
       return { tag: type, exchangeBaseUrl: decodeURIComponent(rest[0]) };
     case PendingTaskType.PeerPullCredit:
@@ -933,8 +600,6 @@ export function constructTaskIdentifier(p: 
ParsedTaskIdentifier): TaskId {
       return `${p.tag}:${p.backupProviderBaseUrl}` as TaskId;
     case PendingTaskType.Deposit:
       return `${p.tag}:${p.depositGroupId}` as TaskId;
-    case PendingTaskType.ExchangeCheckRefresh:
-      return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId;
     case PendingTaskType.ExchangeUpdate:
       return `${p.tag}:${encodeURIComponent(p.exchangeBaseUrl)}` as TaskId;
     case PendingTaskType.PeerPullDebit:
@@ -974,11 +639,6 @@ export namespace TaskIdentifiers {
       exchBaseUrl,
     )}` as TaskId;
   }
-  export function forExchangeCheckRefresh(exch: ExchangeEntryRecord): TaskId {
-    return `${PendingTaskType.ExchangeCheckRefresh}:${encodeURIComponent(
-      exch.baseUrl,
-    )}` as TaskId;
-  }
   export function forTipPickup(tipRecord: RewardRecord): TaskId {
     return `${PendingTaskType.RewardPickup}:${tipRecord.walletRewardId}` as 
TaskId;
   }
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts 
b/packages/taler-wallet-core/src/operations/deposits.ts
index 3619ac4f4..38b5d43f0 100644
--- a/packages/taler-wallet-core/src/operations/deposits.ts
+++ b/packages/taler-wallet-core/src/operations/deposits.ts
@@ -48,6 +48,7 @@ import {
   TalerProtocolTimestamp,
   TrackTransaction,
   TransactionAction,
+  TransactionIdStr,
   TransactionMajorState,
   TransactionMinorState,
   TransactionState,
@@ -75,6 +76,7 @@ import {
   KycPendingInfo,
   PendingTaskType,
   RefreshOperationStatus,
+  TaskId,
   createRefreshGroup,
   getCandidateWithdrawalDenomsTx,
   getTotalRefreshCost,
@@ -90,7 +92,6 @@ import {
   TombstoneTag,
   TransactionContext,
   constructTaskIdentifier,
-  runLongpollAsync,
   spendCoins,
 } from "./common.js";
 import { getExchangeWireDetailsInTx } from "./exchanges.js";
@@ -103,7 +104,6 @@ import {
   constructTransactionIdentifier,
   notifyTransition,
   parseTransactionIdentifier,
-  stopLongpolling,
 } from "./transactions.js";
 
 /**
@@ -112,8 +112,8 @@ import {
 const logger = new Logger("deposits.ts");
 
 export class DepositTransactionContext implements TransactionContext {
-  private transactionId: string;
-  private retryTag: string;
+  readonly transactionId: TransactionIdStr;
+  readonly taskId: TaskId;
   constructor(
     public ws: InternalWalletState,
     public depositGroupId: string,
@@ -122,7 +122,7 @@ export class DepositTransactionContext implements 
TransactionContext {
       tag: TransactionType.Deposit,
       depositGroupId,
     });
-    this.retryTag = constructTaskIdentifier({
+    this.taskId = constructTaskIdentifier({
       tag: PendingTaskType.Deposit,
       depositGroupId,
     });
@@ -148,7 +148,7 @@ export class DepositTransactionContext implements 
TransactionContext {
   }
 
   async suspendTransaction(): Promise<void> {
-    const { ws, depositGroupId, transactionId, retryTag } = this;
+    const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
     const transitionInfo = await ws.db
       .mktx((x) => [x.depositGroups])
       .runReadWrite(async (tx) => {
@@ -185,12 +185,12 @@ export class DepositTransactionContext implements 
TransactionContext {
           newTxState: computeDepositTransactionStatus(dg),
         };
       });
-    stopLongpolling(ws, retryTag);
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
   }
 
   async abortTransaction(): Promise<void> {
-    const { ws, depositGroupId, transactionId, retryTag } = this;
+    const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
     const transitionInfo = await ws.db
       .mktx((x) => [x.depositGroups])
       .runReadWrite(async (tx) => {
@@ -219,14 +219,13 @@ export class DepositTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
-    stopLongpolling(ws, retryTag);
-    // Need to process the operation again.
-    ws.workAvailable.trigger();
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 
   async resumeTransaction(): Promise<void> {
-    const { ws, depositGroupId, transactionId, retryTag } = this;
+    const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
     const transitionInfo = await ws.db
       .mktx((x) => [x.depositGroups])
       .runReadWrite(async (tx) => {
@@ -263,12 +262,12 @@ export class DepositTransactionContext implements 
TransactionContext {
           newTxState: computeDepositTransactionStatus(dg),
         };
       });
-    ws.workAvailable.trigger();
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 
   async failTransaction(): Promise<void> {
-    const { ws, depositGroupId, transactionId, retryTag } = this;
+    const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
     const transitionInfo = await ws.db
       .mktx((x) => [x.depositGroups])
       .runReadWrite(async (tx) => {
@@ -294,7 +293,7 @@ export class DepositTransactionContext implements 
TransactionContext {
         return undefined;
       });
     // FIXME: Also cancel ongoing work (via cancellation token, once 
implemented)
-    stopLongpolling(ws, retryTag);
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
   }
 }
@@ -453,7 +452,7 @@ async function waitForRefreshOnDepositGroup(
     });
 
   notifyTransition(ws, transactionId, transitionInfo);
-  return TaskRunResult.pending();
+  return TaskRunResult.backoff();
 }
 
 async function refundDepositGroup(
@@ -568,7 +567,7 @@ async function refundDepositGroup(
       await tx.depositGroups.put(newDg);
     });
 
-  return TaskRunResult.pending();
+  return TaskRunResult.backoff();
 }
 
 async function processDepositGroupAborting(
@@ -588,6 +587,7 @@ async function processDepositGroupAborting(
 async function processDepositGroupPendingKyc(
   ws: InternalWalletState,
   depositGroup: DepositGroupRecord,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   const { depositGroupId } = depositGroup;
   const transactionId = constructTransactionIdentifier({
@@ -606,51 +606,45 @@ async function processDepositGroupPendingKyc(
     throw Error("invalid DB state, in pending(kyc), but no kycInfo present");
   }
 
-  runLongpollAsync(ws, retryTag, async (ct) => {
-    const url = new URL(
-      `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
-      kycInfo.exchangeBaseUrl,
-    );
-    url.searchParams.set("timeout_ms", "10000");
-    logger.info(`kyc url ${url.href}`);
-    const kycStatusRes = await ws.http.fetch(url.href, {
-      method: "GET",
-      cancellationToken: ct,
-    });
-    if (
-      kycStatusRes.status === HttpStatusCode.Ok ||
-      //FIXME: NoContent is not expected 
https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
-      // remove after the exchange is fixed or clarified
-      kycStatusRes.status === HttpStatusCode.NoContent
-    ) {
-      const transitionInfo = await ws.db
-        .mktx((x) => [x.depositGroups])
-        .runReadWrite(async (tx) => {
-          const newDg = await tx.depositGroups.get(depositGroupId);
-          if (!newDg) {
-            return;
-          }
-          if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) {
-            return;
-          }
-          const oldTxState = computeDepositTransactionStatus(newDg);
-          newDg.operationStatus = DepositOperationStatus.PendingTrack;
-          const newTxState = computeDepositTransactionStatus(newDg);
-          await tx.depositGroups.put(newDg);
-          return { oldTxState, newTxState };
-        });
-      notifyTransition(ws, transactionId, transitionInfo);
-      return { ready: true };
-    } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
-      // FIXME: Do we have to update the URL here?
-      return { ready: false };
-    } else {
-      throw Error(
-        `unexpected response from kyc-check (${kycStatusRes.status})`,
-      );
-    }
+  const url = new URL(
+    `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
+    kycInfo.exchangeBaseUrl,
+  );
+  url.searchParams.set("timeout_ms", "10000");
+  logger.info(`kyc url ${url.href}`);
+  const kycStatusRes = await ws.http.fetch(url.href, {
+    method: "GET",
+    cancellationToken,
   });
-  return TaskRunResult.longpoll();
+  if (
+    kycStatusRes.status === HttpStatusCode.Ok ||
+    //FIXME: NoContent is not expected 
https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+    // remove after the exchange is fixed or clarified
+    kycStatusRes.status === HttpStatusCode.NoContent
+  ) {
+    const transitionInfo = await ws.db
+      .mktx((x) => [x.depositGroups])
+      .runReadWrite(async (tx) => {
+        const newDg = await tx.depositGroups.get(depositGroupId);
+        if (!newDg) {
+          return;
+        }
+        if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) {
+          return;
+        }
+        const oldTxState = computeDepositTransactionStatus(newDg);
+        newDg.operationStatus = DepositOperationStatus.PendingTrack;
+        const newTxState = computeDepositTransactionStatus(newDg);
+        await tx.depositGroups.put(newDg);
+        return { oldTxState, newTxState };
+      });
+    notifyTransition(ws, transactionId, transitionInfo);
+  } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+    // FIXME: Do we have to update the URL here?
+  } else {
+    throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
+  }
+  return TaskRunResult.backoff();
 }
 
 /**
@@ -682,7 +676,7 @@ async function transitionToKycRequired(
   });
   if (kycStatusReq.status === HttpStatusCode.Ok) {
     logger.warn("kyc requested, but already fulfilled");
-    return TaskRunResult.finished();
+    return TaskRunResult.backoff();
   } else if (kycStatusReq.status === HttpStatusCode.Accepted) {
     const kycStatus = await kycStatusReq.json();
     logger.info(`kyc status: ${j2s(kycStatus)}`);
@@ -864,7 +858,7 @@ async function processDepositGroupPendingTrack(
     return TaskRunResult.finished();
   } else {
     // FIXME: Use long-polling.
-    return TaskRunResult.pending();
+    return TaskRunResult.backoff();
   }
 }
 
@@ -993,7 +987,7 @@ async function processDepositGroupPendingDeposit(
     });
 
   notifyTransition(ws, transactionId, transitionInfo);
-  return TaskRunResult.finished();
+  return TaskRunResult.progress();
 }
 
 /**
@@ -1002,9 +996,7 @@ async function processDepositGroupPendingDeposit(
 export async function processDepositGroup(
   ws: InternalWalletState,
   depositGroupId: string,
-  options: {
-    cancellationToken?: CancellationToken;
-  } = {},
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   const depositGroup = await ws.db
     .mktx((x) => [x.depositGroups])
@@ -1021,15 +1013,15 @@ export async function processDepositGroup(
       return processDepositGroupPendingTrack(
         ws,
         depositGroup,
-        options.cancellationToken,
+        cancellationToken,
       );
     case DepositOperationStatus.PendingKyc:
-      return processDepositGroupPendingKyc(ws, depositGroup);
+      return processDepositGroupPendingKyc(ws, depositGroup, 
cancellationToken);
     case DepositOperationStatus.PendingDeposit:
       return processDepositGroupPendingDeposit(
         ws,
         depositGroup,
-        options.cancellationToken,
+        cancellationToken,
       );
     case DepositOperationStatus.Aborting:
       return processDepositGroupAborting(ws, depositGroup);
@@ -1393,10 +1385,8 @@ export async function createDepositGroup(
     operationStatus: DepositOperationStatus.PendingDeposit,
   };
 
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.Deposit,
-    depositGroupId,
-  });
+  const ctx = new DepositTransactionContext(ws, depositGroupId);
+  const transactionId = ctx.transactionId;
 
   const newTxState = await ws.db
     .mktx((x) => [
@@ -1439,6 +1429,8 @@ export async function createDepositGroup(
     hintTransactionId: transactionId,
   });
 
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
+
   return {
     depositGroupId,
     transactionId,
diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts 
b/packages/taler-wallet-core/src/operations/exchanges.ts
index b4d45db2c..22be4102a 100644
--- a/packages/taler-wallet-core/src/operations/exchanges.ts
+++ b/packages/taler-wallet-core/src/operations/exchanges.ts
@@ -28,6 +28,8 @@ import {
   AgeRestriction,
   Amounts,
   CancellationToken,
+  CoinRefreshRequest,
+  CoinStatus,
   DeleteExchangeRequest,
   DenomKeyType,
   DenomOperationMap,
@@ -53,6 +55,7 @@ import {
   NotificationType,
   OperationErrorInfo,
   Recoup,
+  RefreshReason,
   ScopeInfo,
   ScopeType,
   TalerError,
@@ -67,8 +70,11 @@ import {
   WireFeeMap,
   WireFeesJson,
   WireInfo,
+  assertUnreachable,
   canonicalizeBaseUrl,
   codecForExchangeKeysJson,
+  durationFromSpec,
+  durationMul,
   encodeCrock,
   hashDenomPub,
   j2s,
@@ -89,19 +95,22 @@ import {
   WalletStoresV1,
 } from "../db.js";
 import {
+  AsyncFlag,
   ExchangeEntryDbRecordStatus,
   ExchangeEntryDbUpdateStatus,
   PendingTaskType,
   WalletDbReadOnlyTransactionArr,
   WalletDbReadWriteTransactionArr,
+  createRefreshGroup,
   createTimeline,
   isWithdrawableDenom,
   selectBestForOverlappingDenominations,
   selectMinimumFee,
-  timestampOptionalAbsoluteFromDb,
+  timestampAbsoluteFromDb,
   timestampOptionalPreciseFromDb,
   timestampPreciseFromDb,
   timestampPreciseToDb,
+  timestampProtocolFromDb,
   timestampProtocolToDb,
 } from "../index.js";
 import { InternalWalletState } from "../internal-wallet-state.js";
@@ -117,11 +126,11 @@ import {
   TaskRunResult,
   TaskRunResultType,
   constructTaskIdentifier,
+  getAutoRefreshExecuteThreshold,
   getExchangeEntryStatusFromRecord,
   getExchangeState,
   getExchangeTosStatusFromRecord,
   getExchangeUpdateStatusFromRecord,
-  runTaskWithErrorReporting,
 } from "./common.js";
 
 const logger = new Logger("exchanges.ts");
@@ -635,11 +644,13 @@ async function downloadExchangeKeysInfo(
   baseUrl: string,
   http: HttpRequestLibrary,
   timeout: Duration,
+  cancellationToken: CancellationToken,
 ): Promise<ExchangeKeysDownloadResult> {
   const keysUrl = new URL("keys", baseUrl);
 
   const resp = await http.fetch(keysUrl.href, {
     timeout,
+    cancellationToken,
   });
 
   // We must make sure to parse out the protocol version
@@ -828,13 +839,19 @@ async function downloadTosFromAcceptedFormat(
  * If the exchange entry doesn't exist,
  * a new ephemeral entry is created.
  */
-export async function startUpdateExchangeEntry(
+async function startUpdateExchangeEntry(
   ws: InternalWalletState,
   exchangeBaseUrl: string,
   options: { forceUpdate?: boolean } = {},
 ): Promise<void> {
   const canonBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
 
+  logger.info(
+    `starting update of exchange entry ${canonBaseUrl}, forced=${
+      options.forceUpdate ?? false
+    }`,
+  );
+
   const { notification } = await ws.db
     .mktx((x) => [x.exchanges, x.exchangeDetails])
     .runReadWrite(async (tx) => {
@@ -845,7 +862,7 @@ export async function startUpdateExchangeEntry(
     ws.notify(notification);
   }
 
-  const { oldExchangeState, newExchangeState } = await ws.db
+  const { oldExchangeState, newExchangeState, taskId } = await ws.db
     .mktx((x) => [x.exchanges, x.operationRetries])
     .runReadWrite(async (tx) => {
       const r = await tx.exchanges.get(canonBaseUrl);
@@ -882,7 +899,7 @@ export async function startUpdateExchangeEntry(
       // Reset retries for updating the exchange entry.
       const taskId = TaskIdentifiers.forExchangeUpdate(r);
       await tx.operationRetries.delete(taskId);
-      return { oldExchangeState, newExchangeState };
+      return { oldExchangeState, newExchangeState, taskId };
     });
   ws.notify({
     type: NotificationType.ExchangeStateTransition,
@@ -890,7 +907,7 @@ export async function startUpdateExchangeEntry(
     newExchangeState: newExchangeState,
     oldExchangeState: oldExchangeState,
   });
-  ws.workAvailable.trigger();
+  ws.taskScheduler.restartShepherdTask(taskId);
 }
 
 /**
@@ -909,6 +926,119 @@ export interface ReadyExchangeSummary {
   scopeInfo: ScopeInfo;
 }
 
+async function internalWaitReadyExchange(
+  ws: InternalWalletState,
+  canonUrl: string,
+  exchangeNotifFlag: AsyncFlag,
+  options: {
+    cancellationToken?: CancellationToken;
+    forceUpdate?: boolean;
+    expectedMasterPub?: string;
+  } = {},
+): Promise<ReadyExchangeSummary> {
+  const operationId = constructTaskIdentifier({
+    tag: PendingTaskType.ExchangeUpdate,
+    exchangeBaseUrl: canonUrl,
+  });
+  while (true) {
+    logger.info(`waiting for ready exchange ${canonUrl}`);
+    const { exchange, exchangeDetails, retryInfo, scopeInfo } =
+      await ws.db.runReadOnlyTx(
+        [
+          "exchanges",
+          "exchangeDetails",
+          "operationRetries",
+          "globalCurrencyAuditors",
+          "globalCurrencyExchanges",
+        ],
+        async (tx) => {
+          const exchange = await tx.exchanges.get(canonUrl);
+          const exchangeDetails = await getExchangeRecordsInternal(
+            tx,
+            canonUrl,
+          );
+          const retryInfo = await tx.operationRetries.get(operationId);
+          let scopeInfo: ScopeInfo | undefined = undefined;
+          if (exchange && exchangeDetails) {
+            scopeInfo = await internalGetExchangeScopeInfo(tx, 
exchangeDetails);
+          }
+          return { exchange, exchangeDetails, retryInfo, scopeInfo };
+        },
+      );
+
+    if (!exchange) {
+      throw Error("exchange entry does not exist anymore");
+    }
+
+    let ready = false;
+
+    switch (exchange.updateStatus) {
+      case ExchangeEntryDbUpdateStatus.Ready:
+        ready = true;
+        break;
+      case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+        // If the update is forced,
+        // we wait until we're in a full "ready" state,
+        // as we're not happy with the stale information.
+        if (!options.forceUpdate) {
+          ready = true;
+        }
+        break;
+      default: {
+        if (retryInfo) {
+          throw TalerError.fromDetail(
+            TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
+            {
+              exchangeBaseUrl: canonUrl,
+              innerError: retryInfo?.lastError,
+            },
+          );
+        }
+      }
+    }
+
+    if (!ready) {
+      logger.info("waiting for exchange update notification");
+      await exchangeNotifFlag.wait();
+      logger.info("done waiting for exchange update notification");
+      exchangeNotifFlag.reset();
+      continue;
+    }
+
+    if (!exchangeDetails) {
+      throw Error("invariant failed");
+    }
+
+    if (!scopeInfo) {
+      throw Error("invariant failed");
+    }
+
+    const res: ReadyExchangeSummary = {
+      currency: exchangeDetails.currency,
+      exchangeBaseUrl: canonUrl,
+      masterPub: exchangeDetails.masterPublicKey,
+      tosStatus: getExchangeTosStatusFromRecord(exchange),
+      tosAcceptedEtag: exchange.tosAcceptedEtag,
+      wireInfo: exchangeDetails.wireInfo,
+      protocolVersionRange: exchangeDetails.protocolVersionRange,
+      tosCurrentEtag: exchange.tosCurrentEtag,
+      tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
+        exchange.tosAcceptedTimestamp,
+      ),
+      scopeInfo,
+    };
+
+    if (options.expectedMasterPub) {
+      if (res.masterPub !== options.expectedMasterPub) {
+        throw Error(
+          "public key of the exchange does not match expected public key",
+        );
+      }
+    }
+    return res;
+  }
+}
+
 /**
  * Ensure that a fresh exchange entry exists for the given
  * exchange base URL.
@@ -933,127 +1063,149 @@ export async function fetchFreshExchange(
   } = {},
 ): Promise<ReadyExchangeSummary> {
   const canonUrl = canonicalizeBaseUrl(baseUrl);
-  const operationId = constructTaskIdentifier({
-    tag: PendingTaskType.ExchangeUpdate,
-    exchangeBaseUrl: canonUrl,
+
+  ws.ensureTaskLoopRunning();
+
+  await startUpdateExchangeEntry(ws, canonUrl, {
+    forceUpdate: options.forceUpdate,
   });
 
-  const oldExchange = await ws.db
-    .mktx((x) => [x.exchanges])
-    .runReadOnly(async (tx) => {
-      return tx.exchanges.get(canonUrl);
-    });
+  return waitReadyExchange(ws, canonUrl, options);
+}
 
-  let needsUpdate = false;
+async function waitReadyExchange(
+  ws: InternalWalletState,
+  canonUrl: string,
+  options: {
+    cancellationToken?: CancellationToken;
+    forceUpdate?: boolean;
+    expectedMasterPub?: string;
+  } = {},
+): Promise<ReadyExchangeSummary> {
+  // FIXME: We should use Symbol.dispose magic here for cleanup!
 
-  if (!oldExchange || options.forceUpdate) {
-    needsUpdate = true;
-    await startUpdateExchangeEntry(ws, canonUrl, {
-      forceUpdate: options.forceUpdate,
-    });
-  } else {
-    const nextUpdate = timestampOptionalAbsoluteFromDb(
-      oldExchange.nextUpdateStamp,
-    );
+  const exchangeNotifFlag = new AsyncFlag();
+  // Raise exchangeNotifFlag whenever we get a notification
+  // about our exchange.
+  const cancelNotif = ws.addNotificationListener((notif) => {
     if (
-      nextUpdate == null ||
-      AbsoluteTime.isExpired(nextUpdate) ||
-      oldExchange.updateStatus !== ExchangeEntryDbUpdateStatus.Ready
+      notif.type === NotificationType.ExchangeStateTransition &&
+      notif.exchangeBaseUrl === canonUrl
     ) {
-      needsUpdate = true;
+      logger.info(`raising update notification: ${j2s(notif)}`);
+      exchangeNotifFlag.raise();
     }
-  }
+  });
 
-  if (needsUpdate) {
-    await runTaskWithErrorReporting(ws, operationId, () =>
-      updateExchangeFromUrlHandler(ws, canonUrl),
+  try {
+    const res = await internalWaitReadyExchange(
+      ws,
+      canonUrl,
+      exchangeNotifFlag,
+      options,
     );
+    logger.info("done waiting for ready exchange");
+    return res;
+  } finally {
+    cancelNotif();
   }
+}
 
-  const { exchange, exchangeDetails, retryInfo, scopeInfo } =
-    await ws.db.runReadOnlyTx(
-      [
-        "exchanges",
-        "exchangeDetails",
-        "operationRetries",
-        "globalCurrencyAuditors",
-        "globalCurrencyExchanges",
-      ],
-      async (tx) => {
-        const exchange = await tx.exchanges.get(canonUrl);
-        const exchangeDetails = await getExchangeRecordsInternal(tx, canonUrl);
-        const retryInfo = await tx.operationRetries.get(operationId);
-        let scopeInfo: ScopeInfo | undefined = undefined;
-        if (exchange && exchangeDetails) {
-          scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails);
-        }
-        return { exchange, exchangeDetails, retryInfo, scopeInfo };
-      },
-    );
+/**
+ * Update an exchange entry in the wallet's database
+ * by fetching the /keys and /wire information.
+ * Optionally link the reserve entry to the new or existing
+ * exchange entry in then DB.
+ */
+export async function updateExchangeFromUrlHandler(
+  ws: InternalWalletState,
+  exchangeBaseUrl: string,
+  cancellationToken: CancellationToken,
+): Promise<TaskRunResult> {
+  logger.trace(`updating exchange info for ${exchangeBaseUrl}`);
+  exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
 
-  if (!exchange) {
-    throw Error("exchange entry does not exist anymore");
+  const oldExchangeRec = await ws.db.runReadOnlyTx(
+    ["exchanges"],
+    async (tx) => {
+      return tx.exchanges.get(exchangeBaseUrl);
+    },
+  );
+
+  if (!oldExchangeRec) {
+    logger.info(`not updating exchange ${exchangeBaseUrl}, no record in DB`);
+    return TaskRunResult.finished();
   }
 
-  switch (exchange.updateStatus) {
-    case ExchangeEntryDbUpdateStatus.Ready:
+  let updateRequestedExplicitly = false;
+
+  switch (oldExchangeRec.updateStatus) {
+    case ExchangeEntryDbUpdateStatus.Suspended:
+      logger.info(`not updating exchange in status "suspended"`);
+      return TaskRunResult.finished();
+    case ExchangeEntryDbUpdateStatus.Initial:
+      logger.info(`not updating exchange in status "initial"`);
+      return TaskRunResult.finished();
+    case ExchangeEntryDbUpdateStatus.InitialUpdate:
     case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+    case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
+      updateRequestedExplicitly = true;
+      break;
+    case ExchangeEntryDbUpdateStatus.Ready:
       break;
     default:
-      throw TalerError.fromDetail(TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, {
-        exchangeBaseUrl: canonUrl,
-        innerError: retryInfo?.lastError,
-      });
+      assertUnreachable(oldExchangeRec.updateStatus);
   }
 
-  if (!exchangeDetails) {
-    throw Error("invariant failed");
-  }
+  let refreshCheckNecessary = true;
 
-  if (!scopeInfo) {
-    throw Error("invariant failed");
-  }
+  if (!updateRequestedExplicitly) {
+    // If the update wasn't requested explicitly,
+    // check if we really need to update.
 
-  const res: ReadyExchangeSummary = {
-    currency: exchangeDetails.currency,
-    exchangeBaseUrl: canonUrl,
-    masterPub: exchangeDetails.masterPublicKey,
-    tosStatus: getExchangeTosStatusFromRecord(exchange),
-    tosAcceptedEtag: exchange.tosAcceptedEtag,
-    wireInfo: exchangeDetails.wireInfo,
-    protocolVersionRange: exchangeDetails.protocolVersionRange,
-    tosCurrentEtag: exchange.tosCurrentEtag,
-    tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
-      exchange.tosAcceptedTimestamp,
-    ),
-    scopeInfo,
-  };
+    let nextUpdateStamp = timestampAbsoluteFromDb(
+      oldExchangeRec.nextUpdateStamp,
+    );
 
-  if (options.expectedMasterPub) {
-    if (res.masterPub !== options.expectedMasterPub) {
-      throw Error(
-        "public key of the exchange does not match expected public key",
+    let nextRefreshCheckStamp = timestampAbsoluteFromDb(
+      oldExchangeRec.nextRefreshCheckStamp,
+    );
+
+    let updateNecessary = true;
+
+    if (
+      !AbsoluteTime.isNever(nextUpdateStamp) &&
+      !AbsoluteTime.isExpired(nextUpdateStamp)
+    ) {
+      logger.info(
+        `exchange update for ${exchangeBaseUrl} not necessary, scheduled for 
${AbsoluteTime.toIsoString(
+          nextUpdateStamp,
+        )}`,
+      );
+      updateNecessary = false;
+    }
+
+    if (
+      !AbsoluteTime.isNever(nextRefreshCheckStamp) &&
+      !AbsoluteTime.isExpired(nextRefreshCheckStamp)
+    ) {
+      logger.info(
+        `exchange refresh check for ${exchangeBaseUrl} not necessary, 
scheduled for ${AbsoluteTime.toIsoString(
+          nextRefreshCheckStamp,
+        )}`,
+      );
+      refreshCheckNecessary = false;
+    }
+
+    if (!(updateNecessary || refreshCheckNecessary)) {
+      return TaskRunResult.runAgainAt(
+        AbsoluteTime.min(nextUpdateStamp, nextRefreshCheckStamp),
       );
     }
   }
-  return res;
-}
 
-/**
- * Update an exchange entry in the wallet's database
- * by fetching the /keys and /wire information.
- * Optionally link the reserve entry to the new or existing
- * exchange entry in then DB.
- */
-export async function updateExchangeFromUrlHandler(
-  ws: InternalWalletState,
-  exchangeBaseUrl: string,
-  options: {
-    cancellationToken?: CancellationToken;
-  } = {},
-): Promise<TaskRunResult> {
-  logger.trace(`updating exchange info for ${exchangeBaseUrl}`);
-  exchangeBaseUrl = canonicalizeBaseUrl(exchangeBaseUrl);
+  // When doing the auto-refresh check, we always update
+  // the key info before that.
 
   logger.trace("updating exchange /keys info");
 
@@ -1063,6 +1215,7 @@ export async function updateExchangeFromUrlHandler(
     exchangeBaseUrl,
     ws.http,
     timeout,
+    cancellationToken,
   );
 
   logger.trace("validating exchange wire info");
@@ -1302,9 +1455,13 @@ export async function updateExchangeFromUrlHandler(
     });
 
   if (recoupGroupId) {
+    const recoupTaskId = constructTaskIdentifier({
+      tag: PendingTaskType.Recoup,
+      recoupGroupId,
+    });
     // Asynchronously start recoup.  This doesn't need to finish
     // for the exchange update to be considered finished.
-    ws.workAvailable.trigger();
+    ws.taskScheduler.startShepherdTask(recoupTaskId);
   }
 
   if (!updated) {
@@ -1313,6 +1470,84 @@ export async function updateExchangeFromUrlHandler(
 
   logger.trace("done updating exchange info in database");
 
+  logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`);
+
+  let minCheckThreshold = AbsoluteTime.addDuration(
+    AbsoluteTime.now(),
+    durationFromSpec({ days: 1 }),
+  );
+
+  if (refreshCheckNecessary) {
+    // Do auto-refresh.
+    await ws.db
+      .mktx((x) => [
+        x.coins,
+        x.denominations,
+        x.coinAvailability,
+        x.refreshGroups,
+        x.exchanges,
+      ])
+      .runReadWrite(async (tx) => {
+        const exchange = await tx.exchanges.get(exchangeBaseUrl);
+        if (!exchange || !exchange.detailsPointer) {
+          return;
+        }
+        const coins = await tx.coins.indexes.byBaseUrl
+          .iter(exchangeBaseUrl)
+          .toArray();
+        const refreshCoins: CoinRefreshRequest[] = [];
+        for (const coin of coins) {
+          if (coin.status !== CoinStatus.Fresh) {
+            continue;
+          }
+          const denom = await tx.denominations.get([
+            exchangeBaseUrl,
+            coin.denomPubHash,
+          ]);
+          if (!denom) {
+            logger.warn("denomination not in database");
+            continue;
+          }
+          const executeThreshold =
+            getAutoRefreshExecuteThresholdForDenom(denom);
+          if (AbsoluteTime.isExpired(executeThreshold)) {
+            refreshCoins.push({
+              coinPub: coin.coinPub,
+              amount: denom.value,
+            });
+          } else {
+            const checkThreshold = getAutoRefreshCheckThreshold(denom);
+            minCheckThreshold = AbsoluteTime.min(
+              minCheckThreshold,
+              checkThreshold,
+            );
+          }
+        }
+        if (refreshCoins.length > 0) {
+          const res = await createRefreshGroup(
+            ws,
+            tx,
+            exchange.detailsPointer?.currency,
+            refreshCoins,
+            RefreshReason.Scheduled,
+            undefined,
+          );
+          logger.trace(
+            `created refresh group for auto-refresh (${res.refreshGroupId})`,
+          );
+        }
+        logger.trace(
+          `next refresh check at ${AbsoluteTime.toIsoString(
+            minCheckThreshold,
+          )}`,
+        );
+        exchange.nextRefreshCheckStamp = timestampPreciseToDb(
+          AbsoluteTime.toPreciseTimestamp(minCheckThreshold),
+        );
+        await tx.exchanges.put(exchange);
+      });
+  }
+
   ws.notify({
     type: NotificationType.ExchangeStateTransition,
     exchangeBaseUrl,
@@ -1320,7 +1555,33 @@ export async function updateExchangeFromUrlHandler(
     oldExchangeState: updated.oldExchangeState,
   });
 
-  return TaskRunResult.finished();
+  // Next invocation will cause the task to be run again
+  // at the necessary time.
+  return TaskRunResult.progress();
+}
+
+function getAutoRefreshExecuteThresholdForDenom(
+  d: DenominationRecord,
+): AbsoluteTime {
+  return getAutoRefreshExecuteThreshold({
+    stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw),
+    stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit),
+  });
+}
+
+/**
+ * Timestamp after which the wallet would do the next check for an 
auto-refresh.
+ */
+function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime {
+  const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
+    timestampProtocolFromDb(d.stampExpireWithdraw),
+  );
+  const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
+    timestampProtocolFromDb(d.stampExpireDeposit),
+  );
+  const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
+  const deltaDiv = durationMul(delta, 0.75);
+  return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
 }
 
 /**
@@ -1420,6 +1681,7 @@ export async function downloadExchangeInfo(
     exchangeBaseUrl,
     http,
     Duration.getForever(),
+    CancellationToken.CONTINUE,
   );
   return {
     keys: keysInfo,
diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts 
b/packages/taler-wallet-core/src/operations/pay-merchant.ts
index 52f9c70b1..e00432bd0 100644
--- a/packages/taler-wallet-core/src/operations/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts
@@ -77,6 +77,7 @@ import {
   TalerProtocolViolationError,
   TalerUriAction,
   TransactionAction,
+  TransactionIdStr,
   TransactionMajorState,
   TransactionMinorState,
   TransactionState,
@@ -102,12 +103,14 @@ import {
   WalletStoresV1,
 } from "../db.js";
 import {
+  AsyncFlag,
   getCandidateWithdrawalDenomsTx,
   PendingTaskType,
   RefundGroupRecord,
   RefundGroupStatus,
   RefundItemRecord,
   RefundItemStatus,
+  TaskId,
   timestampPreciseToDb,
   timestampProtocolFromDb,
   timestampProtocolToDb,
@@ -128,8 +131,6 @@ import {
 import {
   constructTaskIdentifier,
   DbRetryInfo,
-  runLongpollAsync,
-  runTaskWithErrorReporting,
   spendCoins,
   TaskIdentifiers,
   TaskRunResult,
@@ -147,7 +148,6 @@ import {
   constructTransactionIdentifier,
   notifyTransition,
   parseTransactionIdentifier,
-  stopLongpolling,
 } from "./transactions.js";
 
 /**
@@ -156,8 +156,8 @@ import {
 const logger = new Logger("pay-merchant.ts");
 
 export class PayMerchantTransactionContext implements TransactionContext {
-  private transactionId: string;
-  private retryTag: string;
+  readonly transactionId: TransactionIdStr;
+  readonly taskId: TaskId;
 
   constructor(
     public ws: InternalWalletState,
@@ -167,7 +167,7 @@ export class PayMerchantTransactionContext implements 
TransactionContext {
       tag: TransactionType.Payment,
       proposalId,
     });
-    this.retryTag = constructTaskIdentifier({
+    this.taskId = constructTaskIdentifier({
       tag: PendingTaskType.Purchase,
       proposalId,
     });
@@ -252,7 +252,7 @@ export class PayMerchantTransactionContext implements 
TransactionContext {
 
   async suspendTransaction(): Promise<void> {
     const { ws, proposalId, transactionId } = this;
-    stopLongpolling(ws, this.retryTag);
+    ws.taskScheduler.stopShepherdTask(this.taskId);
     const transitionInfo = await ws.db
       .mktx((x) => [x.purchases])
       .runReadWrite(async (tx) => {
@@ -270,7 +270,6 @@ export class PayMerchantTransactionContext implements 
TransactionContext {
         return { oldTxState, newTxState };
       });
     notifyTransition(ws, transactionId, transitionInfo);
-    ws.workAvailable.trigger();
   }
 
   async abortTransaction(): Promise<void> {
@@ -330,18 +329,18 @@ export class PayMerchantTransactionContext implements 
TransactionContext {
             break;
         }
         await tx.purchases.put(purchase);
-        await tx.operationRetries.delete(this.retryTag);
+        await tx.operationRetries.delete(this.taskId);
         const newTxState = computePayMerchantTransactionState(purchase);
         return { oldTxState, newTxState };
       },
     );
+    ws.taskScheduler.stopShepherdTask(this.taskId);
     notifyTransition(ws, transactionId, transitionInfo);
-    ws.workAvailable.trigger();
+    ws.taskScheduler.startShepherdTask(this.taskId);
   }
 
   async resumeTransaction(): Promise<void> {
-    const { ws, proposalId, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
+    const { ws, proposalId, transactionId, taskId: retryTag } = this;
     const transitionInfo = await ws.db
       .mktx((x) => [x.purchases])
       .runReadWrite(async (tx) => {
@@ -358,9 +357,8 @@ export class PayMerchantTransactionContext implements 
TransactionContext {
         const newTxState = computePayMerchantTransactionState(purchase);
         return { oldTxState, newTxState };
       });
-    ws.workAvailable.trigger();
     notifyTransition(ws, transactionId, transitionInfo);
-    ws.workAvailable.trigger();
+    ws.taskScheduler.startShepherdTask(this.taskId);
   }
 
   async failTransaction(): Promise<void> {
@@ -394,7 +392,7 @@ export class PayMerchantTransactionContext implements 
TransactionContext {
         return { oldTxState, newTxState };
       });
     notifyTransition(ws, transactionId, transitionInfo);
-    ws.workAvailable.trigger();
+    ws.taskScheduler.stopShepherdTask(this.taskId);
   }
 }
 
@@ -638,14 +636,18 @@ async function processDownloadProposal(
     return TaskRunResult.finished();
   }
 
+  const ctx = new PayMerchantTransactionContext(ws, proposalId);
+
   if (proposal.purchaseStatus != PurchaseStatus.PendingDownloadingProposal) {
+    logger.error(
+      `unexpected state ${proposal.purchaseStatus}/${
+        PurchaseStatus[proposal.purchaseStatus]
+      } for ${ctx.transactionId} in processDownloadProposal`,
+    );
     return TaskRunResult.finished();
   }
 
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.Payment,
-    proposalId,
-  });
+  const transactionId = ctx.transactionId;
 
   const orderClaimUrl = new URL(
     `orders/${proposal.orderId}/claim`,
@@ -857,7 +859,7 @@ async function processDownloadProposal(
 
   notifyTransition(ws, transactionId, transitionInfo);
 
-  return TaskRunResult.finished();
+  return TaskRunResult.progress();
 }
 
 /**
@@ -865,7 +867,7 @@ async function processDownloadProposal(
  * record for the provided arguments already exists,
  * return the old proposal ID.
  */
-async function createPurchase(
+async function createOrReusePurchase(
   ws: InternalWalletState,
   merchantBaseUrl: string,
   orderId: string,
@@ -889,23 +891,26 @@ async function createPurchase(
       p.claimToken === claimToken
     );
   });
-  /* If we have already claimed this proposal with the same sessionId
-   * nonce and claim token, reuse it. */
+  // If we have already claimed this proposal with the same sessionId
+  // nonce and claim token, reuse it. */
   if (
     oldProposal &&
     oldProposal.downloadSessionId === sessionId &&
     (!noncePriv || oldProposal.noncePriv === noncePriv) &&
     oldProposal.claimToken === claimToken
   ) {
-    // FIXME: This lacks proper error handling
-    await processDownloadProposal(ws, oldProposal.proposalId);
-
+    logger.info(
+      `Found old proposal (status=${
+        PurchaseStatus[oldProposal.purchaseStatus]
+      }) for order ${orderId} at ${merchantBaseUrl}`,
+    );
     if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) {
       const download = await expectProposalDownload(ws, oldProposal);
       const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
+      logger.info(`old proposal paid: ${paid}`);
       if (paid) {
-        //if this transaction was shared and the order is paid then it
-        //means that another wallet already paid the proposal
+        // if this transaction was shared and the order is paid then it
+        // means that another wallet already paid the proposal
         const transitionInfo = await ws.db
           .mktx((x) => [x.purchases])
           .runReadWrite(async (tx) => {
@@ -990,8 +995,6 @@ async function createPurchase(
     proposalId,
   });
   notifyTransition(ws, transactionId, transitionInfo);
-
-  await processDownloadProposal(ws, proposalId);
   return proposalId;
 }
 
@@ -1244,11 +1247,10 @@ async function handleInsufficientFunds(
   });
 }
 
-// FIXME: Should probably not be exported in its current state
 // FIXME: Should take a transaction ID instead of a proposal ID
 // FIXME: Does way more than checking the payment
 // FIXME: Should return immediately.
-export async function checkPaymentByProposalId(
+async function checkPaymentByProposalId(
   ws: InternalWalletState,
   proposalId: string,
   sessionId?: string,
@@ -1284,10 +1286,9 @@ export async function checkPaymentByProposalId(
 
   proposalId = proposal.proposalId;
 
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.Payment,
-    proposalId,
-  });
+  const ctx = new PayMerchantTransactionContext(ws, proposalId);
+
+  const transactionId = ctx.transactionId;
 
   const talerUri = stringifyTalerUri({
     type: TalerUriAction.Pay,
@@ -1377,12 +1378,12 @@ export async function checkPaymentByProposalId(
         return { oldTxState, newTxState };
       });
     notifyTransition(ws, transactionId, transitionInfo);
-    // FIXME: What about error handling?! This doesn't properly store errors 
in the DB.
-    const r = await processPurchasePay(ws, proposalId);
-    if (r.type !== TaskRunResultType.Finished) {
-      // FIXME: This does not surface the original error
-      throw Error("submitting pay failed");
-    }
+    ws.taskScheduler.startShepherdTask(ctx.taskId);
+
+    // FIXME: Consider changing the API here so that we don't have to
+    // wait inline for the repurchase.
+
+    await waitPaymentResult(ws, proposalId, sessionId);
     const download = await expectProposalDownload(ws, purchase);
     return {
       status: PreparePayResultType.AlreadyConfirmed,
@@ -1476,7 +1477,7 @@ export async function preparePayForUri(
     );
   }
 
-  const proposalId = await createPurchase(
+  const proposalId = await createOrReusePurchase(
     ws,
     uriResult.merchantBaseUrl,
     uriResult.orderId,
@@ -1485,9 +1486,79 @@ export async function preparePayForUri(
     uriResult.noncePriv,
   );
 
+  await waitProposalDownloaded(ws, proposalId);
+
   return checkPaymentByProposalId(ws, proposalId, uriResult.sessionId);
 }
 
+/**
+ * Wait until a proposal is at least downloaded.
+ */
+async function waitProposalDownloaded(
+  ws: InternalWalletState,
+  proposalId: string,
+): Promise<void> {
+  const ctx = new PayMerchantTransactionContext(ws, proposalId);
+
+  logger.info(`waiting for ${ctx.transactionId} to be downloaded`);
+
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
+
+  // FIXME: We should use Symbol.dispose magic here for cleanup!
+
+  const payNotifFlag = new AsyncFlag();
+  // Raise exchangeNotifFlag whenever we get a notification
+  // about our exchange.
+  const cancelNotif = ws.addNotificationListener((notif) => {
+    if (
+      notif.type === NotificationType.TransactionStateTransition &&
+      notif.transactionId === ctx.transactionId
+    ) {
+      logger.info(`raising update notification: ${j2s(notif)}`);
+      payNotifFlag.raise();
+    }
+  });
+
+  try {
+    await internalWaitProposalDownloaded(ctx, payNotifFlag);
+    logger.info(`done waiting for ${ctx.transactionId} to be downloaded`);
+  } finally {
+    cancelNotif();
+  }
+}
+
+async function internalWaitProposalDownloaded(
+  ctx: PayMerchantTransactionContext,
+  payNotifFlag: AsyncFlag,
+): Promise<void> {
+  while (true) {
+    const { purchase, retryInfo } = await ctx.ws.db.runReadOnlyTx(
+      ["purchases", "operationRetries"],
+      async (tx) => {
+        return {
+          purchase: await tx.purchases.get(ctx.proposalId),
+          retryInfo: await tx.operationRetries.get(ctx.taskId),
+        };
+      },
+    );
+    if (!purchase) {
+      throw Error("purchase does not exist anymore");
+    }
+    if (purchase.download) {
+      return;
+    }
+    if (retryInfo) {
+      if (retryInfo.lastError) {
+        throw TalerError.fromUncheckedDetail(retryInfo.lastError);
+      } else {
+        throw Error("transient error while waiting for proposal download");
+      }
+    }
+    await payNotifFlag.wait();
+    payNotifFlag.reset();
+  }
+}
+
 export async function preparePayForTemplate(
   ws: InternalWalletState,
   req: PreparePayTemplateRequest,
@@ -1598,71 +1669,101 @@ export async function generateDepositPermissions(
   return depositPermissions;
 }
 
-/**
- * Run the operation handler for a payment
- * and return the result as a {@link ConfirmPayResult}.
- */
-async function runPayForConfirmPay(
-  ws: InternalWalletState,
-  proposalId: string,
+async function internalWaitPaymentResult(
+  ctx: PayMerchantTransactionContext,
+  purchaseNotifFlag: AsyncFlag,
+  waitSessionId?: string,
 ): Promise<ConfirmPayResult> {
-  logger.trace("processing proposal for confirmPay");
-  const taskId = constructTaskIdentifier({
-    tag: PendingTaskType.Purchase,
-    proposalId,
-  });
-  const res = await runTaskWithErrorReporting(ws, taskId, async () => {
-    return await processPurchasePay(ws, proposalId);
-  });
-  logger.trace(`processPurchasePay response type ${res.type}`);
-  switch (res.type) {
-    case TaskRunResultType.Finished: {
-      const purchase = await ws.db
-        .mktx((x) => [x.purchases])
-        .runReadOnly(async (tx) => {
-          return tx.purchases.get(proposalId);
-        });
-      if (!purchase) {
-        throw Error("purchase record not available anymore");
+  while (true) {
+    const txRes = await ctx.ws.db.runReadOnlyTx(
+      ["purchases", "operationRetries"],
+      async (tx) => {
+        const purchase = await tx.purchases.get(ctx.proposalId);
+        const retryRecord = await tx.operationRetries.get(ctx.taskId);
+        return { purchase, retryRecord };
+      },
+    );
+
+    if (!txRes.purchase) {
+      throw Error("purchase gone");
+    }
+
+    const purchase = txRes.purchase;
+
+    logger.info(
+      `purchase is in state ${PurchaseStatus[purchase.purchaseStatus]}`,
+    );
+
+    const d = await expectProposalDownload(ctx.ws, purchase);
+
+    if (txRes.purchase.timestampFirstSuccessfulPay) {
+      if (
+        waitSessionId == null ||
+        txRes.purchase.lastSessionId === waitSessionId
+      ) {
+        return {
+          type: ConfirmPayResultType.Done,
+          contractTerms: d.contractTermsRaw,
+          transactionId: ctx.transactionId,
+        };
       }
-      const d = await expectProposalDownload(ws, purchase);
-      return {
-        type: ConfirmPayResultType.Done,
-        contractTerms: d.contractTermsRaw,
-        transactionId: constructTransactionIdentifier({
-          tag: TransactionType.Payment,
-          proposalId,
-        }),
-      };
     }
-    case TaskRunResultType.Error: {
-      // We hide transient errors from the caller.
-      const opRetry = await ws.db
-        .mktx((x) => [x.operationRetries])
-        .runReadOnly(async (tx) => tx.operationRetries.get(taskId));
+
+    if (txRes.retryRecord) {
       return {
         type: ConfirmPayResultType.Pending,
-        lastError: opRetry?.lastError,
-        transactionId: constructTransactionIdentifier({
-          tag: TransactionType.Payment,
-          proposalId,
-        }),
+        lastError: txRes.retryRecord.lastError,
+        transactionId: ctx.transactionId,
       };
     }
-    case TaskRunResultType.Pending:
-      logger.trace("reporting pending as confirmPay response");
-      return {
-        type: ConfirmPayResultType.Pending,
-        transactionId: constructTransactionIdentifier({
-          tag: TransactionType.Payment,
-          proposalId,
-        }),
-        lastError: undefined,
-      };
-    case TaskRunResultType.Longpoll:
-      throw Error("unexpected processPurchasePay result (longpoll)");
-    default:
-      assertUnreachable(res);
+
+    await purchaseNotifFlag.wait();
+    purchaseNotifFlag.reset();
+  }
+}
+
+/**
+ * Wait until either:
+ * a) the payment succeeded (if provided under the {@param waitSessionId}), or
+ * b) the attempt to pay failed (merchant unavailable, etc.)
+ */
+async function waitPaymentResult(
+  ws: InternalWalletState,
+  proposalId: string,
+  waitSessionId?: string,
+): Promise<ConfirmPayResult> {
+  const ctx = new PayMerchantTransactionContext(ws, proposalId);
+
+  ws.ensureTaskLoopRunning();
+
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
+
+  // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax.
+  const purchaseNotifFlag = new AsyncFlag();
+  // Raise purchaseNotifFlag whenever we get a notification
+  // about our purchase.
+  const cancelNotif = ws.addNotificationListener((notif) => {
+    if (
+      notif.type === NotificationType.TransactionStateTransition &&
+      notif.transactionId === ctx.transactionId
+    ) {
+      purchaseNotifFlag.raise();
+    }
+  });
+
+  try {
+    logger.info(`waiting for first payment success on ${ctx.transactionId}`);
+    const res = await internalWaitPaymentResult(
+      ctx,
+      purchaseNotifFlag,
+      waitSessionId,
+    );
+    logger.info(
+      `done waiting for first payment success on ${ctx.transactionId}, result 
${res.type}`,
+    );
+    return res;
+  } finally {
+    cancelNotif();
   }
 }
 
@@ -1719,7 +1820,12 @@ export async function confirmPay(
 
   if (existingPurchase && existingPurchase.payInfo) {
     logger.trace("confirmPay: submitting payment for existing purchase");
-    return runPayForConfirmPay(ws, proposalId);
+    const ctx = new PayMerchantTransactionContext(
+      ws,
+      existingPurchase.proposalId,
+    );
+    await ws.taskScheduler.resetTaskRetries(ctx.taskId);
+    return waitPaymentResult(ws, proposalId);
   }
 
   logger.trace("confirmPay: purchase record does not exist yet");
@@ -1817,9 +1923,8 @@ export async function confirmPay(
     hintTransactionId: transactionId,
   });
 
-  // We directly make a first attempt to pay.
-  // FIXME: In the future we should just wait for the right event
-  return runPayForConfirmPay(ws, proposalId);
+  // Wait until we have completed the first attempt to pay.
+  return waitPaymentResult(ws, proposalId);
 }
 
 export async function processPurchase(
@@ -2017,7 +2122,7 @@ async function processPurchasePay(
 
         // FIXME: Should we really consider this to be pending?
 
-        return TaskRunResult.pending();
+        return TaskRunResult.backoff();
       }
     }
 
@@ -2076,7 +2181,7 @@ async function processPurchasePay(
     await storePayReplaySuccess(ws, proposalId, sessionId);
   }
 
-  return TaskRunResult.finished();
+  return TaskRunResult.progress();
 }
 
 export async function refuseProposal(
@@ -2365,7 +2470,7 @@ export async function sharePayment(
         p.purchaseStatus !== PurchaseStatus.DialogProposed &&
         p.purchaseStatus !== PurchaseStatus.DialogShared
       ) {
-        //FIXME: purchase can be shared before being paid
+        // FIXME: purchase can be shared before being paid
         return undefined;
       }
       if (p.purchaseStatus === PurchaseStatus.DialogProposed) {
@@ -2426,57 +2531,37 @@ async function processPurchaseDialogShared(
 ): Promise<TaskRunResult> {
   const proposalId = purchase.proposalId;
   logger.trace(`processing dialog-shared for proposal ${proposalId}`);
-
-  const taskId = constructTaskIdentifier({
-    tag: PendingTaskType.Purchase,
-    proposalId,
-  });
-
-  // FIXME: Put this logic into runLongpollAsync?
-  if (ws.activeLongpoll[taskId]) {
-    return TaskRunResult.longpoll();
-  }
   const download = await expectProposalDownload(ws, purchase);
 
   if (purchase.purchaseStatus !== PurchaseStatus.DialogShared) {
     return TaskRunResult.finished();
   }
 
-  runLongpollAsync(ws, taskId, async (ct) => {
-    const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
-    if (paid) {
-      const transitionInfo = await ws.db
-        .mktx((x) => [x.purchases])
-        .runReadWrite(async (tx) => {
-          const p = await tx.purchases.get(purchase.proposalId);
-          if (!p) {
-            logger.warn("purchase does not exist anymore");
-            return;
-          }
-          const oldTxState = computePayMerchantTransactionState(p);
-          p.purchaseStatus = PurchaseStatus.FailedClaim;
-          const newTxState = computePayMerchantTransactionState(p);
-          await tx.purchases.put(p);
-          return { oldTxState, newTxState };
-        });
-      const transactionId = constructTransactionIdentifier({
-        tag: TransactionType.Payment,
-        proposalId,
+  const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData);
+  if (paid) {
+    const transitionInfo = await ws.db
+      .mktx((x) => [x.purchases])
+      .runReadWrite(async (tx) => {
+        const p = await tx.purchases.get(purchase.proposalId);
+        if (!p) {
+          logger.warn("purchase does not exist anymore");
+          return;
+        }
+        const oldTxState = computePayMerchantTransactionState(p);
+        p.purchaseStatus = PurchaseStatus.FailedClaim;
+        const newTxState = computePayMerchantTransactionState(p);
+        await tx.purchases.put(p);
+        return { oldTxState, newTxState };
       });
+    const transactionId = constructTransactionIdentifier({
+      tag: TransactionType.Payment,
+      proposalId,
+    });
 
-      notifyTransition(ws, transactionId, transitionInfo);
-
-      return {
-        ready: true,
-      };
-    }
-
-    return {
-      ready: false,
-    };
-  });
+    notifyTransition(ws, transactionId, transitionInfo);
+  }
 
-  return TaskRunResult.longpoll();
+  return TaskRunResult.backoff();
 }
 
 async function processPurchaseAutoRefund(
@@ -2496,97 +2581,81 @@ async function processPurchaseAutoRefund(
     proposalId,
   });
 
-  // FIXME: Put this logic into runLongpollAsync?
-  if (ws.activeLongpoll[taskId]) {
-    return TaskRunResult.longpoll();
-  }
-
   const download = await expectProposalDownload(ws, purchase);
 
-  runLongpollAsync(ws, taskId, async (ct) => {
-    if (
-      !purchase.autoRefundDeadline ||
-      AbsoluteTime.isExpired(
-        AbsoluteTime.fromProtocolTimestamp(
-          timestampProtocolFromDb(purchase.autoRefundDeadline),
-        ),
-      )
-    ) {
-      const transitionInfo = await ws.db
-        .mktx((x) => [x.purchases])
-        .runReadWrite(async (tx) => {
-          const p = await tx.purchases.get(purchase.proposalId);
-          if (!p) {
-            logger.warn("purchase does not exist anymore");
-            return;
-          }
-          if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) {
-            return;
-          }
-          const oldTxState = computePayMerchantTransactionState(p);
-          p.purchaseStatus = PurchaseStatus.Done;
-          p.refundAmountAwaiting = undefined;
-          const newTxState = computePayMerchantTransactionState(p);
-          await tx.purchases.put(p);
-          return { oldTxState, newTxState };
-        });
-      notifyTransition(ws, transactionId, transitionInfo);
-      return {
-        ready: true,
-      };
-    }
+  if (
+    !purchase.autoRefundDeadline ||
+    AbsoluteTime.isExpired(
+      AbsoluteTime.fromProtocolTimestamp(
+        timestampProtocolFromDb(purchase.autoRefundDeadline),
+      ),
+    )
+  ) {
+    const transitionInfo = await ws.db
+      .mktx((x) => [x.purchases])
+      .runReadWrite(async (tx) => {
+        const p = await tx.purchases.get(purchase.proposalId);
+        if (!p) {
+          logger.warn("purchase does not exist anymore");
+          return;
+        }
+        if (p.purchaseStatus !== PurchaseStatus.PendingQueryingRefund) {
+          return;
+        }
+        const oldTxState = computePayMerchantTransactionState(p);
+        p.purchaseStatus = PurchaseStatus.Done;
+        p.refundAmountAwaiting = undefined;
+        const newTxState = computePayMerchantTransactionState(p);
+        await tx.purchases.put(p);
+        return { oldTxState, newTxState };
+      });
+    notifyTransition(ws, transactionId, transitionInfo);
+    return TaskRunResult.finished();
+  }
 
-    const requestUrl = new URL(
-      `orders/${download.contractData.orderId}`,
-      download.contractData.merchantBaseUrl,
-    );
-    requestUrl.searchParams.set(
-      "h_contract",
-      download.contractData.contractTermsHash,
-    );
+  const requestUrl = new URL(
+    `orders/${download.contractData.orderId}`,
+    download.contractData.merchantBaseUrl,
+  );
+  requestUrl.searchParams.set(
+    "h_contract",
+    download.contractData.contractTermsHash,
+  );
 
-    requestUrl.searchParams.set("timeout_ms", "1000");
-    requestUrl.searchParams.set("await_refund_obtained", "yes");
+  requestUrl.searchParams.set("timeout_ms", "1000");
+  requestUrl.searchParams.set("await_refund_obtained", "yes");
 
-    const resp = await ws.http.fetch(requestUrl.href);
+  const resp = await ws.http.fetch(requestUrl.href);
 
-    // FIXME: Check other status codes!
+  // FIXME: Check other status codes!
 
-    const orderStatus = await readSuccessResponseJsonOrThrow(
-      resp,
-      codecForMerchantOrderStatusPaid(),
-    );
+  const orderStatus = await readSuccessResponseJsonOrThrow(
+    resp,
+    codecForMerchantOrderStatusPaid(),
+  );
 
-    if (orderStatus.refund_pending) {
-      const transitionInfo = await ws.db
-        .mktx((x) => [x.purchases])
-        .runReadWrite(async (tx) => {
-          const p = await tx.purchases.get(purchase.proposalId);
-          if (!p) {
-            logger.warn("purchase does not exist anymore");
-            return;
-          }
-          if (p.purchaseStatus !== PurchaseStatus.PendingQueryingAutoRefund) {
-            return;
-          }
-          const oldTxState = computePayMerchantTransactionState(p);
-          p.purchaseStatus = PurchaseStatus.PendingAcceptRefund;
-          const newTxState = computePayMerchantTransactionState(p);
-          await tx.purchases.put(p);
-          return { oldTxState, newTxState };
-        });
-      notifyTransition(ws, transactionId, transitionInfo);
-      return {
-        ready: true,
-      };
-    } else {
-      return {
-        ready: false,
-      };
-    }
-  });
+  if (orderStatus.refund_pending) {
+    const transitionInfo = await ws.db
+      .mktx((x) => [x.purchases])
+      .runReadWrite(async (tx) => {
+        const p = await tx.purchases.get(purchase.proposalId);
+        if (!p) {
+          logger.warn("purchase does not exist anymore");
+          return;
+        }
+        if (p.purchaseStatus !== PurchaseStatus.PendingQueryingAutoRefund) {
+          return;
+        }
+        const oldTxState = computePayMerchantTransactionState(p);
+        p.purchaseStatus = PurchaseStatus.PendingAcceptRefund;
+        const newTxState = computePayMerchantTransactionState(p);
+        await tx.purchases.put(p);
+        return { oldTxState, newTxState };
+      });
+    notifyTransition(ws, transactionId, transitionInfo);
+  }
 
-  return TaskRunResult.longpoll();
+  return TaskRunResult.backoff();
 }
 
 async function processPurchaseAbortingRefund(
@@ -2734,7 +2803,7 @@ async function processPurchaseQueryRefund(
         return { oldTxState, newTxState };
       });
     notifyTransition(ws, transactionId, transitionInfo);
-    return TaskRunResult.finished();
+    return TaskRunResult.progress();
   } else {
     const refundAwaiting = Amounts.sub(
       Amounts.parseOrThrow(orderStatus.refund_amount),
@@ -2760,7 +2829,7 @@ async function processPurchaseQueryRefund(
         return { oldTxState, newTxState };
       });
     notifyTransition(ws, transactionId, transitionInfo);
-    return TaskRunResult.finished();
+    return TaskRunResult.progress();
   }
 }
 
@@ -2836,10 +2905,7 @@ export async function startQueryRefund(
   ws: InternalWalletState,
   proposalId: string,
 ): Promise<void> {
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.Payment,
-    proposalId,
-  });
+  const ctx = new PayMerchantTransactionContext(ws, proposalId);
   const transitionInfo = await ws.db
     .mktx((x) => [x.purchases])
     .runReadWrite(async (tx) => {
@@ -2857,8 +2923,8 @@ export async function startQueryRefund(
       await tx.purchases.put(p);
       return { oldTxState, newTxState };
     });
-  notifyTransition(ws, transactionId, transitionInfo);
-  ws.workAvailable.trigger();
+  notifyTransition(ws, ctx.transactionId, transitionInfo);
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
 }
 
 async function computeRefreshRequest(
@@ -3128,10 +3194,10 @@ async function storeRefunds(
   notifyTransition(ws, transactionId, result.transitionInfo);
 
   if (result.numPendingItemsTotal > 0) {
-    return TaskRunResult.pending();
+    return TaskRunResult.backoff();
+  } else {
+    return TaskRunResult.progress();
   }
-
-  return TaskRunResult.finished();
 }
 
 export function computeRefundTransactionState(
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts 
b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
index e655eba4b..cc41abde9 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
@@ -33,6 +33,7 @@ import {
   TalerProtocolTimestamp,
   TalerUriAction,
   TransactionAction,
+  TransactionIdStr,
   TransactionMajorState,
   TransactionMinorState,
   TransactionState,
@@ -62,17 +63,15 @@ import {
   timestampPreciseToDb,
 } from "../index.js";
 import { InternalWalletState } from "../internal-wallet-state.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
 import { assertUnreachable } from "../util/assertUnreachable.js";
 import { checkDbInvariant } from "../util/invariants.js";
 import {
-  LongpollResult,
   TaskRunResult,
   TaskRunResultType,
   TombstoneTag,
   TransactionContext,
   constructTaskIdentifier,
-  runLongpollAsync,
 } from "./common.js";
 import {
   codecForExchangePurseStatus,
@@ -81,7 +80,6 @@ import {
 import {
   constructTransactionIdentifier,
   notifyTransition,
-  stopLongpolling,
 } from "./transactions.js";
 import {
   getExchangeWithdrawalInfo,
@@ -91,8 +89,8 @@ import {
 const logger = new Logger("pay-peer-pull-credit.ts");
 
 export class PeerPullCreditTransactionContext implements TransactionContext {
-  private transactionId: string;
-  private retryTag: string;
+  readonly transactionId: TransactionIdStr;
+  readonly retryTag: TaskId;
 
   constructor(
     public ws: InternalWalletState,
@@ -139,7 +137,6 @@ export class PeerPullCreditTransactionContext implements 
TransactionContext {
 
   async suspendTransaction(): Promise<void> {
     const { ws, pursePub, retryTag, transactionId } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.peerPullCredit])
       .runReadWrite(async (tx) => {
@@ -193,12 +190,12 @@ export class PeerPullCreditTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
   }
 
   async failTransaction(): Promise<void> {
     const { ws, pursePub, retryTag, transactionId } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.peerPullCredit])
       .runReadWrite(async (tx) => {
@@ -244,11 +241,11 @@ export class PeerPullCreditTransactionContext implements 
TransactionContext {
         return undefined;
       });
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.stopShepherdTask(retryTag);
   }
 
   async resumeTransaction(): Promise<void> {
     const { ws, pursePub, retryTag, transactionId } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.peerPullCredit])
       .runReadWrite(async (tx) => {
@@ -301,13 +298,12 @@ export class PeerPullCreditTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
-    ws.workAvailable.trigger();
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 
   async abortTransaction(): Promise<void> {
     const { ws, pursePub, retryTag, transactionId } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.peerPullCredit])
       .runReadWrite(async (tx) => {
@@ -355,7 +351,9 @@ export class PeerPullCreditTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 }
 
@@ -363,7 +361,7 @@ async function queryPurseForPeerPullCredit(
   ws: InternalWalletState,
   pullIni: PeerPullCreditRecord,
   cancellationToken: CancellationToken,
-): Promise<LongpollResult> {
+): Promise<TaskRunResult> {
   const purseDepositUrl = new URL(
     `purses/${pullIni.pursePub}/deposit`,
     pullIni.exchangeBaseUrl,
@@ -401,10 +399,10 @@ async function queryPurseForPeerPullCredit(
           return { oldTxState, newTxState };
         });
       notifyTransition(ws, transactionId, transitionInfo);
-      return { ready: true };
+      return TaskRunResult.backoff();
     }
     case HttpStatusCode.NotFound:
-      return { ready: false };
+      return TaskRunResult.backoff();
   }
 
   const result = await readSuccessResponseJsonOrThrow(
@@ -418,7 +416,7 @@ async function queryPurseForPeerPullCredit(
 
   if (!depositTimestamp || TalerProtocolTimestamp.isNever(depositTimestamp)) {
     logger.info("purse not ready yet (no deposit)");
-    return { ready: false };
+    return TaskRunResult.backoff();
   }
 
   const reserve = await ws.db
@@ -462,9 +460,7 @@ async function queryPurseForPeerPullCredit(
       return { oldTxState, newTxState };
     });
   notifyTransition(ws, transactionId, transitionInfo);
-  return {
-    ready: true,
-  };
+  return TaskRunResult.backoff();
 }
 
 async function longpollKycStatus(
@@ -473,6 +469,7 @@ async function longpollKycStatus(
   exchangeUrl: string,
   kycInfo: KycPendingInfo,
   userType: KycUserType,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   const transactionId = constructTransactionIdentifier({
     tag: TransactionType.PeerPullCredit,
@@ -483,56 +480,47 @@ async function longpollKycStatus(
     pursePub,
   });
 
-  runLongpollAsync(ws, retryTag, async (ct) => {
-    const url = new URL(
-      `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
-      exchangeUrl,
-    );
-    url.searchParams.set("timeout_ms", "10000");
-    logger.info(`kyc url ${url.href}`);
-    const kycStatusRes = await ws.http.fetch(url.href, {
-      method: "GET",
-      cancellationToken: ct,
-    });
-    if (
-      kycStatusRes.status === HttpStatusCode.Ok ||
-      //FIXME: NoContent is not expected 
https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
-      // remove after the exchange is fixed or clarified
-      kycStatusRes.status === HttpStatusCode.NoContent
-    ) {
-      const transitionInfo = await ws.db
-        .mktx((x) => [x.peerPullCredit])
-        .runReadWrite(async (tx) => {
-          const peerIni = await tx.peerPullCredit.get(pursePub);
-          if (!peerIni) {
-            return;
-          }
-          if (
-            peerIni.status !==
-            PeerPullPaymentCreditStatus.PendingMergeKycRequired
-          ) {
-            return;
-          }
-          const oldTxState = computePeerPullCreditTransactionState(peerIni);
-          peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse;
-          const newTxState = computePeerPullCreditTransactionState(peerIni);
-          await tx.peerPullCredit.put(peerIni);
-          return { oldTxState, newTxState };
-        });
-      notifyTransition(ws, transactionId, transitionInfo);
-      return { ready: true };
-    } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
-      // FIXME: Do we have to update the URL here?
-      return { ready: false };
-    } else {
-      throw Error(
-        `unexpected response from kyc-check (${kycStatusRes.status})`,
-      );
-    }
+  const url = new URL(
+    `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
+    exchangeUrl,
+  );
+  url.searchParams.set("timeout_ms", "10000");
+  logger.info(`kyc url ${url.href}`);
+  const kycStatusRes = await ws.http.fetch(url.href, {
+    method: "GET",
+    cancellationToken,
   });
-  return {
-    type: TaskRunResultType.Longpoll,
-  };
+  if (
+    kycStatusRes.status === HttpStatusCode.Ok ||
+    //FIXME: NoContent is not expected 
https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+    // remove after the exchange is fixed or clarified
+    kycStatusRes.status === HttpStatusCode.NoContent
+  ) {
+    const transitionInfo = await ws.db
+      .mktx((x) => [x.peerPullCredit])
+      .runReadWrite(async (tx) => {
+        const peerIni = await tx.peerPullCredit.get(pursePub);
+        if (!peerIni) {
+          return;
+        }
+        if (
+          peerIni.status !== 
PeerPullPaymentCreditStatus.PendingMergeKycRequired
+        ) {
+          return;
+        }
+        const oldTxState = computePeerPullCreditTransactionState(peerIni);
+        peerIni.status = PeerPullPaymentCreditStatus.PendingCreatePurse;
+        const newTxState = computePeerPullCreditTransactionState(peerIni);
+        await tx.peerPullCredit.put(peerIni);
+        return { oldTxState, newTxState };
+      });
+    notifyTransition(ws, transactionId, transitionInfo);
+  } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+    // FIXME: Do we have to update the URL here?
+  } else {
+    throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
+  }
+  return TaskRunResult.backoff();
 }
 
 async function processPeerPullCreditAbortingDeletePurse(
@@ -584,7 +572,7 @@ async function processPeerPullCreditAbortingDeletePurse(
     });
   notifyTransition(ws, transactionId, transitionInfo);
 
-  return TaskRunResult.pending();
+  return TaskRunResult.backoff();
 }
 
 async function handlePeerPullCreditWithdrawing(
@@ -637,7 +625,7 @@ async function handlePeerPullCreditWithdrawing(
     return TaskRunResult.finished();
   } else {
     // FIXME: Return indicator that we depend on the other operation!
-    return TaskRunResult.pending();
+    return TaskRunResult.backoff();
   }
 }
 
@@ -757,13 +745,13 @@ async function handlePeerPullCreditCreatePurse(
       return { oldTxState, newTxState };
     });
   notifyTransition(ws, transactionId, transitionInfo);
-
-  return TaskRunResult.finished();
+  return TaskRunResult.backoff();
 }
 
 export async function processPeerPullCredit(
   ws: InternalWalletState,
   pursePub: string,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   const pullIni = await ws.db
     .mktx((x) => [x.peerPullCredit])
@@ -779,14 +767,6 @@ export async function processPeerPullCredit(
     pursePub,
   });
 
-  // We're already running!
-  if (ws.activeLongpoll[retryTag]) {
-    logger.info("peer-pull-credit already in long-polling, returning!");
-    return {
-      type: TaskRunResultType.Longpoll,
-    };
-  }
-
   logger.trace(`processing ${retryTag}, status=${pullIni.status}`);
 
   switch (pullIni.status) {
@@ -794,15 +774,7 @@ export async function processPeerPullCredit(
       return TaskRunResult.finished();
     }
     case PeerPullPaymentCreditStatus.PendingReady:
-      runLongpollAsync(ws, retryTag, async (cancellationToken) =>
-        queryPurseForPeerPullCredit(ws, pullIni, cancellationToken),
-      );
-      logger.trace(
-        "returning early from processPeerPullCredit for long-polling in 
background",
-      );
-      return {
-        type: TaskRunResultType.Longpoll,
-      };
+      return queryPurseForPeerPullCredit(ws, pullIni, cancellationToken);
     case PeerPullPaymentCreditStatus.PendingMergeKycRequired: {
       if (!pullIni.kycInfo) {
         throw Error("invalid state, kycInfo required");
@@ -813,6 +785,7 @@ export async function processPeerPullCredit(
         pullIni.exchangeBaseUrl,
         pullIni.kycInfo,
         "individual",
+        cancellationToken,
       );
     }
     case PeerPullPaymentCreditStatus.PendingCreatePurse:
@@ -866,7 +839,7 @@ async function processPeerPullCreditKycRequired(
     kycStatusRes.status === HttpStatusCode.NoContent
   ) {
     logger.warn("kyc requested, but already fulfilled");
-    return TaskRunResult.finished();
+    return TaskRunResult.backoff();
   } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
     const kycStatus = await kycStatusRes.json();
     logger.info(`kyc status: ${j2s(kycStatus)}`);
@@ -906,7 +879,7 @@ async function processPeerPullCreditKycRequired(
         };
       });
     notifyTransition(ws, transactionId, transitionInfo);
-    return TaskRunResult.pending();
+    return TaskRunResult.backoff();
   } else {
     throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
   }
@@ -1095,20 +1068,16 @@ export async function initiatePeerPullPayment(
       return { oldTxState, newTxState };
     });
 
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.PeerPullCredit,
-    pursePub: pursePair.pub,
-  });
+  const ctx = new PeerPullCreditTransactionContext(ws, pursePair.pub);
 
   // The pending-incoming balance has changed.
   ws.notify({
     type: NotificationType.BalanceChange,
-    hintTransactionId: transactionId,
+    hintTransactionId: ctx.transactionId,
   });
 
-  notifyTransition(ws, transactionId, transitionInfo);
-
-  ws.workAvailable.trigger();
+  notifyTransition(ws, ctx.transactionId, transitionInfo);
+  ws.taskScheduler.startShepherdTask(ctx.retryTag);
 
   return {
     talerUri: stringifyTalerUri({
@@ -1116,7 +1085,7 @@ export async function initiatePeerPullPayment(
       exchangeBaseUrl: exchangeBaseUrl,
       contractPriv: contractKeyPair.priv,
     }),
-    transactionId,
+    transactionId: ctx.transactionId,
   };
 }
 
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts 
b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
index c7e447dab..e5ae6b73b 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
@@ -42,6 +42,7 @@ import {
   TalerPreciseTimestamp,
   TalerProtocolViolationError,
   TransactionAction,
+  TransactionIdStr,
   TransactionMajorState,
   TransactionMinorState,
   TransactionState,
@@ -69,6 +70,7 @@ import {
   PendingTaskType,
   RefreshOperationStatus,
   StoreNames,
+  TaskId,
   WalletStoresV1,
   createRefreshGroup,
   timestampPreciseToDb,
@@ -93,7 +95,6 @@ import {
   constructTransactionIdentifier,
   notifyTransition,
   parseTransactionIdentifier,
-  stopLongpolling,
 } from "./transactions.js";
 
 const logger = new Logger("pay-peer-pull-debit.ts");
@@ -103,8 +104,8 @@ const logger = new Logger("pay-peer-pull-debit.ts");
  */
 export class PeerPullDebitTransactionContext implements TransactionContext {
   ws: InternalWalletState;
-  transactionId: string;
-  taskId: string;
+  readonly transactionId: TransactionIdStr;
+  readonly taskId: TaskId;
   peerPullDebitId: string;
 
   constructor(ws: InternalWalletState, peerPullDebitId: string) {
@@ -140,7 +141,6 @@ export class PeerPullDebitTransactionContext implements 
TransactionContext {
     const transactionId = this.transactionId;
     const ws = this.ws;
     const peerPullDebitId = this.peerPullDebitId;
-    stopLongpolling(ws, taskId);
     const transitionInfo = await ws.db
       .mktx((x) => [x.peerPullDebit])
       .runReadWrite(async (tx) => {
@@ -185,11 +185,11 @@ export class PeerPullDebitTransactionContext implements 
TransactionContext {
         return undefined;
       });
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.stopShepherdTask(taskId);
   }
 
   async resumeTransaction(): Promise<void> {
     const ctx = this;
-    stopLongpolling(ctx.ws, ctx.taskId);
     await ctx.transition(async (pi) => {
       switch (pi.status) {
         case PeerPullDebitRecordStatus.SuspendedDeposit:
@@ -207,11 +207,11 @@ export class PeerPullDebitTransactionContext implements 
TransactionContext {
           return TransitionResult.Stay;
       }
     });
+    this.ws.taskScheduler.startShepherdTask(this.taskId);
   }
 
   async failTransaction(): Promise<void> {
     const ctx = this;
-    stopLongpolling(ctx.ws, ctx.taskId);
     await ctx.transition(async (pi) => {
       switch (pi.status) {
         case PeerPullDebitRecordStatus.SuspendedDeposit:
@@ -225,6 +225,7 @@ export class PeerPullDebitTransactionContext implements 
TransactionContext {
           return TransitionResult.Stay;
       }
     });
+    this.ws.taskScheduler.stopShepherdTask(this.taskId);
   }
 
   async abortTransaction(): Promise<void> {
@@ -325,7 +326,9 @@ export class PeerPullDebitTransactionContext implements 
TransactionContext {
         }
       },
     );
+    ws.taskScheduler.stopShepherdTask(this.taskId);
     notifyTransition(ws, this.transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(this.taskId);
   }
 }
 
@@ -405,7 +408,7 @@ async function handlePurseCreationConflict(
     }
     await tx.peerPullDebit.put(myPpi);
   });
-  return TaskRunResult.finished();
+  return TaskRunResult.backoff();
 }
 
 async function processPeerPullDebitPendingDeposit(
@@ -469,7 +472,7 @@ async function processPeerPullDebitPendingDeposit(
     }
     case HttpStatusCode.Gone: {
       await ctx.abortTransaction();
-      return TaskRunResult.finished();
+      return TaskRunResult.backoff();
     }
     case HttpStatusCode.Conflict: {
       return handlePurseCreationConflict(ctx, peerPullInc, httpResp);
@@ -529,7 +532,7 @@ async function processPeerPullDebitAbortingRefresh(
     });
   notifyTransition(ws, transactionId, transitionInfo);
   // FIXME: Shouldn't this be finished in some cases?!
-  return TaskRunResult.pending();
+  return TaskRunResult.backoff();
 }
 
 export async function processPeerPullDebit(
@@ -607,7 +610,7 @@ export async function confirmPeerPullDebit(
     coinSelRes.result.coins,
   );
 
-  const ppi = await ws.db
+  await ws.db
     .mktx((x) => [
       x.exchanges,
       x.coins,
@@ -643,19 +646,19 @@ export async function confirmPeerPullDebit(
         };
       }
       await tx.peerPullDebit.put(pi);
-      return pi;
     });
 
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.PeerPullDebit,
-    peerPullDebitId,
-  });
+  const ctx = new PeerPullDebitTransactionContext(ws, peerPullDebitId);
+
+  const transactionId = ctx.transactionId;
 
   ws.notify({
     type: NotificationType.BalanceChange,
     hintTransactionId: transactionId,
   });
 
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
+
   return {
     transactionId,
   };
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts 
b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
index 427961f44..23976f11b 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-credit.ts
@@ -17,6 +17,7 @@
 import {
   AcceptPeerPushPaymentResponse,
   Amounts,
+  CancellationToken,
   ConfirmPeerPushCreditRequest,
   ContractTermsUtil,
   ExchangePurseMergeRequest,
@@ -54,9 +55,10 @@ import {
   InternalWalletState,
   KycPendingInfo,
   KycUserType,
-  PeerPushPaymentIncomingRecord,
   PeerPushCreditStatus,
+  PeerPushPaymentIncomingRecord,
   PendingTaskType,
+  TaskId,
   WithdrawalGroupStatus,
   WithdrawalRecordType,
   timestampPreciseToDb,
@@ -69,9 +71,8 @@ import {
   TombstoneTag,
   TransactionContext,
   constructTaskIdentifier,
-  runLongpollAsync,
 } from "./common.js";
-import { fetchFreshExchange, markExchangeUsed } from "./exchanges.js";
+import { fetchFreshExchange } from "./exchanges.js";
 import {
   codecForExchangePurseStatus,
   getMergeReserveInfo,
@@ -81,7 +82,6 @@ import {
   constructTransactionIdentifier,
   notifyTransition,
   parseTransactionIdentifier,
-  stopLongpolling,
 } from "./transactions.js";
 import {
   PerformCreateWithdrawalGroupResult,
@@ -93,8 +93,8 @@ import {
 const logger = new Logger("pay-peer-push-credit.ts");
 
 export class PeerPushCreditTransactionContext implements TransactionContext {
-  private transactionId: string;
-  private retryTag: string;
+  readonly transactionId: string;
+  readonly retryTag: TaskId;
 
   constructor(
     public ws: InternalWalletState,
@@ -141,7 +141,6 @@ export class PeerPushCreditTransactionContext implements 
TransactionContext {
 
   async suspendTransaction(): Promise<void> {
     const { ws, peerPushCreditId, retryTag, transactionId } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db.runReadWriteTx(
       ["peerPushCredit"],
       async (tx) => {
@@ -191,11 +190,11 @@ export class PeerPushCreditTransactionContext implements 
TransactionContext {
       },
     );
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.stopShepherdTask(retryTag);
   }
 
   async abortTransaction(): Promise<void> {
     const { ws, peerPushCreditId, retryTag, transactionId } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db.runReadWriteTx(
       ["peerPushCredit"],
       async (tx) => {
@@ -248,11 +247,11 @@ export class PeerPushCreditTransactionContext implements 
TransactionContext {
       },
     );
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 
   async resumeTransaction(): Promise<void> {
     const { ws, peerPushCreditId, retryTag, transactionId } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db.runReadWriteTx(
       ["peerPushCredit"],
       async (tx) => {
@@ -300,13 +299,12 @@ export class PeerPushCreditTransactionContext implements 
TransactionContext {
         return undefined;
       },
     );
-    ws.workAvailable.trigger();
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 
   async failTransaction(): Promise<void> {
     const { ws, peerPushCreditId, retryTag, transactionId } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db.runReadWriteTx(
       ["peerPushCredit"],
       async (tx) => {
@@ -349,8 +347,9 @@ export class PeerPushCreditTransactionContext implements 
TransactionContext {
         return undefined;
       },
     );
-    ws.workAvailable.trigger();
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 }
 
@@ -521,63 +520,51 @@ async function longpollKycStatus(
   exchangeUrl: string,
   kycInfo: KycPendingInfo,
   userType: KycUserType,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   const transactionId = constructTransactionIdentifier({
     tag: TransactionType.PeerPushCredit,
     peerPushCreditId,
   });
-  const retryTag = constructTaskIdentifier({
-    tag: PendingTaskType.PeerPushCredit,
-    peerPushCreditId,
-  });
-
-  runLongpollAsync(ws, retryTag, async (ct) => {
-    const url = new URL(
-      `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
-      exchangeUrl,
-    );
-    url.searchParams.set("timeout_ms", "10000");
-    logger.info(`kyc url ${url.href}`);
-    const kycStatusRes = await ws.http.fetch(url.href, {
-      method: "GET",
-      cancellationToken: ct,
-    });
-    if (
-      kycStatusRes.status === HttpStatusCode.Ok ||
-      //FIXME: NoContent is not expected 
https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
-      // remove after the exchange is fixed or clarified
-      kycStatusRes.status === HttpStatusCode.NoContent
-    ) {
-      const transitionInfo = await ws.db
-        .mktx((x) => [x.peerPushCredit])
-        .runReadWrite(async (tx) => {
-          const peerInc = await tx.peerPushCredit.get(peerPushCreditId);
-          if (!peerInc) {
-            return;
-          }
-          if (peerInc.status !== PeerPushCreditStatus.PendingMergeKycRequired) 
{
-            return;
-          }
-          const oldTxState = computePeerPushCreditTransactionState(peerInc);
-          peerInc.status = PeerPushCreditStatus.PendingMerge;
-          const newTxState = computePeerPushCreditTransactionState(peerInc);
-          await tx.peerPushCredit.put(peerInc);
-          return { oldTxState, newTxState };
-        });
-      notifyTransition(ws, transactionId, transitionInfo);
-      return { ready: true };
-    } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
-      // FIXME: Do we have to update the URL here?
-      return { ready: false };
-    } else {
-      throw Error(
-        `unexpected response from kyc-check (${kycStatusRes.status})`,
-      );
-    }
+  const url = new URL(
+    `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
+    exchangeUrl,
+  );
+  url.searchParams.set("timeout_ms", "10000");
+  logger.info(`kyc url ${url.href}`);
+  const kycStatusRes = await ws.http.fetch(url.href, {
+    method: "GET",
+    cancellationToken,
   });
-  return {
-    type: TaskRunResultType.Longpoll,
-  };
+  if (
+    kycStatusRes.status === HttpStatusCode.Ok ||
+    //FIXME: NoContent is not expected 
https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+    // remove after the exchange is fixed or clarified
+    kycStatusRes.status === HttpStatusCode.NoContent
+  ) {
+    const transitionInfo = await ws.db
+      .mktx((x) => [x.peerPushCredit])
+      .runReadWrite(async (tx) => {
+        const peerInc = await tx.peerPushCredit.get(peerPushCreditId);
+        if (!peerInc) {
+          return;
+        }
+        if (peerInc.status !== PeerPushCreditStatus.PendingMergeKycRequired) {
+          return;
+        }
+        const oldTxState = computePeerPushCreditTransactionState(peerInc);
+        peerInc.status = PeerPushCreditStatus.PendingMerge;
+        const newTxState = computePeerPushCreditTransactionState(peerInc);
+        await tx.peerPushCredit.put(peerInc);
+        return { oldTxState, newTxState };
+      });
+    notifyTransition(ws, transactionId, transitionInfo);
+  } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+    // FIXME: Do we have to update the URL here?
+  } else {
+    throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
+  }
+  return TaskRunResult.backoff();
 }
 
 async function processPeerPushCreditKycRequired(
@@ -786,7 +773,7 @@ async function handlePendingMerge(
   );
   notifyTransition(ws, transactionId, txRes?.peerPushCreditTransition);
 
-  return TaskRunResult.finished();
+  return TaskRunResult.backoff();
 }
 
 async function handlePendingWithdrawing(
@@ -839,13 +826,14 @@ async function handlePendingWithdrawing(
     return TaskRunResult.finished();
   } else {
     // FIXME: Return indicator that we depend on the other operation!
-    return TaskRunResult.pending();
+    return TaskRunResult.backoff();
   }
 }
 
 export async function processPeerPushCredit(
   ws: InternalWalletState,
   peerPushCreditId: string,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   let peerInc: PeerPushPaymentIncomingRecord | undefined;
   let contractTerms: PeerContractTerms | undefined;
@@ -886,6 +874,7 @@ export async function processPeerPushCredit(
         peerInc.exchangeBaseUrl,
         peerInc.kycInfo,
         "individual",
+        cancellationToken,
       );
     }
 
@@ -940,7 +929,9 @@ export async function confirmPeerPushCredit(
     );
   }
 
-  ws.workAvailable.trigger();
+  const ctx = new PeerPushCreditTransactionContext(ws, peerPushCreditId);
+
+  ws.taskScheduler.startShepherdTask(ctx.retryTag);
 
   const transactionId = constructTransactionIdentifier({
     tag: TransactionType.PeerPushCredit,
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts 
b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
index 2e5af4e78..165c8deee 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-push-debit.ts
@@ -16,6 +16,7 @@
 
 import {
   Amounts,
+  CancellationToken,
   CheckPeerPushDebitRequest,
   CheckPeerPushDebitResponse,
   CoinRefreshRequest,
@@ -32,6 +33,7 @@ import {
   TalerProtocolTimestamp,
   TalerProtocolViolationError,
   TransactionAction,
+  TransactionIdStr,
   TransactionMajorState,
   TransactionMinorState,
   TransactionState,
@@ -56,7 +58,7 @@ import {
   timestampProtocolToDb,
 } from "../index.js";
 import { InternalWalletState } from "../internal-wallet-state.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
 import { assertUnreachable } from "../util/assertUnreachable.js";
 import { PeerCoinRepair, selectPeerCoins } from "../util/coinSelection.js";
 import { checkLogicInvariant } from "../util/invariants.js";
@@ -65,7 +67,6 @@ import {
   TaskRunResultType,
   TransactionContext,
   constructTaskIdentifier,
-  runLongpollAsync,
   spendCoins,
 } from "./common.js";
 import {
@@ -76,14 +77,13 @@ import {
 import {
   constructTransactionIdentifier,
   notifyTransition,
-  stopLongpolling,
 } from "./transactions.js";
 
 const logger = new Logger("pay-peer-push-debit.ts");
 
 export class PeerPushDebitTransactionContext implements TransactionContext {
-  public transactionId: string;
-  public retryTag: string;
+  readonly transactionId: TransactionIdStr;
+  readonly retryTag: TaskId;
 
   constructor(
     public ws: InternalWalletState,
@@ -114,7 +114,6 @@ export class PeerPushDebitTransactionContext implements 
TransactionContext {
 
   async suspendTransaction(): Promise<void> {
     const { ws, pursePub, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.peerPushDebit])
       .runReadWrite(async (tx) => {
@@ -166,12 +165,12 @@ export class PeerPushDebitTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
   }
 
   async abortTransaction(): Promise<void> {
     const { ws, pursePub, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.peerPushDebit])
       .runReadWrite(async (tx) => {
@@ -218,12 +217,13 @@ export class PeerPushDebitTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 
   async resumeTransaction(): Promise<void> {
     const { ws, pursePub, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.peerPushDebit])
       .runReadWrite(async (tx) => {
@@ -275,13 +275,12 @@ export class PeerPushDebitTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
-    ws.workAvailable.trigger();
+    ws.taskScheduler.startShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
   }
 
   async failTransaction(): Promise<void> {
     const { ws, pursePub, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.peerPushDebit])
       .runReadWrite(async (tx) => {
@@ -328,7 +327,9 @@ export class PeerPushDebitTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 }
 
@@ -432,7 +433,7 @@ async function handlePurseCreationConflict(
       }
       await tx.peerPushDebit.put(myPpi);
     });
-  return TaskRunResult.finished();
+  return TaskRunResult.progress();
 }
 
 async function processPeerPushDebitCreateReserve(
@@ -554,7 +555,7 @@ async function processPeerPushDebitCreateReserve(
     stTo: PeerPushDebitStatus.PendingReady,
   });
 
-  return TaskRunResult.finished();
+  return TaskRunResult.backoff();
 }
 
 async function processPeerPushDebitAbortingDeletePurse(
@@ -628,7 +629,7 @@ async function processPeerPushDebitAbortingDeletePurse(
     });
   notifyTransition(ws, transactionId, transitionInfo);
 
-  return TaskRunResult.pending();
+  return TaskRunResult.backoff();
 }
 
 interface SimpleTransition {
@@ -712,7 +713,7 @@ async function processPeerPushDebitAbortingRefreshDeleted(
     });
   notifyTransition(ws, transactionId, transitionInfo);
   // FIXME: Shouldn't this be finished in some cases?!
-  return TaskRunResult.pending();
+  return TaskRunResult.backoff();
 }
 
 async function processPeerPushDebitAbortingRefreshExpired(
@@ -760,7 +761,7 @@ async function processPeerPushDebitAbortingRefreshExpired(
     });
   notifyTransition(ws, transactionId, transitionInfo);
   // FIXME: Shouldn't this be finished in some cases?!
-  return TaskRunResult.pending();
+  return TaskRunResult.backoff();
 }
 
 /**
@@ -769,118 +770,102 @@ async function 
processPeerPushDebitAbortingRefreshExpired(
 async function processPeerPushDebitReady(
   ws: InternalWalletState,
   peerPushInitiation: PeerPushDebitRecord,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   logger.trace("processing peer-push-debit pending(ready)");
   const pursePub = peerPushInitiation.pursePub;
-  const retryTag = constructTaskIdentifier({
-    tag: PendingTaskType.PeerPushDebit,
-    pursePub,
-  });
   const transactionId = constructTaskIdentifier({
     tag: PendingTaskType.PeerPushDebit,
     pursePub,
   });
-  runLongpollAsync(ws, retryTag, async (ct) => {
-    const mergeUrl = new URL(
-      `purses/${pursePub}/merge`,
-      peerPushInitiation.exchangeBaseUrl,
+  const mergeUrl = new URL(
+    `purses/${pursePub}/merge`,
+    peerPushInitiation.exchangeBaseUrl,
+  );
+  mergeUrl.searchParams.set("timeout_ms", "30000");
+  logger.info(`long-polling on purse status at ${mergeUrl.href}`);
+  const resp = await ws.http.fetch(mergeUrl.href, {
+    // timeout: getReserveRequestTimeout(withdrawalGroup),
+    cancellationToken,
+  });
+  if (resp.status === HttpStatusCode.Ok) {
+    const purseStatus = await readSuccessResponseJsonOrThrow(
+      resp,
+      codecForExchangePurseStatus(),
     );
-    mergeUrl.searchParams.set("timeout_ms", "30000");
-    logger.info(`long-polling on purse status at ${mergeUrl.href}`);
-    const resp = await ws.http.fetch(mergeUrl.href, {
-      // timeout: getReserveRequestTimeout(withdrawalGroup),
-      cancellationToken: ct,
-    });
-    if (resp.status === HttpStatusCode.Ok) {
-      const purseStatus = await readSuccessResponseJsonOrThrow(
-        resp,
-        codecForExchangePurseStatus(),
+    const mergeTimestamp = purseStatus.merge_timestamp;
+    logger.info(`got purse status ${j2s(purseStatus)}`);
+    if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) {
+      return TaskRunResult.backoff();
+    } else {
+      await transitionPeerPushDebitTransaction(
+        ws,
+        peerPushInitiation.pursePub,
+        {
+          stFrom: PeerPushDebitStatus.PendingReady,
+          stTo: PeerPushDebitStatus.Done,
+        },
       );
-      const mergeTimestamp = purseStatus.merge_timestamp;
-      logger.info(`got purse status ${j2s(purseStatus)}`);
-      if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) {
-        return { ready: false };
-      } else {
-        await transitionPeerPushDebitTransaction(
+      return TaskRunResult.finished();
+    }
+  } else if (resp.status === HttpStatusCode.Gone) {
+    logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
+    const transitionInfo = await ws.db
+      .mktx((x) => [
+        x.peerPushDebit,
+        x.refreshGroups,
+        x.denominations,
+        x.coinAvailability,
+        x.coins,
+      ])
+      .runReadWrite(async (tx) => {
+        const ppiRec = await tx.peerPushDebit.get(pursePub);
+        if (!ppiRec) {
+          return undefined;
+        }
+        if (ppiRec.status !== PeerPushDebitStatus.PendingReady) {
+          return undefined;
+        }
+        const currency = Amounts.currencyOf(ppiRec.amount);
+        const oldTxState = computePeerPushDebitTransactionState(ppiRec);
+        const coinPubs: CoinRefreshRequest[] = [];
+
+        for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
+          coinPubs.push({
+            amount: ppiRec.coinSel.contributions[i],
+            coinPub: ppiRec.coinSel.coinPubs[i],
+          });
+        }
+
+        const refresh = await createRefreshGroup(
           ws,
-          peerPushInitiation.pursePub,
-          {
-            stFrom: PeerPushDebitStatus.PendingReady,
-            stTo: PeerPushDebitStatus.Done,
-          },
+          tx,
+          currency,
+          coinPubs,
+          RefreshReason.AbortPeerPushDebit,
+          transactionId,
         );
+        ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired;
+        ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
+        await tx.peerPushDebit.put(ppiRec);
+        const newTxState = computePeerPushDebitTransactionState(ppiRec);
         return {
-          ready: true,
+          oldTxState,
+          newTxState,
         };
-      }
-    } else if (resp.status === HttpStatusCode.Gone) {
-      logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
-      const transitionInfo = await ws.db
-        .mktx((x) => [
-          x.peerPushDebit,
-          x.refreshGroups,
-          x.denominations,
-          x.coinAvailability,
-          x.coins,
-        ])
-        .runReadWrite(async (tx) => {
-          const ppiRec = await tx.peerPushDebit.get(pursePub);
-          if (!ppiRec) {
-            return undefined;
-          }
-          if (ppiRec.status !== PeerPushDebitStatus.PendingReady) {
-            return undefined;
-          }
-          const currency = Amounts.currencyOf(ppiRec.amount);
-          const oldTxState = computePeerPushDebitTransactionState(ppiRec);
-          const coinPubs: CoinRefreshRequest[] = [];
-
-          for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
-            coinPubs.push({
-              amount: ppiRec.coinSel.contributions[i],
-              coinPub: ppiRec.coinSel.coinPubs[i],
-            });
-          }
-
-          const refresh = await createRefreshGroup(
-            ws,
-            tx,
-            currency,
-            coinPubs,
-            RefreshReason.AbortPeerPushDebit,
-            transactionId,
-          );
-          ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired;
-          ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
-          await tx.peerPushDebit.put(ppiRec);
-          const newTxState = computePeerPushDebitTransactionState(ppiRec);
-          return {
-            oldTxState,
-            newTxState,
-          };
-        });
-      notifyTransition(ws, transactionId, transitionInfo);
-      return {
-        ready: true,
-      };
-    } else {
-      logger.warn(`unexpected HTTP status for purse: ${resp.status}`);
-      return {
-        ready: false,
-      };
-    }
-  });
-  logger.trace(
-    "returning early from peer-push-debit for long-polling in background",
-  );
-  return {
-    type: TaskRunResultType.Longpoll,
-  };
+      });
+    notifyTransition(ws, transactionId, transitionInfo);
+    return TaskRunResult.backoff();
+  } else {
+    logger.warn(`unexpected HTTP status for purse: ${resp.status}`);
+    return TaskRunResult.backoff();
+  }
 }
 
 export async function processPeerPushDebit(
   ws: InternalWalletState,
   pursePub: string,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   const peerPushInitiation = await ws.db
     .mktx((x) => [x.peerPushDebit])
@@ -891,24 +876,15 @@ export async function processPeerPushDebit(
     throw Error("peer push payment not found");
   }
 
-  const retryTag = constructTaskIdentifier({
-    tag: PendingTaskType.PeerPushDebit,
-    pursePub,
-  });
-
-  // We're already running!
-  if (ws.activeLongpoll[retryTag]) {
-    logger.info("peer-push-debit task already in long-polling, returning!");
-    return {
-      type: TaskRunResultType.Longpoll,
-    };
-  }
-
   switch (peerPushInitiation.status) {
     case PeerPushDebitStatus.PendingCreatePurse:
       return processPeerPushDebitCreateReserve(ws, peerPushInitiation);
     case PeerPushDebitStatus.PendingReady:
-      return processPeerPushDebitReady(ws, peerPushInitiation);
+      return processPeerPushDebitReady(
+        ws,
+        peerPushInitiation,
+        cancellationToken,
+      );
     case PeerPushDebitStatus.AbortingDeletePurse:
       return processPeerPushDebitAbortingDeletePurse(ws, peerPushInitiation);
     case PeerPushDebitStatus.AbortingRefreshDeleted:
@@ -971,10 +947,9 @@ export async function initiatePeerPushDebit(
 
   const pursePub = pursePair.pub;
 
-  const transactionId = constructTaskIdentifier({
-    tag: PendingTaskType.PeerPushDebit,
-    pursePub,
-  });
+  const ctx = new PeerPushDebitTransactionContext(ws, pursePub);
+
+  const transactionId = ctx.transactionId;
 
   const contractEncNonce = encodeCrock(getRandomBytes(24));
 
@@ -1044,6 +1019,8 @@ export async function initiatePeerPushDebit(
     hintTransactionId: transactionId,
   });
 
+  ws.taskScheduler.startShepherdTask(ctx.retryTag);
+
   return {
     contractPriv: contractKeyPair.priv,
     mergePriv: mergePair.priv,
diff --git a/packages/taler-wallet-core/src/operations/pending.ts 
b/packages/taler-wallet-core/src/operations/pending.ts
deleted file mode 100644
index 990d9a7b3..000000000
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ /dev/null
@@ -1,814 +0,0 @@
-/*
- This file is part of GNU Taler
- (C) 2019 GNUnet e.V.
-
- GNU Taler is free software; you can redistribute it and/or modify it under the
- terms of the GNU General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along with
- GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
- */
-
-/**
- * Derive pending tasks from the wallet database.
- */
-
-/**
- * Imports.
- */
-import { GlobalIDB } from "@gnu-taler/idb-bridge";
-import {
-  AbsoluteTime,
-  TalerPreciseTimestamp,
-  TransactionRecordFilter,
-} from "@gnu-taler/taler-util";
-import {
-  BackupProviderStateTag,
-  DbPreciseTimestamp,
-  DepositElementStatus,
-  DepositGroupRecord,
-  ExchangeEntryDbUpdateStatus,
-  OPERATION_STATUS_ACTIVE_FIRST,
-  OPERATION_STATUS_ACTIVE_LAST,
-  PeerPullCreditRecord,
-  PeerPullDebitRecordStatus,
-  PeerPullPaymentIncomingRecord,
-  PeerPushCreditStatus,
-  PeerPushDebitRecord,
-  PeerPushPaymentIncomingRecord,
-  PurchaseRecord,
-  PurchaseStatus,
-  RefreshCoinStatus,
-  RefreshGroupRecord,
-  RefreshOperationStatus,
-  RefundGroupRecord,
-  RewardRecord,
-  WalletStoresV1,
-  WithdrawalGroupRecord,
-  timestampAbsoluteFromDb,
-  timestampOptionalAbsoluteFromDb,
-  timestampPreciseFromDb,
-  timestampPreciseToDb,
-} from "../db.js";
-import { InternalWalletState } from "../internal-wallet-state.js";
-import {
-  PendingOperationsResponse,
-  PendingTaskType,
-  TaskId,
-} from "../pending-types.js";
-import { GetReadOnlyAccess } from "../util/query.js";
-import { TaskIdentifiers } from "./common.js";
-
-function getPendingCommon(
-  ws: InternalWalletState,
-  opTag: TaskId,
-  timestampDue: AbsoluteTime,
-): {
-  id: TaskId;
-  isDue: boolean;
-  timestampDue: AbsoluteTime;
-  isLongpolling: boolean;
-} {
-  const isDue =
-    AbsoluteTime.isExpired(timestampDue) && !ws.activeLongpoll[opTag];
-  return {
-    id: opTag,
-    isDue,
-    timestampDue,
-    isLongpolling: !!ws.activeLongpoll[opTag],
-  };
-}
-
-async function gatherExchangePending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    exchanges: typeof WalletStoresV1.exchanges;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  let timestampDue: DbPreciseTimestamp | undefined = undefined;
-  await tx.exchanges.iter().forEachAsync(async (exch) => {
-    switch (exch.updateStatus) {
-      case ExchangeEntryDbUpdateStatus.Initial:
-      case ExchangeEntryDbUpdateStatus.Suspended:
-        return;
-    }
-    const opUpdateExchangeTag = TaskIdentifiers.forExchangeUpdate(exch);
-    let opr = await tx.operationRetries.get(opUpdateExchangeTag);
-
-    switch (exch.updateStatus) {
-      case ExchangeEntryDbUpdateStatus.Ready:
-        timestampDue = opr?.retryInfo.nextRetry ?? exch.nextRefreshCheckStamp;
-        break;
-      case ExchangeEntryDbUpdateStatus.ReadyUpdate:
-      case ExchangeEntryDbUpdateStatus.InitialUpdate:
-      case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
-        timestampDue =
-          opr?.retryInfo.nextRetry ??
-          timestampPreciseToDb(TalerPreciseTimestamp.now());
-        break;
-    }
-
-    resp.pendingOperations.push({
-      type: PendingTaskType.ExchangeUpdate,
-      ...getPendingCommon(
-        ws,
-        opUpdateExchangeTag,
-        
AbsoluteTime.fromPreciseTimestamp(timestampPreciseFromDb(timestampDue)),
-      ),
-      givesLifeness: false,
-      exchangeBaseUrl: exch.baseUrl,
-      lastError: opr?.lastError,
-    });
-
-    // We only schedule a check for auto-refresh if the exchange update
-    // was successful.
-    if (!opr?.lastError) {
-      const opCheckRefreshTag = TaskIdentifiers.forExchangeCheckRefresh(exch);
-      resp.pendingOperations.push({
-        type: PendingTaskType.ExchangeCheckRefresh,
-        ...getPendingCommon(
-          ws,
-          opCheckRefreshTag,
-          AbsoluteTime.fromPreciseTimestamp(
-            timestampPreciseFromDb(timestampDue),
-          ),
-        ),
-        timestampDue: AbsoluteTime.fromPreciseTimestamp(
-          timestampPreciseFromDb(exch.nextRefreshCheckStamp),
-        ),
-        givesLifeness: false,
-        exchangeBaseUrl: exch.baseUrl,
-      });
-    }
-  });
-}
-
-/**
- * Iterate refresh records based on a filter.
- */
-export async function iterRecordsForRefresh(
-  tx: GetReadOnlyAccess<{
-    refreshGroups: typeof WalletStoresV1.refreshGroups;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: RefreshGroupRecord) => Promise<void>,
-): Promise<void> {
-  let refreshGroups: RefreshGroupRecord[];
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      RefreshOperationStatus.Pending,
-      RefreshOperationStatus.Suspended,
-    );
-    refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange);
-  } else {
-    refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll();
-  }
-
-  for (const r of refreshGroups) {
-    await f(r);
-  }
-}
-
-async function gatherRefreshPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    refreshGroups: typeof WalletStoresV1.refreshGroups;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await iterRecordsForRefresh(tx, { onlyState: "nonfinal" }, async (r) => {
-    if (r.timestampFinished) {
-      return;
-    }
-    const opId = TaskIdentifiers.forRefresh(r);
-    const retryRecord = await tx.operationRetries.get(opId);
-    const timestampDue =
-      timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
-      AbsoluteTime.now();
-    resp.pendingOperations.push({
-      type: PendingTaskType.Refresh,
-      ...getPendingCommon(ws, opId, timestampDue),
-      givesLifeness: true,
-      refreshGroupId: r.refreshGroupId,
-      finishedPerCoin: r.statusPerCoin.map(
-        (x) => x === RefreshCoinStatus.Finished,
-      ),
-      retryInfo: retryRecord?.retryInfo,
-    });
-  });
-}
-
-export async function iterRecordsForWithdrawal(
-  tx: GetReadOnlyAccess<{
-    withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: WithdrawalGroupRecord) => Promise<void>,
-): Promise<void> {
-  let withdrawalGroupRecords: WithdrawalGroupRecord[];
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      OPERATION_STATUS_ACTIVE_FIRST,
-      OPERATION_STATUS_ACTIVE_LAST,
-    );
-    withdrawalGroupRecords =
-      await tx.withdrawalGroups.indexes.byStatus.getAll(keyRange);
-  } else {
-    withdrawalGroupRecords =
-      await tx.withdrawalGroups.indexes.byStatus.getAll();
-  }
-  for (const wgr of withdrawalGroupRecords) {
-    await f(wgr);
-  }
-}
-
-async function gatherWithdrawalPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
-    planchets: typeof WalletStoresV1.planchets;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await iterRecordsForWithdrawal(tx, { onlyState: "nonfinal" }, async (wsr) => 
{
-    const opTag = TaskIdentifiers.forWithdrawal(wsr);
-    let opr = await tx.operationRetries.get(opTag);
-    /**
-     * kyc pending operation don't give lifeness
-     * since the user need to complete kyc procedure
-     */
-    const userNeedToCompleteKYC = wsr.kycUrl !== undefined;
-    const now = AbsoluteTime.now();
-    if (!opr) {
-      opr = {
-        id: opTag,
-        retryInfo: {
-          firstTry: timestampPreciseToDb(AbsoluteTime.toPreciseTimestamp(now)),
-          nextRetry: 
timestampPreciseToDb(AbsoluteTime.toPreciseTimestamp(now)),
-          retryCounter: 0,
-        },
-      };
-    }
-    resp.pendingOperations.push({
-      type: PendingTaskType.Withdraw,
-      ...getPendingCommon(
-        ws,
-        opTag,
-        timestampOptionalAbsoluteFromDb(opr.retryInfo?.nextRetry) ??
-          AbsoluteTime.now(),
-      ),
-      givesLifeness: !userNeedToCompleteKYC,
-      withdrawalGroupId: wsr.withdrawalGroupId,
-      lastError: opr.lastError,
-      retryInfo: opr.retryInfo,
-    });
-  });
-}
-
-export async function iterRecordsForDeposit(
-  tx: GetReadOnlyAccess<{
-    depositGroups: typeof WalletStoresV1.depositGroups;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: DepositGroupRecord) => Promise<void>,
-): Promise<void> {
-  let dgs: DepositGroupRecord[];
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      OPERATION_STATUS_ACTIVE_FIRST,
-      OPERATION_STATUS_ACTIVE_LAST,
-    );
-    dgs = await tx.depositGroups.indexes.byStatus.getAll(keyRange);
-  } else {
-    dgs = await tx.depositGroups.indexes.byStatus.getAll();
-  }
-
-  for (const dg of dgs) {
-    await f(dg);
-  }
-}
-
-async function gatherDepositPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    depositGroups: typeof WalletStoresV1.depositGroups;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await iterRecordsForDeposit(tx, { onlyState: "nonfinal" }, async (dg) => {
-    let deposited = true;
-    for (const d of dg.statusPerCoin) {
-      if (d === DepositElementStatus.DepositPending) {
-        deposited = false;
-      }
-    }
-    /**
-     * kyc pending operation don't give lifeness
-     * since the user need to complete kyc procedure
-     */
-    const userNeedToCompleteKYC = dg.kycInfo !== undefined;
-    const opId = TaskIdentifiers.forDeposit(dg);
-    const retryRecord = await tx.operationRetries.get(opId);
-    const timestampDue =
-      timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
-      AbsoluteTime.now();
-    resp.pendingOperations.push({
-      type: PendingTaskType.Deposit,
-      ...getPendingCommon(ws, opId, timestampDue),
-      // Fully deposited operations don't give lifeness,
-      // because there is no reason to wait on the
-      // deposit tracking status.
-      givesLifeness: !deposited && !userNeedToCompleteKYC,
-      depositGroupId: dg.depositGroupId,
-      lastError: retryRecord?.lastError,
-      retryInfo: retryRecord?.retryInfo,
-    });
-  });
-}
-
-export async function iterRecordsForReward(
-  tx: GetReadOnlyAccess<{
-    rewards: typeof WalletStoresV1.rewards;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: RewardRecord) => Promise<void>,
-): Promise<void> {
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      OPERATION_STATUS_ACTIVE_FIRST,
-      OPERATION_STATUS_ACTIVE_LAST,
-    );
-    await tx.rewards.indexes.byStatus.iter(keyRange).forEachAsync(f);
-  } else {
-    await tx.rewards.indexes.byStatus.iter().forEachAsync(f);
-  }
-}
-
-async function gatherRewardPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    rewards: typeof WalletStoresV1.rewards;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await iterRecordsForReward(tx, { onlyState: "nonfinal" }, async (tip) => {
-    const opId = TaskIdentifiers.forTipPickup(tip);
-    const retryRecord = await tx.operationRetries.get(opId);
-    const timestampDue =
-      timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
-      AbsoluteTime.now();
-
-    /**
-     * kyc pending operation don't give lifeness
-     * since the user need to complete kyc procedure
-     */
-    // const userNeedToCompleteKYC = tip.
-
-    if (tip.acceptedTimestamp) {
-      resp.pendingOperations.push({
-        type: PendingTaskType.RewardPickup,
-        ...getPendingCommon(ws, opId, timestampDue),
-        givesLifeness: true,
-        timestampDue,
-        merchantBaseUrl: tip.merchantBaseUrl,
-        tipId: tip.walletRewardId,
-        merchantTipId: tip.merchantRewardId,
-      });
-    }
-  });
-}
-
-export async function iterRecordsForRefund(
-  tx: GetReadOnlyAccess<{
-    refundGroups: typeof WalletStoresV1.refundGroups;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: RefundGroupRecord) => Promise<void>,
-): Promise<void> {
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      OPERATION_STATUS_ACTIVE_FIRST,
-      OPERATION_STATUS_ACTIVE_LAST,
-    );
-    await tx.refundGroups.indexes.byStatus.iter(keyRange).forEachAsync(f);
-  } else {
-    await tx.refundGroups.iter().forEachAsync(f);
-  }
-}
-
-export async function iterRecordsForPurchase(
-  tx: GetReadOnlyAccess<{
-    purchases: typeof WalletStoresV1.purchases;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: PurchaseRecord) => Promise<void>,
-): Promise<void> {
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      OPERATION_STATUS_ACTIVE_FIRST,
-      OPERATION_STATUS_ACTIVE_LAST,
-    );
-    await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f);
-  } else {
-    await tx.purchases.indexes.byStatus.iter().forEachAsync(f);
-  }
-}
-
-async function gatherPurchasePending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    purchases: typeof WalletStoresV1.purchases;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await iterRecordsForPurchase(tx, { onlyState: "nonfinal" }, async (pr) => {
-    switch (pr.purchaseStatus) {
-      // These states are nonfinal but don't need any processing
-      case PurchaseStatus.DialogProposed:
-      case PurchaseStatus.DialogShared:
-        return;
-    }
-    const opId = TaskIdentifiers.forPay(pr);
-    const retryRecord = await tx.operationRetries.get(opId);
-    const timestampDue =
-      timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
-      AbsoluteTime.now();
-    resp.pendingOperations.push({
-      type: PendingTaskType.Purchase,
-      ...getPendingCommon(ws, opId, timestampDue),
-      givesLifeness: true,
-      statusStr: PurchaseStatus[pr.purchaseStatus],
-      proposalId: pr.proposalId,
-      retryInfo: retryRecord?.retryInfo,
-      lastError: retryRecord?.lastError,
-    });
-  });
-}
-
-async function gatherRecoupPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    recoupGroups: typeof WalletStoresV1.recoupGroups;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  // FIXME: Have a status field!
-  await tx.recoupGroups.iter().forEachAsync(async (rg) => {
-    if (rg.timestampFinished) {
-      return;
-    }
-    const opId = TaskIdentifiers.forRecoup(rg);
-    const retryRecord = await tx.operationRetries.get(opId);
-    const timestampDue =
-      timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
-      AbsoluteTime.now();
-    resp.pendingOperations.push({
-      type: PendingTaskType.Recoup,
-      ...getPendingCommon(ws, opId, timestampDue),
-      givesLifeness: true,
-      recoupGroupId: rg.recoupGroupId,
-      retryInfo: retryRecord?.retryInfo,
-      lastError: retryRecord?.lastError,
-    });
-  });
-}
-
-async function gatherBackupPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    backupProviders: typeof WalletStoresV1.backupProviders;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await tx.backupProviders.iter().forEachAsync(async (bp) => {
-    const opId = TaskIdentifiers.forBackup(bp);
-    const retryRecord = await tx.operationRetries.get(opId);
-    if (bp.state.tag === BackupProviderStateTag.Ready) {
-      const timestampDue = timestampAbsoluteFromDb(
-        bp.state.nextBackupTimestamp,
-      );
-      resp.pendingOperations.push({
-        type: PendingTaskType.Backup,
-        ...getPendingCommon(ws, opId, timestampDue),
-        givesLifeness: false,
-        backupProviderBaseUrl: bp.baseUrl,
-        lastError: undefined,
-      });
-    } else if (bp.state.tag === BackupProviderStateTag.Retrying) {
-      const timestampDue =
-        timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo?.nextRetry) ??
-        AbsoluteTime.now();
-      resp.pendingOperations.push({
-        type: PendingTaskType.Backup,
-        ...getPendingCommon(ws, opId, timestampDue),
-        givesLifeness: false,
-        backupProviderBaseUrl: bp.baseUrl,
-        retryInfo: retryRecord?.retryInfo,
-        lastError: retryRecord?.lastError,
-      });
-    }
-  });
-}
-
-export async function iterRecordsForPeerPullInitiation(
-  tx: GetReadOnlyAccess<{
-    peerPullCredit: typeof WalletStoresV1.peerPullCredit;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: PeerPullCreditRecord) => Promise<void>,
-): Promise<void> {
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      OPERATION_STATUS_ACTIVE_FIRST,
-      OPERATION_STATUS_ACTIVE_LAST,
-    );
-    await tx.peerPullCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
-  } else {
-    await tx.peerPullCredit.indexes.byStatus.iter().forEachAsync(f);
-  }
-}
-
-async function gatherPeerPullInitiationPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    peerPullCredit: typeof WalletStoresV1.peerPullCredit;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await iterRecordsForPeerPullInitiation(
-    tx,
-    { onlyState: "nonfinal" },
-    async (pi) => {
-      const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
-      const retryRecord = await tx.operationRetries.get(opId);
-      const timestampDue =
-        timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
-        AbsoluteTime.now();
-
-      /**
-       * kyc pending operation don't give lifeness
-       * since the user need to complete kyc procedure
-       */
-      const userNeedToCompleteKYC = pi.kycUrl !== undefined;
-
-      resp.pendingOperations.push({
-        type: PendingTaskType.PeerPullCredit,
-        ...getPendingCommon(ws, opId, timestampDue),
-        givesLifeness: !userNeedToCompleteKYC,
-        retryInfo: retryRecord?.retryInfo,
-        pursePub: pi.pursePub,
-        internalOperationStatus: `0x${pi.status.toString(16)}`,
-      });
-    },
-  );
-}
-
-export async function iterRecordsForPeerPullDebit(
-  tx: GetReadOnlyAccess<{
-    peerPullDebit: typeof WalletStoresV1.peerPullDebit;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: PeerPullPaymentIncomingRecord) => Promise<void>,
-): Promise<void> {
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      OPERATION_STATUS_ACTIVE_FIRST,
-      OPERATION_STATUS_ACTIVE_LAST,
-    );
-    await tx.peerPullDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
-  } else {
-    await tx.peerPullDebit.indexes.byStatus.iter().forEachAsync(f);
-  }
-}
-
-async function gatherPeerPullDebitPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    peerPullDebit: typeof WalletStoresV1.peerPullDebit;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await iterRecordsForPeerPullDebit(
-    tx,
-    { onlyState: "nonfinal" },
-    async (pi) => {
-      const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);
-      const retryRecord = await tx.operationRetries.get(opId);
-      const timestampDue =
-        timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
-        AbsoluteTime.now();
-      switch (pi.status) {
-        case PeerPullDebitRecordStatus.DialogProposed:
-          return;
-      }
-      resp.pendingOperations.push({
-        type: PendingTaskType.PeerPullDebit,
-        ...getPendingCommon(ws, opId, timestampDue),
-        givesLifeness: true,
-        retryInfo: retryRecord?.retryInfo,
-        peerPullDebitId: pi.peerPullDebitId,
-        internalOperationStatus: `0x${pi.status.toString(16)}`,
-      });
-    },
-  );
-}
-
-export async function iterRecordsForPeerPushInitiation(
-  tx: GetReadOnlyAccess<{
-    peerPushDebit: typeof WalletStoresV1.peerPushDebit;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: PeerPushDebitRecord) => Promise<void>,
-): Promise<void> {
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      OPERATION_STATUS_ACTIVE_FIRST,
-      OPERATION_STATUS_ACTIVE_LAST,
-    );
-    await tx.peerPushDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
-  } else {
-    await tx.peerPushDebit.indexes.byStatus.iter().forEachAsync(f);
-  }
-}
-
-async function gatherPeerPushInitiationPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    peerPushDebit: typeof WalletStoresV1.peerPushDebit;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await iterRecordsForPeerPushInitiation(
-    tx,
-    { onlyState: "nonfinal" },
-    async (pi) => {
-      const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);
-      const retryRecord = await tx.operationRetries.get(opId);
-      const timestampDue =
-        timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
-        AbsoluteTime.now();
-      resp.pendingOperations.push({
-        type: PendingTaskType.PeerPushDebit,
-        ...getPendingCommon(ws, opId, timestampDue),
-        givesLifeness: true,
-        retryInfo: retryRecord?.retryInfo,
-        pursePub: pi.pursePub,
-      });
-    },
-  );
-}
-
-export async function iterRecordsForPeerPushCredit(
-  tx: GetReadOnlyAccess<{
-    peerPushCredit: typeof WalletStoresV1.peerPushCredit;
-  }>,
-  filter: TransactionRecordFilter,
-  f: (r: PeerPushPaymentIncomingRecord) => Promise<void>,
-): Promise<void> {
-  if (filter.onlyState === "nonfinal") {
-    const keyRange = GlobalIDB.KeyRange.bound(
-      OPERATION_STATUS_ACTIVE_FIRST,
-      OPERATION_STATUS_ACTIVE_LAST,
-    );
-    await tx.peerPushCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
-  } else {
-    await tx.peerPushCredit.indexes.byStatus.iter().forEachAsync(f);
-  }
-}
-
-async function gatherPeerPushCreditPending(
-  ws: InternalWalletState,
-  tx: GetReadOnlyAccess<{
-    peerPushCredit: typeof WalletStoresV1.peerPushCredit;
-    operationRetries: typeof WalletStoresV1.operationRetries;
-  }>,
-  now: AbsoluteTime,
-  resp: PendingOperationsResponse,
-): Promise<void> {
-  await iterRecordsForPeerPushCredit(
-    tx,
-    { onlyState: "nonfinal" },
-    async (pi) => {
-      const opId = TaskIdentifiers.forPeerPushCredit(pi);
-      const retryRecord = await tx.operationRetries.get(opId);
-      const timestampDue =
-        timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
-        AbsoluteTime.now();
-
-      /**
-       * kyc pending operation don't give lifeness
-       * since the user need to complete kyc procedure
-       */
-      const userNeedToCompleteKYC = pi.kycUrl !== undefined;
-
-      switch (pi.status) {
-        // Status is nonfinal but no processing needs to be done
-        case PeerPushCreditStatus.DialogProposed:
-          return;
-        default:
-          resp.pendingOperations.push({
-            type: PendingTaskType.PeerPushCredit,
-            ...getPendingCommon(ws, opId, timestampDue),
-            givesLifeness: !userNeedToCompleteKYC,
-            retryInfo: retryRecord?.retryInfo,
-            peerPushCreditId: pi.peerPushCreditId,
-          });
-      }
-    },
-  );
-}
-
-const taskPrio: { [X in PendingTaskType]: number } = {
-  [PendingTaskType.Deposit]: 2,
-  [PendingTaskType.ExchangeUpdate]: 1,
-  [PendingTaskType.PeerPullCredit]: 2,
-  [PendingTaskType.PeerPullDebit]: 2,
-  [PendingTaskType.PeerPushCredit]: 2,
-  [PendingTaskType.Purchase]: 2,
-  [PendingTaskType.Recoup]: 3,
-  [PendingTaskType.RewardPickup]: 2,
-  [PendingTaskType.Refresh]: 3,
-  [PendingTaskType.Withdraw]: 3,
-  [PendingTaskType.ExchangeCheckRefresh]: 3,
-  [PendingTaskType.PeerPushDebit]: 2,
-  [PendingTaskType.Backup]: 4,
-};
-
-export async function getPendingOperations(
-  ws: InternalWalletState,
-): Promise<PendingOperationsResponse> {
-  const now = AbsoluteTime.now();
-  const resp = await ws.db
-    .mktx((x) => [
-      x.backupProviders,
-      x.exchanges,
-      x.exchangeDetails,
-      x.refreshGroups,
-      x.coins,
-      x.withdrawalGroups,
-      x.rewards,
-      x.purchases,
-      x.planchets,
-      x.depositGroups,
-      x.recoupGroups,
-      x.operationRetries,
-      x.peerPullCredit,
-      x.peerPushDebit,
-      x.peerPullDebit,
-      x.peerPushCredit,
-    ])
-    .runReadWrite(async (tx) => {
-      const resp: PendingOperationsResponse = {
-        pendingOperations: [],
-      };
-      await gatherExchangePending(ws, tx, now, resp);
-      await gatherRefreshPending(ws, tx, now, resp);
-      await gatherWithdrawalPending(ws, tx, now, resp);
-      await gatherDepositPending(ws, tx, now, resp);
-      await gatherRewardPending(ws, tx, now, resp);
-      await gatherPurchasePending(ws, tx, now, resp);
-      await gatherRecoupPending(ws, tx, now, resp);
-      await gatherBackupPending(ws, tx, now, resp);
-      await gatherPeerPushInitiationPending(ws, tx, now, resp);
-      await gatherPeerPullInitiationPending(ws, tx, now, resp);
-      await gatherPeerPullDebitPending(ws, tx, now, resp);
-      await gatherPeerPushCreditPending(ws, tx, now, resp);
-      return resp;
-    });
-
-  resp.pendingOperations.sort((a, b) => {
-    let prioA = taskPrio[a.type];
-    let prioB = taskPrio[b.type];
-    return Math.sign(prioA - prioB);
-  });
-
-  return resp;
-}
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts 
b/packages/taler-wallet-core/src/operations/refresh.ts
index 5f7169dbd..b9ac12518 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -15,12 +15,12 @@
  */
 
 import {
-  AbsoluteTime,
   AgeCommitment,
   AgeRestriction,
   AmountJson,
   Amounts,
   amountToPretty,
+  CancellationToken,
   codecForExchangeMeltResponse,
   codecForExchangeRevealResponse,
   CoinPublicKeyString,
@@ -29,8 +29,6 @@ import {
   DenominationInfo,
   DenomKeyType,
   Duration,
-  durationFromSpec,
-  durationMul,
   encodeCrock,
   ExchangeMeltRequest,
   ExchangeProtocolVersion,
@@ -51,7 +49,6 @@ import {
   TalerErrorCode,
   TalerErrorDetail,
   TalerPreciseTimestamp,
-  TalerProtocolTimestamp,
   TransactionAction,
   TransactionMajorState,
   TransactionState,
@@ -79,10 +76,11 @@ import {
 } from "../db.js";
 import {
   getCandidateWithdrawalDenomsTx,
+  PendingTaskType,
   RefreshGroupPerExchangeInfo,
   RefreshSessionRecord,
+  TaskId,
   timestampPreciseToDb,
-  timestampProtocolFromDb,
   WalletDbReadWriteTransactionArr,
 } from "../index.js";
 import {
@@ -94,6 +92,7 @@ import { selectWithdrawalDenominations } from 
"../util/coinSelection.js";
 import { checkDbInvariant } from "../util/invariants.js";
 import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js";
 import {
+  constructTaskIdentifier,
   makeCoinAvailable,
   makeCoinsVisible,
   TaskRunResult,
@@ -111,6 +110,7 @@ const logger = new Logger("refresh.ts");
 
 export class RefreshTransactionContext implements TransactionContext {
   public transactionId: string;
+  readonly taskId: TaskId;
 
   constructor(
     public ws: InternalWalletState,
@@ -120,6 +120,10 @@ export class RefreshTransactionContext implements 
TransactionContext {
       tag: TransactionType.Refresh,
       refreshGroupId,
     });
+    this.taskId = constructTaskIdentifier({
+      tag: PendingTaskType.Refresh,
+      refreshGroupId,
+    });
   }
 
   async deleteTransaction(): Promise<void> {
@@ -211,8 +215,8 @@ export class RefreshTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
-    ws.workAvailable.trigger();
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(this.taskId);
   }
 
   async failTransaction(): Promise<void> {
@@ -250,8 +254,9 @@ export class RefreshTransactionContext implements 
TransactionContext {
           newTxState: computeRefreshTransactionState(dg),
         };
       });
-    ws.workAvailable.trigger();
+    ws.taskScheduler.stopShepherdTask(this.taskId);
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(this.taskId);
   }
 }
 
@@ -1003,7 +1008,7 @@ async function refreshReveal(
 export async function processRefreshGroup(
   ws: InternalWalletState,
   refreshGroupId: string,
-  options: Record<string, never> = {},
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   logger.trace(`processing refresh group ${refreshGroupId}`);
 
@@ -1053,7 +1058,7 @@ export async function processRefreshGroup(
     logger.warn(`exception: ${e}`);
   }
   if (inShutdown) {
-    return TaskRunResult.pending();
+    return TaskRunResult.backoff();
   }
   if (errors.length > 0) {
     return {
@@ -1068,7 +1073,7 @@ export async function processRefreshGroup(
     };
   }
 
-  return TaskRunResult.pending();
+  return TaskRunResult.backoff();
 }
 
 async function processRefreshSession(
@@ -1311,134 +1316,18 @@ export async function createRefreshGroup(
 
   logger.trace(`created refresh group ${refreshGroupId}`);
 
+  const ctx = new RefreshTransactionContext(ws, refreshGroupId);
+
+  // Shepherd the task.
+  // If the current transaction fails to commit the refresh
+  // group to the DB, the shepherd will give up.
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
+
   return {
     refreshGroupId,
   };
 }
 
-/**
- * Timestamp after which the wallet would do the next check for an 
auto-refresh.
- */
-function getAutoRefreshCheckThreshold(d: DenominationRecord): AbsoluteTime {
-  const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
-    timestampProtocolFromDb(d.stampExpireWithdraw),
-  );
-  const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
-    timestampProtocolFromDb(d.stampExpireDeposit),
-  );
-  const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
-  const deltaDiv = durationMul(delta, 0.75);
-  return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
-}
-
-/**
- * Timestamp after which the wallet would do an auto-refresh.
- */
-export function getAutoRefreshExecuteThreshold(d: {
-  stampExpireWithdraw: TalerProtocolTimestamp;
-  stampExpireDeposit: TalerProtocolTimestamp;
-}): AbsoluteTime {
-  const expireWithdraw = AbsoluteTime.fromProtocolTimestamp(
-    d.stampExpireWithdraw,
-  );
-  const expireDeposit = AbsoluteTime.fromProtocolTimestamp(
-    d.stampExpireDeposit,
-  );
-  const delta = AbsoluteTime.difference(expireWithdraw, expireDeposit);
-  const deltaDiv = durationMul(delta, 0.5);
-  return AbsoluteTime.addDuration(expireWithdraw, deltaDiv);
-}
-
-function getAutoRefreshExecuteThresholdForDenom(
-  d: DenominationRecord,
-): AbsoluteTime {
-  return getAutoRefreshExecuteThreshold({
-    stampExpireWithdraw: timestampProtocolFromDb(d.stampExpireWithdraw),
-    stampExpireDeposit: timestampProtocolFromDb(d.stampExpireDeposit),
-  });
-}
-
-export async function autoRefresh(
-  ws: InternalWalletState,
-  exchangeBaseUrl: string,
-): Promise<TaskRunResult> {
-  logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`);
-
-  // We must make sure that the exchange is up-to-date so that
-  // can refresh into new denominations.
-  await fetchFreshExchange(ws, exchangeBaseUrl);
-
-  let minCheckThreshold = AbsoluteTime.addDuration(
-    AbsoluteTime.now(),
-    durationFromSpec({ days: 1 }),
-  );
-  await ws.db
-    .mktx((x) => [
-      x.coins,
-      x.denominations,
-      x.coinAvailability,
-      x.refreshGroups,
-      x.exchanges,
-    ])
-    .runReadWrite(async (tx) => {
-      const exchange = await tx.exchanges.get(exchangeBaseUrl);
-      if (!exchange || !exchange.detailsPointer) {
-        return;
-      }
-      const coins = await tx.coins.indexes.byBaseUrl
-        .iter(exchangeBaseUrl)
-        .toArray();
-      const refreshCoins: CoinRefreshRequest[] = [];
-      for (const coin of coins) {
-        if (coin.status !== CoinStatus.Fresh) {
-          continue;
-        }
-        const denom = await tx.denominations.get([
-          exchangeBaseUrl,
-          coin.denomPubHash,
-        ]);
-        if (!denom) {
-          logger.warn("denomination not in database");
-          continue;
-        }
-        const executeThreshold = getAutoRefreshExecuteThresholdForDenom(denom);
-        if (AbsoluteTime.isExpired(executeThreshold)) {
-          refreshCoins.push({
-            coinPub: coin.coinPub,
-            amount: denom.value,
-          });
-        } else {
-          const checkThreshold = getAutoRefreshCheckThreshold(denom);
-          minCheckThreshold = AbsoluteTime.min(
-            minCheckThreshold,
-            checkThreshold,
-          );
-        }
-      }
-      if (refreshCoins.length > 0) {
-        const res = await createRefreshGroup(
-          ws,
-          tx,
-          exchange.detailsPointer?.currency,
-          refreshCoins,
-          RefreshReason.Scheduled,
-          undefined,
-        );
-        logger.trace(
-          `created refresh group for auto-refresh (${res.refreshGroupId})`,
-        );
-      }
-      logger.trace(
-        `next refresh check at ${AbsoluteTime.toIsoString(minCheckThreshold)}`,
-      );
-      exchange.nextRefreshCheckStamp = timestampPreciseToDb(
-        AbsoluteTime.toPreciseTimestamp(minCheckThreshold),
-      );
-      await tx.exchanges.put(exchange);
-    });
-  return TaskRunResult.finished();
-}
-
 export function computeRefreshTransactionState(
   rg: RefreshGroupRecord,
 ): TransactionState {
diff --git a/packages/taler-wallet-core/src/operations/reward.ts 
b/packages/taler-wallet-core/src/operations/reward.ts
index 6dcd48019..4d8653a9d 100644
--- a/packages/taler-wallet-core/src/operations/reward.ts
+++ b/packages/taler-wallet-core/src/operations/reward.ts
@@ -83,7 +83,6 @@ import {
   constructTransactionIdentifier,
   notifyTransition,
   parseTransactionIdentifier,
-  stopLongpolling,
 } from "./transactions.js";
 import { PendingTaskType } from "../pending-types.js";
 import { assertUnreachable } from "../util/assertUnreachable.js";
@@ -125,7 +124,6 @@ export class RewardTransactionContext implements 
TransactionContext {
 
   async suspendTransaction(): Promise<void> {
     const { ws, walletRewardId, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.rewards])
       .runReadWrite(async (tx) => {
@@ -161,13 +159,11 @@ export class RewardTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
-    ws.workAvailable.trigger();
     notifyTransition(ws, transactionId, transitionInfo);
   }
 
   async abortTransaction(): Promise<void> {
     const { ws, walletRewardId, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.rewards])
       .runReadWrite(async (tx) => {
@@ -207,7 +203,6 @@ export class RewardTransactionContext implements 
TransactionContext {
 
   async resumeTransaction(): Promise<void> {
     const { ws, walletRewardId, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.rewards])
       .runReadWrite(async (tx) => {
@@ -247,7 +242,6 @@ export class RewardTransactionContext implements 
TransactionContext {
 
   async failTransaction(): Promise<void> {
     const { ws, walletRewardId, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
     const transitionInfo = await ws.db
       .mktx((x) => [x.rewards])
       .runReadWrite(async (tx) => {
diff --git a/packages/taler-wallet-core/src/operations/testing.ts 
b/packages/taler-wallet-core/src/operations/testing.ts
index 4c2cfae2c..5902e8362 100644
--- a/packages/taler-wallet-core/src/operations/testing.ts
+++ b/packages/taler-wallet-core/src/operations/testing.ts
@@ -62,6 +62,7 @@ import {
 import { InternalWalletState } from "../internal-wallet-state.js";
 import { checkLogicInvariant } from "../util/invariants.js";
 import { getBalances } from "./balance.js";
+import { createDepositGroup } from "./deposits.js";
 import { fetchFreshExchange } from "./exchanges.js";
 import {
   confirmPay,
@@ -78,10 +79,8 @@ import {
   preparePeerPushCredit,
 } from "./pay-peer-push-credit.js";
 import { initiatePeerPushDebit } from "./pay-peer-push-debit.js";
-import { getPendingOperations } from "./pending.js";
 import { getTransactionById, getTransactions } from "./transactions.js";
 import { acceptWithdrawalFromUri } from "./withdraw.js";
-import { createDepositGroup } from "./deposits.js";
 
 const logger = new Logger("operations/testing.ts");
 
@@ -521,44 +520,6 @@ export async function waitUntilGivenTransactionsFinal(
   logger.info("done waiting until given transactions are in a final state");
 }
 
-/**
- * Wait until pending work is processed.
- */
-export async function waitUntilTasksProcessed(
-  ws: InternalWalletState,
-): Promise<void> {
-  logger.info("waiting until pending work is processed");
-  ws.ensureTaskLoopRunning();
-  let p: OpenedPromise<void> | undefined = undefined;
-  const cancelNotifs = ws.addNotificationListener((notif) => {
-    if (!p) {
-      return;
-    }
-    if (notif.type === NotificationType.PendingOperationProcessed) {
-      p.resolve();
-    }
-  });
-  while (1) {
-    p = openPromise();
-    const pendingTasksResp = await getPendingOperations(ws);
-    logger.info(`waiting on pending ops: ${j2s(pendingTasksResp)}`);
-    let finished = true;
-    for (const task of pendingTasksResp.pendingOperations) {
-      if (task.isDue) {
-        finished = false;
-      }
-      logger.info(`continuing waiting for task ${task.id}`);
-    }
-    if (finished) {
-      break;
-    }
-    // Wait until task is done
-    await p.promise;
-  }
-  logger.info("done waiting until pending work is processed");
-  cancelNotifs();
-}
-
 export async function waitUntilRefreshesDone(
   ws: InternalWalletState,
 ): Promise<void> {
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts 
b/packages/taler-wallet-core/src/operations/transactions.ts
index 3b4e75427..10e018d23 100644
--- a/packages/taler-wallet-core/src/operations/transactions.ts
+++ b/packages/taler-wallet-core/src/operations/transactions.ts
@@ -17,6 +17,7 @@
 /**
  * Imports.
  */
+import { GlobalIDB } from "@gnu-taler/idb-bridge";
 import {
   AbsoluteTime,
   Amounts,
@@ -69,18 +70,19 @@ import {
 } from "../db.js";
 import {
   GetReadOnlyAccess,
+  OPERATION_STATUS_ACTIVE_FIRST,
+  OPERATION_STATUS_ACTIVE_LAST,
   PeerPushDebitStatus,
   timestampPreciseFromDb,
   timestampProtocolFromDb,
   WalletStoresV1,
 } from "../index.js";
 import { InternalWalletState } from "../internal-wallet-state.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
 import { assertUnreachable } from "../util/assertUnreachable.js";
 import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
 import {
   constructTaskIdentifier,
-  resetPendingTaskTimeout,
   TaskIdentifiers,
   TransactionContext,
 } from "./common.js";
@@ -122,18 +124,6 @@ import {
   computePeerPushDebitTransactionState,
   PeerPushDebitTransactionContext,
 } from "./pay-peer-push-debit.js";
-import {
-  iterRecordsForDeposit,
-  iterRecordsForPeerPullInitiation as iterRecordsForPeerPullCredit,
-  iterRecordsForPeerPullDebit,
-  iterRecordsForPeerPushCredit,
-  iterRecordsForPeerPushInitiation as iterRecordsForPeerPushDebit,
-  iterRecordsForPurchase,
-  iterRecordsForRefresh,
-  iterRecordsForRefund,
-  iterRecordsForReward,
-  iterRecordsForWithdrawal,
-} from "./pending.js";
 import {
   computeRefreshTransactionActions,
   computeRefreshTransactionState,
@@ -159,25 +149,32 @@ function shouldSkipCurrency(
   exchangesInTransaction: string[],
 ): boolean {
   if (transactionsRequest?.scopeInfo) {
-    const sameCurrency = transactionsRequest.scopeInfo.currency.toLowerCase() 
=== currency.toLowerCase()
+    const sameCurrency =
+      transactionsRequest.scopeInfo.currency.toLowerCase() ===
+      currency.toLowerCase();
     switch (transactionsRequest.scopeInfo.type) {
       case ScopeType.Global: {
-        return !sameCurrency
+        return !sameCurrency;
       }
       case ScopeType.Exchange: {
-        const exchangeInvolveInTransaction = 
exchangesInTransaction.indexOf(transactionsRequest.scopeInfo.url) !== -1
-        return !sameCurrency || !exchangeInvolveInTransaction
+        const exchangeInvolveInTransaction =
+          exchangesInTransaction.indexOf(transactionsRequest.scopeInfo.url) !==
+          -1;
+        return !sameCurrency || !exchangeInvolveInTransaction;
       }
       case ScopeType.Auditor: {
         // same currency and same auditor
-        throw Error("filering balance in auditor scope is not implemented")
+        throw Error("filering balance in auditor scope is not implemented");
       }
-      default: assertUnreachable(transactionsRequest.scopeInfo)
+      default:
+        assertUnreachable(transactionsRequest.scopeInfo);
     }
   }
   // FIXME: remove next release
   if (transactionsRequest?.currency) {
-    return transactionsRequest.currency.toLowerCase() !== 
currency.toLowerCase();
+    return (
+      transactionsRequest.currency.toLowerCase() !== currency.toLowerCase()
+    );
   }
   return false;
 }
@@ -565,7 +562,7 @@ function buildTransactionForPeerPullCredit(
     const silentWithdrawalErrorForInvoice =
       wsrOrt?.lastError &&
       wsrOrt.lastError.code ===
-      TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE &&
+        TalerErrorCode.WALLET_WITHDRAWAL_GROUP_INCOMPLETE &&
       Object.values(wsrOrt.lastError.errorsPerCoin ?? {}).every((e) => {
         return (
           e.code === TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR &&
@@ -598,10 +595,10 @@ function buildTransactionForPeerPullCredit(
       kycUrl: pullCredit.kycUrl,
       ...(wsrOrt?.lastError
         ? {
-          error: silentWithdrawalErrorForInvoice
-            ? undefined
-            : wsrOrt.lastError,
-        }
+            error: silentWithdrawalErrorForInvoice
+              ? undefined
+              : wsrOrt.lastError,
+          }
         : {}),
     };
   }
@@ -1118,8 +1115,14 @@ export async function getTransactions(
     .runReadOnly(async (tx) => {
       await iterRecordsForPeerPushDebit(tx, filter, async (pi) => {
         const amount = Amounts.parseOrThrow(pi.amount);
-        const exchangesInTx = [pi.exchangeBaseUrl]
-        if (shouldSkipCurrency(transactionsRequest, amount.currency, 
exchangesInTx)) {
+        const exchangesInTx = [pi.exchangeBaseUrl];
+        if (
+          shouldSkipCurrency(
+            transactionsRequest,
+            amount.currency,
+            exchangesInTx,
+          )
+        ) {
           return;
         }
         if (shouldSkipSearch(transactionsRequest, [])) {
@@ -1134,8 +1137,14 @@ export async function getTransactions(
 
       await iterRecordsForPeerPullDebit(tx, filter, async (pi) => {
         const amount = Amounts.parseOrThrow(pi.amount);
-        const exchangesInTx = [pi.exchangeBaseUrl]
-        if (shouldSkipCurrency(transactionsRequest, amount.currency, 
exchangesInTx)) {
+        const exchangesInTx = [pi.exchangeBaseUrl];
+        if (
+          shouldSkipCurrency(
+            transactionsRequest,
+            amount.currency,
+            exchangesInTx,
+          )
+        ) {
           return;
         }
         if (shouldSkipSearch(transactionsRequest, [])) {
@@ -1169,8 +1178,10 @@ export async function getTransactions(
           // Legacy transaction
           return;
         }
-        const exchangesInTx = [pi.exchangeBaseUrl]
-        if (shouldSkipCurrency(transactionsRequest, pi.currency, 
exchangesInTx)) {
+        const exchangesInTx = [pi.exchangeBaseUrl];
+        if (
+          shouldSkipCurrency(transactionsRequest, pi.currency, exchangesInTx)
+        ) {
           return;
         }
         if (shouldSkipSearch(transactionsRequest, [])) {
@@ -1208,7 +1219,7 @@ export async function getTransactions(
 
       await iterRecordsForPeerPullCredit(tx, filter, async (pi) => {
         const currency = Amounts.currencyOf(pi.amount);
-        const exchangesInTx = [pi.exchangeBaseUrl]
+        const exchangesInTx = [pi.exchangeBaseUrl];
         if (shouldSkipCurrency(transactionsRequest, currency, exchangesInTx)) {
           return;
         }
@@ -1243,16 +1254,16 @@ export async function getTransactions(
       await iterRecordsForRefund(tx, filter, async (refundGroup) => {
         const currency = Amounts.currencyOf(refundGroup.amountRaw);
 
-        const exchangesInTx: string[] = []
-        const p = await tx.purchases.get(refundGroup.proposalId)
+        const exchangesInTx: string[] = [];
+        const p = await tx.purchases.get(refundGroup.proposalId);
         if (!p || !p.payInfo) return; //refund with no payment
 
         p.payInfo.payCoinSelection.coinPubs.forEach(async (cp) => {
-          const c = await tx.coins.get(cp)
+          const c = await tx.coins.get(cp);
           if (c?.exchangeBaseUrl) {
-            exchangesInTx.push(c.exchangeBaseUrl)
+            exchangesInTx.push(c.exchangeBaseUrl);
           }
-        })
+        });
 
         if (shouldSkipCurrency(transactionsRequest, currency, exchangesInTx)) {
           return;
@@ -1265,8 +1276,12 @@ export async function getTransactions(
       });
 
       await iterRecordsForRefresh(tx, filter, async (rg) => {
-        const exchangesInTx = rg.infoPerExchange ? 
Object.keys(rg.infoPerExchange) : []
-        if (shouldSkipCurrency(transactionsRequest, rg.currency, 
exchangesInTx)) {
+        const exchangesInTx = rg.infoPerExchange
+          ? Object.keys(rg.infoPerExchange)
+          : [];
+        if (
+          shouldSkipCurrency(transactionsRequest, rg.currency, exchangesInTx)
+        ) {
           return;
         }
         let required = false;
@@ -1286,7 +1301,7 @@ export async function getTransactions(
       });
 
       await iterRecordsForWithdrawal(tx, filter, async (wsr) => {
-        const exchangesInTx = [wsr.exchangeBaseUrl]
+        const exchangesInTx = [wsr.exchangeBaseUrl];
         if (
           shouldSkipCurrency(
             transactionsRequest,
@@ -1343,8 +1358,16 @@ export async function getTransactions(
 
       await iterRecordsForDeposit(tx, filter, async (dg) => {
         const amount = Amounts.parseOrThrow(dg.amount);
-        const exchangesInTx = dg.infoPerExchange ? 
Object.keys(dg.infoPerExchange) : []
-        if (shouldSkipCurrency(transactionsRequest, amount.currency, 
exchangesInTx)) {
+        const exchangesInTx = dg.infoPerExchange
+          ? Object.keys(dg.infoPerExchange)
+          : [];
+        if (
+          shouldSkipCurrency(
+            transactionsRequest,
+            amount.currency,
+            exchangesInTx,
+          )
+        ) {
           return;
         }
         const opId = TaskIdentifiers.forDeposit(dg);
@@ -1362,15 +1385,21 @@ export async function getTransactions(
           return;
         }
 
-        const exchangesInTx: string[] = []
+        const exchangesInTx: string[] = [];
         purchase.payInfo.payCoinSelection.coinPubs.forEach(async (cp) => {
-          const c = await tx.coins.get(cp)
+          const c = await tx.coins.get(cp);
           if (c?.exchangeBaseUrl) {
-            exchangesInTx.push(c.exchangeBaseUrl)
+            exchangesInTx.push(c.exchangeBaseUrl);
           }
-        })
+        });
 
-        if (shouldSkipCurrency(transactionsRequest, download.currency, 
exchangesInTx)) {
+        if (
+          shouldSkipCurrency(
+            transactionsRequest,
+            download.currency,
+            exchangesInTx,
+          )
+        ) {
           return;
         }
         const contractTermsRecord = await tx.contractTerms.get(
@@ -1429,7 +1458,6 @@ export async function getTransactions(
         transactions.push(buildTransactionForTip(tipRecord, retryRecord));
       });
       //ends REMOVE REWARDS
-
     });
 
   // One-off checks, because of a bug where the wallet previously
@@ -1587,25 +1615,7 @@ export function parseTransactionIdentifier(
   }
 }
 
-export function stopLongpolling(ws: InternalWalletState, taskId: string) {
-  const longpoll = ws.activeLongpoll[taskId];
-  if (longpoll) {
-    logger.info(`cancelling long-polling for ${taskId}`);
-    longpoll.cancel();
-    delete ws.activeLongpoll[taskId];
-  }
-}
-
-/**
- * Immediately retry the underlying operation
- * of a transaction.
- */
-export async function retryTransaction(
-  ws: InternalWalletState,
-  transactionId: string,
-): Promise<void> {
-  logger.info(`resetting retry timeout for ${transactionId}`);
-
+function maybeTaskFromTransaction(transactionId: string): TaskId | undefined {
   const parsedTx = parseTransactionIdentifier(transactionId);
 
   if (!parsedTx) {
@@ -1615,100 +1625,80 @@ export async function retryTransaction(
   // FIXME: We currently don't cancel active long-polling tasks here.
 
   switch (parsedTx.tag) {
-    case TransactionType.PeerPullCredit: {
-      const taskId = constructTaskIdentifier({
+    case TransactionType.PeerPullCredit:
+      return constructTaskIdentifier({
         tag: PendingTaskType.PeerPullCredit,
         pursePub: parsedTx.pursePub,
       });
-      await resetPendingTaskTimeout(ws, taskId);
-      stopLongpolling(ws, taskId);
-      break;
-    }
-    case TransactionType.Deposit: {
-      const taskId = constructTaskIdentifier({
+    case TransactionType.Deposit:
+      return constructTaskIdentifier({
         tag: PendingTaskType.Deposit,
         depositGroupId: parsedTx.depositGroupId,
       });
-      await resetPendingTaskTimeout(ws, taskId);
-      stopLongpolling(ws, taskId);
-      break;
-    }
     case TransactionType.InternalWithdrawal:
-    case TransactionType.Withdrawal: {
-      // FIXME: Abort current long-poller!
-      const taskId = constructTaskIdentifier({
+    case TransactionType.Withdrawal:
+      return constructTaskIdentifier({
         tag: PendingTaskType.Withdraw,
         withdrawalGroupId: parsedTx.withdrawalGroupId,
       });
-      await resetPendingTaskTimeout(ws, taskId);
-      stopLongpolling(ws, taskId);
-      break;
-    }
-    case TransactionType.Payment: {
-      const taskId = constructTaskIdentifier({
+    case TransactionType.Payment:
+      return constructTaskIdentifier({
         tag: PendingTaskType.Purchase,
         proposalId: parsedTx.proposalId,
       });
-      await resetPendingTaskTimeout(ws, taskId);
-      stopLongpolling(ws, taskId);
-      break;
-    }
-    case TransactionType.Reward: {
-      const taskId = constructTaskIdentifier({
+    case TransactionType.Reward:
+      return constructTaskIdentifier({
         tag: PendingTaskType.RewardPickup,
         walletRewardId: parsedTx.walletRewardId,
       });
-      await resetPendingTaskTimeout(ws, taskId);
-      stopLongpolling(ws, taskId);
-      break;
-    }
-    case TransactionType.Refresh: {
-      const taskId = constructTaskIdentifier({
+    case TransactionType.Refresh:
+      return constructTaskIdentifier({
         tag: PendingTaskType.Refresh,
         refreshGroupId: parsedTx.refreshGroupId,
       });
-      await resetPendingTaskTimeout(ws, taskId);
-      stopLongpolling(ws, taskId);
-      break;
-    }
-    case TransactionType.PeerPullDebit: {
-      const taskId = constructTaskIdentifier({
+    case TransactionType.PeerPullDebit:
+      return constructTaskIdentifier({
         tag: PendingTaskType.PeerPullDebit,
         peerPullDebitId: parsedTx.peerPullDebitId,
       });
-      await resetPendingTaskTimeout(ws, taskId);
-      stopLongpolling(ws, taskId);
-      break;
-    }
-    case TransactionType.PeerPushCredit: {
-      const taskId = constructTaskIdentifier({
+    case TransactionType.PeerPushCredit:
+      return constructTaskIdentifier({
         tag: PendingTaskType.PeerPushCredit,
         peerPushCreditId: parsedTx.peerPushCreditId,
       });
-      await resetPendingTaskTimeout(ws, taskId);
-      stopLongpolling(ws, taskId);
-      break;
-    }
-    case TransactionType.PeerPushDebit: {
-      const taskId = constructTaskIdentifier({
+    case TransactionType.PeerPushDebit:
+      return constructTaskIdentifier({
         tag: PendingTaskType.PeerPushDebit,
         pursePub: parsedTx.pursePub,
       });
-      await resetPendingTaskTimeout(ws, taskId);
-      stopLongpolling(ws, taskId);
-      break;
-    }
     case TransactionType.Refund:
       // Nothing to do for a refund transaction.
-      break;
+      return undefined;
     case TransactionType.Recoup:
-      // FIXME!
-      throw Error("not implemented");
+      return constructTaskIdentifier({
+        tag: PendingTaskType.Recoup,
+        recoupGroupId: parsedTx.recoupGroupId,
+      });
     default:
       assertUnreachable(parsedTx);
   }
 }
 
+/**
+ * Immediately retry the underlying operation
+ * of a transaction.
+ */
+export async function retryTransaction(
+  ws: InternalWalletState,
+  transactionId: string,
+): Promise<void> {
+  logger.info(`resetting retry timeout for ${transactionId}`);
+  const taskId = maybeTaskFromTransaction(transactionId);
+  if (taskId) {
+    ws.taskScheduler.resetTaskRetries(taskId);
+  }
+}
+
 async function getContextForTransaction(
   ws: InternalWalletState,
   transactionId: string,
@@ -1828,5 +1818,203 @@ export function notifyTransition(
       experimentalUserData,
     });
   }
-  ws.workAvailable.trigger();
+}
+
+/**
+ * Iterate refresh records based on a filter.
+ */
+async function iterRecordsForRefresh(
+  tx: GetReadOnlyAccess<{
+    refreshGroups: typeof WalletStoresV1.refreshGroups;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: RefreshGroupRecord) => Promise<void>,
+): Promise<void> {
+  let refreshGroups: RefreshGroupRecord[];
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      RefreshOperationStatus.Pending,
+      RefreshOperationStatus.Suspended,
+    );
+    refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange);
+  } else {
+    refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll();
+  }
+
+  for (const r of refreshGroups) {
+    await f(r);
+  }
+}
+
+async function iterRecordsForWithdrawal(
+  tx: GetReadOnlyAccess<{
+    withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: WithdrawalGroupRecord) => Promise<void>,
+): Promise<void> {
+  let withdrawalGroupRecords: WithdrawalGroupRecord[];
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      OPERATION_STATUS_ACTIVE_FIRST,
+      OPERATION_STATUS_ACTIVE_LAST,
+    );
+    withdrawalGroupRecords =
+      await tx.withdrawalGroups.indexes.byStatus.getAll(keyRange);
+  } else {
+    withdrawalGroupRecords =
+      await tx.withdrawalGroups.indexes.byStatus.getAll();
+  }
+  for (const wgr of withdrawalGroupRecords) {
+    await f(wgr);
+  }
+}
+
+async function iterRecordsForDeposit(
+  tx: GetReadOnlyAccess<{
+    depositGroups: typeof WalletStoresV1.depositGroups;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: DepositGroupRecord) => Promise<void>,
+): Promise<void> {
+  let dgs: DepositGroupRecord[];
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      OPERATION_STATUS_ACTIVE_FIRST,
+      OPERATION_STATUS_ACTIVE_LAST,
+    );
+    dgs = await tx.depositGroups.indexes.byStatus.getAll(keyRange);
+  } else {
+    dgs = await tx.depositGroups.indexes.byStatus.getAll();
+  }
+
+  for (const dg of dgs) {
+    await f(dg);
+  }
+}
+
+async function iterRecordsForReward(
+  tx: GetReadOnlyAccess<{
+    rewards: typeof WalletStoresV1.rewards;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: RewardRecord) => Promise<void>,
+): Promise<void> {
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      OPERATION_STATUS_ACTIVE_FIRST,
+      OPERATION_STATUS_ACTIVE_LAST,
+    );
+    await tx.rewards.indexes.byStatus.iter(keyRange).forEachAsync(f);
+  } else {
+    await tx.rewards.indexes.byStatus.iter().forEachAsync(f);
+  }
+}
+
+async function iterRecordsForRefund(
+  tx: GetReadOnlyAccess<{
+    refundGroups: typeof WalletStoresV1.refundGroups;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: RefundGroupRecord) => Promise<void>,
+): Promise<void> {
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      OPERATION_STATUS_ACTIVE_FIRST,
+      OPERATION_STATUS_ACTIVE_LAST,
+    );
+    await tx.refundGroups.indexes.byStatus.iter(keyRange).forEachAsync(f);
+  } else {
+    await tx.refundGroups.iter().forEachAsync(f);
+  }
+}
+
+async function iterRecordsForPurchase(
+  tx: GetReadOnlyAccess<{
+    purchases: typeof WalletStoresV1.purchases;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: PurchaseRecord) => Promise<void>,
+): Promise<void> {
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      OPERATION_STATUS_ACTIVE_FIRST,
+      OPERATION_STATUS_ACTIVE_LAST,
+    );
+    await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f);
+  } else {
+    await tx.purchases.indexes.byStatus.iter().forEachAsync(f);
+  }
+}
+
+async function iterRecordsForPeerPullCredit(
+  tx: GetReadOnlyAccess<{
+    peerPullCredit: typeof WalletStoresV1.peerPullCredit;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: PeerPullCreditRecord) => Promise<void>,
+): Promise<void> {
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      OPERATION_STATUS_ACTIVE_FIRST,
+      OPERATION_STATUS_ACTIVE_LAST,
+    );
+    await tx.peerPullCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
+  } else {
+    await tx.peerPullCredit.indexes.byStatus.iter().forEachAsync(f);
+  }
+}
+
+async function iterRecordsForPeerPullDebit(
+  tx: GetReadOnlyAccess<{
+    peerPullDebit: typeof WalletStoresV1.peerPullDebit;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: PeerPullPaymentIncomingRecord) => Promise<void>,
+): Promise<void> {
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      OPERATION_STATUS_ACTIVE_FIRST,
+      OPERATION_STATUS_ACTIVE_LAST,
+    );
+    await tx.peerPullDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
+  } else {
+    await tx.peerPullDebit.indexes.byStatus.iter().forEachAsync(f);
+  }
+}
+
+async function iterRecordsForPeerPushDebit(
+  tx: GetReadOnlyAccess<{
+    peerPushDebit: typeof WalletStoresV1.peerPushDebit;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: PeerPushDebitRecord) => Promise<void>,
+): Promise<void> {
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      OPERATION_STATUS_ACTIVE_FIRST,
+      OPERATION_STATUS_ACTIVE_LAST,
+    );
+    await tx.peerPushDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
+  } else {
+    await tx.peerPushDebit.indexes.byStatus.iter().forEachAsync(f);
+  }
+}
+
+async function iterRecordsForPeerPushCredit(
+  tx: GetReadOnlyAccess<{
+    peerPushCredit: typeof WalletStoresV1.peerPushCredit;
+  }>,
+  filter: TransactionRecordFilter,
+  f: (r: PeerPushPaymentIncomingRecord) => Promise<void>,
+): Promise<void> {
+  if (filter.onlyState === "nonfinal") {
+    const keyRange = GlobalIDB.KeyRange.bound(
+      OPERATION_STATUS_ACTIVE_FIRST,
+      OPERATION_STATUS_ACTIVE_LAST,
+    );
+    await tx.peerPushCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
+  } else {
+    await tx.peerPushCredit.indexes.byStatus.iter().forEachAsync(f);
+  }
 }
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts 
b/packages/taler-wallet-core/src/operations/withdraw.ts
index 86f05478a..4a50e0775 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -1,6 +1,6 @@
 /*
  This file is part of GNU Taler
- (C) 2019-2021 Taler Systems SA
+ (C) 2019-2024 Taler Systems SA
 
  GNU Taler is free software; you can redistribute it and/or modify it under the
  terms of the GNU General Public License as published by the Free Software
@@ -52,6 +52,7 @@ import {
   TalerPreciseTimestamp,
   TalerProtocolTimestamp,
   TransactionAction,
+  TransactionIdStr,
   TransactionMajorState,
   TransactionMinorState,
   TransactionState,
@@ -67,7 +68,6 @@ import {
   codecForCashinConversionResponse,
   codecForConversionBankConfig,
   codecForExchangeWithdrawBatchResponse,
-  codecForIntegrationBankConfig,
   codecForReserveStatus,
   codecForWalletKycUuid,
   codecForWithdrawOperationStatusResponse,
@@ -103,7 +103,6 @@ import {
 import { isWithdrawableDenom, timestampPreciseToDb } from "../index.js";
 import { InternalWalletState } from "../internal-wallet-state.js";
 import {
-  TaskIdentifiers,
   TaskRunResult,
   TaskRunResultType,
   TombstoneTag,
@@ -111,9 +110,8 @@ import {
   constructTaskIdentifier,
   makeCoinAvailable,
   makeCoinsVisible,
-  runLongpollAsync,
 } from "../operations/common.js";
-import { PendingTaskType } from "../pending-types.js";
+import { PendingTaskType, TaskId } from "../pending-types.js";
 import { assertUnreachable } from "../util/assertUnreachable.js";
 import {
   selectForcedWithdrawalDenominations,
@@ -141,7 +139,6 @@ import {
   TransitionInfo,
   constructTransactionIdentifier,
   notifyTransition,
-  stopLongpolling,
 } from "./transactions.js";
 
 /**
@@ -150,8 +147,8 @@ import {
 const logger = new Logger("operations/withdraw.ts");
 
 export class WithdrawTransactionContext implements TransactionContext {
-  public transactionId: string;
-  public retryTag: string;
+  readonly transactionId: TransactionIdStr;
+  readonly taskId: TaskId;
 
   constructor(
     public ws: InternalWalletState,
@@ -161,7 +158,7 @@ export class WithdrawTransactionContext implements 
TransactionContext {
       tag: TransactionType.Withdrawal,
       withdrawalGroupId,
     });
-    this.retryTag = constructTaskIdentifier({
+    this.taskId = constructTaskIdentifier({
       tag: PendingTaskType.Withdraw,
       withdrawalGroupId,
     });
@@ -185,8 +182,7 @@ export class WithdrawTransactionContext implements 
TransactionContext {
   }
 
   async suspendTransaction(): Promise<void> {
-    const { ws, withdrawalGroupId, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
+    const { ws, withdrawalGroupId, transactionId, taskId } = this;
     const transitionInfo = await ws.db
       .mktx((x) => [x.withdrawalGroups])
       .runReadWrite(async (tx) => {
@@ -235,13 +231,12 @@ export class WithdrawTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
-
+    ws.taskScheduler.stopShepherdTask(taskId);
     notifyTransition(ws, transactionId, transitionInfo);
   }
 
   async abortTransaction(): Promise<void> {
-    const { ws, withdrawalGroupId, transactionId } = this;
-    stopLongpolling(ws, this.retryTag);
+    const { ws, withdrawalGroupId, transactionId, taskId } = this;
     const transitionInfo = await ws.db
       .mktx((x) => [x.withdrawalGroups])
       .runReadWrite(async (tx) => {
@@ -297,12 +292,13 @@ export class WithdrawTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
-    ws.workAvailable.trigger();
+    ws.taskScheduler.stopShepherdTask(taskId);
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(taskId);
   }
 
   async resumeTransaction(): Promise<void> {
-    const { ws, withdrawalGroupId, transactionId } = this;
+    const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this;
     const transitionInfo = await ws.db
       .mktx((x) => [x.withdrawalGroups])
       .runReadWrite(async (tx) => {
@@ -351,13 +347,12 @@ export class WithdrawTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
-    ws.workAvailable.trigger();
     notifyTransition(ws, transactionId, transitionInfo);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 
   async failTransaction(): Promise<void> {
-    const { ws, withdrawalGroupId, transactionId, retryTag } = this;
-    stopLongpolling(ws, retryTag);
+    const { ws, withdrawalGroupId, transactionId, taskId: retryTag } = this;
     const stateUpdate = await ws.db
       .mktx((x) => [x.withdrawalGroups])
       .runReadWrite(async (tx) => {
@@ -387,7 +382,9 @@ export class WithdrawTransactionContext implements 
TransactionContext {
         }
         return undefined;
       });
+    ws.taskScheduler.stopShepherdTask(retryTag);
     notifyTransition(ws, transactionId, stateUpdate);
+    ws.taskScheduler.startShepherdTask(retryTag);
   }
 }
 
@@ -744,10 +741,8 @@ async function transitionKycUrlUpdate(
   kycUrl: string,
 ): Promise<void> {
   let notificationKycUrl: string | undefined = undefined;
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.Withdrawal,
-    withdrawalGroupId,
-  });
+  const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId);
+  const transactionId = ctx.transactionId;
 
   const transitionInfo = await ws.db
     .mktx((x) => [x.planchets, x.withdrawalGroups])
@@ -782,7 +777,7 @@ async function transitionKycUrlUpdate(
       experimentalUserData: notificationKycUrl,
     });
   }
-  ws.workAvailable.trigger();
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
 }
 
 async function handleKycRequired(
@@ -1273,7 +1268,7 @@ async function queryReserve(
   ws: InternalWalletState,
   withdrawalGroupId: string,
   cancellationToken: CancellationToken,
-): Promise<{ ready: boolean }> {
+): Promise<TaskRunResult> {
   const transactionId = constructTransactionIdentifier({
     tag: TransactionType.Withdrawal,
     withdrawalGroupId,
@@ -1283,7 +1278,7 @@ async function queryReserve(
   });
   checkDbInvariant(!!withdrawalGroup);
   if (withdrawalGroup.status !== WithdrawalGroupStatus.PendingQueryingStatus) {
-    return { ready: true };
+    return TaskRunResult.backoff();
   }
   const reservePub = withdrawalGroup.reservePub;
 
@@ -1312,7 +1307,7 @@ async function queryReserve(
       `got reserve status error, EC=${result.talerErrorResponse.code}`,
     );
     if (resp.status === HttpStatusCode.NotFound) {
-      return { ready: false };
+      return TaskRunResult.backoff();
     } else {
       throwUnexpectedRequestError(resp, result.talerErrorResponse);
     }
@@ -1341,13 +1336,7 @@ async function queryReserve(
 
   notifyTransition(ws, transactionId, transitionResult);
 
-  return { ready: true };
-}
-
-enum BankStatusResultCode {
-  Done = "done",
-  Waiting = "waiting",
-  Aborted = "aborted",
+  return TaskRunResult.backoff();
 }
 
 /**
@@ -1452,6 +1441,7 @@ async function transitionKycSatisfied(
 async function processWithdrawalGroupPendingKyc(
   ws: InternalWalletState,
   withdrawalGroup: WithdrawalGroupRecord,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   const userType = "individual";
   const kycInfo = withdrawalGroup.kycPending;
@@ -1467,45 +1457,35 @@ async function processWithdrawalGroupPendingKyc(
 
   const withdrawalGroupId = withdrawalGroup.withdrawalGroupId;
 
-  const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup);
-  runLongpollAsync(ws, retryTag, async (cancellationToken) => {
-    logger.info(`long-polling for withdrawal KYC status via ${url.href}`);
-    const kycStatusRes = await ws.http.fetch(url.href, {
-      method: "GET",
-      cancellationToken,
-    });
-    logger.info(
-      `kyc long-polling response status: HTTP ${kycStatusRes.status}`,
-    );
-    if (
-      kycStatusRes.status === HttpStatusCode.Ok ||
-      //FIXME: NoContent is not expected 
https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
-      // remove after the exchange is fixed or clarified
-      kycStatusRes.status === HttpStatusCode.NoContent
-    ) {
-      await transitionKycSatisfied(ws, withdrawalGroup);
-      return { ready: true };
-    } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
-      const kycStatus = await kycStatusRes.json();
-      logger.info(`kyc status: ${j2s(kycStatus)}`);
-      const kycUrl = kycStatus.kyc_url;
-      if (typeof kycUrl === "string") {
-        await transitionKycUrlUpdate(ws, withdrawalGroupId, kycUrl);
-      }
-      return { ready: false };
-    } else if (
-      kycStatusRes.status === HttpStatusCode.UnavailableForLegalReasons
-    ) {
-      const kycStatus = await kycStatusRes.json();
-      logger.info(`aml status: ${j2s(kycStatus)}`);
-      return { ready: false };
-    } else {
-      throw Error(
-        `unexpected response from kyc-check (${kycStatusRes.status})`,
-      );
-    }
+  logger.info(`long-polling for withdrawal KYC status via ${url.href}`);
+  const kycStatusRes = await ws.http.fetch(url.href, {
+    method: "GET",
+    cancellationToken,
   });
-  return TaskRunResult.longpoll();
+  logger.info(`kyc long-polling response status: HTTP ${kycStatusRes.status}`);
+  if (
+    kycStatusRes.status === HttpStatusCode.Ok ||
+    //FIXME: NoContent is not expected 
https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
+    // remove after the exchange is fixed or clarified
+    kycStatusRes.status === HttpStatusCode.NoContent
+  ) {
+    await transitionKycSatisfied(ws, withdrawalGroup);
+  } else if (kycStatusRes.status === HttpStatusCode.Accepted) {
+    const kycStatus = await kycStatusRes.json();
+    logger.info(`kyc status: ${j2s(kycStatus)}`);
+    const kycUrl = kycStatus.kyc_url;
+    if (typeof kycUrl === "string") {
+      await transitionKycUrlUpdate(ws, withdrawalGroupId, kycUrl);
+    }
+  } else if (
+    kycStatusRes.status === HttpStatusCode.UnavailableForLegalReasons
+  ) {
+    const kycStatus = await kycStatusRes.json();
+    logger.info(`aml status: ${j2s(kycStatus)}`);
+  } else {
+    throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
+  }
+  return TaskRunResult.backoff();
 }
 
 async function processWithdrawalGroupPendingReady(
@@ -1666,12 +1646,13 @@ async function processWithdrawalGroupPendingReady(
     };
   }
 
-  return TaskRunResult.finished();
+  return TaskRunResult.backoff();
 }
 
 export async function processWithdrawalGroup(
   ws: InternalWalletState,
   withdrawalGroupId: string,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   logger.trace("processing withdrawal group", withdrawalGroupId);
   const withdrawalGroup = await ws.db
@@ -1684,54 +1665,30 @@ export async function processWithdrawalGroup(
     throw Error(`withdrawal group ${withdrawalGroupId} not found`);
   }
 
-  const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup);
-
-  // We're already running!
-  if (ws.activeLongpoll[retryTag]) {
-    logger.info("withdrawal group already in long-polling, returning!");
-    return {
-      type: TaskRunResultType.Longpoll,
-    };
-  }
-
   switch (withdrawalGroup.status) {
     case WithdrawalGroupStatus.PendingRegisteringBank:
       await processReserveBankStatus(ws, withdrawalGroupId);
       // FIXME: This will get called by the main task loop, why call it here?!
-      return await processWithdrawalGroup(ws, withdrawalGroupId);
-    case WithdrawalGroupStatus.PendingQueryingStatus: {
-      runLongpollAsync(ws, retryTag, (ct) => {
-        return queryReserve(ws, withdrawalGroupId, ct);
-      });
-      logger.trace(
-        "returning early from withdrawal for long-polling in background",
+      return await processWithdrawalGroup(
+        ws,
+        withdrawalGroupId,
+        cancellationToken,
       );
-      return {
-        type: TaskRunResultType.Longpoll,
-      };
+    case WithdrawalGroupStatus.PendingQueryingStatus: {
+      return queryReserve(ws, withdrawalGroupId, cancellationToken);
     }
     case WithdrawalGroupStatus.PendingWaitConfirmBank: {
-      const res = await processReserveBankStatus(ws, withdrawalGroupId);
-      switch (res.status) {
-        case BankStatusResultCode.Aborted:
-        case BankStatusResultCode.Done:
-          return TaskRunResult.finished();
-        case BankStatusResultCode.Waiting: {
-          return TaskRunResult.pending();
-        }
-      }
-      break;
-    }
-    case WithdrawalGroupStatus.Done:
-    case WithdrawalGroupStatus.FailedBankAborted: {
-      // FIXME
-      return TaskRunResult.pending();
+      return await processReserveBankStatus(ws, withdrawalGroupId);
     }
     case WithdrawalGroupStatus.PendingAml:
       // FIXME: Handle this case, withdrawal doesn't support AML yet.
-      return TaskRunResult.pending();
+      return TaskRunResult.backoff();
     case WithdrawalGroupStatus.PendingKyc:
-      return processWithdrawalGroupPendingKyc(ws, withdrawalGroup);
+      return processWithdrawalGroupPendingKyc(
+        ws,
+        withdrawalGroup,
+        cancellationToken,
+      );
     case WithdrawalGroupStatus.PendingReady:
       // Continue with the actual withdrawal!
       return await processWithdrawalGroupPendingReady(ws, withdrawalGroup);
@@ -1747,6 +1704,8 @@ export async function processWithdrawalGroup(
     case WithdrawalGroupStatus.SuspendedReady:
     case WithdrawalGroupStatus.SuspendedRegisteringBank:
     case WithdrawalGroupStatus.SuspendedWaitConfirmBank:
+    case WithdrawalGroupStatus.Done:
+    case WithdrawalGroupStatus.FailedBankAborted:
       // Nothing to do.
       return TaskRunResult.finished();
     default:
@@ -2168,14 +2127,10 @@ async function registerReserveWithBank(
   notifyTransition(ws, transactionId, transitionInfo);
 }
 
-interface BankStatusResult {
-  status: BankStatusResultCode;
-}
-
 async function processReserveBankStatus(
   ws: InternalWalletState,
   withdrawalGroupId: string,
-): Promise<BankStatusResult> {
+): Promise<TaskRunResult> {
   const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, {
     withdrawalGroupId,
   });
@@ -2188,9 +2143,7 @@ async function processReserveBankStatus(
     case WithdrawalGroupStatus.PendingRegisteringBank:
       break;
     default:
-      return {
-        status: BankStatusResultCode.Done,
-      };
+      return TaskRunResult.backoff();
   }
 
   if (
@@ -2200,9 +2153,7 @@ async function processReserveBankStatus(
   }
   const bankInfo = withdrawalGroup.wgInfo.bankInfo;
   if (!bankInfo) {
-    return {
-      status: BankStatusResultCode.Done,
-    };
+    return TaskRunResult.backoff();
   }
 
   const bankStatusUrl = getBankStatusUrl(bankInfo.talerWithdrawUri);
@@ -2246,9 +2197,7 @@ async function processReserveBankStatus(
         };
       });
     notifyTransition(ws, transactionId, transitionInfo);
-    return {
-      status: BankStatusResultCode.Aborted,
-    };
+    return TaskRunResult.finished();
   }
 
   // Bank still needs to know our reserve info
@@ -2302,15 +2251,7 @@ async function processReserveBankStatus(
 
   notifyTransition(ws, transactionId, transitionInfo);
 
-  if (status.transfer_done) {
-    return {
-      status: BankStatusResultCode.Done,
-    };
-  } else {
-    return {
-      status: BankStatusResultCode.Waiting,
-    };
-  }
+  return TaskRunResult.backoff();
 }
 
 export interface PrepareCreateWithdrawalGroupResult {
@@ -2492,6 +2433,13 @@ export async function 
internalPerformCreateWithdrawalGroup(
     prep.withdrawalGroup.exchangeBaseUrl,
   );
 
+  const ctx = new WithdrawTransactionContext(
+    ws,
+    withdrawalGroup.withdrawalGroupId,
+  );
+
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
+
   return {
     withdrawalGroup,
     transitionInfo,
@@ -2619,10 +2567,10 @@ export async function acceptWithdrawalFromUri(
   });
 
   const withdrawalGroupId = withdrawalGroup.withdrawalGroupId;
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.Withdrawal,
-    withdrawalGroupId,
-  });
+
+  const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId);
+
+  const transactionId = ctx.transactionId;
 
   // We do this here, as the reserve should be registered before we return,
   // so that we can redirect the user to the bank's status page.
@@ -2639,7 +2587,7 @@ export async function acceptWithdrawalFromUri(
     );
   }
 
-  ws.workAvailable.trigger();
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
 
   return {
     reservePub: withdrawalGroup.reservePub,
@@ -2795,10 +2743,9 @@ export async function createManualWithdrawal(
   });
 
   const withdrawalGroupId = withdrawalGroup.withdrawalGroupId;
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.Withdrawal,
-    withdrawalGroupId,
-  });
+  const ctx = new WithdrawTransactionContext(ws, withdrawalGroupId);
+
+  const transactionId = ctx.transactionId;
 
   const exchangePaytoUris = await ws.db
     .mktx((x) => [x.withdrawalGroups, x.exchanges, x.exchangeDetails])
@@ -2806,7 +2753,7 @@ export async function createManualWithdrawal(
       return await getFundingPaytoUris(tx, withdrawalGroup.withdrawalGroupId);
     });
 
-  ws.workAvailable.trigger();
+  ws.taskScheduler.startShepherdTask(ctx.taskId);
 
   return {
     reservePub: withdrawalGroup.reservePub,
diff --git a/packages/taler-wallet-core/src/pending-types.ts 
b/packages/taler-wallet-core/src/pending-types.ts
index f8406033a..4dfad9389 100644
--- a/packages/taler-wallet-core/src/pending-types.ts
+++ b/packages/taler-wallet-core/src/pending-types.ts
@@ -29,7 +29,6 @@ import { DbRetryInfo } from "./operations/common.js";
 
 export enum PendingTaskType {
   ExchangeUpdate = "exchange-update",
-  ExchangeCheckRefresh = "exchange-check-refresh",
   Purchase = "purchase",
   Refresh = "refresh",
   Recoup = "recoup",
@@ -49,7 +48,6 @@ export enum PendingTaskType {
 export type PendingTaskInfo = PendingTaskInfoCommon &
   (
     | PendingExchangeUpdateTask
-    | PendingExchangeCheckRefreshTask
     | PendingPurchaseTask
     | PendingRefreshTask
     | PendingTipPickupTask
@@ -109,14 +107,6 @@ export interface PendingPeerPushCreditTask {
   peerPushCreditId: string;
 }
 
-/**
- * The wallet should check whether coins from this exchange
- * need to be auto-refreshed.
- */
-export interface PendingExchangeCheckRefreshTask {
-  type: PendingTaskType.ExchangeCheckRefresh;
-  exchangeBaseUrl: string;
-}
 
 export enum ReserveType {
   /**
diff --git a/packages/taler-wallet-core/src/shepherd.ts 
b/packages/taler-wallet-core/src/shepherd.ts
new file mode 100644
index 000000000..d1648acc7
--- /dev/null
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -0,0 +1,851 @@
+/*
+ This file is part of GNU Taler
+ (C) 2024 Taler Systems SA
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * Imports.
+ */
+import { GlobalIDB } from "@gnu-taler/idb-bridge";
+import {
+  AbsoluteTime,
+  CancellationToken,
+  Duration,
+  Logger,
+  NotificationType,
+  RetryLoopOpts,
+  TalerError,
+  TalerErrorCode,
+  TalerErrorDetail,
+  TaskThrottler,
+  TransactionIdStr,
+  TransactionType,
+  WalletNotification,
+  assertUnreachable,
+  j2s,
+  makeErrorDetail,
+} from "@gnu-taler/taler-util";
+import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
+import {
+  GetReadOnlyAccess,
+  OPERATION_STATUS_ACTIVE_FIRST,
+  OPERATION_STATUS_ACTIVE_LAST,
+  WalletStoresV1,
+  timestampAbsoluteFromDb,
+} from "./index.js";
+import { InternalWalletState } from "./internal-wallet-state.js";
+import { processBackupForProvider } from "./operations/backup/index.js";
+import {
+  DbRetryInfo,
+  TaskRunResult,
+  TaskRunResultType,
+  constructTaskIdentifier,
+  getExchangeState,
+  parseTaskIdentifier,
+} from "./operations/common.js";
+import { processDepositGroup } from "./operations/deposits.js";
+import { updateExchangeFromUrlHandler } from "./operations/exchanges.js";
+import { processPurchase } from "./operations/pay-merchant.js";
+import { processPeerPullCredit } from "./operations/pay-peer-pull-credit.js";
+import { processPeerPullDebit } from "./operations/pay-peer-pull-debit.js";
+import { processPeerPushCredit } from "./operations/pay-peer-push-credit.js";
+import { processPeerPushDebit } from "./operations/pay-peer-push-debit.js";
+import { processRecoupGroup } from "./operations/recoup.js";
+import { processRefreshGroup } from "./operations/refresh.js";
+import { constructTransactionIdentifier } from "./operations/transactions.js";
+import { processWithdrawalGroup } from "./operations/withdraw.js";
+import { PendingTaskType, TaskId } from "./pending-types.js";
+import { AsyncCondition } from "./util/promiseUtils.js";
+
+const logger = new Logger("shepherd.ts");
+
+/**
+ * Info about one task being shepherded.
+ */
+interface ShepherdInfo {
+  cts: CancellationToken.Source;
+}
+
+export class TaskScheduler {
+  private sheps: Map<TaskId, ShepherdInfo> = new Map();
+
+  private iterCond = new AsyncCondition();
+
+  private throttler = new TaskThrottler();
+
+  constructor(private ws: InternalWalletState) {}
+
+  async loadTasksFromDb(): Promise<void> {
+    const activeTasks = await getActiveTaskIds(this.ws);
+
+    for (const tid of activeTasks.taskIds) {
+      this.startShepherdTask(tid);
+    }
+  }
+
+  async run(opts: RetryLoopOpts = {}): Promise<void> {
+    logger.info("Running task loop.");
+    this.ws.isTaskLoopRunning = true;
+    await this.loadTasksFromDb();
+    while (true) {
+      if (opts.stopWhenDone && this.sheps.size === 0) {
+        logger.info("Breaking out of task loop (no more work).");
+        break;
+      }
+      if (this.ws.stopped) {
+        logger.info("Breaking out of task loop (wallet stopped).");
+        break;
+      }
+      await this.iterCond.wait();
+    }
+    this.ws.isTaskLoopRunning = false;
+    logger.info("Done with task loop.");
+  }
+
+  startShepherdTask(taskId: TaskId): void {
+    // Run in the background, no await!
+    this.internalStartShepherdTask(taskId);
+  }
+
+  /**
+   * Stop and re-load all existing tasks.
+   *
+   * Mostly useful to interrupt all waits when time-travelling.
+   */
+  reload() {
+    const tasksIds = [...this.sheps.keys()];
+    logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
+    for (const taskId of tasksIds) {
+      this.stopShepherdTask(taskId);
+    }
+    for (const taskId of tasksIds) {
+      this.startShepherdTask(taskId);
+    }
+  }
+
+  private async internalStartShepherdTask(taskId: TaskId): Promise<void> {
+    logger.trace(`Starting to shepherd task ${taskId}`);
+    const oldShep = this.sheps.get(taskId);
+    if (oldShep) {
+      logger.trace(`Already have a shepherd for ${taskId}`);
+      return;
+    }
+    logger.trace(`Creating new shepherd for ${taskId}`);
+    const newShep: ShepherdInfo = {
+      cts: CancellationToken.create(),
+    };
+    this.sheps.set(taskId, newShep);
+    try {
+      await this.internalShepherdTask(taskId, newShep);
+    } finally {
+      logger.trace(`Done shepherding ${taskId}`);
+      this.sheps.delete(taskId);
+      this.iterCond.trigger();
+    }
+  }
+
+  stopShepherdTask(taskId: TaskId): void {
+    logger.trace(`Stopping shepherding of ${taskId}`);
+    const oldShep = this.sheps.get(taskId);
+    if (oldShep) {
+      logger.trace(`Cancelling old shepherd for ${taskId}`);
+      oldShep.cts.cancel();
+      this.sheps.delete(taskId);
+      this.iterCond.trigger();
+    }
+  }
+
+  restartShepherdTask(taskId: TaskId): void {
+    this.stopShepherdTask(taskId);
+    this.startShepherdTask(taskId);
+  }
+
+  async resetTaskRetries(taskId: TaskId): Promise<void> {
+    const maybeNotification = await this.ws.db
+      .mktxAll()
+      .runReadWrite(async (tx) => {
+        await tx.operationRetries.delete(taskId);
+        return taskToRetryNotification(this.ws, tx, taskId, undefined);
+      });
+    this.stopShepherdTask(taskId);
+    if (maybeNotification) {
+      this.ws.notify(maybeNotification);
+    }
+    this.startShepherdTask(taskId);
+  }
+
+  private async internalShepherdTask(
+    taskId: TaskId,
+    info: ShepherdInfo,
+  ): Promise<void> {
+    while (true) {
+      if (this.ws.stopped) {
+        logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`);
+        return;
+      }
+      if (info.cts.token.isCancelled) {
+        logger.trace(`Shepherd for ${taskId} got cancelled`);
+        return;
+      }
+      const isThrottled = this.throttler.applyThrottle(taskId);
+      if (isThrottled) {
+        logger.warn(
+          `task ${taskId} throttled, this is very likely a bug in wallet-core, 
please report`,
+        );
+        logger.warn("waiting for 60 seconds");
+        await this.ws.timerGroup.resolveAfter(
+          Duration.fromSpec({ seconds: 60 }),
+        );
+      }
+      logger.trace(`Shepherd for ${taskId} will call handler`);
+      // FIXME: This should already return the retry record.
+      const res = await runTaskWithErrorReporting(this.ws, taskId, async () => 
{
+        return await callOperationHandlerForTaskId(
+          this.ws,
+          taskId,
+          info.cts.token,
+        );
+      });
+      const retryRecord = await this.ws.db.runReadOnlyTx(
+        ["operationRetries"],
+        async (tx) => {
+          return tx.operationRetries.get(taskId);
+        },
+      );
+      switch (res.type) {
+        case TaskRunResultType.Error: {
+          logger.trace(`Shepherd for ${taskId} got error result.`);
+          if (retryRecord) {
+            let delay: Duration;
+            const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+            delay = AbsoluteTime.remaining(t);
+            logger.trace(`Waiting for ${delay.d_ms} ms`);
+            try {
+              await info.cts.token.racePromise(
+                this.ws.timerGroup.resolveAfter(delay),
+              );
+            } catch (e) {
+              logger.info(`waiting for ${taskId} interrupted`);
+            }
+          } else {
+            logger.trace("Retrying immediately.");
+          }
+          break;
+        }
+        case TaskRunResultType.Backoff: {
+          logger.trace(`Shepherd for ${taskId} got backoff result.`);
+          if (retryRecord) {
+            let delay: Duration;
+            const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
+            delay = AbsoluteTime.remaining(t);
+            logger.trace(`Waiting for ${delay.d_ms} ms`);
+            try {
+              await info.cts.token.racePromise(
+                this.ws.timerGroup.resolveAfter(delay),
+              );
+            } catch (e) {
+              logger.info(`waiting for ${taskId} interrupted`);
+            }
+          } else {
+            logger.trace("Retrying immediately.");
+          }
+          break;
+        }
+        case TaskRunResultType.Progress: {
+          logger.trace(
+            `Shepherd for ${taskId} got progress result, re-running 
immediately.`,
+          );
+          break;
+        }
+        case TaskRunResultType.ScheduleLater:
+          logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
+          const delay = AbsoluteTime.remaining(res.runAt);
+          logger.trace(`Waiting for ${delay.d_ms} ms`);
+          try {
+            await info.cts.token.racePromise(
+              this.ws.timerGroup.resolveAfter(delay),
+            );
+          } catch (e) {
+            logger.info(`waiting for ${taskId} interrupted`);
+          }
+          break;
+        case TaskRunResultType.Finished:
+          logger.trace(`Shepherd for ${taskId} got finished result.`);
+          return;
+        default:
+          assertUnreachable(res);
+      }
+    }
+  }
+}
+
+async function storePendingTaskError(
+  ws: InternalWalletState,
+  pendingTaskId: string,
+  e: TalerErrorDetail,
+): Promise<void> {
+  logger.info(`storing pending task error for ${pendingTaskId}`);
+  const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
+    let retryRecord = await tx.operationRetries.get(pendingTaskId);
+    if (!retryRecord) {
+      retryRecord = {
+        id: pendingTaskId,
+        lastError: e,
+        retryInfo: DbRetryInfo.reset(),
+      };
+    } else {
+      retryRecord.lastError = e;
+      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
+    }
+    await tx.operationRetries.put(retryRecord);
+    return taskToRetryNotification(ws, tx, pendingTaskId, e);
+  });
+  if (maybeNotification) {
+    ws.notify(maybeNotification);
+  }
+}
+
+/**
+ * Task made progress, clear error.
+ */
+async function storeTaskProgress(
+  ws: InternalWalletState,
+  pendingTaskId: string,
+): Promise<void> {
+  await ws.db.mktxAll().runReadWrite(async (tx) => {
+    await tx.operationRetries.delete(pendingTaskId);
+  });
+}
+
+async function storePendingTaskPending(
+  ws: InternalWalletState,
+  pendingTaskId: string,
+): Promise<void> {
+  const maybeNotification = await ws.db.mktxAll().runReadWrite(async (tx) => {
+    let retryRecord = await tx.operationRetries.get(pendingTaskId);
+    let hadError = false;
+    if (!retryRecord) {
+      retryRecord = {
+        id: pendingTaskId,
+        retryInfo: DbRetryInfo.reset(),
+      };
+    } else {
+      if (retryRecord.lastError) {
+        hadError = true;
+      }
+      delete retryRecord.lastError;
+      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
+    }
+    await tx.operationRetries.put(retryRecord);
+    if (hadError) {
+      return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
+    } else {
+      return undefined;
+    }
+  });
+  if (maybeNotification) {
+    ws.notify(maybeNotification);
+  }
+}
+
+async function storePendingTaskFinished(
+  ws: InternalWalletState,
+  pendingTaskId: string,
+): Promise<void> {
+  await ws.db
+    .mktx((x) => [x.operationRetries])
+    .runReadWrite(async (tx) => {
+      await tx.operationRetries.delete(pendingTaskId);
+    });
+}
+
+async function runTaskWithErrorReporting(
+  ws: InternalWalletState,
+  opId: TaskId,
+  f: () => Promise<TaskRunResult>,
+): Promise<TaskRunResult> {
+  let maybeError: TalerErrorDetail | undefined;
+  try {
+    const resp = await f();
+    switch (resp.type) {
+      case TaskRunResultType.Error:
+        await storePendingTaskError(ws, opId, resp.errorDetail);
+        return resp;
+      case TaskRunResultType.Finished:
+        await storePendingTaskFinished(ws, opId);
+        return resp;
+      case TaskRunResultType.Backoff:
+        await storePendingTaskPending(ws, opId);
+        return resp;
+      case TaskRunResultType.ScheduleLater:
+        // Task succeeded but wants to be run again.
+        await storeTaskProgress(ws, opId);
+        return resp;
+      case TaskRunResultType.Progress:
+        await storeTaskProgress(ws, opId);
+        return resp;
+    }
+  } catch (e) {
+    if (e instanceof CryptoApiStoppedError) {
+      if (ws.stopped) {
+        logger.warn("crypto API stopped during shutdown, ignoring error");
+        return {
+          type: TaskRunResultType.Error,
+          errorDetail: makeErrorDetail(
+            TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+            {},
+            "Crypto API stopped during shutdown",
+          ),
+        };
+      }
+    }
+    if (e instanceof TalerError) {
+      logger.warn("operation processed resulted in error");
+      logger.warn(`error was: ${j2s(e.errorDetail)}`);
+      maybeError = e.errorDetail;
+      await storePendingTaskError(ws, opId, maybeError!);
+      return {
+        type: TaskRunResultType.Error,
+        errorDetail: e.errorDetail,
+      };
+    } else if (e instanceof Error) {
+      // This is a bug, as we expect pending operations to always
+      // do their own error handling and only throw 
WALLET_PENDING_OPERATION_FAILED
+      // or return something.
+      logger.error(`Uncaught exception: ${e.message}`);
+      logger.error(`Stack: ${e.stack}`);
+      maybeError = makeErrorDetail(
+        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+        {
+          stack: e.stack,
+        },
+        `unexpected exception (message: ${e.message})`,
+      );
+      await storePendingTaskError(ws, opId, maybeError);
+      return {
+        type: TaskRunResultType.Error,
+        errorDetail: maybeError,
+      };
+    } else {
+      logger.error("Uncaught exception, value is not even an error.");
+      maybeError = makeErrorDetail(
+        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+        {},
+        `unexpected exception (not even an error)`,
+      );
+      await storePendingTaskError(ws, opId, maybeError);
+      return {
+        type: TaskRunResultType.Error,
+        errorDetail: maybeError,
+      };
+    }
+  }
+}
+
+async function callOperationHandlerForTaskId(
+  ws: InternalWalletState,
+  taskId: TaskId,
+  cancellationToken: CancellationToken,
+): Promise<TaskRunResult> {
+  const pending = parseTaskIdentifier(taskId);
+  switch (pending.tag) {
+    case PendingTaskType.ExchangeUpdate:
+      return await updateExchangeFromUrlHandler(
+        ws,
+        pending.exchangeBaseUrl,
+        cancellationToken,
+      );
+    case PendingTaskType.Refresh:
+      return await processRefreshGroup(
+        ws,
+        pending.refreshGroupId,
+        cancellationToken,
+      );
+    case PendingTaskType.Withdraw:
+      return await processWithdrawalGroup(
+        ws,
+        pending.withdrawalGroupId,
+        cancellationToken,
+      );
+    case PendingTaskType.Purchase:
+      return await processPurchase(ws, pending.proposalId);
+    case PendingTaskType.Recoup:
+      return await processRecoupGroup(ws, pending.recoupGroupId);
+    case PendingTaskType.Deposit:
+      return await processDepositGroup(
+        ws,
+        pending.depositGroupId,
+        cancellationToken,
+      );
+    case PendingTaskType.Backup:
+      return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
+    case PendingTaskType.PeerPushDebit:
+      return await processPeerPushDebit(
+        ws,
+        pending.pursePub,
+        cancellationToken,
+      );
+    case PendingTaskType.PeerPullCredit:
+      return await processPeerPullCredit(
+        ws,
+        pending.pursePub,
+        cancellationToken,
+      );
+    case PendingTaskType.PeerPullDebit:
+      return await processPeerPullDebit(ws, pending.peerPullDebitId);
+    case PendingTaskType.PeerPushCredit:
+      return await processPeerPushCredit(
+        ws,
+        pending.peerPushCreditId,
+        cancellationToken,
+      );
+    case PendingTaskType.RewardPickup:
+      throw Error("not supported anymore");
+    default:
+      return assertUnreachable(pending);
+  }
+  throw Error(`not reached ${pending.tag}`);
+}
+
+/**
+ * Generate an appropriate error transition notification
+ * for applicable tasks.
+ *
+ * Namely, transition notifications are generated for:
+ * - exchange update errors
+ * - transactions
+ */
+async function taskToRetryNotification(
+  ws: InternalWalletState,
+  tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+  pendingTaskId: string,
+  e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
+
+  switch (parsedTaskId.tag) {
+    case PendingTaskType.ExchangeUpdate:
+      return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
+    case PendingTaskType.PeerPullCredit:
+    case PendingTaskType.PeerPullDebit:
+    case PendingTaskType.Withdraw:
+    case PendingTaskType.PeerPushCredit:
+    case PendingTaskType.Deposit:
+    case PendingTaskType.Refresh:
+    case PendingTaskType.RewardPickup:
+    case PendingTaskType.PeerPushDebit:
+    case PendingTaskType.Purchase:
+      return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
+    case PendingTaskType.Backup:
+    case PendingTaskType.Recoup:
+      return undefined;
+  }
+}
+
+async function makeTransactionRetryNotification(
+  ws: InternalWalletState,
+  tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+  pendingTaskId: string,
+  e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+  const txId = convertTaskToTransactionId(pendingTaskId);
+  if (!txId) {
+    return undefined;
+  }
+  const txState = await ws.getTransactionState(ws, tx, txId);
+  if (!txState) {
+    return undefined;
+  }
+  const notif: WalletNotification = {
+    type: NotificationType.TransactionStateTransition,
+    transactionId: txId,
+    oldTxState: txState,
+    newTxState: txState,
+  };
+  if (e) {
+    notif.errorInfo = {
+      code: e.code as number,
+      hint: e.hint,
+    };
+  }
+  return notif;
+}
+
+async function makeExchangeRetryNotification(
+  ws: InternalWalletState,
+  tx: GetReadOnlyAccess<typeof WalletStoresV1>,
+  pendingTaskId: string,
+  e: TalerErrorDetail | undefined,
+): Promise<WalletNotification | undefined> {
+  logger.info("making exchange retry notification");
+  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
+  if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
+    throw Error("invalid task identifier");
+  }
+  const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
+
+  if (!rec) {
+    logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
+    return undefined;
+  }
+
+  const notif: WalletNotification = {
+    type: NotificationType.ExchangeStateTransition,
+    exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
+    oldExchangeState: getExchangeState(rec),
+    newExchangeState: getExchangeState(rec),
+  };
+  if (e) {
+    notif.errorInfo = {
+      code: e.code as number,
+      hint: e.hint,
+    };
+  }
+  return notif;
+}
+
+/**
+ * Convert the task ID for a task that processes a transaction int
+ * the ID for the transaction.
+ */
+function convertTaskToTransactionId(
+  taskId: string,
+): TransactionIdStr | undefined {
+  const parsedTaskId = parseTaskIdentifier(taskId);
+  switch (parsedTaskId.tag) {
+    case PendingTaskType.PeerPullCredit:
+      return constructTransactionIdentifier({
+        tag: TransactionType.PeerPullCredit,
+        pursePub: parsedTaskId.pursePub,
+      });
+    case PendingTaskType.PeerPullDebit:
+      return constructTransactionIdentifier({
+        tag: TransactionType.PeerPullDebit,
+        peerPullDebitId: parsedTaskId.peerPullDebitId,
+      });
+    // FIXME: This doesn't distinguish internal-withdrawal.
+    // Maybe we should have a different task type for that as well?
+    // Or maybe transaction IDs should be valid task identifiers?
+    case PendingTaskType.Withdraw:
+      return constructTransactionIdentifier({
+        tag: TransactionType.Withdrawal,
+        withdrawalGroupId: parsedTaskId.withdrawalGroupId,
+      });
+    case PendingTaskType.PeerPushCredit:
+      return constructTransactionIdentifier({
+        tag: TransactionType.PeerPushCredit,
+        peerPushCreditId: parsedTaskId.peerPushCreditId,
+      });
+    case PendingTaskType.Deposit:
+      return constructTransactionIdentifier({
+        tag: TransactionType.Deposit,
+        depositGroupId: parsedTaskId.depositGroupId,
+      });
+    case PendingTaskType.Refresh:
+      return constructTransactionIdentifier({
+        tag: TransactionType.Refresh,
+        refreshGroupId: parsedTaskId.refreshGroupId,
+      });
+    case PendingTaskType.RewardPickup:
+      return constructTransactionIdentifier({
+        tag: TransactionType.Reward,
+        walletRewardId: parsedTaskId.walletRewardId,
+      });
+    case PendingTaskType.PeerPushDebit:
+      return constructTransactionIdentifier({
+        tag: TransactionType.PeerPushDebit,
+        pursePub: parsedTaskId.pursePub,
+      });
+    case PendingTaskType.Purchase:
+      return constructTransactionIdentifier({
+        tag: TransactionType.Payment,
+        proposalId: parsedTaskId.proposalId,
+      });
+    default:
+      return undefined;
+  }
+}
+
+export interface ActiveTaskIdsResult {
+  taskIds: TaskId[];
+}
+
+export async function getActiveTaskIds(
+  ws: InternalWalletState,
+): Promise<ActiveTaskIdsResult> {
+  const res: ActiveTaskIdsResult = {
+    taskIds: [],
+  };
+  await ws.db
+    .mktx((x) => [
+      x.exchanges,
+      x.refreshGroups,
+      x.withdrawalGroups,
+      x.purchases,
+      x.depositGroups,
+      x.recoupGroups,
+      x.peerPullCredit,
+      x.peerPushDebit,
+      x.peerPullDebit,
+      x.peerPushCredit,
+    ])
+    .runReadWrite(async (tx) => {
+      const active = GlobalIDB.KeyRange.bound(
+        OPERATION_STATUS_ACTIVE_FIRST,
+        OPERATION_STATUS_ACTIVE_LAST,
+      );
+
+      // Withdrawals
+
+      {
+        const activeRecs =
+          await tx.withdrawalGroups.indexes.byStatus.getAll(active);
+        for (const rec of activeRecs) {
+          const taskId = constructTaskIdentifier({
+            tag: PendingTaskType.Withdraw,
+            withdrawalGroupId: rec.withdrawalGroupId,
+          });
+          res.taskIds.push(taskId);
+        }
+      }
+
+      // Deposits
+
+      {
+        const activeRecs =
+          await tx.depositGroups.indexes.byStatus.getAll(active);
+        for (const rec of activeRecs) {
+          const taskId = constructTaskIdentifier({
+            tag: PendingTaskType.Deposit,
+            depositGroupId: rec.depositGroupId,
+          });
+          res.taskIds.push(taskId);
+        }
+      }
+
+      // Refreshes
+
+      {
+        const activeRecs =
+          await tx.refreshGroups.indexes.byStatus.getAll(active);
+        for (const rec of activeRecs) {
+          const taskId = constructTaskIdentifier({
+            tag: PendingTaskType.Refresh,
+            refreshGroupId: rec.refreshGroupId,
+          });
+          res.taskIds.push(taskId);
+        }
+      }
+
+      // Purchases
+
+      {
+        const activeRecs = await tx.purchases.indexes.byStatus.getAll(active);
+        for (const rec of activeRecs) {
+          const taskId = constructTaskIdentifier({
+            tag: PendingTaskType.Purchase,
+            proposalId: rec.proposalId,
+          });
+          res.taskIds.push(taskId);
+        }
+      }
+
+      // peer-push-debit
+
+      {
+        const activeRecs =
+          await tx.peerPushDebit.indexes.byStatus.getAll(active);
+        for (const rec of activeRecs) {
+          const taskId = constructTaskIdentifier({
+            tag: PendingTaskType.PeerPushDebit,
+            pursePub: rec.pursePub,
+          });
+          res.taskIds.push(taskId);
+        }
+      }
+
+      // peer-push-credit
+
+      {
+        const activeRecs =
+          await tx.peerPushCredit.indexes.byStatus.getAll(active);
+        for (const rec of activeRecs) {
+          const taskId = constructTaskIdentifier({
+            tag: PendingTaskType.PeerPushCredit,
+            peerPushCreditId: rec.peerPushCreditId,
+          });
+          res.taskIds.push(taskId);
+        }
+      }
+
+      // peer-pull-debit
+
+      {
+        const activeRecs =
+          await tx.peerPullDebit.indexes.byStatus.getAll(active);
+        for (const rec of activeRecs) {
+          const taskId = constructTaskIdentifier({
+            tag: PendingTaskType.PeerPullDebit,
+            peerPullDebitId: rec.peerPullDebitId,
+          });
+          res.taskIds.push(taskId);
+        }
+      }
+
+      // peer-pull-credit
+
+      {
+        const activeRecs =
+          await tx.peerPullCredit.indexes.byStatus.getAll(active);
+        for (const rec of activeRecs) {
+          const taskId = constructTaskIdentifier({
+            tag: PendingTaskType.PeerPullCredit,
+            pursePub: rec.pursePub,
+          });
+          res.taskIds.push(taskId);
+        }
+      }
+
+      // recoup
+
+      {
+        const activeRecs =
+          await tx.recoupGroups.indexes.byStatus.getAll(active);
+        for (const rec of activeRecs) {
+          const taskId = constructTaskIdentifier({
+            tag: PendingTaskType.Recoup,
+            recoupGroupId: rec.recoupGroupId,
+          });
+          res.taskIds.push(taskId);
+        }
+      }
+
+      // exchange update
+
+      {
+        const exchanges = await tx.exchanges.getAll();
+        for (const rec of exchanges) {
+          const taskIdUpdate = constructTaskIdentifier({
+            tag: PendingTaskType.ExchangeUpdate,
+            exchangeBaseUrl: rec.baseUrl,
+          });
+          res.taskIds.push(taskIdUpdate);
+        }
+      }
+
+      // FIXME: Recoup!
+    });
+
+  return res;
+}
diff --git a/packages/taler-wallet-core/src/util/coinSelection.ts 
b/packages/taler-wallet-core/src/util/coinSelection.ts
index e06d7454b..be868867d 100644
--- a/packages/taler-wallet-core/src/util/coinSelection.ts
+++ b/packages/taler-wallet-core/src/util/coinSelection.ts
@@ -56,10 +56,8 @@ import {
 } from "@gnu-taler/taler-util";
 import { DenominationRecord } from "../db.js";
 import {
-  getAutoRefreshExecuteThreshold,
   getExchangeWireDetailsInTx,
   isWithdrawableDenom,
-  WalletDbReadOnlyTransaction,
   WalletDbReadOnlyTransactionArr,
 } from "../index.js";
 import { InternalWalletState } from "../internal-wallet-state.js";
@@ -67,6 +65,7 @@ import {
   getMerchantPaymentBalanceDetails,
   getPeerPaymentBalanceDetailsInTx,
 } from "../operations/balance.js";
+import { getAutoRefreshExecuteThreshold } from "../operations/common.js";
 import { checkDbInvariant, checkLogicInvariant } from "./invariants.js";
 
 const logger = new Logger("coinSelection.ts");
diff --git a/packages/taler-wallet-core/src/util/promiseUtils.ts 
b/packages/taler-wallet-core/src/util/promiseUtils.ts
index d152a12f4..bc1e40260 100644
--- a/packages/taler-wallet-core/src/util/promiseUtils.ts
+++ b/packages/taler-wallet-core/src/util/promiseUtils.ts
@@ -70,3 +70,43 @@ export class AsyncCondition {
     this.promCap = undefined;
   }
 }
+
+/**
+ * Flag that can be raised to notify asynchronous waiters.
+ *
+ * You can think of it as a promise that can
+ * be un-resolved.
+ */
+export class AsyncFlag {
+  private promCap?: OpenedPromise<void> = undefined;
+  private internalFlagRaised: boolean = false;
+
+  constructor() {}
+
+  /**
+   * Wait until the flag is raised.
+   *
+   * Reset if before returning.
+   */
+  wait(): Promise<void> {
+    if (this.internalFlagRaised) {
+      return Promise.resolve();
+    }
+    if (!this.promCap) {
+      this.promCap = openPromise<void>();
+    }
+    return this.promCap.promise;
+  }
+
+  raise(): void {
+    this.internalFlagRaised = true;
+    if (this.promCap) {
+      this.promCap.resolve();
+    }
+  }
+
+  reset(): void {
+    this.internalFlagRaised = false;
+    this.promCap = undefined;
+  }
+}
diff --git a/packages/taler-wallet-core/src/util/query.ts 
b/packages/taler-wallet-core/src/util/query.ts
index d96a03c61..5fba61f11 100644
--- a/packages/taler-wallet-core/src/util/query.ts
+++ b/packages/taler-wallet-core/src/util/query.ts
@@ -919,7 +919,7 @@ export class DbAccess<StoreMap> {
       strStoreNames.push(swi.storeName);
       accessibleStores[swi.storeName] = swi;
     }
-    const tx = this.db.transaction(strStoreNames, "readwrite");
+    const tx = this.db.transaction(strStoreNames, "readonly");
     const readContext = makeReadContext(tx, accessibleStores);
     return runTx(tx, readContext, txf);
   }
diff --git a/packages/taler-wallet-core/src/wallet-api-types.ts 
b/packages/taler-wallet-core/src/wallet-api-types.ts
index cdde2ee62..190558e14 100644
--- a/packages/taler-wallet-core/src/wallet-api-types.ts
+++ b/packages/taler-wallet-core/src/wallet-api-types.ts
@@ -29,8 +29,6 @@ import {
   AcceptExchangeTosRequest,
   AcceptManualWithdrawalRequest,
   AcceptManualWithdrawalResult,
-  AcceptRewardRequest,
-  AcceptTipResponse,
   AcceptWithdrawalResponse,
   AddExchangeRequest,
   AddGlobalCurrencyAuditorRequest,
@@ -102,8 +100,6 @@ import {
   PreparePeerPushCreditRequest,
   PreparePeerPushCreditResponse,
   PrepareRefundRequest,
-  PrepareRewardRequest,
-  PrepareTipResult as PrepareRewardResult,
   PrepareWithdrawExchangeRequest,
   PrepareWithdrawExchangeResponse,
   RecoverStoredBackupRequest,
@@ -242,7 +238,6 @@ export enum WalletApiOperation {
   DeleteStoredBackup = "deleteStoredBackup",
   RecoverStoredBackup = "recoverStoredBackup",
   UpdateExchangeEntry = "updateExchangeEntry",
-  TestingWaitTasksProcessed = "testingWaitTasksProcessed",
   ListExchangesForScopedCurrency = "listExchangesForScopedCurrency",
   PrepareWithdrawExchange = "prepareWithdrawExchange",
   TestingInfiniteTransactionLoop = "testingInfiniteTransactionLoop",
@@ -1124,15 +1119,6 @@ export type TestingWaitRefreshesFinalOp = {
   response: EmptyObject;
 };
 
-/**
- * Wait until all tasks have been processed and the wallet is idle.
- */
-export type TestingWaitTasksProcessedOp = {
-  op: WalletApiOperation.TestingWaitTasksProcessed;
-  request: EmptyObject;
-  response: EmptyObject;
-};
-
 /**
  * Wait until a transaction is in a particular state.
  */
@@ -1245,7 +1231,6 @@ export type WalletOperations = {
   [WalletApiOperation.ValidateIban]: ValidateIbanOp;
   [WalletApiOperation.TestingWaitTransactionsFinal]: 
TestingWaitTransactionsFinalOp;
   [WalletApiOperation.TestingWaitRefreshesFinal]: TestingWaitRefreshesFinalOp;
-  [WalletApiOperation.TestingWaitTasksProcessed]: TestingWaitTasksProcessedOp;
   [WalletApiOperation.TestingSetTimetravel]: TestingSetTimetravelOp;
   [WalletApiOperation.TestingWaitTransactionState]: 
TestingWaitTransactionStateOp;
   [WalletApiOperation.GetCurrencySpecification]: GetCurrencySpecificationOp;
diff --git a/packages/taler-wallet-core/src/wallet.ts 
b/packages/taler-wallet-core/src/wallet.ts
index 42aa8cdfc..0246597be 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -24,7 +24,6 @@
  */
 import { IDBFactory } from "@gnu-taler/idb-bridge";
 import {
-  AbsoluteTime,
   AmountString,
   Amounts,
   CoinDumpJson,
@@ -33,7 +32,6 @@ import {
   CreateStoredBackupResponse,
   DeleteStoredBackupRequest,
   DenominationInfo,
-  Duration,
   ExchangesShortListResponse,
   GetCurrencySpecificationResponse,
   InitResponse,
@@ -42,15 +40,14 @@ import {
   ListGlobalCurrencyAuditorsResponse,
   ListGlobalCurrencyExchangesResponse,
   Logger,
-  NotificationType,
   PrepareWithdrawExchangeRequest,
   PrepareWithdrawExchangeResponse,
   RecoverStoredBackupRequest,
+  RetryLoopOpts,
   StoredBackupList,
   TalerError,
   TalerErrorCode,
   TalerUriAction,
-  TaskThrottler,
   TestingWaitTransactionRequest,
   TransactionState,
   TransactionType,
@@ -123,8 +120,6 @@ import {
   codecForUserAttentionsRequest,
   codecForValidateIbanRequest,
   codecForWithdrawTestBalance,
-  durationFromSpec,
-  durationMin,
   getErrorDetailFromException,
   j2s,
   parsePaytoUri,
@@ -152,7 +147,6 @@ import {
 } from "./db.js";
 import { DevExperimentHttpLib, applyDevExperiment } from 
"./dev-experiments.js";
 import {
-  ActiveLongpollInfo,
   CancelFn,
   InternalWalletState,
   MerchantInfo,
@@ -172,23 +166,16 @@ import {
   getBackupInfo,
   getBackupRecovery,
   loadBackupRecovery,
-  processBackupForProvider,
   removeBackupProvider,
   runBackupCycle,
   setWalletDeviceId,
 } from "./operations/backup/index.js";
 import { getBalanceDetail, getBalances } from "./operations/balance.js";
-import {
-  TaskRunResult,
-  TaskRunResultType,
-  runTaskWithErrorReporting,
-} from "./operations/common.js";
 import {
   computeDepositTransactionStatus,
   createDepositGroup,
   generateDepositGroupTxId,
   prepareDepositGroup,
-  processDepositGroup,
 } from "./operations/deposits.js";
 import {
   acceptExchangeTermsOfService,
@@ -200,7 +187,6 @@ import {
   getExchangeTos,
   listExchanges,
   lookupExchangeByUri,
-  updateExchangeFromUrlHandler,
 } from "./operations/exchanges.js";
 import {
   computePayMerchantTransactionState,
@@ -209,7 +195,6 @@ import {
   getContractTermsDetails,
   preparePayForTemplate,
   preparePayForUri,
-  processPurchase,
   sharePayment,
   startQueryRefund,
   startRefundQueryForUri,
@@ -218,38 +203,28 @@ import {
   checkPeerPullPaymentInitiation,
   computePeerPullCreditTransactionState,
   initiatePeerPullPayment,
-  processPeerPullCredit,
 } from "./operations/pay-peer-pull-credit.js";
 import {
   computePeerPullDebitTransactionState,
   confirmPeerPullDebit,
   preparePeerPullDebit,
-  processPeerPullDebit,
 } from "./operations/pay-peer-pull-debit.js";
 import {
   computePeerPushCreditTransactionState,
   confirmPeerPushCredit,
   preparePeerPushCredit,
-  processPeerPushCredit,
 } from "./operations/pay-peer-push-credit.js";
 import {
   checkPeerPushDebit,
   computePeerPushDebitTransactionState,
   initiatePeerPushDebit,
-  processPeerPushDebit,
 } from "./operations/pay-peer-push-debit.js";
-import { getPendingOperations } from "./operations/pending.js";
-import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js";
+import { createRecoupGroup } from "./operations/recoup.js";
 import {
-  autoRefresh,
   computeRefreshTransactionState,
   forceRefresh,
-  processRefreshGroup,
 } from "./operations/refresh.js";
-import {
-  computeRewardTransactionStatus,
-  processTip,
-} from "./operations/reward.js";
+import { computeRewardTransactionStatus } from "./operations/reward.js";
 import {
   runIntegrationTest,
   runIntegrationTest2,
@@ -257,7 +232,6 @@ import {
   waitTransactionState,
   waitUntilAllTransactionsFinal,
   waitUntilRefreshesDone,
-  waitUntilTasksProcessed,
   withdrawTestBalance,
 } from "./operations/testing.js";
 import {
@@ -279,9 +253,9 @@ import {
   createManualWithdrawal,
   getExchangeWithdrawalInfo,
   getWithdrawalDetailsForUri,
-  processWithdrawalGroup,
 } from "./operations/withdraw.js";
-import { PendingTaskInfo, PendingTaskType } from "./pending-types.js";
+import { PendingOperationsResponse } from "./pending-types.js";
+import { TaskScheduler } from "./shepherd.js";
 import { assertUnreachable } from "./util/assertUnreachable.js";
 import {
   convertDepositAmount,
@@ -320,184 +294,11 @@ import {
 
 const logger = new Logger("wallet.ts");
 
-/**
- * Call the right handler for a pending operation without doing
- * any special error handling.
- */
-async function callOperationHandler(
-  ws: InternalWalletState,
-  pending: PendingTaskInfo,
-): Promise<TaskRunResult> {
-  switch (pending.type) {
-    case PendingTaskType.ExchangeUpdate:
-      return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl);
-    case PendingTaskType.Refresh:
-      return await processRefreshGroup(ws, pending.refreshGroupId);
-    case PendingTaskType.Withdraw:
-      return await processWithdrawalGroup(ws, pending.withdrawalGroupId);
-    case PendingTaskType.RewardPickup:
-      return await processTip(ws, pending.tipId);
-    case PendingTaskType.Purchase:
-      return await processPurchase(ws, pending.proposalId);
-    case PendingTaskType.Recoup:
-      return await processRecoupGroup(ws, pending.recoupGroupId);
-    case PendingTaskType.ExchangeCheckRefresh:
-      return await autoRefresh(ws, pending.exchangeBaseUrl);
-    case PendingTaskType.Deposit:
-      return await processDepositGroup(ws, pending.depositGroupId);
-    case PendingTaskType.Backup:
-      return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
-    case PendingTaskType.PeerPushDebit:
-      return await processPeerPushDebit(ws, pending.pursePub);
-    case PendingTaskType.PeerPullCredit:
-      return await processPeerPullCredit(ws, pending.pursePub);
-    case PendingTaskType.PeerPullDebit:
-      return await processPeerPullDebit(ws, pending.peerPullDebitId);
-    case PendingTaskType.PeerPushCredit:
-      return await processPeerPushCredit(ws, pending.peerPushCreditId);
-    default:
-      return assertUnreachable(pending);
-  }
-  throw Error(`not reached ${pending.type}`);
-}
-
-/**
- * Process pending operations.
- */
-export async function runPending(ws: InternalWalletState): Promise<void> {
-  const pendingOpsResponse = await getPendingOperations(ws);
-  for (const p of pendingOpsResponse.pendingOperations) {
-    if (!AbsoluteTime.isExpired(p.timestampDue)) {
-      continue;
-    }
-    await runTaskWithErrorReporting(ws, p.id, async () => {
-      logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
-      return await callOperationHandler(ws, p);
-    });
-  }
-}
-
-export interface RetryLoopOpts {
-  /**
-   * Stop the retry loop when all lifeness-giving pending operations
-   * are done.
-   *
-   * Defaults to false.
-   */
-  stopWhenDone?: boolean;
-}
-
-/**
- * Main retry loop of the wallet.
- *
- * Looks up pending operations from the wallet, runs them, repeat.
- */
 async function runTaskLoop(
   ws: InternalWalletState,
   opts: RetryLoopOpts = {},
 ): Promise<void> {
-  logger.trace(`running task loop opts=${j2s(opts)}`);
-  if (ws.isTaskLoopRunning) {
-    logger.warn(
-      "task loop already running, nesting the wallet-core task loop is 
deprecated and should be avoided",
-    );
-  }
-  const throttler = new TaskThrottler();
-  ws.isTaskLoopRunning = true;
-  for (let iteration = 0; !ws.stopped; iteration++) {
-    const pending = await getPendingOperations(ws);
-    logger.trace(`pending operations: ${j2s(pending)}`);
-    let numGivingLiveness = 0;
-    let numDue = 0;
-    let numThrottled = 0;
-    let minDue: AbsoluteTime = AbsoluteTime.never();
-
-    for (const p of pending.pendingOperations) {
-      if (p.givesLifeness) {
-        numGivingLiveness++;
-      }
-      if (!p.isDue) {
-        continue;
-      }
-      numDue++;
-
-      const isThrottled = throttler.applyThrottle(p.id);
-
-      if (isThrottled) {
-        logger.warn(
-          `task ${p.id} throttled, this is very likely a bug in wallet-core, 
please report`,
-        );
-        numDue--;
-        numThrottled++;
-      } else {
-        minDue = AbsoluteTime.min(minDue, p.timestampDue);
-      }
-    }
-
-    logger.trace(
-      `running task loop, iter=${iteration}, 
#tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, 
#due=${numDue} #throttled=${numThrottled}`,
-    );
-
-    if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) {
-      logger.warn(`stopping, as no pending operations have lifeness`);
-      ws.isTaskLoopRunning = false;
-      return;
-    }
-
-    if (ws.stopped) {
-      ws.isTaskLoopRunning = false;
-      return;
-    }
-
-    // Make sure that we run tasks that don't give lifeness at least
-    // one time.
-    if (iteration !== 0 && numDue === 0) {
-      // We've executed pending, due operations at least one.
-      // Now we don't have any more operations available,
-      // and need to wait.
-
-      // Wait for at most 5 seconds to the next check.
-      const dt = durationMin(
-        durationFromSpec({
-          seconds: 5,
-        }),
-        Duration.getRemaining(minDue),
-      );
-      logger.trace(`waiting for at most ${dt.d_ms} ms`);
-      const timeout = ws.timerGroup.resolveAfter(dt);
-      // Wait until either the timeout, or we are notified (via the latch)
-      // that more work might be available.
-      await Promise.race([timeout, ws.workAvailable.wait()]);
-      logger.trace(`done waiting for available work`);
-    } else {
-      logger.trace(
-        `running ${pending.pendingOperations.length} pending operations`,
-      );
-      for (const p of pending.pendingOperations) {
-        if (!AbsoluteTime.isExpired(p.timestampDue)) {
-          continue;
-        }
-        logger.trace(`running task ${p.id}`);
-        const res = await runTaskWithErrorReporting(ws, p.id, async () => {
-          return await callOperationHandler(ws, p);
-        });
-        if (!(ws.stopped && res.type === TaskRunResultType.Error)) {
-          ws.notify({
-            type: NotificationType.PendingOperationProcessed,
-            id: p.id,
-            taskResultType: res.type,
-          });
-        }
-        if (ws.stopped) {
-          ws.isTaskLoopRunning = false;
-          return;
-        }
-      }
-    }
-  }
-  logger.trace("exiting wallet task loop");
-  ws.isTaskLoopRunning = false;
-  return;
+  await ws.taskScheduler.run(opts);
 }
 
 /**
@@ -1035,7 +836,10 @@ async function dispatchRequestInternal<Op extends 
WalletApiOperation>(
       return await getUserAttentionsUnreadCount(ws, req);
     }
     case WalletApiOperation.GetPendingOperations: {
-      return await getPendingOperations(ws);
+      // FIXME: Eventually remove the handler after deprecation period.
+      return {
+        pendingOperations: [],
+      } satisfies PendingOperationsResponse;
     }
     case WalletApiOperation.SetExchangeTosAccepted: {
       const req = codecForAcceptExchangeTosRequest().decode(payload);
@@ -1066,8 +870,7 @@ async function dispatchRequestInternal<Op extends 
WalletApiOperation>(
       return getContractTermsDetails(ws, req.proposalId);
     }
     case WalletApiOperation.RetryPendingNow: {
-      // FIXME: Should we reset all operation retries here?
-      await runPending(ws);
+      logger.error("retryPendingNow currently not implemented");
       return {};
     }
     case WalletApiOperation.SharePayment: {
@@ -1175,10 +978,6 @@ async function dispatchRequestInternal<Op extends 
WalletApiOperation>(
       await waitTransactionState(ws, req.transactionId, req.txState);
       return {};
     }
-    case WalletApiOperation.TestingWaitTasksProcessed: {
-      await waitUntilTasksProcessed(ws);
-      return {};
-    }
     case WalletApiOperation.GetCurrencySpecification: {
       // Ignore result, just validate in this mock implementation
       const req = codecForGetCurrencyInfoRequest().decode(payload);
@@ -1451,7 +1250,7 @@ async function dispatchRequestInternal<Op extends 
WalletApiOperation>(
     case WalletApiOperation.TestingSetTimetravel: {
       const req = codecForTestingSetTimetravelRequest().decode(payload);
       setDangerousTimetravel(req.offsetMs);
-      ws.workAvailable.trigger();
+      ws.taskScheduler.reload();
       return {};
     }
     case WalletApiOperation.DeleteExchange: {
@@ -1634,11 +1433,6 @@ export class Wallet {
     this.ws.stop();
   }
 
-  async runPending(): Promise<void> {
-    await this.ws.ensureWalletDbOpen();
-    return runPending(this.ws);
-  }
-
   async runTaskLoop(opts?: RetryLoopOpts): Promise<void> {
     await this.ws.ensureWalletDbOpen();
     return runTaskLoop(this.ws, opts);
@@ -1660,11 +1454,6 @@ export class Wallet {
  * This ties together all the operation implementations.
  */
 class InternalWalletStateImpl implements InternalWalletState {
-  /**
-   * @see {@link InternalWalletState.activeLongpoll}
-   */
-  activeLongpoll: ActiveLongpollInfo = {};
-
   cryptoApi: TalerCryptoInterface;
   cryptoDispatcher: CryptoDispatcher;
 
@@ -1697,6 +1486,8 @@ class InternalWalletStateImpl implements 
InternalWalletState {
 
   isTaskLoopRunning: boolean = false;
 
+  taskScheduler: TaskScheduler = new TaskScheduler(this);
+
   config: Readonly<WalletConfig>;
 
   private _db: DbAccess<typeof WalletStoresV1> | undefined = undefined;
@@ -1843,7 +1634,7 @@ class InternalWalletStateImpl implements 
InternalWalletState {
   }
 
   notify(n: WalletNotification): void {
-    logger.trace("Notification", j2s(n));
+    logger.trace(`Notification: ${j2s(n)}`);
     for (const l of this.listeners) {
       const nc = JSON.parse(JSON.stringify(n));
       setTimeout(() => {
@@ -1870,11 +1661,6 @@ class InternalWalletStateImpl implements 
InternalWalletState {
     this.stopped = true;
     this.timerGroup.stopCurrentAndFutureTimers();
     this.cryptoDispatcher.stop();
-    for (const key of Object.keys(this.activeLongpoll)) {
-      logger.trace(`cancelling active longpoll ${key}`);
-      this.activeLongpoll[key].cancel();
-      delete this.activeLongpoll[key];
-    }
   }
 
   /**

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]