[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] branch master updated: implement fancy scheduling
From: gnunet
Subject: [libeufin] branch master updated: implement fancy scheduling
Date: Fri, 19 Jun 2020 12:52:28 +0200
This is an automated email from the git hooks/post-receive script.
dold pushed a commit to branch master
in repository libeufin.
The following commit(s) were added to refs/heads/master by this push:
new 88302c6 implement fancy scheduling
88302c6 is described below
commit 88302c6447139a4687c43cc1837bddf3b3eeae06
Author: Florian Dold <florian.dold@gmail.com>
AuthorDate: Fri Jun 19 16:22:19 2020 +0530
implement fancy scheduling
---
nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt | 4 +
.../main/kotlin/tech/libeufin/nexus/Scheduling.kt | 151 +++++++++++++--------
.../tech/libeufin/nexus/bankaccount/BankAccount.kt | 10 +-
.../tech/libeufin/nexus/server/NexusServer.kt | 48 +++++--
4 files changed, 143 insertions(+), 70 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
index 4e4ec41..1ae35ca 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
@@ -343,6 +343,8 @@ object NexusScheduledTasksTable : IntIdTable() {
val taskType = text("taskType")
val taskCronspec = text("taskCronspec")
val taskParams = text("taskParams")
+ val nextScheduledExecutionSec =
long("nextScheduledExecutionSec").nullable()
+ val prevScheduledExecutionSec =
long("lastScheduledExecutionSec").nullable()
}
class NexusScheduledTaskEntity(id: EntityID<Int>) : IntEntity(id) {
@@ -354,6 +356,8 @@ class NexusScheduledTaskEntity(id: EntityID<Int>) :
IntEntity(id) {
var taskType by NexusScheduledTasksTable.taskType
var taskCronspec by NexusScheduledTasksTable.taskCronspec
var taskParams by NexusScheduledTasksTable.taskParams
+ var nextScheduledExecutionSec by
NexusScheduledTasksTable.nextScheduledExecutionSec
+ var prevScheduledExecutionSec by
NexusScheduledTasksTable.prevScheduledExecutionSec
}
fun dbCreateTables(dbName: String) {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
index c530adf..a0ea050 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
@@ -19,83 +19,122 @@
package tech.libeufin.nexus
+import com.cronutils.model.definition.CronDefinitionBuilder
+import com.cronutils.model.time.ExecutionTime
+import com.cronutils.parser.CronParser
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.client.HttpClient
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.time.delay
import org.jetbrains.exposed.sql.transactions.transaction
-import tech.libeufin.nexus.bankaccount.fetchTransactionsInternal
+import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
import tech.libeufin.nexus.bankaccount.submitAllPaymentInitiations
-import tech.libeufin.nexus.server.FetchLevel
import tech.libeufin.nexus.server.FetchSpecJson
-import tech.libeufin.nexus.server.FetchSpecLatestJson
-import java.io.PrintWriter
-import java.io.StringWriter
+import java.lang.IllegalArgumentException
import java.time.Duration
+import java.time.Instant
+import java.time.ZonedDateTime
-/** Crawls all the facades, and requests history for each of its creators. */
-suspend fun downloadTalerFacadesTransactions(httpClient: HttpClient,
fetchSpec: FetchSpecJson) {
- val work = mutableListOf<Pair<String, String>>()
- transaction {
- TalerFacadeStateEntity.all().forEach {
- logger.debug("Fetching history for facade: ${it.id.value}, bank
account: ${it.bankAccount}")
- work.add(Pair(it.facade.creator.id.value, it.bankAccount))
+private data class TaskSchedule(
+ val taskId: Int,
+ val name: String,
+ val type: String,
+ val resourceType: String,
+ val resourceId: String,
+ val params: String
+)
+
+private suspend fun runTask(client: HttpClient, sched: TaskSchedule) {
+ logger.info("running task $sched")
+ try {
+
+ when (sched.resourceType) {
+ "bank-account" -> {
+ when (sched.type) {
+ "fetch" -> {
+ @Suppress("BlockingMethodInNonBlockingContext")
+ val fetchSpec =
jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java)
+ fetchBankAccountTransactions(client, fetchSpec,
sched.resourceId)
+ }
+ "submit" -> {
+ submitAllPaymentInitiations(client, sched.resourceId)
+ }
+ else -> {
+ logger.error("task type ${sched.type} not understood")
+ }
+ }
+ }
+ else -> logger.error("task on resource ${sched.resourceType} not
understood")
}
- }
- work.forEach {
- fetchTransactionsInternal(
- client = httpClient,
- fetchSpec = fetchSpec,
- userId = it.first,
- accountid = it.second
- )
+ } catch (e: Exception) {
+ logger.error("Exception during task $sched", e)
}
}
-
-private inline fun reportAndIgnoreErrors(f: () -> Unit) {
- try {
- f()
- } catch (e: java.lang.Exception) {
- logger.error("ignoring exception", e)
+object NexusCron {
+ val parser = run {
+ val cronDefinition =
+ CronDefinitionBuilder.defineCron()
+ .withSeconds().and()
+ .withMinutes().optional().and()
+ .withDayOfMonth().optional().and()
+ .withMonth().optional().and()
+ .withDayOfWeek().optional()
+ .and().instance()
+ CronParser(cronDefinition)
}
}
-fun moreFrequentBackgroundTasks(httpClient: HttpClient) {
+fun startOperationScheduler(httpClient: HttpClient) {
GlobalScope.launch {
while (true) {
- logger.debug("Running more frequent background jobs")
- reportAndIgnoreErrors {
- downloadTalerFacadesTransactions(
- httpClient,
- FetchSpecLatestJson(
- FetchLevel.ALL,
- null
- )
- )
+ logger.info("running schedule loop")
+
+ // First, assign next execution time stamps to all tasks that need
them
+ transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
+ }.forEach {
+ val cron = try {
+ NexusCron.parser.parse(it.taskCronspec)
+ } catch (e: IllegalArgumentException) {
+ logger.error("invalid cronspec in schedule
${it.resourceType}/${it.resourceId}/${it.taskName}")
+ return@forEach
+ }
+ val zonedNow = ZonedDateTime.now()
+ val et = ExecutionTime.forCron(cron)
+ val next = et.nextExecution(zonedNow)
+ logger.info("scheduling task ${it.taskName} at $next (now
is $zonedNow)")
+ it.nextScheduledExecutionSec = next.get().toEpochSecond()
+ }
}
- // FIXME: should be done automatically after raw ingestion
- reportAndIgnoreErrors { ingestTalerTransactions() }
- reportAndIgnoreErrors { submitAllPaymentInitiations(httpClient) }
- logger.debug("More frequent background jobs done")
- delay(Duration.ofSeconds(1))
- }
- }
-}
-fun lessFrequentBackgroundTasks(httpClient: HttpClient) {
- GlobalScope.launch {
- while (true) {
- logger.debug("Less frequent background job")
- try {
- //downloadTalerFacadesTransactions(httpClient, "C53")
- } catch (e: Exception) {
- val sw = StringWriter()
- val pw = PrintWriter(sw)
- e.printStackTrace(pw)
- logger.info("==== Less frequent background task exception
====\n${sw}======")
+ val nowSec = Instant.now().epochSecond
+ // Second, find tasks that are due
+ val dueTasks = transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec lessEq
nowSec
+ }.map {
+ TaskSchedule(it.id.value, it.taskName, it.taskType,
it.resourceType, it.resourceId, it.taskParams)
+ }
}
- delay(Duration.ofSeconds(10))
+
+ // Execute those due tasks
+ dueTasks.forEach {
+ runTask(httpClient, it)
+ transaction {
+ val t = NexusScheduledTaskEntity.findById(it.taskId)
+ if (t != null) {
+ // Reset next scheduled execution
+ t.nextScheduledExecutionSec = null
+ t.prevScheduledExecutionSec = nowSec
+ }
+ }
+ }
+
+ // Wait a bit
+ delay(Duration.ofSeconds(1))
}
}
}
diff --git
a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
index bf672ce..396a45f 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
@@ -58,7 +58,7 @@ suspend fun submitPaymentInitiation(httpClient: HttpClient,
paymentInitiationId:
/**
* Submit all pending prepared payments.
*/
-suspend fun submitAllPaymentInitiations(httpClient: HttpClient) {
+suspend fun submitAllPaymentInitiations(httpClient: HttpClient, accountid:
String) {
data class Submission(
val id: Long
)
@@ -233,10 +233,9 @@ fun addPaymentInitiation(paymentData: Pain001Data,
debitorAccount: NexusBankAcco
}
}
-suspend fun fetchTransactionsInternal(
+suspend fun fetchBankAccountTransactions(
client: HttpClient,
fetchSpec: FetchSpecJson,
- userId: String,
accountid: String
) {
val res = transaction {
@@ -261,18 +260,17 @@ suspend fun fetchTransactionsInternal(
}
when (res.connectionType) {
"ebics" -> {
- // FIXME(dold): Support fetching not only the latest transactions.
- // It's not clear what's the nicest way to support this.
fetchEbicsBySpec(
fetchSpec,
client,
res.connectionName
)
- ingestBankMessagesIntoAccount(res.connectionName, accountid)
}
else -> throw NexusError(
HttpStatusCode.BadRequest,
"Connection type '${res.connectionType}' not implemented"
)
}
+ ingestBankMessagesIntoAccount(res.connectionName, accountid)
+ ingestTalerTransactions()
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
index ca8cf5a..64d52ba 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
@@ -53,12 +53,13 @@ import org.jetbrains.exposed.sql.transactions.transaction
import org.slf4j.event.Level
import tech.libeufin.nexus.*
import tech.libeufin.nexus.bankaccount.addPaymentInitiation
-import tech.libeufin.nexus.bankaccount.fetchTransactionsInternal
+import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
import tech.libeufin.nexus.bankaccount.getPaymentInitiation
import tech.libeufin.nexus.bankaccount.submitPaymentInitiation
import tech.libeufin.nexus.ebics.*
import tech.libeufin.util.*
import tech.libeufin.util.logger
+import java.lang.IllegalArgumentException
import java.net.URLEncoder
import java.util.zip.InflaterInputStream
@@ -260,8 +261,7 @@ fun serverMain(dbName: String) {
return@intercept
}
- lessFrequentBackgroundTasks(client)
- moreFrequentBackgroundTasks(client)
+ startOperationScheduler(client)
routing {
// Shows information about the requesting user.
@@ -379,6 +379,11 @@ fun serverMain(dbName: String) {
if (bankAccount == null) {
throw NexusError(HttpStatusCode.NotFound, "unknown
bank account")
}
+ try {
+ NexusCron.parser.parse(schedSpec.cronspec)
+ } catch (e: IllegalArgumentException) {
+ throw NexusError(HttpStatusCode.BadRequest, "bad cron
spec: ${e.message}")
+ }
when (schedSpec.type) {
"fetch" -> {
val fetchSpec =
jacksonObjectMapper().treeToValue(schedSpec.params, FetchSpecJson::class.java)
@@ -386,8 +391,18 @@ fun serverMain(dbName: String) {
throw NexusError(HttpStatusCode.BadRequest,
"bad fetch spec")
}
}
+ "submit" -> {}
else -> throw NexusError(HttpStatusCode.BadRequest,
"unsupported task type")
}
+ val oldSchedTask = NexusScheduledTaskEntity.find {
+ (NexusScheduledTasksTable.taskName eq schedSpec.name)
and
+ (NexusScheduledTasksTable.resourceType eq
"bank-account") and
+ (NexusScheduledTasksTable.resourceId eq
accountId)
+
+ }.firstOrNull()
+ if (oldSchedTask != null) {
+ throw NexusError(HttpStatusCode.BadRequest, "schedule
task already exists")
+ }
NexusScheduledTaskEntity.new {
resourceType = "bank-account"
resourceId = accountId
@@ -401,12 +416,30 @@ fun serverMain(dbName: String) {
call.respond(object { })
}
- get("/bank-accounts/{accountid}/schedule/{taskid}") {
-
+ get("/bank-accounts/{accountId}/schedule/{taskId}") {
+ call.respond(object { })
}
- delete("/bank-accounts/{accountid}/schedule/{taskid}") {
+ delete("/bank-accounts/{accountId}/schedule/{taskId}") {
+ logger.info("schedule delete requested")
+ val accountId = ensureNonNull(call.parameters["accountId"])
+ val taskId = ensureNonNull(call.parameters["taskId"])
+ transaction {
+ val bankAccount =
NexusBankAccountEntity.findById(accountId)
+ if (bankAccount == null) {
+ throw NexusError(HttpStatusCode.NotFound, "unknown
bank account")
+ }
+ val oldSchedTask = NexusScheduledTaskEntity.find {
+ (NexusScheduledTasksTable.taskName eq taskId) and
+ (NexusScheduledTasksTable.resourceType eq
"bank-account") and
+ (NexusScheduledTasksTable.resourceId eq
accountId)
+ }.firstOrNull()
+ if (oldSchedTask != null) {
+ oldSchedTask.delete()
+ }
+ }
+ call.respond(object { })
}
get("/bank-accounts/{accountid}") {
@@ -517,10 +550,9 @@ fun serverMain(dbName: String) {
null
)
}
- fetchTransactionsInternal(
+ fetchBankAccountTransactions(
client,
fetchSpec,
- user.id.value,
accountid
)
call.respondText("Collection performed")
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] | Current Thread | [Next in Thread]
- [libeufin] branch master updated: implement fancy scheduling, gnunet <=