Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions delayedqueue-jvm/api/delayedqueue-jvm.api
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ public abstract interface class org/funfix/delayedqueue/jvm/CronService {

public abstract interface class org/funfix/delayedqueue/jvm/DelayedQueue {
public abstract fun containsMessage (Ljava/lang/String;)Z
public abstract fun countMessages ()I
public abstract fun dropAllMessages (Ljava/lang/String;)I
public abstract fun dropMessage (Ljava/lang/String;)Z
public abstract fun getCron ()Lorg/funfix/delayedqueue/jvm/CronService;
public abstract fun getTimeConfig ()Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig;
public abstract fun listMessages (II)Ljava/util/List;
public abstract fun offerBatch (Ljava/util/List;)Ljava/util/List;
public abstract fun offerIfNotExists (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome;
public abstract fun offerOrUpdate (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome;
Expand All @@ -161,6 +163,7 @@ public final class org/funfix/delayedqueue/jvm/DelayedQueueInMemory : org/funfix
public static final field Companion Lorg/funfix/delayedqueue/jvm/DelayedQueueInMemory$Companion;
public synthetic fun <init> (Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig;Ljava/lang/String;Ljava/time/Clock;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun containsMessage (Ljava/lang/String;)Z
public fun countMessages ()I
public static final fun create ()Lorg/funfix/delayedqueue/jvm/DelayedQueueInMemory;
public static final fun create (Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig;)Lorg/funfix/delayedqueue/jvm/DelayedQueueInMemory;
public static final fun create (Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig;Ljava/lang/String;)Lorg/funfix/delayedqueue/jvm/DelayedQueueInMemory;
Expand All @@ -169,6 +172,7 @@ public final class org/funfix/delayedqueue/jvm/DelayedQueueInMemory : org/funfix
public fun dropMessage (Ljava/lang/String;)Z
public fun getCron ()Lorg/funfix/delayedqueue/jvm/CronService;
public fun getTimeConfig ()Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig;
public fun listMessages (II)Ljava/util/List;
public fun offerBatch (Ljava/util/List;)Ljava/util/List;
public fun offerIfNotExists (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome;
public fun offerOrUpdate (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome;
Expand All @@ -191,12 +195,14 @@ public final class org/funfix/delayedqueue/jvm/DelayedQueueJDBC : java/lang/Auto
public synthetic fun <init> (Lorg/funfix/delayedqueue/jvm/internals/jdbc/Database;Lorg/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter;Lorg/funfix/delayedqueue/jvm/MessageSerializer;Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBCConfig;Ljava/time/Clock;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun close ()V
public fun containsMessage (Ljava/lang/String;)Z
public fun countMessages ()I
public static final fun create (Lorg/funfix/delayedqueue/jvm/MessageSerializer;Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBCConfig;)Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBC;
public static final fun create (Lorg/funfix/delayedqueue/jvm/MessageSerializer;Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBCConfig;Ljava/time/Clock;)Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBC;
public fun dropAllMessages (Ljava/lang/String;)I
public fun dropMessage (Ljava/lang/String;)Z
public fun getCron ()Lorg/funfix/delayedqueue/jvm/CronService;
public fun getTimeConfig ()Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig;
public fun listMessages (II)Ljava/util/List;
public fun offerBatch (Ljava/util/List;)Ljava/util/List;
public fun offerIfNotExists (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome;
public fun offerOrUpdate (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome;
Expand Down
5 changes: 4 additions & 1 deletion delayedqueue-jvm/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ dependencies {
testRuntimeOnly(libs.junit.platform.launcher)
}

tasks.test { useJUnitPlatform() }
tasks.test {
useJUnitPlatform()
jvmArgs("-Xmx1g")
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,40 @@ public interface DelayedQueue<A> {
)
public fun dropAllMessages(confirm: String): Int

/**
* Returns the total number of messages currently in the queue.
*
* This includes both messages that are ready for processing and messages scheduled for the
* future.
*
* @return the number of messages
* @throws ResourceUnavailableException if the operation fails after retries
* @throws InterruptedException if the current thread is interrupted
*/
@Throws(ResourceUnavailableException::class, InterruptedException::class)
public fun countMessages(): Int

/**
* Returns a paginated, read-only list of messages currently in the queue, ordered by
* [ScheduledMessage.scheduleAt] ascending.
*
* This is intended for observability and visualization. The returned [ScheduledMessage]
* instances are snapshots with [ScheduledMessage.canUpdate] always set to `false`.
*
* @param limit the maximum number of entries to return (must be positive)
* @param offset the number of entries to skip (must be non-negative)
* @return an unmodifiable list of scheduled messages
* @throws IllegalArgumentException if limit is not positive or offset is negative
* @throws ResourceUnavailableException if the operation fails after retries
* @throws InterruptedException if the current thread is interrupted
*/
@Throws(
IllegalArgumentException::class,
ResourceUnavailableException::class,
InterruptedException::class,
)
public fun listMessages(limit: Int, offset: Int): List<ScheduledMessage<A>>

/** Utilities for installing cron-like schedules. */
public fun getCron(): CronService<A>
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.UUID
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.math.min
import org.funfix.delayedqueue.jvm.internals.CronServiceImpl

/**
Expand Down Expand Up @@ -309,6 +310,34 @@ private constructor(
return lock.withLock { map.containsKey(key) }
}

override fun countMessages(): Int {
return lock.withLock { map.size }
}

override fun listMessages(limit: Int, offset: Int): List<ScheduledMessage<A>> {
require(limit > 0) { "limit must be positive, got: $limit" }
require(offset >= 0) { "offset must be non-negative, got: $offset" }

return lock.withLock {
val sorted = order.toList() // already sorted by scheduleAt via TreeSet
if (offset >= sorted.size) {
Collections.emptyList()
} else {
val end = min(offset + limit, sorted.size)
Collections.unmodifiableList(
sorted.subList(offset, end).map { msg ->
ScheduledMessage(
key = msg.key,
payload = msg.payload,
scheduleAt = msg.scheduleAt,
canUpdate = false,
)
}
)
}
}
}

override fun dropAllMessages(confirm: String): Int {
require(confirm == "Yes, please, I know what I'm doing!") {
"Incorrect confirmation string for dropAllMessages"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.funfix.delayedqueue.jvm
import java.security.MessageDigest
import java.time.Clock
import java.time.Instant
import java.util.Collections
import java.util.UUID
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
Expand Down Expand Up @@ -292,6 +293,7 @@ private constructor(
)
emptyMap() // Trigger fallback
}

else -> throw e // Other exceptions propagate
}
}
Expand Down Expand Up @@ -529,6 +531,37 @@ private constructor(
database.withConnection { connection -> adapter.checkIfKeyExists(connection, key, pKind) }
}

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun countMessages(): Int = withRetries {
database.withConnection { connection -> adapter.countMessages(connection, pKind) }
}

@Throws(
IllegalArgumentException::class,
ResourceUnavailableException::class,
InterruptedException::class,
)
override fun listMessages(limit: Int, offset: Int): List<ScheduledMessage<A>> {
require(limit > 0) { "limit must be positive, got: $limit" }
require(offset >= 0) { "offset must be non-negative, got: $offset" }

return withRetries {
database.withConnection { connection ->
val rows = adapter.listMessages(connection, pKind, limit, offset)
Collections.unmodifiableList(
rows.map { row ->
ScheduledMessage(
key = row.data.pKey,
payload = serializer.deserialize(row.data.payload),
scheduleAt = row.data.scheduledAt,
canUpdate = false,
)
}
)
}
}
}

@Throws(
IllegalArgumentException::class,
ResourceUnavailableException::class,
Expand Down Expand Up @@ -601,8 +634,10 @@ private constructor(
JdbcDriver.Sqlite -> SqliteMigrations.getMigrations(config.tableName)
JdbcDriver.PostgreSQL ->
PostgreSQLMigrations.getMigrations(config.tableName)

JdbcDriver.MsSqlServer ->
MsSqlServerMigrations.getMigrations(config.tableName)

JdbcDriver.MariaDB -> MariaDBMigrations.getMigrations(config.tableName)
JdbcDriver.MySQL -> MySQLMigrations.getMigrations(config.tableName)
JdbcDriver.Oracle -> OracleMigrations.getMigrations(config.tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,57 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t
}
}

/** Counts all rows with a specific kind. */
fun countMessages(conn: SafeConnection, kind: String): Int {
val sql =
"""
SELECT COUNT(*) FROM ${conn.quote(tableName)}
WHERE ${conn.quote("pKind")} = ?
"""
return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, kind)
stmt.executeQuery().use { rs ->
rs.next()
rs.getInt(1)
}
}
}

/** Lists rows with a specific kind, ordered by scheduledAt ascending, with pagination. */
open fun listMessages(
conn: SafeConnection,
kind: String,
limit: Int,
offset: Int,
): List<DBTableRowWithId> {
val sql =
"""
SELECT
${conn.quote("id")},
${conn.quote("pKey")},
${conn.quote("pKind")},
${conn.quote("payload")},
${conn.quote("scheduledAt")},
${conn.quote("scheduledAtInitially")},
${conn.quote("lockUuid")},
${conn.quote("createdAt")}
FROM ${conn.quote(tableName)}
WHERE ${conn.quote("pKind")} = ?
ORDER BY ${conn.quote("scheduledAt")}, ${conn.quote("pKey")}
LIMIT $limit OFFSET $offset
"""
return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, kind)
stmt.executeQuery().use { rs ->
val results = mutableListOf<DBTableRowWithId>()
while (rs.next()) {
results.add(rs.toDBTableRowWithId())
}
results
}
}
}

/**
* Acquires many messages optimistically by updating them with a lock. Returns the number of
* messages acquired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,41 @@ internal class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) :
}
}

override fun listMessages(
conn: SafeConnection,
kind: String,
limit: Int,
offset: Int,
): List<DBTableRowWithId> {
val sql =
"""
SELECT
[id],
[pKey],
[pKind],
[payload],
[scheduledAt],
[scheduledAtInitially],
[lockUuid],
[createdAt]
FROM [$tableName]
WHERE [pKind] = ?
ORDER BY [scheduledAt], [pKey]
OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, kind)
stmt.executeQuery().use { rs ->
val results = mutableListOf<DBTableRowWithId>()
while (rs.next()) {
results.add(rs.toDBTableRowWithId())
}
results
}
}
}

override fun selectByKey(conn: SafeConnection, kind: String, key: String): DBTableRowWithId? {
val sql =
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,41 @@ internal class OracleAdapter(driver: JdbcDriver, tableName: String) :
}
}

override fun listMessages(
conn: SafeConnection,
kind: String,
limit: Int,
offset: Int,
): List<DBTableRowWithId> {
val sql =
"""
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM "$tableName"
WHERE "pKind" = ?
ORDER BY "scheduledAt", "pKey"
OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, kind)
stmt.executeQuery().use { rs ->
val results = mutableListOf<DBTableRowWithId>()
while (rs.next()) {
results.add(rs.toDBTableRowWithId())
}
results
}
}
}

override fun selectByKey(conn: SafeConnection, kind: String, key: String): DBTableRowWithId? {
val sql =
"""
Expand Down
Loading