Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ springOutboxPublisher.publish(
> **Note:** Spring and Kafka versions are not forced by okapi — you control them.
> Okapi uses plain JDBC internally — it works with any `PlatformTransactionManager` (JPA, JDBC, jOOQ, Exposed, etc.).

`okapi-spring-boot` requires a `TransactionRunner` bean to bracket each scheduler tick in a transaction. The autoconfiguration derives one from any `PlatformTransactionManager` on the classpath (`spring-boot-starter-jdbc` or `spring-boot-starter-data-jpa` provide one out of the box) — no extra wiring needed in typical setups. If your application has no `PlatformTransactionManager` (single-instance, no transaction infrastructure) you must opt in explicitly:

```kotlin
@Bean
fun outboxTransactionRunner(): TransactionRunner = object : TransactionRunner {
override fun <T> runInTransaction(block: () -> T): T = block()
}
```

Without bracketing, `FOR UPDATE SKIP LOCKED` collapses to the single SELECT statement under JDBC auto-commit, which silently allows duplicate delivery across processor instances. This opt-in is intentionally manual to keep accidental misconfiguration out of multi-instance deployments.

## How It Works

Okapi implements the [transactional outbox pattern](https://softwaremill.com/microservices-101/) (see also: [microservices.io description](https://microservices.io/patterns/data/transactional-outbox.html)):
Expand Down
7 changes: 7 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ h2 = "2.4.240"
micrometer = "1.16.5"
jmh = "1.37"
jmhGradlePlugin = "0.7.3"
# Hibernate version aligned with what Spring 7.x targets (Hibernate 7.0 ORM); only used in the
# integration-tests module to exercise JpaTransactionManager fail-fast extraction.
hibernate = "7.1.4.Final"

[libraries]
kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" }
Expand All @@ -44,10 +47,14 @@ kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka
springContext = { module = "org.springframework:spring-context", version.ref = "spring" }
springTx = { module = "org.springframework:spring-tx", version.ref = "spring" }
springJdbc = { module = "org.springframework:spring-jdbc", version.ref = "spring" }
springOrm = { module = "org.springframework:spring-orm", version.ref = "spring" }
hibernateCore = { module = "org.hibernate.orm:hibernate-core", version.ref = "hibernate" }
springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" }
springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" }
springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" }
springBootStarterActuator = { module = "org.springframework.boot:spring-boot-starter-actuator", version.ref = "springBoot" }
# Spring Boot 4.0 split TransactionAutoConfiguration into a dedicated module (was in spring-boot-autoconfigure in 3.x).
springBootTransaction = { module = "org.springframework.boot:spring-boot-transaction", version.ref = "springBoot" }
assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" }
micrometerCore = { module = "io.micrometer:micrometer-core", version.ref = "micrometer" }
micrometerTest = { module = "io.micrometer:micrometer-test", version.ref = "micrometer" }
Expand Down
13 changes: 13 additions & 0 deletions okapi-integration-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,18 @@ dependencies {
testImplementation(libs.springContext)
testImplementation(libs.springTx)
testImplementation(libs.springBootAutoconfigure)
testImplementation(libs.springBootTest)
testImplementation(libs.springJdbc)
// Spring Boot 4.x doesn't pull AssertJ transitively but ApplicationContextRunner needs it
testImplementation(libs.assertjCore)

// Exposed-Spring bridge (proves autoconfig works with non-DataSourceTransactionManager PTMs)
testImplementation(libs.exposedCore)
testImplementation(libs.exposedJdbc)
testImplementation(libs.exposedSpringTransaction)

// JPA + Hibernate — proves extractDataSource() pulls JpaTransactionManager.getDataSource()
// and validatePtmDataSourceMatch fails fast on PTM↔DataSource mismatch under JPA.
testImplementation(libs.springOrm)
testImplementation(libs.hibernateCore)
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,26 @@ class PostgresTestSupport {
}
}

private fun runLiquibase() {
val connection = DriverManager.getConnection(container.jdbcUrl, container.username, container.password)
val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection))
Liquibase("com/softwaremill/okapi/db/postgres/changelog.xml", ClassLoaderResourceAccessor(), db).use { it.update("") }
connection.close()
private fun runLiquibase() = runOkapiLiquibaseOn(container)
}

/**
* Applies okapi's PostgreSQL Liquibase changelog to the given container. For tests that manage
* their own PostgreSQL containers (e.g. 2-DataSource setups) and can't use the single-container
* `PostgresTestSupport` class.
*/
fun runOkapiLiquibaseOn(container: PostgreSQLContainer<Nothing>) {
DriverManager.getConnection(container.jdbcUrl, container.username, container.password).use { conn ->
val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(conn))
Liquibase("com/softwaremill/okapi/db/postgres/changelog.xml", ClassLoaderResourceAccessor(), db).use {
it.update("")
}
}
}

