diff --git a/sdk/storage/azure-storage-common/CHANGELOG.md b/sdk/storage/azure-storage-common/CHANGELOG.md index 30d35fcc7b66..1a7c1dfbfa4b 100644 --- a/sdk/storage/azure-storage-common/CHANGELOG.md +++ b/sdk/storage/azure-storage-common/CHANGELOG.md @@ -7,6 +7,7 @@ ### Breaking Changes ### Bugs Fixed +- Fixed an async retry hang that could occur when draining a retryable response body after the response was closed. ### Other Changes diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java index 7c64c1c51479..997ffe854101 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java @@ -174,15 +174,18 @@ private Mono attemptAsync(HttpPipelineCallContext context, HttpPip int newPrimaryTry = getNewPrimaryTry(considerSecondary, primaryTry, tryingPrimary); Flux responseBody = response.getBody(); - response.close(); + final boolean consideredSecondaryForRetry = newConsiderSecondary; if (responseBody == null) { - return attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry, + response.close(); + return attemptAsync(context, next, originalRequest, consideredSecondaryForRetry, newPrimaryTry, attempt + 1, suppressed); } else { return responseBody.ignoreElements() - .then(attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry, - attempt + 1, suppressed)); + .doFinally(ignored -> response.close()) + .onErrorResume(drainError -> Mono.empty()) + .then(Mono.defer(() -> attemptAsync(context, next, originalRequest, consideredSecondaryForRetry, + newPrimaryTry, attempt + 1, suppressed))); } } diff --git a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java index 0238364c7c2f..8ff42bf3a55c 100644 --- a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java +++ b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/RequestRetryPolicyTest.java @@ -22,11 +22,15 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import reactor.core.Exceptions; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -38,6 +42,8 @@ public class RequestRetryPolicyTest { static RequestRetryOptions retryTestOptions = new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, 4, 2, 100L, 1000L, "SecondaryHost"); + private static final RequestRetryOptions FAST_RETRY_OPTIONS + = new RequestRetryOptions(RetryPolicyType.FIXED, 4, (Integer) null, 1L, 5L, null); @SyncAsyncTest public void requestRetryPolicySuccessMaxRetries() { @@ -146,6 +152,123 @@ public Mono send(HttpRequest request) { assertEquals(3, closeCalls.get()); } + @Test + public void retryDrainsBodyBeforeClosingAsync() { + AtomicInteger requestCount = new AtomicInteger(); + AtomicBoolean firstResponseClosed = new AtomicBoolean(); + + final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(FAST_RETRY_OPTIONS)) + .httpClient(new NoOpHttpClient() { + @Override + public Mono send(HttpRequest request) { + if (requestCount.incrementAndGet() == 1) { + return Mono.just(new MockHttpResponse(request, 500) { + @Override + public Flux getBody() { + return Flux.defer(() -> firstResponseClosed.get() + ? Flux.never() + : Flux.just(ByteBuffer.wrap(new byte[0]))); + } + + @Override + public void close() { + firstResponseClosed.set(true); + super.close(); + } + }); + } + + return Mono.just(new MockHttpResponse(request, 200)); + } + }) + .build(); + + StepVerifier.create(sendRequest(pipeline)) + .expectNextMatches(response -> response.getStatusCode() == 200) + .expectComplete() + .verify(Duration.ofSeconds(1)); + assertEquals(2, requestCount.get()); + Assertions.assertTrue(firstResponseClosed.get(), + "Retryable response should be closed after draining the body."); + } + + @Test + public void retryContinuesWhenDrainingBodyFailsAsync() { + AtomicInteger requestCount = new AtomicInteger(); + AtomicBoolean firstResponseClosed = new AtomicBoolean(); + + final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(FAST_RETRY_OPTIONS)) + .httpClient(new NoOpHttpClient() { + @Override + public Mono send(HttpRequest request) { + if (requestCount.incrementAndGet() == 1) { + return Mono.just(new MockHttpResponse(request, 500) { + @Override + public Flux getBody() { + return Flux.error(new IllegalStateException("Body was discarded after close.")); + } + + @Override + public void close() { + firstResponseClosed.set(true); + super.close(); + } + }); + } + + return Mono.just(new MockHttpResponse(request, 200)); + } + }) + .build(); + + StepVerifier.create(sendRequest(pipeline)) + .expectNextMatches(response -> response.getStatusCode() == 200) + .expectComplete() + .verify(Duration.ofSeconds(1)); + assertEquals(2, requestCount.get()); + Assertions.assertTrue(firstResponseClosed.get(), "Retryable response should be closed after a drain error."); + } + + @Test + public void retryAfter500StillRetriesWhenDrainingClosedBodyErrors() { + AtomicInteger requestCount = new AtomicInteger(); + AtomicBoolean firstResponseClosed = new AtomicBoolean(); + + final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(FAST_RETRY_OPTIONS)) + .httpClient(new NoOpHttpClient() { + @Override + public Mono send(HttpRequest request) { + if (requestCount.incrementAndGet() == 1) { + return Mono.just(new MockHttpResponse(request, 500) { + @Override + public Flux getBody() { + return Flux.defer(() -> firstResponseClosed.get() + ? Flux.error(new IllegalStateException("Body was discarded after close.")) + : Flux.just(ByteBuffer.wrap(new byte[0]))); + } + + @Override + public void close() { + firstResponseClosed.set(true); + super.close(); + } + }); + } + + return Mono.just(new MockHttpResponse(request, 200)); + } + }) + .build(); + + StepVerifier.create(sendRequest(pipeline)) + .expectNextMatches(response -> response.getStatusCode() == 200) + .expectComplete() + .verify(Duration.ofSeconds(1)); + assertEquals(2, requestCount.get()); + Assertions.assertTrue(firstResponseClosed.get(), + "Retryable response should be closed after draining the body."); + } + @SyncAsyncTest public void fixedDelayRetry() { final int maxRetries = 5;