Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-common/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Breaking Changes

### Bugs Fixed
- Fixed an async retry hang that could occur when draining a retryable response body after the response was closed.

### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,18 @@ private Mono<HttpResponse> attemptAsync(HttpPipelineCallContext context, HttpPip
int newPrimaryTry = getNewPrimaryTry(considerSecondary, primaryTry, tryingPrimary);

Flux<ByteBuffer> responseBody = response.getBody();
response.close();
final boolean consideredSecondaryForRetry = newConsiderSecondary;

if (responseBody == null) {
return attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry,
response.close();
return attemptAsync(context, next, originalRequest, consideredSecondaryForRetry, newPrimaryTry,
attempt + 1, suppressed);
} else {
return responseBody.ignoreElements()
.then(attemptAsync(context, next, originalRequest, newConsiderSecondary, newPrimaryTry,
attempt + 1, suppressed));
.doFinally(ignored -> response.close())
.onErrorResume(drainError -> Mono.empty())
.then(Mono.defer(() -> attemptAsync(context, next, originalRequest, consideredSecondaryForRetry,
newPrimaryTry, attempt + 1, suppressed)));
Comment thread
arnabnandy7 marked this conversation as resolved.
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

Expand All @@ -38,6 +42,8 @@
public class RequestRetryPolicyTest {
static RequestRetryOptions retryTestOptions
= new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, 4, 2, 100L, 1000L, "SecondaryHost");
private static final RequestRetryOptions FAST_RETRY_OPTIONS
= new RequestRetryOptions(RetryPolicyType.FIXED, 4, (Integer) null, 1L, 5L, null);

@SyncAsyncTest
public void requestRetryPolicySuccessMaxRetries() {
Expand Down Expand Up @@ -146,6 +152,123 @@ public Mono<HttpResponse> send(HttpRequest request) {
assertEquals(3, closeCalls.get());
}

@Test
public void retryDrainsBodyBeforeClosingAsync() {
Comment thread
arnabnandy7 marked this conversation as resolved.
AtomicInteger requestCount = new AtomicInteger();
AtomicBoolean firstResponseClosed = new AtomicBoolean();

final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(FAST_RETRY_OPTIONS))
.httpClient(new NoOpHttpClient() {
@Override
public Mono<HttpResponse> send(HttpRequest request) {
if (requestCount.incrementAndGet() == 1) {
return Mono.just(new MockHttpResponse(request, 500) {
@Override
public Flux<ByteBuffer> getBody() {
return Flux.defer(() -> firstResponseClosed.get()
? Flux.never()
: Flux.just(ByteBuffer.wrap(new byte[0])));
}

@Override
public void close() {
firstResponseClosed.set(true);
super.close();
}
});
}

return Mono.just(new MockHttpResponse(request, 200));
}
})
.build();

StepVerifier.create(sendRequest(pipeline))
.expectNextMatches(response -> response.getStatusCode() == 200)
.expectComplete()
.verify(Duration.ofSeconds(1));
assertEquals(2, requestCount.get());
Assertions.assertTrue(firstResponseClosed.get(),
"Retryable response should be closed after draining the body.");
}

@Test
public void retryContinuesWhenDrainingBodyFailsAsync() {
AtomicInteger requestCount = new AtomicInteger();
AtomicBoolean firstResponseClosed = new AtomicBoolean();

final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(FAST_RETRY_OPTIONS))
.httpClient(new NoOpHttpClient() {
@Override
public Mono<HttpResponse> send(HttpRequest request) {
if (requestCount.incrementAndGet() == 1) {
return Mono.just(new MockHttpResponse(request, 500) {
@Override
public Flux<ByteBuffer> getBody() {
return Flux.error(new IllegalStateException("Body was discarded after close."));
}

@Override
public void close() {
firstResponseClosed.set(true);
super.close();
}
});
}

return Mono.just(new MockHttpResponse(request, 200));
}
})
.build();

StepVerifier.create(sendRequest(pipeline))
.expectNextMatches(response -> response.getStatusCode() == 200)
.expectComplete()
.verify(Duration.ofSeconds(1));
assertEquals(2, requestCount.get());
Assertions.assertTrue(firstResponseClosed.get(), "Retryable response should be closed after a drain error.");
}

@Test
public void retryAfter500StillRetriesWhenDrainingClosedBodyErrors() {
AtomicInteger requestCount = new AtomicInteger();
AtomicBoolean firstResponseClosed = new AtomicBoolean();

final HttpPipeline pipeline = new HttpPipelineBuilder().policies(new RequestRetryPolicy(FAST_RETRY_OPTIONS))
.httpClient(new NoOpHttpClient() {
@Override
public Mono<HttpResponse> send(HttpRequest request) {
if (requestCount.incrementAndGet() == 1) {
return Mono.just(new MockHttpResponse(request, 500) {
@Override
public Flux<ByteBuffer> getBody() {
return Flux.defer(() -> firstResponseClosed.get()
? Flux.error(new IllegalStateException("Body was discarded after close."))
: Flux.just(ByteBuffer.wrap(new byte[0])));
}

@Override
public void close() {
firstResponseClosed.set(true);
super.close();
}
});
}

return Mono.just(new MockHttpResponse(request, 200));
}
})
.build();

StepVerifier.create(sendRequest(pipeline))
.expectNextMatches(response -> response.getStatusCode() == 200)
.expectComplete()
.verify(Duration.ofSeconds(1));
assertEquals(2, requestCount.get());
Assertions.assertTrue(firstResponseClosed.get(),
"Retryable response should be closed after draining the body.");
}

@SyncAsyncTest
public void fixedDelayRetry() {
final int maxRetries = 5;
Expand Down