diff --git a/delayedqueue-jvm/api/delayedqueue-jvm.api b/delayedqueue-jvm/api/delayedqueue-jvm.api index a12ef36..7cb97c8 100644 --- a/delayedqueue-jvm/api/delayedqueue-jvm.api +++ b/delayedqueue-jvm/api/delayedqueue-jvm.api @@ -144,10 +144,12 @@ public abstract interface class org/funfix/delayedqueue/jvm/CronService { public abstract interface class org/funfix/delayedqueue/jvm/DelayedQueue { public abstract fun containsMessage (Ljava/lang/String;)Z + public abstract fun countMessages ()I public abstract fun dropAllMessages (Ljava/lang/String;)I public abstract fun dropMessage (Ljava/lang/String;)Z public abstract fun getCron ()Lorg/funfix/delayedqueue/jvm/CronService; public abstract fun getTimeConfig ()Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig; + public abstract fun listMessages (II)Ljava/util/List; public abstract fun offerBatch (Ljava/util/List;)Ljava/util/List; public abstract fun offerIfNotExists (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome; public abstract fun offerOrUpdate (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome; @@ -161,6 +163,7 @@ public final class org/funfix/delayedqueue/jvm/DelayedQueueInMemory : org/funfix public static final field Companion Lorg/funfix/delayedqueue/jvm/DelayedQueueInMemory$Companion; public synthetic fun (Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig;Ljava/lang/String;Ljava/time/Clock;Lkotlin/jvm/internal/DefaultConstructorMarker;)V public fun containsMessage (Ljava/lang/String;)Z + public fun countMessages ()I public static final fun create ()Lorg/funfix/delayedqueue/jvm/DelayedQueueInMemory; public static final fun create (Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig;)Lorg/funfix/delayedqueue/jvm/DelayedQueueInMemory; public static final fun create (Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig;Ljava/lang/String;)Lorg/funfix/delayedqueue/jvm/DelayedQueueInMemory; @@ -169,6 +172,7 @@ public final class org/funfix/delayedqueue/jvm/DelayedQueueInMemory : org/funfix public fun dropMessage (Ljava/lang/String;)Z public fun getCron ()Lorg/funfix/delayedqueue/jvm/CronService; public fun getTimeConfig ()Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig; + public fun listMessages (II)Ljava/util/List; public fun offerBatch (Ljava/util/List;)Ljava/util/List; public fun offerIfNotExists (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome; public fun offerOrUpdate (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome; @@ -191,12 +195,14 @@ public final class org/funfix/delayedqueue/jvm/DelayedQueueJDBC : java/lang/Auto public synthetic fun (Lorg/funfix/delayedqueue/jvm/internals/jdbc/Database;Lorg/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter;Lorg/funfix/delayedqueue/jvm/MessageSerializer;Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBCConfig;Ljava/time/Clock;Lkotlin/jvm/internal/DefaultConstructorMarker;)V public fun close ()V public fun containsMessage (Ljava/lang/String;)Z + public fun countMessages ()I public static final fun create (Lorg/funfix/delayedqueue/jvm/MessageSerializer;Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBCConfig;)Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBC; public static final fun create (Lorg/funfix/delayedqueue/jvm/MessageSerializer;Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBCConfig;Ljava/time/Clock;)Lorg/funfix/delayedqueue/jvm/DelayedQueueJDBC; public fun dropAllMessages (Ljava/lang/String;)I public fun dropMessage (Ljava/lang/String;)Z public fun getCron ()Lorg/funfix/delayedqueue/jvm/CronService; public fun getTimeConfig ()Lorg/funfix/delayedqueue/jvm/DelayedQueueTimeConfig; + public fun listMessages (II)Ljava/util/List; public fun offerBatch (Ljava/util/List;)Ljava/util/List; public fun offerIfNotExists (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome; public fun offerOrUpdate (Ljava/lang/String;Ljava/lang/Object;Ljava/time/Instant;)Lorg/funfix/delayedqueue/jvm/OfferOutcome; diff --git a/delayedqueue-jvm/build.gradle.kts b/delayedqueue-jvm/build.gradle.kts index 412bbed..23a6ca4 100644 --- a/delayedqueue-jvm/build.gradle.kts +++ b/delayedqueue-jvm/build.gradle.kts @@ -39,4 +39,7 @@ dependencies { testRuntimeOnly(libs.junit.platform.launcher) } -tasks.test { useJUnitPlatform() } +tasks.test { + useJUnitPlatform() + jvmArgs("-Xmx1g") +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueue.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueue.kt index 34238c5..df5cde2 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueue.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueue.kt @@ -159,6 +159,40 @@ public interface DelayedQueue { ) public fun dropAllMessages(confirm: String): Int + /** + * Returns the total number of messages currently in the queue. + * + * This includes both messages that are ready for processing and messages scheduled for the + * future. + * + * @return the number of messages + * @throws ResourceUnavailableException if the operation fails after retries + * @throws InterruptedException if the current thread is interrupted + */ + @Throws(ResourceUnavailableException::class, InterruptedException::class) + public fun countMessages(): Int + + /** + * Returns a paginated, read-only list of messages currently in the queue, ordered by + * [ScheduledMessage.scheduleAt] ascending. + * + * This is intended for observability and visualization. The returned [ScheduledMessage] + * instances are snapshots with [ScheduledMessage.canUpdate] always set to `false`. + * + * @param limit the maximum number of entries to return (must be positive) + * @param offset the number of entries to skip (must be non-negative) + * @return an unmodifiable list of scheduled messages + * @throws IllegalArgumentException if limit is not positive or offset is negative + * @throws ResourceUnavailableException if the operation fails after retries + * @throws InterruptedException if the current thread is interrupted + */ + @Throws( + IllegalArgumentException::class, + ResourceUnavailableException::class, + InterruptedException::class, + ) + public fun listMessages(limit: Int, offset: Int): List> + /** Utilities for installing cron-like schedules. */ public fun getCron(): CronService } diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueInMemory.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueInMemory.kt index f67981a..8bc951a 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueInMemory.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueInMemory.kt @@ -25,6 +25,7 @@ import java.util.UUID import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock +import kotlin.math.min import org.funfix.delayedqueue.jvm.internals.CronServiceImpl /** @@ -309,6 +310,34 @@ private constructor( return lock.withLock { map.containsKey(key) } } + override fun countMessages(): Int { + return lock.withLock { map.size } + } + + override fun listMessages(limit: Int, offset: Int): List> { + require(limit > 0) { "limit must be positive, got: $limit" } + require(offset >= 0) { "offset must be non-negative, got: $offset" } + + return lock.withLock { + val sorted = order.toList() // already sorted by scheduleAt via TreeSet + if (offset >= sorted.size) { + Collections.emptyList() + } else { + val end = min(offset + limit, sorted.size) + Collections.unmodifiableList( + sorted.subList(offset, end).map { msg -> + ScheduledMessage( + key = msg.key, + payload = msg.payload, + scheduleAt = msg.scheduleAt, + canUpdate = false, + ) + } + ) + } + } + } + override fun dropAllMessages(confirm: String): Int { require(confirm == "Yes, please, I know what I'm doing!") { "Incorrect confirmation string for dropAllMessages" diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt index e366b98..ee6ce02 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt @@ -19,6 +19,7 @@ package org.funfix.delayedqueue.jvm import java.security.MessageDigest import java.time.Clock import java.time.Instant +import java.util.Collections import java.util.UUID import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock @@ -292,6 +293,7 @@ private constructor( ) emptyMap() // Trigger fallback } + else -> throw e // Other exceptions propagate } } @@ -529,6 +531,37 @@ private constructor( database.withConnection { connection -> adapter.checkIfKeyExists(connection, key, pKind) } } + @Throws(ResourceUnavailableException::class, InterruptedException::class) + override fun countMessages(): Int = withRetries { + database.withConnection { connection -> adapter.countMessages(connection, pKind) } + } + + @Throws( + IllegalArgumentException::class, + ResourceUnavailableException::class, + InterruptedException::class, + ) + override fun listMessages(limit: Int, offset: Int): List> { + require(limit > 0) { "limit must be positive, got: $limit" } + require(offset >= 0) { "offset must be non-negative, got: $offset" } + + return withRetries { + database.withConnection { connection -> + val rows = adapter.listMessages(connection, pKind, limit, offset) + Collections.unmodifiableList( + rows.map { row -> + ScheduledMessage( + key = row.data.pKey, + payload = serializer.deserialize(row.data.payload), + scheduleAt = row.data.scheduledAt, + canUpdate = false, + ) + } + ) + } + } + } + @Throws( IllegalArgumentException::class, ResourceUnavailableException::class, @@ -601,8 +634,10 @@ private constructor( JdbcDriver.Sqlite -> SqliteMigrations.getMigrations(config.tableName) JdbcDriver.PostgreSQL -> PostgreSQLMigrations.getMigrations(config.tableName) + JdbcDriver.MsSqlServer -> MsSqlServerMigrations.getMigrations(config.tableName) + JdbcDriver.MariaDB -> MariaDBMigrations.getMigrations(config.tableName) JdbcDriver.MySQL -> MySQLMigrations.getMigrations(config.tableName) JdbcDriver.Oracle -> OracleMigrations.getMigrations(config.tableName) diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt index 2386844..a757636 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt @@ -369,6 +369,57 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t } } + /** Counts all rows with a specific kind. */ + fun countMessages(conn: SafeConnection, kind: String): Int { + val sql = + """ + SELECT COUNT(*) FROM ${conn.quote(tableName)} + WHERE ${conn.quote("pKind")} = ? + """ + return conn.prepareStatement(sql) { stmt -> + stmt.setString(1, kind) + stmt.executeQuery().use { rs -> + rs.next() + rs.getInt(1) + } + } + } + + /** Lists rows with a specific kind, ordered by scheduledAt ascending, with pagination. */ + open fun listMessages( + conn: SafeConnection, + kind: String, + limit: Int, + offset: Int, + ): List { + val sql = + """ + SELECT + ${conn.quote("id")}, + ${conn.quote("pKey")}, + ${conn.quote("pKind")}, + ${conn.quote("payload")}, + ${conn.quote("scheduledAt")}, + ${conn.quote("scheduledAtInitially")}, + ${conn.quote("lockUuid")}, + ${conn.quote("createdAt")} + FROM ${conn.quote(tableName)} + WHERE ${conn.quote("pKind")} = ? + ORDER BY ${conn.quote("scheduledAt")}, ${conn.quote("pKey")} + LIMIT $limit OFFSET $offset + """ + return conn.prepareStatement(sql) { stmt -> + stmt.setString(1, kind) + stmt.executeQuery().use { rs -> + val results = mutableListOf() + while (rs.next()) { + results.add(rs.toDBTableRowWithId()) + } + results + } + } + } + /** * Acquires many messages optimistically by updating them with a lock. Returns the number of * messages acquired. diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt index 89b36a8..c7793d1 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt @@ -176,6 +176,41 @@ internal class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : } } + override fun listMessages( + conn: SafeConnection, + kind: String, + limit: Int, + offset: Int, + ): List { + val sql = + """ + SELECT + [id], + [pKey], + [pKind], + [payload], + [scheduledAt], + [scheduledAtInitially], + [lockUuid], + [createdAt] + FROM [$tableName] + WHERE [pKind] = ? + ORDER BY [scheduledAt], [pKey] + OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY + """ + + return conn.prepareStatement(sql) { stmt -> + stmt.setString(1, kind) + stmt.executeQuery().use { rs -> + val results = mutableListOf() + while (rs.next()) { + results.add(rs.toDBTableRowWithId()) + } + results + } + } + } + override fun selectByKey(conn: SafeConnection, kind: String, key: String): DBTableRowWithId? { val sql = """ diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/oracle/OracleAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/oracle/OracleAdapter.kt index 2f4bad9..af893ad 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/oracle/OracleAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/oracle/OracleAdapter.kt @@ -101,6 +101,41 @@ internal class OracleAdapter(driver: JdbcDriver, tableName: String) : } } + override fun listMessages( + conn: SafeConnection, + kind: String, + limit: Int, + offset: Int, + ): List { + val sql = + """ + SELECT + "id", + "pKey", + "pKind", + "payload", + "scheduledAt", + "scheduledAtInitially", + "lockUuid", + "createdAt" + FROM "$tableName" + WHERE "pKind" = ? + ORDER BY "scheduledAt", "pKey" + OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY + """ + + return conn.prepareStatement(sql) { stmt -> + stmt.setString(1, kind) + stmt.executeQuery().use { rs -> + val results = mutableListOf() + while (rs.next()) { + results.add(rs.toDBTableRowWithId()) + } + results + } + } + } + override fun selectByKey(conn: SafeConnection, kind: String, key: String): DBTableRowWithId? { val sql = """ diff --git a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueInMemoryTest.java b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueInMemoryTest.java index fb4e9ab..c76806b 100644 --- a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueInMemoryTest.java +++ b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueInMemoryTest.java @@ -878,4 +878,273 @@ public void tryPollMany_withMaxSizeLessThanOrEqualToZero_returnsEmptyBatch() { assertTrue(batch3.payload().contains("value offered (1.1)")); assertTrue(batch3.payload().contains("value offered (2.1)")); } + + // ========== Count Messages ========== + + @Test + public void countMessages_returnsZeroForEmptyQueue() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + + assertEquals(0, queue.countMessages()); + } + + @Test + public void countMessages_returnsCorrectCount() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var scheduleAt = clock.now().plusSeconds(10); + + queue.offerOrUpdate("key1", "payload1", scheduleAt); + queue.offerOrUpdate("key2", "payload2", scheduleAt); + queue.offerOrUpdate("key3", "payload3", scheduleAt); + + assertEquals(3, queue.countMessages()); + } + + @Test + public void countMessages_decreasesAfterDrop() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var scheduleAt = clock.now().plusSeconds(10); + + queue.offerOrUpdate("key1", "payload1", scheduleAt); + queue.offerOrUpdate("key2", "payload2", scheduleAt); + assertEquals(2, queue.countMessages()); + + queue.dropMessage("key1"); + assertEquals(1, queue.countMessages()); + } + + @Test + public void countMessages_decreasesAfterAcknowledge() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var scheduleAt = clock.now(); + + queue.offerOrUpdate("key1", "payload1", scheduleAt); + queue.offerOrUpdate("key2", "payload2", scheduleAt); + assertEquals(2, queue.countMessages()); + + var envelope = queue.tryPoll(); + assertNotNull(envelope); + envelope.acknowledge(); + assertEquals(1, queue.countMessages()); + } + + // ========== List Messages ========== + + @Test + public void listMessages_returnsEmptyListForEmptyQueue() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + + var entries = queue.listMessages(10, 0); + + assertNotNull(entries); + assertTrue(entries.isEmpty()); + } + + @Test + public void listMessages_returnsAllEntriesWithinLimit() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var base = clock.now(); + + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + queue.offerOrUpdate("key2", "payload2", base.plusSeconds(20)); + queue.offerOrUpdate("key3", "payload3", base.plusSeconds(30)); + + var entries = queue.listMessages(10, 0); + + assertEquals(3, entries.size()); + assertEquals("key1", entries.get(0).key()); + assertEquals("payload1", entries.get(0).payload()); + assertEquals(base.plusSeconds(10), entries.get(0).scheduleAt()); + assertEquals("key2", entries.get(1).key()); + assertEquals("key3", entries.get(2).key()); + } + + @Test + public void listMessages_orderedByScheduledAt() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var base = clock.now(); + + // Insert out of order + queue.offerOrUpdate("key3", "payload3", base.plusSeconds(30)); + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + queue.offerOrUpdate("key2", "payload2", base.plusSeconds(20)); + + var entries = queue.listMessages(10, 0); + + assertEquals(3, entries.size()); + assertEquals("key1", entries.get(0).key()); + assertEquals("key2", entries.get(1).key()); + assertEquals("key3", entries.get(2).key()); + } + + @Test + public void listMessages_respectsLimit() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var base = clock.now(); + + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + queue.offerOrUpdate("key2", "payload2", base.plusSeconds(20)); + queue.offerOrUpdate("key3", "payload3", base.plusSeconds(30)); + + var entries = queue.listMessages(2, 0); + + assertEquals(2, entries.size()); + assertEquals("key1", entries.get(0).key()); + assertEquals("key2", entries.get(1).key()); + } + + @Test + public void listMessages_respectsOffset() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var base = clock.now(); + + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + queue.offerOrUpdate("key2", "payload2", base.plusSeconds(20)); + queue.offerOrUpdate("key3", "payload3", base.plusSeconds(30)); + + var entries = queue.listMessages(10, 1); + + assertEquals(2, entries.size()); + assertEquals("key2", entries.get(0).key()); + assertEquals("key3", entries.get(1).key()); + } + + @Test + public void listMessages_offsetBeyondSizeReturnsEmpty() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var base = clock.now(); + + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + + var entries = queue.listMessages(10, 100); + + assertTrue(entries.isEmpty()); + } + + @Test + public void listMessages_limitAndOffset() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var base = clock.now(); + + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + queue.offerOrUpdate("key2", "payload2", base.plusSeconds(20)); + queue.offerOrUpdate("key3", "payload3", base.plusSeconds(30)); + queue.offerOrUpdate("key4", "payload4", base.plusSeconds(40)); + + var entries = queue.listMessages(2, 1); + + assertEquals(2, entries.size()); + assertEquals("key2", entries.get(0).key()); + assertEquals("key3", entries.get(1).key()); + } + + @Test + public void listMessages_returnsUnmodifiableList() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + + queue.offerOrUpdate("key1", "payload1", clock.now().plusSeconds(10)); + + var entries = queue.listMessages(10, 0); + + assertThrows(UnsupportedOperationException.class, () -> + entries.add(new ScheduledMessage<>("fake", "fake", Instant.now())) + ); + } + + @Test + public void listMessages_throwsOnInvalidArguments() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + + assertThrows(IllegalArgumentException.class, () -> queue.listMessages(0, 0)); + assertThrows(IllegalArgumentException.class, () -> queue.listMessages(-1, 0)); + assertThrows(IllegalArgumentException.class, () -> queue.listMessages(10, -1)); + } + + @Test + public void listMessages_isReadOnly_doesNotAffectQueue() { + var clock = new MutableClock(Instant.parse("2024-01-01T00:00:00Z")); + var queue = DelayedQueueInMemory.create( + DelayedQueueTimeConfig.create(Duration.ofSeconds(30), Duration.ofMillis(100)), + "test-source", + clock + ); + var scheduleAt = clock.now(); + + queue.offerOrUpdate("key1", "payload1", scheduleAt); + + // List should not lock or remove messages + queue.listMessages(10, 0); + queue.listMessages(10, 0); + + // Message should still be pollable + var envelope = queue.tryPoll(); + assertNotNull(envelope); + assertEquals("payload1", envelope.payload()); + } + } diff --git a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCContractTestBase.java b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCContractTestBase.java index 39fed4b..8c2d1cc 100644 --- a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCContractTestBase.java +++ b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCContractTestBase.java @@ -591,4 +591,167 @@ public void tryPollMany_withMaxSizeLessThanOrEqualToZero_returnsEmptyBatch() thr assertTrue(batch3.payload().contains("value offered (1.1)")); assertTrue(batch3.payload().contains("value offered (2.1)")); } + + // ========== Count Messages ========== + + @Test + public void countMessages_returnsZeroForEmptyQueue() throws Exception { + queue = createQueue(); + + assertEquals(0, queue.countMessages()); + } + + @Test + public void countMessages_returnsCorrectCount() throws Exception { + var clock = new MutableClock(Instant.parse("2024-01-01T10:00:00Z")); + queue = createQueueWithClock(clock); + var scheduleAt = clock.now().plusSeconds(10); + + queue.offerOrUpdate("key1", "payload1", scheduleAt); + queue.offerOrUpdate("key2", "payload2", scheduleAt); + queue.offerOrUpdate("key3", "payload3", scheduleAt); + + assertEquals(3, queue.countMessages()); + } + + @Test + public void countMessages_decreasesAfterDrop() throws Exception { + var clock = new MutableClock(Instant.parse("2024-01-01T10:00:00Z")); + queue = createQueueWithClock(clock); + var scheduleAt = clock.now().plusSeconds(10); + + queue.offerOrUpdate("key1", "payload1", scheduleAt); + queue.offerOrUpdate("key2", "payload2", scheduleAt); + assertEquals(2, queue.countMessages()); + + queue.dropMessage("key1"); + assertEquals(1, queue.countMessages()); + } + + @Test + public void countMessages_decreasesAfterAcknowledge() throws Exception { + var clock = new MutableClock(Instant.parse("2024-01-01T10:00:00Z")); + queue = createQueueWithClock(clock); + var scheduleAt = clock.now().minusSeconds(10); + + queue.offerOrUpdate("key1", "payload1", scheduleAt); + queue.offerOrUpdate("key2", "payload2", scheduleAt); + assertEquals(2, queue.countMessages()); + + var envelope = queue.tryPoll(); + assertNotNull(envelope); + envelope.acknowledge(); + assertEquals(1, queue.countMessages()); + } + + // ========== List Messages ========== + + @Test + public void listMessages_returnsEmptyListForEmptyQueue() throws Exception { + queue = createQueue(); + + var entries = queue.listMessages(10, 0); + + assertNotNull(entries); + assertTrue(entries.isEmpty()); + } + + @Test + public void listMessages_returnsAllEntriesWithinLimit() throws Exception { + var clock = new MutableClock(Instant.parse("2024-01-01T10:00:00Z")); + queue = createQueueWithClock(clock); + var base = clock.now(); + + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + queue.offerOrUpdate("key2", "payload2", base.plusSeconds(20)); + queue.offerOrUpdate("key3", "payload3", base.plusSeconds(30)); + + var entries = queue.listMessages(10, 0); + + assertEquals(3, entries.size()); + assertEquals("key1", entries.get(0).key()); + assertEquals("payload1", entries.get(0).payload()); + assertEquals("key2", entries.get(1).key()); + assertEquals("key3", entries.get(2).key()); + } + + @Test + public void listMessages_orderedByScheduledAt() throws Exception { + var clock = new MutableClock(Instant.parse("2024-01-01T10:00:00Z")); + queue = createQueueWithClock(clock); + var base = clock.now(); + + // Insert out of order + queue.offerOrUpdate("key3", "payload3", base.plusSeconds(30)); + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + queue.offerOrUpdate("key2", "payload2", base.plusSeconds(20)); + + var entries = queue.listMessages(10, 0); + + assertEquals(3, entries.size()); + assertEquals("key1", entries.get(0).key()); + assertEquals("key2", entries.get(1).key()); + assertEquals("key3", entries.get(2).key()); + } + + @Test + public void listMessages_respectsLimit() throws Exception { + var clock = new MutableClock(Instant.parse("2024-01-01T10:00:00Z")); + queue = createQueueWithClock(clock); + var base = clock.now(); + + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + queue.offerOrUpdate("key2", "payload2", base.plusSeconds(20)); + queue.offerOrUpdate("key3", "payload3", base.plusSeconds(30)); + + var entries = queue.listMessages(2, 0); + + assertEquals(2, entries.size()); + assertEquals("key1", entries.get(0).key()); + assertEquals("key2", entries.get(1).key()); + } + + @Test + public void listMessages_respectsOffset() throws Exception { + var clock = new MutableClock(Instant.parse("2024-01-01T10:00:00Z")); + queue = createQueueWithClock(clock); + var base = clock.now(); + + queue.offerOrUpdate("key1", "payload1", base.plusSeconds(10)); + queue.offerOrUpdate("key2", "payload2", base.plusSeconds(20)); + queue.offerOrUpdate("key3", "payload3", base.plusSeconds(30)); + + var entries = queue.listMessages(10, 1); + + assertEquals(2, entries.size()); + assertEquals("key2", entries.get(0).key()); + assertEquals("key3", entries.get(1).key()); + } + + @Test + public void listMessages_throwsOnInvalidArguments() throws Exception { + queue = createQueue(); + + assertThrows(IllegalArgumentException.class, () -> queue.listMessages(0, 0)); + assertThrows(IllegalArgumentException.class, () -> queue.listMessages(-1, 0)); + assertThrows(IllegalArgumentException.class, () -> queue.listMessages(10, -1)); + } + + @Test + public void listMessages_isReadOnly_doesNotAffectQueue() throws Exception { + var clock = new MutableClock(Instant.parse("2024-01-01T10:00:00Z")); + queue = createQueueWithClock(clock); + var scheduleAt = clock.now().minusSeconds(10); + + queue.offerOrUpdate("key1", "payload1", scheduleAt); + + // List should not lock or remove messages + queue.listMessages(10, 0); + queue.listMessages(10, 0); + + // Message should still be pollable + var envelope = queue.tryPoll(); + assertNotNull(envelope); + assertEquals("payload1", envelope.payload()); + } } diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala index f452036..a4b5b2d 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala @@ -115,6 +115,27 @@ trait DelayedQueue[A] { */ def containsMessage(key: String): IO[Boolean] + /** Returns the total number of messages currently in the queue. + * + * This includes both messages that are ready for processing and messages + * scheduled for the future. + */ + def countMessages: IO[Int] + + /** Returns a paginated, read-only list of messages currently in the queue, + * ordered by [[ScheduledMessage.scheduleAt]] ascending. + * + * This is intended for observability and visualization. The returned + * [[ScheduledMessage]] instances are snapshots with + * [[ScheduledMessage.canUpdate]] always set to `false`. + * + * @param limit + * the maximum number of entries to return (must be positive) + * @param offset + * the number of entries to skip (must be non-negative) + */ + def listMessages(limit: Int, offset: Int): IO[List[ScheduledMessage[A]]] + /** Drops all existing enqueued messages. * * This deletes all messages from the DB table of the configured type. diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala index 36a882c..e46991f 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala @@ -80,6 +80,15 @@ private[scala] class DelayedQueueWrapper[A]( override def containsMessage(key: String): IO[Boolean] = IO(underlying.containsMessage(key)) + override def countMessages: IO[Int] = + IO(underlying.countMessages()) + + override def listMessages(limit: Int, offset: Int): IO[List[ScheduledMessage[A]]] = + IO { + val javaEntries = underlying.listMessages(limit, offset) + javaEntries.asScala.toList.map(ScheduledMessage.fromJava) + } + override def dropAllMessages(confirm: String): IO[Int] = IO(underlying.dropAllMessages(confirm)) diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala index 8f31a29..af6a274 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala @@ -257,6 +257,130 @@ class DelayedQueueInMemorySpec extends CatsEffectSuite { } } + test("countMessages should return 0 for empty queue") { + DelayedQueueInMemory[String]().use { queue => + queue.countMessages.assertEquals(0) + } + } + + test("countMessages should return correct count") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + _ <- queue.offerOrUpdate("key3", "payload3", scheduleAt) + count <- queue.countMessages + } yield assertEquals(count, 3) + } + } + + test("countMessages should decrease after drop") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + _ <- queue.dropMessage("key1") + count <- queue.countMessages + } yield assertEquals(count, 1) + } + } + + test("listMessages should return empty list for empty queue") { + DelayedQueueInMemory[String]().use { queue => + queue.listMessages(10, 0).map { entries => + assertEquals(entries, List.empty[ScheduledMessage[String]]) + } + } + } + + test("listMessages should return entries ordered by scheduledAt") { + DelayedQueueInMemory[String]().use { queue => + for { + now <- IO(Instant.now()) + _ <- queue.offerOrUpdate("key3", "payload3", now.plusSeconds(30)) + _ <- queue.offerOrUpdate("key1", "payload1", now.plusSeconds(10)) + _ <- queue.offerOrUpdate("key2", "payload2", now.plusSeconds(20)) + entries <- queue.listMessages(10, 0) + _ <- IO { + assertEquals(entries.length, 3) + assertEquals(entries(0).key, "key1") + assertEquals(entries(0).payload, "payload1") + assertEquals(entries(1).key, "key2") + assertEquals(entries(2).key, "key3") + } + } yield () + } + } + + test("listMessages should respect limit") { + DelayedQueueInMemory[String]().use { queue => + for { + now <- IO(Instant.now()) + _ <- queue.offerOrUpdate("key1", "payload1", now.plusSeconds(10)) + _ <- queue.offerOrUpdate("key2", "payload2", now.plusSeconds(20)) + _ <- queue.offerOrUpdate("key3", "payload3", now.plusSeconds(30)) + entries <- queue.listMessages(2, 0) + _ <- IO { + assertEquals(entries.length, 2) + assertEquals(entries(0).key, "key1") + assertEquals(entries(1).key, "key2") + } + } yield () + } + } + + test("listMessages should respect offset") { + DelayedQueueInMemory[String]().use { queue => + for { + now <- IO(Instant.now()) + _ <- queue.offerOrUpdate("key1", "payload1", now.plusSeconds(10)) + _ <- queue.offerOrUpdate("key2", "payload2", now.plusSeconds(20)) + _ <- queue.offerOrUpdate("key3", "payload3", now.plusSeconds(30)) + entries <- queue.listMessages(10, 1) + _ <- IO { + assertEquals(entries.length, 2) + assertEquals(entries(0).key, "key2") + assertEquals(entries(1).key, "key3") + } + } yield () + } + } + + test("listMessages should reject invalid arguments") { + DelayedQueueInMemory[String]().use { queue => + for { + _ <- queue.listMessages(0, 0).attempt.map { r => + assert(r.isLeft, "limit=0 should fail") + assert(r.left.exists(_.isInstanceOf[IllegalArgumentException])) + } + _ <- queue.listMessages(-1, 0).attempt.map { r => + assert(r.isLeft, "limit=-1 should fail") + } + _ <- queue.listMessages(10, -1).attempt.map { r => + assert(r.isLeft, "offset=-1 should fail") + } + } yield () + } + } + + test("listMessages is read-only and does not affect queue") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.listMessages(10, 0) + _ <- queue.listMessages(10, 0) + envelope <- queue.tryPoll + _ <- IO { + assert(envelope.isDefined, "message should still be pollable after listing") + assertEquals(envelope.get.payload, "payload1") + } + } yield () + } + } + test("time passage: offer in future, tryPoll returns None, advance time, tryPoll succeeds") { DelayedQueueInMemory[String]().use { queue => for { diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala index a2437ad..8977ca6 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala @@ -260,6 +260,94 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } } + test("countMessages should return 0 for empty queue") { + withQueue { queue => + queue.countMessages.assertEquals(0) + } + } + + test("countMessages should return correct count") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + _ <- queue.offerOrUpdate("key3", "payload3", scheduleAt) + count <- queue.countMessages + } yield assertEquals(count, 3) + } + } + + test("countMessages should decrease after drop") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + _ <- queue.dropMessage("key1") + count <- queue.countMessages + } yield assertEquals(count, 1) + } + } + + test("listMessages should return empty list for empty queue") { + withQueue { queue => + queue.listMessages(10, 0).map { entries => + assertEquals(entries, List.empty[ScheduledMessage[String]]) + } + } + } + + test("listMessages should return entries ordered by scheduledAt") { + withQueue { queue => + for { + now <- IO(Instant.now()) + _ <- queue.offerOrUpdate("key3", "payload3", now.plusSeconds(30)) + _ <- queue.offerOrUpdate("key1", "payload1", now.plusSeconds(10)) + _ <- queue.offerOrUpdate("key2", "payload2", now.plusSeconds(20)) + entries <- queue.listMessages(10, 0) + _ <- IO { + assertEquals(entries.length, 3) + assertEquals(entries(0).key, "key1") + assertEquals(entries(1).key, "key2") + assertEquals(entries(2).key, "key3") + } + } yield () + } + } + + test("listMessages should respect limit and offset") { + withQueue { queue => + for { + now <- IO(Instant.now()) + _ <- queue.offerOrUpdate("key1", "payload1", now.plusSeconds(10)) + _ <- queue.offerOrUpdate("key2", "payload2", now.plusSeconds(20)) + _ <- queue.offerOrUpdate("key3", "payload3", now.plusSeconds(30)) + entries <- queue.listMessages(1, 1) + _ <- IO { + assertEquals(entries.length, 1) + assertEquals(entries(0).key, "key2") + } + } yield () + } + } + + test("listMessages should reject invalid arguments") { + withQueue { queue => + for { + _ <- queue.listMessages(0, 0).attempt.map { r => + assert(r.isLeft, "limit=0 should fail") + } + _ <- queue.listMessages(-1, 0).attempt.map { r => + assert(r.isLeft, "limit=-1 should fail") + } + _ <- queue.listMessages(10, -1).attempt.map { r => + assert(r.isLeft, "offset=-1 should fail") + } + } yield () + } + } + test("multiple queues can share the same table") { val tableName = s"shared_table_${System.nanoTime()}" val config1 = createConfig(tableName, "queue1")