gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-iono] branch master updated: fix uv loop, redo threading code


From: gnunet
Subject: [taler-iono] branch master updated: fix uv loop, redo threading code
Date: Sat, 17 Jul 2021 04:54:26 +0200

This is an automated email from the git hooks/post-receive script.

jonathan-buchanan pushed a commit to branch master
in repository iono.

The following commit(s) were added to refs/heads/master by this push:
     new 62545d0  fix uv loop, redo threading code
62545d0 is described below

commit 62545d0c3e98472d0f0d945e9a4c5cce5ea03baa
Author: Jonathan Buchanan <jonathan.russ.buchanan@gmail.com>
AuthorDate: Fri Jul 16 22:54:02 2021 -0400

    fix uv loop, redo threading code
---
 iono/iono.cpp   |  50 +++++++++----------
 iono/iono.swift | 148 +++++++++++++++++++++++++++++++++++++++++++-------------
 2 files changed, 139 insertions(+), 59 deletions(-)

diff --git a/iono/iono.cpp b/iono/iono.cpp
index cec96f1..9a7d6e4 100644
--- a/iono/iono.cpp
+++ b/iono/iono.cpp
@@ -22,6 +22,7 @@
 #include <uv.h>
 
 #define NODE_WANT_INTERNALS 1
+#include <env.h>
 #include <node_binding.h>
 
 #include <iostream>
@@ -35,22 +36,23 @@ struct __IonoInstance
     std::unique_ptr<node::CommonEnvironmentSetup> setup;
     v8::Isolate *isolate;
     node::Environment *env;
+    uv_loop_t *event_loop;
     uv_async_t async_notify;
-    
+
     bool break_requested;
-    
+
     /* Notifications to swift */
     __NotifyHandler notification_handler;
     void *notification_userdata;
-    
+
     __IonoInstance();
-    
+
     char *
     evalJs(const char *js);
-    
+
     void
     runNode();
-    
+
     void
     makeCallback(const char *callback);
 };
@@ -126,16 +128,16 @@ static void
 getModuleCode(const v8::FunctionCallbackInfo<v8::Value> &args);
 
 static const std::string main_code = "const publicRequire ="
-                                    "  require('module').createRequire(process.cwd() + '/');"
-                                    "  globalThis.require = publicRequire;"
-                                    "  require('vm').runInThisContext(process.argv[1]);global.__node_run = (x) => {"
-                                    "  0 && console.log('running code', x);"
-                                    "  global.eval(x);"
-                                    "};"
-                                    ""
-                                    "global.__native_onMessage = (x) => {"
-                                    "  0 && console.log('got __native_onMessage', x);"
-                                    "};";
+                                     "  require('module').createRequire(process.cwd() + '/');"
+                                     "  globalThis.require = publicRequire;"
+                                     "  require('vm').runInThisContext(process.argv[1]);"
+                                     "global.__node_run = (x) => {"
+                                     "  0 && console.log('running code', x);"
+                                     "  global.eval(x);"
+                                     "};"
+                                     "global.__native_onMessage = (x) => {"
+                                     "  0 && console.log('got __native_onMessage', x);"
+                                     "};";
 
 static void
 _register_iono();
