diff --git a/.github/workflows/java-ci.yaml b/.github/workflows/java-ci.yaml
index 1a284494..eb4518b4 100644
--- a/.github/workflows/java-ci.yaml
+++ b/.github/workflows/java-ci.yaml
@@ -137,10 +137,25 @@ jobs:
sudo apt-get update
sudo apt-get install -y redis-server
redis-cli --version
+ sudo systemctl stop redis-server
+ redis-server --logfile /tmp/redis.log --daemonize yes
+
+ - name: Log Redis info
+ run: |
+ redis-server --version
+ redis-cli INFO server
- name: Run producer-only tests
run: ./gradlew test -DincludeTags=producerOnly
+ - name: Upload redis-server log
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: producer-redis-server
+ path: /tmp/redis.log
+ if-no-files-found: ignore
+
- name: Upload JaCoCo exec data
if: always()
uses: actions/upload-artifact@v4
@@ -189,10 +204,25 @@ jobs:
sudo apt-get update
sudo apt-get install -y redis-server
redis-cli --version
+ sudo systemctl stop redis-server
+ redis-server --logfile /tmp/redis.log --daemonize yes
+
+ - name: Log Redis info
+ run: |
+ redis-server --version
+ redis-cli INFO server
- name: Run integration tests
run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local
+ - name: Upload redis-server log
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: integration-redis-server
+ path: /tmp/redis.log
+ if-no-files-found: ignore
+
- name: Upload JaCoCo exec data
if: always()
uses: actions/upload-artifact@v4
@@ -245,12 +275,12 @@ jobs:
- name: Setup Redis Cluster
run: |
mkdir 9000 9001 9002 9003 9004 9005
- printf "port 9000 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9000/redis.conf
- printf "port 9001 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9001/redis.conf
- printf "port 9002 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9002/redis.conf
- printf "port 9003 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9003/redis.conf
- printf "port 9004 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9004/redis.conf
- printf "port 9005 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9005/redis.conf
+ printf "port 9000 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9000.log" >> 9000/redis.conf
+ printf "port 9001 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9001.log" >> 9001/redis.conf
+ printf "port 9002 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9002.log" >> 9002/redis.conf
+ printf "port 9003 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9003.log" >> 9003/redis.conf
+ printf "port 9004 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9004.log" >> 9004/redis.conf
+ printf "port 9005 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9005.log" >> 9005/redis.conf
(cd 9000 && redis-server ./redis.conf) &
(cd 9001 && redis-server ./redis.conf) &
(cd 9002 && redis-server ./redis.conf) &
@@ -260,9 +290,23 @@ jobs:
sleep 30
yes yes | redis-cli --cluster create 127.0.0.1:9000 127.0.0.1:9001 127.0.0.1:9002 127.0.0.1:9003 127.0.0.1:9004 127.0.0.1:9005 --cluster-replicas 1
+ - name: Log Redis cluster info
+ run: |
+ redis-server --version
+ redis-cli -p 9000 INFO server | grep -E 'redis_version|os|tcp_port|uptime_in_seconds'
+ redis-cli -p 9000 CLUSTER INFO
+
- name: Run Redis cluster tests
run: ./gradlew test -DincludeTags=redisCluster
+ - name: Upload redis-server logs
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: redis-server-cluster
+ path: /tmp/redis-*.log
+ if-no-files-found: ignore
+
- name: Upload JaCoCo exec data
if: always()
uses: actions/upload-artifact@v4
@@ -313,10 +357,25 @@ jobs:
sudo apt-get update
sudo apt-get install -y redis-server
redis-cli --version
+ sudo systemctl stop redis-server
+ redis-server --logfile /tmp/redis.log --daemonize yes
+
+ - name: Log Redis info
+ run: |
+ redis-server --version
+ redis-cli INFO server
- name: Run reactive integration tests
run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local
+ - name: Upload redis-server log
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: reactive-redis-server
+ path: /tmp/redis.log
+ if-no-files-found: ignore
+
- name: Upload JaCoCo exec data
if: always()
uses: actions/upload-artifact@v4
@@ -365,7 +424,7 @@ jobs:
# instead of pulling a Testcontainers image.
- name: Install nats-server
run: |
- NATS_VERSION=v2.10.22
+ NATS_VERSION=v2.12.0
curl -sSL "https://github.com/nats-io/nats-server/releases/download/${NATS_VERSION}/nats-server-${NATS_VERSION}-linux-amd64.tar.gz" \
| tar -xz -C /tmp
sudo mv "/tmp/nats-server-${NATS_VERSION}-linux-amd64/nats-server" /usr/local/bin/nats-server
@@ -374,7 +433,7 @@ jobs:
- name: Start nats-server
run: |
mkdir -p /tmp/jetstream
- nohup nats-server -js -sd /tmp/jetstream -p 4222 > /tmp/nats.log 2>&1 &
+ nats-server -js -sd /tmp/jetstream -p 4222 > /tmp/nats.log 2>&1 &
for i in $(seq 1 20); do
if (echo > /dev/tcp/127.0.0.1/4222) 2>/dev/null; then
echo "nats-server ready after ${i}s"; break
@@ -382,6 +441,9 @@ jobs:
sleep 1
done
+ - name: Log NATS server info
+ run: nats-server --version
+
- name: Run NATS tests
env:
NATS_RUNNING: "true"
@@ -392,7 +454,7 @@ jobs:
if: always()
uses: actions/upload-artifact@v4
with:
- name: nats-server-log
+ name: nats-server
path: /tmp/nats.log
if-no-files-found: ignore
diff --git a/AGENTS.md b/AGENTS.md
index a5815012..a582dcca 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -31,7 +31,7 @@ Never remove or increment the base version number. The human decides when the of
# Memory Context
-# [rqueue] recent context, 2026-05-09 2:28pm GMT+5:30
+# [rqueue] recent context, 2026-05-09 9:08pm GMT+5:30
No previous sessions found.
\ No newline at end of file
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java
index fbb29f0d..ad75f4d1 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java
@@ -122,10 +122,15 @@ private void updateCounter(boolean fail) {
if (Objects.isNull(counter)) {
return;
}
+ // Pass the consumer name so the counter can route increments to the per-consumer entry
+ // when multiple @RqueueListener methods share a queue with distinct consumerName overrides;
+ // null/empty (no override) routes to the bare-queue counter unchanged.
+ String queueName = job.getQueueDetail().getName();
+ String consumerName = job.getQueueDetail().getConsumerName();
if (fail) {
- counter.updateFailureCount(job.getQueueDetail().getName());
+ counter.updateFailureCount(queueName, consumerName);
} else {
- counter.updateExecutionCount(job.getQueueDetail().getName());
+ counter.updateExecutionCount(queueName, consumerName);
}
}
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/QueueCounter.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/QueueCounter.java
index 35173d19..e00336af 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/QueueCounter.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/QueueCounter.java
@@ -29,6 +29,13 @@
/**
* Queue counter counts the different types of events related to a queue. Failure and execution
* count, it supports queue registrations.
+ *
+ *
+ * <p>Multi-consumer keying. Two {@code @RqueueListener} methods on the same queue with
+ * different {@code consumerName} overrides each produce a distinct {@link QueueDetail} and need
+ * their own counters. The maps are keyed by {@code queueName##consumerName} (or just
+ * {@code queueName} when no override is set) so the second registration does not silently
+ * overwrite the first. Increment lookups follow the same composite key — see
+ * {@link #updateFailureCount(String, String)}.
*/
public class QueueCounter {
@@ -37,20 +44,49 @@ public class QueueCounter {
private final Map<String, Counter> queueNameToFailureCounter = new HashMap<>();
private final Map<String, Counter> queueNameToExecutionCounter = new HashMap<>();
- private void updateCounter(Map<String, Counter> map, String queueName) {
- Counter counter = map.get(queueName);
+ private static String key(String queueName, String consumerName) {
+ return (consumerName == null || consumerName.isEmpty())
+ ? queueName
+ : queueName + "##" + consumerName;
+ }
+
+ private void updateCounter(Map<String, Counter> map, String queueName, String consumerName) {
+ // Try the consumer-specific entry first; fall back to the bare queue-name entry so callers
+ // that don't yet pass a consumer name (older paths, single-consumer queues) still work.
+ Counter counter = map.get(key(queueName, consumerName));
+ if (counter == null && consumerName != null && !consumerName.isEmpty()) {
+ counter = map.get(queueName);
+ }
if (counter == null) {
return;
}
counter.increment();
}
+ /** Backward-compatible single-arg increment; route to the bare-queue counter only. */
void updateFailureCount(String queueName) {
- updateCounter(queueNameToFailureCounter, queueName);
+ updateCounter(queueNameToFailureCounter, queueName, null);
}
+ /** Backward-compatible single-arg increment; route to the bare-queue counter only. */
void updateExecutionCount(String queueName) {
- updateCounter(queueNameToExecutionCounter, queueName);
+ updateCounter(queueNameToExecutionCounter, queueName, null);
+ }
+
+ /**
+ * Consumer-aware increment: increments the counter registered for
+ * {@code (queueName, consumerName)}, falling back to the bare {@code queueName} counter when no
+ * consumer-scoped entry exists. Use this from
+ * {@link com.github.sonus21.rqueue.listener.RqueueExecutor} (which has the {@link QueueDetail})
+ * so multi-consumer queues keep accurate per-consumer counts.
+ */
+ void updateFailureCount(String queueName, String consumerName) {
+ updateCounter(queueNameToFailureCounter, queueName, consumerName);
+ }
+
+ /** Consumer-aware execution-count increment. See {@link #updateFailureCount(String, String)}. */
+ void updateExecutionCount(String queueName, String consumerName) {
+ updateCounter(queueNameToExecutionCounter, queueName, consumerName);
}
void registerQueue(
@@ -58,19 +94,20 @@ void registerQueue(
Tags queueTags,
MeterRegistry registry,
QueueDetail queueDetail) {
+ String mapKey = key(queueDetail.getName(), queueDetail.getConsumerName());
if (metricsProperties.countFailure()) {
Counter.Builder builder = Counter.builder(metricsProperties.getMetricName(FAILURE_COUNT))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
.description("Failure count");
Counter counter = builder.register(registry);
- queueNameToFailureCounter.put(queueDetail.getName(), counter);
+ queueNameToFailureCounter.put(mapKey, counter);
}
if (metricsProperties.countExecution()) {
Counter.Builder builder = Counter.builder(metricsProperties.getMetricName(EXECUTION_COUNT))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
.description("Task execution count");
Counter counter = builder.register(registry);
- queueNameToExecutionCounter.put(queueDetail.getName(), counter);
+ queueNameToExecutionCounter.put(mapKey, counter);
}
}
}
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java
index 7da81462..2d7d3505 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java
@@ -38,4 +38,14 @@ public void updateFailureCount(String queueName) {
public void updateExecutionCount(String queueName) {
queueCounter.updateExecutionCount(queueName);
}
+
+ @Override
+ public void updateFailureCount(String queueName, String consumerName) {
+ queueCounter.updateFailureCount(queueName, consumerName);
+ }
+
+ @Override
+ public void updateExecutionCount(String queueName, String consumerName) {
+ queueCounter.updateExecutionCount(queueName, consumerName);
+ }
}
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java
index f862e8cc..33be981e 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java
@@ -35,6 +35,14 @@
public class RqueueMetrics implements RqueueMetricsRegistry {
static final String QUEUE_KEY = "key";
+ /**
+ * Tag added when a {@link QueueDetail} declares a {@code consumerName} override. Without this
+ * tag, two {@code @RqueueListener} methods on the same queue with different consumer names
+ * register gauges with identical (name, tag-set) pairs and Micrometer silently keeps only the
+ * first — losing the second consumer's metrics entirely.
+ */
+ static final String CONSUMER_KEY = "consumer";
+
private static final String QUEUE_SIZE = "queue.size";
private static final String SCHEDULED_QUEUE_SIZE = "scheduled.queue.size";
private static final String PROCESSING_QUEUE_SIZE = "processing.queue.size";
@@ -58,17 +66,31 @@ private void monitor() {
for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) {
Tags queueTags =
Tags.concat(metricsProperties.getMetricTags(), "queue", queueDetail.getName());
+ // When a queue carries multiple consumers (multiple @RqueueListener with distinct
+ // consumerName overrides), each gets its own QueueDetail. Without a `consumer` tag the
+ // gauges would share the same (name, tags) and Micrometer would drop all but the first.
+ String consumerName = queueDetail.getConsumerName();
+ boolean hasConsumerOverride = consumerName != null && !consumerName.isEmpty();
+ if (hasConsumerOverride) {
+ queueTags = queueTags.and(CONSUMER_KEY, consumerName);
+ }
Gauge.builder(
metricsProperties.getMetricName(QUEUE_SIZE),
queueDetail,
- c -> queueMetricsProvider.getPendingMessageCount(queueDetail.getName()))
+ c -> hasConsumerOverride
+ ? queueMetricsProvider.getPendingMessageCountByConsumer(
+ queueDetail.getName(), consumerName)
+ : queueMetricsProvider.getPendingMessageCount(queueDetail.getName()))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
.description("The number of entries in this queue")
.register(meterRegistry);
Gauge.builder(
metricsProperties.getMetricName(PROCESSING_QUEUE_SIZE),
queueDetail,
- c -> queueMetricsProvider.getProcessingMessageCount(queueDetail.getName()))
+ c -> hasConsumerOverride
+ ? queueMetricsProvider.getProcessingMessageCountByConsumer(
+ queueDetail.getName(), consumerName)
+ : queueMetricsProvider.getProcessingMessageCount(queueDetail.getName()))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getProcessingQueueName()))
.description("The number of entries in the processing queue")
.register(meterRegistry);
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java
index 119625ff..db005ffc 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java
@@ -21,4 +21,21 @@ public interface RqueueMetricsCounter {
void updateFailureCount(String queueName);
void updateExecutionCount(String queueName);
+
+ /**
+ * Consumer-aware failure increment. When a queue carries multiple {@code @RqueueListener}
+ * methods with distinct {@code consumerName} overrides, each consumer has its own counter
+ * registered under {@code (queueName, consumerName)}; calling the bare-queue overload would
+ * route every increment to the same (last-registered) counter and silently lose per-consumer
+ * counts. Defaults to the queue-level path so callers that don't have a consumer name keep
+ * working unchanged.
+ */
+ default void updateFailureCount(String queueName, String consumerName) {
+ updateFailureCount(queueName);
+ }
+
+ /** Consumer-aware execution increment. See {@link #updateFailureCount(String, String)}. */
+ default void updateExecutionCount(String queueName, String consumerName) {
+ updateExecutionCount(queueName);
+ }
}
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetricsProvider.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetricsProvider.java
index f622541c..4131a936 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetricsProvider.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetricsProvider.java
@@ -72,4 +72,25 @@ default long getScheduledMessageCount(String queueName, String priority) {
default long getProcessingMessageCount(String queueName, String priority) {
return getProcessingMessageCount(queueName);
}
+
+ /**
+ * Per-consumer variant of {@link #getPendingMessageCount(String)}. When two
+ * {@code @RqueueListener} methods on the same queue declare different {@code consumerName}
+ * overrides, each gets its own QueueDetail and its own metric registration; backends that can
+ * report per-consumer pending depth (e.g. NATS JetStream Limits/Interest streams or any
+ * fan-out broker) should override this. The default delegates to the queue-level call so
+ * single-consumer queues, and backends without per-consumer state, behave unchanged.
+ *
+ * @param queueName user-facing queue name
+ * @param consumerName consumer-name override from {@code @RqueueListener(consumerName=...)};
+ * {@code null} or empty when no override is set
+ */
+ default long getPendingMessageCountByConsumer(String queueName, String consumerName) {
+ return getPendingMessageCount(queueName);
+ }
+
+ /** Per-consumer variant of {@link #getProcessingMessageCount(String)}. See related javadoc. */
+ default long getProcessingMessageCountByConsumer(String queueName, String consumerName) {
+ return getProcessingMessageCount(queueName);
+ }
}
diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java
index 4f15e2df..39510c11 100644
--- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java
+++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java
@@ -89,4 +89,75 @@ void updateFailureCount() {
void updateExecutionCount() {
validateCountStatistics(TestUtils.createQueueDetail("scheduled-queue", 900000L), "success");
}
+
+ /**
+ * Regression: two QueueDetails on the same queue with different consumerName overrides each
+ * register their own counter, and a consumer-aware {@code updateXxxCount} routes to the
+ * correct one. Before the fix, both registrations collided on {@code map.put(queueName, ...)}
+ * and only the last consumer's counter was reachable; calls without a consumer name silently
+ * lost increments for any consumer that wasn't last to register.
+ */
+ @Test
+ void multiConsumerOnSameQueueRoutesToCorrectCounter() {
+ metricsProperties.getCount().setExecution(true);
+ metricsProperties.getCount().setFailure(true);
+ MeterRegistry registry = new SimpleMeterRegistry();
+ QueueCounter counter = new QueueCounter();
+
+ QueueDetail qA = QueueDetail.builder()
+ .name("multi")
+ .queueName("__rq::queue::multi")
+ .processingQueueName("__rq::p-queue::multi")
+ .scheduledQueueName("__rq::d-queue::multi")
+ .completedQueueName("__rq::c-queue::multi")
+ .consumerName("consumer-a")
+ .numRetry(3)
+ .visibilityTimeout(900000L)
+ .priorityGroup("")
+ .concurrency(new com.github.sonus21.rqueue.models.Concurrency(-1, -1))
+ .active(true)
+ .build();
+ QueueDetail qB = QueueDetail.builder()
+ .name("multi")
+ .queueName("__rq::queue::multi")
+ .processingQueueName("__rq::p-queue::multi")
+ .scheduledQueueName("__rq::d-queue::multi")
+ .completedQueueName("__rq::c-queue::multi")
+ .consumerName("consumer-b")
+ .numRetry(3)
+ .visibilityTimeout(900000L)
+ .priorityGroup("")
+ .concurrency(new com.github.sonus21.rqueue.models.Concurrency(-1, -1))
+ .active(true)
+ .build();
+
+ counter.registerQueue(
+ metricsProperties, Tags.of("queue", "multi", "consumer", "consumer-a"), registry, qA);
+ counter.registerQueue(
+ metricsProperties, Tags.of("queue", "multi", "consumer", "consumer-b"), registry, qB);
+
+ counter.updateExecutionCount("multi", "consumer-a");
+ counter.updateExecutionCount("multi", "consumer-a");
+ counter.updateExecutionCount("multi", "consumer-b");
+ counter.updateFailureCount("multi", "consumer-b");
+
+ Tags tagsA = Tags.of("queue", "multi", "consumer", "consumer-a", QUEUE_KEY, qA.getQueueName());
+ Tags tagsB = Tags.of("queue", "multi", "consumer", "consumer-b", QUEUE_KEY, qB.getQueueName());
+
+ assertEquals(
+ 2.0,
+ registry.get(EXECUTION_COUNT).tags(tagsA).counter().count(),
+ 0.0,
+ "consumer-a execution count");
+ assertEquals(
+ 1.0,
+ registry.get(EXECUTION_COUNT).tags(tagsB).counter().count(),
+ 0.0,
+ "consumer-b execution count");
+ assertEquals(
+ 1.0,
+ registry.get(FAILURE_COUNT).tags(tagsB).counter().count(),
+ 0.0,
+ "consumer-b failure count");
+ }
}
diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java
index 76faf458..87dca835 100644
--- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java
+++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java
@@ -30,7 +30,6 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -53,8 +52,9 @@ public class NatsProvisioner {
private final ConcurrentHashMap<String, KeyValue> kvCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Object> kvLocks = new ConcurrentHashMap<>();
- // stream name → provisioned (set membership acts as the boolean flag)
- private final Set<String> streamsDone = ConcurrentHashMap.newKeySet();
+ // stream name → schedulingEnabled (true if the stream was created/updated with
+ // allowMessageSchedules)
+ private final ConcurrentHashMap<String, Boolean> streamsDone = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Object> streamLocks = new ConcurrentHashMap<>();
// "streamName/requestedConsumerName" → actual consumer name (may differ for stale-rebind)
@@ -63,7 +63,7 @@ public class NatsProvisioner {
/**
* Minimum NATS server version that supports server-side message scheduling via the
- * {@code Nats-Next-Deliver-Time} JetStream publish header (ADR-51).
+ * {@code Nats-Schedule} JetStream publish header (ADR-51).
*/
public static final String SCHEDULING_MIN_VERSION = "2.12.0";
@@ -146,12 +146,18 @@ public KeyValue ensureKv(String bucketName, Duration ttl)
* use {@link #ensureStream(String, List, QueueType)} instead.
*/
public void ensureStream(String streamName, List<String> subjects) {
- ensureStream(streamName, subjects, QueueType.QUEUE, null);
+ ensureStream(streamName, subjects, QueueType.QUEUE, null, false);
}
- /** See {@link #ensureStream(String, List, QueueType, String)}. */
+ /** See {@link #ensureStream(String, List, QueueType, String, boolean)}. */
public void ensureStream(String streamName, List<String> subjects, QueueType queueType) {
- ensureStream(streamName, subjects, queueType, null);
+ ensureStream(streamName, subjects, queueType, null, false);
+ }
+
+ /** See {@link #ensureStream(String, List, QueueType, String, boolean)}. */
+ public void ensureStream(
+ String streamName, List<String> subjects, QueueType queueType, String description) {
+ ensureStream(streamName, subjects, queueType, description, false);
}
/**
@@ -168,20 +174,35 @@ public void ensureStream(String streamName, List subjects, QueueType que
* {@code nats stream info}). Callers should pass the rqueue queue name so operators can map a
* stream back to the queue that created it; pass {@code null} to skip.
*
+ * {@code allowSchedules} must be {@code true} when the stream will receive messages published
+ * with the {@code Nats-Schedule} header (ADR-51). Only callers that perform delayed
+ * enqueue should pass {@code true}; regular enqueue callers should pass {@code false} (or use the
+ * shorter overloads). Equivalent to the CLI flag {@code --allow-schedules}.
+ *
*
Hits the NATS backend at most once per stream name per process lifetime; subsequent calls
- * return immediately from the in-process cache. If the stream already exists with a different
+ * return immediately from the in-process cache. If {@code allowSchedules=true} is later requested
+ * for a stream that was previously created without that flag, the stream is updated in place via
+ * {@link JetStreamManagement#updateStream}. If the stream already exists with a different
* retention policy, a WARNING is logged and the existing config is left untouched.
*/
public void ensureStream(
- String streamName, List<String> subjects, QueueType queueType, String description) {
- if (streamsDone.contains(streamName)) {
+ String streamName,
+ List<String> subjects,
+ QueueType queueType,
+ String description,
+ boolean allowSchedules) {
+ // Fast-path: already provisioned with at least as many capabilities as requested.
+ Boolean cached = streamsDone.get(streamName);
+ if (cached != null && (cached || !allowSchedules)) {
return;
}
Object lock = streamLocks.computeIfAbsent(streamName, k -> new Object());
synchronized (lock) {
- if (streamsDone.contains(streamName)) {
+ cached = streamsDone.get(streamName);
+ if (cached != null && (cached || !allowSchedules)) {
return;
}
+ boolean enableSchedules = allowSchedules && schedulingSupported;
try {
StreamInfo existing = safeGetStreamInfo(streamName);
RetentionPolicy desired =
@@ -199,6 +220,11 @@ public void ensureStream(
.storageType(sd.getStorage())
.retentionPolicy(desired)
.compressionOption(CompressionOption.S2);
+ if (enableSchedules) {
+ // Enable server-side message scheduling (ADR-51 / Nats-Schedule header).
+ // Equivalent to: nats stream add MY_STREAM --allow-schedules
+ b.allowMessageSchedules(true);
+ }
if (description != null && !description.isEmpty()) {
b.description(description);
}
@@ -223,12 +249,46 @@ public void ensureStream(
+ " — leaving existing config in place.",
new Object[] {streamName, actual, desired});
}
+ // Check whether new subjects need to be merged in (e.g. the sched wildcard added by
+ // enqueueWithDelay after the stream was originally created by a plain enqueue call).
+ java.util.List<String> existingSubjects = existing.getConfiguration().getSubjects();
+ java.util.Set<String> existingSet = existingSubjects != null
+ ? new java.util.HashSet<>(existingSubjects)
+ : new java.util.HashSet<>();
+ boolean needsSubjectUpdate = subjects.stream().anyMatch(s -> !existingSet.contains(s));
+ boolean needsFlagUpdate =
+ enableSchedules && !existing.getConfiguration().getAllowMsgSchedules();
+
+ if (needsFlagUpdate || needsSubjectUpdate) {
+ // Merge: keep all existing subjects and append new ones (never remove).
+ java.util.LinkedHashSet<String> merged = new java.util.LinkedHashSet<>(existingSet);
+ merged.addAll(subjects);
+ StreamConfiguration.Builder upd = StreamConfiguration.builder(
+ existing.getConfiguration())
+ .subjects(new java.util.ArrayList<>(merged));
+ if (needsFlagUpdate) {
+ upd.allowMessageSchedules(true);
+ }
+ jsm.updateStream(upd.build());
+ if (needsFlagUpdate) {
+ log.log(
+ Level.INFO,
+ "Stream ''{0}'' updated to enable message scheduling (ADR-51).",
+ streamName);
+ }
+ if (needsSubjectUpdate) {
+ log.log(
+ Level.INFO,
+ "Stream ''{0}'' updated with additional subjects: {1}.",
+ new Object[] {streamName, subjects});
+ }
+ }
}
} catch (IOException | JetStreamApiException e) {
throw new RqueueNatsException(
"Failed to ensure stream '" + streamName + "' for subjects " + subjects, e);
}
- streamsDone.add(streamName);
+ streamsDone.put(streamName, enableSchedules);
}
}
@@ -236,14 +296,38 @@ public void ensureStream(
* Ensure a durable pull consumer exists, returning the consumer name.
* Hits the NATS backend at most once per (stream, consumer) pair per process lifetime.
*
- * No filter subject is set on the consumer: each queue already has its own dedicated stream
- * with a single subject, so a filter would be redundant. More importantly, omitting the filter
- * allows multiple independent consumer groups (fan-out) to coexist on the same stream — NATS
- * rejects two consumers with the same filter subject (error 10100) regardless of retention type.
+ *
+ * <p>Overload without a filter subject: used when the stream has only the work subject so the
+ * filter would be redundant, or when multiple independent consumer groups (fan-out) must coexist
+ * on a Limits-retention stream (NATS rejects two consumers with the same filter subject, error
+ * 10100). For WorkQueue streams that also carry scheduler subjects ({@code .sched.*}) a filter
+ * subject MUST be supplied via
+ * {@link #ensureConsumer(String, String, String, Duration, long, long)} so the consumer only
+ * receives work-subject messages and does not accidentally pick up scheduler entries.
+ */
+ public String ensureConsumer(
+ String streamName,
+ String consumerName,
+ Duration ackWait,
+ long maxDeliver,
+ long maxAckPending) {
+ return ensureConsumer(streamName, consumerName, null, ackWait, maxDeliver, maxAckPending);
+ }
+
+ /**
+ * Ensure a durable pull consumer exists with an optional subject filter, returning the consumer
+ * name. Hits the NATS backend at most once per (stream, consumer) pair per process lifetime.
+ *
+ *
+ * <p>{@code filterSubject} — when non-null, sets the consumer's filter subject so it only
+ * receives messages published to that subject. Required when the stream carries both work subjects
+ * and scheduler subjects ({@code .sched.*}): without a filter the consumer reads scheduler
+ * entries before the scheduled time, delivering the message early. Pass {@code null} for streams
+ * that do not use NATS scheduling, or where fan-out across multiple consumers is needed (Limits
+ * retention).
*/
public String ensureConsumer(
String streamName,
String consumerName,
+ String filterSubject,
Duration ackWait,
long maxDeliver,
long maxAckPending) {
@@ -258,8 +342,8 @@ public String ensureConsumer(
if (cached != null) {
return cached;
}
- String actual =
- doEnsureConsumer(streamName, consumerName, ackWait, maxDeliver, maxAckPending);
+ String actual = doEnsureConsumer(
+ streamName, consumerName, filterSubject, ackWait, maxDeliver, maxAckPending);
consumerCache.put(cacheKey, actual);
return actual;
}
@@ -268,6 +352,7 @@ public String ensureConsumer(
private String doEnsureConsumer(
String streamName,
String consumerName,
+ String filterSubject,
Duration ackWait,
long maxDeliver,
long maxAckPending) {
@@ -295,16 +380,20 @@ private String doEnsureConsumer(
throw new RqueueNatsException("Consumer '" + consumerName + "' on stream '" + streamName
+ "' does not exist and autoCreateConsumers=false");
}
- jsm.addOrUpdateConsumer(
- streamName,
- ConsumerConfiguration.builder()
- .durable(consumerName)
- .ackPolicy(AckPolicy.Explicit)
- .deliverPolicy(DeliverPolicy.All)
- .ackWait(ackWait)
- .maxDeliver(maxDeliver)
- .maxAckPending(maxAckPending)
- .build());
+ ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder()
+ .durable(consumerName)
+ .ackPolicy(AckPolicy.Explicit)
+ .deliverPolicy(DeliverPolicy.All)
+ .ackWait(ackWait)
+ .maxDeliver(maxDeliver)
+ .maxAckPending(maxAckPending);
+ if (filterSubject != null && !filterSubject.isEmpty()) {
+ // Filter to the work subject only so that scheduler entries (published to
+ // .sched.*) are not delivered to this consumer before the scheduled time.
+ // The NATS scheduler fires the triggered message to workSubject when the time arrives.
+ ccBuilder.filterSubject(filterSubject);
+ }
+ jsm.addOrUpdateConsumer(streamName, ccBuilder.build());
return consumerName;
} catch (JetStreamApiException e) {
throw new RqueueNatsException(
diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java
index f42e2dcf..38dafea1 100644
--- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java
+++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java
@@ -64,10 +64,25 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable {
private static final Logger log = Logger.getLogger(JetStreamMessageBroker.class.getName());
/**
- * JetStream publish header used by NATS >= 2.12 to hold a message until a UTC delivery time
- * (ADR-51). Value must be RFC 3339 UTC, e.g. {@code 2026-05-09T12:30:00Z}.
+ * NATS message-scheduling header (ADR-51, NATS >= 2.12).
+ * Value format: {@code @at }, e.g. {@code @at 2026-05-09T12:30:00Z}.
+ * The message must be published to a dedicated scheduler subject (not the work subject);
+ * the work subject is declared via {@link #HDR_SCHEDULE_TARGET}.
*/
- static final String HDR_NEXT_DELIVER_TIME = "Nats-Next-Deliver-Time";
+ public static final String HDR_SCHEDULE = "Nats-Schedule";
+
+ /**
+ * Target subject to which NATS fires the work message when the schedule triggers.
+ * Must differ from the publish (scheduler) subject.
+ */
+ public static final String HDR_SCHEDULE_TARGET = "Nats-Schedule-Target";
+
+ /**
+ * Suffix appended to the work subject to form the per-message scheduler subject,
+ * e.g. {@code rqueue.js.orders.sched.}.
+ * The stream must include {@code .sched.*} in its subjects list.
+ */
+ public static final String SCHED_SUBJECT_SUFFIX = ".sched.";
/**
* Enrichment header: epoch-ms at which this message was scheduled to be processed.
@@ -92,7 +107,7 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable {
*/
private static final Duration MIN_FETCH_WAIT = Duration.ofMillis(50);
- /** RFC 3339 UTC formatter for the {@code Nats-Next-Deliver-Time} header value. */
+ /** RFC 3339 UTC formatter for the {@code Nats-Schedule} header value (@at prefix). */
private static final DateTimeFormatter RFC3339_UTC =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZone(ZoneOffset.UTC);
@@ -197,6 +212,24 @@ private String dlqSubjectFor(QueueDetail q) {
return subjectFor(q) + config.getDlqSubjectSuffix();
}
+ /**
+ * Scheduler subject for a single delayed message: {@code .sched.}.
+ * The NATS scheduler fires a publish to the work subject when the schedule triggers.
+ * Each message gets its own unique subject so the rollup-sub only replaces itself,
+ * never another message's schedule entry.
+ */
+ private String schedSubjectFor(QueueDetail q, String msgId) {
+ return subjectFor(q) + SCHED_SUBJECT_SUFFIX + msgId;
+ }
+
+ /**
+ * Wildcard pattern covering all scheduler subjects for a queue, used when registering
+ * the stream's subject list. E.g. {@code rqueue.js.orders.sched.*}.
+ */
+ static String schedSubjectPattern(String workSubject) {
+ return workSubject + SCHED_SUBJECT_SUFFIX + "*";
+ }
+
/** Stream description shown in {@code nats stream info} so operators can map back to rqueue. */
private static String streamDescription(QueueDetail q) {
return "rqueue queue: " + q.getName();
@@ -295,13 +328,19 @@ public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {
+ NatsProvisioner.SCHEDULING_MIN_VERSION
+ "+ or use the Redis backend for delayed messages.");
}
- String subject = subjectFor(q);
+ String workSubject = subjectFor(q);
String stream = streamFor(q);
- provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q));
- Headers headers = buildSchedulingHeaders(m, delayMs);
+ provisioner.ensureStream(
+ stream,
+ List.of(workSubject, schedSubjectPattern(workSubject)),
+ q.getType(),
+ streamDescription(q),
+ true);
+ String schedSubject = schedSubjectFor(q, m.getId());
+ Headers headers = buildSchedulingHeaders(m, delayMs, workSubject);
try {
byte[] payload = serdes.serialize(m);
- js.publish(subject, headers, payload);
+ js.publish(schedSubject, headers, payload);
} catch (IOException | JetStreamApiException e) {
throw new RqueueNatsException(
"Failed to enqueue scheduled message id="
@@ -309,7 +348,7 @@ public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {
+ " queue="
+ q.getName()
+ " subject="
- + subject,
+ + schedSubject,
e);
} catch (RuntimeException e) {
throw new RqueueNatsException(
@@ -318,7 +357,7 @@ public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {
+ " queue="
+ q.getName()
+ " subject="
- + subject,
+ + schedSubject,
e);
}
}
@@ -378,10 +417,15 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long
+ NatsProvisioner.SCHEDULING_MIN_VERSION
+ "+ or use the Redis backend for delayed messages."));
}
- String subject = subjectFor(q);
+ String workSubject = subjectFor(q);
String stream = streamFor(q);
try {
- provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q));
+ provisioner.ensureStream(
+ stream,
+ List.of(workSubject, schedSubjectPattern(workSubject)),
+ q.getType(),
+ streamDescription(q),
+ true);
} catch (Exception e) {
return Mono.error(new RqueueNatsException(
"Failed to provision stream for reactive scheduled enqueue id="
@@ -390,7 +434,8 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long
+ q.getName(),
e));
}
- Headers headers = buildSchedulingHeaders(m, delayMs);
+ String schedSubject = schedSubjectFor(q, m.getId());
+ Headers headers = buildSchedulingHeaders(m, delayMs, workSubject);
byte[] payload;
try {
payload = serdes.serialize(m);
@@ -401,10 +446,10 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long
+ " queue="
+ q.getName()
+ " subject="
- + subject,
+ + schedSubject,
e));
}
- return Mono.fromFuture(() -> js.publishAsync(subject, headers, payload))
+ return Mono.fromFuture(() -> js.publishAsync(schedSubject, headers, payload))
.onErrorMap(e -> e instanceof RqueueNatsException
? e
: new RqueueNatsException(
@@ -413,7 +458,7 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long
+ " queue="
+ q.getName()
+ " subject="
- + subject,
+ + schedSubject,
e))
.then();
}
@@ -507,12 +552,14 @@ private List popInternal(
String actualConsumerName = provisioner.ensureConsumer(
stream,
consumerName,
+ subject, // filter: consumer only receives work-subject messages, not sched entries
ackWait,
maxDeliver,
config.getConsumerDefaults().getMaxAckPending());
PullSubscribeOptions opts = PullSubscribeOptions.bind(stream, actualConsumerName);
- // Consumer has no filter subject; pass null so the NATS client doesn't validate
- // the subject against a (nonexistent) filter — SUB-90011 otherwise.
+ // The filter is set on the consumer itself; pass null here so the NATS client does not
+ // re-validate the subject against the consumer filter at subscribe time — SUB-90011
+ // is thrown if the passed subject doesn't match the consumer's filter subject exactly.
return js.subscribe(null, opts);
} catch (IOException | JetStreamApiException e) {
throw new RqueueNatsException(
@@ -1002,7 +1049,21 @@ public Capabilities capabilities() {
* {@link RqueueMessage#setPeriod} if the deserialized payload lacks it.
*
*/
- private Headers buildSchedulingHeaders(RqueueMessage m, long delayMs) {
+ /**
+ * Build NATS message-scheduling headers for a delayed publish (ADR-51, NATS >= 2.12).
+ *
+ * The message is published to a dedicated scheduler subject
+ * ({@code .sched.}). When the schedule triggers, NATS fires a
+ * JetStream publish to {@code workSubject} so consumers see it only after the delay.
+ *
+ * Required headers (confirmed against nats-server 2.12.8):
+ *
+ * - {@code Nats-Schedule: @at } — trigger time, {@code @at } prefix required
+ *
- {@code Nats-Schedule-Target: } — where to publish when schedule fires
+ *
- {@code Nats-Rollup: sub} — replaces any existing schedule for this subject (idempotent)
+ *
+ */
+ private Headers buildSchedulingHeaders(RqueueMessage m, long delayMs, String workSubject) {
Headers headers = new Headers();
if (m.getId() != null) {
// Dedup key: id-at-processAt for scheduled messages (processAt > 0).
@@ -1018,8 +1079,13 @@ private Headers buildSchedulingHeaders(RqueueMessage m, long delayMs) {
}
long deliverAtMs =
m.getProcessAt() > 0 ? m.getProcessAt() : System.currentTimeMillis() + delayMs;
- String deliverAt = RFC3339_UTC.format(Instant.ofEpochMilli(deliverAtMs));
- headers.add(HDR_NEXT_DELIVER_TIME, deliverAt);
+ // "@at " prefix is required — bare RFC3339 is not recognised by the server scheduler.
+ String deliverAt = "@at " + RFC3339_UTC.format(Instant.ofEpochMilli(deliverAtMs));
+ headers.add(HDR_SCHEDULE, deliverAt);
+ headers.add(HDR_SCHEDULE_TARGET, workSubject);
+ // Rollup-sub: replaces any existing schedule entry for this scheduler subject so that
+ // re-enqueue of the same message ID at the same processAt is idempotent.
+ headers.add("Nats-Rollup", "sub");
// Enrichment headers — readable on pop without deserializing the payload
headers.add(HDR_PROCESS_AT, String.valueOf(deliverAtMs));
if (m.isPeriodic()) {
diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsDeadLetterBridgeRegistrar.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsDeadLetterBridgeRegistrar.java
new file mode 100644
index 00000000..ab0bfcc3
--- /dev/null
+++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsDeadLetterBridgeRegistrar.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2026 Sonu Kumar
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ *
+ */
+package com.github.sonus21.rqueue.nats.js;
+
+import com.github.sonus21.rqueue.config.RqueueConfig;
+import com.github.sonus21.rqueue.core.EndpointRegistry;
+import com.github.sonus21.rqueue.core.spi.MessageBroker;
+import com.github.sonus21.rqueue.listener.QueueDetail;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+
+/**
+ * Bootstrap-time installer for the NATS-native dead-letter advisory bridge. For every active queue
+ * registered in {@link EndpointRegistry}, calls
+ * {@link JetStreamMessageBroker#installDeadLetterBridge(QueueDetail, String)} so that messages
+ * exceeding {@code maxDeliver} on the durable consumer are republished onto the queue's DLQ
+ * stream via the {@code $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES} advisory subject.
+ *
+ * This is the NATS-side equivalent of the Redis backend's {@code RqueueDeadLetterPublisher}:
+ * the rqueue post-processing handler already routes failed deliveries to a configured Rqueue-level
+ * DLQ ({@code @RqueueListener(deadLetterQueue=...)}); the advisory bridge registered here is an
+ * additional, independent path that catches messages whose handler exhausted retries without the
+ * post-processor noticing (e.g. listener restart, container shutdown mid-retry, JetStream-driven
+ * redelivery exhaustion outside the rqueue retry counter).
+ *
+ *
Lifecycle. Implements {@link SmartInitializingSingleton} so it runs after every
+ * {@code @RqueueListener} bean has registered with {@link EndpointRegistry} and the
+ * {@link com.github.sonus21.rqueue.nats.js.NatsStreamValidator} has provisioned the underlying
+ * streams — but before {@code SmartLifecycle.start()} spawns the message pollers, so the bridge
+ * is in place before the first delivery attempt. Implements {@link DisposableBean} so the
+ * advisory dispatchers are torn down on context shutdown.
+ *
+ *
Producer-only mode. When {@link RqueueConfig#isProducer()} is true the application
+ * has no listeners and therefore no consumers that could exhaust retries; the registrar exits
+ * early and installs nothing.
+ *
+ *
Backend gating. Only does its work when the active broker is a
+ * {@link JetStreamMessageBroker}; on Redis or other backends the bean simply no-ops, so it is safe
+ * to wire unconditionally from the NATS auto-config (which is itself gated on
+ * {@code rqueue.backend=nats}).
+ */
+public class NatsDeadLetterBridgeRegistrar implements SmartInitializingSingleton, DisposableBean {
+
+ private static final Logger log = Logger.getLogger(NatsDeadLetterBridgeRegistrar.class.getName());
+
+ private final MessageBroker broker;
+ private final RqueueConfig rqueueConfig;
+ private final List bridges = new ArrayList<>();
+
+ public NatsDeadLetterBridgeRegistrar(MessageBroker broker, RqueueConfig rqueueConfig) {
+ this.broker = broker;
+ this.rqueueConfig = rqueueConfig;
+ }
+
+ @Override
+ public void afterSingletonsInstantiated() {
+ if (rqueueConfig != null && rqueueConfig.isProducer()) {
+ log.log(
+ Level.FINE,
+ "NatsDeadLetterBridgeRegistrar: producer-only mode — skipping bridge installation");
+ return;
+ }
+ if (!(broker instanceof JetStreamMessageBroker)) {
+ // Defensive — the bean is wired only by the NATS auto-config, but other backends could
+ // theoretically substitute a different MessageBroker via @Primary.
+ return;
+ }
+ JetStreamMessageBroker nb = (JetStreamMessageBroker) broker;
+ List queues = EndpointRegistry.getActiveQueueDetails();
+ if (queues.isEmpty()) {
+ return;
+ }
+ int installed = 0;
+ for (QueueDetail q : queues) {
+ String consumerName = q.resolvedConsumerName();
+ try {
+ bridges.add(nb.installDeadLetterBridge(q, consumerName));
+ installed++;
+ } catch (RuntimeException e) {
+ // Best-effort: a single failure must not abort listener startup. The rqueue-level DLQ
+ // path (PostProcessingHandler.moveToDlq) still works regardless.
+ log.log(
+ Level.WARNING,
+ "Failed to install dead-letter advisory bridge for queue " + q.getName() + " consumer "
+ + consumerName + ": " + e.getMessage(),
+ e);
+ }
+ }
+ log.log(
+ Level.INFO,
+ "NatsDeadLetterBridgeRegistrar: installed {0} advisory bridge(s) across {1} queue(s)",
+ new Object[] {installed, queues.size()});
+ }
+
+ @Override
+ public void destroy() {
+ for (AutoCloseable c : bridges) {
+ try {
+ c.close();
+ } catch (Exception ignore) {
+ // best-effort close; we are shutting down anyway
+ }
+ }
+ bridges.clear();
+ }
+}
diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java
index 89614506..d249348b 100644
--- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java
+++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java
@@ -97,7 +97,7 @@ public void afterSingletonsInstantiated() {
log.log(
Level.INFO,
"NatsStreamValidator: NATS message scheduling (ADR-51) is AVAILABLE — "
- + "enqueueWithDelay will use the Nats-Next-Deliver-Time header.");
+ + "enqueueWithDelay will use the Nats-Schedule header.");
} else {
log.log(
Level.WARNING,
@@ -122,7 +122,7 @@ public void afterSingletonsInstantiated() {
String mainSubject = config.getSubjectPrefix() + q.getName();
total += tryEnsure(failures, mainStream, mainSubject, q);
if (!producerOnly) {
- tryEnsureConsumer(failures, mainStream, q.resolvedConsumerName(), q, cd);
+ tryEnsureConsumer(failures, mainStream, q.resolvedConsumerName(), mainSubject, q, cd);
}
if (q.getPriority() != null) {
@@ -182,13 +182,14 @@ private void tryEnsureConsumer(
List failures,
String streamName,
String consumerName,
+ String filterSubject,
QueueDetail q,
RqueueNatsConfig.ConsumerDefaults cd) {
Duration ackWait = JetStreamMessageBroker.resolveAckWait(q, config);
long maxDeliver = JetStreamMessageBroker.resolveMaxDeliver(q, config);
try {
provisioner.ensureConsumer(
- streamName, consumerName, ackWait, maxDeliver, cd.getMaxAckPending());
+ streamName, consumerName, filterSubject, ackWait, maxDeliver, cd.getMaxAckPending());
} catch (RqueueNatsException e) {
failures.add("consumer " + consumerName + " on " + streamName + ": " + rootCause(e));
}
diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/metrics/NatsRqueueQueueMetricsProvider.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/metrics/NatsRqueueQueueMetricsProvider.java
index 8e026f03..43e90f73 100644
--- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/metrics/NatsRqueueQueueMetricsProvider.java
+++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/metrics/NatsRqueueQueueMetricsProvider.java
@@ -23,8 +23,11 @@
import com.github.sonus21.rqueue.nats.RqueueNatsException;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
+import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.StreamInfo;
import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* NATS JetStream {@link RqueueQueueMetricsProvider}. Pending messages map to the message count of
@@ -36,6 +39,9 @@
*/
public class NatsRqueueQueueMetricsProvider implements RqueueQueueMetricsProvider {
+ private static final Logger log =
+ Logger.getLogger(NatsRqueueQueueMetricsProvider.class.getName());
+
private final JetStreamManagement jsm;
private final RqueueNatsConfig config;
@@ -99,4 +105,76 @@ public long getDeadLetterMessageCount(String queueName) {
"Failed to read DLQ stream size for queue=" + queueName + " stream=" + dlqStream, e);
}
}
+
+ /**
+ * Per-consumer pending depth: {@code ConsumerInfo.numPending}, the JetStream-tracked count of
+ * messages this durable has yet to deliver. Falls back to the queue-level stream count when
+ * {@code consumerName} is null/empty (no override) or when the consumer hasn't been provisioned
+ * yet (boot-time race), so dashboards never see a missing reading.
+ */
+ @Override
+ public long getPendingMessageCountByConsumer(String queueName, String consumerName) {
+ if (consumerName == null || consumerName.isEmpty()) {
+ return getPendingMessageCount(queueName);
+ }
+ QueueDetail q;
+ try {
+ q = EndpointRegistry.get(queueName);
+ } catch (QueueDoesNotExist e) {
+ return 0L;
+ }
+ String stream = config.getStreamPrefix() + q.getName();
+ try {
+ ConsumerInfo ci = jsm.getConsumerInfo(stream, consumerName);
+ return ci == null ? 0L : ci.getNumPending();
+ } catch (JetStreamApiException e) {
+ // consumer or stream not yet provisioned — fall back to stream-level count rather than 0,
+ // so the gauge reads the same as the bare-queue overload during the bootstrap window.
+ log.log(
+ Level.FINE,
+ "Consumer-aware pending lookup fell back to stream count: queue=" + queueName
+ + " consumer=" + consumerName + " (" + e.getMessage() + ")");
+ return getPendingMessageCount(queueName);
+ } catch (IOException e) {
+ throw new RqueueNatsException(
+ "Failed to read consumer info for queue=" + queueName + " consumer=" + consumerName
+ + " stream=" + stream,
+ e);
+ }
+ }
+
+ /**
+ * Per-consumer in-flight depth: {@code ConsumerInfo.numAckPending}, the count of messages
+ * delivered to this consumer but not yet acked. This is the JetStream-side analog of the
+ * Redis processing ZSET; reporting it per-consumer is essential when multiple
+ * {@code @RqueueListener} methods on the same queue progress at different rates.
+ */
+ @Override
+ public long getProcessingMessageCountByConsumer(String queueName, String consumerName) {
+ if (consumerName == null || consumerName.isEmpty()) {
+ return getProcessingMessageCount(queueName);
+ }
+ QueueDetail q;
+ try {
+ q = EndpointRegistry.get(queueName);
+ } catch (QueueDoesNotExist e) {
+ return 0L;
+ }
+ String stream = config.getStreamPrefix() + q.getName();
+ try {
+ ConsumerInfo ci = jsm.getConsumerInfo(stream, consumerName);
+ return ci == null ? 0L : ci.getNumAckPending();
+ } catch (JetStreamApiException e) {
+ log.log(
+ Level.FINE,
+ "Consumer-aware processing lookup fell back to 0: queue=" + queueName + " consumer="
+ + consumerName + " (" + e.getMessage() + ")");
+ return 0L;
+ } catch (IOException e) {
+ throw new RqueueNatsException(
+ "Failed to read consumer info for queue=" + queueName + " consumer=" + consumerName
+ + " stream=" + stream,
+ e);
+ }
+ }
}
diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java
index e9d4ae23..dce914b2 100644
--- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java
+++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java
@@ -70,7 +70,7 @@
* JetStream stream, republishes each to the destination stream, and hard-deletes the source copy
* via {@link JetStreamManagement#deleteMessage}. {@link #enqueueMessage(String, String, String)}
* looks up the message in the metadata store and republishes it immediately (without a
- * {@code Nats-Next-Deliver-Time} header) so the worker picks it up on its next poll.
+ * {@code Nats-Schedule} header) so the worker picks it up on its next poll.
*
* {@link #makeEmpty(String, String)} still returns "not supported" — purging a stream is a
* destructive admin operation best performed via {@code nats stream purge}.
@@ -156,7 +156,7 @@ public BooleanResponse deleteMessage(String queueName, String id) {
/**
* Re-enqueue a message for immediate delivery. Looks up the {@link RqueueMessage} from the
* metadata store by {@code queueName + id}, then republishes the raw bytes to the queue's
- * JetStream stream without a {@code Nats-Next-Deliver-Time} header so the poller picks it up
+ * JetStream stream without a {@code Nats-Schedule} header so the poller picks it up
* on its next fetch. A fresh {@code Nats-Msg-Id} ({@code id-requeue-}) prevents
* JetStream from deduplicating against the original scheduled publish. The {@code position}
* hint (FRONT / BACK) is ignored — JetStream pull consumers deliver in stream-sequence order.
diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java
index bead4047..71208f33 100644
--- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java
+++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java
@@ -9,6 +9,7 @@
*/
package com.github.sonus21.rqueue.nats;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -19,18 +20,30 @@
import io.nats.client.Options;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
/**
- * Base for JetStream integration tests. Mirrors the Redis test pattern: when {@code NATS_RUNNING}
- * is set the test connects to a locally running nats-server (CI path); otherwise a Testcontainers-
- * managed instance is started in {@link BeforeAll} (local Docker path). JUnit 5 / Testcontainers
- * skip the test gracefully if Docker isn't available and {@code NATS_RUNNING} isn't set.
+ * Base for JetStream integration tests. Two execution paths:
+ *
+ *
+ * - External NATS — set {@code NATS_RUNNING=1} (and optionally {@code NATS_URL}) to
+ * connect to an already-running nats-server. This is the path used in CI and when
+ * nats-server is installed locally (e.g. via Homebrew).
+ *
- Testcontainers — when Docker is available and {@code NATS_RUNNING} is not set, a
+ * {@code nats:2.12-alpine} container is started automatically.
+ *
+ *
+ * If neither path is viable the whole class is skipped via {@link
+ * org.junit.jupiter.api.Assumptions#assumeTrue} — no hard failure.
+ *
+ *
Why no {@code @Testcontainers(disabledWithoutDocker = true)}: that annotation
+ * disables the class at the JUnit extension level before {@code @BeforeAll} runs,
+ * so the external-NATS path would never be reached when Docker is absent. We manage the
+ * container lifecycle manually and gate on assumptions instead.
*/
-@Testcontainers(disabledWithoutDocker = true)
@NatsIntegrationTest
public abstract class AbstractJetStreamIT {
@@ -38,14 +51,7 @@ public abstract class AbstractJetStreamIT {
static final String EXTERNAL_NATS_URL =
System.getenv().getOrDefault("NATS_URL", "nats://127.0.0.1:4222");
- /**
- * Container is only constructed in the local-Docker path. The Testcontainers extension
- * ignores static {@code GenericContainer} fields that aren't annotated {@code @Container};
- * we manage the container's lifecycle from {@link #setup()} / {@link #teardown()} so the
- * external-NATS path can leave it null without tripping the extension.
- */
protected static GenericContainer> NATS;
-
protected static Connection connection;
@BeforeAll
@@ -54,6 +60,10 @@ static void setup() throws Exception {
if (USE_EXTERNAL_NATS) {
url = EXTERNAL_NATS_URL;
} else {
+ assumeTrue(
+ DockerClientFactory.instance().isDockerAvailable(),
+ "Skipping NATS ITs: Docker is not available and NATS_RUNNING is not set. "
+ + "Start nats-server locally and set NATS_RUNNING=1, or start Docker.");
NATS = new GenericContainer<>(DockerImageName.parse("nats:2.12-alpine"))
.withCommand("-js", "-DV")
.withExposedPorts(4222)
@@ -82,11 +92,6 @@ protected QueueDetail mockQueue(String name, QueueType type) {
return mockQueue(name, type, null);
}
- /**
- * Build a mock QueueDetail whose {@code resolvedConsumerName()} returns the given consumer
- * name. Used by tests that exercise multi-consumer flows where pop's {@code consumerName}
- * argument must match what ack/nack derive from the QueueDetail.
- */
protected QueueDetail mockQueue(String name, QueueType type, String consumerName) {
QueueDetail q = mock(QueueDetail.class);
when(q.getName()).thenReturn(name);
diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AllowMessageSchedulesEnforcementIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AllowMessageSchedulesEnforcementIT.java
new file mode 100644
index 00000000..ac19a4ec
--- /dev/null
+++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AllowMessageSchedulesEnforcementIT.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2026 Sonu Kumar
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.github.sonus21.rqueue.nats;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import io.nats.client.JetStream;
+import io.nats.client.JetStreamManagement;
+import io.nats.client.api.RetentionPolicy;
+import io.nats.client.api.StorageType;
+import io.nats.client.api.StreamConfiguration;
+import io.nats.client.api.StreamInfo;
+import io.nats.client.impl.Headers;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Investigates whether NATS server actually enforces the {@code allow_msg_schedules} stream flag
+ * when a message is published with the {@code Nats-Schedule} header (ADR-51, NATS >= 2.12).
+ *
+ *
The unit tests confirm that {@link com.github.sonus21.rqueue.nats.internal.NatsProvisioner}
+ * sets the flag correctly, but they mock {@code jsm.addStream()} — they cannot prove the server
+ * enforces it. This IT creates a stream without the flag against a real
+ * {@code nats:2.12-alpine} container (or an externally-managed server) and attempts the publish
+ * directly, recording whether the server accepts or rejects it.
+ *
+ *
Expected result: NATS 2.12+ enforces the flag; the publish should be rejected (throw)
+ * with error 10189. If the publish succeeds the test documents that the flag is advisory on this
+ * server version, so that the team knows the protective unit tests are not backed by enforcement.
+ */
+@NatsIntegrationTest
+class AllowMessageSchedulesEnforcementIT extends AbstractJetStreamIT {
+
+ private static final DateTimeFormatter RFC3339 =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZone(ZoneOffset.UTC);
+
+ @Test
+ void publishWithScheduleHeader_streamWithoutFlag_serverRejectsIt() throws Exception {
+ JetStreamManagement jsm = connection.jetStreamManagement();
+ JetStream js = connection.jetStream();
+
+ // Only meaningful on servers that support scheduling at all
+ assumeTrue(
+ connection.getServerInfo().isSameOrNewerThanVersion("2.12.0"),
+ "Skipping: server < 2.12 — Nats-Schedule header not supported");
+
+ String nano = String.valueOf(System.nanoTime());
+ String streamName = "enforce-sched-test-" + nano;
+ // Use a sched-subject pattern so the subject is valid for scheduling publish
+ String workSubject = "rqueue.enforce." + nano;
+ String schedSubject = workSubject + ".sched.probe";
+
+ // Create stream deliberately WITHOUT allowMessageSchedules, but including both subjects
+ StreamConfiguration cfg = StreamConfiguration.builder()
+ .name(streamName)
+ .subjects(workSubject, schedSubject)
+ .storageType(StorageType.Memory)
+ .retentionPolicy(RetentionPolicy.WorkQueue)
+ .allowMessageSchedules(false)
+ .build();
+ jsm.addStream(cfg);
+
+ // Confirm the flag is not set on the server-side config
+ StreamInfo info = jsm.getStreamInfo(streamName);
+ assertFalse(
+ info.getConfiguration().getAllowMsgSchedules(),
+ "Pre-condition: stream must NOT have allow_msg_schedules");
+
+ // Build correct ADR-51 headers: Nats-Schedule + Nats-Schedule-Target + Nats-Rollup
+ String deliverAt = "@at " + RFC3339.format(Instant.now().plusSeconds(10));
+ Headers headers = new Headers();
+ headers.add("Nats-Schedule", deliverAt);
+ headers.add("Nats-Schedule-Target", workSubject);
+ headers.add("Nats-Rollup", "sub");
+
+ boolean serverEnforcesFlag;
+ try {
+ js.publish(schedSubject, headers, "probe".getBytes());
+ // Publish succeeded — server does NOT enforce the flag
+ serverEnforcesFlag = false;
+ } catch (Exception e) {
+ // Publish rejected (expected: error 10189) — server DOES enforce the flag
+ serverEnforcesFlag = true;
+ }
+
+ // Clean up
+ jsm.deleteStream(streamName);
+
+ // Document and assert the actual behaviour.
+ // If this assertion ever changes it means a NATS server upgrade changed the enforcement policy.
+ assertTrue(
+ serverEnforcesFlag,
+ "NATS server accepted Nats-Schedule on a stream without allow_msg_schedules. This means"
+ + " the flag is advisory on this server version — the unit tests in"
+ + " NatsProvisionerTest that verify the flag is set are still correct best-practice"
+ + " but the server does not enforce it. Update this test if the behaviour is"
+ + " intentional.");
+ }
+
+ @Test
+ void publishWithScheduleHeader_streamWithFlag_serverAcceptsIt() throws Exception {
+ JetStreamManagement jsm = connection.jetStreamManagement();
+ JetStream js = connection.jetStream();
+
+ assumeTrue(
+ connection.getServerInfo().isSameOrNewerThanVersion("2.12.0"),
+ "Skipping: server < 2.12 — Nats-Schedule header not supported");
+
+ String nano = String.valueOf(System.nanoTime());
+ String streamName = "enforce-sched-ok-" + nano;
+ String workSubject = "rqueue.enforce.ok." + nano;
+ String schedSubject = workSubject + ".sched.probe";
+
+ // Create stream WITH allowMessageSchedules
+ StreamConfiguration cfg = StreamConfiguration.builder()
+ .name(streamName)
+ .subjects(workSubject, schedSubject)
+ .storageType(StorageType.Memory)
+ .retentionPolicy(RetentionPolicy.WorkQueue)
+ .allowMessageSchedules(true)
+ .build();
+ jsm.addStream(cfg);
+
+ StreamInfo info = jsm.getStreamInfo(streamName);
+ assertTrue(
+ info.getConfiguration().getAllowMsgSchedules(),
+ "Pre-condition: stream MUST have allow_msg_schedules");
+
+ String deliverAt = "@at " + RFC3339.format(Instant.now().plusSeconds(10));
+ Headers headers = new Headers();
+ headers.add("Nats-Schedule", deliverAt);
+ headers.add("Nats-Schedule-Target", workSubject);
+ headers.add("Nats-Rollup", "sub");
+
+ // With the flag set this must never throw
+ assertDoesNotThrow(
+ () -> js.publish(schedSubject, headers, "probe".getBytes()),
+ "Publish with Nats-Schedule must succeed when allow_msg_schedules=true");
+
+ assertNotNull(jsm.deleteStream(streamName));
+ }
+}
diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerSchedulingIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerSchedulingIT.java
index 0b2e20fa..5acebe56 100644
--- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerSchedulingIT.java
+++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerSchedulingIT.java
@@ -139,4 +139,67 @@ void enqueueWithDelay_multipleMessages_allDeliveredAfterTheirTimes() throws Exce
}
}
}
+
+  /**
+   * Regression test for the "enqueue-first then enqueueWithDelay" ordering bug.
+   *
+   * <p>If a plain {@code enqueue()} call happens before the first {@code enqueueWithDelay()}, the
+   * stream is created with only the work subject and no {@code allow_msg_schedules} flag. The
+   * provisioner must detect this on the delayed call and upgrade the stream in-place: add the
+   * sched-wildcard subject AND set the flag in a single {@code updateStream()}, otherwise NATS
+   * rejects the sched-subject publish with "no stream matches subject".
+   *
+   * <p>Scenario: (1) plain enqueue creates the stream; (2) delayed enqueue must upgrade it;
+   * (3) the immediate message is popped and acked right away; (4) the delayed message must
+   * stay invisible before its schedule fires; (5) after the delay (plus a buffer) it must
+   * arrive and be acked.
+   *
+   * <p>Uses a longer delay (10 s) than the other tests to absorb the stream-upgrade overhead
+   * (updateStream round-trip + consumer creation) that happens in this path but not when the
+   * stream is provisioned with scheduling from the start. Without the extra headroom the
+   * "beforeDelay" check can race with the scheduler firing.
+   */
+  @Test
+  void enqueueWithDelay_afterPlainEnqueue_streamUpgradedAndMessageHeld() throws Exception {
+    final long UPGRADE_DELAY_MS = 10_000L;
+    final long UPGRADE_BUFFER_MS = 3_000L;
+    QueueDetail q = mockQueue("mixed-" + System.nanoTime(), QueueType.QUEUE, "mixed-worker");
+    try (JetStreamMessageBroker broker =
+        JetStreamMessageBroker.builder().connection(connection).build()) {
+      assumeTrue(
+          broker.capabilities().supportsDelayedEnqueue(),
+          "Skipping: connected NATS server does not support message scheduling (< "
+              + com.github.sonus21.rqueue.nats.internal.NatsProvisioner.SCHEDULING_MIN_VERSION
+              + ")");
+
+      // Step 1 — plain enqueue: creates stream with only the work subject, no sched flag
+      RqueueMessage immediate =
+          RqueueMessage.builder().id("imm-1").message("immediate").build();
+      broker.enqueue(q, immediate);
+
+      // Step 2 — delayed enqueue: must upgrade the stream (add sched wildcard + flag)
+      RqueueMessage delayed = RqueueMessage.builder()
+          .id("del-1")
+          .message("delayed")
+          .processAt(System.currentTimeMillis() + UPGRADE_DELAY_MS)
+          .build();
+      broker.enqueueWithDelay(q, delayed, UPGRADE_DELAY_MS);
+
+      // The immediate message must be available right away
+      List<RqueueMessage> nowMsgs = broker.pop(q, "mixed-worker", 5, Duration.ofSeconds(2));
+      assertEquals(1, nowMsgs.size(), "Immediate message must be delivered right away");
+      assertEquals("immediate", nowMsgs.get(0).getMessage());
+      broker.ack(q, nowMsgs.get(0));
+
+      // The delayed message must NOT be visible yet (10 s away — stream upgrade takes < 1 s)
+      List<RqueueMessage> beforeDelay = broker.pop(q, "mixed-worker", 5, Duration.ofMillis(500));
+      assertTrue(beforeDelay.isEmpty(), "Delayed message must not arrive before scheduled time");
+
+      Thread.sleep(UPGRADE_DELAY_MS + UPGRADE_BUFFER_MS);
+
+      // After the delay the scheduled message must arrive
+      List<RqueueMessage> afterDelay = broker.pop(q, "mixed-worker", 5, Duration.ofSeconds(3));
+      assertEquals(1, afterDelay.size(), "Delayed message must be delivered after scheduled time");
+      assertEquals("delayed", afterDelay.get(0).getMessage());
+      broker.ack(q, afterDelay.get(0));
+    }
+  }
}
diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java
index 4b6d3817..2aa9a912 100644
--- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java
+++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java
@@ -286,10 +286,12 @@ void capabilities_schedulingSupported_supportsDelayedEnqueueIsTrue() {
/**
* JetStreamMessageBroker does not override {@code scheduleNext}; the SPI default calls
* {@code enqueueWithDelay(q, message, delay)}. Verify that path reaches JetStream publish
- * with the correct subject and that {@code Nats-Next-Deliver-Time} is set (ADR-51 header).
+ * to the scheduler subject ({@code <work-subject>.sched.<periodic-id>}) with the correct ADR-51 headers:
+ * {@code Nats-Schedule: @at <rfc3339-utc-time>}, {@code Nats-Schedule-Target: <work-subject>}, and
+ * {@code Nats-Rollup: sub}.
*/
@Test
- void scheduleNext_delegatesToEnqueueWithDelay_setsDeliverTimeHeader() throws Exception {
+ void scheduleNext_delegatesToEnqueueWithDelay_setsScheduleHeaders() throws Exception {
Fixture f = newFixture(RqueueNatsConfig.defaults(), true);
when(f.js.publish(any(String.class), any(Headers.class), any(byte[].class)))
.thenReturn(mock(PublishAck.class));
@@ -305,11 +307,31 @@ void scheduleNext_delegatesToEnqueueWithDelay_setsDeliverTimeHeader() throws Exc
// scheduleNext default: delayMs = max(0, processAt - now) → enqueueWithDelay
f.broker.scheduleNext(queueNamed("orders"), "ignored-key", m, 60L);
+ ArgumentCaptor subject = ArgumentCaptor.forClass(String.class);
ArgumentCaptor headers = ArgumentCaptor.forClass(Headers.class);
- verify(f.js, times(1)).publish(eq("rqueue.js.orders"), headers.capture(), any(byte[].class));
- assertNotNull(
- headers.getValue().getFirst("Nats-Next-Deliver-Time"),
- "ADR-51 deliver-time header must be present");
+ verify(f.js, times(1)).publish(subject.capture(), headers.capture(), any(byte[].class));
+
+ // Must publish to the scheduler subject, NOT the work subject
+ assertEquals(
+ "rqueue.js.orders.sched.pid-1",
+ subject.getValue(),
+ "Must publish to scheduler subject .sched.");
+
+ // Nats-Schedule header must carry the @at prefix and an RFC3339 UTC time
+ String schedule = headers.getValue().getFirst(JetStreamMessageBroker.HDR_SCHEDULE);
+ assertNotNull(schedule, "Nats-Schedule header must be present");
+ assertTrue(schedule.startsWith("@at "), "Nats-Schedule value must start with '@at '");
+
+ // Nats-Schedule-Target must point to the work subject
+ assertEquals(
+ "rqueue.js.orders",
+ headers.getValue().getFirst(JetStreamMessageBroker.HDR_SCHEDULE_TARGET),
+ "Nats-Schedule-Target must be the work subject");
+
+ // Nats-Rollup must be 'sub' for per-subject idempotent scheduling
+ assertEquals("sub", headers.getValue().getFirst("Nats-Rollup"), "Nats-Rollup must be 'sub'");
+
+ // Dedup key must encode the period identity
assertEquals(
"pid-1-at-" + processAt,
headers.getValue().getFirst("Nats-Msg-Id"),
diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/NatsStreamValidatorProducerModeTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/NatsStreamValidatorProducerModeTest.java
index fc6afab9..2da7ab4b 100644
--- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/NatsStreamValidatorProducerModeTest.java
+++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/NatsStreamValidatorProducerModeTest.java
@@ -63,7 +63,8 @@ private static QueueDetail queue(String name) {
void setUp() {
EndpointRegistry.delete();
provisioner = mock(NatsProvisioner.class);
- when(provisioner.ensureConsumer(anyString(), anyString(), any(), anyLong(), anyLong()))
+ when(provisioner.ensureConsumer(
+ anyString(), anyString(), anyString(), any(), anyLong(), anyLong()))
.thenReturn("rqueue-consumer");
natsConfig = RqueueNatsConfig.defaults();
}
@@ -81,7 +82,8 @@ void producerMode_skipsConsumerProvisioningButStillEnsuresStream() {
verify(provisioner, times(1))
.ensureStream(eq(natsConfig.getStreamPrefix() + "orders"), any(), any(), any());
verify(provisioner, never())
- .ensureConsumer(anyString(), anyString(), any(Duration.class), anyLong(), anyLong());
+ .ensureConsumer(
+ anyString(), anyString(), anyString(), any(Duration.class), anyLong(), anyLong());
}
@Test
@@ -99,6 +101,7 @@ void consumerMode_provisionsBothStreamAndConsumer() {
.ensureConsumer(
eq(natsConfig.getStreamPrefix() + "orders"),
anyString(),
+ eq(natsConfig.getSubjectPrefix() + "orders"),
any(Duration.class),
anyLong(),
anyLong());
@@ -112,6 +115,7 @@ void nullRqueueConfig_treatedAsConsumerMode_provisionsBoth() {
validator.afterSingletonsInstantiated();
verify(provisioner, times(1))
- .ensureConsumer(anyString(), anyString(), any(Duration.class), anyLong(), anyLong());
+ .ensureConsumer(
+ anyString(), anyString(), anyString(), any(Duration.class), anyLong(), anyLong());
}
}
diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/internal/NatsProvisionerTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/internal/NatsProvisionerTest.java
index 5497aabd..4b6abbbd 100644
--- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/internal/NatsProvisionerTest.java
+++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/internal/NatsProvisionerTest.java
@@ -16,6 +16,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -167,12 +168,16 @@ void ensureStream_streamAlreadyExists_skipsCreation() throws IOException, JetStr
StreamInfo existing = mock(StreamInfo.class);
StreamConfiguration existingCfg = mock(StreamConfiguration.class);
when(existingCfg.getRetentionPolicy()).thenReturn(io.nats.client.api.RetentionPolicy.WorkQueue);
+ // Stub getSubjects() so the provisioner sees the subject as already present and skips update
+ when(existingCfg.getSubjects()).thenReturn(Collections.singletonList("rqueue.js.orders"));
+ when(existingCfg.getAllowMsgSchedules()).thenReturn(false);
when(existing.getConfiguration()).thenReturn(existingCfg);
when(jsm.getStreamInfo("rqueue-js-orders")).thenReturn(existing);
provisioner.ensureStream("rqueue-js-orders", Collections.singletonList("rqueue.js.orders"));
verify(jsm, never()).addStream(any());
+ verify(jsm, never()).updateStream(any());
}
/**
@@ -196,6 +201,138 @@ void ensureStream_streamNotExist_createsStream() throws IOException, JetStreamAp
verify(jsm, times(1)).addStream(any(StreamConfiguration.class));
}
+  /**
+   * When the NATS server supports scheduling (≥ 2.12), streams must be created with
+   * {@code allowMessageSchedules=true} so the server accepts {@code Nats-Schedule}
+   * publish headers (ADR-51). Equivalent to: {@code nats stream add --allow-schedules}.
+   */
+  @Test
+  void ensureStream_schedulingSupported_setsAllowMessageSchedules()
+      throws IOException, JetStreamApiException {
+    // setUp() already wires serverInfo.isSameOrNewerThanVersion() → true (scheduling supported)
+    JetStreamApiException notFound = makeStreamNotFoundEx();
+    when(jsm.getStreamInfo("rqueue-js-orders")).thenThrow(notFound); // not found → create path
+
+    provisioner.ensureStream(
+        "rqueue-js-orders",
+        Collections.singletonList("rqueue.js.orders"),
+        QueueType.QUEUE,
+        null,
+        true); // allowSchedules requested by the caller
+
+    verify(jsm, times(1)).addStream(argThat(cfg -> cfg.getAllowMsgSchedules()));
+  }
+
+  @Test
+  void ensureStream_schedulingNotSupported_doesNotSetAllowMessageSchedules()
+      throws IOException, JetStreamApiException {
+    // Build a provisioner backed by a server that does NOT support scheduling
+    Connection oldConn = mock(Connection.class);
+    KeyValueManagement oldKvm = mock(KeyValueManagement.class);
+    when(oldConn.keyValueManagement()).thenReturn(oldKvm); // presumably touched during construction — confirm
+    io.nats.client.api.ServerInfo oldInfo = mock(io.nats.client.api.ServerInfo.class);
+    when(oldInfo.isSameOrNewerThanVersion(anyString())).thenReturn(false); // pre-2.12 server
+    when(oldInfo.getVersion()).thenReturn("2.10.0");
+    when(oldConn.getServerInfo()).thenReturn(oldInfo);
+    NatsProvisioner oldProvisioner = new NatsProvisioner(oldConn, jsm, config);
+
+    JetStreamApiException notFound = makeStreamNotFoundEx();
+    when(jsm.getStreamInfo("rqueue-js-orders")).thenThrow(notFound); // not found → create path
+
+    // allowSchedules=true is requested but server doesn't support it → flag must NOT be set
+    oldProvisioner.ensureStream(
+        "rqueue-js-orders",
+        Collections.singletonList("rqueue.js.orders"),
+        QueueType.QUEUE,
+        null,
+        true);
+
+    verify(jsm, times(1)).addStream(argThat(cfg -> !cfg.getAllowMsgSchedules()));
+  }
+
+  /**
+   * If a stream was initially created without scheduling (allowSchedules=false) and a later call
+   * requests scheduling (allowSchedules=true), the provisioner must call updateStream() to add the
+   * flag rather than silently skipping because the stream is already in cache.
+   */
+  @Test
+  void ensureStream_upgradeScheduling_updatesExistingStream()
+      throws IOException, JetStreamApiException {
+    // First call: stream doesn't exist, created without scheduling
+    JetStreamApiException notFound = makeStreamNotFoundEx();
+    when(jsm.getStreamInfo("rqueue-js-orders"))
+        .thenThrow(notFound) // first call: not found → create
+        .thenReturn(mock(
+            StreamInfo.class,
+            inv -> { // second call: exists, no scheduling
+              String m = inv.getMethod().getName();
+              if ("getConfiguration".equals(m)) {
+                StreamConfiguration cfg = StreamConfiguration.builder()
+                    .name("rqueue-js-orders")
+                    .subjects(Collections.singletonList("rqueue.js.orders"))
+                    .build();
+                return cfg;
+              }
+              return null; // any other StreamInfo accessor is irrelevant to this test
+            }));
+
+    // First call: no scheduling
+    provisioner.ensureStream(
+        "rqueue-js-orders",
+        Collections.singletonList("rqueue.js.orders"),
+        QueueType.QUEUE,
+        null,
+        false);
+    // Second call (different provisioner instance to bypass cache): scheduling requested
+    NatsProvisioner p2 = new NatsProvisioner(connection, jsm, config);
+    p2.ensureStream(
+        "rqueue-js-orders",
+        Collections.singletonList("rqueue.js.orders"),
+        QueueType.QUEUE,
+        null,
+        true);
+
+    verify(jsm, times(1)).addStream(any(StreamConfiguration.class));
+    verify(jsm, times(1)).updateStream(argThat(cfg -> cfg.getAllowMsgSchedules()));
+  }
+
+  /**
+   * Concrete scenario: plain enqueue() creates the stream with only the work subject, then
+   * enqueueWithDelay() is called for the first time. The provisioner must update the stream to
+   * add BOTH the sched-wildcard subject AND the allow_msg_schedules flag in a single updateStream()
+   * call. Without the subject update, publishing to the sched subject would be rejected by NATS
+   * with "no stream matches subject".
+   */
+  @Test
+  void ensureStream_firstEnqueueThenDelay_addsSchedSubjectAndFlag()
+      throws IOException, JetStreamApiException {
+    String workSubject = "rqueue.js.orders";
+    String schedPattern = workSubject + ".sched.*";
+
+    // After enqueue() created the stream (work subject only, no scheduling flag)
+    StreamInfo existingInfo = mock(StreamInfo.class, inv -> {
+      if ("getConfiguration".equals(inv.getMethod().getName())) {
+        return StreamConfiguration.builder()
+            .name("rqueue-js-orders")
+            .subjects(Collections.singletonList(workSubject))
+            .build(); // allowMsgSchedules defaults to false
+      }
+      return null; // any other StreamInfo accessor is irrelevant to this test
+    });
+    when(jsm.getStreamInfo("rqueue-js-orders")).thenReturn(existingInfo);
+
+    // enqueueWithDelay() calls ensureStream with the sched wildcard and allowSchedules=true
+    provisioner.ensureStream(
+        "rqueue-js-orders", Arrays.asList(workSubject, schedPattern), QueueType.QUEUE, null, true);
+
+    // Must call updateStream (never addStream — stream already exists)
+    verify(jsm, never()).addStream(any());
+    verify(jsm, times(1))
+        .updateStream(argThat(cfg -> cfg.getAllowMsgSchedules()
+            && cfg.getSubjects().contains(workSubject)
+            && cfg.getSubjects().contains(schedPattern)));
+  }
+
@Test
void ensureStream_calledTwice_onlyCallsNatsOnce() throws IOException, JetStreamApiException {
JetStreamApiException notFound = makeStreamNotFoundEx();
diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsAutoConfig.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsAutoConfig.java
index 0f1fbb95..df4c6dc9 100644
--- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsAutoConfig.java
+++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsAutoConfig.java
@@ -21,6 +21,7 @@
import com.github.sonus21.rqueue.nats.RqueueNatsConfig;
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
import com.github.sonus21.rqueue.nats.js.JetStreamMessageBroker;
+import com.github.sonus21.rqueue.nats.js.NatsDeadLetterBridgeRegistrar;
import com.github.sonus21.rqueue.nats.js.NatsStreamValidator;
import com.github.sonus21.rqueue.nats.kv.NatsKvBucketValidator;
import com.github.sonus21.rqueue.nats.metrics.NatsRqueueQueueMetricsProvider;
@@ -162,6 +163,19 @@ public NatsStreamValidator natsStreamValidator(
natsProvisioner, toBrokerConfig(props), rqueueConfigProvider.getIfAvailable());
}
+  /**
+   * Boot-time installer for the NATS-native dead-letter advisory bridge: for every registered
+   * queue, subscribes to {@code $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<stream>.<consumer>}
+   * and republishes any message that exhausts {@code maxDeliver} onto the queue's DLQ stream.
+   * Complementary to the rqueue-level DLQ path ({@code PostProcessingHandler.moveToDlq}).
+   */
+  @Bean
+  @ConditionalOnMissingBean(NatsDeadLetterBridgeRegistrar.class)
+  public NatsDeadLetterBridgeRegistrar natsDeadLetterBridgeRegistrar(
+      MessageBroker broker, ObjectProvider<RqueueConfig> rqueueConfigProvider) {
+    return new NatsDeadLetterBridgeRegistrar(broker, rqueueConfigProvider.getIfAvailable());
+  }
+
/**
* Bean form of the KV-bucket validator. Other NATS beans {@code @DependsOn} this name so it runs
* before they are constructed. The flag is sourced from {@link RqueueNatsProperties} —
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/AbstractNatsBootIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/AbstractNatsBootIT.java
index 8b8e5078..288dce93 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/AbstractNatsBootIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/AbstractNatsBootIT.java
@@ -15,12 +15,21 @@
*/
package com.github.sonus21.rqueue.spring.boot.integration;
+import io.nats.client.Connection;
+import io.nats.client.JetStreamApiException;
+import io.nats.client.JetStreamManagement;
+import io.nats.client.Nats;
+import java.io.IOException;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
/**
@@ -30,16 +39,21 @@
* when the {@code NATS_RUNNING} environment variable is set the tests assume an externally
* managed nats-server is reachable at {@code NATS_URL} (default {@code nats://127.0.0.1:4222})
* and skip Testcontainers entirely. CI sets {@code NATS_RUNNING=true} after starting nats-server
- * via apt; local dev leaves it unset and falls back to Testcontainers, which itself skips
- * gracefully when Docker isn't available.
+ * via apt; local dev leaves it unset and falls back to Testcontainers.
+ *
+ * When neither {@code NATS_RUNNING} is set nor Docker is available the entire test class is
+ * skipped via {@link org.junit.jupiter.api.Assumptions#assumeTrue} inside {@link #startNats()}.
+ * This avoids the {@code @Testcontainers(disabledWithoutDocker=true)} pitfall where the annotation
+ * silently disables tests even when {@code NATS_RUNNING=true} and Docker happens to be absent.
*
*
* <p>Subclasses declare their own {@code @SpringBootApplication} test config (typically excluding
* Redis auto-config, see {@link NatsBackendEndToEndIT} for the reference pattern) and any
* {@code @RqueueListener} beans they need.
*/
-@Testcontainers(disabledWithoutDocker = true)
abstract class AbstractNatsBootIT {
+ private static final Logger log = Logger.getLogger(AbstractNatsBootIT.class.getName());
+
static final boolean USE_EXTERNAL_NATS = System.getenv("NATS_RUNNING") != null;
static final String EXTERNAL_NATS_URL =
@@ -52,6 +66,10 @@ static void startNats() {
if (USE_EXTERNAL_NATS || NATS != null) {
return;
}
+ Assumptions.assumeTrue(
+ DockerClientFactory.instance().isDockerAvailable(),
+ "Skipping: Docker is not available and NATS_RUNNING is not set — "
+ + "start nats-server locally or set NATS_RUNNING=true");
NATS = new GenericContainer<>(DockerImageName.parse("nats:2.12-alpine"))
.withCommand("-js")
.withExposedPorts(4222)
@@ -71,4 +89,64 @@ static void natsProps(DynamicPropertyRegistry r) {
});
}
}
+
+  /**
+   * URL of whichever NATS the test is targeting — the externally-managed server when
+   * {@code NATS_RUNNING} is set, otherwise the Testcontainers-managed instance. Subclasses use
+   * this to open a short-lived connection in {@code @BeforeAll} cleanup hooks (see
+   * {@link #deleteStreamsWithPrefix}).
+   */
+  static String activeNatsUrl() {
+    if (USE_EXTERNAL_NATS) {
+      return EXTERNAL_NATS_URL;
+    }
+    if (NATS == null) {
+      startNats(); // lazy-start the container; aborts (assumption failure) if Docker is unavailable
+    }
+    return "nats://" + NATS.getHost() + ":" + NATS.getMappedPort(4222); // mapped host port, not 4222
+  }
+
+  /**
+   * Wipe every JetStream stream whose name starts with {@code prefix}. Called from each subclass's
+   * {@code @BeforeAll} (parameterised on that subclass's per-class {@code stream-prefix}) so that
+   * stale streams left by an earlier test class on the same NATS server — or by an earlier rerun
+   * against a persistent JetStream directory — never leak into the new run.
+   *
+   * <p>Best-effort: a single failing delete logs a warning and the loop continues. The cleanup
+   * itself never fails the test. Connection failures are likewise logged and swallowed, so a
+   * missing NATS server only surfaces later through the normal test startup path.
+   */
+  static void deleteStreamsWithPrefix(String prefix) {
+    if (prefix == null || prefix.isEmpty()) {
+      return; // never wipe unprefixed streams
+    }
+    try (Connection c = Nats.connect(activeNatsUrl())) {
+      JetStreamManagement jsm = c.jetStreamManagement();
+      List<String> names = jsm.getStreamNames();
+      if (names == null) {
+        return;
+      }
+      int deleted = 0;
+      for (String name : names) {
+        if (!name.startsWith(prefix)) {
+          continue;
+        }
+        try {
+          jsm.deleteStream(name);
+          deleted++;
+        } catch (IOException | JetStreamApiException e) {
+          log.log(Level.WARNING, "Failed to delete leftover stream " + name + ": " + e.getMessage());
+        }
+      }
+      if (deleted > 0) {
+        log.log(Level.INFO,
+            "Pre-test cleanup: deleted {0} stream(s) with prefix ''{1}''",
+            new Object[] {deleted, prefix});
+      }
+    } catch (IOException | InterruptedException | JetStreamApiException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt(); // restore interrupt status — never swallow interruption
+      }
+      log.log(Level.WARNING, "Stream cleanup with prefix '" + prefix + "' failed: " + e.getMessage());
+    }
+  }
}
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java
index 55d53780..cadadf01 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -55,10 +56,22 @@
*/
@SpringBootTest(
classes = NatsBackendEndToEndIT.TestApp.class,
- properties = {"rqueue.backend=nats"})
+ properties = {
+ "rqueue.backend=nats",
+ "rqueue.nats.naming.stream-prefix=" + NatsBackendEndToEndIT.STREAM_PREFIX,
+ "rqueue.nats.naming.subject-prefix=" + NatsBackendEndToEndIT.SUBJECT_PREFIX
+ })
@Tag("nats")
class NatsBackendEndToEndIT extends AbstractNatsBootIT {
+ static final String STREAM_PREFIX = "rqueue-js-backendE2E-";
+ static final String SUBJECT_PREFIX = "rqueue.js.backendE2E.";
+
+ @BeforeAll
+ static void wipeOwnedStreams() {
+ deleteStreamsWithPrefix(STREAM_PREFIX);
+ }
+
@Autowired
RqueueMessageEnqueuer enqueuer;
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java
index 3cdda5eb..6076017d 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java
@@ -22,6 +22,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,10 +42,22 @@
*/
@SpringBootTest(
classes = NatsConcurrencyE2EIT.TestApp.class,
- properties = {"rqueue.backend=nats"})
+ properties = {
+ "rqueue.backend=nats",
+ "rqueue.nats.naming.stream-prefix=" + NatsConcurrencyE2EIT.STREAM_PREFIX,
+ "rqueue.nats.naming.subject-prefix=" + NatsConcurrencyE2EIT.SUBJECT_PREFIX
+ })
@Tag("nats")
class NatsConcurrencyE2EIT extends AbstractNatsBootIT {
+ static final String STREAM_PREFIX = "rqueue-js-concurrencyE2E-";
+ static final String SUBJECT_PREFIX = "rqueue.js.concurrencyE2E.";
+
+ @BeforeAll
+ static void wipeOwnedStreams() {
+ deleteStreamsWithPrefix(STREAM_PREFIX);
+ }
+
@Autowired
RqueueMessageEnqueuer enqueuer;
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConsumerNameOverrideE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConsumerNameOverrideE2EIT.java
index e47b6896..2b089f16 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConsumerNameOverrideE2EIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConsumerNameOverrideE2EIT.java
@@ -23,6 +23,7 @@
import io.nats.client.api.ConsumerInfo;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,10 +43,22 @@
*/
@SpringBootTest(
classes = NatsConsumerNameOverrideE2EIT.TestApp.class,
- properties = {"rqueue.backend=nats"})
+ properties = {
+ "rqueue.backend=nats",
+ "rqueue.nats.naming.stream-prefix=" + NatsConsumerNameOverrideE2EIT.STREAM_PREFIX,
+ "rqueue.nats.naming.subject-prefix=" + NatsConsumerNameOverrideE2EIT.SUBJECT_PREFIX
+ })
@Tag("nats")
class NatsConsumerNameOverrideE2EIT extends AbstractNatsBootIT {
+ static final String STREAM_PREFIX = "rqueue-js-consumerOvrE2E-";
+ static final String SUBJECT_PREFIX = "rqueue.js.consumerOvrE2E.";
+
+ @BeforeAll
+ static void wipeOwnedStreams() {
+ deleteStreamsWithPrefix(STREAM_PREFIX);
+ }
+
@Autowired
RqueueMessageEnqueuer enqueuer;
@@ -60,7 +73,8 @@ void overriddenConsumerNameIsRegisteredOnTheStream() throws Exception {
enqueuer.enqueue("custom-consumer", "hello");
assertThat(listener.latch.await(20, TimeUnit.SECONDS)).isTrue();
- ConsumerInfo info = jsm.getConsumerInfo("rqueue-js-custom-consumer", "my-custom-consumer");
+ ConsumerInfo info =
+ jsm.getConsumerInfo(STREAM_PREFIX + "custom-consumer", "my-custom-consumer");
assertThat(info).isNotNull();
assertThat(info.getName()).isEqualTo("my-custom-consumer");
}
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsMultipleListenersOnSameQueueE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsMultipleListenersOnSameQueueE2EIT.java
index e7dddcbe..943a4536 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsMultipleListenersOnSameQueueE2EIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsMultipleListenersOnSameQueueE2EIT.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -45,12 +46,24 @@
*/
@SpringBootTest(
classes = NatsMultipleListenersOnSameQueueE2EIT.TestApp.class,
- properties = {"rqueue.backend=nats"})
+ properties = {
+ "rqueue.backend=nats",
+ "rqueue.nats.naming.stream-prefix=" + NatsMultipleListenersOnSameQueueE2EIT.STREAM_PREFIX,
+ "rqueue.nats.naming.subject-prefix=" + NatsMultipleListenersOnSameQueueE2EIT.SUBJECT_PREFIX
+ })
@Tag("nats")
@Disabled("Default JetStream retention=WorkQueue prevents true fan-out across multiple consumers; "
+ "enable once retention is configurable per queue or defaulted to Limits/Interest.")
class NatsMultipleListenersOnSameQueueE2EIT extends AbstractNatsBootIT {
+ static final String STREAM_PREFIX = "rqueue-js-multiListenerE2E-";
+ static final String SUBJECT_PREFIX = "rqueue.js.multiListenerE2E.";
+
+ @BeforeAll
+ static void wipeOwnedStreams() {
+ deleteStreamsWithPrefix(STREAM_PREFIX);
+ }
+
@Autowired
RqueueMessageEnqueuer enqueuer;
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java
index 6640ac9d..a366a7f1 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,10 +44,22 @@
*/
@SpringBootTest(
classes = NatsPriorityQueuesE2EIT.TestApp.class,
- properties = {"rqueue.backend=nats"})
+ properties = {
+ "rqueue.backend=nats",
+ "rqueue.nats.naming.stream-prefix=" + NatsPriorityQueuesE2EIT.STREAM_PREFIX,
+ "rqueue.nats.naming.subject-prefix=" + NatsPriorityQueuesE2EIT.SUBJECT_PREFIX
+ })
@Tag("nats")
class NatsPriorityQueuesE2EIT extends AbstractNatsBootIT {
+ static final String STREAM_PREFIX = "rqueue-js-priorityE2E-";
+ static final String SUBJECT_PREFIX = "rqueue.js.priorityE2E.";
+
+ @BeforeAll
+ static void wipeOwnedStreams() {
+ deleteStreamsWithPrefix(STREAM_PREFIX);
+ }
+
@Autowired
RqueueMessageEnqueuer enqueuer;
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java
index 596b5615..bb92cca0 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,10 +44,23 @@
*/
@SpringBootTest(
classes = NatsReactiveEnqueueE2EIT.TestApp.class,
- properties = {"rqueue.backend=nats", "rqueue.reactive.enabled=true"})
+ properties = {
+ "rqueue.backend=nats",
+ "rqueue.reactive.enabled=true",
+ "rqueue.nats.naming.stream-prefix=" + NatsReactiveEnqueueE2EIT.STREAM_PREFIX,
+ "rqueue.nats.naming.subject-prefix=" + NatsReactiveEnqueueE2EIT.SUBJECT_PREFIX
+ })
@Tag("nats")
class NatsReactiveEnqueueE2EIT extends AbstractNatsBootIT {
+ static final String STREAM_PREFIX = "rqueue-js-reactiveEnqE2E-";
+ static final String SUBJECT_PREFIX = "rqueue.js.reactiveEnqE2E.";
+
+ @BeforeAll
+ static void wipeOwnedStreams() {
+ deleteStreamsWithPrefix(STREAM_PREFIX);
+ }
+
@Autowired
ReactiveRqueueMessageEnqueuer reactiveEnqueuer;
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsRetryAndDlqE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsRetryAndDlqE2EIT.java
index a7b86256..a0de8e23 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsRetryAndDlqE2EIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsRetryAndDlqE2EIT.java
@@ -19,12 +19,15 @@
import com.github.sonus21.rqueue.annotation.RqueueListener;
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
+import io.nats.client.Connection;
import io.nats.client.JetStreamManagement;
import io.nats.client.api.StreamInfo;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -32,62 +35,128 @@
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Import;
import org.springframework.stereotype.Component;
/**
- * After a handler exhausts {@code numRetries}, JetStream emits a max-deliveries advisory and the
- * broker's {@code installDeadLetterBridge} dispatcher republishes the payload onto the DLQ
- * stream. Currently disabled because {@link
- * com.github.sonus21.rqueue.spring.boot.RqueueNatsAutoConfig} does not yet invoke
- * {@code JetStreamMessageBroker.installDeadLetterBridge(...)} during container start, so dead-
- * lettered messages never reach the DLQ stream. Enable this test once that wiring is added.
+ * Verifies the NATS-native dead-letter advisory bridge installed by
+ * {@link com.github.sonus21.rqueue.nats.js.NatsDeadLetterBridgeRegistrar}: when JetStream emits
+ * {@code $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<stream>.<consumer>} for a queue's durable
+ * consumer, the registrar's dispatcher looks up the offending message by sequence number and
+ * republishes it onto the queue's DLQ stream ({@code <stream>-dlq}).
+ *
+ * Why a synthetic advisory. In normal rqueue flow the framework either acks
+ * (success / forced-discard / moveToDlq) or naks (retry) every delivery, so NATS sees a terminal
+ * action before {@code maxDeliver} elapses and the advisory never fires from a real handler — that
+ * path is covered by
+ * {@code NatsSchedulingAdvancedE2EIT#scheduledMessageExhaustsRetriesToDlq} via the rqueue-level
+ * DLQ ({@code PostProcessingHandler.moveToDlq}). The advisory bridge is a defensive net for cases
+ * outside rqueue's control (process crash mid-handler, or a handler that blocks past its
+ * visibility timeout AND past every retry while NATS keeps redelivering). Triggering that path
+ * end-to-end is racy and slow, so this test instead publishes a synthetic advisory matching the
+ * shape {@code nats-server 2.12} actually emits and asserts the dispatcher reacts: enqueues a
+ * payload, looks up its stream sequence, fakes the advisory, and waits for the DLQ stream to
+ * receive it.
*/
@SpringBootTest(
classes = NatsRetryAndDlqE2EIT.TestApp.class,
- properties = {"rqueue.backend=nats"})
+ properties = {
+ "rqueue.backend=nats",
+ "rqueue.nats.naming.stream-prefix=" + NatsRetryAndDlqE2EIT.STREAM_PREFIX,
+ "rqueue.nats.naming.subject-prefix=" + NatsRetryAndDlqE2EIT.SUBJECT_PREFIX
+ })
@Tag("nats")
-@Disabled("This test exercises the NATS-native advisory bridge path"
- + " (JetStreamMessageBroker.installDeadLetterBridge /"
- + " $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES), which is not yet wired by"
- + " RqueueNatsAutoConfig. The rqueue-level DLQ path (PostProcessingHandler → broker.moveToDlq)"
- + " is already tested in NatsSchedulingAdvancedE2EIT#scheduledMessageExhaustsRetriesToDlq and"
- + " works without this bridge. Enable this test once RqueueNatsAutoConfig provisions the"
- + " advisory dispatcher per queue during container start.")
class NatsRetryAndDlqE2EIT extends AbstractNatsBootIT {
+ static final String STREAM_PREFIX = "rqueue-js-retryDlqE2E-";
+ static final String SUBJECT_PREFIX = "rqueue.js.retryDlqE2E.";
+
+ @BeforeAll
+ static void wipeOwnedStreams() {
+ deleteStreamsWithPrefix(STREAM_PREFIX);
+ }
+
@Autowired
RqueueMessageEnqueuer enqueuer;
@Autowired
- FailingListener listener;
+ Connection natsConnection;
@Autowired
JetStreamManagement jsm;
+ @Autowired
+ BlockingListener listener;
+
@Test
- void exhaustedMessageLandsOnDlqStream() {
- enqueuer.enqueue("failing", "boom");
+ void advisoryBridgeRepublishesIntoDlqStream() throws Exception {
+ String stream = STREAM_PREFIX + "boom";
+ String dlqStream = stream + "-dlq";
+
+ // Publish via the normal enqueue path so the source stream + bridge get provisioned exactly
+ // the way they would in production. The blocking listener picks up the delivery but never
+ // returns, so the message stays in flight (un-acked) in the WorkQueue stream — which is the
+ // realistic state when JetStream actually fires the max-delivery advisory in production
+ // (handler hung, ack never sent). The long visibilityTimeout keeps NATS from redelivering
+ // during the test.
+ enqueuer.enqueue("boom", "marker-payload");
+
+ // Confirm the listener has the message in flight (proves the source stream still has it: in
+ // WorkQueue retention an unacked message stays in the stream until acked or AckWait expires).
+ assertThat(listener.received.await(20, TimeUnit.SECONDS))
+ .as("Listener must receive the marker payload before we synthesize the advisory")
+ .isTrue();
+
+ // Bridge installer provisioned the DLQ stream from boot (installDeadLetterBridge →
+ // provisionDlq).
+ Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
+ assertThat(jsm.getStreamInfo(dlqStream)).isNotNull();
+ });
+ long sourceSeq = jsm.getStreamInfo(stream).getStreamState().getLastSequence();
- Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> listener.attempts.get() >= 2);
+ // Synthetic max-delivery advisory matching nats-server 2.12's payload shape: only stream_seq
+ // is required by the bridge's republish logic. The advisory subject must include
+ // .<stream>.<consumer>; consumer name is the rqueue default for this queue, which is
+ // <queue>-consumer (see QueueDetail#resolvedConsumerName).
+ String consumer = "boom-consumer";
+ String advisorySubject =
+ "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES." + stream + "." + consumer;
+ String advisoryJson = "{\"type\":\"io.nats.jetstream.advisory.v1.max_deliveries\","
+ + "\"stream\":\"" + stream + "\","
+ + "\"consumer\":\"" + consumer + "\","
+ + "\"stream_seq\":" + sourceSeq + ","
+ + "\"deliveries\":3}";
+ natsConnection.publish(advisorySubject, advisoryJson.getBytes(StandardCharsets.UTF_8));
+ natsConnection.flush(Duration.ofSeconds(2));
- Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
- StreamInfo dlq = jsm.getStreamInfo("rqueue-js-failing-dlq");
+ // The bridge's dispatcher should have republished the source message onto the DLQ stream.
+ Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
+ StreamInfo dlq = jsm.getStreamInfo(dlqStream);
assertThat(dlq.getStreamState().getMsgCount()).isGreaterThanOrEqualTo(1);
});
}
@SpringBootApplication(
exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class})
+ @Import(BlockingListener.class)
static class TestApp {}
+ /**
+ * Receives the marker payload, signals the test, then blocks past the test's runtime so the
+ * message stays in flight (un-acked) in the WorkQueue source stream. That keeps it reachable via
+ * {@code jsm.getMessage(stream, seq)} — which is what the advisory bridge does on receipt — and
+ * mirrors the production scenario where the bridge fires (handler hung past
+ * {@code visibilityTimeout}, NATS still considers the message un-acked). The 2-minute
+ * visibility timeout prevents NATS from redelivering before the test completes.
+ */
@Component
- static class FailingListener {
- final AtomicInteger attempts = new AtomicInteger();
+ static class BlockingListener {
+ final CountDownLatch received = new CountDownLatch(1);
- @RqueueListener(value = "failing", numRetries = "2")
- void onMessage(String payload) {
- attempts.incrementAndGet();
- throw new RuntimeException("simulated failure for payload=" + payload);
+ @RqueueListener(value = "boom", visibilityTimeout = "120000")
+ void onMessage(String payload) throws Exception {
+ received.countDown();
+ Thread.sleep(120_000L); // far exceeds the test's 30s budget
}
}
}
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsScheduledMessageE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsScheduledMessageE2EIT.java
index dcf37646..6224b041 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsScheduledMessageE2EIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsScheduledMessageE2EIT.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -45,14 +46,41 @@
*/
@SpringBootTest(
classes = NatsScheduledMessageE2EIT.TestApp.class,
- properties = {"rqueue.backend=nats", "rqueue.reactive.enabled=true"})
+ properties = {
+ "rqueue.backend=nats",
+ "rqueue.reactive.enabled=true",
+ // Per-class prefix isolates this test's NATS streams from every other NATS-backed test
+ // running against the same nats-server (CI shares one instance across all classes, and a
+ // persistent JetStream dir survives reruns). Same queue name → distinct stream so we never
+ // inherit stale config or in-flight messages from an earlier class/run.
+ "rqueue.nats.naming.stream-prefix=" + NatsScheduledMessageE2EIT.STREAM_PREFIX,
+ "rqueue.nats.naming.subject-prefix=" + NatsScheduledMessageE2EIT.SUBJECT_PREFIX
+ })
@Tag("nats")
class NatsScheduledMessageE2EIT extends AbstractNatsBootIT {
+ static final String STREAM_PREFIX = "rqueue-js-schedE2E-";
+ static final String SUBJECT_PREFIX = "rqueue.js.schedE2E.";
+
+ @BeforeAll
+ static void wipeOwnedStreams() {
+ deleteStreamsWithPrefix(STREAM_PREFIX);
+ }
+
static final Duration DELAY = Duration.ofSeconds(3);
static final Duration NOT_YET_GUARD = Duration.ofMillis(800);
static final Duration TOTAL_WAIT = Duration.ofSeconds(12);
+ /**
+ * Longer delay for the "enqueue-first, then enqueueIn" regression test. The stream upgrade
+ * (updateStream round-trip + consumer re-creation) adds latency before the scheduler header is
+ * accepted, so we use 10 s to give the system enough headroom that the "not-yet" assertion
+ * cannot race with the scheduler firing.
+ */
+ static final Duration ETD_DELAY = Duration.ofSeconds(10);
+
+ static final Duration ETD_TOTAL_WAIT = Duration.ofSeconds(20);
+
@Autowired
NatsProvisioner natsProvisioner;
@@ -65,6 +93,9 @@ class NatsScheduledMessageE2EIT extends AbstractNatsBootIT {
@Autowired
ScheduledListener listener;
+ @Autowired
+ EtdListener etdListener;
+
@BeforeEach
void requireSchedulingSupport() {
assumeTrue(
@@ -88,6 +119,52 @@ void scheduledMessageIsDeliveredAfterDelay() throws Exception {
assertThat(listener.syncReceived).containsExactly("delayed-payload");
}
+ /**
+ * Regression test for the "enqueue-first, then enqueueIn" stream-upgrade path.
+ *
+ * When a plain {@link RqueueMessageEnqueuer#enqueue enqueue()} call happens before the first
+ * {@link RqueueMessageEnqueuer#enqueueIn enqueueIn()} call the stream is provisioned without
+ * the {@code allow_msg_schedules} flag and without the sched-wildcard subject. The provisioner
+ * must detect this on the delayed call and upgrade the stream in-place (add both the flag and the
+ * sched-wildcard subject via a single {@code updateStream()}) before publishing the scheduled
+ * message — otherwise NATS rejects the publish with "no stream matches subject".
+ *
+ *
+ * <p>Expected behaviour:
+ *
+ * - The immediate message is delivered right away (stream has it before any upgrade).
+ * - After the stream upgrade the scheduled message is held by JetStream until
+ * {@code ETD_DELAY} has passed.
+ * - Both messages arrive within {@code ETD_TOTAL_WAIT}.
+ *
+ */
+ @Test
+ void enqueueFirst_thenEnqueueIn_streamUpgradedAndBothDelivered() throws Exception {
+ // Step 1: plain enqueue — stream is created with only the work subject, no sched flag
+ enqueuer.enqueue("etd-e2e", "immediate");
+
+ // Step 2: delayed enqueue — provisioner must upgrade the stream in-place
+ enqueuer.enqueueIn("etd-e2e", "delayed", ETD_DELAY);
+
+ // Immediate message must arrive before the ETD_DELAY fires
+ assertThat(etdListener.immediateLatch.await(TOTAL_WAIT.toSeconds(), TimeUnit.SECONDS))
+ .as("Immediate message must be delivered before delay fires")
+ .isTrue();
+ assertThat(etdListener.received)
+ .as("Only the immediate message must have arrived at this point")
+ .containsExactly("immediate");
+
+ // Delayed message must NOT be visible yet (delay is 10 s, assertion runs < 1 s after immediate)
+ assertThat(etdListener.received).doesNotContain("delayed");
+
+ // Both messages must eventually arrive within the generous total-wait window
+ assertThat(etdListener.allLatch.await(ETD_TOTAL_WAIT.toSeconds(), TimeUnit.SECONDS))
+ .as("Both messages must be delivered within %s", ETD_TOTAL_WAIT)
+ .isTrue();
+ assertThat(etdListener.received)
+ .as("Both immediate and delayed messages must have been received")
+ .containsExactlyInAnyOrder("immediate", "delayed");
+ }
+
@Test
void reactiveScheduledMessageIsDeliveredAfterDelay() throws Exception {
// Reset latch for the reactive queue
@@ -107,9 +184,31 @@ void reactiveScheduledMessageIsDeliveredAfterDelay() throws Exception {
@SpringBootApplication(
exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class})
- @Import(ScheduledListener.class)
+ @Import({ScheduledListener.class, EtdListener.class})
static class TestApp {}
+ /**
+ * Listener for the "enqueue-first, then enqueueIn" regression test queue.
+ *
+ * {@code immediateLatch} counts down on the very first message received (used to assert the
+ * immediate message arrived before the delay fires). {@code allLatch} counts down twice — once
+ * per message — and reaching zero signals that both the immediate and delayed messages were
+ * consumed.
+ */
+ @Component
+ static class EtdListener {
+ final CountDownLatch immediateLatch = new CountDownLatch(1);
+ final CountDownLatch allLatch = new CountDownLatch(2);
+ final List<String> received = Collections.synchronizedList(new ArrayList<>());
+
+ @RqueueListener(value = "etd-e2e")
+ void onMessage(String payload) {
+ received.add(payload);
+ immediateLatch.countDown(); // first call unblocks; subsequent calls are no-ops
+ allLatch.countDown(); // each call decrements; reaches 0 when both arrive
+ }
+ }
+
@Component
static class ScheduledListener {
final CountDownLatch syncLatch = new CountDownLatch(1);
diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsSchedulingAdvancedE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsSchedulingAdvancedE2EIT.java
index 9647957f..1c38c0ab 100644
--- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsSchedulingAdvancedE2EIT.java
+++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsSchedulingAdvancedE2EIT.java
@@ -27,11 +27,14 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -62,10 +65,23 @@
*/
@SpringBootTest(
classes = NatsSchedulingAdvancedE2EIT.TestApp.class,
- properties = {"rqueue.backend=nats", "rqueue.reactive.enabled=true"})
+ properties = {
+ "rqueue.backend=nats",
+ "rqueue.reactive.enabled=true",
+ "rqueue.nats.naming.stream-prefix=" + NatsSchedulingAdvancedE2EIT.STREAM_PREFIX,
+ "rqueue.nats.naming.subject-prefix=" + NatsSchedulingAdvancedE2EIT.SUBJECT_PREFIX
+ })
@Tag("nats")
class NatsSchedulingAdvancedE2EIT extends AbstractNatsBootIT {
+ static final String STREAM_PREFIX = "rqueue-js-schedAdv-";
+ static final String SUBJECT_PREFIX = "rqueue.js.schedAdv.";
+
+ @BeforeAll
+ static void wipeOwnedStreams() {
+ deleteStreamsWithPrefix(STREAM_PREFIX);
+ }
+
static final Duration DELAY = Duration.ofSeconds(3);
static final Duration PERIOD = Duration.ofSeconds(5);
static final Duration TOTAL_WAIT = Duration.ofSeconds(30);
@@ -157,17 +173,45 @@ void concurrentRetryOnDelayedMessageCompletesExactlyOnce() throws Exception {
// ---- 4. Concurrent retry on recurring message ------------------------------
+ /**
+ * With {@code concurrency=2} two pollers race for each periodic delivery; the JetStream
+ * dedup-key (per-message {@code Nats-Msg-Id} = {@code id@processAt}, set by
+ * {@code JetStreamMessageBroker.buildSchedulingHeaders}) guarantees each period is delivered
+ * exactly once. This test pins that invariant by tracking the distinct {@code processAt} values
+ * the listener actually saw and asserting:
+ *
+ *
+ * - at least {@code 3} distinct periods fired within {@code TOTAL_WAIT};
+ * - the total invocation count equals the number of distinct periods — i.e. no period was
+ * processed more than once. A regression in dedup keying would surface as
+ * {@code count > distinct.size()}.
+ *
+ */
@Test
void concurrentRetryOnRecurringMessageNoDuplicates() throws Exception {
assumeScheduling();
enqueuer.enqueuePeriodic("adv-conc-recur-e2e", "conc-recur", PERIOD);
- assertThat(concurrentRecurringListener.latch.await(TOTAL_WAIT.toSeconds(), TimeUnit.SECONDS))
- .as("Recurring message with concurrency=2 should complete at least 2 periods")
+ assertThat(concurrentRecurringListener.distinctPeriodsLatch.await(
+ TOTAL_WAIT.toSeconds(), TimeUnit.SECONDS))
+ .as(
+ "Recurring message with concurrency=2 should fire >= 3 distinct periods within %s",
+ TOTAL_WAIT)
.isTrue();
- assertThat(concurrentRecurringListener.count.get())
- .as("Each period must run at least twice (2 periods delivered)")
- .isGreaterThanOrEqualTo(2);
+ // Brief quiesce so a racing duplicate (if any) has time to land before we assert.
+ Thread.sleep(POST_SUCCESS_QUIESCE.toMillis());
+
+ int count = concurrentRecurringListener.count.get();
+ int distinct = concurrentRecurringListener.distinctProcessAts.size();
+ assertThat(distinct)
+ .as("Should have observed at least 3 distinct periodic processAt values")
+ .isGreaterThanOrEqualTo(3);
+ assertThat(count)
+ .as(
+ "Each distinct period must be processed exactly once: count=%d, distinct=%d "
+ + "— a discrepancy means JetStream dedup (Nats-Msg-Id=id@processAt) regressed",
+ count, distinct)
+ .isEqualTo(distinct);
}
// ---- 6. Long-running job: keep-alive via Job.updateVisibilityTimeout ------
@@ -282,18 +326,30 @@ void onMessage(String payload) {
}
/**
- * Listener for test 4: concurrency=2, periodic message; latch opens when 2 periods complete.
- * The dedup-key fix (id@processAt) ensures period 2+ is not silently dropped by JetStream.
+ * Listener for test 4: concurrency=2, periodic message. Tracks every delivery's
+ * {@code processAt} (read from the {@link Job#getRqueueMessage()} attached to the message
+ * headers) so the test can assert each period is processed exactly once.
+ *
+ * {@code count} = total handler invocations. {@code distinctProcessAts} = unique
+ * {@code processAt} values seen. With correct JetStream dedup keying the two are equal; if a
+ * duplicate slips through (e.g. a regression in the {@code id@processAt} {@code Nats-Msg-Id})
+ * we observe {@code count > distinctProcessAts.size()}.
*/
@Component
static class ConcurrentRetryRecurringListener {
final AtomicInteger count = new AtomicInteger();
- final CountDownLatch latch = new CountDownLatch(2);
+ /** Concurrent set: ConcurrentHashMap-backed so concurrent inserts don't drop entries. */
+ final Set<Long> distinctProcessAts = ConcurrentHashMap.newKeySet();
+
+ final CountDownLatch distinctPeriodsLatch = new CountDownLatch(3);
@RqueueListener(value = "adv-conc-recur-e2e", concurrency = "2")
- void onMessage(String payload) {
+ void onMessage(String payload, @Header(RqueueMessageHeaders.JOB) Job job) {
count.incrementAndGet();
- latch.countDown();
+ long processAt = job.getRqueueMessage().getProcessAt();
+ if (distinctProcessAts.add(processAt)) {
+ distinctPeriodsLatch.countDown();
+ }
}
}