-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add support for server selection's deprioritized servers to all topologies. #1860
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
282bf57
cb8905c
74ae21f
f39b7a0
2a4cd70
fa35dd9
7b613e4
7e30f2a
f20b482
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -94,7 +94,8 @@ | |
| import static java.util.concurrent.TimeUnit.NANOSECONDS; | ||
| import static java.util.stream.Collectors.toList; | ||
|
|
||
| abstract class BaseCluster implements Cluster { | ||
| @VisibleForTesting(otherwise = PRIVATE) | ||
| public abstract class BaseCluster implements Cluster { | ||
| private static final Logger LOGGER = Loggers.getLogger("cluster"); | ||
| private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("cluster"); | ||
|
|
||
|
|
@@ -112,10 +113,11 @@ abstract class BaseCluster implements Cluster { | |
| private volatile boolean isClosed; | ||
| private volatile ClusterDescription description; | ||
|
|
||
| BaseCluster(final ClusterId clusterId, | ||
| final ClusterSettings settings, | ||
| final ClusterableServerFactory serverFactory, | ||
| final ClientMetadata clientMetadata) { | ||
| @VisibleForTesting(otherwise = PRIVATE) | ||
| protected BaseCluster(final ClusterId clusterId, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The value of |
||
| final ClusterSettings settings, | ||
| final ClusterableServerFactory serverFactory, | ||
| final ClientMetadata clientMetadata) { | ||
| this.clusterId = notNull("clusterId", clusterId); | ||
| this.settings = notNull("settings", settings); | ||
| this.serverFactory = notNull("serverFactory", serverFactory); | ||
|
|
@@ -361,8 +363,7 @@ private static ServerSelector getCompleteServerSelector( | |
| final ClusterSettings settings) { | ||
| List<ServerSelector> selectors = Stream.of( | ||
| getRaceConditionPreFilteringSelector(serversSnapshot), | ||
| serverSelector, | ||
| serverDeprioritization.getServerSelector(), | ||
| serverDeprioritization.applyDeprioritization(serverSelector), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's rename the method to Let's do this automatically via IDE in a separate commit, and express in the commit message that the commit was done via automatic refactoring, so that reviewers know not to review it. |
||
| settings.getServerSelector(), // may be null | ||
| new LatencyMinimizingServerSelector(settings.getLocalThreshold(MILLISECONDS), MILLISECONDS), | ||
| AtMostTwoRandomServerSelector.instance(), | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -22,7 +22,6 @@ | |||||||||||||||||||||||||
| import com.mongodb.ServerAddress; | ||||||||||||||||||||||||||
| import com.mongodb.ServerApi; | ||||||||||||||||||||||||||
| import com.mongodb.connection.ClusterDescription; | ||||||||||||||||||||||||||
| import com.mongodb.connection.ClusterType; | ||||||||||||||||||||||||||
| import com.mongodb.connection.ServerDescription; | ||||||||||||||||||||||||||
| import com.mongodb.internal.IgnorableRequestContext; | ||||||||||||||||||||||||||
| import com.mongodb.internal.TimeoutContext; | ||||||||||||||||||||||||||
|
|
@@ -40,6 +39,8 @@ | |||||||||||||||||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||||||||||||||||||
| import java.util.concurrent.atomic.AtomicLong; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| import static com.mongodb.internal.VisibleForTesting.AccessModifier.PACKAGE; | ||||||||||||||||||||||||||
| import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; | ||||||||||||||||||||||||||
| import static java.util.stream.Collectors.toList; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||
|
|
@@ -113,6 +114,13 @@ public OperationContext withOperationName(final String operationName) { | |||||||||||||||||||||||||
| operationName, tracingSpan); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // TODO-JAVA-6058: This method enables overriding the ServerDeprioritization state. | ||||||||||||||||||||||||||
| // It is a temporary solution to handle cases where deprioritization state persists across operations. | ||||||||||||||||||||||||||
| public OperationContext withServerDeprioritization(final ServerDeprioritization serverDeprioritization) { | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should also leave a note in the ticket description that addressing comments with the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's turn this comment into a documentation comment. Such a change for a program element that is not part of the driver API seems to have benefits with no drawbacks compared to an end-of-line comment.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need for callers of the
|
||||||||||||||||||||||||||
| return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, tracingManager, serverApi, | ||||||||||||||||||||||||||
| operationName, tracingSpan); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| public long getId() { | ||||||||||||||||||||||||||
| return id; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
@@ -228,24 +236,26 @@ public static final class ServerDeprioritization { | |||||||||||||||||||||||||
| @Nullable | ||||||||||||||||||||||||||
| private ServerAddress candidate; | ||||||||||||||||||||||||||
| private final Set<ServerAddress> deprioritized; | ||||||||||||||||||||||||||
| private final DeprioritizingSelector selector; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| private ServerDeprioritization() { | ||||||||||||||||||||||||||
| @VisibleForTesting(otherwise = PRIVATE) | ||||||||||||||||||||||||||
| public ServerDeprioritization() { | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change does not seem right:
|
||||||||||||||||||||||||||
| candidate = null; | ||||||||||||||||||||||||||
| deprioritized = new HashSet<>(); | ||||||||||||||||||||||||||
| selector = new DeprioritizingSelector(); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||
| * The returned {@link ServerSelector} tries to {@linkplain ServerSelector#select(ClusterDescription) select} | ||||||||||||||||||||||||||
| * only the {@link ServerDescription}s that do not have deprioritized {@link ServerAddress}es. | ||||||||||||||||||||||||||
| * If no such {@link ServerDescription} can be selected, then it selects {@link ClusterDescription#getServerDescriptions()}. | ||||||||||||||||||||||||||
| * The returned {@link ServerSelector} wraps the provided selector and attempts server selection in two passes: | ||||||||||||||||||||||||||
| * <ol> | ||||||||||||||||||||||||||
| * <li>First pass: calls the wrapped selector with only non-deprioritized {@link ServerDescription}s</li> | ||||||||||||||||||||||||||
| * <li>Second pass: if the first pass returns no servers, calls the wrapped selector again with all servers (including deprioritized ones)</li> | ||||||||||||||||||||||||||
| * </ol> | ||||||||||||||||||||||||||
|
Comment on lines
+247
to
+251
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [optional]
Suggested change
|
||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||
| ServerSelector getServerSelector() { | ||||||||||||||||||||||||||
| return selector; | ||||||||||||||||||||||||||
| ServerSelector applyDeprioritization(final ServerSelector wrappedSelector) { | ||||||||||||||||||||||||||
| return new DeprioritizingSelector(wrappedSelector); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| void updateCandidate(final ServerAddress serverAddress) { | ||||||||||||||||||||||||||
| @VisibleForTesting(otherwise = PACKAGE) | ||||||||||||||||||||||||||
| public void updateCandidate(final ServerAddress serverAddress) { | ||||||||||||||||||||||||||
| candidate = serverAddress; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -263,24 +273,35 @@ public void onAttemptFailure(final Throwable failure) { | |||||||||||||||||||||||||
| * which indeed may be used concurrently. {@link DeprioritizingSelector} does not need to be thread-safe. | ||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||
| private final class DeprioritizingSelector implements ServerSelector { | ||||||||||||||||||||||||||
| private DeprioritizingSelector() { | ||||||||||||||||||||||||||
| private final ServerSelector wrappedSelector; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| private DeprioritizingSelector(final ServerSelector wrappedSelector) { | ||||||||||||||||||||||||||
| this.wrappedSelector = wrappedSelector; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||
| public List<ServerDescription> select(final ClusterDescription clusterDescription) { | ||||||||||||||||||||||||||
| List<ServerDescription> serverDescriptions = clusterDescription.getServerDescriptions(); | ||||||||||||||||||||||||||
| if (!isEnabled(clusterDescription.getType())) { | ||||||||||||||||||||||||||
| return serverDescriptions; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (serverDescriptions.size() == 1 || deprioritized.isEmpty()) { | ||||||||||||||||||||||||||
| return wrappedSelector.select(clusterDescription); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| List<ServerDescription> nonDeprioritizedServerDescriptions = serverDescriptions | ||||||||||||||||||||||||||
| .stream() | ||||||||||||||||||||||||||
| .filter(serverDescription -> !deprioritized.contains(serverDescription.getAddress())) | ||||||||||||||||||||||||||
| .collect(toList()); | ||||||||||||||||||||||||||
|
Comment on lines
+286
to
293
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's leave a The open questions here are:
|
||||||||||||||||||||||||||
| return nonDeprioritizedServerDescriptions.isEmpty() ? serverDescriptions : nonDeprioritizedServerDescriptions; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| private boolean isEnabled(final ClusterType clusterType) { | ||||||||||||||||||||||||||
| return clusterType == ClusterType.SHARDED; | ||||||||||||||||||||||||||
| if (nonDeprioritizedServerDescriptions.isEmpty()) { | ||||||||||||||||||||||||||
| return wrappedSelector.select(clusterDescription); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| List<ServerDescription> selected = wrappedSelector.select( | ||||||||||||||||||||||||||
| new ClusterDescription(clusterDescription.getConnectionMode(), clusterDescription.getType(), | ||||||||||||||||||||||||||
| nonDeprioritizedServerDescriptions, | ||||||||||||||||||||||||||
| clusterDescription.getClusterSettings(), | ||||||||||||||||||||||||||
| clusterDescription.getServerSettings())); | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use the public ClusterDescription(final ClusterConnectionMode connectionMode, final ClusterType type,
@Nullable final MongoException srvResolutionException,
final List<ServerDescription> serverDescriptions,
@Nullable final ClusterSettings clusterSettings,
@Nullable final ServerSettings serverSettings) {constructor. |
||||||||||||||||||||||||||
| return selected.isEmpty() ? wrappedSelector.select(clusterDescription) : selected; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,7 +85,11 @@ final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T | |
| final int maxWireVersion) { | ||
| this.changeStreamOperation = changeStreamOperation; | ||
| this.binding = binding.retain(); | ||
| this.initialOperationContext = operationContext.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride); | ||
| this.initialOperationContext = operationContext | ||
| .withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride) | ||
| //TODO-JAVA-6058. Temporary workaround to reset any server deprioritization | ||
| // state from the previous find operation. | ||
| .withServerDeprioritization(new OperationContext.ServerDeprioritization()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the |
||
| this.wrapped = wrapped; | ||
| this.resumeToken = resumeToken; | ||
| this.maxWireVersion = maxWireVersion; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -193,16 +193,18 @@ public static ServerVersion getServerVersion() { | |
| if (serverVersion == null) { | ||
| serverVersion = getVersion(new CommandReadOperation<>("admin", | ||
| new BsonDocument("buildInfo", new BsonInt32(1)), new BsonDocumentCodec()) | ||
| .execute(new ClusterBinding(getCluster(), ReadPreference.nearest()), OPERATION_CONTEXT)); | ||
| .execute(new ClusterBinding(getCluster(), ReadPreference.nearest()), getOperationContext())); | ||
| } | ||
| return serverVersion; | ||
| } | ||
|
|
||
| public static final OperationContext OPERATION_CONTEXT = new OperationContext( | ||
| IgnorableRequestContext.INSTANCE, | ||
| new ReadConcernAwareNoOpSessionContext(ReadConcern.DEFAULT), | ||
| new TimeoutContext(TIMEOUT_SETTINGS), | ||
| getServerApi()); | ||
| public static OperationContext getOperationContext() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for noticing the issue with The methods returning
|
||
| return new OperationContext( | ||
| IgnorableRequestContext.INSTANCE, | ||
| new ReadConcernAwareNoOpSessionContext(ReadConcern.DEFAULT), | ||
| new TimeoutContext(TIMEOUT_SETTINGS), | ||
| getServerApi()); | ||
| } | ||
|
|
||
| public static final InternalOperationContextFactory OPERATION_CONTEXT_FACTORY = | ||
| new InternalOperationContextFactory(TIMEOUT_SETTINGS, getServerApi()); | ||
|
|
@@ -255,7 +257,7 @@ public static boolean hasEncryptionTestsEnabled() { | |
| public static Document getServerStatus() { | ||
| return new CommandReadOperation<>("admin", new BsonDocument("serverStatus", new BsonInt32(1)), | ||
| new DocumentCodec()) | ||
| .execute(getBinding(), OPERATION_CONTEXT); | ||
| .execute(getBinding(), getOperationContext()); | ||
| } | ||
|
|
||
| public static boolean supportsFsync() { | ||
|
|
@@ -270,7 +272,7 @@ static class ShutdownHook extends Thread { | |
| public void run() { | ||
| if (cluster != null) { | ||
| try { | ||
| new DropDatabaseOperation(getDefaultDatabaseName(), WriteConcern.ACKNOWLEDGED).execute(getBinding(), OPERATION_CONTEXT); | ||
| new DropDatabaseOperation(getDefaultDatabaseName(), WriteConcern.ACKNOWLEDGED).execute(getBinding(), getOperationContext()); | ||
| } catch (MongoCommandException e) { | ||
| // if we do not have permission to drop the database, assume it is cleaned up in some other way | ||
| if (!e.getMessage().contains("Command dropDatabase requires authentication")) { | ||
|
|
@@ -322,7 +324,7 @@ public static synchronized ConnectionString getConnectionString() { | |
| try { | ||
| BsonDocument helloResult = new CommandReadOperation<>("admin", | ||
| new BsonDocument(LEGACY_HELLO, new BsonInt32(1)), new BsonDocumentCodec()) | ||
| .execute(new ClusterBinding(cluster, ReadPreference.nearest()), OPERATION_CONTEXT); | ||
| .execute(new ClusterBinding(cluster, ReadPreference.nearest()), getOperationContext()); | ||
| if (helloResult.containsKey("setName")) { | ||
| connectionString = new ConnectionString(DEFAULT_URI + "/?replicaSet=" | ||
| + helloResult.getString("setName").getValue()); | ||
|
|
@@ -382,11 +384,11 @@ public static ReadWriteBinding getBinding(final OperationContext operationContex | |
| } | ||
|
|
||
| public static ReadWriteBinding getBinding(final ReadPreference readPreference) { | ||
| return getBinding(getCluster(), readPreference, OPERATION_CONTEXT); | ||
| return getBinding(getCluster(), readPreference, getOperationContext()); | ||
| } | ||
|
|
||
| public static OperationContext createNewOperationContext(final TimeoutSettings timeoutSettings) { | ||
| return OPERATION_CONTEXT.withTimeoutContext(new TimeoutContext(timeoutSettings)); | ||
| return getOperationContext().withTimeoutContext(new TimeoutContext(timeoutSettings)); | ||
| } | ||
|
|
||
| private static ReadWriteBinding getBinding(final Cluster cluster, | ||
|
|
@@ -403,23 +405,23 @@ private static ReadWriteBinding getBinding(final Cluster cluster, | |
| } | ||
|
|
||
| public static SingleConnectionBinding getSingleConnectionBinding() { | ||
| return new SingleConnectionBinding(getCluster(), ReadPreference.primary(), OPERATION_CONTEXT); | ||
| return new SingleConnectionBinding(getCluster(), ReadPreference.primary(), getOperationContext()); | ||
| } | ||
|
|
||
| public static AsyncSingleConnectionBinding getAsyncSingleConnectionBinding() { | ||
| return getAsyncSingleConnectionBinding(getAsyncCluster()); | ||
| } | ||
|
|
||
| public static AsyncSingleConnectionBinding getAsyncSingleConnectionBinding(final Cluster cluster) { | ||
| return new AsyncSingleConnectionBinding(cluster, ReadPreference.primary(), OPERATION_CONTEXT); | ||
| return new AsyncSingleConnectionBinding(cluster, ReadPreference.primary(), getOperationContext()); | ||
| } | ||
|
|
||
| public static AsyncReadWriteBinding getAsyncBinding(final Cluster cluster) { | ||
| return new AsyncClusterBinding(cluster, ReadPreference.primary()); | ||
| } | ||
|
|
||
| public static AsyncReadWriteBinding getAsyncBinding() { | ||
| return getAsyncBinding(getAsyncCluster(), ReadPreference.primary(), OPERATION_CONTEXT); | ||
| return getAsyncBinding(getAsyncCluster(), ReadPreference.primary(), getOperationContext()); | ||
| } | ||
|
|
||
| public static AsyncReadWriteBinding getAsyncBinding(final TimeoutSettings timeoutSettings) { | ||
|
|
@@ -431,7 +433,7 @@ public static AsyncReadWriteBinding getAsyncBinding(final OperationContext opera | |
| } | ||
|
|
||
| public static AsyncReadWriteBinding getAsyncBinding(final ReadPreference readPreference) { | ||
| return getAsyncBinding(getAsyncCluster(), readPreference, OPERATION_CONTEXT); | ||
| return getAsyncBinding(getAsyncCluster(), readPreference, getOperationContext()); | ||
| } | ||
|
|
||
| public static AsyncReadWriteBinding getAsyncBinding( | ||
|
|
@@ -605,7 +607,7 @@ public static BsonDocument getServerParameters() { | |
| if (serverParameters == null) { | ||
| serverParameters = new CommandReadOperation<>("admin", | ||
| new BsonDocument("getParameter", new BsonString("*")), new BsonDocumentCodec()) | ||
| .execute(getBinding(), OPERATION_CONTEXT); | ||
| .execute(getBinding(), getOperationContext()); | ||
| } | ||
| return serverParameters; | ||
| } | ||
|
|
@@ -673,7 +675,7 @@ public static void configureFailPoint(final BsonDocument failPointDocument) { | |
| if (!isSharded()) { | ||
| try { | ||
| new CommandReadOperation<>("admin", failPointDocument, new BsonDocumentCodec()) | ||
| .execute(getBinding(), OPERATION_CONTEXT); | ||
| .execute(getBinding(), getOperationContext()); | ||
| } catch (MongoCommandException e) { | ||
| if (e.getErrorCode() == COMMAND_NOT_FOUND_ERROR_CODE) { | ||
| failsPointsSupported = false; | ||
|
|
@@ -689,7 +691,7 @@ public static void disableFailPoint(final String failPoint) { | |
| .append("mode", new BsonString("off")); | ||
| try { | ||
| new CommandReadOperation<>("admin", failPointDocument, new BsonDocumentCodec()) | ||
| .execute(getBinding(), OPERATION_CONTEXT); | ||
| .execute(getBinding(), getOperationContext()); | ||
| } catch (MongoCommandException e) { | ||
| // ignore | ||
| } | ||
|
|
@@ -703,7 +705,7 @@ public static <T> T executeSync(final WriteOperation<T> op) { | |
|
|
||
| @SuppressWarnings("overloads") | ||
| public static <T> T executeSync(final WriteOperation<T> op, final ReadWriteBinding binding) { | ||
| return op.execute(binding, applySessionContext(OPERATION_CONTEXT, binding.getReadPreference())); | ||
| return op.execute(binding, applySessionContext(getOperationContext(), binding.getReadPreference())); | ||
| } | ||
|
|
||
| @SuppressWarnings("overloads") | ||
|
|
@@ -713,7 +715,7 @@ public static <T> T executeSync(final ReadOperation<T, ?> op) { | |
|
|
||
| @SuppressWarnings("overloads") | ||
| public static <T> T executeSync(final ReadOperation<T, ?> op, final ReadWriteBinding binding) { | ||
| return op.execute(binding, OPERATION_CONTEXT); | ||
| return op.execute(binding, getOperationContext()); | ||
| } | ||
|
|
||
| @SuppressWarnings("overloads") | ||
|
|
@@ -729,7 +731,7 @@ public static <T> T executeAsync(final WriteOperation<T> op) throws Throwable { | |
| @SuppressWarnings("overloads") | ||
| public static <T> T executeAsync(final WriteOperation<T> op, final AsyncReadWriteBinding binding) throws Throwable { | ||
| FutureResultCallback<T> futureResultCallback = new FutureResultCallback<>(); | ||
| op.executeAsync(binding, applySessionContext(OPERATION_CONTEXT, binding.getReadPreference()), futureResultCallback); | ||
| op.executeAsync(binding, applySessionContext(getOperationContext(), binding.getReadPreference()), futureResultCallback); | ||
| return futureResultCallback.get(TIMEOUT, SECONDS); | ||
| } | ||
|
|
||
|
|
@@ -741,7 +743,7 @@ public static <T> T executeAsync(final ReadOperation<?, T> op) throws Throwable | |
| @SuppressWarnings("overloads") | ||
| public static <T> T executeAsync(final ReadOperation<?, T> op, final AsyncReadBinding binding) throws Throwable { | ||
| FutureResultCallback<T> futureResultCallback = new FutureResultCallback<>(); | ||
| op.executeAsync(binding, OPERATION_CONTEXT, futureResultCallback); | ||
| op.executeAsync(binding, getOperationContext(), futureResultCallback); | ||
| return futureResultCallback.get(TIMEOUT, SECONDS); | ||
| } | ||
|
|
||
|
|
@@ -811,19 +813,19 @@ public static <T> List<T> collectCursorResults(final BatchCursor<T> batchCursor) | |
|
|
||
| public static AsyncConnectionSource getWriteConnectionSource(final AsyncReadWriteBinding binding) throws Throwable { | ||
| FutureResultCallback<AsyncConnectionSource> futureResultCallback = new FutureResultCallback<>(); | ||
| binding.getWriteConnectionSource(OPERATION_CONTEXT, futureResultCallback); | ||
| binding.getWriteConnectionSource(getOperationContext(), futureResultCallback); | ||
| return futureResultCallback.get(TIMEOUT, SECONDS); | ||
| } | ||
|
|
||
| public static AsyncConnectionSource getReadConnectionSource(final AsyncReadWriteBinding binding) throws Throwable { | ||
| FutureResultCallback<AsyncConnectionSource> futureResultCallback = new FutureResultCallback<>(); | ||
| binding.getReadConnectionSource(OPERATION_CONTEXT, futureResultCallback); | ||
| binding.getReadConnectionSource(getOperationContext(), futureResultCallback); | ||
| return futureResultCallback.get(TIMEOUT, SECONDS); | ||
| } | ||
|
|
||
| public static AsyncConnection getConnection(final AsyncConnectionSource source) throws Throwable { | ||
| FutureResultCallback<AsyncConnection> futureResultCallback = new FutureResultCallback<>(); | ||
| source.getConnection(OPERATION_CONTEXT, futureResultCallback); | ||
| source.getConnection(getOperationContext(), futureResultCallback); | ||
| return futureResultCallback.get(TIMEOUT, SECONDS); | ||
| } | ||
|
|
||
|
|
@@ -867,6 +869,6 @@ private static OperationContext applySessionContext(final OperationContext opera | |
| } | ||
|
|
||
| public static OperationContext getOperationContext(final ReadPreference readPreference) { | ||
| return applySessionContext(OPERATION_CONTEXT, readPreference); | ||
| return applySessionContext(getOperationContext(), readPreference); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value of
otherwiseis incorrect here.