gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] branch master updated: wallet-core: test crypto dispatcher, fix timeout handling


From: gnunet
Subject: [taler-wallet-core] branch master updated: wallet-core: test crypto dispatcher, fix timeout handling
Date: Wed, 04 Jan 2023 13:24:27 +0100

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

The following commit(s) were added to refs/heads/master by this push:
     new 60374078f wallet-core: test crypto dispatcher, fix timeout handling
60374078f is described below

commit 60374078f4e41e9398607628d8b33b74bb3431aa
Author: Florian Dold <florian@dold.me>
AuthorDate: Wed Jan 4 13:24:19 2023 +0100

    wallet-core: test crypto dispatcher, fix timeout handling
---
 .../src/crypto/workers/crypto-dispatcher.test.ts   | 130 +++++++++++++++++++++
 .../{cryptoDispatcher.ts => crypto-dispatcher.ts}  |  48 ++++----
 .../src/crypto/workers/nodeThreadWorker.ts         |   2 +-
 .../crypto/workers/synchronousWorkerFactoryNode.ts |   2 +-
 .../workers/synchronousWorkerFactoryPlain.ts       |   2 +-
 packages/taler-wallet-core/src/index.ts            |   2 +-
 .../taler-wallet-core/src/internal-wallet-state.ts |   2 +-
 .../taler-wallet-core/src/operations/refresh.ts    |   2 +-
 packages/taler-wallet-core/src/wallet.ts           |   2 +-
 9 files changed, 159 insertions(+), 33 deletions(-)

diff --git a/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts
new file mode 100644
index 000000000..b63c9bf11
--- /dev/null
+++ b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts
@@ -0,0 +1,130 @@
+/*
+ This file is part of GNU Taler
+ (C) 2023 Taler Systems S.A.
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
+ */
+
+import test from "ava";
+import { CryptoDispatcher, CryptoWorkerFactory } from "./crypto-dispatcher.js";
+import { CryptoWorker, CryptoWorkerResponseMessage } from "./cryptoWorkerInterface.js";
+import { SynchronousCryptoWorkerFactoryNode } from "./synchronousWorkerFactoryNode.js";
+import { processRequestWithImpl } from "./worker-common.js";
+
+
+export class MyCryptoWorker implements CryptoWorker {
+  /**
+   * Function to be called when we receive a message from the worker thread.
+   */
+  onmessage: undefined | ((m: any) => void) = undefined;
+
+  /**
+   * Function to be called when we receive an error from the worker thread.
+   */
+  onerror: undefined | ((m: any) => void) = undefined;
+
+  /**
+   * Add an event listener for either an "error" or "message" event.
+   */
+  addEventListener(event: "message" | "error", fn: (x: any) => void): void {
+    switch (event) {
+      case "message":
+        this.onmessage = fn;
+        break;
+      case "error":
+        this.onerror = fn;
+        break;
+    }
+  }
+
+  private dispatchMessage(msg: any): void {
+    if (this.onmessage) {
+      this.onmessage(msg);
+    }
+  }
+
+  /**
+   * Send a message to the worker thread.
+   */
+  postMessage(msg: any): void {
+    const handleRequest = async () => {
+      let responseMsg: CryptoWorkerResponseMessage;
+      if (msg.operation === "testSuccess") {
+        responseMsg = {
+          id: msg.id,
+          type: "success",
+          result: {
+            testResult: 42,
+          }
+        }
+      } else if (msg.operation === "testError") {
+        responseMsg = {
+          id: msg.id,
+          type: "error",
+          error: {
+            code: 42,
+            hint: "bla",
+          }
+        }
+      } else if (msg.operation === "testTimeout") {
+        // Don't respond
+        return;
+      }
+      try {
+        setTimeout(() => this.dispatchMessage(responseMsg), 0);
+      } catch (e) {
+        console.error("got error during dispatch", e);
+      }
+    };
+    handleRequest().catch((e) => {
+      console.error("Error while handling crypto request:", e);
+    });
+  }
+
+  /**
+   * Forcibly terminate the worker thread.
+   */
+  terminate(): void {
+    // This is a no-op.
+  }
+}
+
+
+
+export class MyCryptoWorkerFactory implements CryptoWorkerFactory {
+  startWorker(): CryptoWorker {
+    return new MyCryptoWorker();
+  }
+
+  getConcurrency(): number {
+    return 1;
+  }
+}
+
+test("continues after error", async (t) => {
+  const cryptoDisp = new CryptoDispatcher(
+    new MyCryptoWorkerFactory(),
+  );
+  const resp1 = await cryptoDisp.doRpc("testSuccess", 0, {});
+  t.assert((resp1 as any).testResult === 42);
+  const exc = await t.throwsAsync(async() => {
+    const resp2 = await cryptoDisp.doRpc("testError", 0, {});
+  });
+
+  // Check that it still works after one error.
+  const resp2 = await cryptoDisp.doRpc("testSuccess", 0, {});
+  t.assert((resp2 as any).testResult === 42);
+
+  // Check that it still works after timeout.
+  const resp3 = await cryptoDisp.doRpc("testSuccess", 0, {});
+  t.assert((resp3 as any).testResult === 42);
+});
diff --git a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts
similarity index 94%
rename from packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
rename to packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts
index 88aea71b9..940078ea6 100644
--- a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts
@@ -203,13 +203,7 @@ export class CryptoDispatcher {
     ws.idleTimeoutHandle.unref();
   }
 
