Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.mongodb.internal.async;

import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.function.AsyncCallbackLoop;
import com.mongodb.internal.async.function.LoopState;
import com.mongodb.internal.async.function.RetryState;
Expand All @@ -39,15 +38,15 @@
* following "sync" method:
*
* <pre>
* public T myMethod()
* public T myMethod() {
* method1();
* method2();
* }</pre>
*
* <p>The async counterpart would be:
*
* <pre>
* public void myMethodAsync(SingleResultCallback&lt;T> callback)
* public void myMethodAsync(SingleResultCallback&lt;T> callback) {
* beginAsync().thenRun(c -> {
* method1Async(c);
* }).thenRun(c -> {
Expand Down Expand Up @@ -229,11 +228,11 @@ default <R> AsyncSupplier<R> thenSupply(final AsyncSupplier<R> supplier) {
* @return the composition of this, and the looping branch
* @see RetryingAsyncCallbackSupplier
*/
default AsyncRunnable thenRunRetryingWhile(
final TimeoutContext timeoutContext, final AsyncRunnable runnable, final Predicate<Throwable> shouldRetry) {
default AsyncRunnable thenRunRetryingWhile(final AsyncRunnable runnable, final Predicate<Throwable> shouldRetry) {
return thenRun(callback -> {
new RetryingAsyncCallbackSupplier<Void>(
new RetryState(timeoutContext),
new RetryState(),
(previouslyChosenFailure, lastAttemptFailure) -> lastAttemptFailure,
(rs, lastAttemptFailure) -> shouldRetry.test(lastAttemptFailure),
// `finish` is required here instead of `unsafeFinish`
// because only `finish` meets the contract of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.async.function.LoopState.AttachmentKey;
import com.mongodb.lang.NonNull;
Expand Down Expand Up @@ -46,62 +45,31 @@
*/
@NotThreadSafe
public final class RetryState {
public static final int RETRIES = 1;
public static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;
public static final int MAX_RETRIES = 1;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how the limit is called in client-backpressure.md, so I renamed the constant.

private static final int INFINITE_RETRIES = Integer.MAX_VALUE;

private final LoopState loopState;
private final int attempts;
private final boolean retryUntilTimeoutThrowsException;
@Nullable
private Throwable previouslyChosenException;

/**
* Creates a {@code RetryState} with a positive number of allowed retry attempts.
* {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
* <p>
* If a timeout is not specified in the {@link TimeoutContext#hasTimeoutMS()}, the specified {@code retries} argument acts as a fallback
* bound. Otherwise, retries are unbounded until the timeout is reached.
* <p>
* It is possible to provide an additional {@code retryPredicate} in the {@link #doAdvanceOrThrow} method,
* which can be used to stop retrying based on a custom condition additionally to {@code retries} and {@link TimeoutContext}.
* </p>
*
* @param retries A positive number of allowed retry attempts.
* {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
* @param retryUntilTimeoutThrowsException If {@code true}, then if a {@link MongoOperationTimeoutException} is thrown then retrying stops.
*/
public static RetryState withRetryableState(final int retries, final boolean retryUntilTimeoutThrowsException) {
assertTrue(retries > 0);
return new RetryState(retries, retryUntilTimeoutThrowsException);
}

public static RetryState withNonRetryableState() {
return new RetryState(0, false);
}

/**
* Creates a {@link RetryState} that does not limit the number of attempts.
* The number of attempts is limited iff {@link TimeoutContext#hasTimeoutMS()} is true and timeout has expired.
* <p>
* It is possible to provide an additional {@code retryPredicate} in the {@link #doAdvanceOrThrow} method,
* which can be used to stop retrying based on a custom condition additionally to {@link TimeoutContext}.
* </p>
*
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
* Creates a {@link RetryState} that does not explicitly limit the number of attempts.
* Retrying still may be stopped because, for example,
* the failed result from the most recent attempt is {@link MongoOperationTimeoutException}.
*/
public RetryState(final TimeoutContext timeoutContext) {
this(INFINITE_ATTEMPTS, timeoutContext.hasTimeoutMS());
public RetryState() {
this(INFINITE_RETRIES);
}

/**
* @param retries A non-negative number of allowed retry attempts.
* {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
* {@value #INFINITE_RETRIES} is interpreted as {@linkplain #RetryState() absence of explicit limit}.
*/
private RetryState(final int retries, final boolean retryUntilTimeoutThrowsException) {
public RetryState(final int retries) {
assertTrue(retries >= 0);
loopState = new LoopState();
attempts = retries == INFINITE_ATTEMPTS ? INFINITE_ATTEMPTS : retries + 1;
this.retryUntilTimeoutThrowsException = retryUntilTimeoutThrowsException;
attempts = retries == INFINITE_RETRIES ? INFINITE_RETRIES : retries + 1;
}

/**
Expand Down Expand Up @@ -351,14 +319,14 @@ public boolean isFirstAttempt() {
* An attempt is known to be the last one iff any of the following applies:
* <ul>
* <li>{@link #breakAndThrowIfRetryAnd(Supplier)} / {@link #breakAndCompleteIfRetryAnd(Supplier, SingleResultCallback)} / {@link #markAsLastAttempt()} was called.</li>
* <li>A timeout is set and has been reached, as indicated by {@code attemptException}.</li>
* <li>No timeout is set, and the number of attempts is limited, and the current attempt is the last one.</li>
* <li>{@code attemptException} is a {@link MongoOperationTimeoutException}.</li>
* <li>The number of attempts is limited, and the current attempt is the last one.</li>
* </ul>
*
* @see #attempt()
*/
private boolean isLastAttempt(final Throwable attemptException) {
boolean operationTimeout = retryUntilTimeoutThrowsException && attemptException instanceof MongoOperationTimeoutException;
boolean operationTimeout = attemptException instanceof MongoOperationTimeoutException;
Comment thread
stIncMale marked this conversation as resolved.
boolean attemptLimit = attempt() == attempts - 1;
return loopState.isLastIteration() || operationTimeout || attemptLimit;
Comment thread
stIncMale marked this conversation as resolved.
}
Expand Down Expand Up @@ -403,7 +371,7 @@ public <V> Optional<V> attachment(final AttachmentKey<V> key) {
public String toString() {
return "RetryState{"
+ "loopState=" + loopState
+ ", attempts=" + (attempts == INFINITE_ATTEMPTS ? "infinite" : attempts)
+ ", attempts=" + (attempts == INFINITE_RETRIES ? "infinite" : attempts)
+ ", exception=" + previouslyChosenException
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,6 @@ public RetryingAsyncCallbackSupplier(
this.asyncFunction = asyncFunction;
}

public RetryingAsyncCallbackSupplier(
final RetryState state,
final BiPredicate<RetryState, Throwable> retryPredicate,
final AsyncCallbackSupplier<R> asyncFunction) {
this(state, (previouslyChosenFailure, lastAttemptFailure) -> lastAttemptFailure, retryPredicate, asyncFunction);
}
Comment on lines -90 to -95
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor was used only once, was not documented, and as far as I can tell, wasn't really making anything clearer or simpler.


@Override
public void get(final SingleResultCallback<R> callback) {
/* `asyncFunction` and `callback` are the only externally provided pieces of code for which we do not need to care about
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ private void authenticationLoopAsync(final InternalConnection connection, final
final SingleResultCallback<Void> callback) {
fallbackState = FallbackState.INITIAL;
beginAsync().thenRunRetryingWhile(
operationContext.getTimeoutContext(),
c -> super.authenticateAsync(connection, description, operationContext, c),
e -> triggersRetry(e) && shouldRetryHandler()
).finish(callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.internal.async.function.RetryState.INFINITE_ATTEMPTS;
import static com.mongodb.internal.async.function.RetryState.MAX_RETRIES;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -123,11 +123,9 @@ private static Throwable chooseRetryableWriteException(

static RetryState initialRetryState(final boolean retry, final TimeoutContext timeoutContext) {
if (retry) {
boolean retryUntilTimeoutThrowsException = timeoutContext.hasTimeoutMS();
int retries = retryUntilTimeoutThrowsException ? INFINITE_ATTEMPTS : RetryState.RETRIES;
return RetryState.withRetryableState(retries, retryUntilTimeoutThrowsException);
return timeoutContext.hasTimeoutMS() ? new RetryState() : new RetryState(MAX_RETRIES);
}
return RetryState.withNonRetryableState();
return new RetryState(0);
}

private static final List<Integer> RETRYABLE_ERROR_CODES = asList(6, 7, 89, 91, 134, 189, 262, 9001, 13436, 13435, 11602, 11600, 10107);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,15 @@ public String getCommandName() {

@Override
public BulkWriteResult execute(final WriteBinding binding, final OperationContext operationContext) {
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
/* We cannot use the tracking of attempts built in the `RetryState` class because conceptually we have to maintain multiple attempt
* counters while executing a single bulk write operation:
* - a counter that limits attempts to select server and checkout a connection before we created a batch;
* - a counter per each batch that limits attempts to execute the specific batch.
* Fortunately, these counters do not exist concurrently with each other. While maintaining the counters manually,
* we must adhere to the contract of `RetryingSyncSupplier`. When the retry timeout is implemented, there will be no counters,
* and the code related to the attempt tracking in `BulkWriteTracker` will be removed. */
RetryState retryState = new RetryState(timeoutContext);
BulkWriteTracker.attachNew(retryState, retryWrites, timeoutContext);
RetryState retryState = new RetryState();
BulkWriteTracker.attachNew(retryState, retryWrites, operationContext.getTimeoutContext());
Supplier<BulkWriteResult> retryingBulkWrite = decorateWriteWithRetries(retryState, operationContext, () ->
withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection, operationContextWithMinRTT) -> {
TimeoutContext timeoutContextWithMinRtt = operationContextWithMinRTT.getTimeoutContext();
Expand Down Expand Up @@ -226,10 +225,9 @@ public BulkWriteResult execute(final WriteBinding binding, final OperationContex
}

public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, final SingleResultCallback<BulkWriteResult> callback) {
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
// see the comment in `execute(WriteBinding)` explaining the manual tracking of attempts
RetryState retryState = new RetryState(timeoutContext);
BulkWriteTracker.attachNew(retryState, retryWrites, timeoutContext);
RetryState retryState = new RetryState();
BulkWriteTracker.attachNew(retryState, retryWrites, operationContext.getTimeoutContext());
binding.retain();
AsyncCallbackSupplier<BulkWriteResult> retryingBulkWrite = this.<BulkWriteResult>decorateWriteWithRetries(retryState,
operationContext,
Expand Down Expand Up @@ -493,7 +491,7 @@ private static void attach(final RetryState retryState, final BulkWriteTracker t

private BulkWriteTracker(final boolean retry, @Nullable final BulkWriteBatch batch, final TimeoutContext timeoutContext) {
attempt = 0;
attempts = retry ? RetryState.RETRIES + 1 : 1;
attempts = retry ? RetryState.MAX_RETRIES + 1 : 1;
this.batch = batch;
this.retryUntilTimeoutThrowsException = timeoutContext.hasTimeoutMS();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package com.mongodb.internal.async;

import com.mongodb.MongoException;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.TimeoutSettings;
import org.junit.jupiter.api.Test;

import java.util.function.BiConsumer;
Expand All @@ -29,8 +27,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

abstract class AsyncFunctionsAbstractTest extends AsyncFunctionsTestBase {
private static final TimeoutContext TIMEOUT_CONTEXT = new TimeoutContext(new TimeoutSettings(0, 0, 0, 0L, 0));

@Test
void test1Method() {
// the number of expected variations is often: 1 + N methods invoked
Expand Down Expand Up @@ -856,7 +852,6 @@ void testRetryLoop() {
},
(callback) -> {
beginAsync().thenRunRetryingWhile(
TIMEOUT_CONTEXT,
c -> async(plainTest(0) ? 1 : 2, c),
e -> e.getMessage().equals("exception-1")
).finish(callback);
Expand Down
Loading