From 19e5ae19d677a803434d43a73d1d8bdc70b4598c Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 10 Jun 2026 12:04:42 +0100 Subject: [PATCH 1/9] Close connection on responseTo mismatch to prevent pool reuse A responseTo mismatch indicates the stream is desynchronized, so the connection must be closed to prevent it returning to the pool. Command error responses (ok: 0) leave the stream intact and do not close the connection. Command monitoring now receives exactly one started and one terminal event in all mismatch scenarios, on both sync and async paths: the commandSuccessful flag is set only after the succeeded event was actually emitted, and the async path sends the failed event for any failure when no succeeded event went out, matching the sync path. Response buffers are released before the connection is closed: NettyStream.close() requires all buffers it handed out to have been released already. JAVA-6210 --- .../connection/InternalStreamConnection.java | 35 +- .../InternalStreamConnectionTest.java | 798 ++++++++++++++++++ 2 files changed, 826 insertions(+), 7 deletions(-) create mode 100644 driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 0cad654a73a..b1de746a844 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -571,8 +571,8 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm new BsonDocumentCodec()), description.getServerAddress(), operationContext.getTimeoutContext()); } - commandSuccessful = true; commandEventSender.sendSucceededEvent(responseBuffers); + commandSuccessful = true; T commandResult = getCommandResult(decoder, responseBuffers, responseTo, operationContext.getTimeoutContext()); hasMoreToCome = responseBuffers.getReplyHeader().hasMoreToCome(); @@ -584,6 +584,13 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm return commandResult; } catch (Exception e) { + if (e instanceof MongoInternalException) { + // a MongoInternalException (e.g. responseTo mismatch) means the stream is desynchronized. + // The connection must be closed before anything else can throw, to prevent reuse. Other + // failures (e.g. MongoCommandException) leave the stream intact, as the response was fully read, + // so the connection remains usable + close(); + } if (!commandSuccessful) { commandEventSender.sendFailedEvent(e); } @@ -730,28 +737,42 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d return; } assertNotNull(responseBuffers); - T commandResult; + T commandResult = null; + boolean commandSuccessful = false; + Throwable failure = null; try { updateSessionContext(operationContext.getSessionContext(), responseBuffers); boolean commandOk = isCommandOk(new BsonBinaryReader(new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer()))); responseBuffers.reset(); if (!commandOk) { - MongoException commandFailureException = getCommandFailureException( + throw getCommandFailureException( responseBuffers.getResponseDocument(messageId, new BsonDocumentCodec()), description.getServerAddress(), operationContext.getTimeoutContext()); - commandEventSender.sendFailedEvent(commandFailureException); - throw commandFailureException; } commandEventSender.sendSucceededEvent(responseBuffers); + commandSuccessful = true; commandResult = getCommandResult(decoder, responseBuffers, messageId, operationContext.getTimeoutContext()); } catch (Throwable localThrowable) { - callback.onResult(null, localThrowable); - return; + failure = localThrowable; } finally { responseBuffers.close(); } + if (failure != null) { + if (failure instanceof MongoInternalException) { + // a MongoInternalException (e.g. responseTo mismatch) means the stream is desynchronized. + // The connection must be closed before anything else can throw, to prevent reuse. Other + // failures (e.g. MongoCommandException) leave the stream intact, as the response was fully read, + // so the connection remains usable + close(); + } + if (!commandSuccessful) { + commandEventSender.sendFailedEvent(failure); + } + callback.onResult(null, failure); + return; + } callback.onResult(commandResult, null); })); } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java new file mode 100644 index 00000000000..0373ecc59e4 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java @@ -0,0 +1,798 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection; + +import com.mongodb.MongoCommandException; +import com.mongodb.MongoInternalException; +import com.mongodb.ServerAddress; +import com.mongodb.async.FutureResultCallback; +import com.mongodb.connection.AsyncCompletionHandler; +import com.mongodb.connection.ClusterId; +import com.mongodb.connection.ConnectionDescription; +import com.mongodb.connection.ConnectionId; +import com.mongodb.connection.ServerConnectionState; +import com.mongodb.connection.ServerDescription; +import com.mongodb.connection.ServerId; +import com.mongodb.connection.ServerType; +import com.mongodb.event.CommandEvent; +import com.mongodb.event.CommandFailedEvent; +import com.mongodb.event.CommandListener; +import com.mongodb.event.CommandStartedEvent; +import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.validator.NoOpFieldNameValidator; +import com.mongodb.lang.Nullable; +import org.bson.BsonBinaryWriter; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.bson.BsonSerializationException; +import org.bson.BsonString; +import org.bson.ByteBuf; +import org.bson.ByteBufNIO; +import org.bson.codecs.BsonDocumentCodec; +import org.bson.codecs.EncoderContext; +import org.bson.io.BasicOutputBuffer; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.mongodb.ReadPreference.primary; +import static com.mongodb.connection.ClusterConnectionMode.SINGLE; +import static com.mongodb.connection.ConnectionDescription.getDefaultMaxMessageSize; +import static com.mongodb.connection.ConnectionDescription.getDefaultMaxWriteBatchSize; +import static com.mongodb.connection.ServerDescription.getDefaultMaxDocumentSize; +import static com.mongodb.internal.connection.MessageHeader.MESSAGE_HEADER_LENGTH; +import static com.mongodb.internal.operation.ServerVersionHelper.LATEST_WIRE_VERSION; +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class InternalStreamConnectionTest { + + private static final ServerId SERVER_ID = new ServerId(new ClusterId(), new ServerAddress()); + + /** + * Verifies that a responseTo mismatch in the synchronous path closes the connection. + */ + @Test + @DisplayName("Sync: connection closed on responseTo mismatch") + void syncClosesConnectionOnResponseToMismatch() { + AtomicInteger readCallCount = new AtomicInteger(); + + TestStream stream = new TestStream() { + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + if (readCallCount.incrementAndGet() == 1) { + // Header: responseTo deliberately mismatches (commandId + 1) + return createValidResponseHeader(getCommandMessageId() + 1); + } else { + return createResponseBody(); + } + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("readAsync should not be called in the sync path"); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + MongoInternalException thrown = assertThrows(MongoInternalException.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertTrue(thrown.getMessage().contains("does not match the requestId")); + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + assertFalse(stream.hadUnexpectedCall()); + } + + /** + * Verifies that a responseTo mismatch in the asynchronous path closes the connection. + */ + @Test + @DisplayName("Async: connection closed on responseTo mismatch") + void asyncClosesConnectionOnResponseToMismatch() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + // Header: responseTo deliberately mismatches (commandId + 1) + handler.completed(createValidResponseHeader(getCommandMessageId() + 1)); + } else { + handler.completed(createResponseBody()); + } + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + FutureResultCallback callback = new FutureResultCallback<>(); + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + MongoInternalException thrown = assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertTrue(thrown.getMessage().contains("does not match the requestId")); + assertFalse(callback.wasInvokedMultipleTimes()); + + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + } + + /** + * Verifies that a command error response (ok: 0) in the synchronous path does NOT close + * the connection: the response was fully read, so the stream remains synchronized and reusable. + */ + @Test + @DisplayName("Sync: connection NOT closed on command error response") + void syncDoesNotCloseConnectionOnCommandError() { + AtomicInteger readCallCount = new AtomicInteger(); + BsonDocument errorResponse = createCommandErrorDocument(); + + TestStream stream = new TestStream() { + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + if (readCallCount.incrementAndGet() == 1) { + return createValidResponseHeader(getCommandMessageId(), errorResponse); + } else { + return createResponseBody(errorResponse); + } + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("readAsync should not be called in the sync path"); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + MongoCommandException thrown = assertThrows(MongoCommandException.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertEquals(112, thrown.getErrorCode()); + assertFalse(connection.isClosed(), + "Connection should NOT be closed on a command error response"); + assertFalse(stream.wasClosed(), + "Underlying stream should NOT be closed on a command error response"); + assertFalse(stream.hadUnexpectedCall()); + + connection.close(); + } + + /** + * Verifies that a command error response (ok: 0) in the asynchronous path does NOT close + * the connection: the response was fully read, so the stream remains synchronized and reusable. + */ + @Test + @DisplayName("Async: connection NOT closed on command error response") + void asyncDoesNotCloseConnectionOnCommandError() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + BsonDocument errorResponse = createCommandErrorDocument(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + handler.completed(createValidResponseHeader(getCommandMessageId(), errorResponse)); + } else { + handler.completed(createResponseBody(errorResponse)); + } + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + FutureResultCallback callback = new FutureResultCallback<>(); + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + MongoCommandException thrown = assertThrows(MongoCommandException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertEquals(112, thrown.getErrorCode()); + assertFalse(callback.wasInvokedMultipleTimes()); + + assertFalse(connection.isClosed(), + "Connection should NOT be closed on a command error response"); + assertFalse(stream.wasClosed(), + "Underlying stream should NOT be closed on a command error response"); + + connection.close(); + } + + /** + * Verifies that when a command error response (ok: 0) carries a mismatched responseTo in the + * synchronous path, command monitoring receives exactly one started and one failed event. + */ + @Test + @DisplayName("Sync: monitoring receives started and failed events on responseTo mismatch in command error response") + void syncSendsFailedEventOnResponseToMismatchInCommandError() { + AtomicInteger readCallCount = new AtomicInteger(); + BsonDocument errorResponse = createCommandErrorDocument(); + + TestStream stream = new TestStream() { + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + if (readCallCount.incrementAndGet() == 1) { + // Header: responseTo deliberately mismatches AND the body is a command error + return createValidResponseHeader(getCommandMessageId() + 1, errorResponse); + } else { + return createResponseBody(errorResponse); + } + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("readAsync should not be called in the sync path"); + } + }; + + TestCommandListener listener = new TestCommandListener(); + InternalStreamConnection connection = createOpenConnection(stream, listener); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + assertThrows(MongoInternalException.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertStartedThenFailed(listener); + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + assertFalse(stream.hadUnexpectedCall()); + } + + /** + * Verifies that when a command error response (ok: 0) carries a mismatched responseTo in the + * asynchronous path, command monitoring receives exactly one started and one failed event. + */ + @Test + @DisplayName("Async: monitoring receives started and failed events on responseTo mismatch in command error response") + void asyncSendsFailedEventOnResponseToMismatchInCommandError() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + BsonDocument errorResponse = createCommandErrorDocument(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + // Header: responseTo deliberately mismatches AND the body is a command error + handler.completed(createValidResponseHeader(getCommandMessageId() + 1, errorResponse)); + } else { + handler.completed(createResponseBody(errorResponse)); + } + } + }; + + TestCommandListener listener = new TestCommandListener(); + InternalStreamConnection connection = createOpenConnection(stream, listener); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + FutureResultCallback callback = new FutureResultCallback<>(); + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertFalse(callback.wasInvokedMultipleTimes()); + assertStartedThenFailed(listener); + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + } + + /** + * Verifies that when an ok: 1 response carries a mismatched responseTo in the synchronous + * path, command monitoring receives exactly one started and one failed event: a succeeded + * event must not be emitted for a response that belongs to a different request. + */ + @Test + @DisplayName("Sync: monitoring receives started and failed events on responseTo mismatch in ok response") + void syncSendsFailedEventOnResponseToMismatchInOkResponse() { + AtomicInteger readCallCount = new AtomicInteger(); + + TestStream stream = new TestStream() { + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + if (readCallCount.incrementAndGet() == 1) { + // Header: responseTo deliberately mismatches; the body is a success (ok: 1) + return createValidResponseHeader(getCommandMessageId() + 1); + } else { + return createResponseBody(); + } + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("readAsync should not be called in the sync path"); + } + }; + + TestCommandListener listener = new TestCommandListener(); + InternalStreamConnection connection = createOpenConnection(stream, listener); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + assertThrows(MongoInternalException.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertStartedThenFailed(listener); + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + assertFalse(stream.hadUnexpectedCall()); + } + + /** + * Verifies that when an ok: 1 response carries a mismatched responseTo in the asynchronous + * path, command monitoring receives exactly one started and one failed event: a succeeded + * event must not be emitted for a response that belongs to a different request. + */ + @Test + @DisplayName("Async: monitoring receives started and failed events on responseTo mismatch in ok response") + void asyncSendsFailedEventOnResponseToMismatchInOkResponse() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + // Header: responseTo deliberately mismatches; the body is a success (ok: 1) + handler.completed(createValidResponseHeader(getCommandMessageId() + 1)); + } else { + handler.completed(createResponseBody()); + } + } + }; + + TestCommandListener listener = new TestCommandListener(); + InternalStreamConnection connection = createOpenConnection(stream, listener); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + FutureResultCallback callback = new FutureResultCallback<>(); + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertFalse(callback.wasInvokedMultipleTimes()); + assertStartedThenFailed(listener); + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + } + + /** + * Verifies that when the response document is corrupt (but fully read) in the synchronous + * path, command monitoring receives a failed event and the connection stays open: the wire + * framing is intact, so the stream is not desynchronized. + */ + @Test + @DisplayName("Sync: monitoring receives started and failed events when response document parsing fails") + void syncSendsFailedEventWhenResponseDocumentParsingFails() { + AtomicInteger readCallCount = new AtomicInteger(); + + TestStream stream = new TestStream() { + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + if (readCallCount.incrementAndGet() == 1) { + return createValidResponseHeaderForBodyLength(getCommandMessageId(), CORRUPT_BODY_LENGTH); + } else { + return createCorruptResponseBody(); + } + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("readAsync should not be called in the sync path"); + } + }; + + TestCommandListener listener = new TestCommandListener(); + InternalStreamConnection connection = createOpenConnection(stream, listener); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + assertThrows(BsonSerializationException.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertStartedThenFailed(listener); + assertFalse(connection.isClosed(), + "Connection should NOT be closed when a fully-read response fails to parse"); + assertFalse(stream.hadUnexpectedCall()); + + connection.close(); + } + + /** + * Verifies that when the response document is corrupt (but fully read) in the asynchronous + * path, command monitoring receives a failed event and the connection stays open: the wire + * framing is intact, so the stream is not desynchronized. + */ + @Test + @DisplayName("Async: monitoring receives started and failed events when response document parsing fails") + void asyncSendsFailedEventWhenResponseDocumentParsingFails() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + handler.completed(createValidResponseHeaderForBodyLength(getCommandMessageId(), CORRUPT_BODY_LENGTH)); + } else { + handler.completed(createCorruptResponseBody()); + } + } + }; + + TestCommandListener listener = new TestCommandListener(); + InternalStreamConnection connection = createOpenConnection(stream, listener); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + FutureResultCallback callback = new FutureResultCallback<>(); + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + assertThrows(BsonSerializationException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertFalse(callback.wasInvokedMultipleTimes()); + assertStartedThenFailed(listener); + assertFalse(connection.isClosed(), + "Connection should NOT be closed when a fully-read response fails to parse"); + + connection.close(); + } + + /** + * Verifies that on a responseTo mismatch in the asynchronous path, the response body buffer + * is released BEFORE the stream is closed. NettyStream.close() requires all buffers it + * handed out to have been released already; releasing after close corrupts reference counts. + */ + @Test + @DisplayName("Async: response buffer released before connection close on responseTo mismatch") + void asyncReleasesResponseBufferBeforeCloseOnResponseToMismatch() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + List order = new ArrayList<>(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + handler.completed(createValidResponseHeader(getCommandMessageId() + 1)); + } else { + handler.completed(recordingRelease(createResponseBodyBuffer(), order, "bodyRelease")); + } + } + + @Override + public void close() { + order.add("close"); + super.close(); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + FutureResultCallback callback = new FutureResultCallback<>(); + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + assertEquals(asList("bodyRelease", "close"), order, + "Response body buffer must be released before the stream is closed"); + } + + private InternalStreamConnection createOpenConnection(final TestStream stream) { + return createOpenConnection(stream, null); + } + + private InternalStreamConnection createOpenConnection(final TestStream stream, @Nullable final CommandListener commandListener) { + StreamFactory streamFactory = serverAddress -> stream; + ConnectionDescription connectionDescription = new ConnectionDescription( + new ConnectionId(SERVER_ID, 1, 1L), LATEST_WIRE_VERSION, + ServerType.STANDALONE, getDefaultMaxWriteBatchSize(), + getDefaultMaxDocumentSize(), getDefaultMaxMessageSize(), Collections.emptyList()); + ServerDescription serverDescription = ServerDescription.builder() + .ok(true) + .state(ServerConnectionState.CONNECTED) + .type(ServerType.STANDALONE) + .address(new ServerAddress()) + .build(); + InternalConnectionInitializationDescription initDesc = + new InternalConnectionInitializationDescription(connectionDescription, serverDescription); + + InternalConnectionInitializer initializer = new InternalConnectionInitializer() { + @Override + public InternalConnectionInitializationDescription startHandshake(final InternalConnection connection, + final OperationContext operationContext) { + return initDesc; + } + + @Override + public InternalConnectionInitializationDescription finishHandshake(final InternalConnection connection, + final InternalConnectionInitializationDescription description, final OperationContext operationContext) { + return initDesc; + } + + @Override + public void startHandshakeAsync(final InternalConnection connection, final OperationContext operationContext, + final SingleResultCallback callback) { + callback.onResult(initDesc, null); + } + + @Override + public void finishHandshakeAsync(final InternalConnection connection, + final InternalConnectionInitializationDescription description, final OperationContext operationContext, + final SingleResultCallback callback) { + callback.onResult(initDesc, null); + } + }; + + InternalStreamConnection connection = new InternalStreamConnection( + SINGLE, SERVER_ID, new TestConnectionGenerationSupplier(), + streamFactory, Collections.emptyList(), commandListener, initializer); + connection.open(createOperationContext()); + return connection; + } + + private CommandMessage createPingCommand() { + return new CommandMessage( + "admin", + new BsonDocument("ping", new BsonInt32(1)), + NoOpFieldNameValidator.INSTANCE, + primary(), + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), + SINGLE, + null); + } + + private OperationContext createOperationContext() { + return OperationContext.simpleOperationContext(new TimeoutContext(TimeoutSettings.DEFAULT)); + } + + /** + * Creates a generic command error response document (ok: 0). + */ + private static BsonDocument createCommandErrorDocument() { + return new BsonDocument("ok", new BsonInt32(0)) + .append("errmsg", new BsonString("WriteConflict error")) + .append("code", new BsonInt32(112)) + .append("codeName", new BsonString("WriteConflict")); + } + + /** + * Creates a valid OP_REPLY response body: reply header (20 bytes) + BSON {ok: 1}. + */ + private static ByteBuf createResponseBody() { + return createResponseBody(new BsonDocument("ok", new BsonInt32(1))); + } + + /** + * Creates a valid OP_REPLY response body: reply header (20 bytes) + the given BSON document. + */ + private static ByteBuf createResponseBody(final BsonDocument document) { + return new ByteBufNIO(createResponseBodyBuffer(document)); + } + + /** + * Creates the raw buffer for a valid OP_REPLY response body: reply header (20 bytes) + {ok: 1}. + */ + private static ByteBuffer createResponseBodyBuffer() { + return createResponseBodyBuffer(new BsonDocument("ok", new BsonInt32(1))); + } + + /** + * Creates the raw buffer for a valid OP_REPLY response body: reply header (20 bytes) + the given BSON document. + */ + private static ByteBuffer createResponseBodyBuffer(final BsonDocument document) { + byte[] bson = toBson(document); + ByteBuffer buffer = ByteBuffer.allocate(20 + bson.length); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.putInt(0); // responseFlags + buffer.putLong(0); // cursorId + buffer.putInt(0); // startingFrom + buffer.putInt(1); // numberReturned + buffer.put(bson); + ((Buffer) buffer).flip(); + return buffer; + } + + private static final int CORRUPT_BODY_LENGTH = 28; // reply header (20) + truncated BSON (8) + + /** + * Creates an OP_REPLY response body with a valid reply header but a corrupt BSON document: + * the document declares a length far larger than the bytes that follow. + */ + private static ByteBuf createCorruptResponseBody() { + ByteBuffer buffer = ByteBuffer.allocate(CORRUPT_BODY_LENGTH); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.putInt(0); // responseFlags + buffer.putLong(0); // cursorId + buffer.putInt(0); // startingFrom + buffer.putInt(1); // numberReturned + buffer.putInt(1000); // BSON document length (invalid: only 4 more bytes follow) + buffer.putInt(0); + ((Buffer) buffer).flip(); + return new ByteBufNIO(buffer); + } + + /** + * Creates a valid 16-byte wire protocol header (OP_REPLY) for the given responseTo. + */ + private static ByteBuf createValidResponseHeader(final int responseTo) { + return createValidResponseHeader(responseTo, new BsonDocument("ok", new BsonInt32(1))); + } + + /** + * Creates a valid 16-byte wire protocol header (OP_REPLY) for the given responseTo, + * sized for a body containing the given BSON document. + */ + private static ByteBuf createValidResponseHeader(final int responseTo, final BsonDocument document) { + return createValidResponseHeaderForBodyLength(responseTo, 20 + toBson(document).length); + } + + /** + * Creates a valid 16-byte wire protocol header (OP_REPLY) for the given responseTo and body length. + */ + private static ByteBuf createValidResponseHeaderForBodyLength(final int responseTo, final int bodyLength) { + ByteBuffer buffer = ByteBuffer.allocate(MESSAGE_HEADER_LENGTH); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.putInt(MESSAGE_HEADER_LENGTH + bodyLength); // messageLength + buffer.putInt(1); // requestId (server's response message ID) + buffer.putInt(responseTo); // responseTo (echoes client's requestId) + buffer.putInt(1); // opCode = OP_REPLY + ((Buffer) buffer).flip(); + + return new ByteBufNIO(buffer); + } + + /** + * Creates a ByteBuf over the given buffer whose release is recorded in the given order list. + */ + private static ByteBuf recordingRelease(final ByteBuffer buffer, final List order, final String label) { + return new ByteBufNIO(buffer) { + @Override + public void release() { + order.add(label); + super.release(); + } + }; + } + + private static byte[] toBson(final BsonDocument document) { + BasicOutputBuffer outputBuffer = new BasicOutputBuffer(); + new BsonDocumentCodec().encode( + new BsonBinaryWriter(outputBuffer), document, + EncoderContext.builder().build()); + return outputBuffer.toByteArray(); + } + + /** + * Asserts that command monitoring received exactly one started and one failed event. + */ + private static void assertStartedThenFailed(final TestCommandListener listener) { + List events = listener.getEvents(); + assertEquals(2, events.size(), + "Monitoring must receive exactly one started and one failed event but received: " + events); + assertInstanceOf(CommandStartedEvent.class, events.get(0)); + assertInstanceOf(CommandFailedEvent.class, events.get(1)); + } + + /** + * A test Stream implementation that handles writes (no-op) and delegates reads to subclasses. + * Unexpected calls must be recorded via {@link #recordUnexpectedCall()} rather than JUnit's + * {@code fail()}: the production code under test catches {@code Throwable}, so an assertion + * error thrown inside a stub is swallowed and translated into the very exception a test expects. + */ + private abstract static class TestStream implements Stream { + private int commandMessageId; + private boolean closed; + private final AtomicBoolean unexpectedCall = new AtomicBoolean(); + + void setCommandMessageId(final int id) { + this.commandMessageId = id; + } + + int getCommandMessageId() { + return commandMessageId; + } + + boolean wasClosed() { + return closed; + } + + void recordUnexpectedCall() { + unexpectedCall.set(true); + } + + boolean hadUnexpectedCall() { + return unexpectedCall.get(); + } + + @Override + public void open(final OperationContext operationContext) { + } + + @Override + public void openAsync(final OperationContext operationContext, + final AsyncCompletionHandler handler) { + handler.completed(null); + } + + @Override + public void write(final List buffers, final OperationContext operationContext) { + } + + @Override + public void writeAsync(final List buffers, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + handler.completed(null); + } + + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBuffer(final int size) { + return new ByteBufNIO(ByteBuffer.allocate(size)); + } + + @Override + public ServerAddress getAddress() { + return new ServerAddress(); + } + + @Override + public void close() { + closed = true; + } + + @Override + public boolean isClosed() { + return closed; + } + } +} From 8695e9769d9c4fbd64af461d462a9f3e0197d0da Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Thu, 11 Jun 2026 12:18:02 +0100 Subject: [PATCH 2/9] Harden connection close paths to prevent stream desynchronization Ensure all error paths close the connection to prevent pooled connections from retaining unread response data: - sendMessage (sync): catch Throwable instead of Exception - sendMessageAsync: catch Throwable instead of Exception - readAsync: catch Throwable instead of Exception - MessageHeaderCallback: close on header parsing failure - MessageCallback: close on body read failure and body parsing failure - MessageCallback: deliver the parsed response outside the try block so a throwing downstream callback cannot close a healthy connection Header and body buffers are released before the connection is closed: NettyStream.close() requires all buffers it handed out to have been released already. Errors are translated to MongoInternalException, consistent with the pre-existing receiveResponseBuffers behavior. JAVA-6210 --- .../connection/InternalStreamConnection.java | 35 +- .../InternalStreamConnectionTest.java | 310 ++++++++++++++++++ 2 files changed, 336 insertions(+), 9 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index b1de746a844..41d0c1cc017 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -804,7 +804,7 @@ public void sendMessage(final List byteBuffers, final int lastRequestId } try { stream.write(byteBuffers, operationContext); - } catch (Exception e) { + } catch (Throwable e) { close(); throwTranslatedWriteException(e, operationContext); } @@ -824,7 +824,7 @@ public void sendMessageAsync( c.complete(c); }).thenRunTryCatchAsyncBlocks(c -> { stream.writeAsync(byteBuffers, operationContext, c.asHandler()); - }, Exception.class, (e, c) -> { + }, Throwable.class, (e, c) -> { try { close(); throwTranslatedWriteException(e, operationContext); @@ -883,7 +883,7 @@ public void failed(final Throwable t) { callback.onResult(null, translateReadException(t, operationContext)); } }); - } catch (Exception e) { + } catch (Throwable e) { close(); callback.onResult(null, translateReadException(e, operationContext)); } @@ -1023,21 +1023,29 @@ private class MessageHeaderCallback implements SingleResultCallback { @Override public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t) { if (t != null) { + close(); callback.onResult(null, t); return; } + MessageHeader messageHeader = null; + Throwable headerParsingFailure = null; try { assertNotNull(result); - MessageHeader messageHeader = new MessageHeader(result, description.getMaxMessageSize()); - readAsync(messageHeader.getMessageLength() - MESSAGE_HEADER_LENGTH, operationContext, - new MessageCallback(messageHeader)); + messageHeader = new MessageHeader(result, description.getMaxMessageSize()); } catch (Throwable localThrowable) { - callback.onResult(null, localThrowable); + headerParsingFailure = localThrowable; } finally { if (result != null) { result.release(); } } + if (headerParsingFailure != null) { + close(); + callback.onResult(null, headerParsingFailure); + return; + } + readAsync(messageHeader.getMessageLength() - MESSAGE_HEADER_LENGTH, operationContext, + new MessageCallback(messageHeader)); } private class MessageCallback implements SingleResultCallback { @@ -1050,11 +1058,14 @@ private class MessageCallback implements SingleResultCallback { @Override public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t) { if (t != null) { + close(); callback.onResult(null, t); return; } boolean releaseResult = true; assertNotNull(result); + ResponseBuffers responseBuffers = null; + Throwable bodyParsingFailure = null; try { ReplyHeader replyHeader; ByteBuf responseBuffer; @@ -1077,14 +1088,20 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t responseBuffer = result; releaseResult = false; } - callback.onResult(new ResponseBuffers(replyHeader, responseBuffer), null); + responseBuffers = new ResponseBuffers(replyHeader, responseBuffer); } catch (Throwable localThrowable) { - callback.onResult(null, localThrowable); + bodyParsingFailure = localThrowable; } finally { if (releaseResult) { result.release(); } } + if (bodyParsingFailure != null) { + close(); + callback.onResult(null, bodyParsingFailure); + return; + } + callback.onResult(responseBuffers, null); } } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java index 0373ecc59e4..b778aede203 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java @@ -18,6 +18,7 @@ import com.mongodb.MongoCommandException; import com.mongodb.MongoInternalException; +import com.mongodb.MongoSocketReadException; import com.mongodb.ServerAddress; import com.mongodb.async.FutureResultCallback; import com.mongodb.connection.AsyncCompletionHandler; @@ -50,6 +51,7 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -78,6 +80,314 @@ class InternalStreamConnectionTest { private static final ServerId SERVER_ID = new ServerId(new ClusterId(), new ServerAddress()); + /** + * Verifies that when {@code stream.readAsync()} throws an {@link OutOfMemoryError} + * during body buffer allocation (after header read succeeds), the connection is closed. + */ + @Test + @DisplayName("Async: connection closed when readAsync throws Error during body read") + void asyncClosesConnectionWhenReadAsyncThrowsError() { + // Track whether readAsync is called for the body (second call) + AtomicInteger readAsyncCallCount = new AtomicInteger(); + + // A stream that succeeds on header read but throws OOM on body read + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + // First read: 16-byte header - complete successfully with a valid header + handler.completed(createValidResponseHeader(getCommandMessageId())); + } else { + // Second read: body - throw OutOfMemoryError (simulates heap exhaustion + // during buffer allocation in AsynchronousChannelStream.readAsync) + throw new OutOfMemoryError("Java heap space"); + } + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + + // Send a command asynchronously - the write will succeed, then readAsync for the body will throw OOM + FutureResultCallback callback = new FutureResultCallback<>(); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + // The callback should receive an error + assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertFalse(callback.wasInvokedMultipleTimes()); + + assertTrue(connection.isClosed(), + "Connection should be closed after Error in readAsync to prevent pool reuse with stale data"); + assertTrue(stream.wasClosed(), + "Underlying stream should be closed to release socket resources"); + } + + /** + * Verifies that when {@code stream.readAsync()} throws a {@link RuntimeException}, + * the connection is closed, just as for an {@link Error}. + */ + @Test + @DisplayName("Async: connection closed when readAsync throws Exception during body read") + void asyncClosesConnectionWhenReadAsyncThrowsException() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + handler.completed(createValidResponseHeader(getCommandMessageId())); + } else { + // a RuntimeException must close the connection, just like an Error + throw new RuntimeException("Simulated allocation failure"); + } + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + + FutureResultCallback callback = new FutureResultCallback<>(); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertFalse(callback.wasInvokedMultipleTimes()); + + assertTrue(connection.isClosed(), + "Connection should be closed when readAsync throws a regular Exception"); + } + + /** + * Verifies that when header parsing throws (e.g., invalid message size), the connection + * is closed. The header bytes have already been consumed, so the body remains unread. + */ + @Test + @DisplayName("Async: connection closed when header parsing throws in MessageHeaderCallback") + void asyncClosesConnectionWhenHeaderParsingThrows() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + List order = new ArrayList<>(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + // Return a header whose messageLength exceeds the maximum message size to + // trigger a MongoInternalException in the MessageHeader constructor + ByteBuffer buffer = ByteBuffer.allocate(MESSAGE_HEADER_LENGTH); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.putInt(getDefaultMaxMessageSize() + 1); // invalid messageLength + buffer.putInt(1); // requestId + buffer.putInt(getCommandMessageId()); // responseTo + buffer.putInt(1); // opCode = OP_REPLY + ((Buffer) buffer).flip(); + handler.completed(recordingRelease(buffer, order, "headerRelease")); + } else { + recordUnexpectedCall(); + throw new UnsupportedOperationException("no second read expected after header parsing failure"); + } + } + + @Override + public void close() { + order.add("close"); + super.close(); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + + FutureResultCallback callback = new FutureResultCallback<>(); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertFalse(callback.wasInvokedMultipleTimes()); + + assertTrue(connection.isClosed(), + "Connection should be closed when header parsing fails to prevent reuse with unread body data"); + assertEquals(asList("headerRelease", "close"), order, + "Header buffer must be released before the stream is closed"); + assertFalse(stream.hadUnexpectedCall(), + "No second read should be attempted after header parsing failure"); + } + + /** + * Verifies that when {@code stream.writeAsync()} throws an {@link Error}, + * the connection is closed. + */ + @Test + @DisplayName("Async: connection closed when writeAsync throws Error") + void asyncClosesConnectionWhenWriteAsyncThrowsError() { + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("no read expected after write failure"); + } + + @Override + public void writeAsync(final List buffers, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + throw new OutOfMemoryError("Java heap space"); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + + FutureResultCallback callback = new FutureResultCallback<>(); + CommandMessage commandMessage = createPingCommand(); + + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertFalse(callback.wasInvokedMultipleTimes()); + + assertTrue(connection.isClosed(), + "Connection should be closed after Error in writeAsync"); + assertFalse(stream.hadUnexpectedCall(), + "No read should be attempted after write failure"); + } + + /** + * Verifies that when the body read fails (handler receives an error), the connection is closed. + * This exercises the {@code t != null} path in {@code MessageCallback.onResult}. + */ + @Test + @DisplayName("Async: connection closed when body read fails via callback error") + void asyncClosesConnectionWhenBodyReadFailsViaCallback() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + handler.completed(createValidResponseHeader(getCommandMessageId())); + } else { + handler.failed(new IOException("Connection reset by peer")); + } + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + + FutureResultCallback callback = new FutureResultCallback<>(); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + assertThrows(MongoSocketReadException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertFalse(callback.wasInvokedMultipleTimes()); + + assertTrue(connection.isClosed(), + "Connection should be closed when body read fails via callback error"); + } + + /** + * Verifies that when body parsing throws (e.g., corrupt body data), the connection is closed. + * This exercises the {@code catch (Throwable)} path in {@code MessageCallback.onResult}. + */ + @Test + @DisplayName("Async: connection closed when body parsing throws in MessageCallback") + void asyncClosesConnectionWhenBodyParsingThrows() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + List order = new ArrayList<>(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + handler.completed(createValidResponseHeader(getCommandMessageId())); + } else { + // Return a reply header with an invalid numberReturned (2) to trigger + // a MongoInternalException during ReplyHeader construction + ByteBuffer buffer = ByteBuffer.allocate(20); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.putInt(0); // responseFlags + buffer.putLong(0); // cursorId + buffer.putInt(0); // startingFrom + buffer.putInt(2); // numberReturned (invalid: must be 1) + ((Buffer) buffer).flip(); + handler.completed(recordingRelease(buffer, order, "bodyRelease")); + } + } + + @Override + public void close() { + order.add("close"); + super.close(); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + + FutureResultCallback callback = new FutureResultCallback<>(); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertFalse(callback.wasInvokedMultipleTimes()); + + assertTrue(connection.isClosed(), + "Connection should be closed when body parsing fails in MessageCallback"); + assertEquals(asList("bodyRelease", "close"), order, + "Body buffer must be released before the stream is closed"); + } + + /** + * Verifies that when {@code stream.write()} throws an {@link Error} in the synchronous path, + * the connection is closed (mirrors the asynchronous writeAsync behavior). + */ + @Test + @DisplayName("Sync: connection closed when write throws Error") + void syncClosesConnectionWhenWriteThrowsError() { + TestStream stream = new TestStream() { + @Override + public void write(final List buffers, final OperationContext operationContext) { + throw new OutOfMemoryError("Java heap space"); + } + + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("no read expected after write failure"); + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("readAsync should not be called in the sync path"); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + + MongoInternalException thrown = assertThrows(MongoInternalException.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertInstanceOf(OutOfMemoryError.class, thrown.getCause()); + assertTrue(connection.isClosed(), + "Connection should be closed after Error in write to prevent pool reuse with a half-written message"); + assertTrue(stream.wasClosed(), + "Underlying stream should be closed to release socket resources"); + assertFalse(stream.hadUnexpectedCall(), + "No read should be attempted after write failure"); + } + /** * Verifies that a responseTo mismatch in the synchronous path closes the connection. */ From 1ddd06b98cbd9f3cd1e3f0ee53df2daa36df82f5 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 10 Jun 2026 12:04:42 +0100 Subject: [PATCH 3/9] Add JUnit test conventions to testing guide Capture conventions surfaced during review: import types rather than inline fully-qualified names, prefer atomics over array wrappers for state captured by lambdas, assert the most specific exception type, and pair side-effect tests with their negative counterparts. JAVA-6210 --- .agents/references/testing-guide.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.agents/references/testing-guide.md b/.agents/references/testing-guide.md index 4140bfc6e33..1679150291c 100644 --- a/.agents/references/testing-guide.md +++ b/.agents/references/testing-guide.md @@ -29,6 +29,15 @@ description: Testing frameworks, conventions, and commands for the MongoDB Java - Descriptive method names: `shouldReturnEmptyListWhenNoDocumentsMatch()` not `test1()` - Use `@DisplayName` for human-readable names - Clean up test data in `@AfterEach` / `cleanup()` to prevent pollution +- Import types at the top of the file — do not use inline fully-qualified names + (`com.mongodb.connection.AsyncCompletionHandler`, `java.util.List`) in signatures or bodies +- Use `AtomicInteger` / `AtomicBoolean` for mutable state captured by lambdas or anonymous + classes — not single-element array wrappers (`int[]`, `boolean[]`) +- Assert the most specific exception type the code guarantees + (`assertInstanceOf(MongoCommandException.class, e)`, not `Exception.class`) +- When testing that a side effect happens on condition X (e.g. connection closed on stream + desync), also test that it does NOT happen on the neighbouring condition Y (e.g. connection + stays open on a command error) — one-sided tests let behavioral regressions through ## Running Tests From 78a73e4dcefbf8a2a36e5cdb57783db47add313d Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Thu, 11 Jun 2026 14:16:25 +0100 Subject: [PATCH 4/9] Release uncompressed buffer when reply header parsing fails When reading an OP_COMPRESSED response, the uncompressed buffer allocated via getBuffer(...) leaked if uncompress, flip, or ReplyHeader parsing threw before ResponseBuffers took ownership, on both the sync (receiveResponseBuffers) and async (MessageCallback) paths. Release the buffer explicitly on those failure paths. JAVA-6210 --- .../connection/InternalStreamConnection.java | 24 ++- .../InternalStreamConnectionTest.java | 167 +++++++++++++++++- 2 files changed, 183 insertions(+), 8 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 41d0c1cc017..185b704239d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -977,10 +977,15 @@ private ResponseBuffers receiveResponseBuffers(final OperationContext operationC Compressor compressor = getCompressor(compressedHeader); ByteBuf buffer = getBuffer(compressedHeader.getUncompressedSize()); - compressor.uncompress(messageBuffer, buffer); + try { + compressor.uncompress(messageBuffer, buffer); - buffer.flip(); - return new ResponseBuffers(new ReplyHeader(buffer, compressedHeader), buffer); + buffer.flip(); + return new ResponseBuffers(new ReplyHeader(buffer, compressedHeader), buffer); + } catch (Throwable localThrowable) { + buffer.release(); + throw localThrowable; + } } else { ResponseBuffers responseBuffers = new ResponseBuffers(new ReplyHeader(messageBuffer, messageHeader), messageBuffer); releaseMessageBuffer = false; @@ -1074,10 +1079,15 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t CompressedHeader compressedHeader = new CompressedHeader(result, messageHeader); Compressor compressor = getCompressor(compressedHeader); ByteBuf buffer = getBuffer(compressedHeader.getUncompressedSize()); - compressor.uncompress(result, buffer); - - buffer.flip(); - replyHeader = new ReplyHeader(buffer, compressedHeader); + try { + compressor.uncompress(result, buffer); + + buffer.flip(); + replyHeader = new ReplyHeader(buffer, compressedHeader); + } catch (Throwable localThrowable) { + buffer.release(); + throw localThrowable; + } responseBuffer = buffer; } finally { releaseResult = false; diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java index b778aede203..2d083d375d8 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java @@ -17,6 +17,7 @@ package com.mongodb.internal.connection; import com.mongodb.MongoCommandException; +import com.mongodb.MongoCompressor; import com.mongodb.MongoInternalException; import com.mongodb.MongoSocketReadException; import com.mongodb.ServerAddress; @@ -51,6 +52,7 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.Buffer; import java.nio.ByteBuffer; @@ -61,6 +63,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.DeflaterOutputStream; import static com.mongodb.ReadPreference.primary; import static com.mongodb.connection.ClusterConnectionMode.SINGLE; @@ -833,11 +836,119 @@ public void close() { "Response body buffer must be released before the stream is closed"); } + /** + * Verifies that in the synchronous path, the uncompressed buffer allocated for an OP_COMPRESSED + * response is released when reply header parsing fails after decompression. + */ + @Test + @DisplayName("Sync: uncompressed buffer released when reply header parsing fails for compressed response") + void syncReleasesUncompressedBufferWhenReplyHeaderParsingFails() { + AtomicInteger readCallCount = new AtomicInteger(); + List allocatedBuffers = new ArrayList<>(); + ByteBuf compressedBody = createCompressedResponseBodyWithInvalidReplyHeader(); + int compressedBodyLength = compressedBody.remaining(); + + TestStream stream = new TestStream() { + @Override + public ByteBuf getBuffer(final int size) { + ByteBuf buffer = super.getBuffer(size); + allocatedBuffers.add(buffer); + return buffer; + } + + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + if (readCallCount.incrementAndGet() == 1) { + return createCompressedResponseHeader(getCommandMessageId(), compressedBodyLength); + } else { + return compressedBody; + } + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("readAsync should not be called in the sync path"); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream, null, + Collections.singletonList(MongoCompressor.createZlibCompressor())); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + MongoInternalException thrown = assertThrows(MongoInternalException.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertTrue(thrown.getMessage().contains("number of returned documents")); + assertTrue(connection.isClosed(), + "Connection should be closed after reply header parsing failure"); + for (ByteBuf buffer : allocatedBuffers) { + assertEquals(0, buffer.getReferenceCount(), + "Every buffer handed out by the stream must be released after the failure"); + } + assertFalse(stream.hadUnexpectedCall()); + } + + /** + * Verifies that in the asynchronous path, the uncompressed buffer allocated for an OP_COMPRESSED + * response is released when reply header parsing fails after decompression. + */ + @Test + @DisplayName("Async: uncompressed buffer released when reply header parsing fails for compressed response") + void asyncReleasesUncompressedBufferWhenReplyHeaderParsingFails() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + List allocatedBuffers = new ArrayList<>(); + ByteBuf compressedBody = createCompressedResponseBodyWithInvalidReplyHeader(); + int compressedBodyLength = compressedBody.remaining(); + + TestStream stream = new TestStream() { + @Override + public ByteBuf getBuffer(final int size) { + ByteBuf buffer = super.getBuffer(size); + allocatedBuffers.add(buffer); + return buffer; + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + handler.completed(createCompressedResponseHeader(getCommandMessageId(), compressedBodyLength)); + } else { + handler.completed(compressedBody); + } + } + }; + + InternalStreamConnection connection = createOpenConnection(stream, null, + Collections.singletonList(MongoCompressor.createZlibCompressor())); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + FutureResultCallback callback = new FutureResultCallback<>(); + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + MongoInternalException thrown = assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertTrue(thrown.getMessage().contains("number of returned documents")); + assertTrue(connection.isClosed(), + "Connection should be closed after reply header parsing failure"); + for (ByteBuf buffer : allocatedBuffers) { + assertEquals(0, buffer.getReferenceCount(), + "Every buffer handed out by the stream must be released after the failure"); + } + } + private InternalStreamConnection createOpenConnection(final TestStream stream) { return createOpenConnection(stream, null); } private InternalStreamConnection createOpenConnection(final TestStream stream, @Nullable final CommandListener commandListener) { + return createOpenConnection(stream, commandListener, Collections.emptyList()); + } + + private InternalStreamConnection createOpenConnection(final TestStream stream, @Nullable final CommandListener commandListener, + final List compressorList) { StreamFactory streamFactory = serverAddress -> stream; ConnectionDescription connectionDescription = new ConnectionDescription( new ConnectionId(SERVER_ID, 1, 1L), LATEST_WIRE_VERSION, @@ -881,7 +992,7 @@ public void finishHandshakeAsync(final InternalConnection connection, InternalStreamConnection connection = new InternalStreamConnection( SINGLE, SERVER_ID, new TestConnectionGenerationSupplier(), - streamFactory, Collections.emptyList(), commandListener, initializer); + streamFactory, compressorList, commandListener, initializer); connection.open(createOperationContext()); return connection; } @@ -997,6 +1108,60 @@ private static ByteBuf createValidResponseHeaderForBodyLength(final int response return new ByteBufNIO(buffer); } + /** + * Creates a valid 16-byte wire protocol header (OP_COMPRESSED) for the given responseTo and + * compressed body length. + */ + private static ByteBuf createCompressedResponseHeader(final int responseTo, final int compressedBodyLength) { + ByteBuffer buffer = ByteBuffer.allocate(MESSAGE_HEADER_LENGTH); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.putInt(MESSAGE_HEADER_LENGTH + compressedBodyLength); // messageLength + buffer.putInt(1); // requestId (server's response message ID) + buffer.putInt(responseTo); // responseTo (echoes client's requestId) + buffer.putInt(OpCode.OP_COMPRESSED.getValue()); + ((Buffer) buffer).flip(); + + return new ByteBufNIO(buffer); + } + + private static final int UNCOMPRESSED_REPLY_HEADER_LENGTH = 20; + + /** + * Creates an OP_COMPRESSED response body whose zlib-compressed payload uncompresses to an + * invalid OP_REPLY header: numberReturned is 2 where 1 is required, so reply header parsing + * fails after the uncompressed buffer was allocated. + */ + private static ByteBuf createCompressedResponseBodyWithInvalidReplyHeader() { + ByteBuffer uncompressed = ByteBuffer.allocate(UNCOMPRESSED_REPLY_HEADER_LENGTH); + uncompressed.order(ByteOrder.LITTLE_ENDIAN); + uncompressed.putInt(0); // responseFlags + uncompressed.putLong(0); // cursorId + uncompressed.putInt(0); // startingFrom + uncompressed.putInt(2); // numberReturned (invalid: must be 1) + + byte[] compressed = zlibCompress(uncompressed.array()); + + ByteBuffer buffer = ByteBuffer.allocate(CompressedHeader.COMPRESSED_HEADER_LENGTH + compressed.length); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.putInt(1); // originalOpcode = OP_REPLY + buffer.putInt(UNCOMPRESSED_REPLY_HEADER_LENGTH); // uncompressedSize + buffer.put((byte) 2); // compressorId = zlib + buffer.put(compressed); + ((Buffer) buffer).flip(); + + return new ByteBufNIO(buffer); + } + + private static byte[] zlibCompress(final byte[] bytes) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (DeflaterOutputStream deflater = new DeflaterOutputStream(out)) { + deflater.write(bytes); + } catch (IOException e) { + throw new RuntimeException(e); + } + return out.toByteArray(); + } + /** * Creates a ByteBuf over the given buffer whose release is recorded in the given order list. */ From 8619393a2ce49aa760845e41ccb37c42a21117b7 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 17 Jun 2026 11:29:07 +0100 Subject: [PATCH 5/9] Address review feedback for connection desync hardening - Replace the instanceof MongoInternalException close trigger with a safe-by-default check: close the connection unless the failure proves the full response was read (extracted as connectionIsReusable()). - Widen the sync command-response catch to Throwable for symmetry with the async path, so a decode-time Error still emits a failed event. - Release the uncompressed buffer in the existing outer catch instead of a nested try/catch (sync and async paths). - Remove the redundant close() calls in the MessageHeaderCallback and MessageCallback t != null branches; readAsync already closes first. - Tests: parametrize the readAsync body-read failure cases and assert the original cause is preserved, add a sync buffer-release-before-close test, tighten the responseTo-mismatch buffer-release assertions, and lift the duplicated readAsync stub into the TestStream base. --- .../connection/InternalStreamConnection.java | 103 +++++---- .../InternalStreamConnectionTest.java | 213 ++++++++---------- 2 files changed, 157 insertions(+), 159 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 185b704239d..caf74869026 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -56,6 +56,7 @@ import com.mongodb.observability.micrometer.MongodbObservationContext; import org.bson.BsonBinaryReader; import org.bson.BsonDocument; +import org.bson.BsonSerializationException; import org.bson.ByteBuf; import org.bson.codecs.BsonDocumentCodec; import org.bson.codecs.Decoder; @@ -583,27 +584,23 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm } return commandResult; - } catch (Exception e) { - if (e instanceof MongoInternalException) { - // a MongoInternalException (e.g. responseTo mismatch) means the stream is desynchronized. - // The connection must be closed before anything else can throw, to prevent reuse. Other - // failures (e.g. MongoCommandException) leave the stream intact, as the response was fully read, - // so the connection remains usable + } catch (Throwable t) { + if (!connectionIsReusable(t)) { close(); } if (!commandSuccessful) { - commandEventSender.sendFailedEvent(e); + commandEventSender.sendFailedEvent(t); } if (tracingSpan != null) { - if (e instanceof MongoCommandException) { + if (t instanceof MongoCommandException) { MongodbObservationContext ctx = tracingSpan.getMongodbObservationContext(); if (ctx != null) { - ctx.setResponseStatusCode(String.valueOf(((MongoCommandException) e).getErrorCode())); + ctx.setResponseStatusCode(String.valueOf(((MongoCommandException) t).getErrorCode())); } } - tracingSpan.error(e); + tracingSpan.error(t); } - throw e; + throw t; } finally { if (tracingSpan != null) { tracingSpan.closeScope(); @@ -612,6 +609,26 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm } } + /** + * Determines whether a failure raised while processing a command response leaves the connection + * reusable. These exception types are only raised after the full framed response has been read off + * the wire, so the stream remains synchronized and the connection can be returned to the pool: + *
    + *
  • {@link MongoCommandException} — an {@code ok: 0} error response (and subclasses)
  • + *
  • {@link MongoWriteConcernWithResponseException} — a write concern error carrying the response
  • + *
  • {@link MongoOperationTimeoutException} — a write concern timeout
  • + *
  • {@link BsonSerializationException} — a corrupt BSON body whose exact byte count was still consumed
  • + *
+ * Any other failure (e.g. a responseTo mismatch or an unexpected error) may have left the stream + * desynchronized, so the connection must be closed to prevent pool reuse. + */ + private static boolean connectionIsReusable(final Throwable failure) { + return failure instanceof MongoCommandException + || failure instanceof MongoWriteConcernWithResponseException + || failure instanceof MongoOperationTimeoutException + || failure instanceof BsonSerializationException; + } + private void sendAndReceiveAsyncInternal(final CommandMessage message, final Decoder decoder, final OperationContext operationContext, final SingleResultCallback callback) { if (isClosed()) { @@ -760,11 +777,7 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d responseBuffers.close(); } if (failure != null) { - if (failure instanceof MongoInternalException) { - // a MongoInternalException (e.g. responseTo mismatch) means the stream is desynchronized. - // The connection must be closed before anything else can throw, to prevent reuse. Other - // failures (e.g. MongoCommandException) leave the stream intact, as the response was fully read, - // so the connection remains usable + if (!connectionIsReusable(failure)) { close(); } if (!commandSuccessful) { @@ -804,9 +817,9 @@ public void sendMessage(final List byteBuffers, final int lastRequestId } try { stream.write(byteBuffers, operationContext); - } catch (Throwable e) { + } catch (Throwable t) { close(); - throwTranslatedWriteException(e, operationContext); + throwTranslatedWriteException(t, operationContext); } } @@ -824,10 +837,10 @@ public void sendMessageAsync( c.complete(c); }).thenRunTryCatchAsyncBlocks(c -> { stream.writeAsync(byteBuffers, operationContext, c.asHandler()); - }, Throwable.class, (e, c) -> { + }, Throwable.class, (t, c) -> { try { close(); - throwTranslatedWriteException(e, operationContext); + throwTranslatedWriteException(t, operationContext); } catch (Throwable translatedException) { c.completeExceptionally(translatedException); } @@ -883,9 +896,9 @@ public void failed(final Throwable t) { callback.onResult(null, translateReadException(t, operationContext)); } }); - } catch (Throwable e) { + } catch (Throwable t) { close(); - callback.onResult(null, translateReadException(e, operationContext)); + callback.onResult(null, translateReadException(t, operationContext)); } } @@ -959,6 +972,9 @@ private MongoSocketReadTimeoutException createReadTimeoutException(final Socket } private ResponseBuffers receiveResponseBuffers(final OperationContext operationContext) { + // The uncompressed buffer is allocated by us (not handed to ResponseBuffers until the last + // statement of the compressed branch), so it must be released if anything fails beforehand. + ByteBuf uncompressedBuffer = null; try { ByteBuf messageHeaderBuffer = stream.read(MESSAGE_HEADER_LENGTH, operationContext); MessageHeader messageHeader; @@ -976,16 +992,11 @@ private ResponseBuffers receiveResponseBuffers(final OperationContext operationC Compressor compressor = getCompressor(compressedHeader); - ByteBuf buffer = getBuffer(compressedHeader.getUncompressedSize()); - try { - compressor.uncompress(messageBuffer, buffer); + uncompressedBuffer = getBuffer(compressedHeader.getUncompressedSize()); + compressor.uncompress(messageBuffer, uncompressedBuffer); - buffer.flip(); - return new ResponseBuffers(new ReplyHeader(buffer, compressedHeader), buffer); - } catch (Throwable localThrowable) { - buffer.release(); - throw localThrowable; - } + uncompressedBuffer.flip(); + return new ResponseBuffers(new ReplyHeader(uncompressedBuffer, compressedHeader), uncompressedBuffer); } else { ResponseBuffers responseBuffers = new ResponseBuffers(new ReplyHeader(messageBuffer, messageHeader), messageBuffer); releaseMessageBuffer = false; @@ -997,6 +1008,9 @@ private ResponseBuffers receiveResponseBuffers(final OperationContext operationC } } } catch (Throwable t) { + if (uncompressedBuffer != null) { + uncompressedBuffer.release(); + } close(); throw translateReadException(t, operationContext); } @@ -1028,7 +1042,6 @@ private class MessageHeaderCallback implements SingleResultCallback { @Override public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t) { if (t != null) { - close(); callback.onResult(null, t); return; } @@ -1063,7 +1076,6 @@ private class MessageCallback implements SingleResultCallback { @Override public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t) { if (t != null) { - close(); callback.onResult(null, t); return; } @@ -1071,6 +1083,9 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t assertNotNull(result); ResponseBuffers responseBuffers = null; Throwable bodyParsingFailure = null; + // The uncompressed buffer is allocated by us and is not handed to ResponseBuffers until the + // last statement of the try, so it must be released if anything fails beforehand. + ByteBuf uncompressedBuffer = null; try { ReplyHeader replyHeader; ByteBuf responseBuffer; @@ -1078,17 +1093,12 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t try { CompressedHeader compressedHeader = new CompressedHeader(result, messageHeader); Compressor compressor = getCompressor(compressedHeader); - ByteBuf buffer = getBuffer(compressedHeader.getUncompressedSize()); - try { - compressor.uncompress(result, buffer); - - buffer.flip(); - replyHeader = new ReplyHeader(buffer, compressedHeader); - } catch (Throwable localThrowable) { - buffer.release(); - throw localThrowable; - } - responseBuffer = buffer; + uncompressedBuffer = getBuffer(compressedHeader.getUncompressedSize()); + compressor.uncompress(result, uncompressedBuffer); + + uncompressedBuffer.flip(); + replyHeader = new ReplyHeader(uncompressedBuffer, compressedHeader); + responseBuffer = uncompressedBuffer; } finally { releaseResult = false; result.release(); @@ -1098,8 +1108,13 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t responseBuffer = result; releaseResult = false; } + // Must be the last statement in the try: ResponseBuffers now owns responseBuffer, so + // nothing that could throw may follow it, or the buffer would leak (it is not released below). responseBuffers = new ResponseBuffers(replyHeader, responseBuffer); } catch (Throwable localThrowable) { + if (uncompressedBuffer != null) { + uncompressedBuffer.release(); + } bodyParsingFailure = localThrowable; } finally { if (releaseResult) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java index 2d083d375d8..3a2191cdcf9 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java @@ -51,6 +51,8 @@ import org.bson.io.BasicOutputBuffer; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -76,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -84,85 +87,65 @@ class InternalStreamConnectionTest { private static final ServerId SERVER_ID = new ServerId(new ClusterId(), new ServerAddress()); /** - * Verifies that when {@code stream.readAsync()} throws an {@link OutOfMemoryError} - * during body buffer allocation (after header read succeeds), the connection is closed. + * Unchecked failures that {@code stream.readAsync()} may raise during body buffer allocation: + * an {@link Error} (e.g. heap exhaustion) and a {@link RuntimeException}. */ - @Test - @DisplayName("Async: connection closed when readAsync throws Error during body read") - void asyncClosesConnectionWhenReadAsyncThrowsError() { - // Track whether readAsync is called for the body (second call) - AtomicInteger readAsyncCallCount = new AtomicInteger(); - - // A stream that succeeds on header read but throws OOM on body read - TestStream stream = new TestStream() { - @Override - public void readAsync(final int numBytes, final OperationContext operationContext, - final AsyncCompletionHandler handler) { - if (readAsyncCallCount.incrementAndGet() == 1) { - // First read: 16-byte header - complete successfully with a valid header - handler.completed(createValidResponseHeader(getCommandMessageId())); - } else { - // Second read: body - throw OutOfMemoryError (simulates heap exhaustion - // during buffer allocation in AsynchronousChannelStream.readAsync) - throw new OutOfMemoryError("Java heap space"); - } - } - }; - - InternalStreamConnection connection = createOpenConnection(stream); - - // Send a command asynchronously - the write will succeed, then readAsync for the body will throw OOM - FutureResultCallback callback = new FutureResultCallback<>(); - CommandMessage commandMessage = createPingCommand(); - stream.setCommandMessageId(commandMessage.getId()); - - connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); - - // The callback should receive an error - assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); - assertFalse(callback.wasInvokedMultipleTimes()); - - assertTrue(connection.isClosed(), - "Connection should be closed after Error in readAsync to prevent pool reuse with stale data"); - assertTrue(stream.wasClosed(), - "Underlying stream should be closed to release socket resources"); + // Fully-qualified java.util.stream.Stream: an import would shadow the same-package + // com.mongodb.internal.connection.Stream that TestStream implements. + private static java.util.stream.Stream readAsyncBodyFailures() { + return java.util.stream.Stream.of( + new OutOfMemoryError("Java heap space"), + new RuntimeException("Simulated allocation failure")); } /** - * Verifies that when {@code stream.readAsync()} throws a {@link RuntimeException}, - * the connection is closed, just as for an {@link Error}. + * Verifies that when {@code stream.readAsync()} throws during body buffer allocation (after the + * header read succeeds), the connection is closed and the original throwable is preserved as the + * cause. Covers both an {@link Error} and a {@link RuntimeException}. */ - @Test - @DisplayName("Async: connection closed when readAsync throws Exception during body read") - void asyncClosesConnectionWhenReadAsyncThrowsException() { + @ParameterizedTest + @MethodSource("readAsyncBodyFailures") + @DisplayName("Async: connection closed when readAsync throws during body read") + void asyncClosesConnectionWhenReadAsyncThrowsDuringBodyRead(final Throwable bodyReadFailure) { + // Track whether readAsync is called for the body (second call) AtomicInteger readAsyncCallCount = new AtomicInteger(); + // A stream that succeeds on header read but throws on body read (simulates heap exhaustion + // or an unexpected failure during buffer allocation in AsynchronousChannelStream.readAsync) TestStream stream = new TestStream() { @Override public void readAsync(final int numBytes, final OperationContext operationContext, final AsyncCompletionHandler handler) { if (readAsyncCallCount.incrementAndGet() == 1) { + // First read: 16-byte header - complete successfully with a valid header handler.completed(createValidResponseHeader(getCommandMessageId())); + } else if (bodyReadFailure instanceof Error) { + throw (Error) bodyReadFailure; } else { - // a RuntimeException must close the connection, just like an Error - throw new RuntimeException("Simulated allocation failure"); + throw (RuntimeException) bodyReadFailure; } } }; InternalStreamConnection connection = createOpenConnection(stream); + // Send a command asynchronously - the write will succeed, then readAsync for the body will throw FutureResultCallback callback = new FutureResultCallback<>(); CommandMessage commandMessage = createPingCommand(); stream.setCommandMessageId(commandMessage.getId()); connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); - assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + // The callback should receive an error wrapping the original throwable + MongoInternalException thrown = assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertSame(bodyReadFailure, thrown.getCause(), + "The original throwable should be preserved as the cause"); assertFalse(callback.wasInvokedMultipleTimes()); assertTrue(connection.isClosed(), - "Connection should be closed when readAsync throws a regular Exception"); + "Connection should be closed after a failure in readAsync to prevent pool reuse with stale data"); + assertTrue(stream.wasClosed(), + "Underlying stream should be closed to release socket resources"); } /** @@ -367,15 +350,7 @@ public void write(final List buffers, final OperationContext operationC public ByteBuf read(final int numBytes, final OperationContext operationContext) { recordUnexpectedCall(); throw new UnsupportedOperationException("no read expected after write failure"); - } - - @Override - public void readAsync(final int numBytes, final OperationContext operationContext, - final AsyncCompletionHandler handler) { - recordUnexpectedCall(); - throw new UnsupportedOperationException("readAsync should not be called in the sync path"); - } - }; + } }; InternalStreamConnection connection = createOpenConnection(stream); CommandMessage commandMessage = createPingCommand(); @@ -408,15 +383,7 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createResponseBody(); } - } - - @Override - public void readAsync(final int numBytes, final OperationContext operationContext, - final AsyncCompletionHandler handler) { - recordUnexpectedCall(); - throw new UnsupportedOperationException("readAsync should not be called in the sync path"); - } - }; + } }; InternalStreamConnection connection = createOpenConnection(stream); CommandMessage commandMessage = createPingCommand(); @@ -484,15 +451,7 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createResponseBody(errorResponse); } - } - - @Override - public void readAsync(final int numBytes, final OperationContext operationContext, - final AsyncCompletionHandler handler) { - recordUnexpectedCall(); - throw new UnsupportedOperationException("readAsync should not be called in the sync path"); - } - }; + } }; InternalStreamConnection connection = createOpenConnection(stream); CommandMessage commandMessage = createPingCommand(); @@ -570,15 +529,7 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createResponseBody(errorResponse); } - } - - @Override - public void readAsync(final int numBytes, final OperationContext operationContext, - final AsyncCompletionHandler handler) { - recordUnexpectedCall(); - throw new UnsupportedOperationException("readAsync should not be called in the sync path"); - } - }; + } }; TestCommandListener listener = new TestCommandListener(); InternalStreamConnection connection = createOpenConnection(stream, listener); @@ -650,15 +601,7 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createResponseBody(); } - } - - @Override - public void readAsync(final int numBytes, final OperationContext operationContext, - final AsyncCompletionHandler handler) { - recordUnexpectedCall(); - throw new UnsupportedOperationException("readAsync should not be called in the sync path"); - } - }; + } }; TestCommandListener listener = new TestCommandListener(); InternalStreamConnection connection = createOpenConnection(stream, listener); @@ -729,15 +672,7 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createCorruptResponseBody(); } - } - - @Override - public void readAsync(final int numBytes, final OperationContext operationContext, - final AsyncCompletionHandler handler) { - recordUnexpectedCall(); - throw new UnsupportedOperationException("readAsync should not be called in the sync path"); - } - }; + } }; TestCommandListener listener = new TestCommandListener(); InternalStreamConnection connection = createOpenConnection(stream, listener); @@ -829,11 +764,56 @@ public void close() { FutureResultCallback callback = new FutureResultCallback<>(); connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); - assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + MongoInternalException thrown = assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertTrue(thrown.getMessage().contains("does not match the requestId")); + assertFalse(callback.wasInvokedMultipleTimes()); + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + assertEquals(asList("bodyRelease", "close"), order, + "Response body buffer must be released before the stream is closed"); + } + + /** + * Verifies that on a responseTo mismatch in the synchronous path, the response body buffer is + * released BEFORE the stream is closed. The try-with-resources in receiveCommandMessageResponse + * closes the ResponseBuffers (releasing the body buffer) before the catch block closes the + * connection; NettyStream.close() requires all buffers it handed out to be released first. + */ + @Test + @DisplayName("Sync: response buffer released before connection close on responseTo mismatch") + void syncReleasesResponseBufferBeforeCloseOnResponseToMismatch() { + AtomicInteger readCallCount = new AtomicInteger(); + List order = new ArrayList<>(); + + TestStream stream = new TestStream() { + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + if (readCallCount.incrementAndGet() == 1) { + // Header: responseTo deliberately mismatches (commandId + 1) + return createValidResponseHeader(getCommandMessageId() + 1); + } else { + return recordingRelease(createResponseBodyBuffer(), order, "bodyRelease"); + } + } + @Override + public void close() { + order.add("close"); + super.close(); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + MongoInternalException thrown = assertThrows(MongoInternalException.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertTrue(thrown.getMessage().contains("does not match the requestId")); assertTrue(connection.isClosed(), "Connection should be closed after responseTo mismatch"); assertEquals(asList("bodyRelease", "close"), order, "Response body buffer must be released before the stream is closed"); + assertFalse(stream.hadUnexpectedCall()); } /** @@ -863,15 +843,7 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return compressedBody; } - } - - @Override - public void readAsync(final int numBytes, final OperationContext operationContext, - final AsyncCompletionHandler handler) { - recordUnexpectedCall(); - throw new UnsupportedOperationException("readAsync should not be called in the sync path"); - } - }; + } }; InternalStreamConnection connection = createOpenConnection(stream, null, Collections.singletonList(MongoCompressor.createZlibCompressor())); @@ -1245,9 +1217,20 @@ public void writeAsync(final List buffers, final OperationContext opera handler.completed(null); } + // Both read methods default to recording an unexpected call and throwing: a test that uses the + // sync path overrides read() (and inherits this readAsync), and vice versa. Recording (rather + // than calling fail()) is required because the production code catches Throwable. @Override public ByteBuf read(final int numBytes, final OperationContext operationContext) { - throw new UnsupportedOperationException(); + recordUnexpectedCall(); + throw new UnsupportedOperationException("read should not be called in this test"); + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + recordUnexpectedCall(); + throw new UnsupportedOperationException("readAsync should not be called in this test"); } @Override From b6974f73206d4a61cf41d1621e7bc19cc9516361 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 17 Jun 2026 12:40:52 +0100 Subject: [PATCH 6/9] Address review feedback: dedupe command-failure handling and test cleanups - Extract shared onCommandFailure helper for the identical close/failed-event logic on the sync and async command paths - Clarify connectionIsReusable Javadoc for MongoOperationTimeoutException to cover both the write-concern and MaxTimeMSExpired (ok:0) timeout sources - Return List instead of fully-qualified Stream from readAsyncBodyFailures @MethodSource, removing the inline FQN - Fix mangled brace formatting in TestStream anonymous subclasses --- .../connection/InternalStreamConnection.java | 32 +++++++++++-------- .../InternalStreamConnectionTest.java | 27 +++++++++------- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index caf74869026..ad06be67ccc 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -585,12 +585,7 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm return commandResult; } catch (Throwable t) { - if (!connectionIsReusable(t)) { - close(); - } - if (!commandSuccessful) { - commandEventSender.sendFailedEvent(t); - } + onCommandFailure(t, commandSuccessful, commandEventSender); if (tracingSpan != null) { if (t instanceof MongoCommandException) { MongodbObservationContext ctx = tracingSpan.getMongodbObservationContext(); @@ -616,7 +611,8 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm *
    *
  • {@link MongoCommandException} — an {@code ok: 0} error response (and subclasses)
  • *
  • {@link MongoWriteConcernWithResponseException} — a write concern error carrying the response
  • - *
  • {@link MongoOperationTimeoutException} — a write concern timeout
  • + *
  • {@link MongoOperationTimeoutException} — a write concern timeout, or a server-side + * {@code MaxTimeMSExpired} ({@code ok: 0}) timeout; both are derived from the response body
  • *
  • {@link BsonSerializationException} — a corrupt BSON body whose exact byte count was still consumed
  • *
* Any other failure (e.g. a responseTo mismatch or an unexpected error) may have left the stream @@ -629,6 +625,21 @@ private static boolean connectionIsReusable(final Throwable failure) { || failure instanceof BsonSerializationException; } + /** + * Handles a failure raised while sending or processing a command, identically on the sync and async paths: + * closes the connection unless the failure leaves it {@linkplain #connectionIsReusable reusable}, and emits a + * command-failed event unless a succeeded event has already been sent. + */ + private void onCommandFailure(final Throwable failure, final boolean commandSuccessful, + final CommandEventSender commandEventSender) { + if (!connectionIsReusable(failure)) { + close(); + } + if (!commandSuccessful) { + commandEventSender.sendFailedEvent(failure); + } + } + private void sendAndReceiveAsyncInternal(final CommandMessage message, final Decoder decoder, final OperationContext operationContext, final SingleResultCallback callback) { if (isClosed()) { @@ -777,12 +788,7 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d responseBuffers.close(); } if (failure != null) { - if (!connectionIsReusable(failure)) { - close(); - } - if (!commandSuccessful) { - commandEventSender.sendFailedEvent(failure); - } + onCommandFailure(failure, commandSuccessful, commandEventSender); callback.onResult(null, failure); return; } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java index 3a2191cdcf9..8bf9bf69867 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java @@ -90,10 +90,8 @@ class InternalStreamConnectionTest { * Unchecked failures that {@code stream.readAsync()} may raise during body buffer allocation: * an {@link Error} (e.g. heap exhaustion) and a {@link RuntimeException}. */ - // Fully-qualified java.util.stream.Stream: an import would shadow the same-package - // com.mongodb.internal.connection.Stream that TestStream implements. - private static java.util.stream.Stream readAsyncBodyFailures() { - return java.util.stream.Stream.of( + private static List readAsyncBodyFailures() { + return asList( new OutOfMemoryError("Java heap space"), new RuntimeException("Simulated allocation failure")); } @@ -350,7 +348,8 @@ public void write(final List buffers, final OperationContext operationC public ByteBuf read(final int numBytes, final OperationContext operationContext) { recordUnexpectedCall(); throw new UnsupportedOperationException("no read expected after write failure"); - } }; + } + }; InternalStreamConnection connection = createOpenConnection(stream); CommandMessage commandMessage = createPingCommand(); @@ -383,7 +382,8 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createResponseBody(); } - } }; + } + }; InternalStreamConnection connection = createOpenConnection(stream); CommandMessage commandMessage = createPingCommand(); @@ -451,7 +451,8 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createResponseBody(errorResponse); } - } }; + } + }; InternalStreamConnection connection = createOpenConnection(stream); CommandMessage commandMessage = createPingCommand(); @@ -529,7 +530,8 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createResponseBody(errorResponse); } - } }; + } + }; TestCommandListener listener = new TestCommandListener(); InternalStreamConnection connection = createOpenConnection(stream, listener); @@ -601,7 +603,8 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createResponseBody(); } - } }; + } + }; TestCommandListener listener = new TestCommandListener(); InternalStreamConnection connection = createOpenConnection(stream, listener); @@ -672,7 +675,8 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return createCorruptResponseBody(); } - } }; + } + }; TestCommandListener listener = new TestCommandListener(); InternalStreamConnection connection = createOpenConnection(stream, listener); @@ -843,7 +847,8 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) } else { return compressedBody; } - } }; + } + }; InternalStreamConnection connection = createOpenConnection(stream, null, Collections.singletonList(MongoCompressor.createZlibCompressor())); From ac7df01df307b6bcd4446cb537d2a9f55198ea6c Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 17 Jun 2026 12:48:53 +0100 Subject: [PATCH 7/9] Propagate fatal Errors unchanged instead of wrapping as MongoInternalException The widened catch(Throwable) on the write and async-read paths routed JVM Errors (e.g. OutOfMemoryError) through the read/write exception translators, downgrading a fatal Error to a catchable MongoInternalException. This was inconsistent with the sync command path, which rethrows Throwable raw. Errors are now passed through unchanged: the sync write/read paths rethrow them, and the async paths deliver them to the callback unmodified (throwing through the async callback machinery would bounce through nested catch blocks, so the failure is delivered as a value via translateReadFailure / completeExceptionally). The connection is still closed on every path. Updates the async/sync write-Error and async read-Error tests to expect the raw Error, and adds a sync read-Error test covering receiveResponseBuffers. --- .../connection/InternalStreamConnection.java | 29 +++++++++- .../InternalStreamConnectionTest.java | 56 ++++++++++++++++--- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index ad06be67ccc..13b546ecead 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -899,12 +899,12 @@ public void completed(@Nullable final ByteBuf buffer) { @Override public void failed(final Throwable t) { close(); - callback.onResult(null, translateReadException(t, operationContext)); + callback.onResult(null, translateReadFailure(t, operationContext)); } }); } catch (Throwable t) { close(); - callback.onResult(null, translateReadException(t, operationContext)); + callback.onResult(null, translateReadFailure(t, operationContext)); } } @@ -928,7 +928,19 @@ private void updateSessionContext(final SessionContext sessionContext, final Res } } + /** + * Rethrows a fatal JVM {@link Error} (e.g. {@link OutOfMemoryError}) unchanged, so it is never downgraded to a + * catchable {@link MongoException}. Used by the paths that propagate a failure by throwing; the async read path + * delivers the failure as a callback value instead and uses {@link #translateReadFailure} for the same purpose. + */ + private static void rethrowIfError(final Throwable t) { + if (t instanceof Error) { + throw (Error) t; + } + } + private void throwTranslatedWriteException(final Throwable e, final OperationContext operationContext) { + rethrowIfError(e); if (e instanceof MongoSocketWriteTimeoutException && operationContext.getTimeoutContext().hasTimeoutMS()) { throw createMongoTimeoutException(e); } @@ -946,6 +958,18 @@ private void throwTranslatedWriteException(final Throwable e, final OperationCon } } + /** + * Translates a read failure for delivery to an async callback. {@link Error}s are passed through unchanged + * rather than wrapped in a {@link MongoException}, so a fatal JVM error (e.g. {@link OutOfMemoryError}) is not + * downgraded to a catchable exception. The sync read path uses {@link #rethrowIfError} for the same purpose. + */ + private Throwable translateReadFailure(final Throwable e, final OperationContext operationContext) { + if (e instanceof Error) { + return e; + } + return translateReadException(e, operationContext); + } + private MongoException translateReadException(final Throwable e, final OperationContext operationContext) { if (operationContext.getTimeoutContext().hasTimeoutMS()) { if (e instanceof SocketTimeoutException) { @@ -1018,6 +1042,7 @@ private ResponseBuffers receiveResponseBuffers(final OperationContext operationC uncompressedBuffer.release(); } close(); + rethrowIfError(t); throw translateReadException(t, operationContext); } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java index 8bf9bf69867..7eec1dce7a3 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java @@ -98,8 +98,9 @@ private static List readAsyncBodyFailures() { /** * Verifies that when {@code stream.readAsync()} throws during body buffer allocation (after the - * header read succeeds), the connection is closed and the original throwable is preserved as the - * cause. Covers both an {@link Error} and a {@link RuntimeException}. + * header read succeeds), the connection is closed. A fatal {@link Error} is delivered to the callback + * unchanged, while a {@link RuntimeException} is wrapped in a {@link MongoInternalException} that + * preserves the original as its cause. */ @ParameterizedTest @MethodSource("readAsyncBodyFailures") @@ -134,10 +135,17 @@ public void readAsync(final int numBytes, final OperationContext operationContex connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); - // The callback should receive an error wrapping the original throwable - MongoInternalException thrown = assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); - assertSame(bodyReadFailure, thrown.getCause(), - "The original throwable should be preserved as the cause"); + // A fatal Error is delivered unchanged; a RuntimeException is wrapped in a MongoInternalException + // with the original preserved as the cause. + Throwable thrown = assertThrows(Throwable.class, () -> callback.get(5, TimeUnit.SECONDS)); + if (bodyReadFailure instanceof Error) { + assertSame(bodyReadFailure, thrown, + "A fatal Error should be delivered unchanged, not downgraded to a MongoException"); + } else { + assertInstanceOf(MongoInternalException.class, thrown); + assertSame(bodyReadFailure, thrown.getCause(), + "The original throwable should be preserved as the cause"); + } assertFalse(callback.wasInvokedMultipleTimes()); assertTrue(connection.isClosed(), @@ -232,7 +240,9 @@ public void writeAsync(final List buffers, final OperationContext opera connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); - assertThrows(MongoInternalException.class, () -> callback.get(5, TimeUnit.SECONDS)); + OutOfMemoryError thrown = assertThrows(OutOfMemoryError.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertEquals("Java heap space", thrown.getMessage(), + "A fatal Error should be delivered unchanged, not downgraded to a MongoException"); assertFalse(callback.wasInvokedMultipleTimes()); assertTrue(connection.isClosed(), @@ -354,9 +364,10 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) InternalStreamConnection connection = createOpenConnection(stream); CommandMessage commandMessage = createPingCommand(); - MongoInternalException thrown = assertThrows(MongoInternalException.class, + OutOfMemoryError thrown = assertThrows(OutOfMemoryError.class, () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); - assertInstanceOf(OutOfMemoryError.class, thrown.getCause()); + assertEquals("Java heap space", thrown.getMessage(), + "A fatal Error should be propagated unchanged, not downgraded to a MongoException"); assertTrue(connection.isClosed(), "Connection should be closed after Error in write to prevent pool reuse with a half-written message"); assertTrue(stream.wasClosed(), @@ -365,6 +376,33 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) "No read should be attempted after write failure"); } + /** + * Verifies that when {@code stream.read()} throws an {@link Error} in the synchronous path, the + * connection is closed and the fatal Error is propagated unchanged (mirrors the async read behavior). + */ + @Test + @DisplayName("Sync: connection closed when read throws Error") + void syncClosesConnectionWhenReadThrowsError() { + TestStream stream = new TestStream() { + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + throw new OutOfMemoryError("Java heap space"); + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + + OutOfMemoryError thrown = assertThrows(OutOfMemoryError.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertEquals("Java heap space", thrown.getMessage(), + "A fatal Error should be propagated unchanged, not downgraded to a MongoException"); + assertTrue(connection.isClosed(), + "Connection should be closed after Error in read to prevent pool reuse with stale data"); + assertTrue(stream.wasClosed(), + "Underlying stream should be closed to release socket resources"); + } + /** * Verifies that a responseTo mismatch in the synchronous path closes the connection. */ From cace84d14ff597ddc97f304cfe04383211dd4007 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 17 Jun 2026 14:16:12 +0100 Subject: [PATCH 8/9] Fix regression: add MongoExecutionTimeoutException to connectionIsReusable Without CSOT, a MaxTimeMSExpired (errorCode 50) server error is raised as MongoExecutionTimeoutException, which does not extend MongoCommandException. The new connectionIsReusable() guard did not include it, so the connection was incorrectly closed, causing abortTransaction to retry and emit a second commandStartedEvent. --- .../connection/InternalStreamConnection.java | 6 +- .../InternalStreamConnectionTest.java | 91 +++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 13b546ecead..9de0cc63852 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -21,6 +21,7 @@ import com.mongodb.MongoCommandException; import com.mongodb.MongoCompressor; import com.mongodb.MongoException; +import com.mongodb.MongoExecutionTimeoutException; import com.mongodb.MongoInternalException; import com.mongodb.MongoInterruptedException; import com.mongodb.MongoOperationTimeoutException; @@ -610,9 +611,11 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm * the wire, so the stream remains synchronized and the connection can be returned to the pool: *
    *
  • {@link MongoCommandException} — an {@code ok: 0} error response (and subclasses)
  • + *
  • {@link MongoExecutionTimeoutException} — a server-side {@code MaxTimeMSExpired} ({@code ok: 0}) + * error response; derived from the fully-read response body
  • *
  • {@link MongoWriteConcernWithResponseException} — a write concern error carrying the response
  • *
  • {@link MongoOperationTimeoutException} — a write concern timeout, or a server-side - * {@code MaxTimeMSExpired} ({@code ok: 0}) timeout; both are derived from the response body
  • + * {@code MaxTimeMSExpired} ({@code ok: 0}) timeout wrapped for CSOT; both are derived from the response body *
  • {@link BsonSerializationException} — a corrupt BSON body whose exact byte count was still consumed
  • *
* Any other failure (e.g. a responseTo mismatch or an unexpected error) may have left the stream @@ -620,6 +623,7 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm */ private static boolean connectionIsReusable(final Throwable failure) { return failure instanceof MongoCommandException + || failure instanceof MongoExecutionTimeoutException || failure instanceof MongoWriteConcernWithResponseException || failure instanceof MongoOperationTimeoutException || failure instanceof BsonSerializationException; diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java index 7eec1dce7a3..5c81f6ca86e 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java @@ -18,6 +18,7 @@ import com.mongodb.MongoCommandException; import com.mongodb.MongoCompressor; +import com.mongodb.MongoExecutionTimeoutException; import com.mongodb.MongoInternalException; import com.mongodb.MongoSocketReadException; import com.mongodb.ServerAddress; @@ -549,6 +550,89 @@ public void readAsync(final int numBytes, final OperationContext operationContex connection.close(); } + /** + * Verifies that a MaxTimeMSExpired (errorCode 50) server response in the synchronous path does NOT close + * the connection. Without CSOT the exception is {@link MongoExecutionTimeoutException}, which extends + * {@link com.mongodb.MongoException} but NOT {@link MongoCommandException}. Before the fix, it was + * incorrectly excluded from the reusable-connection list and triggered a spurious {@code close()}. + */ + @Test + @DisplayName("Sync: connection NOT closed on MaxTimeMSExpired (errorCode 50) without CSOT") + void syncDoesNotCloseConnectionOnMaxTimeMSExpired() { + AtomicInteger readCallCount = new AtomicInteger(); + BsonDocument errorResponse = createMaxTimeMSExpiredDocument(); + + TestStream stream = new TestStream() { + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + if (readCallCount.incrementAndGet() == 1) { + return createValidResponseHeader(getCommandMessageId(), errorResponse); + } else { + return createResponseBody(errorResponse); + } + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + MongoExecutionTimeoutException thrown = assertThrows(MongoExecutionTimeoutException.class, + () -> connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), createOperationContext())); + assertEquals(50, thrown.getCode()); + assertFalse(connection.isClosed(), + "Connection should NOT be closed on MaxTimeMSExpired (the response was fully read)"); + assertFalse(stream.wasClosed(), + "Underlying stream should NOT be closed on MaxTimeMSExpired"); + assertFalse(stream.hadUnexpectedCall()); + + connection.close(); + } + + /** + * Verifies that a MaxTimeMSExpired (errorCode 50) server response in the asynchronous path does NOT close + * the connection. Without CSOT the exception is {@link MongoExecutionTimeoutException}, which extends + * {@link com.mongodb.MongoException} but NOT {@link MongoCommandException}. Before the fix, it was + * incorrectly excluded from the reusable-connection list and triggered a spurious {@code close()}. + */ + @Test + @DisplayName("Async: connection NOT closed on MaxTimeMSExpired (errorCode 50) without CSOT") + void asyncDoesNotCloseConnectionOnMaxTimeMSExpired() { + AtomicInteger readAsyncCallCount = new AtomicInteger(); + BsonDocument errorResponse = createMaxTimeMSExpiredDocument(); + + TestStream stream = new TestStream() { + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + if (readAsyncCallCount.incrementAndGet() == 1) { + handler.completed(createValidResponseHeader(getCommandMessageId(), errorResponse)); + } else { + handler.completed(createResponseBody(errorResponse)); + } + } + }; + + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + FutureResultCallback callback = new FutureResultCallback<>(); + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), createOperationContext(), callback); + + MongoExecutionTimeoutException thrown = assertThrows(MongoExecutionTimeoutException.class, + () -> callback.get(5, TimeUnit.SECONDS)); + assertEquals(50, thrown.getCode()); + assertFalse(callback.wasInvokedMultipleTimes()); + + assertFalse(connection.isClosed(), + "Connection should NOT be closed on MaxTimeMSExpired (the response was fully read)"); + assertFalse(stream.wasClosed(), + "Underlying stream should NOT be closed on MaxTimeMSExpired"); + + connection.close(); + } + /** * Verifies that when a command error response (ok: 0) carries a mismatched responseTo in the * synchronous path, command monitoring receives exactly one started and one failed event. @@ -1037,6 +1121,13 @@ private static BsonDocument createCommandErrorDocument() { .append("codeName", new BsonString("WriteConflict")); } + private static BsonDocument createMaxTimeMSExpiredDocument() { + return new BsonDocument("ok", new BsonInt32(0)) + .append("errmsg", new BsonString("operation exceeded time limit")) + .append("code", new BsonInt32(50)) + .append("codeName", new BsonString("MaxTimeMSExpired")); + } + /** * Creates a valid OP_REPLY response body: reply header (20 bytes) + BSON {ok: 1}. */ From 0ecb0fef2e2cfd691f149c5b30d3daa6f8f484f6 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 17 Jun 2026 16:09:56 +0100 Subject: [PATCH 9/9] Detect responseTo mismatch before processing the response body A responseTo mismatch means the stream is desynchronized: the reply belongs to a different request than the one just sent. Previously the mismatch was only detected once the body was being turned into a result (in ReplyMessage, reached via getCommandResult, or via getResponseDocument while building the succeeded/failed event). By then updateSessionContext had already run against the mismatched body, gossiping its operationTime, clusterTime, snapshot timestamp and recovery token into the session - i.e. advancing session state from a reply that does not belong to this operation. The detection point was also inconsistent: with a command listener attached it surfaced in sendSucceededEvent -> getResponseDocument, but with NoOpCommandEventSender it slipped through to getCommandResult. Hoist the check into a new assertResponseToMatches() helper, invoked as the first statement of the response-handling try block on both the sync and async paths - before updateSessionContext and before any event is built. Both call sites remain inside the existing try, so the failure still routes through onCommandFailure, which closes the connection to keep the desynchronized stream out of the pool, and the buffer-release-before-close ordering is preserved. ReplyMessage's own check is retained as a backstop for its other callers. The mismatch message is shared via ReplyMessage.responseToMismatchMessage() so the backstop and the hoisted guard cannot drift apart. Tests run over both the sync and async paths and assert that session state (operation/cluster time, snapshot timestamp) and, with an active transaction, the recovery token are not advanced on a mismatch - each anchored by a matching-responseTo control that asserts the recording session context observed the exact values the reply body carried. JAVA-6210 --- .../connection/InternalStreamConnection.java | 18 ++ .../internal/connection/ReplyMessage.java | 13 +- .../InternalStreamConnectionTest.java | 257 ++++++++++++++++++ 3 files changed, 286 insertions(+), 2 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 9de0cc63852..55759ce50fd 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -567,6 +567,7 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm final OperationContext operationContext, @Nullable final Span tracingSpan) { boolean commandSuccessful = false; try (ResponseBuffers responseBuffers = receiveResponseBuffers(operationContext)) { + assertResponseToMatches(responseBuffers, responseTo); updateSessionContext(operationContext.getSessionContext(), responseBuffers); if (!isCommandOk(responseBuffers)) { throw getCommandFailureException(responseBuffers.getResponseDocument(responseTo, @@ -773,6 +774,7 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d boolean commandSuccessful = false; Throwable failure = null; try { + assertResponseToMatches(responseBuffers, messageId); updateSessionContext(operationContext.getSessionContext(), responseBuffers); boolean commandOk = isCommandOk(new BsonBinaryReader(new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer()))); @@ -802,6 +804,22 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d }); } + /** + * Verifies that the {@code responseTo} in the reply header matches the {@code requestId} of the request just sent. + * A mismatch means the stream is desynchronized — the reply belongs to a different request — so this check is made + * as soon as the framed response has been read, before the body is processed. In particular it runs before + * {@link #updateSessionContext} could advance session state (operation/cluster time, snapshot timestamp, recovery + * token) from a reply that does not belong to this operation. The failure is raised on the same path that + * {@linkplain #onCommandFailure handles command failures}, which closes the connection to keep the desynchronized + * stream out of the pool. + */ + private void assertResponseToMatches(final ResponseBuffers responseBuffers, final int requestId) { + int actualResponseTo = responseBuffers.getReplyHeader().getResponseTo(); + if (requestId != actualResponseTo) { + throw new MongoInternalException(ReplyMessage.responseToMismatchMessage(actualResponseTo, requestId)); + } + } + private T getCommandResult(final Decoder decoder, final ResponseBuffers responseBuffers, final int messageId, diff --git a/driver-core/src/main/com/mongodb/internal/connection/ReplyMessage.java b/driver-core/src/main/com/mongodb/internal/connection/ReplyMessage.java index 68af818281e..a99d828e1fd 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ReplyMessage.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ReplyMessage.java @@ -34,10 +34,19 @@ public class ReplyMessage { private final T document; + /** + * The message used by every responseTo-mismatch failure. A mismatch means the reply belongs to a different request + * than the one just sent, i.e. the stream is desynchronized. Shared so the check in this class and the earlier guard + * in {@link InternalStreamConnection} cannot drift apart. + */ + static String responseToMismatchMessage(final long responseTo, final long requestId) { + return format("The responseTo (%d) in the response does not match the requestId (%d) in the request", responseTo, requestId); + } + public ReplyMessage(final ResponseBuffers responseBuffers, final Decoder decoder, final long requestId) { if (requestId != responseBuffers.getReplyHeader().getResponseTo()) { - throw new MongoInternalException(format("The responseTo (%d) in the response does not match the requestId (%d) in the " - + "request", responseBuffers.getReplyHeader().getResponseTo(), requestId)); + throw new MongoInternalException( + responseToMismatchMessage(responseBuffers.getReplyHeader().getResponseTo(), requestId)); } try (BsonInput bsonInput = new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer().duplicate())) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java index 5c81f6ca86e..9b8f22aca32 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionTest.java @@ -45,6 +45,7 @@ import org.bson.BsonInt32; import org.bson.BsonSerializationException; import org.bson.BsonString; +import org.bson.BsonTimestamp; import org.bson.ByteBuf; import org.bson.ByteBufNIO; import org.bson.codecs.BsonDocumentCodec; @@ -54,6 +55,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -79,6 +81,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -779,6 +782,108 @@ public void readAsync(final int numBytes, final OperationContext operationContex "Connection should be closed after responseTo mismatch"); } + /** + * Verifies that on a responseTo mismatch, session state (operation time, cluster time, snapshot + * timestamp) is NOT advanced from the mismatched reply on either path. The reply belongs to a + * different request, so advancing the session from its body would corrupt session state; the + * mismatch must be detected before the body is processed. + */ + @ParameterizedTest(name = "async={0}") + @ValueSource(booleans = {false, true}) + void doesNotAdvanceSessionStateOnResponseToMismatch(final boolean async) { + TestStream stream = okResponseStream(MISMATCHED_RESPONSE_TO, createOkResponseWithSessionState()); + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + RecordingSessionContext sessionContext = new RecordingSessionContext(); + MongoInternalException thrown = assertThrows(MongoInternalException.class, + () -> sendAndReceive(connection, commandMessage, sessionContext, async)); + assertTrue(thrown.getMessage().contains("does not match the requestId")); + assertFalse(sessionContext.sessionStateAdvanced(), + "Session state must not be advanced from a reply that does not match the request"); + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + assertFalse(stream.hadUnexpectedCall()); + } + + /** + * Control for {@link #doesNotAdvanceSessionStateOnResponseToMismatch}: when the responseTo matches, + * session state IS advanced from the reply body, and to the values the body carried. This proves the + * recording session context observes {@code updateSessionContext} on both paths, so the "not advanced" + * assertions on a mismatch are meaningful rather than vacuously true. + */ + @ParameterizedTest(name = "async={0}") + @ValueSource(booleans = {false, true}) + void advancesSessionStateWhenResponseToMatches(final boolean async) { + TestStream stream = okResponseStream(MATCHING_RESPONSE_TO, createOkResponseWithSessionState()); + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + RecordingSessionContext sessionContext = new RecordingSessionContext(); + sendAndReceive(connection, commandMessage, sessionContext, async); + assertEquals(SESSION_STATE_TIMESTAMP, sessionContext.advancedOperationTime(), + "Operation time must be advanced from the matching reply body"); + assertEquals(new BsonDocument("clusterTime", SESSION_STATE_TIMESTAMP), sessionContext.advancedClusterTime(), + "Cluster time must be advanced from the matching reply body"); + assertEquals(SESSION_STATE_TIMESTAMP, sessionContext.advancedSnapshotTimestamp(), + "Snapshot timestamp must be advanced from the matching reply body"); + assertFalse(connection.isClosed(), + "Connection should stay open after a matching reply"); + assertFalse(stream.hadUnexpectedCall()); + + connection.close(); + } + + /** + * Verifies that on a responseTo mismatch with an active transaction, the recovery token is NOT advanced + * from the mismatched reply. {@code updateSessionContext} only gossips the recovery token while a + * transaction is active, so this exercises the branch the other session-state tests cannot reach. + */ + @ParameterizedTest(name = "async={0}") + @ValueSource(booleans = {false, true}) + void doesNotAdvanceRecoveryTokenOnResponseToMismatch(final boolean async) { + TestStream stream = okResponseStream(MISMATCHED_RESPONSE_TO, createOkResponseWithRecoveryToken()); + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + RecordingSessionContext sessionContext = new RecordingSessionContext(true); + MongoInternalException thrown = assertThrows(MongoInternalException.class, + () -> sendAndReceive(connection, commandMessage, sessionContext, async)); + assertTrue(thrown.getMessage().contains("does not match the requestId")); + assertNull(sessionContext.advancedRecoveryToken(), + "Recovery token must not be advanced from a reply that does not match the request"); + assertTrue(connection.isClosed(), + "Connection should be closed after responseTo mismatch"); + assertFalse(stream.hadUnexpectedCall()); + } + + /** + * Control for {@link #doesNotAdvanceRecoveryTokenOnResponseToMismatch}: when the responseTo matches, + * the recovery token IS advanced from the reply, to the value the body carried, proving the recording + * session context observes the recovery-token branch and the "not advanced" assertion above is meaningful. + */ + @ParameterizedTest(name = "async={0}") + @ValueSource(booleans = {false, true}) + void advancesRecoveryTokenWhenResponseToMatches(final boolean async) { + TestStream stream = okResponseStream(MATCHING_RESPONSE_TO, createOkResponseWithRecoveryToken()); + InternalStreamConnection connection = createOpenConnection(stream); + CommandMessage commandMessage = createPingCommand(); + stream.setCommandMessageId(commandMessage.getId()); + + RecordingSessionContext sessionContext = new RecordingSessionContext(true); + sendAndReceive(connection, commandMessage, sessionContext, async); + assertEquals(RECOVERY_TOKEN, sessionContext.advancedRecoveryToken(), + "Recovery token must be advanced from the matching reply body"); + assertFalse(connection.isClosed(), + "Connection should stay open after a matching reply"); + assertFalse(stream.hadUnexpectedCall()); + + connection.close(); + } + /** * Verifies that when the response document is corrupt (but fully read) in the synchronous * path, command monitoring receives a failed event and the connection stays open: the wire @@ -1038,6 +1143,58 @@ public void readAsync(final int numBytes, final OperationContext operationContex } } + private static final int MATCHING_RESPONSE_TO = 0; + private static final int MISMATCHED_RESPONSE_TO = 1; + private static final BsonTimestamp SESSION_STATE_TIMESTAMP = new BsonTimestamp(42, 1); + private static final BsonDocument RECOVERY_TOKEN = new BsonDocument("id", new BsonInt32(1)); + + /** + * Builds a {@link TestStream} that serves a single OP_REPLY framed response (16-byte header + body) for + * {@code body}, on either the sync or async read path. The reply header's responseTo is the connection's + * request id plus {@code responseToOffset}: {@link #MATCHING_RESPONSE_TO} yields a matching reply, while + * {@link #MISMATCHED_RESPONSE_TO} yields a desynchronized (mismatched) reply. + */ + private static TestStream okResponseStream(final int responseToOffset, final BsonDocument body) { + return new TestStream() { + private final AtomicInteger readCallCount = new AtomicInteger(); + + @Override + public ByteBuf read(final int numBytes, final OperationContext operationContext) { + return nextChunk(); + } + + @Override + public void readAsync(final int numBytes, final OperationContext operationContext, + final AsyncCompletionHandler handler) { + handler.completed(nextChunk()); + } + + private ByteBuf nextChunk() { + if (readCallCount.incrementAndGet() == 1) { + return createValidResponseHeader(getCommandMessageId() + responseToOffset, body); + } + return createResponseBody(body); + } + }; + } + + /** + * Sends {@code commandMessage} and waits for the response on the sync or async path, using {@code sessionContext} + * for the operation. Any failure (including a responseTo mismatch) is rethrown on both paths so callers can assert + * on it uniformly. + */ + private void sendAndReceive(final InternalStreamConnection connection, final CommandMessage commandMessage, + final RecordingSessionContext sessionContext, final boolean async) { + OperationContext operationContext = createOperationContext().withSessionContext(sessionContext); + if (async) { + FutureResultCallback callback = new FutureResultCallback<>(); + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), operationContext, callback); + callback.get(5, TimeUnit.SECONDS); + } else { + connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), operationContext); + } + } + private InternalStreamConnection createOpenConnection(final TestStream stream) { return createOpenConnection(stream, null); } @@ -1128,6 +1285,26 @@ private static BsonDocument createMaxTimeMSExpiredDocument() { .append("codeName", new BsonString("MaxTimeMSExpired")); } + /** + * Creates a successful (ok: 1) response document that also carries session state the driver would + * gossip into the session context: operationTime, $clusterTime and an atClusterTime snapshot timestamp. + */ + private static BsonDocument createOkResponseWithSessionState() { + return new BsonDocument("ok", new BsonInt32(1)) + .append("operationTime", SESSION_STATE_TIMESTAMP) + .append("$clusterTime", new BsonDocument("clusterTime", SESSION_STATE_TIMESTAMP)) + .append("atClusterTime", SESSION_STATE_TIMESTAMP); + } + + /** + * Creates a successful (ok: 1) response document carrying a transaction recovery token, which the driver + * gossips into the session context only while a transaction is active. + */ + private static BsonDocument createOkResponseWithRecoveryToken() { + return new BsonDocument("ok", new BsonInt32(1)) + .append("recoveryToken", RECOVERY_TOKEN); + } + /** * Creates a valid OP_REPLY response body: reply header (20 bytes) + BSON {ok: 1}. */ @@ -1300,6 +1477,86 @@ private static void assertStartedThenFailed(final TestCommandListener listener) assertInstanceOf(CommandFailedEvent.class, events.get(1)); } + /** + * A {@link SessionContext} that records whether the driver advanced any session state + * (operation time, cluster time, snapshot timestamp or, inside a transaction, the recovery token) + * from a response. Used to assert that a mismatched reply never contaminates the session. + * + *

When constructed with {@code activeTransaction = true} it reports an active transaction so the + * recovery-token branch of {@code updateSessionContext} is exercised; {@code getTransactionNumber} then + * returns a fixed value purely so command encoding succeeds. + */ + private static final class RecordingSessionContext extends NoOpSessionContext { + private final boolean activeTransaction; + private volatile BsonTimestamp advancedOperationTime; + private volatile BsonDocument advancedClusterTime; + private volatile BsonTimestamp advancedSnapshotTimestamp; + private volatile BsonDocument advancedRecoveryToken; + + RecordingSessionContext() { + this(false); + } + + RecordingSessionContext(final boolean activeTransaction) { + this.activeTransaction = activeTransaction; + } + + @Override + public void advanceOperationTime(@Nullable final BsonTimestamp operationTime) { + this.advancedOperationTime = operationTime; + } + + @Override + public void advanceClusterTime(@Nullable final BsonDocument clusterTime) { + this.advancedClusterTime = clusterTime; + } + + @Override + public void setSnapshotTimestamp(@Nullable final BsonTimestamp snapshotTimestamp) { + this.advancedSnapshotTimestamp = snapshotTimestamp; + } + + @Override + public boolean hasActiveTransaction() { + return activeTransaction; + } + + @Override + public long getTransactionNumber() { + return 1; + } + + @Override + public void setRecoveryToken(final BsonDocument recoveryToken) { + this.advancedRecoveryToken = recoveryToken; + } + + /** True if any of operation time, cluster time or snapshot timestamp was advanced from a response. */ + boolean sessionStateAdvanced() { + return advancedOperationTime != null || advancedClusterTime != null || advancedSnapshotTimestamp != null; + } + + @Nullable + BsonTimestamp advancedOperationTime() { + return advancedOperationTime; + } + + @Nullable + BsonDocument advancedClusterTime() { + return advancedClusterTime; + } + + @Nullable + BsonTimestamp advancedSnapshotTimestamp() { + return advancedSnapshotTimestamp; + } + + @Nullable + BsonDocument advancedRecoveryToken() { + return advancedRecoveryToken; + } + } + /** * A test Stream implementation that handles writes (no-op) and delegates reads to subclasses. * Unexpected calls must be recorded via {@link #recordUnexpectedCall()} rather than JUnit's