[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] 05/05: Long-polling.
From: gnunet
Subject: [libeufin] 05/05: Long-polling.
Date: Fri, 10 Mar 2023 17:44:35 +0100
This is an automated email from the git hooks/post-receive script.
ms pushed a commit to branch master
in repository libeufin.
commit cc87817b4adab5172aa7cb3a86580892cfc71c56
Author: MS <ms@taler.net>
AuthorDate: Fri Mar 10 17:43:23 2023 +0100
Long-polling.
Using DB notifications for /history/incoming at
the Taler facade.
---
nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt | 12 +-
nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt | 3 +-
nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt | 161 +++++++++++++++++----
.../tech/libeufin/nexus/server/NexusServer.kt | 1 -
4 files changed, 144 insertions(+), 33 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
index 3decc126..bdaa0ec3 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt
@@ -60,13 +60,13 @@ fun findPermission(p: Permission): NexusPermissionEntity? {
* Require that the authenticated user has at least one of the listed
permissions.
*
* Throws a NexusError if the authenticated user for the request doesn't have
any of
- * listed the permissions.
+ * listed the permissions. It returns the username of the authorized user.
*/
-fun ApplicationRequest.requirePermission(vararg perms: PermissionQuery) {
- transaction {
+fun ApplicationRequest.requirePermission(vararg perms: PermissionQuery):
String {
+ val username = transaction {
val user = authenticateRequest(this@requirePermission)
if (user.superuser) {
- return@transaction
+ return@transaction user.username
}
var foundPermission = false
for (pr in perms) {
@@ -82,8 +82,10 @@ fun ApplicationRequest.requirePermission(vararg perms:
PermissionQuery) {
perms.joinToString(" | ") { "${it.resourceId}
${it.resourceType} ${it.permissionName}" }
throw NexusError(
HttpStatusCode.Forbidden,
- "User ${user.id.value} has insufficient permissions (needs
$possiblePerms."
+ "User ${user.username} has insufficient permissions (needs
$possiblePerms)."
)
}
+ user.username
}
+ return username
}
\ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
index b1ddbd17..f8ce7bd4 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
@@ -29,6 +29,7 @@ import tech.libeufin.nexus.iso20022.EntryStatus
import tech.libeufin.util.EbicsInitState
import java.sql.Connection
+
/**
* This table holds the values that exchange gave to issue a payment,
* plus a reference to the prepared pain.001 version of. Note that
@@ -101,7 +102,6 @@ object TalerIncomingPaymentsTable : LongIdTable() {
val debtorPaytoUri = text("incomingPaytoUri")
}
-
class TalerIncomingPaymentEntity(id: EntityID<Long>) : LongEntity(id) {
companion object :
LongEntityClass<TalerIncomingPaymentEntity>(TalerIncomingPaymentsTable)
@@ -343,7 +343,6 @@ class EbicsSubscriberEntity(id: EntityID<Long>) :
LongEntity(id) {
}
object NexusUsersTable : LongIdTable() {
-
val username = text("username")
val passwordHash = text("password")
val superuser = bool("superuser")
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
index 52dca293..c9e73662 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
@@ -19,6 +19,7 @@
package tech.libeufin.nexus
+import UtilError
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.server.application.ApplicationCall
import io.ktor.server.application.call
@@ -33,16 +34,24 @@ import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import io.ktor.server.routing.post
import io.ktor.server.util.*
+import io.ktor.util.*
+import kotlinx.coroutines.*
+import net.taler.wallet.crypto.Base32Crockford
import org.jetbrains.exposed.dao.Entity
import org.jetbrains.exposed.dao.id.IdTable
import org.jetbrains.exposed.sql.*
+import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
+import org.postgresql.jdbc.PgConnection
import tech.libeufin.nexus.bankaccount.addPaymentInitiation
import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
+import tech.libeufin.nexus.bankaccount.getBankAccount
import tech.libeufin.nexus.iso20022.*
import tech.libeufin.nexus.server.*
import tech.libeufin.util.*
import java.net.URL
+import java.util.concurrent.atomic.AtomicReference
+import javax.xml.crypto.Data
import kotlin.math.abs
import kotlin.math.min
@@ -66,7 +75,8 @@ data class TalerTransferResponse(
)
/**
- * History accounting data structures
+ * History accounting data structures, typically
+ * used to build JSON responses.
*/
data class TalerIncomingBankTransaction(
val row_id: Long,
@@ -122,21 +132,16 @@ fun getComparisonOperator(delta: Int, start: Long, table:
IdTable<Long>): Op<Boo
}
fun expectLong(param: String?): Long {
- if (param == null) {
- throw EbicsProtocolError(HttpStatusCode.BadRequest, "'$param' is not
Long")
- }
- return try {
- param.toLong()
- } catch (e: Exception) {
- throw EbicsProtocolError(HttpStatusCode.BadRequest, "'$param' is not
Long")
+ if (param == null) throw badRequest("'$param' is not Long")
+ return try { param.toLong() } catch (e: Exception) {
+ throw badRequest("'$param' is not Long")
}
}
-/** Helper handling 'start' being optional and its dependence on 'delta'. */
+// Helper handling 'start' being optional and its dependence on 'delta'.
fun handleStartArgument(start: String?, delta: Int): Long {
if (start == null) {
- if (delta >= 0)
- return -1
+ if (delta >= 0) return -1
return Long.MAX_VALUE
}
return expectLong(start)
@@ -307,8 +312,35 @@ fun talerFilter(
reservePublicKey = reservePub
timestampMs = System.currentTimeMillis()
debtorPaytoUri = buildIbanPaytoUri(
- debtorIban, debtorAgent.bic, debtorName
+ debtorIban,
+ debtorAgent.bic,
+ debtorName
+ )
+ }
+ val dbTx = TransactionManager.currentOrNull() ?: throw NexusError(
+ HttpStatusCode.InternalServerError,
+ "talerFilter(): unexpected execution out of a DB transaction"
+ )
+ /**
+ * Without COMMIT here, the woken up LISTENer won't
+ * find the record in the database.
+ */
+ dbTx.commit()
+ // Only supporting Postgres' NOTIFY.
+ if (dbTx.isPostgres()) {
+ val channelName = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
+ payment.bankAccount.iban
+ )
+ logger.debug("NOTIFYing on domain" +
+ " ${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
+ " for IBAN: ${payment.bankAccount.iban}. Resulting channel" +
+ " name: $channelName.")
+ val notifyHandle = PostgresListenNotify(
+ dbTx.getPgConnection(),
+ channelName
)
+ notifyHandle.postgresNotify()
}
}
@@ -445,27 +477,106 @@ private suspend fun historyOutgoing(call:
ApplicationCall) {
)
}
-/**
- * Handle a /taler-wire-gateway/history/incoming request.
- */
+// Handle a /taler-wire-gateway/history/incoming request.
private suspend fun historyIncoming(call: ApplicationCall) {
val facadeId = expectNonNull(call.parameters["fcid"])
- call.request.requirePermission(PermissionQuery("facade", facadeId,
"facade.talerwiregateway.history"))
+ val username = call.request.requirePermission(
+ PermissionQuery(
+ "facade",
+ facadeId,
+ "facade.talerwiregateway.history"
+ )
+ )
+ val longPollTimeoutPar = call.parameters["long_poll_ms"]
+ val longPollTimeout = if (longPollTimeoutPar != null) {
+ val longPollTimeoutValue = try { longPollTimeoutPar.toLong() }
+ catch (e: Exception) {
+ throw badRequest("long_poll_ms value is invalid")
+ }
+ longPollTimeoutValue
+ } else null
val param = call.expectUrlParameter("delta")
- val delta: Int = try {
- param.toInt()
- } catch (e: Exception) {
+ val delta: Int = try { param.toInt() } catch (e: Exception) {
throw EbicsProtocolError(HttpStatusCode.BadRequest, "'${param}' is not
Int")
}
val start: Long =
handleStartArgument(call.request.queryParameters["start"], delta)
val history = TalerIncomingHistory()
val startCmpOp = getComparisonOperator(delta, start,
TalerIncomingPaymentsTable)
- transaction {
- val orderedPayments = TalerIncomingPaymentEntity.find {
- startCmpOp
- }.orderTaler(delta)
- if (orderedPayments.isNotEmpty()) {
- orderedPayments.subList(0, min(abs(delta),
orderedPayments.size)).forEach {
+ /**
+ * The following block checks first for results, and then LISTEN
+ * _only if_ the client gave the long_poll_ms parameter.
+ */
+ var resultOrWait: Pair<
+ List<TalerIncomingPaymentEntity>,
+ PostgresListenNotify?
+ > = transaction {
+ val res = TalerIncomingPaymentEntity.find { startCmpOp
}.orderTaler(delta)
+ // Register to Postgres notifications, if no results arrived.
+ if (res.isEmpty() && this.isPostgres() && longPollTimeout != null) {
+ // Getting the IBAN to build the unique channel name.
+ val f = FacadeEntity.find { FacadesTable.facadeName eq facadeId
}.firstOrNull()
+ if (f == null) throw internalServerError(
+ "Handling request for facade '$facadeId', but that's not found
in the database."
+ )
+ val fState = FacadeStateEntity.find {
+ FacadeStateTable.facade eq f.id.value
+ }.firstOrNull()
+ if (fState == null) throw internalServerError(
+ "Facade '$facadeId' exist but has no state."
+ )
+ val bankAccount = getBankAccount(fState.bankAccount)
+ val channelName = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
+ bankAccount.iban
+ )
+ logger.debug("LISTENing on domain " +
+ "${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
+ " for IBAN: ${bankAccount.iban} with timeout:
$longPollTimeoutPar." +
+ " Resulting channel name: $channelName"
+ )
+ val listenHandle = PostgresListenNotify(
+ this.getPgConnection(),
+ channelName
+ )
+ listenHandle.postrgesListen()
+ return@transaction Pair(res, listenHandle)
+ }
+ Pair(res, null)
+ }
+ /**
+ * Wait here by releasing the execution, or proceed to response if didn't
sleep.
+ * The right condition only silences the compiler, because when the
timeout is null
+ * the left condition is always false (no listen-notify object.)
+ */
+ if (resultOrWait.second != null && longPollTimeout != null) {
+ logger.debug("Waiting for NOTIFY, with timeout: $longPollTimeoutPar
ms")
+ val listenHandle = resultOrWait.second!!
+ val notificationArrived =
listenHandle.postgresWaitNotification(longPollTimeout)
+ if (notificationArrived) {
+ val likelyNewPayments = transaction {
+ // addLogger(StdOutSqlLogger)
+ TalerIncomingPaymentEntity.find { startCmpOp
}.orderTaler(delta)
+ }
+ /**
+ * NOTE: the query can still have zero results despite the
+ * notification. That happens when the 'start' URI param is
+ * higher than the ID of the new row in the database. Not
+ * an error.
+ */
+ resultOrWait = Pair(likelyNewPayments, null)
+ }
+ }
+ /**
+ * Whether because of a timeout or a notification or of never slept, here
it
+ * proceeds to the response (== resultOrWait.first IS EFFECTIVE).
+ */
+ val maybeNewPayments = resultOrWait.first
+ if (maybeNewPayments.isNotEmpty()) {
+ transaction {
+ maybeNewPayments.subList(
+ 0,
+ min(abs(delta), maybeNewPayments.size)
+ ).forEach {
history.incoming_transactions.add(
TalerIncomingBankTransaction(
// Rounded timestamp
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
index 79ab9adf..2632100f 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
@@ -57,7 +57,6 @@ import tech.libeufin.util.*
import java.net.BindException
import java.net.URLEncoder
import kotlin.system.exitProcess
-
/**
* Return facade state depending on the type.
*/
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.