gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] branch master updated: wallet-core: implement partial withdrawal batching, don't block when generating planchets


From: gnunet
Subject: [taler-wallet-core] branch master updated: wallet-core: implement partial withdrawal batching, don't block when generating planchets
Date: Fri, 10 Feb 2023 13:21:45 +0100

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

The following commit(s) were added to refs/heads/master by this push:
     new 18c30b9a0 wallet-core: implement partial withdrawal batching, don't block when generating planchets
18c30b9a0 is described below

commit 18c30b9a00a4e5dee629f4e06c261509ff7ba455
Author: Florian Dold <florian@dold.me>
AuthorDate: Fri Feb 10 13:21:37 2023 +0100

    wallet-core: implement partial withdrawal batching, don't block when generating planchets
---
 packages/taler-harness/src/harness/harness.ts      |  10 +-
 .../src/integrationtests/test-withdrawal-huge.ts   |   3 +-
 packages/taler-util/src/taler-types.ts             |  19 +-
 .../taler-wallet-core/src/operations/withdraw.ts   | 378 ++++++++++-----------
 4 files changed, 207 insertions(+), 203 deletions(-)

diff --git a/packages/taler-harness/src/harness/harness.ts b/packages/taler-harness/src/harness/harness.ts
index 3403c266e..4e5d8238c 100644
--- a/packages/taler-harness/src/harness/harness.ts
+++ b/packages/taler-harness/src/harness/harness.ts
@@ -1361,7 +1361,12 @@ export class ExchangeService implements 
ExchangeServiceInterface {
 
     this.exchangeWirewatchProc = this.globalState.spawnService(
       "taler-exchange-wirewatch",
-      ["-c", this.configFilename, ...this.timetravelArgArr],
+      [
+        "-c",
+        this.configFilename,
+        "--longpoll-timeout=5s",
+        ...this.timetravelArgArr,
+      ],
       `exchange-wirewatch-${this.name}`,
     );
 
@@ -1951,6 +1956,9 @@ export class WalletService {
       ],
       `wallet-${this.opts.name}`,
     );
+    logger.info(
+      `hint: connect to wallet using taler-wallet-cli 
--wallet-connection=${unixPath}`,
+    );
   }
 
   async pingUntilAvailable(): Promise<void> {
diff --git a/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts
index 579d727b1..437d799b8 100644
--- a/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts
+++ b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts
@@ -87,9 +87,10 @@ export async function runWithdrawalHugeTest(t: 
GlobalTestState) {
     exchangeBaseUrl: exchange.baseUrl,
   });
 
+  // Results in about 1K coins withdrawn
   await wallet.client.call(WalletApiOperation.WithdrawFakebank, {
     exchange: exchange.baseUrl,
-    amount: "TESTKUDOS:5000",
+    amount: "TESTKUDOS:10000",
     bank: bank.baseUrl,
   });
 
diff --git a/packages/taler-util/src/taler-types.ts b/packages/taler-util/src/taler-types.ts
index a9303ed9c..bb15f0494 100644
--- a/packages/taler-util/src/taler-types.ts
+++ b/packages/taler-util/src/taler-types.ts
@@ -951,12 +951,12 @@ export const codecForBlindedDenominationSignature = () =>
     .alternative(DenomKeyType.Rsa, codecForRsaBlindedDenominationSignature())
     .build("BlindedDenominationSignature");
 
