From 61670dfecb5e1b9fb31a4a5f29ea1333c3ec9f63 Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Sun, 21 Jun 2026 22:05:22 +0530 Subject: [PATCH 1/7] Fix async storage retry response drain ordering --- .../common/policy/RequestRetryPolicy.java | 3 +- .../common/policy/RequestRetryPolicyTest.java | 41 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java index 7c64c1c51479..66921c912b5c 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java @@ -174,13 +174,14 @@ private Mono attemptAsync(HttpPipelineCallContext context, HttpPip int newPrimaryTry = getNewPrimaryTry(considerSecondary, primaryTry, tryingPrimary); Flux responseBody = response.getBody(); - response.close(); if (responseBody == null) { + response.close(); return attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry, attempt + 1, suppressed); } else { return responseBody.ignoreElements() + .doFinally(ignored -> response.close()) .then(attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry, attempt + 1, suppressed)); } diff --git a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java index 0238364c7c2f..9540fcac96a0 100644 --- a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java +++ b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java @@ -22,11 +22,15 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import reactor.core.Exceptions; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -146,6 +150,43 @@ public Mono send(HttpRequest request) { assertEquals(3, closeCalls.get()); } + @Test + public void retryDrainsBodyBeforeClosingAsync() { + AtomicInteger requestCount = new AtomicInteger(); + AtomicBoolean firstResponseClosed = new AtomicBoolean(); + + final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(retryTestOptions)) + .httpClient(new NoOpHttpClient() { + @Override + public Mono send(HttpRequest request) { + if (requestCount.incrementAndGet() == 1) { + return Mono.just(new MockHttpResponse(request, 500) { + @Override + public Flux getBody() { + return Flux.defer(() -> firstResponseClosed.get() + ? Flux.never() + : Flux.just(ByteBuffer.wrap(new byte[0]))); + } + + @Override + public void close() { + firstResponseClosed.set(true); + super.close(); + } + }); + } + + return Mono.just(new MockHttpResponse(request, 200)); + } + }) + .build(); + + StepVerifier.create(sendRequest(pipeline)) + .expectNextMatches(response -> response.getStatusCode() == 200) + .verifyComplete(); + assertEquals(2, requestCount.get()); + } + @SyncAsyncTest public void fixedDelayRetry() { final int maxRetries = 5; From e2d447ff25c72abf8d4086065d7ee7e71717a95c Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Sun, 21 Jun 2026 22:13:37 +0530 Subject: [PATCH 2/7] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../storage/common/policy/RequestRetryPolicy.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java index 66921c912b5c..93960e7b6d04 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java @@ -181,9 +181,13 @@ private Mono attemptAsync(HttpPipelineCallContext context, HttpPip attempt + 1, suppressed); } else { return responseBody.ignoreElements() - .doFinally(ignored -> response.close()) - .then(attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry, - attempt + 1, suppressed)); + .doOnError(e -> response.close()) + .doOnCancel(response::close) + .then(Mono.defer(() -> { + response.close(); + return attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry, + attempt + 1, suppressed); + })); } } From c3bdf17918ff79448a3b60f443ef42dd293d8e9b Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Sun, 21 Jun 2026 22:16:25 +0530 Subject: [PATCH 3/7] Fixed the test cases on basis of review suggestion from copilot --- .../azure/storage/common/policy/RequestRetryPolicyTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java index 9540fcac96a0..ba0928bdc181 100644 --- a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java +++ b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java @@ -183,7 +183,8 @@ public void close() { StepVerifier.create(sendRequest(pipeline)) .expectNextMatches(response -> response.getStatusCode() == 200) - .verifyComplete(); + .expectComplete() + .verify(Duration.ofSeconds(5)); assertEquals(2, requestCount.get()); } From 586309b460328184071909f63a8af61969597467 Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Sun, 21 Jun 2026 22:47:50 +0530 Subject: [PATCH 4/7] Handled lambda expression --- .../storage/common/policy/RequestRetryPolicy.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java index 93960e7b6d04..4906695198bb 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java @@ -174,18 +174,20 @@ private Mono attemptAsync(HttpPipelineCallContext context, HttpPip int newPrimaryTry = getNewPrimaryTry(considerSecondary, primaryTry, tryingPrimary); Flux responseBody = response.getBody(); + final boolean retryConsiderSecondary = newConsiderSecondary; + final HttpResponse retryResponse = response; if (responseBody == null) { - response.close(); - return attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry, + retryResponse.close(); + return attemptAsync(context, next, originalRequest, retryConsiderSecondary, newPrimaryTry, attempt + 1, suppressed); } else { return responseBody.ignoreElements() - .doOnError(e -> response.close()) - .doOnCancel(response::close) + .doOnError(e -> retryResponse.close()) + .doOnCancel(retryResponse::close) .then(Mono.defer(() -> { - response.close(); - return attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry, + retryResponse.close(); + return attemptAsync(context, next, originalRequest, retryConsiderSecondary, newPrimaryTry, attempt + 1, suppressed); })); } From 5f0657e128dd016cd195786e6f6331efe2a37721 Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Sun, 21 Jun 2026 23:02:11 +0530 Subject: [PATCH 5/7] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../azure/storage/common/policy/RequestRetryPolicyTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java index ba0928bdc181..5fa3228c5dd0 100644 --- a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java +++ b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java @@ -185,7 +185,8 @@ public void close() { .expectNextMatches(response -> response.getStatusCode() == 200) .expectComplete() .verify(Duration.ofSeconds(5)); - assertEquals(2, requestCount.get()); +assertEquals(2, requestCount.get()); +Assertions.assertTrue(firstResponseClosed.get(), "Retryable response should be closed after draining the body."); } @SyncAsyncTest From dc4372fd2fb4523bad2ae969f913193e21f4de2b Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Sun, 21 Jun 2026 23:16:45 +0530 Subject: [PATCH 6/7] Addressed both Copilot comments. --- .../storage/common/policy/RequestRetryPolicy.java | 10 +++------- .../storage/common/policy/RequestRetryPolicyTest.java | 5 +++-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java index 4906695198bb..ed2528e18ae5 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java @@ -183,13 +183,9 @@ private Mono attemptAsync(HttpPipelineCallContext context, HttpPip attempt + 1, suppressed); } else { return responseBody.ignoreElements() - .doOnError(e -> retryResponse.close()) - .doOnCancel(retryResponse::close) - .then(Mono.defer(() -> { - retryResponse.close(); - return attemptAsync(context, next, originalRequest, retryConsiderSecondary, newPrimaryTry, - attempt + 1, suppressed); - })); + .doFinally(ignored -> retryResponse.close()) + .then(Mono.defer(() -> attemptAsync(context, next, originalRequest, retryConsiderSecondary, + newPrimaryTry, attempt + 1, suppressed))); } } diff --git a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java index 5fa3228c5dd0..e9eda52ca185 100644 --- a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java +++ b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java @@ -185,8 +185,9 @@ public void close() { .expectNextMatches(response -> response.getStatusCode() == 200) .expectComplete() .verify(Duration.ofSeconds(5)); -assertEquals(2, requestCount.get()); -Assertions.assertTrue(firstResponseClosed.get(), "Retryable response should be closed after draining the body."); + assertEquals(2, requestCount.get()); + Assertions.assertTrue(firstResponseClosed.get(), + "Retryable response should be closed after draining the body."); } @SyncAsyncTest From 1d5b69ce1a96776c50518fb0f983a5b46407845c Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Mon, 22 Jun 2026 00:34:42 +0530 Subject: [PATCH 7/7] Suggestions implemented based on latest review by @ibrandes --- sdk/storage/azure-storage-common/CHANGELOG.md | 1 + .../common/policy/RequestRetryPolicy.java | 12 +-- .../common/policy/RequestRetryPolicyTest.java | 83 ++++++++++++++++++- 3 files changed, 88 insertions(+), 8 deletions(-) diff --git a/sdk/storage/azure-storage-common/CHANGELOG.md b/sdk/storage/azure-storage-common/CHANGELOG.md index 30d35fcc7b66..1a7c1dfbfa4b 100644 --- a/sdk/storage/azure-storage-common/CHANGELOG.md +++ b/sdk/storage/azure-storage-common/CHANGELOG.md @@ -7,6 +7,7 @@ ### Breaking Changes ### Bugs Fixed +- Fixed an async retry hang that could occur when draining a retryable response body after the response was closed. ### Other Changes diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java index ed2528e18ae5..997ffe854101 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java @@ -174,17 +174,17 @@ private Mono attemptAsync(HttpPipelineCallContext context, HttpPip int newPrimaryTry = getNewPrimaryTry(considerSecondary, primaryTry, tryingPrimary); Flux responseBody = response.getBody(); - final boolean retryConsiderSecondary = newConsiderSecondary; - final HttpResponse retryResponse = response; + final boolean consideredSecondaryForRetry = newConsiderSecondary; if (responseBody == null) { - retryResponse.close(); - return attemptAsync(context, next, originalRequest, retryConsiderSecondary, newPrimaryTry, + response.close(); + return attemptAsync(context, next, originalRequest, consideredSecondaryForRetry, newPrimaryTry, attempt + 1, suppressed); } else { return responseBody.ignoreElements() - .doFinally(ignored -> retryResponse.close()) - .then(Mono.defer(() -> attemptAsync(context, next, originalRequest, retryConsiderSecondary, + .doFinally(ignored -> response.close()) + .onErrorResume(drainError -> Mono.empty()) + .then(Mono.defer(() -> attemptAsync(context, next, originalRequest, consideredSecondaryForRetry, newPrimaryTry, attempt + 1, suppressed))); } diff --git a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java index e9eda52ca185..8ff42bf3a55c 100644 --- a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java +++ b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java @@ -42,6 +42,8 @@ public class RequestRetryPolicyTest { static RequestRetryOptions retryTestOptions = new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, 4, 2, 100L, 1000L, "SecondaryHost"); + private static final RequestRetryOptions FAST_RETRY_OPTIONS + = new RequestRetryOptions(RetryPolicyType.FIXED, 4, (Integer) null, 1L, 5L, null); @SyncAsyncTest public void requestRetryPolicySuccessMaxRetries() { @@ -155,7 +157,7 @@ public void retryDrainsBodyBeforeClosingAsync() { AtomicInteger requestCount = new AtomicInteger(); AtomicBoolean firstResponseClosed = new AtomicBoolean(); - final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(retryTestOptions)) + final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(FAST_RETRY_OPTIONS)) .httpClient(new NoOpHttpClient() { @Override public Mono send(HttpRequest request) { @@ -184,7 +186,84 @@ public void close() { StepVerifier.create(sendRequest(pipeline)) .expectNextMatches(response -> response.getStatusCode() == 200) .expectComplete() - .verify(Duration.ofSeconds(5)); + .verify(Duration.ofSeconds(1)); + assertEquals(2, requestCount.get()); + Assertions.assertTrue(firstResponseClosed.get(), + "Retryable response should be closed after draining the body."); + } + + @Test + public void retryContinuesWhenDrainingBodyFailsAsync() { + AtomicInteger requestCount = new AtomicInteger(); + AtomicBoolean firstResponseClosed = new AtomicBoolean(); + + final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(FAST_RETRY_OPTIONS)) + .httpClient(new NoOpHttpClient() { + @Override + public Mono send(HttpRequest request) { + if (requestCount.incrementAndGet() == 1) { + return Mono.just(new MockHttpResponse(request, 500) { + @Override + public Flux getBody() { + return Flux.error(new IllegalStateException("Body was discarded after close.")); + } + + @Override + public void close() { + firstResponseClosed.set(true); + super.close(); + } + }); + } + + return Mono.just(new MockHttpResponse(request, 200)); + } + }) + .build(); + + StepVerifier.create(sendRequest(pipeline)) + .expectNextMatches(response -> response.getStatusCode() == 200) + .expectComplete() + .verify(Duration.ofSeconds(1)); + assertEquals(2, requestCount.get()); + Assertions.assertTrue(firstResponseClosed.get(), "Retryable response should be closed after a drain error."); + } + + @Test + public void retryAfter500StillRetriesWhenDrainingClosedBodyErrors() { + AtomicInteger requestCount = new AtomicInteger(); + AtomicBoolean firstResponseClosed = new AtomicBoolean(); + + final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(FAST_RETRY_OPTIONS)) + .httpClient(new NoOpHttpClient() { + @Override + public Mono send(HttpRequest request) { + if (requestCount.incrementAndGet() == 1) { + return Mono.just(new MockHttpResponse(request, 500) { + @Override + public Flux getBody() { + return Flux.defer(() -> firstResponseClosed.get() + ? Flux.error(new IllegalStateException("Body was discarded after close.")) + : Flux.just(ByteBuffer.wrap(new byte[0]))); + } + + @Override + public void close() { + firstResponseClosed.set(true); + super.close(); + } + }); + } + + return Mono.just(new MockHttpResponse(request, 200)); + } + }) + .build(); + + StepVerifier.create(sendRequest(pipeline)) + .expectNextMatches(response -> response.getStatusCode() == 200) + .expectComplete() + .verify(Duration.ofSeconds(1)); assertEquals(2, requestCount.get()); Assertions.assertTrue(firstResponseClosed.get(), "Retryable response should be closed after draining the body.");