diff --git a/.github/workflows/java-ci.yaml b/.github/workflows/java-ci.yaml index 1a284494..eb4518b4 100644 --- a/.github/workflows/java-ci.yaml +++ b/.github/workflows/java-ci.yaml @@ -137,10 +137,25 @@ jobs: sudo apt-get update sudo apt-get install -y redis-server redis-cli --version + sudo systemctl stop redis-server + redis-server --logfile /tmp/redis.log --daemonize yes + + - name: Log Redis info + run: | + redis-server --version + redis-cli INFO server - name: Run producer-only tests run: ./gradlew test -DincludeTags=producerOnly + - name: Upload redis-server log + if: always() + uses: actions/upload-artifact@v4 + with: + name: producer-redis-server + path: /tmp/redis.log + if-no-files-found: ignore + - name: Upload JaCoCo exec data if: always() uses: actions/upload-artifact@v4 @@ -189,10 +204,25 @@ jobs: sudo apt-get update sudo apt-get install -y redis-server redis-cli --version + sudo systemctl stop redis-server + redis-server --logfile /tmp/redis.log --daemonize yes + + - name: Log Redis info + run: | + redis-server --version + redis-cli INFO server - name: Run integration tests run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local + - name: Upload redis-server log + if: always() + uses: actions/upload-artifact@v4 + with: + name: integration-redis-server + path: /tmp/redis.log + if-no-files-found: ignore + - name: Upload JaCoCo exec data if: always() uses: actions/upload-artifact@v4 @@ -245,12 +275,12 @@ jobs: - name: Setup Redis Cluster run: | mkdir 9000 9001 9002 9003 9004 9005 - printf "port 9000 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9000/redis.conf - printf "port 9001 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9001/redis.conf - printf "port 9002 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9002/redis.conf - printf "port 9003 
\ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9003/redis.conf - printf "port 9004 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9004/redis.conf - printf "port 9005 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9005/redis.conf + printf "port 9000 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9000.log" >> 9000/redis.conf + printf "port 9001 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9001.log" >> 9001/redis.conf + printf "port 9002 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9002.log" >> 9002/redis.conf + printf "port 9003 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9003.log" >> 9003/redis.conf + printf "port 9004 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9004.log" >> 9004/redis.conf + printf "port 9005 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9005.log" >> 9005/redis.conf (cd 9000 && redis-server ./redis.conf) & (cd 9001 && redis-server ./redis.conf) & (cd 9002 && redis-server ./redis.conf) & @@ -260,9 +290,23 @@ jobs: sleep 30 yes yes | redis-cli --cluster create 127.0.0.1:9000 127.0.0.1:9001 127.0.0.1:9002 127.0.0.1:9003 127.0.0.1:9004 127.0.0.1:9005 --cluster-replicas 1 + - name: Log Redis cluster info + run: | + redis-server --version + redis-cli -p 9000 INFO server | grep -E 'redis_version|os|tcp_port|uptime_in_seconds' + redis-cli -p 9000 CLUSTER INFO + - name: Run Redis cluster tests run: ./gradlew test -DincludeTags=redisCluster + - name: 
Upload redis-server logs + if: always() + uses: actions/upload-artifact@v4 + with: + name: redis-server-cluster + path: /tmp/redis-*.log + if-no-files-found: ignore + - name: Upload JaCoCo exec data if: always() uses: actions/upload-artifact@v4 @@ -313,10 +357,25 @@ jobs: sudo apt-get update sudo apt-get install -y redis-server redis-cli --version + sudo systemctl stop redis-server + redis-server --logfile /tmp/redis.log --daemonize yes + + - name: Log Redis info + run: | + redis-server --version + redis-cli INFO server - name: Run reactive integration tests run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local + - name: Upload redis-server log + if: always() + uses: actions/upload-artifact@v4 + with: + name: reactive-redis-server + path: /tmp/redis.log + if-no-files-found: ignore + - name: Upload JaCoCo exec data if: always() uses: actions/upload-artifact@v4 @@ -365,7 +424,7 @@ jobs: # instead of pulling a Testcontainers image. - name: Install nats-server run: | - NATS_VERSION=v2.10.22 + NATS_VERSION=v2.12.0 curl -sSL "https://github.com/nats-io/nats-server/releases/download/${NATS_VERSION}/nats-server-${NATS_VERSION}-linux-amd64.tar.gz" \ | tar -xz -C /tmp sudo mv "/tmp/nats-server-${NATS_VERSION}-linux-amd64/nats-server" /usr/local/bin/nats-server @@ -374,7 +433,7 @@ jobs: - name: Start nats-server run: | mkdir -p /tmp/jetstream - nohup nats-server -js -sd /tmp/jetstream -p 4222 > /tmp/nats.log 2>&1 & + nats-server -js -sd /tmp/jetstream -p 4222 > /tmp/nats.log 2>&1 & for i in $(seq 1 20); do if (echo > /dev/tcp/127.0.0.1/4222) 2>/dev/null; then echo "nats-server ready after ${i}s"; break @@ -382,6 +441,9 @@ jobs: sleep 1 done + - name: Log NATS server info + run: nats-server --version + - name: Run NATS tests env: NATS_RUNNING: "true" @@ -392,7 +454,7 @@ jobs: if: always() uses: actions/upload-artifact@v4 with: - name: nats-server-log + name: nats-server path: /tmp/nats.log if-no-files-found: ignore diff --git a/AGENTS.md 
b/AGENTS.md index a5815012..a582dcca 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -31,7 +31,7 @@ Never remove or increment the base version number. The human decides when the of # Memory Context -# [rqueue] recent context, 2026-05-09 2:28pm GMT+5:30 +# [rqueue] recent context, 2026-05-09 9:08pm GMT+5:30 No previous sessions found. \ No newline at end of file diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java index fbb29f0d..ad75f4d1 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java @@ -122,10 +122,15 @@ private void updateCounter(boolean fail) { if (Objects.isNull(counter)) { return; } + // Pass the consumer name so the counter can route increments to the per-consumer entry + // when multiple @RqueueListener methods share a queue with distinct consumerName overrides; + // null/empty (no override) routes to the bare-queue counter unchanged. + String queueName = job.getQueueDetail().getName(); + String consumerName = job.getQueueDetail().getConsumerName(); if (fail) { - counter.updateFailureCount(job.getQueueDetail().getName()); + counter.updateFailureCount(queueName, consumerName); } else { - counter.updateExecutionCount(job.getQueueDetail().getName()); + counter.updateExecutionCount(queueName, consumerName); } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/QueueCounter.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/QueueCounter.java index 35173d19..e00336af 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/QueueCounter.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/QueueCounter.java @@ -29,6 +29,13 @@ /** * Queue counter counts the different types of events related to a queue. 
Failure and execution * count, it supports queue registrations. + * + *