-export class WithdrawResponse {
+export class ExchangeWithdrawResponse {
   ev_sig: BlindedDenominationSignature;
 }
 
-export class WithdrawBatchResponse {
-  ev_sigs: WithdrawResponse[];
+export class ExchangeWithdrawBatchResponse {
+  ev_sigs: ExchangeWithdrawResponse[];
 }
 
 export interface MerchantPayResponse {
@@ -1476,13 +1476,13 @@ export const codecForRecoupConfirmation = (): 
Codec<RecoupConfirmation> =>
     .property("old_coin_pub", codecOptional(codecForString()))
     .build("RecoupConfirmation");
 
-export const codecForWithdrawResponse = (): Codec<WithdrawResponse> =>
-  buildCodecForObject<WithdrawResponse>()
+export const codecForWithdrawResponse = (): Codec<ExchangeWithdrawResponse> =>
+  buildCodecForObject<ExchangeWithdrawResponse>()
     .property("ev_sig", codecForBlindedDenominationSignature())
     .build("WithdrawResponse");
 
-export const codecForWithdrawBatchResponse = (): Codec<WithdrawBatchResponse> 
=>
-  buildCodecForObject<WithdrawBatchResponse>()
+export const codecForWithdrawBatchResponse = (): 
Codec<ExchangeWithdrawBatchResponse> =>
+  buildCodecForObject<ExchangeWithdrawBatchResponse>()
     .property("ev_sigs", codecForList(codecForWithdrawResponse()))
     .build("WithdrawBatchResponse");
 
@@ -1753,6 +1753,11 @@ export interface ExchangeWithdrawRequest {
   coin_ev: CoinEnvelope;
 }
 
+export interface ExchangeBatchWithdrawRequest {
+  planchets: ExchangeWithdrawRequest[];
+}
+
+
 export interface ExchangeRefreshRevealRequest {
   new_denoms_h: HashCodeString[];
   coin_evs: CoinEnvelope[];
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index caa280fe5..987a5e062 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -59,9 +59,11 @@ import {
   TransactionType,
   UnblindedSignature,
   URL,
-  WithdrawBatchResponse,
-  WithdrawResponse,
+  ExchangeWithdrawBatchResponse,
+  ExchangeWithdrawResponse,
   WithdrawUriInfoResponse,
+  ExchangeBatchWithdrawRequest,
+  WalletNotification,
 } from "@gnu-taler/taler-util";
 import { EddsaKeypair } from "../crypto/cryptoImplementation.js";
 import {
@@ -93,6 +95,7 @@ import {
 import { walletCoreDebugFlags } from "../util/debugFlags.js";
 import {
   HttpRequestLibrary,
+  HttpResponse,
   readSuccessResponseJsonOrErrorCode,
   readSuccessResponseJsonOrThrow,
   throwUnexpectedRequestError,
@@ -455,136 +458,21 @@ async function processPlanchetGenerate(
     });
 }
 
-/**
- * Send the withdrawal request for a generated planchet to the exchange.
- *
- * The verification of the response is done asynchronously to enable 
parallelism.
- */
-async function processPlanchetExchangeRequest(
-  ws: InternalWalletState,
-  wgContext: WithdrawalGroupContext,
-  coinIdx: number,
-): Promise<WithdrawResponse | undefined> {
-  const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
-  logger.info(
-    `processing planchet exchange request 
${withdrawalGroup.withdrawalGroupId}/${coinIdx}`,
-  );
-  const d = await ws.db
-    .mktx((x) => [
-      x.withdrawalGroups,
-      x.planchets,
-      x.exchanges,
-      x.denominations,
-    ])
-    .runReadOnly(async (tx) => {
-      let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
-        withdrawalGroup.withdrawalGroupId,
-        coinIdx,
-      ]);
-      if (!planchet) {
-        return;
-      }
-      if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) {
-        logger.warn("processPlanchet: planchet already withdrawn");
-        return;
-      }
-      const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
-      if (!exchange) {
-        logger.error("db inconsistent: exchange for planchet not found");
-        return;
-      }
-
-      const denom = await ws.getDenomInfo(
-        ws,
-        tx,
-        withdrawalGroup.exchangeBaseUrl,
-        planchet.denomPubHash,
-      );
-
-      if (!denom) {
-        logger.error("db inconsistent: denom for planchet not found");
-        return;
-      }
-
-      logger.trace(
-        `processing planchet #${coinIdx} in withdrawal 
${withdrawalGroup.withdrawalGroupId}`,
-      );
+interface WithdrawalRequestBatchArgs {
+  /**
+   * Use the batched request on the network level.
+   * Not supported by older exchanges.
+   */
+  useBatchRequest: boolean;
 
-      const reqBody: ExchangeWithdrawRequest = {
-        denom_pub_hash: planchet.denomPubHash,
-        reserve_sig: planchet.withdrawSig,
-        coin_ev: planchet.coinEv,
-      };
-      const reqUrl = new URL(
-        `reserves/${withdrawalGroup.reservePub}/withdraw`,
-        exchange.baseUrl,
-      ).href;
+  coinStartIndex: number;
 
-      return { reqUrl, reqBody };
-    });
+  batchSize: number;
+}
 
-  if (!d) {
-    return;
-  }
-  const { reqUrl, reqBody } = d;
-
-  try {
-    const resp = await ws.http.postJson(reqUrl, reqBody);
-    if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
-      logger.info("withdrawal requires KYC");
-      const respJson = await resp.json();
-      const uuidResp = codecForWalletKycUuid().decode(respJson);
-      logger.info(`kyc uuid response: ${j2s(uuidResp)}`);
-      await ws.db
-        .mktx((x) => [x.planchets, x.withdrawalGroups])
-        .runReadWrite(async (tx) => {
-          let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
-            withdrawalGroup.withdrawalGroupId,
-            coinIdx,
-          ]);
-          if (!planchet) {
-            return;
-          }
-          planchet.planchetStatus = PlanchetStatus.KycRequired;
-          const wg2 = await tx.withdrawalGroups.get(
-            withdrawalGroup.withdrawalGroupId,
-          );
-          if (!wg2) {
-            return;
-          }
-          wg2.kycPending = {
-            paytoHash: uuidResp.h_payto,
-            requirementRow: uuidResp.requirement_row,
-          };
-          await tx.planchets.put(planchet);
-          await tx.withdrawalGroups.put(wg2);
-        });
-      return;
-    }
-    const r = await readSuccessResponseJsonOrThrow(
-      resp,
-      codecForWithdrawResponse(),
-    );
-    return r;
-  } catch (e) {
-    const errDetail = getErrorDetailFromException(e);
-    logger.trace("withdrawal request failed", e);
-    logger.trace(String(e));
-    await ws.db
-      .mktx((x) => [x.planchets])
-      .runReadWrite(async (tx) => {
-        let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
-          withdrawalGroup.withdrawalGroupId,
-          coinIdx,
-        ]);
-        if (!planchet) {
-          return;
-        }
-        planchet.lastError = errDetail;
-        await tx.planchets.put(planchet);
-      });
-    return;
-  }
+interface WithdrawalBatchResult {
+  coinIdxs: number[];
+  batchResp: ExchangeWithdrawBatchResponse;
 }
 
 /**
@@ -595,15 +483,18 @@ async function processPlanchetExchangeRequest(
 async function processPlanchetExchangeBatchRequest(
   ws: InternalWalletState,
   wgContext: WithdrawalGroupContext,
-): Promise<WithdrawBatchResponse | undefined> {
+  args: WithdrawalRequestBatchArgs,
+): Promise<WithdrawalBatchResult> {
   const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
   logger.info(
-    `processing planchet exchange batch request 
${withdrawalGroup.withdrawalGroupId}`,
+    `processing planchet exchange batch request 
${withdrawalGroup.withdrawalGroupId}, start=${args.coinStartIndex}, 
len=${args.batchSize}`,
   );
-  const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms
-    .map((x) => x.count)
-    .reduce((a, b) => a + b);
-  const d = await ws.db
+
+  const batchReq: ExchangeBatchWithdrawRequest = { planchets: [] };
+  // Indices of coins that are included in the batch request
+  const coinIdxs: number[] = [];
+
+  await ws.db
     .mktx((x) => [
       x.withdrawalGroups,
       x.planchets,
@@ -611,26 +502,22 @@ async function processPlanchetExchangeBatchRequest(
       x.denominations,
     ])
     .runReadOnly(async (tx) => {
-      const reqBody: { planchets: ExchangeWithdrawRequest[] } = {
-        planchets: [],
-      };
-      const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
-      if (!exchange) {
-        logger.error("db inconsistent: exchange for planchet not found");
-        return;
-      }
-
-      for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
+      for (
+        let coinIdx = args.coinStartIndex;
+        coinIdx < args.coinStartIndex + args.batchSize &&
+        coinIdx < wgContext.numPlanchets;
+        coinIdx++
+      ) {
         let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
           withdrawalGroup.withdrawalGroupId,
           coinIdx,
         ]);
         if (!planchet) {
-          return;
+          continue;
         }
         if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) {
           logger.warn("processPlanchet: planchet already withdrawn");
-          return;
+          continue;
         }
         const denom = await ws.getDenomInfo(
           ws,
@@ -641,7 +528,7 @@ async function processPlanchetExchangeBatchRequest(
 
         if (!denom) {
           logger.error("db inconsistent: denom for planchet not found");
-          return;
+          continue;
         }
 
         const planchetReq: ExchangeWithdrawRequest = {
@@ -649,35 +536,145 @@ async function processPlanchetExchangeBatchRequest(
           reserve_sig: planchet.withdrawSig,
           coin_ev: planchet.coinEv,
         };
-        reqBody.planchets.push(planchetReq);
+        batchReq.planchets.push(planchetReq);
+        coinIdxs.push(coinIdx);
       }
-      return reqBody;
     });
 
-  if (!d) {
+  if (batchReq.planchets.length == 0) {
+    logger.warn("empty withdrawal batch");
+    return {
+      batchResp: { ev_sigs: [] },
+      coinIdxs: [],
+    };
+  }
+
+  async function handleKycRequired(resp: HttpResponse, startIdx: number) {
+    logger.info("withdrawal requires KYC");
+    const respJson = await resp.json();
+    const uuidResp = codecForWalletKycUuid().decode(respJson);
+    logger.info(`kyc uuid response: ${j2s(uuidResp)}`);
+    await ws.db
+      .mktx((x) => [x.planchets, x.withdrawalGroups])
+      .runReadWrite(async (tx) => {
+        for (let i = 0; i < startIdx; i++) {
+          let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+            withdrawalGroup.withdrawalGroupId,
+            coinIdxs[i],
+          ]);
+          if (!planchet) {
+            continue;
+          }
+          planchet.planchetStatus = PlanchetStatus.KycRequired;
+          await tx.planchets.put(planchet);
+        }
+        const wg2 = await tx.withdrawalGroups.get(
+          withdrawalGroup.withdrawalGroupId,
+        );
+        if (!wg2) {
+          return;
+        }
+        wg2.kycPending = {
+          paytoHash: uuidResp.h_payto,
+          requirementRow: uuidResp.requirement_row,
+        };
+        await tx.withdrawalGroups.put(wg2);
+      });
     return;
   }
 
-  const reqUrl = new URL(
-    `reserves/${withdrawalGroup.reservePub}/batch-withdraw`,
-    withdrawalGroup.exchangeBaseUrl,
-  ).href;
+  async function storeCoinError(e: any, coinIdx: number) {
+    const errDetail = getErrorDetailFromException(e);
+    logger.trace("withdrawal request failed", e);
+    logger.trace(String(e));
+    await ws.db
+      .mktx((x) => [x.planchets])
+      .runReadWrite(async (tx) => {
+        let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+          withdrawalGroup.withdrawalGroupId,
+          coinIdx,
+        ]);
+        if (!planchet) {
+          return;
+        }
+        planchet.lastError = errDetail;
+        await tx.planchets.put(planchet);
+      });
+  }
 
-  const resp = await ws.http.postJson(reqUrl, d);
-  const r = await readSuccessResponseJsonOrThrow(
-    resp,
-    codecForWithdrawBatchResponse(),
-  );
-  return r;
+  // FIXME: handle individual error codes better!
+
+  if (args.useBatchRequest) {
+    const reqUrl = new URL(
+      `reserves/${withdrawalGroup.reservePub}/batch-withdraw`,
+      withdrawalGroup.exchangeBaseUrl,
+    ).href;
+
+    try {
+      const resp = await ws.http.postJson(reqUrl, batchReq);
+      if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
+        await handleKycRequired(resp, 0);
+      }
+      const r = await readSuccessResponseJsonOrThrow(
+        resp,
+        codecForWithdrawBatchResponse(),
+      );
+      return {
+        coinIdxs,
+        batchResp: r,
+      };
+    } catch (e) {
+      await storeCoinError(e, coinIdxs[0]);
+      return {
+        batchResp: { ev_sigs: [] },
+        coinIdxs: [],
+      };
+    }
+  } else {
+    // We emulate the batch response here by making multiple individual 
requests
+    const responses: ExchangeWithdrawBatchResponse = {
+      ev_sigs: [],
+    };
+    for (let i = 0; i < batchReq.planchets.length; i++) {
+      try {
+        const p = batchReq.planchets[i];
+        const reqUrl = new URL(
+          `reserves/${withdrawalGroup.reservePub}/withdraw`,
+          withdrawalGroup.exchangeBaseUrl,
+        ).href;
+        const resp = await ws.http.postJson(reqUrl, p);
+        if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
+          await handleKycRequired(resp, i);
+          // We still return blinded coins that we could actually withdraw.
+          return {
+            coinIdxs,
+            batchResp: responses,
+          };
+        }
+        const r = await readSuccessResponseJsonOrThrow(
+          resp,
+          codecForWithdrawResponse(),
+        );
+        responses.ev_sigs.push(r);
+      } catch (e) {
+        await storeCoinError(e, coinIdxs[i]);
+      }
+    }
+    return {
+      coinIdxs,
+      batchResp: responses,
+    };
+  }
 }
 
 async function processPlanchetVerifyAndStoreCoin(
   ws: InternalWalletState,
   wgContext: WithdrawalGroupContext,
   coinIdx: number,
-  resp: WithdrawResponse,
+  resp: ExchangeWithdrawResponse,
 ): Promise<void> {
   const withdrawalGroup = wgContext.wgRecord;
+  logger.info(`checking and storing planchet idx=${coinIdx}`);
   const d = await ws.db
     .mktx((x) => [x.withdrawalGroups, x.planchets, x.denominations])
     .runReadOnly(async (tx) => {
@@ -791,6 +788,14 @@ async function processPlanchetVerifyAndStoreCoin(
 
   wgContext.planchetsFinished.add(planchet.coinPub);
 
+  // We create the notification here, as the async transaction below
+  // allows other planchet withdrawals to change wgContext.planchetsFinished
+  const notification: WalletNotification = {
+    type: NotificationType.CoinWithdrawn,
+    numTotal: wgContext.numPlanchets,
+    numWithdrawn: wgContext.planchetsFinished.size,
+  }
+
   // Check if this is the first time that the whole
   // withdrawal succeeded.  If so, mark the withdrawal
   // group as finished.
@@ -814,11 +819,7 @@ async function processPlanchetVerifyAndStoreCoin(
     });
 
   if (firstSuccess) {
-    ws.notify({
-      type: NotificationType.CoinWithdrawn,
-      numTotal: wgContext.numPlanchets,
-      numWithdrawn: wgContext.planchetsFinished.size,
-    });
+    ws.notify(notification);
   }
 }
 
@@ -1150,8 +1151,6 @@ export async function processWithdrawalGroup(
     wgRecord: withdrawalGroup,
   };
 
-  let work: Promise<void>[] = [];
-
   await ws.db
     .mktx((x) => [x.planchets])
     .runReadOnly(async (tx) => {
@@ -1165,44 +1164,35 @@ export async function processWithdrawalGroup(
       }
     });
 
+  // We sequentially generate planchets, so that
+  // large withdrawal groups don't make the wallet unresponsive.
   for (let i = 0; i < numTotalCoins; i++) {
-    work.push(processPlanchetGenerate(ws, withdrawalGroup, i));
+    await processPlanchetGenerate(ws, withdrawalGroup, i);
   }
 
-  // Generate coins concurrently (parallelism only happens in the crypto API 
workers)
-  await Promise.all(work);
-
-  work = [];
+  const maxBatchSize = 100;
 
-  if (ws.batchWithdrawal) {
-    const resp = await processPlanchetExchangeBatchRequest(ws, wgContext);
-    if (!resp) {
-      throw Error("unable to do batch withdrawal");
-    }
-    for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
+  for (let i = 0; i < numTotalCoins; i += maxBatchSize) {
+    const resp = await processPlanchetExchangeBatchRequest(ws, wgContext, {
+      batchSize: maxBatchSize,
+      coinStartIndex: i,
+      useBatchRequest: ws.batchWithdrawal,
+    });
+    let work: Promise<void>[] = [];
+    work = [];
+    for (let j = 0; j < resp.coinIdxs.length; j++) {
       work.push(
         processPlanchetVerifyAndStoreCoin(
           ws,
           wgContext,
-          coinIdx,
-          resp.ev_sigs[coinIdx],
+          resp.coinIdxs[j],
+          resp.batchResp.ev_sigs[j],
         ),
       );
     }
-  } else {
-    for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
-      const resp = await processPlanchetExchangeRequest(ws, wgContext, 
coinIdx);
-      if (!resp) {
-        continue;
-      }
-      work.push(
-        processPlanchetVerifyAndStoreCoin(ws, wgContext, coinIdx, resp),
-      );
-    }
+    await Promise.all(work);
   }
 
-  await Promise.all(work);
-
   let numFinished = 0;
   let numKycRequired = 0;
   let finishedForFirstTime = false;

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]