@@ -144,16 +146,10 @@ __IonoInstance::__IonoInstance() :
     break_requested(false),
     notification_handler(nullptr)
 {
-    {
-        uv_loop_t *loop = uv_default_loop();
-        uv_async_init(loop, &async_notify, &notifyCallback);
-        async_notify.data = this;
-    }
-    
     std::vector<std::string> args = { "node" };
     std::vector<std::string> exec_args;
     std::vector<std::string> errors;
-    
+
     if (nullptr == platform)
     {
         int exit_code = node::InitializeNodeWithArgs(&args, &exec_args, &errors);
@@ -178,6 +174,10 @@ __IonoInstance::__IonoInstance() :
 
     isolate = setup->isolate();
     env = setup->env();
+    event_loop = setup->event_loop();
+
+    uv_async_init(event_loop, &async_notify, &notifyCallback);
+    async_notify.data = this;
 
     {
         v8::Locker locker(isolate);
@@ -186,7 +186,7 @@ __IonoInstance::__IonoInstance() :
         v8::Context::Scope context_scope(setup->context());
 
         node::LoadEnvironment(env, main_code.c_str());
-        
+
         v8::Local<v8::ObjectTemplate> data_template = v8::ObjectTemplate::New(isolate);
         data_template->SetInternalFieldCount(1);
         v8::Local<v8::Object> data_object = data_template->NewInstance(setup->context()).ToLocalChecked();
@@ -242,7 +242,7 @@ __IonoInstance::runNode() {
     v8::Context::Scope context_scope(setup->context());
     break_requested = false;
     while (true) {
-        uv_run(uv_default_loop(), UV_RUN_ONCE);
+        uv_run(event_loop, UV_RUN_ONCE);
         platform->DrainTasks(isolate);
         if (break_requested)
             break;
diff --git a/iono/iono.swift b/iono/iono.swift
index 23e0104..1f07140 100644
--- a/iono/iono.swift
+++ b/iono/iono.swift
@@ -27,48 +27,127 @@ func notification_callback(payload: Optional<UnsafePointer<Int8>>,
     native.internalOnNotify(payload: string)
 }
 
+struct Queue<T> {
+    var contents: [T]
+
+    init() {
+        self.contents = []
+    }
+
+    mutating func push(_ element: T) {
+        contents.append(element)
+    }
+
+    mutating func pop() -> T? {
+        if (contents.isEmpty) {
+            return nil
+        } else {
+            return contents.remove(at: 0)
+        }
+    }
+}
+
+class NodeThread: Thread {
+    var iono: Iono!
+    var workQueue: Queue<() -> ()>
+    var initialized: Bool
+    var initCondition: NSCondition
+
+    override init() {
+        self.workQueue = Queue<() -> ()>()
+        self.initialized = false
+        self.initCondition = NSCondition()
+        super.init()
+    }
+
+    override func main() {
+        iono.instance = __initNative()
+        __setNotifyHandler(iono.instance, notification_callback, Unmanaged.passUnretained(iono).toOpaque())
+        self.initialized = true
+        initCondition.broadcast()
+        while true {
+            __runNode(iono.instance)
+            while let workItem = workQueue.pop() {
+                workItem()
+            }
+            if iono.stopped {
+                break
+            }
+        }
+    }
+
+    func waitUntilInitialized(block: @escaping () -> ()) {
+        if (self.initialized) {
+            block()
+            return
+        }
+
+        initCondition.lock()
+        while (!self.initialized) {
+            initCondition.wait()
+        }
+
+        block()
+
+        initCondition.unlock()
+    }
+}
+
 public class Iono {
+    var stopped: Bool
+    var thread: NodeThread
+
     var instance: OpaquePointer!
-    var work_queue: DispatchQueue
-    var initialization_group: DispatchGroup
-    var messageHandler: IonoMessageHandler?
-    
-    public init() {
-        work_queue = DispatchQueue(label: "NodeQueue", qos: .userInitiated)
-        initialization_group = DispatchGroup()
-        initialization_group.notify(queue: work_queue) {
-            self.instance = __initNative()
-            __setNotifyHandler(self.instance, notification_callback, Unmanaged.passUnretained(self).toOpaque())
-        }
+    public var messageHandler: IonoMessageHandler?
+
+    public init() { // We need to be calling runNode!
+        self.stopped = false
+        self.thread = NodeThread()
+        self.thread.iono = self
+
+        self.thread.start()
     }
-    
+
     deinit {
         __destroyNative(instance)
     }
-    
-    private func scheduleNodeThreadAsync(block: @escaping () -> Void) {
-        initialization_group.wait()
-        work_queue.async(execute: block)
-        notifyNative()
+
+    private func scheduleNodeThreadAsync(block: @escaping () -> ()) {
+        thread.waitUntilInitialized {
+            self.thread.workQueue.push(block)
+            self.notifyNative()
+        }
     }
-    
+
     private func scheduleNodeThreadSync(block: @escaping () -> Void) {
-        initialization_group.wait()
-        work_queue.sync(execute: block)
-        notifyNative()
+        var hasExecuted = false
+        let executeCondition = NSCondition()
+
+        executeCondition.lock()
+        thread.waitUntilInitialized {
+            self.thread.workQueue.push {
+                block()
+                hasExecuted = true
+                executeCondition.broadcast()
+            }
+            self.notifyNative()
+        }
+        while (!hasExecuted) {
+            executeCondition.wait()
+        }
+        executeCondition.unlock()
     }
-    
+
     public func internalOnNotify(payload: String) {
         if let handler = messageHandler {
             handler.handleMessage(message: payload)
         }
     }
-    
+
     public func notifyNative() {
-        initialization_group.wait()
         __notifyNative(instance)
     }
-    
+
     public func evalSimpleJs(source: String) -> String {
         var result: String?
         scheduleNodeThreadSync {
@@ -80,13 +159,13 @@ public class Iono {
         }
         return result!
     }
-    
+
     public func evalNodeCode(source: String) {
         scheduleNodeThreadAsync {
             __makeCallbackNative(self.instance, source.cString(using: .utf8))
         }
     }
-    
+
     public func sendMessage(message: String) {
         let encoded = message.data(using: .utf8)!.base64EncodedString()
         let source = """
@@ -99,15 +178,16 @@ public class Iono {
         """
         evalNodeCode(source: source)
     }
-    
+
     public func waitStopped() {
-        
-    }
-    
-    public func putModuleCode(modName: String, code: String) {
         scheduleNodeThreadSync {
-            __putModuleCodeNative(self.instance, modName.cString(using: .utf8),
-                                  code.cString(using: .utf8))
+            self.stopped = true
         }
+        thread.cancel()
+    }
+
+    public func putModuleCode(modName: String, code: String) {
+        __putModuleCodeNative(self.instance, modName.cString(using: .utf8),
+                              code.cString(using: .utf8))
     }
 }

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]