gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] branch master updated: implement fancy scheduling


From: gnunet
Subject: [libeufin] branch master updated: implement fancy scheduling
Date: Fri, 19 Jun 2020 12:52:28 +0200

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository libeufin.

The following commit(s) were added to refs/heads/master by this push:
     new 88302c6  implement fancy scheduling
88302c6 is described below

commit 88302c6447139a4687c43cc1837bddf3b3eeae06
Author: Florian Dold <florian.dold@gmail.com>
AuthorDate: Fri Jun 19 16:22:19 2020 +0530

    implement fancy scheduling
---
 nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt    |   4 +
 .../main/kotlin/tech/libeufin/nexus/Scheduling.kt  | 151 +++++++++++++--------
 .../tech/libeufin/nexus/bankaccount/BankAccount.kt |  10 +-
 .../tech/libeufin/nexus/server/NexusServer.kt      |  48 +++++--
 4 files changed, 143 insertions(+), 70 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
index 4e4ec41..1ae35ca 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
@@ -343,6 +343,8 @@ object NexusScheduledTasksTable : IntIdTable() {
     val taskType = text("taskType")
     val taskCronspec = text("taskCronspec")
     val taskParams = text("taskParams")
+    val nextScheduledExecutionSec = 
long("nextScheduledExecutionSec").nullable()
+    val prevScheduledExecutionSec = 
long("lastScheduledExecutionSec").nullable()
 }
 
 class NexusScheduledTaskEntity(id: EntityID<Int>) : IntEntity(id) {
@@ -354,6 +356,8 @@ class NexusScheduledTaskEntity(id: EntityID<Int>) : IntEntity(id) {
     var taskType by NexusScheduledTasksTable.taskType
     var taskCronspec by NexusScheduledTasksTable.taskCronspec
     var taskParams by NexusScheduledTasksTable.taskParams
+    var nextScheduledExecutionSec by 
NexusScheduledTasksTable.nextScheduledExecutionSec
+    var prevScheduledExecutionSec by 
NexusScheduledTasksTable.prevScheduledExecutionSec
 }
 
 fun dbCreateTables(dbName: String) {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
index c530adf..a0ea050 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
@@ -19,83 +19,122 @@
 
 package tech.libeufin.nexus
 
+import com.cronutils.model.definition.CronDefinitionBuilder
+import com.cronutils.model.time.ExecutionTime
+import com.cronutils.parser.CronParser
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
 import io.ktor.client.HttpClient
 import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.time.delay
 import org.jetbrains.exposed.sql.transactions.transaction
-import tech.libeufin.nexus.bankaccount.fetchTransactionsInternal
+import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
 import tech.libeufin.nexus.bankaccount.submitAllPaymentInitiations
-import tech.libeufin.nexus.server.FetchLevel
 import tech.libeufin.nexus.server.FetchSpecJson
-import tech.libeufin.nexus.server.FetchSpecLatestJson
-import java.io.PrintWriter
-import java.io.StringWriter
+import java.lang.IllegalArgumentException
 import java.time.Duration
+import java.time.Instant
+import java.time.ZonedDateTime
 
-/** Crawls all the facades, and requests history for each of its creators. */
-suspend fun downloadTalerFacadesTransactions(httpClient: HttpClient, 
fetchSpec: FetchSpecJson) {
-    val work = mutableListOf<Pair<String, String>>()
-    transaction {
-        TalerFacadeStateEntity.all().forEach {
-            logger.debug("Fetching history for facade: ${it.id.value}, bank 
account: ${it.bankAccount}")
-            work.add(Pair(it.facade.creator.id.value, it.bankAccount))
+private data class TaskSchedule(
+    val taskId: Int,
+    val name: String,
+    val type: String,
+    val resourceType: String,
+    val resourceId: String,
+    val params: String
+)
+
+private suspend fun runTask(client: HttpClient, sched: TaskSchedule) {
+    logger.info("running task $sched")
+    try {
+
+        when (sched.resourceType) {
+            "bank-account" -> {
+                when (sched.type) {
+                    "fetch" -> {
+                        @Suppress("BlockingMethodInNonBlockingContext")
+                        val fetchSpec = 
jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java)
+                        fetchBankAccountTransactions(client, fetchSpec, 
sched.resourceId)
+                    }
+                    "submit" -> {
+                        submitAllPaymentInitiations(client, sched.resourceId)
+                    }
+                    else -> {
+                        logger.error("task type ${sched.type} not understood")
+                    }
+                }
+            }
+            else -> logger.error("task on resource ${sched.resourceType} not 
understood")
         }
-    }
-    work.forEach {
-        fetchTransactionsInternal(
-            client = httpClient,
-            fetchSpec = fetchSpec,
-            userId = it.first,
-            accountid = it.second
-        )
+    } catch (e: Exception) {
+        logger.error("Exception during task $sched", e)
     }
 }
 
-
-private inline fun reportAndIgnoreErrors(f: () -> Unit) {
-    try {
-        f()
-    } catch (e: java.lang.Exception) {
-        logger.error("ignoring exception", e)
+object NexusCron {
+    val parser = run {
+        val cronDefinition =
+            CronDefinitionBuilder.defineCron()
+                .withSeconds().and()
+                .withMinutes().optional().and()
+                .withDayOfMonth().optional().and()
+                .withMonth().optional().and()
+                .withDayOfWeek().optional()
+                .and().instance()
+        CronParser(cronDefinition)
     }
 }
 
-fun moreFrequentBackgroundTasks(httpClient: HttpClient) {
+fun startOperationScheduler(httpClient: HttpClient) {
     GlobalScope.launch {
         while (true) {
-            logger.debug("Running more frequent background jobs")
-            reportAndIgnoreErrors {
-                downloadTalerFacadesTransactions(
-                    httpClient,
-                    FetchSpecLatestJson(
-                        FetchLevel.ALL,
-                        null
-                    )
-                )
+            logger.info("running schedule loop")
+
+            // First, assign next execution time stamps to all tasks that need 
them
+            transaction {
+                NexusScheduledTaskEntity.find {
+                    NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
+                }.forEach {
+                    val cron = try {
+                        NexusCron.parser.parse(it.taskCronspec)
+                    } catch (e: IllegalArgumentException) {
+                        logger.error("invalid cronspec in schedule 
${it.resourceType}/${it.resourceId}/${it.taskName}")
+                        return@forEach
+                    }
+                    val zonedNow = ZonedDateTime.now()
+                    val et = ExecutionTime.forCron(cron)
+                    val next = et.nextExecution(zonedNow)
+                    logger.info("scheduling task ${it.taskName} at $next (now 
is $zonedNow)")
+                    it.nextScheduledExecutionSec = next.get().toEpochSecond()
+                }
             }
-            // FIXME: should be done automatically after raw ingestion
-            reportAndIgnoreErrors { ingestTalerTransactions() }
-            reportAndIgnoreErrors { submitAllPaymentInitiations(httpClient) }
-            logger.debug("More frequent background jobs done")
-            delay(Duration.ofSeconds(1))
-        }
-    }
-}
 
-fun lessFrequentBackgroundTasks(httpClient: HttpClient) {
-    GlobalScope.launch {
-        while (true) {
-            logger.debug("Less frequent background job")
-            try {
-                //downloadTalerFacadesTransactions(httpClient, "C53")
-            } catch (e: Exception) {
-                val sw = StringWriter()
-                val pw = PrintWriter(sw)
-                e.printStackTrace(pw)
-                logger.info("==== Less frequent background task exception 
====\n${sw}======")
+            val nowSec = Instant.now().epochSecond
+            // Second, find tasks that are due
+            val dueTasks = transaction {
+                NexusScheduledTaskEntity.find {
+                    NexusScheduledTasksTable.nextScheduledExecutionSec lessEq 
nowSec
+                }.map {
+                    TaskSchedule(it.id.value, it.taskName, it.taskType, 
it.resourceType, it.resourceId, it.taskParams)
+                }
             }
-            delay(Duration.ofSeconds(10))
+
+            // Execute those due tasks
+            dueTasks.forEach {
+                runTask(httpClient, it)
+                transaction {
+                    val t = NexusScheduledTaskEntity.findById(it.taskId)
+                    if (t != null) {
+                        // Reset next scheduled execution
+                        t.nextScheduledExecutionSec = null
+                        t.prevScheduledExecutionSec = nowSec
+                    }
+                }
+            }
+
+            // Wait a bit
+            delay(Duration.ofSeconds(1))
         }
     }
 }
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
index bf672ce..396a45f 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
@@ -58,7 +58,7 @@ suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId:
 /**
  * Submit all pending prepared payments.
  */
-suspend fun submitAllPaymentInitiations(httpClient: HttpClient) {
+suspend fun submitAllPaymentInitiations(httpClient: HttpClient, accountid: 
String) {
     data class Submission(
         val id: Long
     )
@@ -233,10 +233,9 @@ fun addPaymentInitiation(paymentData: Pain001Data, debitorAccount: NexusBankAcco
     }
 }
 
-suspend fun fetchTransactionsInternal(
+suspend fun fetchBankAccountTransactions(
     client: HttpClient,
     fetchSpec: FetchSpecJson,
-    userId: String,
     accountid: String
 ) {
     val res = transaction {
@@ -261,18 +260,17 @@ suspend fun fetchTransactionsInternal(
     }
     when (res.connectionType) {
         "ebics" -> {
-            // FIXME(dold): Support fetching not only the latest transactions.
-            // It's not clear what's the nicest way to support this.
             fetchEbicsBySpec(
                 fetchSpec,
                 client,
                 res.connectionName
             )
-            ingestBankMessagesIntoAccount(res.connectionName, accountid)
         }
         else -> throw NexusError(
             HttpStatusCode.BadRequest,
             "Connection type '${res.connectionType}' not implemented"
         )
     }
+    ingestBankMessagesIntoAccount(res.connectionName, accountid)
+    ingestTalerTransactions()
 }
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
index ca8cf5a..64d52ba 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
@@ -53,12 +53,13 @@ import org.jetbrains.exposed.sql.transactions.transaction
 import org.slf4j.event.Level
 import tech.libeufin.nexus.*
 import tech.libeufin.nexus.bankaccount.addPaymentInitiation
-import tech.libeufin.nexus.bankaccount.fetchTransactionsInternal
+import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
 import tech.libeufin.nexus.bankaccount.getPaymentInitiation
 import tech.libeufin.nexus.bankaccount.submitPaymentInitiation
 import tech.libeufin.nexus.ebics.*
 import tech.libeufin.util.*
 import tech.libeufin.util.logger
+import java.lang.IllegalArgumentException
 import java.net.URLEncoder
 import java.util.zip.InflaterInputStream
 
@@ -260,8 +261,7 @@ fun serverMain(dbName: String) {
             return@intercept
         }
 
-        lessFrequentBackgroundTasks(client)
-        moreFrequentBackgroundTasks(client)
+        startOperationScheduler(client)
 
         routing {
             // Shows information about the requesting user.
@@ -379,6 +379,11 @@ fun serverMain(dbName: String) {
                     if (bankAccount == null) {
                         throw NexusError(HttpStatusCode.NotFound, "unknown 
bank account")
                     }
+                    try {
+                        NexusCron.parser.parse(schedSpec.cronspec)
+                    } catch (e: IllegalArgumentException) {
+                        throw NexusError(HttpStatusCode.BadRequest, "bad cron 
spec: ${e.message}")
+                    }
                     when (schedSpec.type) {
                         "fetch" -> {
                             val fetchSpec = 
jacksonObjectMapper().treeToValue(schedSpec.params, FetchSpecJson::class.java)
@@ -386,8 +391,18 @@ fun serverMain(dbName: String) {
                                 throw NexusError(HttpStatusCode.BadRequest, 
"bad fetch spec")
                             }
                         }
+                        "submit" -> {}
                         else -> throw NexusError(HttpStatusCode.BadRequest, 
"unsupported task type")
                     }
+                    val oldSchedTask = NexusScheduledTaskEntity.find {
+                        (NexusScheduledTasksTable.taskName eq schedSpec.name) 
and
+                                (NexusScheduledTasksTable.resourceType eq 
"bank-account") and
+                                (NexusScheduledTasksTable.resourceId eq 
accountId)
+
+                    }.firstOrNull()
+                    if (oldSchedTask != null) {
+                        throw NexusError(HttpStatusCode.BadRequest, "schedule 
task already exists")
+                    }
                     NexusScheduledTaskEntity.new {
                         resourceType = "bank-account"
                         resourceId = accountId
@@ -401,12 +416,30 @@ fun serverMain(dbName: String) {
                 call.respond(object { })
             }
 
-            get("/bank-accounts/{accountid}/schedule/{taskid}") {
-
+            get("/bank-accounts/{accountId}/schedule/{taskId}") {
+                call.respond(object { })
             }
 
-            delete("/bank-accounts/{accountid}/schedule/{taskid}") {
+            delete("/bank-accounts/{accountId}/schedule/{taskId}") {
+                logger.info("schedule delete requested")
+                val accountId = ensureNonNull(call.parameters["accountId"])
+                val taskId = ensureNonNull(call.parameters["taskId"])
+                transaction {
+                    val bankAccount = 
NexusBankAccountEntity.findById(accountId)
+                    if (bankAccount == null) {
+                        throw NexusError(HttpStatusCode.NotFound, "unknown 
bank account")
+                    }
+                    val oldSchedTask = NexusScheduledTaskEntity.find {
+                        (NexusScheduledTasksTable.taskName eq taskId) and
+                                (NexusScheduledTasksTable.resourceType eq 
"bank-account") and
+                                (NexusScheduledTasksTable.resourceId eq 
accountId)
 
+                    }.firstOrNull()
+                    if (oldSchedTask != null) {
+                        oldSchedTask.delete()
+                    }
+                }
+                call.respond(object { })
             }
 
             get("/bank-accounts/{accountid}") {
@@ -517,10 +550,9 @@ fun serverMain(dbName: String) {
                         null
                     )
                 }
-                fetchTransactionsInternal(
+                fetchBankAccountTransactions(
                     client,
                     fetchSpec,
-                    user.id.value,
                     accountid
                 )
                 call.respondText("Collection performed")

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]