gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 05/08: background jobs


From: gnunet
Subject: [libeufin] 05/08: background jobs
Date: Wed, 08 Feb 2023 14:32:21 +0100

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit c696a3748b72eb5eb83f576885b3ea28d9c814bc
Author: MS <ms@taler.net>
AuthorDate: Wed Feb 8 14:15:19 2023 +0100

    background jobs
    
    fix invocation of root coroutine.
---
 nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt  | 10 ++-
 .../main/kotlin/tech/libeufin/nexus/Scheduling.kt  | 88 ++++++++++------------
 2 files changed, 47 insertions(+), 51 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index 6795f8e0..f09a152b 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -30,6 +30,10 @@ import com.github.ajalt.clikt.parameters.types.int
 import execThrowableOrTerminate
 import com.github.ajalt.clikt.core.*
 import com.github.ajalt.clikt.parameters.options.*
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.newSingleThreadContext
 import startServer
 import tech.libeufin.nexus.iso20022.parseCamtMessage
 import tech.libeufin.nexus.server.client
@@ -70,10 +74,8 @@ class Serve : CliktCommand("Run nexus HTTP server") {
     private val logLevel by option()
     override fun run() {
         setLogLevel(logLevel)
-        execThrowableOrTerminate {
-            dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME))
-        }
-        startOperationScheduler(client)
+        execThrowableOrTerminate { 
dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME)) }
+        CoroutineScope(Dispatchers.IO).launch(fallback) { 
startOperationScheduler(client) }
         if (withUnixSocket != null) {
             startServer(
                 withUnixSocket!!,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt 
b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
index d4420db6..86c86a37 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
@@ -24,9 +24,8 @@ import com.cronutils.model.time.ExecutionTime
 import com.cronutils.parser.CronParser
 import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
 import io.ktor.client.HttpClient
-import kotlinx.coroutines.CoroutineExceptionHandler
-import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.*
+import kotlinx.coroutines.GlobalScope.coroutineContext
 import kotlinx.coroutines.time.delay
 import org.jetbrains.exposed.sql.transactions.transaction
 import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
@@ -36,6 +35,7 @@ import java.lang.IllegalArgumentException
 import java.time.Duration
 import java.time.Instant
 import java.time.ZonedDateTime
+import kotlin.coroutines.coroutineContext
 import kotlin.system.exitProcess
 
 private data class TaskSchedule(
@@ -96,59 +96,53 @@ object NexusCron {
         CronParser(cronDefinition)
     }
 }
-/**
- * Fails whenever a unmanaged Throwable reaches the root coroutine.
- */
+
+// Fails whenever a unmanaged Throwable reaches the root coroutine.
 val fallback = CoroutineExceptionHandler { _, err ->
     logger.error(err.stackTraceToString())
     exitProcess(1)
 }
-fun startOperationScheduler(httpClient: HttpClient) {
-    GlobalScope.launch(fallback) {
-        while (true) {
-            // First, assign next execution time stamps to all tasks that need 
them
-            transaction {
-                NexusScheduledTaskEntity.find {
-                    NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
-                }.forEach {
-                    val cron = try {
-                        NexusCron.parser.parse(it.taskCronspec)
-                    } catch (e: IllegalArgumentException) {
-                        logger.error("invalid cronspec in schedule 
${it.resourceType}/${it.resourceId}/${it.taskName}")
-                        return@forEach
-                    }
-                    val zonedNow = ZonedDateTime.now()
-                    val et = ExecutionTime.forCron(cron)
-                    val next = et.nextExecution(zonedNow)
-                    logger.info("scheduling task ${it.taskName} at $next (now 
is $zonedNow)")
-                    it.nextScheduledExecutionSec = next.get().toEpochSecond()
+suspend fun startOperationScheduler(httpClient: HttpClient) {
+    while (true) {
+        // First, assign next execution time stamps to all tasks that need them
+        transaction {
+            NexusScheduledTaskEntity.find {
+                NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
+            }.forEach {
+                val cron = try {
+                    NexusCron.parser.parse(it.taskCronspec)
+                } catch (e: IllegalArgumentException) {
+                    logger.error("invalid cronspec in schedule 
${it.resourceType}/${it.resourceId}/${it.taskName}")
+                    return@forEach
                 }
+                val zonedNow = ZonedDateTime.now()
+                val et = ExecutionTime.forCron(cron)
+                val next = et.nextExecution(zonedNow)
+                logger.info("scheduling task ${it.taskName} at $next (now is 
$zonedNow)")
+                it.nextScheduledExecutionSec = next.get().toEpochSecond()
             }
-
-            val nowSec = Instant.now().epochSecond
-            // Second, find tasks that are due
-            val dueTasks = transaction {
-                NexusScheduledTaskEntity.find {
-                    NexusScheduledTasksTable.nextScheduledExecutionSec lessEq 
nowSec
-                }.map {
-                    TaskSchedule(it.id.value, it.taskName, it.taskType, 
it.resourceType, it.resourceId, it.taskParams)
-                }
+        }
+        val nowSec = Instant.now().epochSecond
+        // Second, find tasks that are due
+        val dueTasks = transaction {
+            NexusScheduledTaskEntity.find {
+                NexusScheduledTasksTable.nextScheduledExecutionSec lessEq 
nowSec
+            }.map {
+                TaskSchedule(it.id.value, it.taskName, it.taskType, 
it.resourceType, it.resourceId, it.taskParams)
             }
-            // Execute those due tasks
-            dueTasks.forEach {
-                runTask(httpClient, it)
-                transaction {
-                    val t = NexusScheduledTaskEntity.findById(it.taskId)
-                    if (t != null) {
-                        // Reset next scheduled execution
-                        t.nextScheduledExecutionSec = null
-                        t.prevScheduledExecutionSec = nowSec
-                    }
+        } // Execute those due tasks
+        dueTasks.forEach {
+            runTask(httpClient, it)
+            transaction {
+                val t = NexusScheduledTaskEntity.findById(it.taskId)
+                if (t != null) {
+                    // Reset next scheduled execution
+                    t.nextScheduledExecutionSec = null
+                    t.prevScheduledExecutionSec = nowSec
                 }
             }
-
-            // Wait a bit
-            delay(Duration.ofSeconds(1))
         }
+        // Wait a bit
+        delay(Duration.ofSeconds(1))
     }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]