Multi-consumer keying. Two {@code @RqueueListener} methods on the same queue with + * different {@code consumerName} overrides each produce a distinct {@link QueueDetail} and need + * their own counters. The maps are keyed by {@code queueName##consumerName} (or just + * {@code queueName} when no override is set) so the second registration does not silently + * overwrite the first. Increment lookups follow the same composite key — see + * {@link #updateFailureCount(String, String)}. */ public class QueueCounter { @@ -37,20 +44,49 @@ public class QueueCounter { private final Map queueNameToFailureCounter = new HashMap<>(); private final Map queueNameToExecutionCounter = new HashMap<>(); - private void updateCounter(Map map, String queueName) { - Counter counter = map.get(queueName); + private static String key(String queueName, String consumerName) { + return (consumerName == null || consumerName.isEmpty()) + ? queueName + : queueName + "##" + consumerName; + } + + private void updateCounter(Map map, String queueName, String consumerName) { + // Try the consumer-specific entry first; fall back to the bare queue-name entry so callers + // that don't yet pass a consumer name (older paths, single-consumer queues) still work. + Counter counter = map.get(key(queueName, consumerName)); + if (counter == null && consumerName != null && !consumerName.isEmpty()) { + counter = map.get(queueName); + } if (counter == null) { return; } counter.increment(); } + /** Backward-compatible single-arg increment; route to the bare-queue counter only. */ void updateFailureCount(String queueName) { - updateCounter(queueNameToFailureCounter, queueName); + updateCounter(queueNameToFailureCounter, queueName, null); } + /** Backward-compatible single-arg increment; route to the bare-queue counter only. 
*/ void updateExecutionCount(String queueName) { - updateCounter(queueNameToExecutionCounter, queueName); + updateCounter(queueNameToExecutionCounter, queueName, null); + } + + /** + * Consumer-aware increment: increments the counter registered for + * {@code (queueName, consumerName)}, falling back to the bare {@code queueName} counter when no + * consumer-scoped entry exists. Use this from + * {@link com.github.sonus21.rqueue.listener.RqueueExecutor} (which has the {@link QueueDetail}) + * so multi-consumer queues keep accurate per-consumer counts. + */ + void updateFailureCount(String queueName, String consumerName) { + updateCounter(queueNameToFailureCounter, queueName, consumerName); + } + + /** Consumer-aware execution-count increment. See {@link #updateFailureCount(String, String)}. */ + void updateExecutionCount(String queueName, String consumerName) { + updateCounter(queueNameToExecutionCounter, queueName, consumerName); } void registerQueue( @@ -58,19 +94,20 @@ void registerQueue( Tags queueTags, MeterRegistry registry, QueueDetail queueDetail) { + String mapKey = key(queueDetail.getName(), queueDetail.getConsumerName()); if (metricsProperties.countFailure()) { Counter.Builder builder = Counter.builder(metricsProperties.getMetricName(FAILURE_COUNT)) .tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName())) .description("Failure count"); Counter counter = builder.register(registry); - queueNameToFailureCounter.put(queueDetail.getName(), counter); + queueNameToFailureCounter.put(mapKey, counter); } if (metricsProperties.countExecution()) { Counter.Builder builder = Counter.builder(metricsProperties.getMetricName(EXECUTION_COUNT)) .tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName())) .description("Task execution count"); Counter counter = builder.register(registry); - queueNameToExecutionCounter.put(queueDetail.getName(), counter); + queueNameToExecutionCounter.put(mapKey, counter); } } } diff --git 
a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java index 7da81462..2d7d3505 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java @@ -38,4 +38,14 @@ public void updateFailureCount(String queueName) { public void updateExecutionCount(String queueName) { queueCounter.updateExecutionCount(queueName); } + + @Override + public void updateFailureCount(String queueName, String consumerName) { + queueCounter.updateFailureCount(queueName, consumerName); + } + + @Override + public void updateExecutionCount(String queueName, String consumerName) { + queueCounter.updateExecutionCount(queueName, consumerName); + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java index f862e8cc..33be981e 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java @@ -35,6 +35,14 @@ public class RqueueMetrics implements RqueueMetricsRegistry { static final String QUEUE_KEY = "key"; + /** + * Tag added when a {@link QueueDetail} declares a {@code consumerName} override. Without this + * tag, two {@code @RqueueListener} methods on the same queue with different consumer names + * register gauges with identical (name, tag-set) pairs and Micrometer silently keeps only the + * first — losing the second consumer's metrics entirely. 
+ */ + static final String CONSUMER_KEY = "consumer"; + private static final String QUEUE_SIZE = "queue.size"; private static final String SCHEDULED_QUEUE_SIZE = "scheduled.queue.size"; private static final String PROCESSING_QUEUE_SIZE = "processing.queue.size"; @@ -58,17 +66,31 @@ private void monitor() { for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) { Tags queueTags = Tags.concat(metricsProperties.getMetricTags(), "queue", queueDetail.getName()); + // When a queue carries multiple consumers (multiple @RqueueListener with distinct + // consumerName overrides), each gets its own QueueDetail. Without a `consumer` tag the + // gauges would share the same (name, tags) and Micrometer would drop all but the first. + String consumerName = queueDetail.getConsumerName(); + boolean hasConsumerOverride = consumerName != null && !consumerName.isEmpty(); + if (hasConsumerOverride) { + queueTags = queueTags.and(CONSUMER_KEY, consumerName); + } Gauge.builder( metricsProperties.getMetricName(QUEUE_SIZE), queueDetail, - c -> queueMetricsProvider.getPendingMessageCount(queueDetail.getName())) + c -> hasConsumerOverride + ? queueMetricsProvider.getPendingMessageCountByConsumer( + queueDetail.getName(), consumerName) + : queueMetricsProvider.getPendingMessageCount(queueDetail.getName())) .tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName())) .description("The number of entries in this queue") .register(meterRegistry); Gauge.builder( metricsProperties.getMetricName(PROCESSING_QUEUE_SIZE), queueDetail, - c -> queueMetricsProvider.getProcessingMessageCount(queueDetail.getName())) + c -> hasConsumerOverride + ? 
queueMetricsProvider.getProcessingMessageCountByConsumer( + queueDetail.getName(), consumerName) + : queueMetricsProvider.getProcessingMessageCount(queueDetail.getName())) .tags(queueTags.and(QUEUE_KEY, queueDetail.getProcessingQueueName())) .description("The number of entries in the processing queue") .register(meterRegistry); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java index 119625ff..db005ffc 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java @@ -21,4 +21,21 @@ public interface RqueueMetricsCounter { void updateFailureCount(String queueName); void updateExecutionCount(String queueName); + + /** + * Consumer-aware failure increment. When a queue carries multiple {@code @RqueueListener} + * methods with distinct {@code consumerName} overrides, each consumer has its own counter + * registered under {@code (queueName, consumerName)}; calling the bare-queue overload would + * route every increment to the same (last-registered) counter and silently lose per-consumer + * counts. Defaults to the queue-level path so callers that don't have a consumer name keep + * working unchanged. + */ + default void updateFailureCount(String queueName, String consumerName) { + updateFailureCount(queueName); + } + + /** Consumer-aware execution increment. See {@link #updateFailureCount(String, String)}. 
*/ + default void updateExecutionCount(String queueName, String consumerName) { + updateExecutionCount(queueName); + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetricsProvider.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetricsProvider.java index f622541c..4131a936 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetricsProvider.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetricsProvider.java @@ -72,4 +72,25 @@ default long getScheduledMessageCount(String queueName, String priority) { default long getProcessingMessageCount(String queueName, String priority) { return getProcessingMessageCount(queueName); } + + /** + * Per-consumer variant of {@link #getPendingMessageCount(String)}. When two + * {@code @RqueueListener} methods on the same queue declare different {@code consumerName} + * overrides, each gets its own QueueDetail and its own metric registration; backends that can + * report per-consumer pending depth (e.g. NATS JetStream Limits/Interest streams or any + * fan-out broker) should override this. The default delegates to the queue-level call so + * single-consumer queues, and backends without per-consumer state, behave unchanged. + * + * @param queueName user-facing queue name + * @param consumerName consumer-name override from {@code @RqueueListener(consumerName=...)}; + * {@code null} or empty when no override is set + */ + default long getPendingMessageCountByConsumer(String queueName, String consumerName) { + return getPendingMessageCount(queueName); + } + + /** Per-consumer variant of {@link #getProcessingMessageCount(String)}. See related javadoc. 
*/ + default long getProcessingMessageCountByConsumer(String queueName, String consumerName) { + return getProcessingMessageCount(queueName); + } } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java index 4f15e2df..39510c11 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java @@ -89,4 +89,75 @@ void updateFailureCount() { void updateExecutionCount() { validateCountStatistics(TestUtils.createQueueDetail("scheduled-queue", 900000L), "success"); } + + /** + * Regression: two QueueDetails on the same queue with different consumerName overrides each + * register their own counter, and a consumer-aware {@code updateXxxCount} routes to the + * correct one. Before the fix, both registrations collided on {@code map.put(queueName, ...)} + * and only the last consumer's counter was reachable; calls without a consumer name silently + * lost increments for any consumer that wasn't last to register. 
+ */ + @Test + void multiConsumerOnSameQueueRoutesToCorrectCounter() { + metricsProperties.getCount().setExecution(true); + metricsProperties.getCount().setFailure(true); + MeterRegistry registry = new SimpleMeterRegistry(); + QueueCounter counter = new QueueCounter(); + + QueueDetail qA = QueueDetail.builder() + .name("multi") + .queueName("__rq::queue::multi") + .processingQueueName("__rq::p-queue::multi") + .scheduledQueueName("__rq::d-queue::multi") + .completedQueueName("__rq::c-queue::multi") + .consumerName("consumer-a") + .numRetry(3) + .visibilityTimeout(900000L) + .priorityGroup("") + .concurrency(new com.github.sonus21.rqueue.models.Concurrency(-1, -1)) + .active(true) + .build(); + QueueDetail qB = QueueDetail.builder() + .name("multi") + .queueName("__rq::queue::multi") + .processingQueueName("__rq::p-queue::multi") + .scheduledQueueName("__rq::d-queue::multi") + .completedQueueName("__rq::c-queue::multi") + .consumerName("consumer-b") + .numRetry(3) + .visibilityTimeout(900000L) + .priorityGroup("") + .concurrency(new com.github.sonus21.rqueue.models.Concurrency(-1, -1)) + .active(true) + .build(); + + counter.registerQueue( + metricsProperties, Tags.of("queue", "multi", "consumer", "consumer-a"), registry, qA); + counter.registerQueue( + metricsProperties, Tags.of("queue", "multi", "consumer", "consumer-b"), registry, qB); + + counter.updateExecutionCount("multi", "consumer-a"); + counter.updateExecutionCount("multi", "consumer-a"); + counter.updateExecutionCount("multi", "consumer-b"); + counter.updateFailureCount("multi", "consumer-b"); + + Tags tagsA = Tags.of("queue", "multi", "consumer", "consumer-a", QUEUE_KEY, qA.getQueueName()); + Tags tagsB = Tags.of("queue", "multi", "consumer", "consumer-b", QUEUE_KEY, qB.getQueueName()); + + assertEquals( + 2.0, + registry.get(EXECUTION_COUNT).tags(tagsA).counter().count(), + 0.0, + "consumer-a execution count"); + assertEquals( + 1.0, + registry.get(EXECUTION_COUNT).tags(tagsB).counter().count(), + 0.0, 
+ "consumer-b execution count"); + assertEquals( + 1.0, + registry.get(FAILURE_COUNT).tags(tagsB).counter().count(), + 0.0, + "consumer-b failure count"); + } } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java index 76faf458..87dca835 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java @@ -30,7 +30,6 @@ import java.io.IOException; import java.time.Duration; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -53,8 +52,9 @@ public class NatsProvisioner { private final ConcurrentHashMap kvCache = new ConcurrentHashMap<>(); private final ConcurrentHashMap kvLocks = new ConcurrentHashMap<>(); - // stream name → provisioned (set membership acts as the boolean flag) - private final Set streamsDone = ConcurrentHashMap.newKeySet(); + // stream name → schedulingEnabled (true if the stream was created/updated with + // allowMessageSchedules) + private final ConcurrentHashMap streamsDone = new ConcurrentHashMap<>(); private final ConcurrentHashMap streamLocks = new ConcurrentHashMap<>(); // "streamName/requestedConsumerName" → actual consumer name (may differ for stale-rebind) @@ -63,7 +63,7 @@ public class NatsProvisioner { /** * Minimum NATS server version that supports server-side message scheduling via the - * {@code Nats-Next-Deliver-Time} JetStream publish header (ADR-51). + * {@code Nats-Schedule} JetStream publish header (ADR-51). */ public static final String SCHEDULING_MIN_VERSION = "2.12.0"; @@ -146,12 +146,18 @@ public KeyValue ensureKv(String bucketName, Duration ttl) * use {@link #ensureStream(String, List, QueueType)} instead. 
*/ public void ensureStream(String streamName, List subjects) { - ensureStream(streamName, subjects, QueueType.QUEUE, null); + ensureStream(streamName, subjects, QueueType.QUEUE, null, false); } - /** See {@link #ensureStream(String, List, QueueType, String)}. */ + /** See {@link #ensureStream(String, List, QueueType, String, boolean)}. */ public void ensureStream(String streamName, List subjects, QueueType queueType) { - ensureStream(streamName, subjects, queueType, null); + ensureStream(streamName, subjects, queueType, null, false); + } + + /** See {@link #ensureStream(String, List, QueueType, String, boolean)}. */ + public void ensureStream( + String streamName, List subjects, QueueType queueType, String description) { + ensureStream(streamName, subjects, queueType, description, false); } /** @@ -168,20 +174,35 @@ public void ensureStream(String streamName, List subjects, QueueType que * {@code nats stream info}). Callers should pass the rqueue queue name so operators can map a * stream back to the queue that created it; pass {@code null} to skip. * + *

{@code allowSchedules} must be {@code true} when the stream will receive messages published + * with the {@code Nats-Schedule} header (ADR-51). Only callers that perform delayed + * enqueue should pass {@code true}; regular enqueue callers should pass {@code false} (or use the + * shorter overloads). Equivalent to the CLI flag {@code --allow-schedules}. + * *

Hits the NATS backend at most once per stream name per process lifetime; subsequent calls - * return immediately from the in-process cache. If the stream already exists with a different + * return immediately from the in-process cache. If {@code allowSchedules=true} is later requested + * for a stream that was previously created without that flag, the stream is updated in place via + * {@link JetStreamManagement#updateStream}. If the stream already exists with a different * retention policy, a WARNING is logged and the existing config is left untouched. */ public void ensureStream( - String streamName, List subjects, QueueType queueType, String description) { - if (streamsDone.contains(streamName)) { + String streamName, + List subjects, + QueueType queueType, + String description, + boolean allowSchedules) { + // Fast-path: already provisioned with at least as many capabilities as requested. + Boolean cached = streamsDone.get(streamName); + if (cached != null && (cached || !allowSchedules)) { return; } Object lock = streamLocks.computeIfAbsent(streamName, k -> new Object()); synchronized (lock) { - if (streamsDone.contains(streamName)) { + cached = streamsDone.get(streamName); + if (cached != null && (cached || !allowSchedules)) { return; } + boolean enableSchedules = allowSchedules && schedulingSupported; try { StreamInfo existing = safeGetStreamInfo(streamName); RetentionPolicy desired = @@ -199,6 +220,11 @@ public void ensureStream( .storageType(sd.getStorage()) .retentionPolicy(desired) .compressionOption(CompressionOption.S2); + if (enableSchedules) { + // Enable server-side message scheduling (ADR-51 / Nats-Schedule header). 
+ // Equivalent to: nats stream add MY_STREAM --allow-schedules + b.allowMessageSchedules(true); + } if (description != null && !description.isEmpty()) { b.description(description); } @@ -223,12 +249,46 @@ public void ensureStream( + " — leaving existing config in place.", new Object[] {streamName, actual, desired}); } + // Check whether new subjects need to be merged in (e.g. the sched wildcard added by + // enqueueWithDelay after the stream was originally created by a plain enqueue call). + java.util.List existingSubjects = existing.getConfiguration().getSubjects(); + java.util.Set existingSet = existingSubjects != null + ? new java.util.HashSet<>(existingSubjects) + : new java.util.HashSet<>(); + boolean needsSubjectUpdate = subjects.stream().anyMatch(s -> !existingSet.contains(s)); + boolean needsFlagUpdate = + enableSchedules && !existing.getConfiguration().getAllowMsgSchedules(); + + if (needsFlagUpdate || needsSubjectUpdate) { + // Merge: keep all existing subjects and append new ones (never remove). 
+ java.util.LinkedHashSet merged = new java.util.LinkedHashSet<>(existingSet); + merged.addAll(subjects); + StreamConfiguration.Builder upd = StreamConfiguration.builder( + existing.getConfiguration()) + .subjects(new java.util.ArrayList<>(merged)); + if (needsFlagUpdate) { + upd.allowMessageSchedules(true); + } + jsm.updateStream(upd.build()); + if (needsFlagUpdate) { + log.log( + Level.INFO, + "Stream ''{0}'' updated to enable message scheduling (ADR-51).", + streamName); + } + if (needsSubjectUpdate) { + log.log( + Level.INFO, + "Stream ''{0}'' updated with additional subjects: {1}.", + new Object[] {streamName, subjects}); + } + } } } catch (IOException | JetStreamApiException e) { throw new RqueueNatsException( "Failed to ensure stream '" + streamName + "' for subjects " + subjects, e); } - streamsDone.add(streamName); + streamsDone.put(streamName, enableSchedules); } } @@ -236,14 +296,38 @@ public void ensureStream( * Ensure a durable pull consumer exists, returning the consumer name. * Hits the NATS backend at most once per (stream, consumer) pair per process lifetime. * - *

No filter subject is set on the consumer: each queue already has its own dedicated stream - * with a single subject, so a filter would be redundant. More importantly, omitting the filter - * allows multiple independent consumer groups (fan-out) to coexist on the same stream — NATS - * rejects two consumers with the same filter subject (error 10100) regardless of retention type. + *

Overload without a filter subject: used when the stream has only the work subject so the + * filter would be redundant, or when multiple independent consumer groups (fan-out) must coexist + * on a Limits-retention stream (NATS rejects two consumers with the same filter subject, error + * 10100). For WorkQueue streams that also carry scheduler subjects ({@code .sched.*}) a filter + * subject MUST be supplied via + * {@link #ensureConsumer(String, String, String, Duration, long, long)} so the consumer only + * receives work-subject messages and does not accidentally pick up scheduler entries. + */ + public String ensureConsumer( + String streamName, + String consumerName, + Duration ackWait, + long maxDeliver, + long maxAckPending) { + return ensureConsumer(streamName, consumerName, null, ackWait, maxDeliver, maxAckPending); + } + + /** + * Ensure a durable pull consumer exists with an optional subject filter, returning the consumer + * name. Hits the NATS backend at most once per (stream, consumer) pair per process lifetime. + * + *

{@code filterSubject} — when non-null, sets the consumer's filter subject so it only + * receives messages published to that subject. Required when the stream carries both work subjects + * and scheduler subjects ({@code .sched.*}): without a filter the consumer reads scheduler + * entries before the scheduled time, delivering the message early. Pass {@code null} for streams + * that do not use NATS scheduling, or where fan-out across multiple consumers is needed (Limits + * retention). */ public String ensureConsumer( String streamName, String consumerName, + String filterSubject, Duration ackWait, long maxDeliver, long maxAckPending) { @@ -258,8 +342,8 @@ public String ensureConsumer( if (cached != null) { return cached; } - String actual = - doEnsureConsumer(streamName, consumerName, ackWait, maxDeliver, maxAckPending); + String actual = doEnsureConsumer( + streamName, consumerName, filterSubject, ackWait, maxDeliver, maxAckPending); consumerCache.put(cacheKey, actual); return actual; } @@ -268,6 +352,7 @@ public String ensureConsumer( private String doEnsureConsumer( String streamName, String consumerName, + String filterSubject, Duration ackWait, long maxDeliver, long maxAckPending) { @@ -295,16 +380,20 @@ private String doEnsureConsumer( throw new RqueueNatsException("Consumer '" + consumerName + "' on stream '" + streamName + "' does not exist and autoCreateConsumers=false"); } - jsm.addOrUpdateConsumer( - streamName, - ConsumerConfiguration.builder() - .durable(consumerName) - .ackPolicy(AckPolicy.Explicit) - .deliverPolicy(DeliverPolicy.All) - .ackWait(ackWait) - .maxDeliver(maxDeliver) - .maxAckPending(maxAckPending) - .build()); + ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder() + .durable(consumerName) + .ackPolicy(AckPolicy.Explicit) + .deliverPolicy(DeliverPolicy.All) + .ackWait(ackWait) + .maxDeliver(maxDeliver) + .maxAckPending(maxAckPending); + if (filterSubject != null && !filterSubject.isEmpty()) { + // Filter to the work 
subject only so that scheduler entries (published to + // .sched.*) are not delivered to this consumer before the scheduled time. + // The NATS scheduler fires the triggered message to workSubject when the time arrives. + ccBuilder.filterSubject(filterSubject); + } + jsm.addOrUpdateConsumer(streamName, ccBuilder.build()); return consumerName; } catch (JetStreamApiException e) { throw new RqueueNatsException( diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java index f42e2dcf..38dafea1 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java @@ -64,10 +64,25 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable { private static final Logger log = Logger.getLogger(JetStreamMessageBroker.class.getName()); /** - * JetStream publish header used by NATS >= 2.12 to hold a message until a UTC delivery time - * (ADR-51). Value must be RFC 3339 UTC, e.g. {@code 2026-05-09T12:30:00Z}. + * NATS message-scheduling header (ADR-51, NATS >= 2.12). + * Value format: {@code @at }, e.g. {@code @at 2026-05-09T12:30:00Z}. + * The message must be published to a dedicated scheduler subject (not the work subject); + * the work subject is declared via {@link #HDR_SCHEDULE_TARGET}. */ - static final String HDR_NEXT_DELIVER_TIME = "Nats-Next-Deliver-Time"; + public static final String HDR_SCHEDULE = "Nats-Schedule"; + + /** + * Target subject to which NATS fires the work message when the schedule triggers. + * Must differ from the publish (scheduler) subject. + */ + public static final String HDR_SCHEDULE_TARGET = "Nats-Schedule-Target"; + + /** + * Suffix appended to the work subject to form the per-message scheduler subject, + * e.g. {@code rqueue.js.orders.sched.}. 
+ * The stream must include {@code .sched.*} in its subjects list. + */ + public static final String SCHED_SUBJECT_SUFFIX = ".sched."; /** * Enrichment header: epoch-ms at which this message was scheduled to be processed. @@ -92,7 +107,7 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable { */ private static final Duration MIN_FETCH_WAIT = Duration.ofMillis(50); - /** RFC 3339 UTC formatter for the {@code Nats-Next-Deliver-Time} header value. */ + /** RFC 3339 UTC formatter for the {@code Nats-Schedule} header value (@at prefix). */ private static final DateTimeFormatter RFC3339_UTC = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZone(ZoneOffset.UTC); @@ -197,6 +212,24 @@ private String dlqSubjectFor(QueueDetail q) { return subjectFor(q) + config.getDlqSubjectSuffix(); } + /** + * Scheduler subject for a single delayed message: {@code .sched.}. + * The NATS scheduler fires a publish to the work subject when the schedule triggers. + * Each message gets its own unique subject so the rollup-sub only replaces itself, + * never another message's schedule entry. + */ + private String schedSubjectFor(QueueDetail q, String msgId) { + return subjectFor(q) + SCHED_SUBJECT_SUFFIX + msgId; + } + + /** + * Wildcard pattern covering all scheduler subjects for a queue, used when registering + * the stream's subject list. E.g. {@code rqueue.js.orders.sched.*}. + */ + static String schedSubjectPattern(String workSubject) { + return workSubject + SCHED_SUBJECT_SUFFIX + "*"; + } + /** Stream description shown in {@code nats stream info} so operators can map back to rqueue. 
*/ private static String streamDescription(QueueDetail q) { return "rqueue queue: " + q.getName(); @@ -295,13 +328,19 @@ public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) { + NatsProvisioner.SCHEDULING_MIN_VERSION + "+ or use the Redis backend for delayed messages."); } - String subject = subjectFor(q); + String workSubject = subjectFor(q); String stream = streamFor(q); - provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q)); - Headers headers = buildSchedulingHeaders(m, delayMs); + provisioner.ensureStream( + stream, + List.of(workSubject, schedSubjectPattern(workSubject)), + q.getType(), + streamDescription(q), + true); + String schedSubject = schedSubjectFor(q, m.getId()); + Headers headers = buildSchedulingHeaders(m, delayMs, workSubject); try { byte[] payload = serdes.serialize(m); - js.publish(subject, headers, payload); + js.publish(schedSubject, headers, payload); } catch (IOException | JetStreamApiException e) { throw new RqueueNatsException( "Failed to enqueue scheduled message id=" @@ -309,7 +348,7 @@ public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) { + " queue=" + q.getName() + " subject=" - + subject, + + schedSubject, e); } catch (RuntimeException e) { throw new RqueueNatsException( @@ -318,7 +357,7 @@ public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) { + " queue=" + q.getName() + " subject=" - + subject, + + schedSubject, e); } } @@ -378,10 +417,15 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long + NatsProvisioner.SCHEDULING_MIN_VERSION + "+ or use the Redis backend for delayed messages.")); } - String subject = subjectFor(q); + String workSubject = subjectFor(q); String stream = streamFor(q); try { - provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q)); + provisioner.ensureStream( + stream, + List.of(workSubject, schedSubjectPattern(workSubject)), + q.getType(), + 
streamDescription(q), + true); } catch (Exception e) { return Mono.error(new RqueueNatsException( "Failed to provision stream for reactive scheduled enqueue id=" @@ -390,7 +434,8 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long + q.getName(), e)); } - Headers headers = buildSchedulingHeaders(m, delayMs); + String schedSubject = schedSubjectFor(q, m.getId()); + Headers headers = buildSchedulingHeaders(m, delayMs, workSubject); byte[] payload; try { payload = serdes.serialize(m); @@ -401,10 +446,10 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long + " queue=" + q.getName() + " subject=" - + subject, + + schedSubject, e)); } - return Mono.fromFuture(() -> js.publishAsync(subject, headers, payload)) + return Mono.fromFuture(() -> js.publishAsync(schedSubject, headers, payload)) .onErrorMap(e -> e instanceof RqueueNatsException ? e : new RqueueNatsException( @@ -413,7 +458,7 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long + " queue=" + q.getName() + " subject=" - + subject, + + schedSubject, e)) .then(); } @@ -507,12 +552,14 @@ private List popInternal( String actualConsumerName = provisioner.ensureConsumer( stream, consumerName, + subject, // filter: consumer only receives work-subject messages, not sched entries ackWait, maxDeliver, config.getConsumerDefaults().getMaxAckPending()); PullSubscribeOptions opts = PullSubscribeOptions.bind(stream, actualConsumerName); - // Consumer has no filter subject; pass null so the NATS client doesn't validate - // the subject against a (nonexistent) filter — SUB-90011 otherwise. + // The filter is set on the consumer itself; pass null here so the NATS client does not + // re-validate the subject against the consumer filter at subscribe time — SUB-90011 + // is thrown if the passed subject doesn't match the consumer's filter subject exactly. 
return js.subscribe(null, opts); } catch (IOException | JetStreamApiException e) { throw new RqueueNatsException( @@ -1002,7 +1049,21 @@ public Capabilities capabilities() { * {@link RqueueMessage#setPeriod} if the deserialized payload lacks it. * */ - private Headers buildSchedulingHeaders(RqueueMessage m, long delayMs) { + /** + * Build NATS message-scheduling headers for a delayed publish (ADR-51, NATS >= 2.12). + * + *

The message is published to a dedicated scheduler subject + * ({@code .sched.}). When the schedule triggers, NATS fires a + * JetStream publish to {@code workSubject} so consumers see it only after the delay. + * + *

Required headers (confirmed against nats-server 2.12.8): + *

+ */ + private Headers buildSchedulingHeaders(RqueueMessage m, long delayMs, String workSubject) { Headers headers = new Headers(); if (m.getId() != null) { // Dedup key: id-at-processAt for scheduled messages (processAt > 0). @@ -1018,8 +1079,13 @@ private Headers buildSchedulingHeaders(RqueueMessage m, long delayMs) { } long deliverAtMs = m.getProcessAt() > 0 ? m.getProcessAt() : System.currentTimeMillis() + delayMs; - String deliverAt = RFC3339_UTC.format(Instant.ofEpochMilli(deliverAtMs)); - headers.add(HDR_NEXT_DELIVER_TIME, deliverAt); + // "@at " prefix is required — bare RFC3339 is not recognised by the server scheduler. + String deliverAt = "@at " + RFC3339_UTC.format(Instant.ofEpochMilli(deliverAtMs)); + headers.add(HDR_SCHEDULE, deliverAt); + headers.add(HDR_SCHEDULE_TARGET, workSubject); + // Rollup-sub: replaces any existing schedule entry for this scheduler subject so that + // re-enqueue of the same message ID at the same processAt is idempotent. + headers.add("Nats-Rollup", "sub"); // Enrichment headers — readable on pop without deserializing the payload headers.add(HDR_PROCESS_AT, String.valueOf(deliverAtMs)); if (m.isPeriodic()) { diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsDeadLetterBridgeRegistrar.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsDeadLetterBridgeRegistrar.java new file mode 100644 index 00000000..ab0bfcc3 --- /dev/null +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsDeadLetterBridgeRegistrar.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.nats.js; + +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.core.EndpointRegistry; +import com.github.sonus21.rqueue.core.spi.MessageBroker; +import com.github.sonus21.rqueue.listener.QueueDetail; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.SmartInitializingSingleton; + +/** + * Bootstrap-time installer for the NATS-native dead-letter advisory bridge. For every active queue + * registered in {@link EndpointRegistry}, calls + * {@link JetStreamMessageBroker#installDeadLetterBridge(QueueDetail, String)} so that messages + * exceeding {@code maxDeliver} on the durable consumer are republished onto the queue's DLQ + * stream via the {@code $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES} advisory subject. + * + *

This is the NATS-side equivalent of the Redis backend's {@code RqueueDeadLetterPublisher}: + * the rqueue post-processing handler already routes failed deliveries to a configured Rqueue-level + * DLQ ({@code @RqueueListener(deadLetterQueue=...)}); the advisory bridge registered here is an + * additional, independent path that catches messages whose handler exhausted retries without the + * post-processor noticing (e.g. listener restart, container shutdown mid-retry, JetStream-driven + * redelivery exhaustion outside the rqueue retry counter). + * + *

Lifecycle. Implements {@link SmartInitializingSingleton} so it runs after every + * {@code @RqueueListener} bean has registered with {@link EndpointRegistry} and the + * {@link com.github.sonus21.rqueue.nats.js.NatsStreamValidator} has provisioned the underlying + * streams — but before {@code SmartLifecycle.start()} spawns the message pollers, so the bridge + * is in place before the first delivery attempt. Implements {@link DisposableBean} so the + * advisory dispatchers are torn down on context shutdown. + * + *

Producer-only mode. When {@link RqueueConfig#isProducer()} is true the application + * has no listeners and therefore no consumers that could exhaust retries; the registrar exits + * early and installs nothing. + * + *

Backend gating. Only does its work when the active broker is a + * {@link JetStreamMessageBroker}; on Redis or other backends the bean simply no-ops, so it is safe + * to wire unconditionally from the NATS auto-config (which is itself gated on + * {@code rqueue.backend=nats}). + */ +public class NatsDeadLetterBridgeRegistrar implements SmartInitializingSingleton, DisposableBean { + + private static final Logger log = Logger.getLogger(NatsDeadLetterBridgeRegistrar.class.getName()); + + private final MessageBroker broker; + private final RqueueConfig rqueueConfig; + private final List bridges = new ArrayList<>(); + + public NatsDeadLetterBridgeRegistrar(MessageBroker broker, RqueueConfig rqueueConfig) { + this.broker = broker; + this.rqueueConfig = rqueueConfig; + } + + @Override + public void afterSingletonsInstantiated() { + if (rqueueConfig != null && rqueueConfig.isProducer()) { + log.log( + Level.FINE, + "NatsDeadLetterBridgeRegistrar: producer-only mode — skipping bridge installation"); + return; + } + if (!(broker instanceof JetStreamMessageBroker)) { + // Defensive — the bean is wired only by the NATS auto-config, but other backends could + // theoretically substitute a different MessageBroker via @Primary. + return; + } + JetStreamMessageBroker nb = (JetStreamMessageBroker) broker; + List queues = EndpointRegistry.getActiveQueueDetails(); + if (queues.isEmpty()) { + return; + } + int installed = 0; + for (QueueDetail q : queues) { + String consumerName = q.resolvedConsumerName(); + try { + bridges.add(nb.installDeadLetterBridge(q, consumerName)); + installed++; + } catch (RuntimeException e) { + // Best-effort: a single failure must not abort listener startup. The rqueue-level DLQ + // path (PostProcessingHandler.moveToDlq) still works regardless. 
+ log.log( + Level.WARNING, + "Failed to install dead-letter advisory bridge for queue " + q.getName() + " consumer " + + consumerName + ": " + e.getMessage(), + e); + } + } + log.log( + Level.INFO, + "NatsDeadLetterBridgeRegistrar: installed {0} advisory bridge(s) across {1} queue(s)", + new Object[] {installed, queues.size()}); + } + + @Override + public void destroy() { + for (AutoCloseable c : bridges) { + try { + c.close(); + } catch (Exception ignore) { + // best-effort close; we are shutting down anyway + } + } + bridges.clear(); + } +} diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java index 89614506..d249348b 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java @@ -97,7 +97,7 @@ public void afterSingletonsInstantiated() { log.log( Level.INFO, "NatsStreamValidator: NATS message scheduling (ADR-51) is AVAILABLE — " - + "enqueueWithDelay will use the Nats-Next-Deliver-Time header."); + + "enqueueWithDelay will use the Nats-Schedule header."); } else { log.log( Level.WARNING, @@ -122,7 +122,7 @@ public void afterSingletonsInstantiated() { String mainSubject = config.getSubjectPrefix() + q.getName(); total += tryEnsure(failures, mainStream, mainSubject, q); if (!producerOnly) { - tryEnsureConsumer(failures, mainStream, q.resolvedConsumerName(), q, cd); + tryEnsureConsumer(failures, mainStream, q.resolvedConsumerName(), mainSubject, q, cd); } if (q.getPriority() != null) { @@ -182,13 +182,14 @@ private void tryEnsureConsumer( List failures, String streamName, String consumerName, + String filterSubject, QueueDetail q, RqueueNatsConfig.ConsumerDefaults cd) { Duration ackWait = JetStreamMessageBroker.resolveAckWait(q, config); long maxDeliver = JetStreamMessageBroker.resolveMaxDeliver(q, config); 
try { provisioner.ensureConsumer( - streamName, consumerName, ackWait, maxDeliver, cd.getMaxAckPending()); + streamName, consumerName, filterSubject, ackWait, maxDeliver, cd.getMaxAckPending()); } catch (RqueueNatsException e) { failures.add("consumer " + consumerName + " on " + streamName + ": " + rootCause(e)); } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/metrics/NatsRqueueQueueMetricsProvider.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/metrics/NatsRqueueQueueMetricsProvider.java index 8e026f03..43e90f73 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/metrics/NatsRqueueQueueMetricsProvider.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/metrics/NatsRqueueQueueMetricsProvider.java @@ -23,8 +23,11 @@ import com.github.sonus21.rqueue.nats.RqueueNatsException; import io.nats.client.JetStreamApiException; import io.nats.client.JetStreamManagement; +import io.nats.client.api.ConsumerInfo; import io.nats.client.api.StreamInfo; import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; /** * NATS JetStream {@link RqueueQueueMetricsProvider}. Pending messages map to the message count of @@ -36,6 +39,9 @@ */ public class NatsRqueueQueueMetricsProvider implements RqueueQueueMetricsProvider { + private static final Logger log = + Logger.getLogger(NatsRqueueQueueMetricsProvider.class.getName()); + private final JetStreamManagement jsm; private final RqueueNatsConfig config; @@ -99,4 +105,76 @@ public long getDeadLetterMessageCount(String queueName) { "Failed to read DLQ stream size for queue=" + queueName + " stream=" + dlqStream, e); } } + + /** + * Per-consumer pending depth: {@code ConsumerInfo.numPending}, the JetStream-tracked count of + * messages this durable has yet to deliver. 
Falls back to the queue-level stream count when + * {@code consumerName} is null/empty (no override) or when the consumer hasn't been provisioned + * yet (boot-time race), so dashboards never see a missing reading. + */ + @Override + public long getPendingMessageCountByConsumer(String queueName, String consumerName) { + if (consumerName == null || consumerName.isEmpty()) { + return getPendingMessageCount(queueName); + } + QueueDetail q; + try { + q = EndpointRegistry.get(queueName); + } catch (QueueDoesNotExist e) { + return 0L; + } + String stream = config.getStreamPrefix() + q.getName(); + try { + ConsumerInfo ci = jsm.getConsumerInfo(stream, consumerName); + return ci == null ? 0L : ci.getNumPending(); + } catch (JetStreamApiException e) { + // consumer or stream not yet provisioned — fall back to stream-level count rather than 0, + // so the gauge reads the same as the bare-queue overload during the bootstrap window. + log.log( + Level.FINE, + "Consumer-aware pending lookup fell back to stream count: queue=" + queueName + + " consumer=" + consumerName + " (" + e.getMessage() + ")"); + return getPendingMessageCount(queueName); + } catch (IOException e) { + throw new RqueueNatsException( + "Failed to read consumer info for queue=" + queueName + " consumer=" + consumerName + + " stream=" + stream, + e); + } + } + + /** + * Per-consumer in-flight depth: {@code ConsumerInfo.numAckPending}, the count of messages + * delivered to this consumer but not yet acked. This is the JetStream-side analog of the + * Redis processing ZSET; reporting it per-consumer is essential when multiple + * {@code @RqueueListener} methods on the same queue progress at different rates. 
+ */ + @Override + public long getProcessingMessageCountByConsumer(String queueName, String consumerName) { + if (consumerName == null || consumerName.isEmpty()) { + return getProcessingMessageCount(queueName); + } + QueueDetail q; + try { + q = EndpointRegistry.get(queueName); + } catch (QueueDoesNotExist e) { + return 0L; + } + String stream = config.getStreamPrefix() + q.getName(); + try { + ConsumerInfo ci = jsm.getConsumerInfo(stream, consumerName); + return ci == null ? 0L : ci.getNumAckPending(); + } catch (JetStreamApiException e) { + log.log( + Level.FINE, + "Consumer-aware processing lookup fell back to 0: queue=" + queueName + " consumer=" + + consumerName + " (" + e.getMessage() + ")"); + return 0L; + } catch (IOException e) { + throw new RqueueNatsException( + "Failed to read consumer info for queue=" + queueName + " consumer=" + consumerName + + " stream=" + stream, + e); + } + } } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java index e9d4ae23..dce914b2 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java @@ -70,7 +70,7 @@ * JetStream stream, republishes each to the destination stream, and hard-deletes the source copy * via {@link JetStreamManagement#deleteMessage}. {@link #enqueueMessage(String, String, String)} * looks up the message in the metadata store and republishes it immediately (without a - * {@code Nats-Next-Deliver-Time} header) so the worker picks it up on its next poll. + * {@code Nats-Schedule} header) so the worker picks it up on its next poll. * *

{@link #makeEmpty(String, String)} still returns "not supported" — purging a stream is a * destructive admin operation best performed via {@code nats stream purge}. @@ -156,7 +156,7 @@ public BooleanResponse deleteMessage(String queueName, String id) { /** * Re-enqueue a message for immediate delivery. Looks up the {@link RqueueMessage} from the * metadata store by {@code queueName + id}, then republishes the raw bytes to the queue's - * JetStream stream without a {@code Nats-Next-Deliver-Time} header so the poller picks it up + * JetStream stream without a {@code Nats-Schedule} header so the poller picks it up * on its next fetch. A fresh {@code Nats-Msg-Id} ({@code id-requeue-}) prevents * JetStream from deduplicating against the original scheduled publish. The {@code position} * hint (FRONT / BACK) is ignored — JetStream pull consumers deliver in stream-sequence order. diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java index bead4047..71208f33 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java @@ -9,6 +9,7 @@ */ package com.github.sonus21.rqueue.nats; +import static org.junit.jupiter.api.Assumptions.assumeTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -19,18 +20,30 @@ import io.nats.client.Options; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.DockerClientFactory; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; /** - * Base for JetStream integration tests. 
Mirrors the Redis test pattern: when {@code NATS_RUNNING} - * is set the test connects to a locally running nats-server (CI path); otherwise a Testcontainers- - * managed instance is started in {@link BeforeAll} (local Docker path). JUnit 5 / Testcontainers - * skip the test gracefully if Docker isn't available and {@code NATS_RUNNING} isn't set. + * Base for JetStream integration tests. Two execution paths: + * + *

    + *
  1. External NATS — set {@code NATS_RUNNING=1} (and optionally {@code NATS_URL}) to + * connect to an already-running nats-server. This is the path used in CI and when + * nats-server is installed locally (e.g. via Homebrew). + *
  2. Testcontainers — when Docker is available and {@code NATS_RUNNING} is not set, a + * {@code nats:2.12-alpine} container is started automatically. + *
+ * + *

If neither path is viable the whole class is skipped via {@link + * org.junit.jupiter.api.Assumptions#assumeTrue} — no hard failure. + * + *

Why no {@code @Testcontainers(disabledWithoutDocker = true)}: that annotation + * disables the class at the JUnit extension level before {@code @BeforeAll} runs, + * so the external-NATS path would never be reached when Docker is absent. We manage the + * container lifecycle manually and gate on assumptions instead. */ -@Testcontainers(disabledWithoutDocker = true) @NatsIntegrationTest public abstract class AbstractJetStreamIT { @@ -38,14 +51,7 @@ public abstract class AbstractJetStreamIT { static final String EXTERNAL_NATS_URL = System.getenv().getOrDefault("NATS_URL", "nats://127.0.0.1:4222"); - /** - * Container is only constructed in the local-Docker path. The Testcontainers extension - * ignores static {@code GenericContainer} fields that aren't annotated {@code @Container}; - * we manage the container's lifecycle from {@link #setup()} / {@link #teardown()} so the - * external-NATS path can leave it null without tripping the extension. - */ protected static GenericContainer NATS; - protected static Connection connection; @BeforeAll @@ -54,6 +60,10 @@ static void setup() throws Exception { if (USE_EXTERNAL_NATS) { url = EXTERNAL_NATS_URL; } else { + assumeTrue( + DockerClientFactory.instance().isDockerAvailable(), + "Skipping NATS ITs: Docker is not available and NATS_RUNNING is not set. " + + "Start nats-server locally and set NATS_RUNNING=1, or start Docker."); NATS = new GenericContainer<>(DockerImageName.parse("nats:2.12-alpine")) .withCommand("-js", "-DV") .withExposedPorts(4222) @@ -82,11 +92,6 @@ protected QueueDetail mockQueue(String name, QueueType type) { return mockQueue(name, type, null); } - /** - * Build a mock QueueDetail whose {@code resolvedConsumerName()} returns the given consumer - * name. Used by tests that exercise multi-consumer flows where pop's {@code consumerName} - * argument must match what ack/nack derive from the QueueDetail. 
- */ protected QueueDetail mockQueue(String name, QueueType type, String consumerName) { QueueDetail q = mock(QueueDetail.class); when(q.getName()).thenReturn(name); diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AllowMessageSchedulesEnforcementIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AllowMessageSchedulesEnforcementIT.java new file mode 100644 index 00000000..ac19a4ec --- /dev/null +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AllowMessageSchedulesEnforcementIT.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + */ +package com.github.sonus21.rqueue.nats; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import io.nats.client.JetStream; +import io.nats.client.JetStreamManagement; +import io.nats.client.api.RetentionPolicy; +import io.nats.client.api.StorageType; +import io.nats.client.api.StreamConfiguration; +import io.nats.client.api.StreamInfo; +import io.nats.client.impl.Headers; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import org.junit.jupiter.api.Test; + +/** + * Investigates whether NATS server actually enforces the {@code allow_msg_schedules} stream flag + * when a message is published with the {@code Nats-Schedule} header (ADR-51, NATS >= 2.12). + * + *

The unit tests confirm that {@link com.github.sonus21.rqueue.nats.internal.NatsProvisioner} + * sets the flag correctly, but they mock {@code jsm.addStream()} — they cannot prove the server + * enforces it. This IT creates a stream without the flag against a real + * {@code nats:2.12-alpine} container (or an externally-managed server) and attempts the publish + * directly, recording whether the server accepts or rejects it. + * + *

Expected result: NATS 2.12+ enforces the flag; the publish should be rejected (throw) + * with error 10189. If the publish succeeds the test documents that the flag is advisory on this + * server version, so that the team knows the protective unit tests are not backed by enforcement. + */ +@NatsIntegrationTest +class AllowMessageSchedulesEnforcementIT extends AbstractJetStreamIT { + + private static final DateTimeFormatter RFC3339 = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZone(ZoneOffset.UTC); + + @Test + void publishWithScheduleHeader_streamWithoutFlag_serverRejectsIt() throws Exception { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // Only meaningful on servers that support scheduling at all + assumeTrue( + connection.getServerInfo().isSameOrNewerThanVersion("2.12.0"), + "Skipping: server < 2.12 — Nats-Schedule header not supported"); + + String nano = String.valueOf(System.nanoTime()); + String streamName = "enforce-sched-test-" + nano; + // Use a sched-subject pattern so the subject is valid for scheduling publish + String workSubject = "rqueue.enforce." 
+ nano; + String schedSubject = workSubject + ".sched.probe"; + + // Create stream deliberately WITHOUT allowMessageSchedules, but including both subjects + StreamConfiguration cfg = StreamConfiguration.builder() + .name(streamName) + .subjects(workSubject, schedSubject) + .storageType(StorageType.Memory) + .retentionPolicy(RetentionPolicy.WorkQueue) + .allowMessageSchedules(false) + .build(); + jsm.addStream(cfg); + + // Confirm the flag is not set on the server-side config + StreamInfo info = jsm.getStreamInfo(streamName); + assertFalse( + info.getConfiguration().getAllowMsgSchedules(), + "Pre-condition: stream must NOT have allow_msg_schedules"); + + // Build correct ADR-51 headers: Nats-Schedule + Nats-Schedule-Target + Nats-Rollup + String deliverAt = "@at " + RFC3339.format(Instant.now().plusSeconds(10)); + Headers headers = new Headers(); + headers.add("Nats-Schedule", deliverAt); + headers.add("Nats-Schedule-Target", workSubject); + headers.add("Nats-Rollup", "sub"); + + boolean serverEnforcesFlag; + try { + js.publish(schedSubject, headers, "probe".getBytes()); + // Publish succeeded — server does NOT enforce the flag + serverEnforcesFlag = false; + } catch (Exception e) { + // Publish rejected (expected: error 10189) — server DOES enforce the flag + serverEnforcesFlag = true; + } + + // Clean up + jsm.deleteStream(streamName); + + // Document and assert the actual behaviour. + // If this assertion ever changes it means a NATS server upgrade changed the enforcement policy. + assertTrue( + serverEnforcesFlag, + "NATS server accepted Nats-Schedule on a stream without allow_msg_schedules. This means" + + " the flag is advisory on this server version — the unit tests in" + + " NatsProvisionerTest that verify the flag is set are still correct best-practice" + + " but the server does not enforce it. 
Update this test if the behaviour is" + + " intentional."); + } + + @Test + void publishWithScheduleHeader_streamWithFlag_serverAcceptsIt() throws Exception { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + assumeTrue( + connection.getServerInfo().isSameOrNewerThanVersion("2.12.0"), + "Skipping: server < 2.12 — Nats-Schedule header not supported"); + + String nano = String.valueOf(System.nanoTime()); + String streamName = "enforce-sched-ok-" + nano; + String workSubject = "rqueue.enforce.ok." + nano; + String schedSubject = workSubject + ".sched.probe"; + + // Create stream WITH allowMessageSchedules + StreamConfiguration cfg = StreamConfiguration.builder() + .name(streamName) + .subjects(workSubject, schedSubject) + .storageType(StorageType.Memory) + .retentionPolicy(RetentionPolicy.WorkQueue) + .allowMessageSchedules(true) + .build(); + jsm.addStream(cfg); + + StreamInfo info = jsm.getStreamInfo(streamName); + assertTrue( + info.getConfiguration().getAllowMsgSchedules(), + "Pre-condition: stream MUST have allow_msg_schedules"); + + String deliverAt = "@at " + RFC3339.format(Instant.now().plusSeconds(10)); + Headers headers = new Headers(); + headers.add("Nats-Schedule", deliverAt); + headers.add("Nats-Schedule-Target", workSubject); + headers.add("Nats-Rollup", "sub"); + + // With the flag set this must never throw + assertDoesNotThrow( + () -> js.publish(schedSubject, headers, "probe".getBytes()), + "Publish with Nats-Schedule must succeed when allow_msg_schedules=true"); + + assertNotNull(jsm.deleteStream(streamName)); + } +} diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerSchedulingIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerSchedulingIT.java index 0b2e20fa..5acebe56 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerSchedulingIT.java +++ 
b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerSchedulingIT.java @@ -139,4 +139,67 @@ void enqueueWithDelay_multipleMessages_allDeliveredAfterTheirTimes() throws Exce } } } + + /** + * Regression test for the "enqueue-first then enqueueWithDelay" ordering bug. + * + *

If a plain {@code enqueue()} call happens before the first {@code enqueueWithDelay()}, the + * stream is created with only the work subject and no {@code allow_msg_schedules} flag. The + * provisioner must detect this on the delayed call and upgrade the stream in-place: add the + * sched-wildcard subject AND set the flag in a single {@code updateStream()}, otherwise NATS + * rejects the sched-subject publish with "no stream matches subject". + */ + /** + * Regression test for the "enqueue-first then enqueueWithDelay" ordering bug. + * + *

Uses a longer delay (10 s) than the other tests to absorb the stream-upgrade overhead + * (updateStream round-trip + consumer creation) that happens in this path but not when the + * stream is provisioned with scheduling from the start. Without the extra headroom the + * "beforeDelay" check can race with the scheduler firing. + */ + @Test + void enqueueWithDelay_afterPlainEnqueue_streamUpgradedAndMessageHeld() throws Exception { + final long UPGRADE_DELAY_MS = 10_000L; + final long UPGRADE_BUFFER_MS = 3_000L; + QueueDetail q = mockQueue("mixed-" + System.nanoTime(), QueueType.QUEUE, "mixed-worker"); + try (JetStreamMessageBroker broker = + JetStreamMessageBroker.builder().connection(connection).build()) { + assumeTrue( + broker.capabilities().supportsDelayedEnqueue(), + "Skipping: connected NATS server does not support message scheduling (< " + + com.github.sonus21.rqueue.nats.internal.NatsProvisioner.SCHEDULING_MIN_VERSION + + ")"); + + // Step 1 — plain enqueue: creates stream with only the work subject, no sched flag + RqueueMessage immediate = + RqueueMessage.builder().id("imm-1").message("immediate").build(); + broker.enqueue(q, immediate); + + // Step 2 — delayed enqueue: must upgrade the stream (add sched wildcard + flag) + RqueueMessage delayed = RqueueMessage.builder() + .id("del-1") + .message("delayed") + .processAt(System.currentTimeMillis() + UPGRADE_DELAY_MS) + .build(); + broker.enqueueWithDelay(q, delayed, UPGRADE_DELAY_MS); + + // The immediate message must be available right away + List nowMsgs = broker.pop(q, "mixed-worker", 5, Duration.ofSeconds(2)); + assertEquals(1, nowMsgs.size(), "Immediate message must be delivered right away"); + assertEquals("immediate", nowMsgs.get(0).getMessage()); + broker.ack(q, nowMsgs.get(0)); + + // The delayed message must NOT be visible yet (10 s away — stream upgrade takes < 1 s) + List beforeDelay = broker.pop(q, "mixed-worker", 5, Duration.ofMillis(500)); + assertTrue(beforeDelay.isEmpty(), "Delayed message 
must not arrive before scheduled time"); + + Thread.sleep(UPGRADE_DELAY_MS + UPGRADE_BUFFER_MS); + + // After the delay the scheduled message must arrive + List afterDelay = broker.pop(q, "mixed-worker", 5, Duration.ofSeconds(3)); + assertEquals(1, afterDelay.size(), "Delayed message must be delivered after scheduled time"); + assertEquals("delayed", afterDelay.get(0).getMessage()); + broker.ack(q, afterDelay.get(0)); + } + } } diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java index 4b6d3817..2aa9a912 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java @@ -286,10 +286,12 @@ void capabilities_schedulingSupported_supportsDelayedEnqueueIsTrue() { /** * JetStreamMessageBroker does not override {@code scheduleNext}; the SPI default calls * {@code enqueueWithDelay(q, message, delay)}. Verify that path reaches JetStream publish - * with the correct subject and that {@code Nats-Next-Deliver-Time} is set (ADR-51 header). + * to the scheduler subject ({@code .sched.}) with the correct ADR-51 headers: + * {@code Nats-Schedule: @at }, {@code Nats-Schedule-Target: }, and + * {@code Nats-Rollup: sub}. 
*/ @Test - void scheduleNext_delegatesToEnqueueWithDelay_setsDeliverTimeHeader() throws Exception { + void scheduleNext_delegatesToEnqueueWithDelay_setsScheduleHeaders() throws Exception { Fixture f = newFixture(RqueueNatsConfig.defaults(), true); when(f.js.publish(any(String.class), any(Headers.class), any(byte[].class))) .thenReturn(mock(PublishAck.class)); @@ -305,11 +307,31 @@ void scheduleNext_delegatesToEnqueueWithDelay_setsDeliverTimeHeader() throws Exc // scheduleNext default: delayMs = max(0, processAt - now) → enqueueWithDelay f.broker.scheduleNext(queueNamed("orders"), "ignored-key", m, 60L); + ArgumentCaptor subject = ArgumentCaptor.forClass(String.class); ArgumentCaptor headers = ArgumentCaptor.forClass(Headers.class); - verify(f.js, times(1)).publish(eq("rqueue.js.orders"), headers.capture(), any(byte[].class)); - assertNotNull( - headers.getValue().getFirst("Nats-Next-Deliver-Time"), - "ADR-51 deliver-time header must be present"); + verify(f.js, times(1)).publish(subject.capture(), headers.capture(), any(byte[].class)); + + // Must publish to the scheduler subject, NOT the work subject + assertEquals( + "rqueue.js.orders.sched.pid-1", + subject.getValue(), + "Must publish to scheduler subject .sched."); + + // Nats-Schedule header must carry the @at prefix and an RFC3339 UTC time + String schedule = headers.getValue().getFirst(JetStreamMessageBroker.HDR_SCHEDULE); + assertNotNull(schedule, "Nats-Schedule header must be present"); + assertTrue(schedule.startsWith("@at "), "Nats-Schedule value must start with '@at '"); + + // Nats-Schedule-Target must point to the work subject + assertEquals( + "rqueue.js.orders", + headers.getValue().getFirst(JetStreamMessageBroker.HDR_SCHEDULE_TARGET), + "Nats-Schedule-Target must be the work subject"); + + // Nats-Rollup must be 'sub' for per-subject idempotent scheduling + assertEquals("sub", headers.getValue().getFirst("Nats-Rollup"), "Nats-Rollup must be 'sub'"); + + // Dedup key must encode the period identity 
assertEquals( "pid-1-at-" + processAt, headers.getValue().getFirst("Nats-Msg-Id"), diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/NatsStreamValidatorProducerModeTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/NatsStreamValidatorProducerModeTest.java index fc6afab9..2da7ab4b 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/NatsStreamValidatorProducerModeTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/NatsStreamValidatorProducerModeTest.java @@ -63,7 +63,8 @@ private static QueueDetail queue(String name) { void setUp() { EndpointRegistry.delete(); provisioner = mock(NatsProvisioner.class); - when(provisioner.ensureConsumer(anyString(), anyString(), any(), anyLong(), anyLong())) + when(provisioner.ensureConsumer( + anyString(), anyString(), anyString(), any(), anyLong(), anyLong())) .thenReturn("rqueue-consumer"); natsConfig = RqueueNatsConfig.defaults(); } @@ -81,7 +82,8 @@ void producerMode_skipsConsumerProvisioningButStillEnsuresStream() { verify(provisioner, times(1)) .ensureStream(eq(natsConfig.getStreamPrefix() + "orders"), any(), any(), any()); verify(provisioner, never()) - .ensureConsumer(anyString(), anyString(), any(Duration.class), anyLong(), anyLong()); + .ensureConsumer( + anyString(), anyString(), anyString(), any(Duration.class), anyLong(), anyLong()); } @Test @@ -99,6 +101,7 @@ void consumerMode_provisionsBothStreamAndConsumer() { .ensureConsumer( eq(natsConfig.getStreamPrefix() + "orders"), anyString(), + eq(natsConfig.getSubjectPrefix() + "orders"), any(Duration.class), anyLong(), anyLong()); @@ -112,6 +115,7 @@ void nullRqueueConfig_treatedAsConsumerMode_provisionsBoth() { validator.afterSingletonsInstantiated(); verify(provisioner, times(1)) - .ensureConsumer(anyString(), anyString(), any(Duration.class), anyLong(), anyLong()); + .ensureConsumer( + anyString(), anyString(), anyString(), any(Duration.class), anyLong(), anyLong()); } } diff --git 
a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/internal/NatsProvisionerTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/internal/NatsProvisionerTest.java index 5497aabd..4b6abbbd 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/internal/NatsProvisionerTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/internal/NatsProvisionerTest.java @@ -16,6 +16,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -167,12 +168,16 @@ void ensureStream_streamAlreadyExists_skipsCreation() throws IOException, JetStr StreamInfo existing = mock(StreamInfo.class); StreamConfiguration existingCfg = mock(StreamConfiguration.class); when(existingCfg.getRetentionPolicy()).thenReturn(io.nats.client.api.RetentionPolicy.WorkQueue); + // Stub getSubjects() so the provisioner sees the subject as already present and skips update + when(existingCfg.getSubjects()).thenReturn(Collections.singletonList("rqueue.js.orders")); + when(existingCfg.getAllowMsgSchedules()).thenReturn(false); when(existing.getConfiguration()).thenReturn(existingCfg); when(jsm.getStreamInfo("rqueue-js-orders")).thenReturn(existing); provisioner.ensureStream("rqueue-js-orders", Collections.singletonList("rqueue.js.orders")); verify(jsm, never()).addStream(any()); + verify(jsm, never()).updateStream(any()); } /** @@ -196,6 +201,138 @@ void ensureStream_streamNotExist_createsStream() throws IOException, JetStreamAp verify(jsm, times(1)).addStream(any(StreamConfiguration.class)); } + /** + * When the NATS server supports scheduling (≥ 2.12), streams must be created with + * {@code allowMessageSchedules=true} so the server accepts {@code Nats-Schedule} + * publish headers 
(ADR-51). Equivalent to: {@code nats stream add --allow-schedules}. + */ + @Test + void ensureStream_schedulingSupported_setsAllowMessageSchedules() + throws IOException, JetStreamApiException { + // setUp() already wires serverInfo.isSameOrNewerThanVersion() → true (scheduling supported) + JetStreamApiException notFound = makeStreamNotFoundEx(); + when(jsm.getStreamInfo("rqueue-js-orders")).thenThrow(notFound); + + provisioner.ensureStream( + "rqueue-js-orders", + Collections.singletonList("rqueue.js.orders"), + QueueType.QUEUE, + null, + true); + + verify(jsm, times(1)).addStream(argThat(cfg -> cfg.getAllowMsgSchedules())); + } + + @Test + void ensureStream_schedulingNotSupported_doesNotSetAllowMessageSchedules() + throws IOException, JetStreamApiException { + // Build a provisioner backed by a server that does NOT support scheduling + Connection oldConn = mock(Connection.class); + KeyValueManagement oldKvm = mock(KeyValueManagement.class); + when(oldConn.keyValueManagement()).thenReturn(oldKvm); + io.nats.client.api.ServerInfo oldInfo = mock(io.nats.client.api.ServerInfo.class); + when(oldInfo.isSameOrNewerThanVersion(anyString())).thenReturn(false); + when(oldInfo.getVersion()).thenReturn("2.10.0"); + when(oldConn.getServerInfo()).thenReturn(oldInfo); + NatsProvisioner oldProvisioner = new NatsProvisioner(oldConn, jsm, config); + + JetStreamApiException notFound = makeStreamNotFoundEx(); + when(jsm.getStreamInfo("rqueue-js-orders")).thenThrow(notFound); + + // allowSchedules=true is requested but server doesn't support it → flag must NOT be set + oldProvisioner.ensureStream( + "rqueue-js-orders", + Collections.singletonList("rqueue.js.orders"), + QueueType.QUEUE, + null, + true); + + verify(jsm, times(1)).addStream(argThat(cfg -> !cfg.getAllowMsgSchedules())); + } + + /** + * If a stream was initially created without scheduling (allowSchedules=false) and a later call + * requests scheduling (allowSchedules=true), the provisioner must call updateStream() to add 
the + * flag rather than silently skipping because the stream is already in cache. + */ + @Test + void ensureStream_upgradeScheduling_updatesExistingStream() + throws IOException, JetStreamApiException { + // First call: stream doesn't exist, created without scheduling + JetStreamApiException notFound = makeStreamNotFoundEx(); + when(jsm.getStreamInfo("rqueue-js-orders")) + .thenThrow(notFound) // first call: not found → create + .thenReturn(mock( + StreamInfo.class, + inv -> { // second call: exists, no scheduling + String m = inv.getMethod().getName(); + if ("getConfiguration".equals(m)) { + StreamConfiguration cfg = StreamConfiguration.builder() + .name("rqueue-js-orders") + .subjects(Collections.singletonList("rqueue.js.orders")) + .build(); + return cfg; + } + return null; + })); + + // First call: no scheduling + provisioner.ensureStream( + "rqueue-js-orders", + Collections.singletonList("rqueue.js.orders"), + QueueType.QUEUE, + null, + false); + // Second call (different provisioner instance to bypass cache): scheduling requested + NatsProvisioner p2 = new NatsProvisioner(connection, jsm, config); + p2.ensureStream( + "rqueue-js-orders", + Collections.singletonList("rqueue.js.orders"), + QueueType.QUEUE, + null, + true); + + verify(jsm, times(1)).addStream(any(StreamConfiguration.class)); + verify(jsm, times(1)).updateStream(argThat(cfg -> cfg.getAllowMsgSchedules())); + } + + /** + * Concrete scenario: plain enqueue() creates the stream with only the work subject, then + * enqueueWithDelay() is called for the first time. The provisioner must update the stream to + * add BOTH the sched-wildcard subject AND the allow_msg_schedules flag in a single updateStream() + * call. Without the subject update, publishing to the sched subject would be rejected by NATS + * with "no stream matches subject". 
+ */ + @Test + void ensureStream_firstEnqueueThenDelay_addsSchedSubjectAndFlag() + throws IOException, JetStreamApiException { + String workSubject = "rqueue.js.orders"; + String schedPattern = workSubject + ".sched.*"; + + // After enqueue() created the stream (work subject only, no scheduling flag) + StreamInfo existingInfo = mock(StreamInfo.class, inv -> { + if ("getConfiguration".equals(inv.getMethod().getName())) { + return StreamConfiguration.builder() + .name("rqueue-js-orders") + .subjects(Collections.singletonList(workSubject)) + .build(); // allowMsgSchedules defaults to false + } + return null; + }); + when(jsm.getStreamInfo("rqueue-js-orders")).thenReturn(existingInfo); + + // enqueueWithDelay() calls ensureStream with the sched wildcard and allowSchedules=true + provisioner.ensureStream( + "rqueue-js-orders", Arrays.asList(workSubject, schedPattern), QueueType.QUEUE, null, true); + + // Must call updateStream (never addStream — stream already exists) + verify(jsm, never()).addStream(any()); + verify(jsm, times(1)) + .updateStream(argThat(cfg -> cfg.getAllowMsgSchedules() + && cfg.getSubjects().contains(workSubject) + && cfg.getSubjects().contains(schedPattern))); + } + @Test void ensureStream_calledTwice_onlyCallsNatsOnce() throws IOException, JetStreamApiException { JetStreamApiException notFound = makeStreamNotFoundEx(); diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsAutoConfig.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsAutoConfig.java index 0f1fbb95..df4c6dc9 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsAutoConfig.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsAutoConfig.java @@ -21,6 +21,7 @@ import com.github.sonus21.rqueue.nats.RqueueNatsConfig; import com.github.sonus21.rqueue.nats.internal.NatsProvisioner; import 
com.github.sonus21.rqueue.nats.js.JetStreamMessageBroker; +import com.github.sonus21.rqueue.nats.js.NatsDeadLetterBridgeRegistrar; import com.github.sonus21.rqueue.nats.js.NatsStreamValidator; import com.github.sonus21.rqueue.nats.kv.NatsKvBucketValidator; import com.github.sonus21.rqueue.nats.metrics.NatsRqueueQueueMetricsProvider; @@ -162,6 +163,19 @@ public NatsStreamValidator natsStreamValidator( natsProvisioner, toBrokerConfig(props), rqueueConfigProvider.getIfAvailable()); } + /** + * Boot-time installer for the NATS-native dead-letter advisory bridge: for every registered + * queue, subscribes to {@code $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES..} + * and republishes any message that exhausts {@code maxDeliver} onto the queue's DLQ stream. + * Complementary to the rqueue-level DLQ path ({@code PostProcessingHandler.moveToDlq}). + */ + @Bean + @ConditionalOnMissingBean(NatsDeadLetterBridgeRegistrar.class) + public NatsDeadLetterBridgeRegistrar natsDeadLetterBridgeRegistrar( + MessageBroker broker, ObjectProvider rqueueConfigProvider) { + return new NatsDeadLetterBridgeRegistrar(broker, rqueueConfigProvider.getIfAvailable()); + } + /** * Bean form of the KV-bucket validator. Other NATS beans {@code @DependsOn} this name so it runs * before they are constructed. 
The flag is sourced from {@link RqueueNatsProperties} — diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/AbstractNatsBootIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/AbstractNatsBootIT.java index 8b8e5078..288dce93 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/AbstractNatsBootIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/AbstractNatsBootIT.java @@ -15,12 +15,21 @@ */ package com.github.sonus21.rqueue.spring.boot.integration; +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.Nats; +import java.io.IOException; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeAll; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.DockerClientFactory; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; /** @@ -30,16 +39,21 @@ * when the {@code NATS_RUNNING} environment variable is set the tests assume an externally * managed nats-server is reachable at {@code NATS_URL} (default {@code nats://127.0.0.1:4222}) * and skip Testcontainers entirely. CI sets {@code NATS_RUNNING=true} after starting nats-server - * via apt; local dev leaves it unset and falls back to Testcontainers, which itself skips - * gracefully when Docker isn't available. + * via apt; local dev leaves it unset and falls back to Testcontainers. + * + *

When neither {@code NATS_RUNNING} is set nor Docker is available the entire test class is + * skipped via {@link org.junit.jupiter.api.Assumptions#assumeTrue} inside {@link #startNats()}. + * This avoids the {@code @Testcontainers(disabledWithoutDocker=true)} pitfall where the annotation + * silently disables tests even when {@code NATS_RUNNING=true} and Docker happens to be absent. * *

Subclasses declare their own {@code @SpringBootApplication} test config (typically excluding * Redis auto-config, see {@link NatsBackendEndToEndIT} for the reference pattern) and any * {@code @RqueueListener} beans they need. */ -@Testcontainers(disabledWithoutDocker = true) abstract class AbstractNatsBootIT { + private static final Logger log = Logger.getLogger(AbstractNatsBootIT.class.getName()); + static final boolean USE_EXTERNAL_NATS = System.getenv("NATS_RUNNING") != null; static final String EXTERNAL_NATS_URL = @@ -52,6 +66,10 @@ static void startNats() { if (USE_EXTERNAL_NATS || NATS != null) { return; } + Assumptions.assumeTrue( + DockerClientFactory.instance().isDockerAvailable(), + "Skipping: Docker is not available and NATS_RUNNING is not set — " + + "start nats-server locally or set NATS_RUNNING=true"); NATS = new GenericContainer<>(DockerImageName.parse("nats:2.12-alpine")) .withCommand("-js") .withExposedPorts(4222) @@ -71,4 +89,64 @@ static void natsProps(DynamicPropertyRegistry r) { }); } } + + /** + * URL of whichever NATS the test is targeting — the externally-managed server when + * {@code NATS_RUNNING} is set, otherwise the Testcontainers-managed instance. Subclasses use + * this to open a short-lived connection in {@code @BeforeAll} cleanup hooks (see + * {@link #deleteStreamsWithPrefix}). + */ + static String activeNatsUrl() { + if (USE_EXTERNAL_NATS) { + return EXTERNAL_NATS_URL; + } + if (NATS == null) { + startNats(); + } + return "nats://" + NATS.getHost() + ":" + NATS.getMappedPort(4222); + } + + /** + * Wipe every JetStream stream whose name starts with {@code prefix}. Called from each subclass's + * {@code @BeforeAll} (parameterised on that subclass's per-class {@code stream-prefix}) so that + * stale streams left by an earlier test class on the same NATS server — or by an earlier rerun + * against a persistent JetStream directory — never leak into the new run. + * + *

Best-effort: a single failing delete logs a warning and the loop continues. The cleanup + * itself never fails the test. + */ + static void deleteStreamsWithPrefix(String prefix) { + if (prefix == null || prefix.isEmpty()) { + return; + } + try (Connection c = Nats.connect(activeNatsUrl())) { + JetStreamManagement jsm = c.jetStreamManagement(); + List names = jsm.getStreamNames(); + if (names == null) { + return; + } + int deleted = 0; + for (String name : names) { + if (!name.startsWith(prefix)) { + continue; + } + try { + jsm.deleteStream(name); + deleted++; + } catch (IOException | JetStreamApiException e) { + log.log( + Level.WARNING, "Failed to delete leftover stream " + name + ": " + e.getMessage()); + } + } + if (deleted > 0) { + log.log( + Level.INFO, + "Pre-test cleanup: deleted {0} stream(s) with prefix ''{1}''", + new Object[] {deleted, prefix}); + } + } catch (IOException | InterruptedException | JetStreamApiException e) { + log.log( + Level.WARNING, "Stream cleanup with prefix '" + prefix + "' failed: " + e.getMessage()); + } + } } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java index 55d53780..cadadf01 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -55,10 +56,22 @@ */ @SpringBootTest( classes = NatsBackendEndToEndIT.TestApp.class, - properties = 
{"rqueue.backend=nats"}) + properties = { + "rqueue.backend=nats", + "rqueue.nats.naming.stream-prefix=" + NatsBackendEndToEndIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsBackendEndToEndIT.SUBJECT_PREFIX + }) @Tag("nats") class NatsBackendEndToEndIT extends AbstractNatsBootIT { + static final String STREAM_PREFIX = "rqueue-js-backendE2E-"; + static final String SUBJECT_PREFIX = "rqueue.js.backendE2E."; + + @BeforeAll + static void wipeOwnedStreams() { + deleteStreamsWithPrefix(STREAM_PREFIX); + } + @Autowired RqueueMessageEnqueuer enqueuer; diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java index 3cdda5eb..6076017d 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java @@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -41,10 +42,22 @@ */ @SpringBootTest( classes = NatsConcurrencyE2EIT.TestApp.class, - properties = {"rqueue.backend=nats"}) + properties = { + "rqueue.backend=nats", + "rqueue.nats.naming.stream-prefix=" + NatsConcurrencyE2EIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsConcurrencyE2EIT.SUBJECT_PREFIX + }) @Tag("nats") class NatsConcurrencyE2EIT extends AbstractNatsBootIT { + static final String STREAM_PREFIX = "rqueue-js-concurrencyE2E-"; + static final String SUBJECT_PREFIX = "rqueue.js.concurrencyE2E."; + + @BeforeAll + static void wipeOwnedStreams() { + 
deleteStreamsWithPrefix(STREAM_PREFIX); + } + @Autowired RqueueMessageEnqueuer enqueuer; diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConsumerNameOverrideE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConsumerNameOverrideE2EIT.java index e47b6896..2b089f16 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConsumerNameOverrideE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConsumerNameOverrideE2EIT.java @@ -23,6 +23,7 @@ import io.nats.client.api.ConsumerInfo; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -42,10 +43,22 @@ */ @SpringBootTest( classes = NatsConsumerNameOverrideE2EIT.TestApp.class, - properties = {"rqueue.backend=nats"}) + properties = { + "rqueue.backend=nats", + "rqueue.nats.naming.stream-prefix=" + NatsConsumerNameOverrideE2EIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsConsumerNameOverrideE2EIT.SUBJECT_PREFIX + }) @Tag("nats") class NatsConsumerNameOverrideE2EIT extends AbstractNatsBootIT { + static final String STREAM_PREFIX = "rqueue-js-consumerOvrE2E-"; + static final String SUBJECT_PREFIX = "rqueue.js.consumerOvrE2E."; + + @BeforeAll + static void wipeOwnedStreams() { + deleteStreamsWithPrefix(STREAM_PREFIX); + } + @Autowired RqueueMessageEnqueuer enqueuer; @@ -60,7 +73,8 @@ void overriddenConsumerNameIsRegisteredOnTheStream() throws Exception { enqueuer.enqueue("custom-consumer", "hello"); assertThat(listener.latch.await(20, TimeUnit.SECONDS)).isTrue(); - ConsumerInfo info = jsm.getConsumerInfo("rqueue-js-custom-consumer", "my-custom-consumer"); + ConsumerInfo info = + 
jsm.getConsumerInfo(STREAM_PREFIX + "custom-consumer", "my-custom-consumer"); assertThat(info).isNotNull(); assertThat(info.getName()).isEqualTo("my-custom-consumer"); } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsMultipleListenersOnSameQueueE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsMultipleListenersOnSameQueueE2EIT.java index e7dddcbe..943a4536 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsMultipleListenersOnSameQueueE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsMultipleListenersOnSameQueueE2EIT.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -45,12 +46,24 @@ */ @SpringBootTest( classes = NatsMultipleListenersOnSameQueueE2EIT.TestApp.class, - properties = {"rqueue.backend=nats"}) + properties = { + "rqueue.backend=nats", + "rqueue.nats.naming.stream-prefix=" + NatsMultipleListenersOnSameQueueE2EIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsMultipleListenersOnSameQueueE2EIT.SUBJECT_PREFIX + }) @Tag("nats") @Disabled("Default JetStream retention=WorkQueue prevents true fan-out across multiple consumers; " + "enable once retention is configurable per queue or defaulted to Limits/Interest.") class NatsMultipleListenersOnSameQueueE2EIT extends AbstractNatsBootIT { + static final String STREAM_PREFIX = "rqueue-js-multiListenerE2E-"; + static final String SUBJECT_PREFIX = "rqueue.js.multiListenerE2E."; + + @BeforeAll + static void wipeOwnedStreams() { + deleteStreamsWithPrefix(STREAM_PREFIX); + } + @Autowired RqueueMessageEnqueuer enqueuer; diff --git 
a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java index 6640ac9d..a366a7f1 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -43,10 +44,22 @@ */ @SpringBootTest( classes = NatsPriorityQueuesE2EIT.TestApp.class, - properties = {"rqueue.backend=nats"}) + properties = { + "rqueue.backend=nats", + "rqueue.nats.naming.stream-prefix=" + NatsPriorityQueuesE2EIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsPriorityQueuesE2EIT.SUBJECT_PREFIX + }) @Tag("nats") class NatsPriorityQueuesE2EIT extends AbstractNatsBootIT { + static final String STREAM_PREFIX = "rqueue-js-priorityE2E-"; + static final String SUBJECT_PREFIX = "rqueue.js.priorityE2E."; + + @BeforeAll + static void wipeOwnedStreams() { + deleteStreamsWithPrefix(STREAM_PREFIX); + } + @Autowired RqueueMessageEnqueuer enqueuer; diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java index 596b5615..bb92cca0 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java +++ 
b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -43,10 +44,23 @@ */ @SpringBootTest( classes = NatsReactiveEnqueueE2EIT.TestApp.class, - properties = {"rqueue.backend=nats", "rqueue.reactive.enabled=true"}) + properties = { + "rqueue.backend=nats", + "rqueue.reactive.enabled=true", + "rqueue.nats.naming.stream-prefix=" + NatsReactiveEnqueueE2EIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsReactiveEnqueueE2EIT.SUBJECT_PREFIX + }) @Tag("nats") class NatsReactiveEnqueueE2EIT extends AbstractNatsBootIT { + static final String STREAM_PREFIX = "rqueue-js-reactiveEnqE2E-"; + static final String SUBJECT_PREFIX = "rqueue.js.reactiveEnqE2E."; + + @BeforeAll + static void wipeOwnedStreams() { + deleteStreamsWithPrefix(STREAM_PREFIX); + } + @Autowired ReactiveRqueueMessageEnqueuer reactiveEnqueuer; diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsRetryAndDlqE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsRetryAndDlqE2EIT.java index a7b86256..a0de8e23 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsRetryAndDlqE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsRetryAndDlqE2EIT.java @@ -19,12 +19,15 @@ import com.github.sonus21.rqueue.annotation.RqueueListener; import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import io.nats.client.Connection; import io.nats.client.JetStreamManagement; import io.nats.client.api.StreamInfo; +import 
java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.awaitility.Awaitility; -import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -32,62 +35,128 @@ import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; import org.springframework.stereotype.Component; /** - * After a handler exhausts {@code numRetries}, JetStream emits a max-deliveries advisory and the - * broker's {@code installDeadLetterBridge} dispatcher republishes the payload onto the DLQ - * stream. Currently disabled because {@link - * com.github.sonus21.rqueue.spring.boot.RqueueNatsAutoConfig} does not yet invoke - * {@code JetStreamMessageBroker.installDeadLetterBridge(...)} during container start, so dead- - * lettered messages never reach the DLQ stream. Enable this test once that wiring is added. + * Verifies the NATS-native dead-letter advisory bridge installed by + * {@link com.github.sonus21.rqueue.nats.js.NatsDeadLetterBridgeRegistrar}: when JetStream emits + * {@code $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES..} for a queue's durable + * consumer, the registrar's dispatcher looks up the offending message by sequence number and + * republishes it onto the queue's DLQ stream ({@code }). + * + *

Why a synthetic advisory. In normal rqueue flow the framework either acks + * (success / forced-discard / moveToDlq) or naks (retry) every delivery, so NATS sees a terminal + * action before {@code maxDeliver} elapses and the advisory never fires from a real handler — that + * path is covered by + * {@code NatsSchedulingAdvancedE2EIT#scheduledMessageExhaustsRetriesToDlq} via the rqueue-level + * DLQ ({@code PostProcessingHandler.moveToDlq}). The advisory bridge is a defensive net for cases + * outside rqueue's control (process crash mid-handler, or a handler that blocks past its + * visibility timeout AND past every retry while NATS keeps redelivering). Triggering that path + * end-to-end is racy and slow, so this test instead publishes a synthetic advisory matching the + * shape {@code nats-server 2.12} actually emits and asserts the dispatcher reacts: enqueues a + * payload, looks up its stream sequence, fakes the advisory, and waits for the DLQ stream to + * receive it. */ @SpringBootTest( classes = NatsRetryAndDlqE2EIT.TestApp.class, - properties = {"rqueue.backend=nats"}) + properties = { + "rqueue.backend=nats", + "rqueue.nats.naming.stream-prefix=" + NatsRetryAndDlqE2EIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsRetryAndDlqE2EIT.SUBJECT_PREFIX + }) @Tag("nats") -@Disabled("This test exercises the NATS-native advisory bridge path" - + " (JetStreamMessageBroker.installDeadLetterBridge /" - + " $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES), which is not yet wired by" - + " RqueueNatsAutoConfig. The rqueue-level DLQ path (PostProcessingHandler → broker.moveToDlq)" - + " is already tested in NatsSchedulingAdvancedE2EIT#scheduledMessageExhaustsRetriesToDlq and" - + " works without this bridge. 
Enable this test once RqueueNatsAutoConfig provisions the" - + " advisory dispatcher per queue during container start.") class NatsRetryAndDlqE2EIT extends AbstractNatsBootIT { + static final String STREAM_PREFIX = "rqueue-js-retryDlqE2E-"; + static final String SUBJECT_PREFIX = "rqueue.js.retryDlqE2E."; + + @BeforeAll + static void wipeOwnedStreams() { + deleteStreamsWithPrefix(STREAM_PREFIX); + } + @Autowired RqueueMessageEnqueuer enqueuer; @Autowired - FailingListener listener; + Connection natsConnection; @Autowired JetStreamManagement jsm; + @Autowired + BlockingListener listener; + @Test - void exhaustedMessageLandsOnDlqStream() { - enqueuer.enqueue("failing", "boom"); + void advisoryBridgeRepublishesIntoDlqStream() throws Exception { + String stream = STREAM_PREFIX + "boom"; + String dlqStream = stream + "-dlq"; + + // Publish via the normal enqueue path so the source stream + bridge get provisioned exactly + // the way they would in production. The blocking listener picks up the delivery but never + // returns, so the message stays in flight (un-acked) in the WorkQueue stream — which is the + // realistic state when JetStream actually fires the max-delivery advisory in production + // (handler hung, ack never sent). The long visibilityTimeout keeps NATS from redelivering + // during the test. + enqueuer.enqueue("boom", "marker-payload"); + + // Confirm the listener has the message in flight (proves the source stream still has it: in + // WorkQueue retention an unacked message stays in the stream until acked or AckWait expires). + assertThat(listener.received.await(20, TimeUnit.SECONDS)) + .as("Listener must receive the marker payload before we synthesize the advisory") + .isTrue(); + + // Bridge installer provisioned the DLQ stream from boot (installDeadLetterBridge → + // provisionDlq). 
+ Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(jsm.getStreamInfo(dlqStream)).isNotNull(); + }); + long sourceSeq = jsm.getStreamInfo(stream).getStreamState().getLastSequence(); - Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> listener.attempts.get() >= 2); + // Synthetic max-delivery advisory matching nats-server 2.12's payload shape: only stream_seq + // is required by the bridge's republish logic. The advisory subject must include + // .; consumer name is the rqueue default for this queue, which is + // -consumer (see QueueDetail#resolvedConsumerName). + String consumer = "boom-consumer"; + String advisorySubject = + "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES." + stream + "." + consumer; + String advisoryJson = "{\"type\":\"io.nats.jetstream.advisory.v1.max_deliveries\"," + + "\"stream\":\"" + stream + "\"," + + "\"consumer\":\"" + consumer + "\"," + + "\"stream_seq\":" + sourceSeq + "," + + "\"deliveries\":3}"; + natsConnection.publish(advisorySubject, advisoryJson.getBytes(StandardCharsets.UTF_8)); + natsConnection.flush(Duration.ofSeconds(2)); - Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { - StreamInfo dlq = jsm.getStreamInfo("rqueue-js-failing-dlq"); + // The bridge's dispatcher should have republished the source message onto the DLQ stream. + Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> { + StreamInfo dlq = jsm.getStreamInfo(dlqStream); assertThat(dlq.getStreamState().getMsgCount()).isGreaterThanOrEqualTo(1); }); } @SpringBootApplication( exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) + @Import(BlockingListener.class) static class TestApp {} + /** + * Receives the marker payload, signals the test, then blocks past the test's runtime so the + * message stays in flight (un-acked) in the WorkQueue source stream. 
That keeps it reachable via + * {@code jsm.getMessage(stream, seq)} — which is what the advisory bridge does on receipt — and + * mirrors the production scenario where the bridge fires (handler hung past + * {@code visibilityTimeout}, NATS still considers the message un-acked). The 2-minute + * visibility timeout prevents NATS from redelivering before the test completes. + */ @Component - static class FailingListener { - final AtomicInteger attempts = new AtomicInteger(); + static class BlockingListener { + final CountDownLatch received = new CountDownLatch(1); - @RqueueListener(value = "failing", numRetries = "2") - void onMessage(String payload) { - attempts.incrementAndGet(); - throw new RuntimeException("simulated failure for payload=" + payload); + @RqueueListener(value = "boom", visibilityTimeout = "120000") + void onMessage(String payload) throws Exception { + received.countDown(); + Thread.sleep(120_000L); // far exceeds the test's 30s budget } } } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsScheduledMessageE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsScheduledMessageE2EIT.java index dcf37646..6224b041 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsScheduledMessageE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsScheduledMessageE2EIT.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -45,14 +46,41 @@ */ @SpringBootTest( classes = NatsScheduledMessageE2EIT.TestApp.class, - properties = {"rqueue.backend=nats", "rqueue.reactive.enabled=true"}) + properties = { + "rqueue.backend=nats", + 
"rqueue.reactive.enabled=true", + // Per-class prefix isolates this test's NATS streams from every other NATS-backed test + // running against the same nats-server (CI shares one instance across all classes, and a + // persistent JetStream dir survives reruns). Same queue name → distinct stream so we never + // inherit stale config or in-flight messages from an earlier class/run. + "rqueue.nats.naming.stream-prefix=" + NatsScheduledMessageE2EIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsScheduledMessageE2EIT.SUBJECT_PREFIX + }) @Tag("nats") class NatsScheduledMessageE2EIT extends AbstractNatsBootIT { + static final String STREAM_PREFIX = "rqueue-js-schedE2E-"; + static final String SUBJECT_PREFIX = "rqueue.js.schedE2E."; + + @BeforeAll + static void wipeOwnedStreams() { + deleteStreamsWithPrefix(STREAM_PREFIX); + } + static final Duration DELAY = Duration.ofSeconds(3); static final Duration NOT_YET_GUARD = Duration.ofMillis(800); static final Duration TOTAL_WAIT = Duration.ofSeconds(12); + /** + * Longer delay for the "enqueue-first, then enqueueIn" regression test. The stream upgrade + * (updateStream round-trip + consumer re-creation) adds latency before the scheduler header is + * accepted, so we use 10 s to give the system enough headroom that the "not-yet" assertion + * cannot race with the scheduler firing. + */ + static final Duration ETD_DELAY = Duration.ofSeconds(10); + + static final Duration ETD_TOTAL_WAIT = Duration.ofSeconds(20); + @Autowired NatsProvisioner natsProvisioner; @@ -65,6 +93,9 @@ class NatsScheduledMessageE2EIT extends AbstractNatsBootIT { @Autowired ScheduledListener listener; + @Autowired + EtdListener etdListener; + @BeforeEach void requireSchedulingSupport() { assumeTrue( @@ -88,6 +119,52 @@ void scheduledMessageIsDeliveredAfterDelay() throws Exception { assertThat(listener.syncReceived).containsExactly("delayed-payload"); } + /** + * Regression test for the "enqueue-first, then enqueueIn" stream-upgrade path. + * + *

When a plain {@link RqueueMessageEnqueuer#enqueue enqueue()} call happens before the first + * {@link RqueueMessageEnqueuer#enqueueIn enqueueIn()} call the stream is provisioned without + * the {@code allow_msg_schedules} flag and without the sched-wildcard subject. The provisioner + * must detect this on the delayed call and upgrade the stream in-place (add both the flag and the + * sched-wildcard subject via a single {@code updateStream()}) before publishing the scheduled + * message — otherwise NATS rejects the publish with "no stream matches subject". + * + *

+   * <p>Expected behaviour:
+   *
+   * <ol>
+   *   <li>The immediate message is delivered right away (stream has it before any upgrade).</li>
+   *   <li>After the stream upgrade the scheduled message is held by JetStream until
+   *       {@code ETD_DELAY} has passed.</li>
+   *   <li>Both messages arrive within {@code ETD_TOTAL_WAIT}.</li>
+   * </ol>
+   */

{@code immediateLatch} counts down on the very first message received (used to assert the + * immediate message arrived before the delay fires). {@code allLatch} counts down twice — once + * per message — and reaching zero signals that both the immediate and delayed messages were + * consumed. + */ + @Component + static class EtdListener { + final CountDownLatch immediateLatch = new CountDownLatch(1); + final CountDownLatch allLatch = new CountDownLatch(2); + final List received = Collections.synchronizedList(new ArrayList<>()); + + @RqueueListener(value = "etd-e2e") + void onMessage(String payload) { + received.add(payload); + immediateLatch.countDown(); // first call unblocks; subsequent calls are no-ops + allLatch.countDown(); // each call decrements; reaches 0 when both arrive + } + } + @Component static class ScheduledListener { final CountDownLatch syncLatch = new CountDownLatch(1); diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsSchedulingAdvancedE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsSchedulingAdvancedE2EIT.java index 9647957f..1c38c0ab 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsSchedulingAdvancedE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsSchedulingAdvancedE2EIT.java @@ -27,11 +27,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -62,10 
+65,23 @@ */ @SpringBootTest( classes = NatsSchedulingAdvancedE2EIT.TestApp.class, - properties = {"rqueue.backend=nats", "rqueue.reactive.enabled=true"}) + properties = { + "rqueue.backend=nats", + "rqueue.reactive.enabled=true", + "rqueue.nats.naming.stream-prefix=" + NatsSchedulingAdvancedE2EIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsSchedulingAdvancedE2EIT.SUBJECT_PREFIX + }) @Tag("nats") class NatsSchedulingAdvancedE2EIT extends AbstractNatsBootIT { + static final String STREAM_PREFIX = "rqueue-js-schedAdv-"; + static final String SUBJECT_PREFIX = "rqueue.js.schedAdv."; + + @BeforeAll + static void wipeOwnedStreams() { + deleteStreamsWithPrefix(STREAM_PREFIX); + } + static final Duration DELAY = Duration.ofSeconds(3); static final Duration PERIOD = Duration.ofSeconds(5); static final Duration TOTAL_WAIT = Duration.ofSeconds(30); @@ -157,17 +173,45 @@ void concurrentRetryOnDelayedMessageCompletesExactlyOnce() throws Exception { // ---- 4. Concurrent retry on recurring message ------------------------------ + /** + * With {@code concurrency=2} two pollers race for each periodic delivery; the JetStream + * dedup-key (per-message {@code Nats-Msg-Id} = {@code id@processAt}, set by + * {@code JetStreamMessageBroker.buildSchedulingHeaders}) guarantees each period is delivered + * exactly once. This test pins that invariant by tracking the distinct {@code processAt} values + * the listener actually saw and asserting: + * + *

+ */ @Test void concurrentRetryOnRecurringMessageNoDuplicates() throws Exception { assumeScheduling(); enqueuer.enqueuePeriodic("adv-conc-recur-e2e", "conc-recur", PERIOD); - assertThat(concurrentRecurringListener.latch.await(TOTAL_WAIT.toSeconds(), TimeUnit.SECONDS)) - .as("Recurring message with concurrency=2 should complete at least 2 periods") + assertThat(concurrentRecurringListener.distinctPeriodsLatch.await( + TOTAL_WAIT.toSeconds(), TimeUnit.SECONDS)) + .as( + "Recurring message with concurrency=2 should fire >= 3 distinct periods within %s", + TOTAL_WAIT) .isTrue(); - assertThat(concurrentRecurringListener.count.get()) - .as("Each period must run at least twice (2 periods delivered)") - .isGreaterThanOrEqualTo(2); + // Brief quiesce so a racing duplicate (if any) has time to land before we assert. + Thread.sleep(POST_SUCCESS_QUIESCE.toMillis()); + + int count = concurrentRecurringListener.count.get(); + int distinct = concurrentRecurringListener.distinctProcessAts.size(); + assertThat(distinct) + .as("Should have observed at least 3 distinct periodic processAt values") + .isGreaterThanOrEqualTo(3); + assertThat(count) + .as( + "Each distinct period must be processed exactly once: count=%d, distinct=%d " + + "— a discrepancy means JetStream dedup (Nats-Msg-Id=id@processAt) regressed", + count, distinct) + .isEqualTo(distinct); } // ---- 6. Long-running job: keep-alive via Job.updateVisibilityTimeout ------ @@ -282,18 +326,30 @@ void onMessage(String payload) { } /** - * Listener for test 4: concurrency=2, periodic message; latch opens when 2 periods complete. - * The dedup-key fix (id@processAt) ensures period 2+ is not silently dropped by JetStream. + * Listener for test 4: concurrency=2, periodic message. Tracks every delivery's + * {@code processAt} (read from the {@link Job#getRqueueMessage()} attached to the message + * headers) so the test can assert each period is processed exactly once. + * + *

{@code count} = total handler invocations. {@code distinctProcessAts} = unique + * {@code processAt} values seen. With correct JetStream dedup keying the two are equal; if a + * duplicate slips through (e.g. a regression in the {@code id@processAt} {@code Nats-Msg-Id}) + * we observe {@code count > distinctProcessAts.size()}. */ @Component static class ConcurrentRetryRecurringListener { final AtomicInteger count = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(2); + /** Concurrent set: ConcurrentHashMap-backed so concurrent inserts don't drop entries. */ + final Set distinctProcessAts = ConcurrentHashMap.newKeySet(); + + final CountDownLatch distinctPeriodsLatch = new CountDownLatch(3); @RqueueListener(value = "adv-conc-recur-e2e", concurrency = "2") - void onMessage(String payload) { + void onMessage(String payload, @Header(RqueueMessageHeaders.JOB) Job job) { count.incrementAndGet(); - latch.countDown(); + long processAt = job.getRqueueMessage().getProcessAt(); + if (distinctProcessAts.add(processAt)) { + distinctPeriodsLatch.countDown(); + } } }