-  handleWorkerError(ws: WorkerInfo, e: any): void {
-    if (ws.currentWorkItem) {
-      logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e);
-    } else {
-      logger.error("error in worker", e);
-    }
-    logger.error(e.message);
+  private resetWorker(ws: WorkerInfo, e: any): void {
     try {
       if (ws.w) {
         ws.w.terminate();
@@ -227,6 +221,16 @@ export class CryptoDispatcher {
     this.findWork(ws);
   }
 
+  handleWorkerError(ws: WorkerInfo, e: any): void {
+    if (ws.currentWorkItem) {
+      logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e);
+    } else {
+      logger.error("error in worker", e);
+    }
+    logger.error(e.message);
+    this.resetWorker(ws, e);
+  }
+
   private findWork(ws: WorkerInfo): void {
     // try to find more work for this worker
     for (let i = 0; i < NUM_PRIO; i++) {
@@ -304,7 +308,7 @@ export class CryptoDispatcher {
     }
   }
 
-  private doRpc<T>(
+  doRpc<T>(
     operation: string,
     priority: number,
     req: unknown,
@@ -355,30 +359,22 @@ export class CryptoDispatcher {
     // (The worker child process won't keep us alive either, because we un-ref
     // it to make sure it doesn't keep us alive if there is no work.)
     return new Promise<T>((resolve, reject) => {
-      let timedOut = false;
-      const timeout = timer.after(10000, () => {
-        logger.warn(`crypto RPC call ('${operation}') timed out`);
-        timedOut = true;
-        reject(new Error(`crypto RPC call ('${operation}') timed out`));
-        if (workItem.state === WorkItemState.Running) {
-          workItem.state = WorkItemState.Finished;
-          this.numBusy--;
-        }
-      });
+      let timeoutHandle: TimerHandle | undefined = undefined;
+      const timeoutMs = 5000;
+      const onTimeout = () => {
+        // FIXME: Maybe destroy and re-init worker if request is in processing
+        // state and really taking too long?
+        logger.warn(`crypto RPC call ('${operation}') has been queued for a long time`);
+        timeoutHandle = timer.after(timeoutMs, onTimeout);
+      };
       myProm.promise
         .then((x) => {
-          if (timedOut) {
-            return;
-          }
-          timeout.clear();
+          timeoutHandle?.clear();
           resolve(x);
         })
         .catch((x) => {
           logger.info(`crypto RPC call ${operation} threw`);
-          if (timedOut) {
-            return;
-          }
-          timeout.clear();
+          timeoutHandle?.clear();
           reject(x);
         });
     });
diff --git a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts
index 634c891b6..db8bb4737 100644
--- a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts
@@ -21,7 +21,7 @@ import { Logger } from "@gnu-taler/taler-util";
 import os from "os";
 import url from "url";
 import { nativeCryptoR } from "../cryptoImplementation.js";
-import { CryptoWorkerFactory } from "./cryptoDispatcher.js";
+import { CryptoWorkerFactory } from "./crypto-dispatcher.js";
 import { CryptoWorker } from "./cryptoWorkerInterface.js";
 import { processRequestWithImpl } from "./worker-common.js";
 
diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts
index 46cf12915..90f9a43fa 100644
--- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts
@@ -17,7 +17,7 @@
 /**
  * Imports.
  */
-import { CryptoWorkerFactory } from "./cryptoDispatcher.js";
+import { CryptoWorkerFactory } from "./crypto-dispatcher.js";
 import { CryptoWorker } from "./cryptoWorkerInterface.js";
 import { SynchronousCryptoWorkerNode } from "./synchronousWorkerNode.js";
 
diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts
index d0c8e4b3a..66381bc0e 100644
--- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts
@@ -17,7 +17,7 @@
 /**
  * Imports.
  */
-import { CryptoWorkerFactory } from "./cryptoDispatcher.js";
+import { CryptoWorkerFactory } from "./crypto-dispatcher.js";
 import { CryptoWorker } from "./cryptoWorkerInterface.js";
 import { SynchronousCryptoWorkerPlain } from "./synchronousWorkerPlain.js";
 
diff --git a/packages/taler-wallet-core/src/index.ts b/packages/taler-wallet-core/src/index.ts
index 7cc23aa88..e48c9430f 100644
--- a/packages/taler-wallet-core/src/index.ts
+++ b/packages/taler-wallet-core/src/index.ts
@@ -37,7 +37,7 @@ export type { CryptoWorker } from "./crypto/workers/cryptoWorkerInterface.js";
 export {
   CryptoWorkerFactory,
   CryptoDispatcher,
-} from "./crypto/workers/cryptoDispatcher.js";
+} from "./crypto/workers/crypto-dispatcher.js";
 
 export * from "./pending-types.js";
 
diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts
index ebb9cdb9b..93d813cc9 100644
--- a/packages/taler-wallet-core/src/internal-wallet-state.ts
+++ b/packages/taler-wallet-core/src/internal-wallet-state.ts
@@ -41,7 +41,7 @@ import {
   CoinRefreshRequest,
   RefreshReason,
 } from "@gnu-taler/taler-util";
-import { CryptoDispatcher } from "./crypto/workers/cryptoDispatcher.js";
+import { CryptoDispatcher } from "./crypto/workers/crypto-dispatcher.js";
 import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
 import { ExchangeDetailsRecord, ExchangeRecord, WalletStoresV1 } from "./db.js";
 import { PendingOperationsResponse } from "./pending-types.js";
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts
index 806e4a246..eeff84be6 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -52,7 +52,7 @@ import {
   DerivedRefreshSession,
   RefreshNewDenomInfo,
 } from "../crypto/cryptoTypes.js";
-import { CryptoApiStoppedError } from "../crypto/workers/cryptoDispatcher.js";
+import { CryptoApiStoppedError } from "../crypto/workers/crypto-dispatcher.js";
 import {
   CoinRecord,
   CoinSourceType,
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index f442e678a..1defff0d2 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -106,7 +106,7 @@ import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
 import {
   CryptoDispatcher,
   CryptoWorkerFactory,
-} from "./crypto/workers/cryptoDispatcher.js";
+} from "./crypto/workers/crypto-dispatcher.js";
 import { clearDatabase } from "./db-utils.js";
 import {
   AuditorTrustRecord,

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]