/** Builds a plain `PGSimpleDataSource` pointing at the given container. */
fun pgDataSourceOf(container: PostgreSQLContainer<Nothing>): DataSource = PGSimpleDataSource().apply {
setURL(container.jdbcUrl)
user = container.username
password = container.password
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package com.softwaremill.okapi.test.transaction

import com.softwaremill.okapi.core.DeliveryInfo
import com.softwaremill.okapi.core.MessageDeliverer
import com.softwaremill.okapi.core.OutboxMessage
import com.softwaremill.okapi.core.OutboxProcessor
import com.softwaremill.okapi.core.TransactionRunner
import com.softwaremill.okapi.postgres.PostgresOutboxStore
import com.softwaremill.okapi.springboot.OkapiLiquibaseAutoConfiguration
import com.softwaremill.okapi.springboot.OutboxAutoConfiguration
import com.softwaremill.okapi.springboot.SpringConnectionProvider
import com.softwaremill.okapi.springboot.SpringOutboxPublisher
import com.softwaremill.okapi.springboot.SpringTransactionRunner
import com.softwaremill.okapi.test.support.CountingDataSource
import com.softwaremill.okapi.test.support.RecordingMessageDeliverer
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeInstanceOf
import org.jetbrains.exposed.v1.spring7.transaction.SpringTransactionManager
import org.postgresql.ds.PGSimpleDataSource
import org.springframework.boot.autoconfigure.AutoConfigurations
import org.springframework.boot.test.context.runner.ApplicationContextRunner
import org.springframework.transaction.PlatformTransactionManager
import org.springframework.transaction.support.TransactionTemplate
import org.testcontainers.containers.PostgreSQLContainer
import java.time.Clock
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import javax.sql.DataSource

/**
* Proves [OutboxAutoConfiguration]'s `okapiTransactionRunner` bean factory works with a
* non-`DataSourceTransactionManager` `PlatformTransactionManager` — specifically Exposed's
* `SpringTransactionManager` bridge. If the autoconfig assumed DST, this test would fail.
*
* Two assertions:
* 1. With processor disabled: a single Spring TX wrapping `springOutboxPublisher.publish()`
* borrows exactly one physical connection from the pool. This proves the autoconfig-built
* runner correctly bridges to `SpringConnectionProvider` through Spring's `ConnectionHolder`.
* 2. With processor enabled (200ms tick): an entry published inside a Spring TX is later
* delivered by the background scheduler — proving each scheduler tick is itself bracketed
* by a Spring TX driven by the bridged PTM.
*
* Liquibase schema migration is handled by `okapi-spring-boot` autoconfiguration; no manual
* setup needed.
*/
class ExposedSpringBridgeEndToEndTest : FunSpec({

val container = PostgreSQLContainer<Nothing>("postgres:16")
lateinit var counter: CountingDataSource

beforeSpec {
container.start()
val raw = PGSimpleDataSource().apply {
setURL(container.jdbcUrl)
user = container.username
password = container.password
}
counter = CountingDataSource(raw)
}

afterSpec { container.stop() }

// Both okapi-postgres and okapi-mysql are on the test classpath (shared integration-tests
// module). Explicitly register PostgresOutboxStore so the autoconfig's MySQL path is
// unambiguously skipped — otherwise MySQL's `FOR UPDATE SKIP LOCKED` with `FORCE INDEX`
// hint would fail on Postgres.
fun runner(recorder: RecordingMessageDeliverer): ApplicationContextRunner = ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java, OkapiLiquibaseAutoConfiguration::class.java))
.withBean(DataSource::class.java, { counter as DataSource })
.withBean(MessageDeliverer::class.java, { recorder })
.withBean(PlatformTransactionManager::class.java, { SpringTransactionManager(counter) })
.withBean(PostgresOutboxStore::class.java, {
PostgresOutboxStore(SpringConnectionProvider(counter), Clock.systemUTC())
})

test("publish inside Spring TX driven by Exposed-bridge PTM uses a single physical connection") {
// Disable processor only — purger is left at its default 1h interval so it never ticks
// during this test, but its enabled=true keeps `okapiTransactionRunner` factory active
// (the factory is gated on at least one scheduler being enabled).
runner(RecordingMessageDeliverer())
.withPropertyValues("okapi.processor.enabled=false")
.run { ctx ->
ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf<SpringTransactionRunner>()

resetCounterAndTruncate(counter)

val publisher = ctx.getBean(SpringOutboxPublisher::class.java)
val tm = ctx.getBean(PlatformTransactionManager::class.java)

TransactionTemplate(tm).execute {
publisher.publish(
OutboxMessage("order.created", """{"orderId":"abc-123"}"""),
RecordingDeliveryInfo,
)
}

counter.opened.get() shouldBe counter.closed.get()
counter.opened.get() shouldBe 1
}
}

test("processor tick under Exposed-bridge PTM claims and delivers a published entry") {
val recorder = RecordingMessageDeliverer()
runner(recorder)
.withPropertyValues("okapi.processor.interval=200ms")
.run { ctx ->
resetCounterAndTruncate(counter)

val publisher = ctx.getBean(SpringOutboxPublisher::class.java)
val tm = ctx.getBean(PlatformTransactionManager::class.java)

TransactionTemplate(tm).execute {
publisher.publish(
OutboxMessage("order.created", """{"orderId":"xyz-789"}"""),
RecordingDeliveryInfo,
)
}

val deadline = System.currentTimeMillis() + 5_000
while (recorder.deliveryCount() == 0 && System.currentTimeMillis() < deadline) {
Thread.sleep(50)
}
recorder.deliveryCount() shouldBe 1
}
}

// The single-process happy-path tests above would silently pass even if the autoconfig had
// re-introduced the auto-commit fallback (the bug KOJAK-67 fixes). This test exercises
// contention: 5 concurrent processor invocations against 50 published entries, each tick
// bracketed by the autoconfig-built TransactionRunner. With proper TX bracketing the
// FOR UPDATE SKIP LOCKED rows stay locked across claim+update — no amplification. With a
// no-op TR (or auto-commit fallback) the lock is released between claim and update,
// multiple processors deliver the same entry, and `assertNoAmplification` throws.
test("autoconfig-built TransactionRunner prevents delivery amplification under concurrent processor invocations") {
val recorder = RecordingMessageDeliverer()
runner(recorder)
// Disable processor only — purger stays at its default 1h interval (won't fire in test)
// but keeps `okapiTransactionRunner` factory active.
.withPropertyValues("okapi.processor.enabled=false")
.run { ctx ->
resetCounterAndTruncate(counter)

val publisher = ctx.getBean(SpringOutboxPublisher::class.java)
val tm = ctx.getBean(PlatformTransactionManager::class.java)
val processor = ctx.getBean(OutboxProcessor::class.java)
val transactionRunner = ctx.getBean(TransactionRunner::class.java)

val entryCount = 50
val processorCount = 5

repeat(entryCount) { i ->
TransactionTemplate(tm).execute {
publisher.publish(
OutboxMessage("test.event", """{"i":$i}"""),
RecordingDeliveryInfo,
)
}
}

val barrier = CyclicBarrier(processorCount)
val executor = Executors.newVirtualThreadPerTaskExecutor()
val futures = (1..processorCount).map {
CompletableFuture.supplyAsync(
{
barrier.await(10, TimeUnit.SECONDS)
transactionRunner.runInTransaction { processor.processNext(entryCount) }
},
executor,
)
}
CompletableFuture.allOf(*futures.toTypedArray()).get(30, TimeUnit.SECONDS)
executor.shutdown()

recorder.assertNoAmplification()
recorder.deliveryCount() shouldBe entryCount
}
}

// Purger uses a different code path than the processor — native SQL delete with limit, no
// claim/update state machine. Under the Exposed `SpringTransactionManager` bridge this needs
// its own E2E coverage: a regression where the bridge mishandles bracketing of the bulk delete
// (e.g. an Exposed upgrade that changes statement execution) would silently leave DELIVERED
// rows accumulating without breaking any other test.
test("purger tick under Exposed-bridge PTM removes DELIVERED entries past retention") {
val recorder = RecordingMessageDeliverer()
runner(recorder)
.withPropertyValues(
"okapi.processor.interval=100ms",
"okapi.purger.interval=200ms",
"okapi.purger.retention=1ms",
)
.run { ctx ->
resetCounterAndTruncate(counter)

val publisher = ctx.getBean(SpringOutboxPublisher::class.java)
val tm = ctx.getBean(PlatformTransactionManager::class.java)

TransactionTemplate(tm).execute {
publisher.publish(
OutboxMessage("test.purger", """{"k":"v"}"""),
RecordingDeliveryInfo,
)
}

val deliveredDeadline = System.currentTimeMillis() + 5_000
while (recorder.deliveryCount() == 0 && System.currentTimeMillis() < deliveredDeadline) {
Thread.sleep(50)
}
recorder.deliveryCount() shouldBe 1

val purgedDeadline = System.currentTimeMillis() + 5_000
while (rowCount(counter) > 0 && System.currentTimeMillis() < purgedDeadline) {
Thread.sleep(100)
}
rowCount(counter) shouldBe 0
}
}
})

private fun rowCount(counter: CountingDataSource): Int = counter.delegate.connection.use { c ->
c.createStatement().use { stmt ->
stmt.executeQuery("SELECT COUNT(*) FROM okapi_outbox").use { rs ->
rs.next()
rs.getInt(1)
}
}
}

private fun resetCounterAndTruncate(counter: CountingDataSource) {
counter.delegate.connection.use { c ->
c.createStatement().use { it.execute("TRUNCATE TABLE okapi_outbox") }
}
counter.opened.set(0)
counter.closed.set(0)
}

private object RecordingDeliveryInfo : DeliveryInfo {
override val type: String = "recording"
override fun serialize(): String = """{"type":"recording"}"""
}
Loading