Add graceful shutdown drain to ServiceBusProcessorClient#48192
Conversation
When ServiceBusProcessorClient.close() is called while message handlers are still executing, the receiver was disposed immediately, causing in-flight handlers to fail with IllegalStateException. Add drain-before-dispose logic using an AtomicInteger handler counter and Object monitor wait/notify to all processor shutdown paths: - MessagePump (V2 non-session) - ServiceBusProcessor.RollingMessagePump (V2 non-session lifecycle) - SessionsMessagePump.RollingSessionReceiver (V2 session) - ServiceBusProcessorClient V1 close path The drain executes before subscription cancellation/disposal, with a configurable timeout (default 30s) to prevent indefinite blocking. Includes 3 regression tests in ServiceBusProcessorGracefulShutdownTest.
There was a problem hiding this comment.
Pull request overview
This PR adds “drain before dispose” behavior to Service Bus processor shutdown paths to avoid failing in-flight message handlers (and their settlement calls) with IllegalStateException when the underlying receiver is disposed during close().
Changes:
- Track in-flight handler execution and block shutdown briefly to allow handlers to complete (with a 30s timeout).
- Apply draining to V2 non-session (
MessagePump/RollingMessagePump), V2 session (RollingSessionReceiver.terminate()), and V1 (ServiceBusProcessorClient.close()). - Add regression tests covering V2 non-session and V1 shutdown draining plus a drain-timeout test.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java | Adds handler counting + drainHandlers(Duration) used to block shutdown until in-flight handlers finish or timeout. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java | Wires draining into RollingMessagePump.dispose() before disposing the subscription. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java | Adds per-session handler counting + drain during termination before disposing the worker scheduler. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java | Adds V1 handler counting + drainV1Handlers(Duration) invoked during close() before subscription cancellation. |
| sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java | Adds new unit tests validating drain behavior for V2 non-session, V1, and drain timeout. |
Comments suppressed due to low confidence (1)
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:458
SessionsMessagePump.RollingSessionReceivernow includes new drain-before-dispose behavior, but the added logic isn’t covered by the new regression tests. There are already isolated unit tests forSessionsMessagePumpbehavior; it should be possible to extend them to verify that termination waits for an in-flight handler (or respects the timeout) for the session path as well.
Please add a unit test that exercises session termination while a handler is blocked, to prevent regressions in this new shutdown behavior.
// Drain in-flight message handlers BEFORE disposing the worker scheduler.
// Disposing the scheduler interrupts handler threads (via ScheduledExecutorService.shutdownNow()).
// Draining first ensures handlers can complete message settlement before threads are interrupted.
// See https://github.com/Azure/azure-sdk-for-java/issues/45716
drainHandlers(DRAIN_TIMEOUT);
workerScheduler.dispose();
- Add ThreadLocal<Boolean> flag to detect when drainHandlers is called from within a message handler (e.g., user calls close() inside processMessage callback) - Guard all three drain paths: MessagePump.drainHandlers(), ServiceBusProcessorClient.drainV1Handlers(), and SessionsMessagePump.RollingSessionReceiver.drainHandlers() - When re-entrant call detected, skip drain with warning log and return immediately to avoid self-deadlock - Add v2DrainFromWithinHandlerShouldNotDeadlock test verifying the guard prevents deadlock when drain is called from handler thread
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (3)
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:161
- This test uses a fixed
Thread.sleep(200)to “give dispose a moment to start”. Fixed sleeps are prone to flakiness on slow/loaded CI agents (either too short or unnecessarily long). Prefer a synchronization point that directly observes the expected state (e.g.,assertFalse(disposeDone.await(...)), a latch signaled right before/after entering drain, or Mockito’s timed verification APIs) so the test doesn’t depend on timing heuristics.
// Give dispose a moment to start; it should be blocked in drainHandlers().
Thread.sleep(200);
// Verify: client has NOT been closed yet (handler is still running, drain is blocking dispose).
verify(client, never()).close();
assertFalse(handlerCompleted.get(), "Handler should still be in-flight");
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:256
- This test uses a fixed
Thread.sleep(200)to “give close a moment to start”. As written, it can be flaky under variable scheduling/CPU contention. Use an explicit synchronization condition (e.g.,assertFalse(closeDone.await(...))or a latch that confirms the close thread is blocked in the drain) instead of a fixed sleep.
// Give close a moment to start; it should be blocked in drainV1Handlers().
Thread.sleep(200);
// Verify: client has NOT been closed yet (handler is still running, drain is blocking close).
verify(asyncClient, never()).close();
assertFalse(handlerCompleted.get(), "Handler should still be in-flight");
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:54
- The class-level "Coverage Matrix" JavaDoc is now out of sync with the actual tests in this class: it lists only the V2/V1/timeout scenarios, but the class also includes a re-entrant drain regression test (
v2DrainFromWithinHandlerShouldNotDeadlock). Please update the matrix to reflect all covered scenarios so readers don’t miss this important case.
* <h3>Coverage Matrix</h3>
* <ul>
* <li><b>V2 Non-Session</b> — {@link #v2CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
* Tests drain in {@code RollingMessagePump.dispose()} → {@code MessagePump.drainHandlers()}</li>
* <li><b>V1 Non-Session</b> — {@link #v1CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
* Tests drain in {@code ServiceBusProcessorClient.close()} → {@code drainV1Handlers()}</li>
* <li><b>Drain Timeout</b> — {@link #v2DrainShouldRespectTimeout()}:
* Tests {@code MessagePump.drainHandlers()} timeout behavior directly</li>
After drainHandlers() returns but before the Flux subscription is disposed, flatMap can dispatch a new handler that attempts settlement on a closing client. Add a volatile boolean closing flag to MessagePump and SessionsMessagePump.RollingSessionReceiver, set at the start of drainHandlers() and checked at the top of handleMessage(). Handlers that see the flag skip processing and return immediately. V1 path is unaffected (isRunning already gates subscription.request). New test: v2ClosingFlagPreventsNewHandlersAfterDrainStarts.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (3)
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java:176
isHandlerThreadis an instanceThreadLocal. Setting it back toFALSEleaves an entry in the thread’sThreadLocalMapfor eachMessagePumpinstance that ever ran on that pooled thread. Since pumps can be recreated (rolling/retry) and worker threads are long-lived, this can accumulate stale entries and increase memory usage over time. Prefer callingisHandlerThread.remove()in thefinallyblock instead ofset(Boolean.FALSE)to ensure the entry is cleared.
} finally {
isHandlerThread.set(Boolean.FALSE);
if (activeHandlerCount.decrementAndGet() == 0) {
synchronized (drainLock) {
drainLock.notifyAll();
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java:503
isV1HandlerThreadis an instanceThreadLocal. Resetting it withset(Boolean.FALSE)keeps a per-processor entry in the thread’sThreadLocalMap, which can accumulate on pooled threads over the lifetime of the application. PreferisV1HandlerThread.remove()in thefinallyblock to fully clear the entry after each callback.
} finally {
isV1HandlerThread.set(Boolean.FALSE);
if (activeV1HandlerCount.decrementAndGet() == 0) {
synchronized (v1DrainLock) {
v1DrainLock.notifyAll();
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:571
isHandlerThreadis an instanceThreadLocal. Setting it toFALSEleaves an entry behind in the thread’sThreadLocalMap, and since session pumps/receivers can be recreated while using long-lived pooled threads, this can accumulate stale entries over time. PreferisHandlerThread.remove()in thefinallyblock to clear theThreadLocalafter each handler execution.
} finally {
isHandlerThread.set(Boolean.FALSE);
if (activeHandlerCount.decrementAndGet() == 0) {
synchronized (drainLock) {
drainLock.notifyAll();
}
On pooled threads (Reactor boundedElastic), set(FALSE) leaves a stale entry in the ThreadLocalMap after the pump is GC'd. remove() clears the entry immediately, following Java best practice for ThreadLocal cleanup on long-lived worker threads.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (4)
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:157
- These
Thread.sleep(...)calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito’s time-based verification (verify(mock, after(ms).never())...,timeout(ms)with zero invocations) so the test waits deterministically without assuming scheduler timing.
Thread.sleep(200);
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:251
- These
Thread.sleep(...)calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito’s time-based verification (verify(mock, after(ms).never())...,timeout(ms)with zero invocations) so the test waits deterministically without assuming scheduler timing.
Thread.sleep(200);
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:481
- These
Thread.sleep(...)calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito’s time-based verification (verify(mock, after(ms).never())...,timeout(ms)with zero invocations) so the test waits deterministically without assuming scheduler timing.
Thread.sleep(500);
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:483
- The drain implementation (counter + lock + re-entrancy guard + timeout loop) is duplicated across
MessagePump,SessionsMessagePump, and V1 (ServiceBusProcessorClient). To reduce divergence risk and make future fixes (e.g., re-entrancy behavior) consistent, consider extracting a small shared helper (package-private) that encapsulates the counter/lock/wait-notify pattern and exposes a clear “drain result”.
private void drainHandlers(Duration timeout) {
closing = true;
if (isHandlerThread.get()) {
// Re-entrant call from within a session message handler (e.g., user called close() inside processMessage).
// Waiting here would self-deadlock because this thread's handler incremented the counter and
// cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will
// complete naturally after this handler returns.
logger.atWarning()
.log("drainHandlers called from within a session message handler (re-entrant). "
+ "Skipping drain to avoid self-deadlock.");
return;
}
When a handler calls close() re-entrantly, the drain now waits for OTHER concurrent handlers to complete (threshold=1) before cancelling subscriptions and closing the client. Previously the drain returned immediately, which could interrupt concurrent handlers mid-settlement. Applied consistently across V1 (ServiceBusProcessorClient), V2 (MessagePump), and sessions (SessionsMessagePump). Notification threshold updated from == 0 to <= 1 so the re-entrant waiter gets notified.
…builder structural test The previous Spring revert (481f1bf) removed the drainTimeout property additions on the grounds that the wiring was missing and the Spring servicebus dependency was still pinned to 7.17.17. The From-Source CI run picked that up and failed in spring-cloud-azure-service: ServiceBusSessionProcessorClientBuilderFactoryTests.supportSdkBuilderAllProperties:151 expected: <true> but was: <false> Builder class owned property names: [..., drainTimeout, ...] Unsupported property names: [draintimeout] This is a structural integrity test that walks the SDK builder via reflection (against the from-source build, so it sees the new drainTimeout(Duration) method) and asserts every builder property has a corresponding Spring property. Without the Spring property, the test fails. Re-add the property scaffolding only (no factory wiring, no CHANGELOG entry): - ServiceBusProcessorClientProperties: getDrainTimeout() with javadoc explaining the deferred wiring. - AzureServiceBusProperties.Processor: drainTimeout field + getter/setter. - ServiceBusProcessorClientTestProperties: drainTimeout field + getter/setter. - ProcessorProperties (spring-messaging-azure-servicebus): drainTimeout field + getter/setter. The factory wiring (ServiceBus(Session)?ProcessorClientBuilderFactory) and the actual builder.drainTimeout(...) call cannot land yet because the Spring modules still pin azure-messaging-servicebus 7.17.17 (no drainTimeout(Duration) method); adding the wiring would break compile in non-from-source CI. The wiring will land in a follow-up PR once the Spring servicebus dep is bumped. Addresses CI failure on PR Azure#48192.
…ained throughput Both MessagePump.handleMessage and SessionsMessagePump.RollingSessionReceiver.handleMessage previously incremented activeHandlerCount BEFORE checking the closing flag. Under sustained throughput with concurrency > 1, the upstream subscription remains live while drainHandlers() is waiting (the subscription is only disposed after drain returns), so flatMap keeps dispatching messages. Each skip-path invocation incremented and then decremented the counter, but the cumulative effect could keep activeHandlerCount > 0 long enough to push drain to its full timeout even when no real user handlers were left in flight. Fix: read the volatile closing flag at the top of handleMessage (before the increment). Messages that arrive after closing=true is set are now dropped without touching the drain's exit condition. TOCTOU note: closing can flip between the early check and the increment, so the existing post-increment check is retained. Race-losers still skip work and the increment is balanced by the decrement in finally (which notifyAll's the drain when the count drops to 0). Addresses Copilot feedback on PR Azure#48192.
…drain in V1, clear currentPump after dispose Four fixes addressing Copilot review feedback on PR Azure#48192: 1. V1 errors during shutdown - V1 onNext was checking v1Closing before delivering serviceBusMessageContext.hasError() to the user's processError callback. Errors don't touch the receiver and the application has the right to know what failed before close completes. Now: errors always flow through (no shutdown skip); only message dispatches take the drain-aware fast path. 2. V1 fast-path consistency with V2 - V1 onNext now mirrors MessagePump.handleMessage by checking v1Closing before activeV1HandlerCount.incrementAndGet so message-skip-path invocations don't extend the drain under sustained throughput. The check-then-act race-loser case is handled by retaining the post-increment v1Closing check. 3. ServiceBusProcessor.RollingMessagePump.dispose() now clears currentPump after disposable.dispose() so we don't retain the pump's underlying client past its useful lifetime. A subsequent start() cycle assigns a fresh pump in beginIntern() before any drain on this RollingMessagePump can run again. 4. cspell: replaced 'TOCTOU' acronym in two comments with 'check-then-act race' since cspell flagged the acronym as an unknown word and broke the Build Analyze CI check. All 13 ServiceBusProcessorGracefulShutdownTest tests pass; full module: 959 tests, 0 failures.
The previous Javadoc included PR-specific narrative (intentionally omitted in this PR, dep pin rationale, internal factory mapping snippet) that would have become stale once the Spring servicebus dep is bumped and the wiring is added. Replaced with a timeless property-semantics description that mirrors the SDK builder setter and notes the null fallback. Addresses Copilot feedback on PR Azure#48192.
…r constructors Both constructors now Objects.requireNonNull(drainTimeout) at construction time so a null timeout cannot surface as an NPE later in RollingMessagePump.dispose() / drainHandlers(). Fail fast with a clear message instead of crashing during shutdown. ServiceBusProcessorClientOptions.setDrainTimeout already validates non-null; this adds a second-line defense for any code path that constructs a ServiceBusProcessor directly bypassing the options object. Addresses Copilot feedback on PR Azure#48192.
…ND_DELETE message loss All three drain paths (V1 onNext in ServiceBusProcessorClient, V2 MessagePump.handleMessage, V2 SessionsMessagePump.RollingSessionReceiver.handleMessage) used to skip handler dispatch for any message that arrived after closing=true (or v1Closing=true) was set. This is correct only for PEEK_LOCK - the broker still owns the lock and will redeliver on the next session. In RECEIVE_AND_DELETE the broker has already removed the message before delivery, so skipping the handler loses it permanently. Fix: gate the skip on the receive mode. Each pump caches a skipDuringDrain boolean at construction (true only for PEEK_LOCK) and the handleMessage / onNext fast path checks both closing && skipDuringDrain. RECEIVE_AND_DELETE callers always reach processMessage even during the drain window. Plumbing: - MessagePump derives skipDuringDrain from client.getReceiverOptions().getReceiveMode() at construction. Defensive against null options (test mocks that didn't stub them) - defaults to skipDuringDrain=false (no skip, the safer default since data loss is worse than slightly extended drain). - ServiceBusProcessorClient.receiveMessages() captures the same boolean once per cycle and the onNext lambda closes over it. Same null-safe default. - SessionsMessagePump caches skipDuringDrain on construction (derived from the receiveMode parameter it already accepts) and forwards it to RollingSessionReceiver via a new constructor parameter. Adds 2 RECEIVE_AND_DELETE regression tests: - v2ReceiveAndDeleteModeDoesNotSkipDuringDrain: verifies the V2 pump still dispatches message2 to the consumer when emitted after closing=true. - v1ReceiveAndDeleteModeDoesNotSkipDuringDrain: same guarantee for the V1 onNext path (concurrency=2 to use the parallel/runOn path that doesn't depend on isRunning to issue further requests). Adds peekLockOptions() test helper and updates existing 13 mocks to stub getReceiverOptions() with PEEK_LOCK so the existing tests continue exercising the skip-path. Uses a real ReceiverOptions instance (via createNonSessionOptions) instead of a Mockito mock to avoid the UnfinishedStubbing trap when the helper is invoked inside another when().thenReturn() clause. All 15 ServiceBusProcessorGracefulShutdownTest tests pass; full module: 961 tests, 0 failures.
…lose() race handleError() derived fullyQualifiedNamespace and entityPath from asyncClient.get() at the moment of the user error. In a re-entrant close() scenario (the user calls close() from inside processMessage), close()'s cleanup phase nulls asyncClient before the calling handler returns. If the calling handler then throws a Throwable that flows through onNext's catch -> handleError(), the asyncClient.get() returns null and handleError NPEs - the NPE is swallowed by the outer try/catch and processError is never invoked, silently dropping the user's error. Fix: cache the namespace and entity path in volatile fields, refreshed on every receive cycle in receiveMessages(). handleError reads from the cache, falling back to the live client only when the cache hasn't been populated yet (first-ever error before any receive cycle), so processError always gets a non-null context. Addresses Copilot feedback on PR Azure#48192.
Fixes #45716
When
ServiceBusProcessorClient.close()is called while message handlers are still executing, the receiver is disposed immediately, causing in-flight handlers to fail withIllegalStateException: Cannot perform operation on a disposed receiver.What this PR does
Adds drain-before-dispose logic to all processor shutdown paths. An
AtomicIntegerhandler counter withObjectmonitor wait/notify blocksclose()until all in-flight message handlers complete (or a 30-second timeout expires) before subscription cancellation/disposal:MessagePump(V2 non-session) —drainHandlers()addedServiceBusProcessor.RollingMessagePump(V2 lifecycle) — callspump.drainHandlers()beforedisposable.dispose()SessionsMessagePump.RollingSessionReceiver(V2 session) — per-session drain interminate()beforeworkerScheduler.dispose()ServiceBusProcessorClient(V1) —drainV1Handlers()beforereceiverSubscriptions.cancel()This mirrors the .NET SDK's
StopProcessingAsyncbehavior which awaitsTask.WhenAllon in-flight handlers before disposing.V1 lifecycle hardening (added during review)
ServiceBusProcessorClient.close()originally held the instance monitor across the entire drain wait, which let any in-flight handler calling a synchronized accessor on the same client (isRunning(),getIdentifier()) stall shutdown for the full drain timeout. Releasing the monitor across the drain opened additional races withstart()/stop()/restartMessageReceiver()and concurrentclose()calls. The PR now:v1CloseInProgressandv2CloseInProgressAtomicBooleans (both claimed viacompareAndSet) sostart(),stop(),restartMessageReceiver()return early during shutdown and only one concurrentclose()performs cleanup on each path.getIdentifier()returns a stable value during/afterclose()without lazy-creating a fresh receiver.getIdentifier()may now returnnullon the V1 path whenclose()is in progress on a brand-new processor that never started (Javadoc updated).asyncClientin the connection-monitor lambda to prevent NPE when a tick races pastmonitorDisposable.dispose()and observesasyncClient == null.Spring property scaffolding
This PR adds a
getDrainTimeout()accessor to four Spring property POJOs:ServiceBusProcessorClientProperties(interface)AzureServiceBusProperties.ProcessorServiceBusProcessorClientTestPropertiesProcessorProperties(spring-messaging-azure-servicebus)The Spring property additions are scaffolding only: the
ServiceBusSessionProcessorClientBuilderFactorydoes not yet wireproperties.getDrainTimeout()tobuilder::drainTimeout, andsdk/spring/CHANGELOG.mdhas no entry. Reason: the Spring modules pincom.azure:azure-messaging-servicebus:7.17.17, which doesn't exposebuilder::drainTimeout, so the wiring would not compile against the released dependency. The factory wiring + CHANGELOG entry will land in a follow-up PR after the Spring servicebus dependency is bumped to the release that shipsdrainTimeout(Duration).The scaffolding itself is required in this PR because
spring-cloud-azure-service'sAzureGenericServiceClientBuilderFactoryBaseTests.supportSdkBuilderAllPropertieswalks the SDK builder via reflection (against the from-source build) and asserts every public builder setter has a corresponding Spring property. WithoutgetDrainTimeout()on the property surface, that test fails CI onFrom-Sourceruns.Tests
13 tests in
ServiceBusProcessorGracefulShutdownTest:v2CloseShouldWaitForInFlightHandlerBeforeClosingClient,v1CloseShouldWaitForInFlightHandlerBeforeClosingClientv2DrainShouldRespectTimeout,v2DrainFromWithinHandlerShouldNotDeadlock,v1ReentrantCloseWaitsForOtherConcurrentHandlersv2ClosingFlagPreventsNewHandlersAfterDrainStarts,v1ClosingFlagPreventsNewHandlersAfterDrainStarts,v1StartAfterCloseResetsClosingFlagv1CloseShouldNotHoldClientMonitorDuringDrain,v1ConcurrentStartDuringCloseDrainIsIgnored,v1GetIdentifierDuringAndAfterCloseDoesNotCreateNewReceiver,v1ConcurrentCloseCallsDoNotRacev2ConcurrentStartDuringCloseDrainIsIgnoredPlus 2 builder validation tests in
ServiceBusClientBuilderUnitTestcoveringdrainTimeout(Duration)on both processor builders (null / zero / negative throw, positive accepted, propagates throughbuildProcessorClient).All lifecycle coordination in tests uses deterministic
waitFor(...)polling (predicate-based) or controllableSinks.Manyemission instead of fixedThread.sleepwaits so the suite is robust on slow/contended CI.Full module test suite: 959 tests pass, 0 failures, 0 errors.