[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] branch master updated (94126e8b -> 26d9e2a8)
From: |
gnunet |
Subject: |
[libeufin] branch master updated (94126e8b -> 26d9e2a8) |
Date: |
Mon, 22 May 2023 16:43:02 +0200 |
This is an automated email from the git hooks/post-receive script.
ms pushed a change to branch master
in repository libeufin.
from 94126e8b Removing obsolete part from README
new 6aaf7564 Tx deduplication for x-libeufin-bank.
new 68a60b68 ISO 20022.
new 14718fdb Conversion service.
new b6f550f6 Conversion service tests.
new 384c6cc1 Conversion service.
new 26d9e2a8 Conversion service tests.
The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt | 9 +-
.../tech/libeufin/nexus/bankaccount/BankAccount.kt | 18 +-
.../tech/libeufin/nexus/ebics/EbicsClient.kt | 11 +-
.../kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt | 28 +-
.../tech/libeufin/nexus/iso20022/Iso20022.kt | 29 +-
.../kotlin/tech/libeufin/nexus/server/Helpers.kt | 10 +-
.../tech/libeufin/nexus/server/NexusServer.kt | 4 +-
.../nexus/xlibeufinbank/XLibeufinBankNexus.kt | 8 +-
nexus/src/test/kotlin/ConversionServiceTest.kt | 338 ++++++++++++++-------
nexus/src/test/kotlin/Iso20022Test.kt | 17 +-
nexus/src/test/kotlin/MakeEnv.kt | 139 ++++++++-
.../tech/libeufin/sandbox/ConversionService.kt | 138 ++++++---
.../src/main/kotlin/tech/libeufin/sandbox/DB.kt | 14 +-
.../tech/libeufin/sandbox/EbicsProtocolBackend.kt | 4 +-
util/src/main/kotlin/Ebics.kt | 1 -
util/src/main/kotlin/XMLUtil.kt | 4 +-
16 files changed, 553 insertions(+), 219 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index a63e6503..0886e177 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -37,6 +37,7 @@ import startServer
import tech.libeufin.nexus.iso20022.NexusPaymentInitiationData
import tech.libeufin.nexus.iso20022.createPain001document
import tech.libeufin.nexus.iso20022.parseCamtMessage
+import tech.libeufin.nexus.server.EbicsDialects
import tech.libeufin.nexus.server.client
import tech.libeufin.nexus.server.nexusApp
import tech.libeufin.util.*
@@ -137,15 +138,15 @@ class ParseCamt : CliktCommand("Parse camt.05x file,
outputs JSON in libEufin in
private val logLevel by option(
help = "Set the log level to: 'off', 'error', 'warn', 'info', 'debug',
'trace', 'all'"
)
- private val withC54 by option(
- help = "Treats the input as camt.054. Without this option, the" +
- " parser expects a camt.052 or camt.053 and handles them
equally."
+ private val withPfDialect by option(
+ help = "Set the dialect to 'pf' (PostFinance). If not given, it
defaults to GLS."
).flag(default = false)
private val filename by argument("FILENAME", "File in CAMT format")
override fun run() {
setLogLevel(logLevel)
val camtText = File(filename).readText(Charsets.UTF_8)
- val res = parseCamtMessage(XMLUtil.parseStringIntoDom(camtText))
+ val dialect = if (withPfDialect) EbicsDialects.POSTFINANCE.dialectName
else null
+ val res = parseCamtMessage(XMLUtil.parseStringIntoDom(camtText),
dialect)
println(jacksonObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(res))
}
}
diff --git
a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
index a1576a05..8f1f6ca5 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt
@@ -29,7 +29,7 @@ import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.nexus.*
import tech.libeufin.nexus.iso20022.*
import tech.libeufin.nexus.server.*
-import tech.libeufin.nexus.xlibeufinbank.processXLibeufinBankMessage
+import tech.libeufin.nexus.xlibeufinbank.ingestXLibeufinBankMessage
import tech.libeufin.util.XMLUtil
import tech.libeufin.util.internalServerError
import java.time.Instant
@@ -204,9 +204,13 @@ fun ingestBankMessagesIntoAccount(
}.orderBy(
Pair(NexusBankMessagesTable.id, SortOrder.ASC)
).forEach {
- val processingResult: IngestedTransactionsCount =
when(BankConnectionType.parseBankConnectionType(conn.type)) {
+ val ingestionResult: IngestedTransactionsCount =
when(BankConnectionType.parseBankConnectionType(conn.type)) {
BankConnectionType.EBICS -> {
val camtString = it.message.bytes.toString(Charsets.UTF_8)
+ /**
+ * NOT validating _again_ the camt document because it was
+ * already validate before being stored into the database.
+ */
val doc = XMLUtil.parseStringIntoDom(camtString)
/**
* Calling the CaMt handler. After its return, all the
Neuxs-meaningful
@@ -214,7 +218,7 @@ fun ingestBankMessagesIntoAccount(
* processed by any facade OR simply be communicated to
the CLI via JSON.
*/
try {
- processCamtMessage(
+ ingestCamtMessageIntoAccount(
bankAccountId,
doc,
it.fetchLevel,
@@ -234,7 +238,7 @@ fun ingestBankMessagesIntoAccount(
" be parsed into JSON by the x-libeufin-bank
ingestion.")
throw internalServerError("Could not ingest
x-libeufin-bank messages.")
}
- processXLibeufinBankMessage(
+ ingestXLibeufinBankMessage(
bankAccountId,
jMessage
)
@@ -246,13 +250,13 @@ fun ingestBankMessagesIntoAccount(
* (1) flagged, (2) skipped when this function will run again, and
(3)
* NEVER deleted from the database.
*/
- if (processingResult.newTransactions == -1) {
+ if (ingestionResult.newTransactions == -1) {
it.errors = true
lastId = it.id.value
return@forEach
}
- totalNew += processingResult.newTransactions
- downloadedTransactions += processingResult.downloadedTransactions
+ totalNew += ingestionResult.newTransactions
+ downloadedTransactions += ingestionResult.downloadedTransactions
/**
* Disk-space conservative check: only store if "yes" was
* explicitly set into the environment variable. Any other
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsClient.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsClient.kt
index 382aefc8..0d3aa67b 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsClient.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsClient.kt
@@ -78,7 +78,14 @@ private suspend inline fun HttpClient.postToBank(url:
String, body: String): Str
sealed class EbicsDownloadResult
class EbicsDownloadSuccessResult(
- val orderData: ByteArray
+ val orderData: ByteArray,
+ /**
+ * This value points at the EBICS transaction that carried
+ * the order data contained in this structure. That makes
+ * possible to log the EBICS transaction that carried one
+ * invalid order data, for example.
+ */
+ val transactionID: String? = null
) : EbicsDownloadResult()
class EbicsDownloadEmptyResult(
@@ -244,7 +251,7 @@ suspend fun doEbicsDownloadTransaction(
}
}
logger.debug("Bank acknowledges EBICS download receipt. Transaction ID:
$transactionID.")
- return EbicsDownloadSuccessResult(respPayload)
+ return EbicsDownloadSuccessResult(respPayload, transactionID)
}
// Currently only 1-segment requests.
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt
index e84e9846..50578894 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt
@@ -90,18 +90,31 @@ private fun getFetchLevelFromEbicsOrder(ebicsHistoryType:
String): FetchLevel {
}
}
-fun storeCamt(
+// Validate and store the received document for later ingestion.
+private fun validateAndStoreCamt(
bankConnectionId: String,
camt: String,
- fetchLevel: FetchLevel
+ fetchLevel: FetchLevel,
+ transactionID: String? = null // the EBICS transaction that carried this
camt.
) {
- val camt53doc = XMLUtil.parseStringIntoDom(camt)
- val msgId =
camt53doc.pickStringWithRootNs("/*[1]/*[1]/root:GrpHdr/root:MsgId")
+ val camtDoc = try {
+ XMLUtil.parseStringIntoDom(camt)
+ }
+ catch (e: Exception) {
+ throw badGateway("Could not parse camt document from EBICS transaction
$transactionID")
+ }
+ if (!XMLUtil.validateFromDom(camtDoc))
+ throw badGateway("Camt document from EBICS transaction $transactionID
is invalid")
+
+ val msgId =
camtDoc.pickStringWithRootNs("/*[1]/*[1]/root:GrpHdr/root:MsgId")
logger.info("Camt document '$msgId' received via $fetchLevel.")
transaction {
val conn = NexusBankConnectionEntity.findByName(bankConnectionId)
if (conn == null) {
- throw NexusError(HttpStatusCode.InternalServerError, "bank
connection missing")
+ throw NexusError(
+ HttpStatusCode.InternalServerError,
+ "bank connection missing"
+ )
}
val oldMsg = NexusBankMessageEntity.find {
NexusBankMessagesTable.messageId eq msgId }.firstOrNull()
if (oldMsg == null) {
@@ -164,10 +177,11 @@ private suspend fun fetchEbicsC5x(
is EbicsDownloadSuccessResult -> {
response.orderData.unzipWithLambda {
// logger.debug("Camt entry (filename (in the Zip archive):
${it.first}): ${it.second}")
- storeCamt(
+ validateAndStoreCamt(
bankConnectionId,
it.second,
- getFetchLevelFromEbicsOrder(historyType)
+ getFetchLevelFromEbicsOrder(historyType),
+ transactionID = response.transactionID
)
}
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
index fb65f0c5..14e24485 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt
@@ -30,6 +30,7 @@ import CashAccount
import CreditDebitIndicator
import CurrencyAmount
import CurrencyExchange
+import EntryStatus
import GenericId
import OrganizationIdentification
import PartyIdentification
@@ -581,7 +582,7 @@ private fun
XmlElementDestructor.extractInnerBkTxCd(creditDebitIndicator: Credit
return "XTND-NTAV-NTAV"
}
-private fun XmlElementDestructor.extractInnerTransactions(): CamtReport {
+private fun XmlElementDestructor.extractInnerTransactions(dialect: String? =
null): CamtReport {
val account = requireUniqueChildNamed("Acct") { extractAccount() }
val balances = mapEachChildNamed("Bal") {
@@ -613,8 +614,16 @@ private fun
XmlElementDestructor.extractInnerTransactions(): CamtReport {
// multiple money transactions *within* one Ntry element.
val entries = mapEachChildNamed("Ntry") {
val amount = extractCurrencyAmount()
- val status = requireUniqueChildNamed("Sts") { focusElement.textContent
}.let {
- EntryStatus.valueOf(it)
+ val status = requireUniqueChildNamed("Sts") {
+ val textContent = if (dialect ==
EbicsDialects.POSTFINANCE.dialectName) {
+ requireUniqueChildNamed("Cd") {
+ focusElement.textContent
+ }
+ } else
+ focusElement.textContent
+ textContent.let {
+ EntryStatus.valueOf(it)
+ }
}
val creditDebitIndicator = requireUniqueChildNamed("CdtDbtInd") {
focusElement.textContent }.let {
CreditDebitIndicator.valueOf(it)
@@ -677,7 +686,7 @@ private fun
XmlElementDestructor.extractInnerTransactions(): CamtReport {
* Extract a list of transactions from
* an ISO20022 camt.052 / camt.053 message.
*/
-fun parseCamtMessage(doc: Document): CamtParseResult {
+fun parseCamtMessage(doc: Document, dialect: String? = null): CamtParseResult {
return destructXml(doc) {
requireRootElement("Document") {
// Either bank to customer statement or report
@@ -685,17 +694,17 @@ fun parseCamtMessage(doc: Document): CamtParseResult {
when (focusElement.localName) {
"BkToCstmrAcctRpt" -> {
mapEachChildNamed("Rpt") {
- extractInnerTransactions()
+ extractInnerTransactions(dialect)
}
}
"BkToCstmrStmt" -> {
mapEachChildNamed("Stmt") {
- extractInnerTransactions()
+ extractInnerTransactions(dialect)
}
}
"BkToCstmrDbtCdtNtfctn" -> {
mapEachChildNamed("Ntfctn") {
- extractInnerTransactions()
+ extractInnerTransactions(dialect)
}
}
else -> {
@@ -801,7 +810,7 @@ fun extractPaymentUidFromSingleton(
)
throw internalServerError("Internal reconciliation error
(no EndToEndId)")
}
- return
"${PaymentUidQualifiers.NEXUS_GIVEN}:$expectedEndToEndId"
+ return "${PaymentUidQualifiers.USER_GIVEN}:$expectedEndToEndId"
}
// Didn't return/throw before, it must be an incoming payment.
val maybeAcctSvcrRef = tx.details.accountServicerRef
@@ -846,7 +855,7 @@ fun extractPaymentUidFromSingleton(
* case of DBIT transaction.
* - returns a IngestedTransactionCount object.
*/
-fun processCamtMessage(
+fun ingestCamtMessageIntoAccount(
bankAccountId: String,
camtDoc: Document,
fetchLevel: FetchLevel,
@@ -866,7 +875,7 @@ fun processCamtMessage(
if (acct == null) {
throw NexusError(HttpStatusCode.NotFound, "user not found")
}
- val res = try { parseCamtMessage(camtDoc) } catch (e:
CamtParsingError) {
+ val res = try { parseCamtMessage(camtDoc, dialect) } catch (e:
CamtParsingError) {
logger.warn("Invalid CAMT received from bank: ${e.message}")
newTransactions = -1
return@transaction
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
index 5e386c84..a55cbbd2 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt
@@ -1,13 +1,9 @@
package tech.libeufin.nexus.server
-import CamtBankAccountEntry
import com.fasterxml.jackson.databind.JsonNode
-import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.http.*
-import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
-import org.jetbrains.exposed.sql.SqlExpressionBuilder.greaterEq
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.nexus.*
@@ -78,9 +74,9 @@ enum class EbicsDialects(val dialectName: String) {
* enum class. This way, Nexus has more control when it tries
* to locally reconcile payments.
*/
-enum class PaymentUidQualifiers(qualifierName: String) {
- BANK_GIVEN("bank_given"),
- NEXUS_GIVEN("nexus_given")
+enum class PaymentUidQualifiers {
+ BANK_GIVEN,
+ USER_GIVEN
}
// Valid connection types.
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
index fca81b51..21614fbe 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt
@@ -44,7 +44,7 @@ import org.slf4j.event.Level
import tech.libeufin.nexus.*
import tech.libeufin.nexus.bankaccount.*
import tech.libeufin.nexus.ebics.*
-import tech.libeufin.nexus.iso20022.processCamtMessage
+import tech.libeufin.nexus.iso20022.ingestCamtMessageIntoAccount
import tech.libeufin.util.*
import java.net.URLEncoder
@@ -437,7 +437,7 @@ val nexusApp: Application.() -> Unit = {
defaultConn.dialect
} else null
val msgType = ensureNonNull(call.parameters["type"])
- processCamtMessage(
+ ingestCamtMessageIntoAccount(
ensureNonNull(accountId),
XMLUtil.parseStringIntoDom(call.receiveText()),
when(msgType) {
diff --git
a/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt
index a6193a14..cd37862a 100644
---
a/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt
+++
b/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt
@@ -19,12 +19,10 @@ import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.server.util.*
import io.ktor.util.*
-import io.ktor.util.date.*
import org.jetbrains.exposed.sql.statements.api.ExposedBlob
import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.nexus.*
import tech.libeufin.nexus.bankaccount.*
-import tech.libeufin.nexus.iso20022.*
import tech.libeufin.nexus.server.*
import tech.libeufin.util.*
import java.net.MalformedURLException
@@ -328,7 +326,7 @@ class XlibeufinBankConnectionProtocol :
BankConnectionProtocol {
* status, since Sandbox has only one (unnamed) transaction state and
* all transactions are asked as reports.
*/
-fun processXLibeufinBankMessage(
+fun ingestXLibeufinBankMessage(
bankAccountId: String,
data: JsonNode
): IngestedTransactionsCount {
@@ -359,7 +357,7 @@ fun processXLibeufinBankMessage(
)
}
// Searching for duplicates.
- if (findDuplicate(bankAccountId, it.uid) != null) {
+ if (findDuplicate(bankAccountId,
"${PaymentUidQualifiers.BANK_GIVEN}:${it.uid}") != null) {
logger.debug("x-libeufin-bank ingestion: transaction ${it.uid} is
a duplicate, skipping.")
return@forEach
}
@@ -377,7 +375,7 @@ fun processXLibeufinBankMessage(
* state.
*/
this.status = EntryStatus.BOOK
- this.accountTransactionId = it.uid
+ this.accountTransactionId =
"${PaymentUidQualifiers.BANK_GIVEN}:${it.uid}"
this.transactionJson = jacksonObjectMapper(
).writeValueAsString(it.exportAsCamtModel())
this.creditDebitIndicator = direction.exportAsCamtDirection()
diff --git a/nexus/src/test/kotlin/ConversionServiceTest.kt
b/nexus/src/test/kotlin/ConversionServiceTest.kt
index 39222d7a..356fcda8 100644
--- a/nexus/src/test/kotlin/ConversionServiceTest.kt
+++ b/nexus/src/test/kotlin/ConversionServiceTest.kt
@@ -1,33 +1,47 @@
-import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.client.*
+import io.ktor.client.engine.cio.*
import io.ktor.client.engine.mock.*
-import io.ktor.client.plugins.*
import io.ktor.client.request.*
-import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.server.testing.*
import kotlinx.coroutines.*
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
-import org.junit.Ignore
import org.junit.Test
import tech.libeufin.nexus.server.nexusApp
import tech.libeufin.sandbox.*
import tech.libeufin.util.parseAmount
class ConversionServiceTest {
+ private fun CoroutineScope.launchBuyinMonitor(httpClient: HttpClient): Job
{
+ val job = launch {
+ /**
+ * The runInterruptible wrapper lets code without suspension
+ * points be cancel()'d. Without it, such code would ignore
+ * any call to cancel() and the test never return.
+ */
+ runInterruptible {
+ buyinMonitor(
+ demobankName = "default",
+ accountToCredit = "exchange-0",
+ client = httpClient
+ )
+ }
+ }
+ return job
+ }
/**
- * Testing the buy-in monitor in the normal case: Nexus
- * communicates a new incoming fiat transaction and the
- * monitor wires funds to the exchange.
+ * Testing the buy-in monitor in all the HTTP scenarios,
+ * successful case, client's and server's error cases.
*/
@Test
fun buyinTest() {
- // First create an incoming fiat payment _at Nexus_.
- // This payment is addressed to the Nexus user whose
- // (Nexus) credentials will be used by Sandbox to fetch
- // new incoming fiat payments.
+ // 1, testing the successful case.
+ /* First create an incoming fiat payment _at Nexus_.
+ This payment is addressed to the Nexus user whose
+ (Nexus) credentials will be used by Sandbox to fetch
+ new incoming fiat payments. */
withTestDatabase {
prepSandboxDb(currency = "REGIO")
prepNexusDb()
@@ -46,23 +60,13 @@ class ConversionServiceTest {
)
// Start Nexus, to let it serve the fiat transaction.
testApplication {
+ val client = this.createClient {
+ followRedirects = false
+ }
application(nexusApp)
// Start the buy-in monitor to let it download the fiat
transaction.
runBlocking {
- val job = launch {
- /**
- * The runInterruptible wrapper lets code without
suspension
- * points be cancel()'d. Without it, such code would
ignore
- * any call to cancel() and the test never return.
- */
- runInterruptible {
- buyinMonitor(
- demobankName = "default",
- accountToCredit = "exchange-0",
- client = client
- )
- }
- }
+ val job = launchBuyinMonitor(client)
delay(1000L) // Lets the DB persist.
job.cancelAndJoin()
}
@@ -91,9 +95,103 @@ class ConversionServiceTest {
// and the regional currency.
assert(boughtIn.subject == reservePub && boughtIn.currency ==
"REGIO")
}
+ // 2, testing the client side error case.
+ assertException<BuyinClientError>(
+ {
+ runBlocking {
+ /**
+ * As soon as the buy-in monitor requests again the
history
+ * to Nexus, it'll get 400 from the mock client.
+ */
+ launchBuyinMonitor(getMockedClient {
respondBadRequest() })
+ }
+ }
+ )
+ /**
+ * 3, testing the server side error case. Here the monitor should
+ * NOT throw any error and instead keep operating normally. This
allows
+ * Sandbox to tolerate server errors and retry the requests.
+ */
+ runBlocking {
+ /**
+ * As soon as the buy-in monitor requests again the history
+ * to Nexus, it'll get 500 from the mock client.
+ */
+ val job = launchBuyinMonitor(getMockedClient {
respondError(HttpStatusCode.InternalServerError) })
+ delay(1000L)
+ // Getting here means no exceptions. Can now cancel the
service.
+ job.cancelAndJoin()
+ }
+ /**
+ * 4, testing the unhandled error case. This case is treated
+ * as a client error, to signal the calling logic to intervene.
+ */
+ assertException<BuyinClientError>(
+ {
+ runBlocking {
+ /**
+ * As soon as the buy-in monitor requests again the
history
+ * to Nexus, it'll get 307 from the mock client.
+ */
+ launchBuyinMonitor(getMockedClient { respondRedirect()
})
+ }
+ }
+ )
+ }
+ }
+ private fun CoroutineScope.launchCashoutMonitor(httpClient: HttpClient):
Job {
+ val job = launch {
+ /**
+ * The runInterruptible wrapper lets code without suspension
+ * points be cancel()'d. Without it, such code would ignore
+ * any call to cancel() and the test never return.
+ */
+ runInterruptible {
+ /**
+ * Without the runBlocking wrapper, cashoutMonitor doesn't
+ * compile. That's because it is a 'suspend' function and
+ * it needs a coroutine environment to execute;
runInterruptible
+ * does NOT provide one. Furthermore, replacing runBlocking
+ * with "launch {}" would nullify runInterruptible, due to
other
+ * jobs that cashoutMonitor internally launches and would
escape
+ * the interruptible policy.
+ */
+ runBlocking { cashoutMonitor(httpClient) }
+ }
}
+ return job
}
+ // This function mocks a 500 response to a cash-out request.
+ private fun MockRequestHandleScope.mock500Response(): HttpResponseData {
+ return respondError(HttpStatusCode.InternalServerError)
+ }
+ // This function implements a mock server that checks the currency in the
cash-out request.
+ private suspend fun MockRequestHandleScope.inspectCashoutCurrency(request:
HttpRequestData): HttpResponseData {
+ // Asserting that the currency is indeed the FIAT.
+ return if (request.url.encodedPath ==
"/bank-accounts/foo/payment-initiations" && request.method == HttpMethod.Post) {
+ val body =
jacksonObjectMapper().readTree(request.body.toByteArray())
+ val postedAmount = body.get("amount").asText()
+ assert(parseAmount(postedAmount).currency == "FIAT")
+ respondOk("cash-out-nonce")
+ } else {
+ println("Cash-out monitor wrongly requested to: ${request.url}")
+ // This is a minimal Web server that support only the above
endpoint.
+ respondError(status = HttpStatusCode.NotImplemented)
+ }
+ }
+
+ // Abstracts the mock handler installation.
+ private fun getMockedClient(handler:
MockRequestHandleScope.(HttpRequestData) -> HttpResponseData): HttpClient {
+ return HttpClient(MockEngine) {
+ followRedirects = false
+ engine {
+ addHandler {
+ request -> handler(request)
+ }
+ }
+ }
+ }
/**
* Checks that the cash-out monitor reacts after
* a CRDT transaction arrives at the designated account.
@@ -106,59 +204,37 @@ class ConversionServiceTest {
cashoutCurrency = "FIAT"
)
prepNexusDb()
- wireTransfer(
- debitAccount = "foo",
- creditAccount = "admin",
- subject = "fiat #0",
- amount = "REGIO:3"
- )
testApplication {
+ val client = this.createClient {
+ followRedirects = false
+ }
application(nexusApp)
- /**
- * This construct allows to capture the HTTP request that the
cash-out
- * monitor (that runs in Sandbox) does to Nexus letting check
that the
- * currency mentioned in the fiat payment initiations is
indeed the fiat
- * currency.
- */
+ // Mock server to intercept and inspect the cash-out request.
val checkCurrencyClient = HttpClient(MockEngine) {
+ followRedirects = false
engine {
addHandler {
- request ->
- if (request.url.encodedPath ==
"/bank-accounts/foo/payment-initiations" && request.method == HttpMethod.Post) {
- val body =
jacksonObjectMapper().readTree(request.body.toByteArray())
- val postedAmount = body.get("amount").asText()
- assert(parseAmount(postedAmount).currency ==
"FIAT")
- respondOk("cash-out-nonce")
- } else {
- println("Cash-out monitor wrongly requested
to: ${request.url}")
- // This is a minimal Web server that support
only the above endpoint.
- respondError(status =
HttpStatusCode.NotImplemented)
- }
+ request -> inspectCashoutCurrency(request)
}
}
}
+ // Starting the cash-out monitor with the mocked client.
runBlocking {
- val job = launch {
- /**
- * The runInterruptible wrapper lets code without
suspension
- * points be cancel()'d. Without it, such code would
ignore
- * any call to cancel() and the test never return.
- */
- runInterruptible {
- /**
- * Without the runBlocking wrapper, cashoutMonitor
doesn't
- * compile. That's because it is a 'suspend'
function and
- * it needs a coroutine environment to execute;
runInterruptible
- * does NOT provide one. Furthermore, replacing
runBlocking
- * with "launch {}" would nullify
runInterruptible, due to other
- * jobs that cashoutMonitor internally launches
and would escape
- * the interruptible policy.
- */
- runBlocking { cashoutMonitor(checkCurrencyClient) }
- }
- }
+ var job = launchCashoutMonitor(checkCurrencyClient)
+ // Following are various cases of a cash-out scenario.
+
+ /**
+ * 1, Ordinary/successful case. We test that the
conversion
+ * service sent indeed one request to Nexus and that the
currency
+ * is correct.
+ */
+ wireTransfer(
+ debitAccount = "foo",
+ creditAccount = "admin",
+ subject = "fiat #0",
+ amount = "REGIO:3"
+ )
delay(1000L) // Lets DB persist the information.
- job.cancelAndJoin()
// Checking now the Sandbox side, and namely that one
// cash-out operation got carried out.
transaction {
@@ -170,48 +246,104 @@ class ConversionServiceTest {
*/
assert(op.maybeNexusResposnse == "cash-out-nonce")
}
- }
- }
- }
- }
-
- /**
- * Tests whether the conversion service is able to skip
- * submissions that had problems and proceed to new ones.
- ----------------------------------------------------------
- * Ignoring the test because the new version just fails the
- * process on client side errors. Still however keeping the
- * (ignored) test as a model to create faulty situations.
- */
- @Ignore
- @Test
- fun testWrongSubmissionSkip() {
- withTestDatabase {
- prepSandboxDb(); prepNexusDb()
- val engine400 = MockEngine { respondBadRequest() }
- val mockedClient = HttpClient(engine400)
- runBlocking {
- val monitorJob = async(Dispatchers.IO) {
cashoutMonitor(mockedClient) }
- launch {
+ /* 2, Internal server error case. We test that after
requesting
+ * to a failing Nexus, the last accounted cash-out did NOT
increase.
+ */
+ job.cancelAndJoin()
+ val error500Client = HttpClient(MockEngine) {
+ followRedirects = false
+ engine {
+ addHandler {
+ request -> mock500Response()
+ }
+ }
+ }
+ job = launchCashoutMonitor(error500Client)
+ // Sending a new payment to trigger the conversion service.
wireTransfer(
debitAccount = "foo",
creditAccount = "admin",
- subject = "fiat",
- amount = "TESTKUDOS:3"
+ subject = "fiat #1",
+ amount = "REGIO:2"
)
- // Give enough time to let a flawed monitor submit the
request twice.
- delay(6000)
+ delay(1000L) // Lets the reaction complete.
+ job.cancelAndJoin()
transaction {
- // The request was submitted only once.
- assert(CashoutSubmissionEntity.all().count() == 1L)
- // The monitor marked it as failed.
- assert(CashoutSubmissionEntity.all().first().hasErrors)
- // The submission pointer got advanced by one.
-
assert(getBankAccountFromLabel("admin").lastFiatSubmission?.id?.value == 1L)
+ val bankaccount = getBankAccountFromLabel("admin")
+ // Checks that the counter did NOT increase.
+ assert(bankaccount.lastFiatSubmission?.id?.value == 1L)
}
- monitorJob.cancel()
+ /* Removing now the mocked 500 response and checking that
+ * the problematic cash-out get then sent. */
+ job = launchCashoutMonitor(client) // Should find the non
cashed-out wire transfer and react.
+ delay(1000L) // Lets the reaction complete.
+ job.cancelAndJoin()
+ transaction {
+ val bankaccount = getBankAccountFromLabel("admin")
+ // Checks that the once failing cash-out did go
through.
+ assert(bankaccount.lastFiatSubmission?.subject ==
"fiat #1")
+ }
+ /**
+ * 3, testing the client error case, where
+ * the conversion service is supposed to throw exception.
+ */
+ assertException<CashoutClientError>({
+ runBlocking {
+ launchCashoutMonitor(
+ httpClient = getMockedClient {
+ tech.libeufin.sandbox.logger.debug("MOCK
400")
+ /**
+ * This causes the cash-out request sent
to Nexus to
+ * respond with 400.
+ */
+ respondBadRequest()
+ }
+ )
+ // Triggering now a cash-out operation via a new
wire transfer to admin.
+ wireTransfer(
+ debitAccount = "foo",
+ creditAccount = "admin",
+ subject = "fiat #2",
+ amount = "REGIO:22"
+ )
+ }})
+ /**
+ * 4, checking a redirect response. Because this is an
unhandled
+ * error case, it is treated as a client error. No need
to wire a
+ * new cash-out to trigger a cash-out request, since the
last failed
+ * one will be retried.
+ */
+ assertException<CashoutClientError>({
+ runBlocking {
+ launchCashoutMonitor(
+ getMockedClient {
+ /**
+ * This causes the cash-out request sent
to Nexus to
+ * respond with 307 Temporary Redirect.
+ */
+ respondRedirect()
+ }
+ )
+ }
+ })
+ /* 5, Mocking a network error. The previous failed
cash-out
+ will again trigger the service to POST to Nexus. Here
the
+ monitor tolerates the failure, as it's not due to its
state
+ and should be temporary.
+ */
+ var requestMade = false
+ job = launchCashoutMonitor(
+ getMockedClient {
+ requestMade = true
+ throw Exception("Network Issue.")
+ }
+ )
+ delay(2000L) // Lets the reaction complete.
+ // asserting that the service is still running after the
failed request.
+ assert(requestMade && job.isActive)
+ job.cancelAndJoin()
}
}
}
}
-}
+}
\ No newline at end of file
diff --git a/nexus/src/test/kotlin/Iso20022Test.kt
b/nexus/src/test/kotlin/Iso20022Test.kt
index 58777cba..bc5d7379 100644
--- a/nexus/src/test/kotlin/Iso20022Test.kt
+++ b/nexus/src/test/kotlin/Iso20022Test.kt
@@ -9,7 +9,7 @@ import org.junit.Ignore
import org.junit.Test
import org.w3c.dom.Document
import poFiCamt052
-import poFiCamt054
+import poFiCamt054_2019
import prepNexusDb
import tech.libeufin.nexus.bankaccount.getBankAccount
import tech.libeufin.nexus.iso20022.*
@@ -21,13 +21,6 @@ import tech.libeufin.util.DestructionError
import tech.libeufin.util.XMLUtil
import tech.libeufin.util.destructXml
import withTestDatabase
-import java.math.BigDecimal
-import java.time.LocalDateTime
-import java.time.ZoneId
-import java.time.ZoneOffset
-import java.time.ZonedDateTime
-import java.time.format.DateTimeFormatter
-import java.util.TimeZone
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@@ -125,15 +118,15 @@ class Iso20022Test {
@Test
fun parsePoFiCamt054() {
- val doc = XMLUtil.parseStringIntoDom(poFiCamt054)
- parseCamtMessage(doc)
+ val doc = XMLUtil.parseStringIntoDom(poFiCamt054_2019)
+ parseCamtMessage(doc, dialect = "pf")
}
@Test
fun ingestPoFiCamt054() {
- val doc = XMLUtil.parseStringIntoDom(poFiCamt054)
+ val doc = XMLUtil.parseStringIntoDom(poFiCamt054_2019)
withTestDatabase { prepNexusDb()
- processCamtMessage(
+ ingestCamtMessageIntoAccount(
"foo",
doc,
FetchLevel.NOTIFICATION,
diff --git a/nexus/src/test/kotlin/MakeEnv.kt b/nexus/src/test/kotlin/MakeEnv.kt
index c64d20e8..f3c71a11 100644
--- a/nexus/src/test/kotlin/MakeEnv.kt
+++ b/nexus/src/test/kotlin/MakeEnv.kt
@@ -6,7 +6,6 @@ import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.nexus.*
import tech.libeufin.nexus.dbCreateTables
import tech.libeufin.nexus.dbDropTables
-import tech.libeufin.nexus.iso20022.*
import tech.libeufin.nexus.server.BankConnectionType
import tech.libeufin.nexus.server.FetchLevel
import tech.libeufin.nexus.server.FetchSpecAllJson
@@ -495,7 +494,143 @@ fun genNexusIncomingCamt(
)
)
-val poFiCamt054: String = """
+// Comes from a "mit Sammelbuchung" sample.
+// "mit Einzelbuchung" sample didn't have the "Ustrd"
+// See:
https://www.postfinance.ch/de/support/services/dokumente/musterfiles-fuer-geschaeftskunden.html
+val poFiCamt054_2019: String = """
+<?xml version="1.0" encoding="UTF-8"?>
+<Document xmlns="urn:iso:std:iso:20022:tech:xsd:camt.054.001.08"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:iso:std:iso:20022:tech:xsd:camt.054.001.08
file:///C:/Users/burkhalterl/Documents/Musterfiles%20ISOV19/Schemen/camt.054.001.08.xsd">
+ <BkToCstmrDbtCdtNtfctn>
+ <GrpHdr>
+ <MsgId>20200618375204295372463</MsgId>
+ <CreDtTm>2022-03-08T23:31:31</CreDtTm>
+ <MsgPgntn>
+ <PgNb>1</PgNb>
+ <LastPgInd>true</LastPgInd>
+ </MsgPgntn>
+ <AddtlInf>SPS/2.0/PROD</AddtlInf>
+ </GrpHdr>
+ <Ntfctn>
+ <Id>20200618375204295372465</Id>
+ <CreDtTm>2022-03-08T23:31:31</CreDtTm>
+ <FrToDt>
+ <FrDtTm>2022-03-08T00:00:00</FrDtTm>
+ <ToDtTm>2022-03-08T23:59:59</ToDtTm>
+ </FrToDt>
+ <Acct>
+ <Id>
+ <IBAN>${FOO_USER_IBAN}</IBAN>
+ </Id>
+ <Ccy>CHF</Ccy>
+ <Ownr>
+ <Nm>Robert Schneider SA Grands magasins
Biel/Bienne</Nm>
+ </Ownr>
+ </Acct>
+ <Ntry>
+ <NtryRef>CH2909000000250094239</NtryRef>
+ <Amt Ccy="CHF">501.05</Amt>
+ <CdtDbtInd>CRDT</CdtDbtInd>
+ <RvslInd>false</RvslInd>
+ <Sts>
+ <Cd>BOOK</Cd>
+ </Sts>
+ <BookgDt>
+ <Dt>2022-03-08</Dt>
+ </BookgDt>
+ <ValDt>
+ <Dt>2022-03-08</Dt>
+ </ValDt>
+ <AcctSvcrRef>1000000000000000</AcctSvcrRef>
+ <BkTxCd>
+ <Domn>
+ <Cd>PMNT</Cd>
+ <Fmly>
+ <Cd>RCDT</Cd>
+
<SubFmlyCd>AUTT</SubFmlyCd>
+ </Fmly>
+ </Domn>
+ </BkTxCd>
+ <NtryDtls>
+ <Btch>
+ <NbOfTxs>1</NbOfTxs>
+ </Btch>
+ <TxDtls>
+ <Refs>
+
<AcctSvcrRef>2000000000000000</AcctSvcrRef>
+
<InstrId>1006265-25bbb3b1a</InstrId>
+
<EndToEndId>NOTPROVIDED</EndToEndId>
+
<UETR>b009c997-97b3-4a9c-803c-d645a7276b0</UETR>
+ <Prtry>
+ <Tp>00</Tp>
+
<Ref>00000000000000000000020</Ref>
+ </Prtry>
+ </Refs>
+ <Amt Ccy="CHF">501.05</Amt>
+ <CdtDbtInd>CRDT</CdtDbtInd>
+ <BkTxCd>
+ <Domn>
+ <Cd>PMNT</Cd>
+ <Fmly>
+
<Cd>RCDT</Cd>
+
<SubFmlyCd>AUTT</SubFmlyCd>
+ </Fmly>
+ </Domn>
+ </BkTxCd>
+ <RltdPties>
+ <Dbtr>
+ <Pty>
+
<Nm>Bernasconi Maria</Nm>
+
<PstlAdr>
+
<AdrLine>Place de la Gare 12</AdrLine>
+
<AdrLine>2502 Biel/Bienne</AdrLine>
+
</PstlAdr>
+ </Pty>
+ </Dbtr>
+ <DbtrAcct>
+ <Id>
+
<IBAN>CH5109000000250092291</IBAN>
+ </Id>
+ </DbtrAcct>
+ <CdtrAcct>
+ <Id>
+
<IBAN>CH2909000000250094239</IBAN>
+ </Id>
+ </CdtrAcct>
+ </RltdPties>
+ <RltdAgts>
+ <DbtrAgt>
+ <FinInstnId>
+
<BICFI>POFICHBEXXX</BICFI>
+
<Nm>POSTFINANCE AG</Nm>
+
<PstlAdr>
+
<AdrLine>MINGERSTRASSE , 20</AdrLine>
+
<AdrLine>3030 BERN</AdrLine>
+
</PstlAdr>
+ </FinInstnId>
+ </DbtrAgt>
+ </RltdAgts>
+ <RmtInf>
+ <Ustrd>Muster</Ustrd>
+ <Ustrd>
Musterfile</Ustrd>
+ <Strd>
+
<AddtlRmtInf>?REJECT?0</AddtlRmtInf>
+
<AddtlRmtInf>?ERROR?000</AddtlRmtInf>
+ </Strd>
+ </RmtInf>
+ <RltdDts>
+
<AccptncDtTm>2022-03-08T20:00:00</AccptncDtTm>
+ </RltdDts>
+ </TxDtls>
+ </NtryDtls>
+ <AddtlNtryInf>SAMMELGUTSCHRIFT FÜR KONTO:
CH2909000000250094239 VERARBEITUNG VOM 08.03.2022 PAKET ID:
200000000000XXX</AddtlNtryInf>
+ </Ntry>
+ </Ntfctn>
+ </BkToCstmrDbtCdtNtfctn>
+</Document>
+
+""".trimIndent()
+
+val poFiCamt054_2013: String = """
<?xml version="1.0" encoding="UTF-8"?>
<Document xmlns="urn:iso:std:iso:20022:tech:xsd:camt.054.001.04"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:iso:std:iso:20022:tech:xsd:camt.054.001.04
camt.054.001.04.xsd">
<BkToCstmrDbtCdtNtfctn>
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
index 31ad5b88..f19718fa 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -1,15 +1,12 @@
package tech.libeufin.sandbox
import CamtBankAccountEntry
-import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.jsonMapper
import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
-import io.ktor.utils.io.jvm.javaio.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.and
@@ -64,6 +61,20 @@ data class NexusTransactions(
val transactions: List<TransactionItem>
)
+/**
+ * This exception signals that the buy-in service could NOT
+ * GET the list of fiat transactions from Nexus due to a client
+ * error. Because this is fatal (e.g. wrong credentials, URL not found..),
+ * the service should be stopped.
+ */
+class BuyinClientError : Exception()
+
+/**
+ * This exception signals that POSTing a cash-out operation
+ * to Nexus failed due to the client. This is a fatal condition
+ * therefore the monitor should be stopped.
+ */
+class CashoutClientError : Exception()
/**
* Executes the 'block' function every 'loopNewReqMs' milliseconds.
* Does not exit/fail the process upon exceptions - just logs them.
@@ -74,11 +85,14 @@ fun downloadLoop(block: () -> Unit) {
runBlocking {
while(true) {
try { block() }
+ catch (e: BuyinClientError) {
+ logger.error("The buy-in monitor had a client error while
GETting new" +
+ " transactions from Neuxs. Stopping it")
+ // Rethrowing and let the caller manage it
+ throw e
+ }
+ // Tolerating any other error type that's not due to the client.
catch (e: Exception) {
- /**
- * Not exiting to tolerate network issues, or optimistically
- * tolerate problems not caused by Sandbox itself.
- */
logger.error("Sandbox fiat-incoming monitor excepted:
${e.message}")
}
delay(newIterationTimeout)
@@ -99,12 +113,21 @@ private fun applyBuyinRatioAndFees(
): BigDecimal =
((amount * ratiosAndFees.buy_at_ratio.toBigDecimal())
- ratiosAndFees.buy_in_fee.toBigDecimal()).roundToTwoDigits()
+
+private fun ensureDisabledRedirects(client: HttpClient) {
+ client.config {
+ if (followRedirects) throw Exception(
+ "HTTP client follows redirects, please disable."
+ )
+ }
+}
/**
* This function downloads the incoming fiat transactions from Nexus,
* stores them into the database and triggers the related wire transfer
- * to the Taler exchange (to be specified in 'accountToCredit'). In case
- * of errors, it pauses and retries when the server fails, but _fails_ when
- * the client does.
+ * to the Taler exchange (to be specified in 'accountToCredit'). Once
+ * started, this function is not supposed to return, except on _client
+ * side_ errors. On server side errors it pauses and retries. When
+ * it returns, the caller is expected to handle the error.
*/
fun buyinMonitor(
demobankName: String, // used to get config values.
@@ -112,38 +135,60 @@ fun buyinMonitor(
accountToCredit: String,
accountToDebit: String = "admin"
) {
+ ensureDisabledRedirects(client)
val demobank = ensureDemobank(demobankName)
+ /**
+ * Getting the config values to send authenticated requests
+ * to Nexus. Sandbox needs one account at Nexus before being
+ * able to use these values.
+ */
val nexusBaseUrl = getConfigValueOrThrow(demobank.config::nexusBaseUrl)
val usernameAtNexus =
getConfigValueOrThrow(demobank.config::usernameAtNexus)
val passwordAtNexus =
getConfigValueOrThrow(demobank.config::passwordAtNexus)
+ /**
+ * This is the endpoint where Nexus serves all the transactions that
+ * have ingested from the fiat bank.
+ */
val endpoint = "bank-accounts/$usernameAtNexus/transactions"
val uriWithoutStart = joinUrl(nexusBaseUrl, endpoint) +
"?long_poll_ms=$waitTimeout"
// downloadLoop does already try-catch (without failing the process).
downloadLoop {
+ /**
+ * This bank account will act as the debtor, once a new fiat
+ * payment is detected. It's the debtor that pays the related
+ * regional amount to the exchange, in order to start a withdrawal
+ * operation (in regional coins).
+ */
val debitBankAccount = getBankAccountFromLabel(accountToDebit)
+ /**
+ * Setting the 'start' URI param in the following command
+ * lets Sandbox receive only unseen payments from Nexus.
+ */
val uriWithStart =
"$uriWithoutStart&start=${debitBankAccount.lastFiatFetch}"
runBlocking {
// Maybe get new fiat transactions.
- logger.debug("GETting fiat transactions from: ${uriWithStart}")
- val resp = client.get(uriWithStart) { basicAuth(usernameAtNexus,
passwordAtNexus) }
+ logger.debug("GETting fiat transactions from: $uriWithStart")
+ val resp = client.get(uriWithStart) {
+ expectSuccess = false // Avoids excepting on !2xx
+ basicAuth(usernameAtNexus, passwordAtNexus)
+ }
// The server failed, pause and try again
if (resp.status.value.toString().startsWith('5')) {
- logger.error("Buy-in monitor caught a failing to Nexus. Pause
and retry.")
+ logger.error("Buy-in monitor requested to a failing Nexus.
Retry.")
logger.error("Nexus responded: ${resp.bodyAsText()}")
- delay(2000L)
return@runBlocking
}
// The client failed, fail the process.
if (resp.status.value.toString().startsWith('4')) {
- logger.error("Buy-in monitor failed at GETting to Nexus. Fail
Sandbox.")
+ logger.error("Buy-in monitor failed at GETting to Nexus.
Stopping the buy-in monitor.")
logger.error("Nexus responded: ${resp.bodyAsText()}")
- exitProcess(1)
+ throw BuyinClientError()
}
// Expect 200 OK. What if 3xx?
if (resp.status.value != HttpStatusCode.OK.value) {
logger.error("Unhandled response status ${resp.status.value},
failing Sandbox")
- exitProcess(1)
+ throw BuyinClientError()
}
// Nexus responded 200 OK, analyzing the result.
/**
@@ -155,21 +200,9 @@ fun buyinMonitor(
NexusTransactions::class.java
) // errors are logged by the caller (without failing).
respObj.transactions.forEach {
- /**
- * If the payment doesn't contain a reserve public key,
- * continue the iteration with the new payment.
- */
+ // Ignoring payments with an invalid reserved public key.
if
(extractReservePubFromSubject(it.camtData.getSingletonSubject()) == null)
return@forEach
- /**
- * The payment had a reserve public key in the subject, wire
it to
- * the exchange. NOTE: this ensures that all the payments
that the
- * exchange gets will NOT trigger any reimbursement, because
they have
- * a valid reserve public key. Reimbursements would in fact
introduce
- * significant friction, because they need to target _fiat_
bank accounts
- * (the customers'), whereas the entity that _now_ pays the
exchange is
- * "admin", which lives in the regional circuit.
- */
// Extracts the amount and checks it's at most two fractional
digits.
val maybeValidAmount = it.camtData.amount.value
if (!validatePlainAmount(maybeValidAmount)) {
@@ -238,6 +271,7 @@ suspend fun cashoutMonitor(
demobankName: String = "default", // used to get config values.
dbEventTimeout: Long = 0 // 0 waits forever.
) {
+ ensureDisabledRedirects(httpClient)
// Register for a REGIO_TX event.
val eventChannel = buildChannelName(
NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
@@ -266,15 +300,15 @@ suspend fun cashoutMonitor(
val usernameAtNexus = getConfigValueOrThrow(config::usernameAtNexus)
val passwordAtNexus = getConfigValueOrThrow(config::passwordAtNexus)
val paymentInitEndpoint = nexusBaseUrl.run {
- var ret = this
- if (!ret.endsWith('/'))
- ret += '/'
+ var nexusBaseUrlFromConfig = this
+ if (!nexusBaseUrlFromConfig.endsWith('/'))
+ nexusBaseUrlFromConfig += '/'
/**
* WARNING: Nexus gives the possibility to have bank account names
* DIFFERENT from their owner's username. Sandbox however MUST have
* its Nexus bank account named THE SAME as its username.
*/
- ret + "bank-accounts/$usernameAtNexus/payment-initiations"
+ nexusBaseUrlFromConfig +
"bank-accounts/$usernameAtNexus/payment-initiations"
}
while (true) {
val listenHandle = PostgresListenHandle(eventChannel)
@@ -284,17 +318,24 @@ suspend fun cashoutMonitor(
// arrived _before_ the LISTEN.
var newTxs = getUnsubmittedTransactions(watchedBankAccount)
// Data found, UNLISTEN.
- if (newTxs.isNotEmpty())
+ if (newTxs.isNotEmpty()) {
+ logger.debug("Found cash-out's without waiting any DB event.")
listenHandle.postgresUnlisten()
+ }
// Data not found, wait.
else {
+ logger.debug("Need to wait a DB event for new cash-out's")
val isNotificationArrived =
listenHandle.waitOnIODispatchers(dbEventTimeout)
if (isNotificationArrived && listenHandle.receivedPayload ==
"CRDT")
newTxs = getUnsubmittedTransactions(watchedBankAccount)
}
- if (newTxs.isEmpty())
+ if (newTxs.isEmpty()) {
+ logger.debug("DB event timeout expired")
continue
+ }
+ logger.debug("POSTing new cash-out's")
newTxs.forEach {
+ logger.debug("POSTing cash-out '${it.subject}' to
$paymentInitEndpoint")
val body = object {
/**
* This field is UID of the request _as assigned by the
@@ -306,7 +347,7 @@ suspend fun cashoutMonitor(
val uid = it.accountServicerReference
val iban = it.creditorIban
val bic = it.creditorBic
- val amount = "${config.cashoutCurrency}:${it.amount}" //
FIXME: need fiat currency here.
+ val amount = "${config.cashoutCurrency}:${it.amount}"
val subject = it.subject
val name = it.creditorName
}
@@ -322,25 +363,36 @@ suspend fun cashoutMonitor(
catch (e: Exception) {
logger.error("Cash-out monitor could not reach Nexus. Pause
and retry")
logger.error(e.message)
+ /**
+ * Explicit delaying because the monitor normally
+ * waits on DB events, and this retry likely won't
+ * wait on a DB event.
+ */
delay(2000)
return@forEach
}
// Server fault. Pause and retry.
if (resp.status.value.toString().startsWith('5')) {
logger.error("Cash-out monitor POSTed to a failing Nexus.
Pause and retry")
- logger.error(resp.bodyAsText())
+ logger.error("Server responded: ${resp.bodyAsText()}")
+ /**
+ * Explicit delaying because the monitor normally
+ * waits on DB events, and this retry likely won't
+ * wait on a DB event.
+ */
delay(2000L)
+ return@forEach
}
// Client fault, fail Sandbox.
if (resp.status.value.toString().startsWith('4')) {
- logger.error("Cash-out monitor failed at POSTing to Nexus.
Fail Sandbox")
+ logger.error("Cash-out monitor failed at POSTing to Nexus.")
logger.error("Nexus responded: ${resp.bodyAsText()}")
- exitProcess(1)
+ throw CashoutClientError()
}
// Expecting 200 OK. What if 3xx?
if (resp.status.value != HttpStatusCode.OK.value) {
- logger.error("Cash-out monitor, unhandled response status:
${resp.status.value}. Fail Sandbox")
- exitProcess(1)
+ logger.error("Cash-out monitor, unhandled response status:
${resp.status.value}.")
+ throw CashoutClientError()
}
// Successful case, mark the wire transfer as submitted,
// and advance the pointer to the last submitted payment.
@@ -348,9 +400,7 @@ suspend fun cashoutMonitor(
transaction {
CashoutSubmissionEntity.new {
localTransaction = it.id
- hasErrors = false
submissionTime = resp.responseTime.timestamp
- isSubmitted = true
/**
* The following block associates the submitted payment
* to the UID that Nexus assigned to it. It is currently
not
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
index 57e00f77..d9fb393b 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
@@ -648,17 +648,13 @@ object BankAccountReportsTable : IntIdTable() {
}
/**
- * This table tracks the submissions of fiat payment instructions
- * that Sandbox sends to Nexus. Every fiat payment instruction is
- * related to a confirmed cash-out operation. The cash-out confirmation
- * is effective once the customer sends a local wire transfer to the
- * "admin" bank account. Such wire transfer is tracked by the
'localTransaction'
- * column.
+ * This table tracks the cash-out requests that Sandbox sends to Nexus.
+ * Only successful requests make it to this table. Failed request would
+ * either _stop_ the conversion service (for client-side errors) or get retried
+ * at a later time (for server-side errors.)
*/
object CashoutSubmissionsTable: LongIdTable() {
val localTransaction = reference("localTransaction",
BankAccountTransactionsTable).uniqueIndex()
- val isSubmitted = bool("isSubmitted").default(false)
- val hasErrors = bool("hasErrors")
val maybeNexusResponse = text("maybeNexusResponse").nullable()
val submissionTime = long("submissionTime").nullable() // failed don't
have it.
}
@@ -666,8 +662,6 @@ object CashoutSubmissionsTable: LongIdTable() {
class CashoutSubmissionEntity(id: EntityID<Long>) : LongEntity(id) {
companion object :
LongEntityClass<CashoutSubmissionEntity>(CashoutSubmissionsTable)
var localTransaction by CashoutSubmissionsTable.localTransaction
- var isSubmitted by CashoutSubmissionsTable.isSubmitted
- var hasErrors by CashoutSubmissionsTable.hasErrors
var maybeNexusResposnse by CashoutSubmissionsTable.maybeNexusResponse
var submissionTime by CashoutSubmissionsTable.submissionTime
}
diff --git
a/sandbox/src/main/kotlin/tech/libeufin/sandbox/EbicsProtocolBackend.kt
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/EbicsProtocolBackend.kt
index d79d0ab2..fc240963 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/EbicsProtocolBackend.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/EbicsProtocolBackend.kt
@@ -1017,8 +1017,8 @@ private fun ensureEbicsHost(requestHostID: String):
EbicsHostPublicInfo {
}
fun receiveEbicsXmlInternal(xmlData: String): Document {
// logger.debug("Data received: $xmlData")
- val requestDocument: Document? = XMLUtil.parseStringIntoDom(xmlData)
- if (requestDocument == null ||
(!XMLUtil.validateFromDom(requestDocument))) {
+ val requestDocument: Document = XMLUtil.parseStringIntoDom(xmlData)
+ if (!XMLUtil.validateFromDom(requestDocument)) {
println("Problematic document was: $requestDocument")
throw EbicsInvalidXmlError()
}
diff --git a/util/src/main/kotlin/Ebics.kt b/util/src/main/kotlin/Ebics.kt
index 737039a6..da814d5d 100644
--- a/util/src/main/kotlin/Ebics.kt
+++ b/util/src/main/kotlin/Ebics.kt
@@ -264,7 +264,6 @@ fun createEbicsRequestForUploadInitialization(
return XMLUtil.convertDomToString(doc)
}
-
fun createEbicsRequestForDownloadInitialization(
subscriberDetails: EbicsClientSubscriberDetails,
orderType: String,
diff --git a/util/src/main/kotlin/XMLUtil.kt b/util/src/main/kotlin/XMLUtil.kt
index e9074cb4..aa96e00f 100644
--- a/util/src/main/kotlin/XMLUtil.kt
+++ b/util/src/main/kotlin/XMLUtil.kt
@@ -225,6 +225,8 @@ class XMLUtil private constructor() {
}
}
}
+
+ // FIXME: need here the "2019" Swiss versions of camt and pain.
val schemaInputs: Array<Source> = listOf(
"xsd/ebics_H004.xsd",
"xsd/ebics_hev.xsd",
@@ -232,7 +234,7 @@ class XMLUtil private constructor() {
"xsd/camt.053.001.02.xsd",
"xsd/camt.054.001.02.xsd",
"xsd/pain.001.001.03.xsd",
- "xsd/pain.001.001.03.ch.02.xsd"
+ "xsd/pain.001.001.03.ch.02.xsd" // Swiss 2013 version.
).map {
val stream =
classLoader.getResourceAsStream(it) ?: throw
FileNotFoundException("Schema file $it not found.")
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [libeufin] branch master updated (94126e8b -> 26d9e2a8),
gnunet <=
- [libeufin] 03/06: Conversion service., gnunet, 2023/05/22
- [libeufin] 06/06: Conversion service tests., gnunet, 2023/05/22
- [libeufin] 04/06: Conversion service tests., gnunet, 2023/05/22
- [libeufin] 01/06: Tx deduplication for x-libeufin-bank., gnunet, 2023/05/22
- [libeufin] 05/06: Conversion service., gnunet, 2023/05/22
- [libeufin] 02/06: ISO 20022., gnunet, 2023/05/22