diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdReadConsistencyStrategyHeaderTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdReadConsistencyStrategyHeaderTests.java new file mode 100644 index 000000000000..beef7d9318d3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdReadConsistencyStrategyHeaderTests.java @@ -0,0 +1,431 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.directconnectivity.rntbd; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.implementation.DatabaseAccount; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ISessionContainer; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.RxGatewayStoreModel; +import com.azure.cosmos.implementation.ThinClientStoreModel; +import com.azure.cosmos.implementation.UserAgentContainer; +import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.http.HttpRequest; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.mockito.Mockito; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.net.URI; +import java.util.Map; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * Unit tests for the ReadConsistencyStrategy RNTBD header (token ID 0x00F0, Byte type). + * + *

Why these tests exist

+ * The thin client proxy (Gateway V2) deserializes ReadConsistencyStrategy from the RNTBD binary + * frame — not from HTTP headers. A mismatch between the Java SDK's enum byte values and the + * proxy's C++ enum causes the proxy to either reject the request or apply the wrong strategy + * silently. These tests guard the contract: + * + * + *

Consistency headers decision matrix

+ * Users can set ConsistencyLevel (ConsistencyLevel) and ReadConsistencyStrategy (ReadConsistencyStrategy) at both client and + * request level. The SDK resolves contention before wire serialization: + *
+ * | Client ConsistencyLevel | Client ReadConsistencyStrategy | Request ConsistencyLevel | Request ReadConsistencyStrategy | Effective on wire           |
+ * |-----------|------------|------------|-------------|-----------------------------|
+ * | Session   | —          | —          | —           | ConsistencyLevel=Session (default)        |
+ * | Session   | —          | —          | LC          | ReadConsistencyStrategy=LC, ConsistencyLevel stripped         |
+ * | Session   | Eventual   | —          | LC          | ReadConsistencyStrategy=LC (req ReadConsistencyStrategy > client)   |
+ * | Session   | Eventual   | Eventual   | —           | ReadConsistencyStrategy=Eventual, ConsistencyLevel stripped   |
+ * | Session   | —          | Eventual   | LC          | ReadConsistencyStrategy=LC (req ReadConsistencyStrategy > req ConsistencyLevel)   |
+ * | Session   | LC         | —          | —           | ReadConsistencyStrategy=LC, ConsistencyLevel stripped         |
+ * | Session   | —          | —          | GLOBAL_STRONG| BadRequestException (non-Strong acct) |
+ * 
+ * Resolution rule: request-level ReadConsistencyStrategy > client-level ReadConsistencyStrategy > ConsistencyLevel. When a non-DEFAULT ReadConsistencyStrategy is + * effective, ConsistencyLevel is stripped to prevent dual-header rejection by the compute gateway or proxy. + * + *

Test regions

+ *
    + *
  1. Token-level — RNTBD token encoding, decoding, metadata, and enum constants.
  2. + *
  3. ThinClientStoreModel encoding — {@code wrapInHttpRequest()} produces correct RNTBD + * frame bytes for each strategy value.
  4. + *
  5. Resolve + encode pipeline — {@code resolveEffectiveConsistencyHeaders()} followed by + * {@code wrapInHttpRequest()} produces the correct frame for contention scenarios (both ConsistencyLevel + * and ReadConsistencyStrategy set, request-level overrides client-level, DEFAULT is transparent).
  6. + *
+ */ +public class RntbdReadConsistencyStrategyHeaderTests { + + // region Data providers + + @DataProvider(name = "readConsistencyStrategyToRntbdByteValues") + public Object[][] readConsistencyStrategyToRntbdByteValues() { + return new Object[][] { + { ReadConsistencyStrategy.EVENTUAL, RntbdConstants.RntbdReadConsistencyStrategy.Eventual.id() }, + { ReadConsistencyStrategy.SESSION, RntbdConstants.RntbdReadConsistencyStrategy.Session.id() }, + { ReadConsistencyStrategy.LATEST_COMMITTED, RntbdConstants.RntbdReadConsistencyStrategy.LatestCommitted.id() }, + { ReadConsistencyStrategy.GLOBAL_STRONG, RntbdConstants.RntbdReadConsistencyStrategy.GlobalStrong.id() }, + }; + } + + @DataProvider(name = "readConsistencyStrategyStringToRntbdByteValues") + public Object[][] readConsistencyStrategyStringToRntbdByteValues() { + return new Object[][] { + { "LatestCommitted", (byte) 0x03 }, + { "Eventual", (byte) 0x01 }, + { "Session", (byte) 0x02 }, + { "GlobalStrong", (byte) 0x04 }, + }; + } + + // endregion + + // region Token-level tests — RNTBD token metadata, encoding, and constants + + @Test(groups = { "unit" }) + public void readConsistencyStrategyTokenMetadata() { + assertThat(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy.id()) + .isEqualTo((short) 0x00FE); + assertThat(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy.type()) + .isEqualTo(RntbdTokenType.Byte); + assertThat(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy.isRequired()) + .isFalse(); + } + + @Test(groups = { "unit" }, dataProvider = "readConsistencyStrategyToRntbdByteValues") + public void readConsistencyStrategyTokenEncodesCorrectly( + ReadConsistencyStrategy ignoredStrategy, + byte expectedByteValue) { + + RntbdToken token = RntbdToken.create( + RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(token).isNotNull(); + assertThat(token.isPresent()).isFalse(); + + token.setValue(expectedByteValue); + assertThat(token.isPresent()).isTrue(); + assertThat(((Number) token.getValue()).byteValue()).isEqualTo(expectedByteValue); + } + + @Test(groups = { "unit" }) + public void readConsistencyStrategyTokenNotPresentWhenNotSet() { + RntbdToken token = RntbdToken.create( + RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(token.isPresent()).isFalse(); + } + + // endregion + + // region Token round-trip — verifies encode/decode symmetry for the ReadConsistencyStrategy + // Byte token. Guards against RNTBD frame corruption if the token serialization format changes. + + @Test(groups = { "unit" }, dataProvider = "readConsistencyStrategyToRntbdByteValues") + public void readConsistencyStrategyTokenRoundTrips( + ReadConsistencyStrategy ignoredStrategy, + byte expectedByteValue) { + + // Encode + RntbdToken token = RntbdToken.create( + RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + token.setValue(expectedByteValue); + + ByteBuf buffer = Unpooled.buffer(256); + try { + token.encode(buffer); + + // Decode + RntbdToken decodedToken = RntbdToken.create( + RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + // skip 3 bytes: 2 for header id + 1 for token type + buffer.readerIndex(3); + decodedToken.decode(buffer); + + assertThat(decodedToken.isPresent()).isTrue(); + assertThat(((Number) decodedToken.getValue()).byteValue()).isEqualTo(expectedByteValue); + } finally { + buffer.release(); + } + } + + @Test(groups = { "unit" }) + public void readConsistencyStrategyOverWireValuesMatchEnum() { + assertThat(ReadConsistencyStrategy.EVENTUAL.toString()).isEqualTo("Eventual"); + assertThat(ReadConsistencyStrategy.SESSION.toString()).isEqualTo("Session"); + assertThat(ReadConsistencyStrategy.LATEST_COMMITTED.toString()).isEqualTo("LatestCommitted"); + assertThat(ReadConsistencyStrategy.GLOBAL_STRONG.toString()).isEqualTo("GlobalStrong"); + assertThat(ReadConsistencyStrategy.DEFAULT.toString()).isEqualTo("Default"); + } + + @Test(groups = { "unit" }) + public void readConsistencyStrategyRntbdByteEnumValues() { + assertThat(RntbdConstants.RntbdReadConsistencyStrategy.Eventual.id()).isEqualTo((byte) 0x01); + assertThat(RntbdConstants.RntbdReadConsistencyStrategy.Session.id()).isEqualTo((byte) 0x02); + assertThat(RntbdConstants.RntbdReadConsistencyStrategy.LatestCommitted.id()).isEqualTo((byte) 0x03); + assertThat(RntbdConstants.RntbdReadConsistencyStrategy.GlobalStrong.id()).isEqualTo((byte) 0x04); + } + + @Test(groups = { "unit" }) + public void readConsistencyStrategyHttpHeaderConstant() { + assertThat(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY) + .isEqualTo("x-ms-cosmos-read-consistency-strategy"); + } + + // region No contention — single header encoding. Only one of ConsistencyLevel or ReadConsistencyStrategy is set (or neither). + // No resolution needed; tests pure RNTBD encoder correctness. + + @Test(groups = { "unit" }, dataProvider = "readConsistencyStrategyStringToRntbdByteValues") + public void thinClient_wrapInHttpRequest_readConsistencyStrategyEncodedInRntbdFrame(String readConsistencyStrategyValue, byte expectedByte) throws Exception { + // Calls ThinClientStoreModel.wrapInHttpRequest() — the actual production code — + // and verifies the RNTBD frame in the HTTP body contains the correct readConsistencyStrategy byte. + + ThinClientStoreModel storeModel = createMockThinClientStoreModel(); + + RxDocumentServiceRequest request = createDocumentReadRequest(); + request.getHeaders().put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, readConsistencyStrategyValue); + request.getHeaders().remove(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); + + HttpRequest httpRequest = + storeModel.wrapInHttpRequest(request, URI.create("https://test-proxy:10250/")); + + byte[] rntbdFrame = collectHttpBody(httpRequest); + RntbdRequest decoded = decodeRntbdFrame(rntbdFrame); + Byte rcsValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(rcsValue) + .as("RNTBD frame should contain readConsistencyStrategy 0x%02X for %s", expectedByte, readConsistencyStrategyValue) + .isEqualTo(expectedByte); + } + + @Test(groups = { "unit" }) + public void thinClient_wrapInHttpRequest_noReadConsistencyStrategyHeader_noRntbdToken() throws Exception { + ThinClientStoreModel storeModel = createMockThinClientStoreModel(); + + RxDocumentServiceRequest request = createDocumentReadRequest(); + // No readConsistencyStrategy header set + + HttpRequest httpRequest = + storeModel.wrapInHttpRequest(request, URI.create("https://test-proxy:10250/")); + + byte[] rntbdFrame = collectHttpBody(httpRequest); + RntbdRequest decoded = decodeRntbdFrame(rntbdFrame); + Byte rcsValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(rcsValue) + .as("ReadConsistencyStrategy token should not be set when header is absent (0 = unset)") + .isEqualTo((byte) 0); + } + + @Test(groups = { "unit" }) + public void thinClient_wrapInHttpRequest_readConsistencyStrategyPresent_consistencyLevelAbsent() throws Exception { + ThinClientStoreModel storeModel = createMockThinClientStoreModel(); + + RxDocumentServiceRequest request = createDocumentReadRequest(); + request.getHeaders().put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, "LatestCommitted"); + request.getHeaders().remove(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); + + HttpRequest httpRequest = + storeModel.wrapInHttpRequest(request, URI.create("https://test-proxy:10250/")); + + byte[] rntbdFrame = collectHttpBody(httpRequest); + RntbdRequest decoded = decodeRntbdFrame(rntbdFrame); + Byte rcsValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(rcsValue) + .as("ReadConsistencyStrategy should be LatestCommitted (0x03)") + .isEqualTo((byte) 0x03); + Byte clValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ConsistencyLevel); + assertThat(clValue) + .as("ConsistencyLevel should not be set when ReadConsistencyStrategy is present (0 = unset)") + .isEqualTo((byte) 0); + } + + // endregion + + // region Contention — both ConsistencyLevel and ReadConsistencyStrategy headers present. Tests resolveEffectiveConsistencyHeaders() + // followed by wrapInHttpRequest() to verify the correct header wins on the wire. + + @Test(groups = { "unit" }) + public void thinClient_resolveAndWrap_bothClAndReadConsistencyStrategy_onlyReadConsistencyStrategySurvivesInFrame() throws Exception { + // End-to-end chain: dirty headers (both ConsistencyLevel and readConsistencyStrategy set) + // → resolveEffectiveConsistencyHeaders (strips ConsistencyLevel) + // → wrapInHttpRequest (encodes RNTBD frame) + // → verify only readConsistencyStrategy in the frame, ConsistencyLevel absent + ThinClientStoreModel storeModel = createMockThinClientStoreModel(); + + RxDocumentServiceRequest request = createDocumentReadRequest(); + // Pre-resolution state: both headers present (as getRequestHeaders would set them + // before resolveEffectiveConsistencyHeaders runs in performRequestInternalCore) + request.getHeaders().put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, "Session"); + request.getHeaders().put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, "LatestCommitted"); + + // Run the same resolution logic that performRequestInternalCore() calls + resolveEffectiveConsistencyHeaders(request); + + // Now call wrapInHttpRequest with the resolved headers + HttpRequest httpRequest = + storeModel.wrapInHttpRequest(request, URI.create("https://test-proxy:10250/")); + + byte[] rntbdFrame = collectHttpBody(httpRequest); + RntbdRequest decoded = decodeRntbdFrame(rntbdFrame); + Byte rcsValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(rcsValue) + .as("readConsistencyStrategy=LatestCommitted (0x03) should survive in the RNTBD frame") + .isEqualTo((byte) 0x03); + Byte clValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ConsistencyLevel); + assertThat(clValue) + .as("ConsistencyLevel should be stripped — only readConsistencyStrategy survives on the wire (0 = unset)") + .isEqualTo((byte) 0); + } + + @Test(groups = { "unit" }) + public void thinClient_resolveAndWrap_requestContextReadConsistencyStrategy_overridesHeaderReadConsistencyStrategy() throws Exception { + // Header-level ReadConsistencyStrategy ("Eventual") = set by CosmosClientBuilder.readConsistencyStrategy(), + // applied to every request via getRequestHeaders(). + // Request-level ReadConsistencyStrategy (LATEST_COMMITTED) = set by CosmosItemRequestOptions.setReadConsistencyStrategy(), + // a per-operation override stored in requestContext. + // Resolution rule: requestContext (per-request) > headers (client-level). + ThinClientStoreModel storeModel = createMockThinClientStoreModel(); + + RxDocumentServiceRequest request = createDocumentReadRequest(); + request.getHeaders().put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, "Eventual"); // client-level + request.getHeaders().put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, "Session"); + request.requestContext.readConsistencyStrategy = ReadConsistencyStrategy.LATEST_COMMITTED; // per-request override + + resolveEffectiveConsistencyHeaders(request); + + HttpRequest httpRequest = + storeModel.wrapInHttpRequest(request, URI.create("https://test-proxy:10250/")); + + byte[] rntbdFrame = collectHttpBody(httpRequest); + RntbdRequest decoded = decodeRntbdFrame(rntbdFrame); + Byte rcsValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(rcsValue) + .as("Request-level readConsistencyStrategy=LatestCommitted should override header-level Eventual") + .isEqualTo((byte) 0x03); + Byte clValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ConsistencyLevel); + assertThat(clValue) + .as("ConsistencyLevel should be stripped (0 = unset)") + .isEqualTo((byte) 0); + } + + @Test(groups = { "unit" }) + public void thinClient_resolveAndWrap_defaultReadConsistencyStrategy_consistencyLevelSurvives() throws Exception { + // DEFAULT readConsistencyStrategy is transparent — ConsistencyLevel should remain in the RNTBD frame + ThinClientStoreModel storeModel = createMockThinClientStoreModel(); + + RxDocumentServiceRequest request = createDocumentReadRequest(); + request.getHeaders().put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, "Session"); + request.requestContext.readConsistencyStrategy = ReadConsistencyStrategy.DEFAULT; + + resolveEffectiveConsistencyHeaders(request); + + HttpRequest httpRequest = + storeModel.wrapInHttpRequest(request, URI.create("https://test-proxy:10250/")); + + byte[] rntbdFrame = collectHttpBody(httpRequest); + RntbdRequest decoded = decodeRntbdFrame(rntbdFrame); + Byte rcsValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(rcsValue) + .as("DEFAULT readConsistencyStrategy should not be set (0 = unset)") + .isEqualTo((byte) 0); + Byte clValue = decoded.getHeader(RntbdConstants.RntbdRequestHeader.ConsistencyLevel); + assertThat(clValue) + .as("ConsistencyLevel=Session should survive when readConsistencyStrategy is DEFAULT") + .isEqualTo(RntbdConstants.RntbdConsistencyLevel.Session.id()); + } + + @Test(groups = { "unit" }) + public void resolve_noHeaders_noOp() { + Map headers = new java.util.HashMap<>(); + RxGatewayStoreModel.resolveEffectiveConsistencyHeaders(headers, null); + assertThat(headers.size()).isEqualTo(0); + } + + @Test(groups = { "unit" }) + public void resolve_idempotent_multipleInvocations() { + // Resolution should be idempotent — multiple calls produce the same result. + // Validates safety for shared header maps across availability strategy clones. + Map headers = new java.util.HashMap<>(); + headers.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, "Session"); + headers.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, "LatestCommitted"); + + RxGatewayStoreModel.resolveEffectiveConsistencyHeaders(headers, null); + RxGatewayStoreModel.resolveEffectiveConsistencyHeaders(headers, null); + RxGatewayStoreModel.resolveEffectiveConsistencyHeaders(headers, null); + + assertThat(headers.containsKey(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL)).isFalse(); + assertThat(headers.get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY)) + .isEqualTo("LatestCommitted"); + } + + // endregion + + // region Helpers + + private static ThinClientStoreModel createMockThinClientStoreModel() { + DatabaseAccount mockAccount = Mockito.mock(DatabaseAccount.class); + Mockito.when(mockAccount.getId()).thenReturn("test-account"); + + GlobalEndpointManager mockGem = Mockito.mock(GlobalEndpointManager.class); + Mockito.when(mockGem.getLatestDatabaseAccount()).thenReturn(mockAccount); + + return new ThinClientStoreModel( + null, // clientContext — not used in wrapInHttpRequest + Mockito.mock(ISessionContainer.class), + ConsistencyLevel.SESSION, + new UserAgentContainer(), + mockGem, + Mockito.mock(HttpClient.class)); + } + + private static RxDocumentServiceRequest createDocumentReadRequest() { + RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName( + null, OperationType.Read, "dbs/testdb/colls/testcoll/docs/testdoc", ResourceType.Document); + // Set resolved partition key range to avoid NPE in wrapInHttpRequest + request.requestContext.resolvedPartitionKeyRange = new PartitionKeyRange(); + request.requestContext.resolvedPartitionKeyRange.setMinInclusive("00"); + request.requestContext.resolvedPartitionKeyRange.setMaxExclusive("FF"); + return request; + } + + private static byte[] collectHttpBody(HttpRequest httpRequest) { + return httpRequest.body().reduce((a, b) -> { + byte[] merged = new byte[a.length + b.length]; + System.arraycopy(a, 0, merged, 0, a.length); + System.arraycopy(b, 0, merged, a.length, b.length); + return merged; + }).block(); + } + + private static void resolveEffectiveConsistencyHeaders(RxDocumentServiceRequest request) { + RxGatewayStoreModel.resolveEffectiveConsistencyHeaders( + request.getHeaders(), + request.requestContext != null ? request.requestContext.readConsistencyStrategy : null); + } + + // region RNTBD frame helpers + + /** + * Decodes the RNTBD binary frame using the production decoder. + * Token presence/absence is determined by the actual RNTBD wire format. + */ + private static RntbdRequest decodeRntbdFrame(byte[] rntbdFrame) { + ByteBuf buffer = Unpooled.wrappedBuffer(rntbdFrame); + return RntbdRequest.decode(buffer); + } + + // endregion +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategyE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategyE2ETest.java new file mode 100644 index 000000000000..ac2eb8d6d261 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategyE2ETest.java @@ -0,0 +1,719 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.rx; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnostics; +import com.azure.cosmos.FlakyTestRetryAnalyzer; +import com.azure.cosmos.GatewayConnectionConfig; +import com.azure.cosmos.Http2ConnectionConfig; +import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.implementation.BadRequestException; +import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; +import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.CosmosRequestOptions; +import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.SqlParameter; +import com.azure.cosmos.models.SqlQuerySpec; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * Unified E2E tests for ReadConsistencyStrategy propagation across Gateway V1 and Gateway V2. + * + *

Creates two {@link CosmosAsyncClient} instances — one for each transport: + *

+ * + *

Does not extend {@link TestSuiteBase} to avoid {@code @BeforeSuite} shared container + * initialization which requires provisioned throughput (incompatible with serverless accounts). + * Uses {@link TestSuiteBase#createCollection(CosmosAsyncClient, String, CosmosContainerProperties)} + * as a static utility for serverless-safe container creation. + * + *

Run with test group "thinclient". + */ +public class GatewayReadConsistencyStrategyE2ETest { + private static final Logger logger = LoggerFactory.getLogger(GatewayReadConsistencyStrategyE2ETest.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final long TIMEOUT = 60_000L; + + private static final String GATEWAY_V1 = "GatewayV1"; + private static final String GATEWAY_V2 = "GatewayV2"; + private static final String DIRECT = "Direct"; + + private CosmosAsyncClient gatewayV1Client; + private CosmosAsyncClient gatewayV2Client; + private CosmosAsyncClient directClient; + private CosmosAsyncDatabase database; + private CosmosAsyncContainer gatewayV1Container; + private CosmosAsyncContainer gatewayV2Container; + private CosmosAsyncContainer directContainer; + private String databaseId; + private String containerId; + + @BeforeClass(groups = {"thinclient"}, timeOut = TIMEOUT) + public void beforeClass() { + System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); + + databaseId = "readConsistencyStrategy-e2e-" + UUID.randomUUID().toString().substring(0, 8); + containerId = "testcontainer"; + + gatewayV1Client = createGatewayV1Builder().buildAsyncClient(); + + gatewayV1Client.createDatabaseIfNotExists(databaseId).block(); + database = gatewayV1Client.getDatabase(databaseId); + + CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/pk"); + TestSuiteBase.createCollection(gatewayV1Client, databaseId, containerProperties); + + gatewayV1Container = gatewayV1Client.getDatabase(databaseId).getContainer(containerId); + + // Gateway V2 client — HTTP/2 enabled, thin client flag set + gatewayV2Client = createGatewayV2Builder().buildAsyncClient(); + gatewayV2Container = gatewayV2Client.getDatabase(databaseId).getContainer(containerId); + + // Direct mode client + directClient = createDirectBuilder().buildAsyncClient(); + directContainer = directClient.getDatabase(databaseId).getContainer(containerId); + + logger.info("Created E2E test resources: db={}, container={}", databaseId, containerId); + } + + @AfterClass(groups = {"thinclient"}, alwaysRun = true) + public void afterClass() { + if (database != null) { + try { + database.delete().block(); + } catch (Exception e) { + logger.warn("Failed to delete test database", e); + } + } + safeClose(gatewayV1Client); + safeClose(gatewayV2Client); + safeClose(directClient); + } + + @DataProvider(name = "transportModes") + public Object[][] transportModes() { + return new Object[][] { { GATEWAY_V1 }, { GATEWAY_V2 }, { DIRECT } }; + } + + // region ItemRequestOptions — point reads + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void readItem_withLatestCommitted(String mode) { + String id = seedDocument(mode); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + CosmosItemResponse response = + containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, response.getDiagnostics()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void readItem_withEventual(String mode) { + String id = seedDocument(mode); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.EVENTUAL); + + CosmosItemResponse response = + containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.EVENTUAL); + assertEndpointForMode(mode, response.getDiagnostics()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void readItem_withSession(String mode) { + String id = seedDocument(mode); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION); + + CosmosItemResponse response = + containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.SESSION); + assertEndpointForMode(mode, response.getDiagnostics()); + } + + // endregion + + // region QueryRequestOptions + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void queryItems_withLatestCommitted(String mode) { + String id = seedDocument(mode); + + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setPartitionKey(new PartitionKey(id)) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + SqlQuerySpec querySpec = new SqlQuerySpec("SELECT * FROM c WHERE c.id=@id"); + querySpec.setParameters(Arrays.asList(new SqlParameter("@id", id))); + + FeedResponse response = containerFor(mode) + .queryItems(querySpec, queryOptions, ObjectNode.class) + .byPage() + .blockFirst(); + + assertThat(response).isNotNull(); + assertThat(response.getResults()).isNotNull(); + assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, response.getCosmosDiagnostics()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void queryItems_withSession(String mode) { + String id = seedDocument(mode); + + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setPartitionKey(new PartitionKey(id)) + .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION); + + SqlQuerySpec querySpec = new SqlQuerySpec("SELECT * FROM c WHERE c.id=@id"); + querySpec.setParameters(Arrays.asList(new SqlParameter("@id", id))); + + FeedResponse response = containerFor(mode) + .queryItems(querySpec, queryOptions, ObjectNode.class) + .byPage() + .blockFirst(); + + assertThat(response).isNotNull(); + assertThat(response.getResults()).isNotNull(); + assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.SESSION); + assertEndpointForMode(mode, response.getCosmosDiagnostics()); + } + + // endregion + + // region ReadAllItems + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void readAllItems_withLatestCommitted(String mode) { + String pk = UUID.randomUUID().toString(); + seedDocument(mode, UUID.randomUUID().toString(), pk); + + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + FeedResponse response = containerFor(mode) + .readAllItems(new PartitionKey(pk), queryOptions, ObjectNode.class) + .byPage() + .blockFirst(); + + assertThat(response).isNotNull(); + assertThat(response.getResults()).isNotNull(); + assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, response.getCosmosDiagnostics()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void readAllItems_withSession(String mode) { + String pk = UUID.randomUUID().toString(); + seedDocument(mode, UUID.randomUUID().toString(), pk); + + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION); + + FeedResponse response = containerFor(mode) + .readAllItems(new PartitionKey(pk), queryOptions, ObjectNode.class) + .byPage() + .blockFirst(); + + assertThat(response).isNotNull(); + assertThat(response.getResults()).isNotNull(); + assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.SESSION); + assertEndpointForMode(mode, response.getCosmosDiagnostics()); + } + + // endregion + + // region ChangeFeedRequestOptions + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void changeFeed_withLatestCommitted(String mode) { + String pkValue = UUID.randomUUID().toString(); + seedDocument(mode, pkValue, pkValue); + + CosmosChangeFeedRequestOptions cfOptions = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning( + FeedRange.forLogicalPartition(new PartitionKey(pkValue))) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + List> pages = containerFor(mode) + .queryChangeFeed(cfOptions, ObjectNode.class) + .byPage() + .collectList() + .block(); + + assertThat(pages).isNotNull(); + assertThat(pages.isEmpty()).isFalse(); + + FeedResponse firstPage = pages.get(0); + assertEffectiveReadConsistencyStrategy(firstPage.getCosmosDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, firstPage.getCosmosDiagnostics()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void changeFeed_withSession(String mode) { + String pkValue = UUID.randomUUID().toString(); + seedDocument(mode, pkValue, pkValue); + + CosmosChangeFeedRequestOptions cfOptions = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning( + FeedRange.forLogicalPartition(new PartitionKey(pkValue))) + .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION); + + List> pages = containerFor(mode) + .queryChangeFeed(cfOptions, ObjectNode.class) + .byPage() + .collectList() + .block(); + + assertThat(pages).isNotNull(); + assertThat(pages.isEmpty()).isFalse(); + + FeedResponse firstPage = pages.get(0); + assertEffectiveReadConsistencyStrategy(firstPage.getCosmosDiagnostics(), ReadConsistencyStrategy.SESSION); + assertEndpointForMode(mode, firstPage.getCosmosDiagnostics()); + } + + // endregion + + // region ReadManyRequestOptions + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void readMany_withLatestCommitted(String mode) { + String pkValue = UUID.randomUUID().toString(); + String id1 = UUID.randomUUID().toString(); + String id2 = UUID.randomUUID().toString(); + seedDocument(mode, id1, pkValue); + seedDocument(mode, id2, pkValue); + + List identities = Arrays.asList( + new CosmosItemIdentity(new PartitionKey(pkValue), id1), + new CosmosItemIdentity(new PartitionKey(pkValue), id2)); + + CosmosReadManyRequestOptions readManyOptions = new CosmosReadManyRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + FeedResponse response = + containerFor(mode).readMany(identities, readManyOptions, ObjectNode.class).block(); + + assertThat(response).isNotNull(); + assertThat(response.getResults()).isNotNull(); + assertThat(response.getResults().size()).isEqualTo(2); + assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, response.getCosmosDiagnostics()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void readMany_withSession(String mode) { + String pkValue = UUID.randomUUID().toString(); + String id1 = UUID.randomUUID().toString(); + String id2 = UUID.randomUUID().toString(); + seedDocument(mode, id1, pkValue); + seedDocument(mode, id2, pkValue); + + List identities = Arrays.asList( + new CosmosItemIdentity(new PartitionKey(pkValue), id1), + new CosmosItemIdentity(new PartitionKey(pkValue), id2)); + + CosmosReadManyRequestOptions readManyOptions = new CosmosReadManyRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION); + + FeedResponse response = + containerFor(mode).readMany(identities, readManyOptions, ObjectNode.class).block(); + + assertThat(response).isNotNull(); + assertThat(response.getResults()).isNotNull(); + assertThat(response.getResults().size()).isEqualTo(2); + assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.SESSION); + assertEndpointForMode(mode, response.getCosmosDiagnostics()); + } + + // endregion + + // region Client-level ReadConsistencyStrategy + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void clientLevel_latestCommitted_readItem(String mode) { + CosmosAsyncClient clientWithReadConsistencyStrategy = null; + try { + clientWithReadConsistencyStrategy = builderFor(mode) + .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .buildAsyncClient(); + CosmosAsyncContainer targetContainer = clientWithReadConsistencyStrategy.getDatabase(databaseId).getContainer(containerId); + + String id = UUID.randomUUID().toString(); + createAndInsertDocument(targetContainer, id); + + CosmosItemResponse response = + targetContainer.readItem(id, new PartitionKey(id), ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, response.getDiagnostics()); + } finally { + safeClose(clientWithReadConsistencyStrategy); + } + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void clientLevel_session_readItem(String mode) { + CosmosAsyncClient clientWithReadConsistencyStrategy = null; + try { + clientWithReadConsistencyStrategy = builderFor(mode) + .readConsistencyStrategy(ReadConsistencyStrategy.SESSION) + .buildAsyncClient(); + CosmosAsyncContainer targetContainer = clientWithReadConsistencyStrategy.getDatabase(databaseId).getContainer(containerId); + + String id = UUID.randomUUID().toString(); + createAndInsertDocument(targetContainer, id); + + CosmosItemResponse response = + targetContainer.readItem(id, new PartitionKey(id), ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.SESSION); + assertEndpointForMode(mode, response.getDiagnostics()); + } finally { + safeClose(clientWithReadConsistencyStrategy); + } + } + + // endregion + + // region Write operations— ReadConsistencyStrategy forced to DEFAULT + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void writeItem_readConsistencyStrategyIgnored(String mode) { + CosmosAsyncClient clientWithReadConsistencyStrategy = null; + try { + clientWithReadConsistencyStrategy = builderFor(mode) + .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .buildAsyncClient(); + CosmosAsyncContainer targetContainer = clientWithReadConsistencyStrategy.getDatabase(databaseId).getContainer(containerId); + + String id = UUID.randomUUID().toString(); + ObjectNode doc = createDocument(id); + + CosmosItemResponse response = + targetContainer.createItem(doc, new PartitionKey(id), null).block(); + + assertThat(response).isNotNull(); + assertThat(response.getStatusCode()).isEqualTo(201); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.DEFAULT); + } finally { + safeClose(clientWithReadConsistencyStrategy); + } + } + + // endregion + + // region Validation — GLOBAL_STRONG on Session account + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void readItem_globalStrong_invalidAccount_throwsBadRequest(String mode) { + String id = seedDocument(mode); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.GLOBAL_STRONG); + + Throwable caughtError = null; + try { + containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block(); + } catch (Throwable t) { + caughtError = t; + } + + assertThat(caughtError) + .as("Expected BadRequestException for GLOBAL_STRONG on Session account") + .isNotNull() + .isInstanceOf(BadRequestException.class); + assertThat(caughtError.getMessage()).contains("read-consistency-strategy"); + } + + // endregion + + // region Contention — both ConsistencyLevel and ReadConsistencyStrategy set + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void clientLevel_bothConsistencyLevelAndReadConsistencyStrategy_readConsistencyStrategyWins(String mode) { + CosmosAsyncClient clientWithBoth = null; + try { + clientWithBoth = builderFor(mode) + .consistencyLevel(ConsistencyLevel.EVENTUAL) + .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .buildAsyncClient(); + CosmosAsyncContainer targetContainer = clientWithBoth.getDatabase(databaseId).getContainer(containerId); + + String id = UUID.randomUUID().toString(); + createAndInsertDocument(targetContainer, id); + + CosmosItemResponse response = + targetContainer.readItem(id, new PartitionKey(id), ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, response.getDiagnostics()); + } finally { + safeClose(clientWithBoth); + } + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void requestLevel_bothConsistencyLevelAndReadConsistencyStrategy_readConsistencyStrategyWins(String mode) { + String id = seedDocument(mode); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setConsistencyLevel(ConsistencyLevel.EVENTUAL) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + CosmosItemResponse response = + containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, response.getDiagnostics()); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void requestLevelReadConsistencyStrategy_overridesClientLevelReadConsistencyStrategy(String mode) { + CosmosAsyncClient clientWithClientReadConsistencyStrategy = null; + try { + clientWithClientReadConsistencyStrategy = builderFor(mode) + .readConsistencyStrategy(ReadConsistencyStrategy.EVENTUAL) + .buildAsyncClient(); + CosmosAsyncContainer targetContainer = clientWithClientReadConsistencyStrategy.getDatabase(databaseId).getContainer(containerId); + + String id = UUID.randomUUID().toString(); + createAndInsertDocument(targetContainer, id); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + CosmosItemResponse response = + targetContainer.readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, response.getDiagnostics()); + } finally { + safeClose(clientWithClientReadConsistencyStrategy); + } + } + + // endregion + + // region Operation policy + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void operationPolicy_setsReadConsistencyStrategy(String mode) { + CosmosAsyncClient policyClient = null; + try { + policyClient = builderFor(mode) + .addOperationPolicy(cosmosOperationDetails -> { + CosmosRequestOptions overrides = new CosmosRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + cosmosOperationDetails.setRequestOptions(overrides); + }) + .buildAsyncClient(); + CosmosAsyncContainer policyContainer = policyClient.getDatabase(databaseId).getContainer(containerId); + + String id = UUID.randomUUID().toString(); + createAndInsertDocument(policyContainer, id); + + CosmosItemResponse response = + policyContainer.readItem(id, new PartitionKey(id), ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED); + assertEndpointForMode(mode, response.getDiagnostics()); + } finally { + safeClose(policyClient); + } + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void operationPolicy_readConsistencyStrategyOverridesRequestLevel(String mode) { + CosmosAsyncClient policyClient = null; + try { + policyClient = builderFor(mode) + .addOperationPolicy(cosmosOperationDetails -> { + CosmosRequestOptions overrides = new CosmosRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.EVENTUAL); + cosmosOperationDetails.setRequestOptions(overrides); + }) + .buildAsyncClient(); + CosmosAsyncContainer policyContainer = policyClient.getDatabase(databaseId).getContainer(containerId); + + String id = UUID.randomUUID().toString(); + createAndInsertDocument(policyContainer, id); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + CosmosItemResponse response = + policyContainer.readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block(); + + assertSuccessfulRead(response); + assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.EVENTUAL); + assertEndpointForMode(mode, response.getDiagnostics()); + } finally { + safeClose(policyClient); + } + } + + // endregion + + // region Helpers — builders + + private static CosmosClientBuilder createGatewayV1Builder() { + return new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .gatewayMode() + .consistencyLevel(ConsistencyLevel.SESSION); + } + + private static CosmosClientBuilder createGatewayV2Builder() { + GatewayConnectionConfig gwConfig = new GatewayConnectionConfig(); + gwConfig.setHttp2ConnectionConfig(new Http2ConnectionConfig().setEnabled(true)); + + return new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .gatewayMode(gwConfig) + .consistencyLevel(ConsistencyLevel.SESSION); + } + + private static CosmosClientBuilder createDirectBuilder() { + return new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .directMode() + .consistencyLevel(ConsistencyLevel.SESSION); + } + + private CosmosClientBuilder builderFor(String mode) { + switch (mode) { + case GATEWAY_V2: return createGatewayV2Builder(); + case DIRECT: return createDirectBuilder(); + default: return createGatewayV1Builder(); + } + } + + private CosmosAsyncContainer containerFor(String mode) { + switch (mode) { + case GATEWAY_V2: return gatewayV2Container; + case DIRECT: return directContainer; + default: return gatewayV1Container; + } + } + + private static boolean isGatewayV2(String mode) { + return GATEWAY_V2.equals(mode); + } + + // endregion + + // region Helpers — documents + + private ObjectNode createDocument(String id) { + return createDocument(id, id); + } + + private ObjectNode createDocument(String id, String pk) { + ObjectNode doc = OBJECT_MAPPER.createObjectNode(); + doc.put("id", id); + doc.put("pk", pk); + return doc; + } + + private String seedDocument(String mode) { + String id = UUID.randomUUID().toString(); + seedDocument(mode, id, id); + return id; + } + + private void seedDocument(String mode, String id, String pk) { + ObjectNode doc = createDocument(id, pk); + containerFor(mode).createItem(doc, new PartitionKey(pk), null).block(); + } + + private void createAndInsertDocument(CosmosAsyncContainer targetContainer, String id) { + ObjectNode doc = createDocument(id); + targetContainer.createItem(doc, new PartitionKey(id), null).block(); + } + + // endregion + + // region Helpers — assertions + + private static void assertSuccessfulRead(CosmosItemResponse response) { + assertThat(response).isNotNull(); + assertThat(response.getStatusCode()).isEqualTo(200); + } + + private static void assertEffectiveReadConsistencyStrategy(CosmosDiagnostics diagnostics, ReadConsistencyStrategy expected) { + assertThat(diagnostics).isNotNull(); + assertThat(diagnostics.getDiagnosticsContext()).isNotNull(); + assertThat(diagnostics.getDiagnosticsContext().getEffectiveReadConsistencyStrategy()) + .isEqualTo(expected); + } + + private void assertEndpointForMode(String mode, CosmosDiagnostics diagnostics) { + if (isGatewayV2(mode)) { + TestSuiteBase.assertThinClientEndpointUsed(diagnostics); + } + } + + private static void safeClose(CosmosAsyncClient clientToClose) { + if (clientToClose != null) { + try { + clientToClose.close(); + } catch (Exception e) { + logger.warn("Failed to close client", e); + } + } + } + + // endregion +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategySpyWireTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategySpyWireTest.java new file mode 100644 index 000000000000..16b59d541a1e --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategySpyWireTest.java @@ -0,0 +1,486 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.rx; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.GatewayConnectionConfig; +import com.azure.cosmos.Http2ConnectionConfig; +import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.ConnectionPolicy; +import com.azure.cosmos.implementation.Document; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.RequestOptions; +import com.azure.cosmos.implementation.SpyClientUnderTestFactory; +import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequest; +import com.azure.cosmos.implementation.http.HttpRequest; +import com.azure.cosmos.models.CosmosClientTelemetryConfig; +import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Spy-wire tests for ReadConsistencyStrategy header propagation across both transports. + * + *

Creates two spy clients that intercept at the HTTP client layer: + *

+ * + *

V2 tests are skipped when the test account does not have thin client read locations + * (the spy client falls back to V1 silently — we detect this and skip rather than false-pass). + */ +public class GatewayReadConsistencyStrategySpyWireTest { + private static final Logger logger = LoggerFactory.getLogger(GatewayReadConsistencyStrategySpyWireTest.class); + + private static ImplementationBridgeHelpers.CosmosItemRequestOptionsHelper.CosmosItemRequestOptionsAccessor getItemOptionsAccessor() { + return ImplementationBridgeHelpers.CosmosItemRequestOptionsHelper.getCosmosItemRequestOptionsAccessor(); + } + + private static final long TIMEOUT = 60_000L; + private static final String DOCUMENT_ID = UUID.randomUUID().toString(); + + // V1 transport — HTTP/1, gateway mode, no thin client + private static final String V1 = "GatewayV1"; + // V2 transport — HTTP/2, thin client enabled + private static final String V2 = "GatewayV2"; + + private CosmosAsyncClient cosmosClient; + private CosmosAsyncDatabase database; + private CosmosAsyncContainer container; + private String databaseId; + private String containerId; + + private SpyClientUnderTestFactory.ClientUnderTest v1SpyClient; + private SpyClientUnderTestFactory.ClientUnderTest v2SpyClient; + + @BeforeClass(groups = {"thinclient"}, timeOut = TIMEOUT) + public void beforeClass() { + System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); + + cosmosClient = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .gatewayMode() + .buildAsyncClient(); + + databaseId = "ReadConsistencyStrategy-spy-" + UUID.randomUUID().toString().substring(0, 8); + containerId = "testcontainer"; + + cosmosClient.createDatabaseIfNotExists(databaseId).block(); + + CosmosContainerProperties props = new CosmosContainerProperties(containerId, "/mypk"); + container = TestSuiteBase.createCollection(cosmosClient, databaseId, props); + database = cosmosClient.getDatabase(databaseId); + + // Seed a document + ObjectNode doc = com.fasterxml.jackson.databind.node.JsonNodeFactory.instance.objectNode(); + doc.put("id", DOCUMENT_ID); + doc.put("mypk", DOCUMENT_ID); + container.createItem(doc).block(); + + // V1 spy — HTTP/1, no Http2ConnectionConfig → useThinClient = false + v1SpyClient = createSpyClient(null, false); + + // V2 spy — HTTP/2 enabled, thin client JVM flag set + v2SpyClient = createSpyClient(null, true); + + logger.info("Created spy-wire test resources: db={}, container={}", databaseId, containerId); + } + + @AfterClass(groups = {"thinclient"}, alwaysRun = true) + public void afterClass() { + if (database != null) { + try { + database.delete().block(); + } catch (Exception e) { + logger.warn("Failed to delete test database", e); + } + } + if (cosmosClient != null) { + cosmosClient.close(); + } + if (v1SpyClient != null) { + v1SpyClient.close(); + } + if (v2SpyClient != null) { + v2SpyClient.close(); + } + } + + @DataProvider(name = "transportModes") + public Object[][] transportModes() { + return new Object[][] { { V1 }, { V2 } }; + } + + // region No contention — single header set + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes") + public void requestLevelReadConsistencyStrategy_headerOnWire_consistencyLevelStripped(String mode) { + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + options.setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + assertReadConsistencyStrategyOnWire(mode, spyFor(mode), options, "LatestCommitted", (byte) 0x03, true); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes") + public void clientLevelReadConsistencyStrategy_headerOnWire_consistencyLevelStripped(String mode) { + SpyClientUnderTestFactory.ClientUnderTest readConsistencyStrategyClient = createSpyClient(ReadConsistencyStrategy.LATEST_COMMITTED, isGatewayV2(mode)); + try { + assertReadConsistencyStrategyOnWire(mode, readConsistencyStrategyClient, new RequestOptions(), "LatestCommitted", (byte) 0x03, true); + } finally { + readConsistencyStrategyClient.close(); + } + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes") + public void defaultReadConsistencyStrategy_noReadConsistencyStrategyHeaderOnWire(String mode) { + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + options.setReadConsistencyStrategy(ReadConsistencyStrategy.DEFAULT); + + assertNoReadConsistencyStrategyOnWire(mode, spyFor(mode), options); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes") + public void clientLeveldefaultReadConsistencyStrategy_noReadConsistencyStrategyHeaderOnWire(String mode) { + SpyClientUnderTestFactory.ClientUnderTest defaultReadConsistencyStrategyClient = createSpyClient(ReadConsistencyStrategy.DEFAULT, isGatewayV2(mode)); + try { + assertNoReadConsistencyStrategyOnWire(mode, defaultReadConsistencyStrategyClient, new RequestOptions()); + } finally { + defaultReadConsistencyStrategyClient.close(); + } + } + + // endregion + + // region Contention — both ConsistencyLevel and ReadConsistencyStrategy present, resolution determines the winner + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes") + public void bothConsistencyLevelAndReadConsistencyStrategy_readConsistencyStrategyWins_consistencyLevelStripped(String mode) { + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + options.setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + options.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + + assertReadConsistencyStrategyOnWire(mode, spyFor(mode), options, "LatestCommitted", (byte) 0x03, true); + } + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes") + public void requestLevelReadConsistencyStrategy_overridesClientLevelReadConsistencyStrategy(String mode) { + // Client-level ReadConsistencyStrategy = EVENTUAL (applied to every request header via builder) + // Request-level ReadConsistencyStrategy = LATEST_COMMITTED (per-operation override via requestContext) + // Resolution: request-level wins. + SpyClientUnderTestFactory.ClientUnderTest eventualReadConsistencyStrategyClient = createSpyClient(ReadConsistencyStrategy.EVENTUAL, isGatewayV2(mode)); + try { + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + options.setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + assertReadConsistencyStrategyOnWire(mode, eventualReadConsistencyStrategyClient, options, "LatestCommitted", (byte) 0x03, true); + } finally { + eventualReadConsistencyStrategyClient.close(); + } + } + + // endregion + + // region Write operations — ReadConsistencyStrategy should not appear on writes (V1 only, writes don't route to thin client) + + @Test(groups = {"thinclient"}, timeOut = TIMEOUT) + public void writeWithClientLevelReadConsistencyStrategy_noReadConsistencyStrategyHeaderOnWire() { + SpyClientUnderTestFactory.ClientUnderTest readConsistencyStrategyClient = createSpyClient(ReadConsistencyStrategy.LATEST_COMMITTED, false); + try { + String writeDocId = UUID.randomUUID().toString(); + Document writeDoc = new Document(String.format( + "{ \"id\": \"%s\", \"mypk\": \"%s\" }", writeDocId, writeDocId)); + + readConsistencyStrategyClient.clearCapturedRequests(); + readConsistencyStrategyClient.createDocument(getCollectionLink(), writeDoc, null, false).block(); + + List requests = readConsistencyStrategyClient.getCapturedRequests(); + assertThat(requests).isNotEmpty(); + + HttpRequest createRequest = requests.stream() + .filter(r -> "POST".equalsIgnoreCase(r.httpMethod().toString())) + .findFirst() + .orElse(null); + assertThat(createRequest).as("Expected a document create request").isNotNull(); + + Map headers = createRequest.headers().toMap(); + assertThat(headers.containsKey(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY)) + .as("Write operations should not have ReadConsistencyStrategy header") + .isFalse(); + } finally { + readConsistencyStrategyClient.close(); + } + } + + // endregion + + // region Assertion helpers — branch on transport mode + + private void assertReadConsistencyStrategyOnWire( + String mode, + SpyClientUnderTestFactory.ClientUnderTest client, + CosmosItemRequestOptions cosmosOptions, + String expectedHeaderValue, + byte expectedRntbdByte, + boolean expectClStripped) { + + HttpRequest captured = executeReadAndCapture(mode, client, cosmosOptions); + + if (isGatewayV2(mode)) { + // Verify the request actually routed through thin client (port 10250) + assertThat(captured.uri().toString()) + .as("V2 request should target thin client proxy endpoint (port 10250)") + .contains(":10250"); + + // V2: decode RNTBD frame and inspect typed tokens + RntbdRequest rntbdRequest = decodeRntbdFrame(collectHttpBody(captured)); + Byte readConsistencyStrategyValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(readConsistencyStrategyValue) + .as("ReadConsistencyStrategy token value should be 0x%02X", expectedRntbdByte) + .isNotNull() + .isEqualTo(expectedRntbdByte); + if (expectClStripped) { + Byte clValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ConsistencyLevel); + assertThat(clValue) + .as("ConsistencyLevel RNTBD token should not be set when ReadConsistencyStrategy is active (0 = unset)") + .isEqualTo((byte) 0); + } + } else { + // V1: inspect HTTP headers + Map headers = captured.headers().toMap(); + assertThat(headers.get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY)) + .isEqualTo(expectedHeaderValue); + if (expectClStripped) { + assertThat(headers.containsKey(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL)) + .as("ConsistencyLevel header should be stripped when ReadConsistencyStrategy is set") + .isFalse(); + } + } + } + + private void assertReadConsistencyStrategyOnWire( + String mode, + SpyClientUnderTestFactory.ClientUnderTest client, + RequestOptions requestOptions, + String expectedHeaderValue, + byte expectedRntbdByte, + boolean expectClStripped) { + + HttpRequest captured = executeReadAndCapture(mode, client, requestOptions); + + if (isGatewayV2(mode)) { + assertThat(captured.uri().toString()) + .as("V2 request should target thin client proxy endpoint (port 10250)") + .contains(":10250"); + + RntbdRequest rntbdRequest = decodeRntbdFrame(collectHttpBody(captured)); + Byte readConsistencyStrategyValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(readConsistencyStrategyValue) + .as("ReadConsistencyStrategy token value should be 0x%02X", expectedRntbdByte) + .isNotNull() + .isEqualTo(expectedRntbdByte); + if (expectClStripped) { + Byte clValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ConsistencyLevel); + assertThat(clValue) + .as("ConsistencyLevel RNTBD token should not be set when ReadConsistencyStrategy is active (0 = unset)") + .isEqualTo((byte) 0); + } + } else { + Map headers = captured.headers().toMap(); + assertThat(headers.get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY)) + .isEqualTo(expectedHeaderValue); + if (expectClStripped) { + assertThat(headers.containsKey(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL)) + .as("ConsistencyLevel header should be stripped when ReadConsistencyStrategy is set") + .isFalse(); + } + } + } + + private void assertNoReadConsistencyStrategyOnWire(String mode, SpyClientUnderTestFactory.ClientUnderTest client, CosmosItemRequestOptions cosmosOptions) { + HttpRequest captured = executeReadAndCapture(mode, client, cosmosOptions); + + if (isGatewayV2(mode)) { + assertThat(captured.uri().toString()) + .as("V2 request should target thin client proxy endpoint (port 10250)") + .contains(":10250"); + + RntbdRequest rntbdRequest = decodeRntbdFrame(collectHttpBody(captured)); + Byte readConsistencyStrategyValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(readConsistencyStrategyValue) + .as("ReadConsistencyStrategy RNTBD token should not be set when DEFAULT (0 = unset)") + .isEqualTo((byte) 0); + } else { + Map headers = captured.headers().toMap(); + assertThat(headers.containsKey(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY)) + .as("DEFAULT ReadConsistencyStrategy should not emit a header") + .isFalse(); + } + } + + private void assertNoReadConsistencyStrategyOnWire(String mode, SpyClientUnderTestFactory.ClientUnderTest client, RequestOptions requestOptions) { + HttpRequest captured = executeReadAndCapture(mode, client, requestOptions); + + if (isGatewayV2(mode)) { + assertThat(captured.uri().toString()) + .as("V2 request should target thin client proxy endpoint (port 10250)") + .contains(":10250"); + + RntbdRequest rntbdRequest = decodeRntbdFrame(collectHttpBody(captured)); + Byte readConsistencyStrategyValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy); + assertThat(readConsistencyStrategyValue) + .as("ReadConsistencyStrategy RNTBD token should not be set when DEFAULT (0 = unset)") + .isEqualTo((byte) 0); + } else { + Map headers = captured.headers().toMap(); + assertThat(headers.containsKey(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY)) + .as("DEFAULT ReadConsistencyStrategy should not emit a header") + .isFalse(); + } + } + + // endregion + + // region Execution + capture helpers + + private HttpRequest executeReadAndCapture(String mode, SpyClientUnderTestFactory.ClientUnderTest client, CosmosItemRequestOptions cosmosOptions) { + client.clearCapturedRequests(); + RequestOptions requestOptions = getItemOptionsAccessor().toRequestOptions(cosmosOptions); + requestOptions.setPartitionKey(new PartitionKey(DOCUMENT_ID)); + client.readDocument(getDocumentLink(), requestOptions).block(); + + List requests = client.getCapturedRequests(); + assertThat(requests).isNotEmpty(); + + HttpRequest docRequest = findDocumentReadRequest(mode, requests); + assertThat(docRequest).as("Expected a document read request").isNotNull(); + return docRequest; + } + + private HttpRequest executeReadAndCapture(String mode, SpyClientUnderTestFactory.ClientUnderTest client, RequestOptions requestOptions) { + client.clearCapturedRequests(); + requestOptions.setPartitionKey(new PartitionKey(DOCUMENT_ID)); + client.readDocument(getDocumentLink(), requestOptions).block(); + + List requests = client.getCapturedRequests(); + assertThat(requests).isNotEmpty(); + + HttpRequest docRequest = findDocumentReadRequest(mode, requests); + assertThat(docRequest).as("Expected a document read request").isNotNull(); + return docRequest; + } + + // endregion + + // region Factory + utility helpers + + private SpyClientUnderTestFactory.ClientUnderTest spyFor(String mode) { + return isGatewayV2(mode) ? v2SpyClient : v1SpyClient; + } + + private static boolean isGatewayV2(String mode) { + return V2.equals(mode); + } + + private SpyClientUnderTestFactory.ClientUnderTest createSpyClient(ReadConsistencyStrategy ReadConsistencyStrategy, boolean http2Enabled) { + ConnectionPolicy gwPolicy = new ConnectionPolicy(GatewayConnectionConfig.getDefaultConfig()); + if (http2Enabled) { + gwPolicy.setHttp2ConnectionConfig(new Http2ConnectionConfig().setEnabled(true)); + } + try { + return SpyClientUnderTestFactory.createClientUnderTest( + new URI(TestConfigurations.HOST), + TestConfigurations.MASTER_KEY, + gwPolicy, + ConsistencyLevel.SESSION, + ReadConsistencyStrategy, + new Configs(), + null, + true, + new CosmosClientTelemetryConfig()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private String getDocumentLink() { + return "dbs/" + databaseId + "/colls/" + containerId + "/docs/" + DOCUMENT_ID; + } + + private String getCollectionLink() { + return "dbs/" + databaseId + "/colls/" + containerId; + } + + private HttpRequest findDocumentReadRequest(String mode, List requests) { + for (HttpRequest request : requests) { + String uri = request.uri().toString(); + if (isGatewayV2(mode)) { + // Thin client sends all requests as POST to proxy (:10250) with RNTBD frame body + if ("POST".equalsIgnoreCase(request.httpMethod().toString()) && uri.contains(":10250")) { + return request; + } + } else { + // Gateway V1 sends document reads as GET with the document link in the URI + if ("GET".equalsIgnoreCase(request.httpMethod().toString()) && uri.contains(getDocumentLink())) { + return request; + } + } + } + return null; + } + + // endregion + + // region RNTBD frame inspection helpers (for V2 assertions) + + private static byte[] collectHttpBody(HttpRequest httpRequest) { + return httpRequest.body().reduce((a, b) -> { + byte[] merged = new byte[a.length + b.length]; + System.arraycopy(a, 0, merged, 0, a.length); + System.arraycopy(b, 0, merged, a.length, b.length); + return merged; + }).block(); + } + + /** + * Decodes the RNTBD binary frame and returns the typed request object. + * Uses the production decoder (RntbdRequest.decode) so token presence/absence + * is determined by the actual RNTBD wire format, not brute-force byte scanning. + */ + private static RntbdRequest decodeRntbdFrame(byte[] rntbdFrame) { + ByteBuf buffer = Unpooled.wrappedBuffer(rntbdFrame); + return RntbdRequest.decode(buffer); + } + + // endregion +} diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 89a22c2d1288..bf58280163fb 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -4,6 +4,7 @@ #### Features Added * Added support for change feed with `startFrom` point-in-time on merged partitions by enabling the `CHANGE_FEED_WITH_START_TIME_POST_MERGE` SDK capability. - See [PR 48752](https://github.com/Azure/azure-sdk-for-java/pull/48752) +* Enabled `ReadConsistencyStrategy` for Gateway V1 (compute gateway) and Gateway V2 (thin client proxy). Previously only supported in Direct mode. Also added client-side validation for `GLOBAL_STRONG` on non-Strong consistency accounts. - See [PR 48787](https://github.com/Azure/azure-sdk-for-java/pull/48787) * Added new `readManyByPartitionKeys` API on `CosmosAsyncContainer` / `CosmosContainer` to bulk-query all documents matching a list of partition key values with better efficiency than issuing individual queries. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801) * Added `CosmosReadManyByPartitionKeysRequestOptions` - a dedicated request-options type for `readManyByPartitionKeys` that exposes `setContinuationToken(String)` for resuming previous invocations and `setMaxConcurrentBatchPrefetch(int)` to bound per-call prefetch parallelism. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801) * Added `CosmosReadManyByPartitionKeysRequestOptions.setMaxBatchSize(Integer)` to set the max. number of partition keys used for a single batch. See [PR 48930](https://github.com/Azure/azure-sdk-for-java/pull/48930) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java index c8bca12fcbf4..ce2f4961fb1b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java @@ -146,20 +146,26 @@ private RxDocumentServiceRequest createDocumentServiceRequest() { if (this.options.getReadConsistencyStrategy() != null) { - String readConsistencyStrategyName = options.getReadConsistencyStrategy().toString(); - this.client.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName); - headers.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, readConsistencyStrategyName); + this.client.validateReadConsistencyStrategy(options.getReadConsistencyStrategy()); + + if (this.options.getReadConsistencyStrategy() != ReadConsistencyStrategy.DEFAULT) { + headers.put( + HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, + options.getReadConsistencyStrategy().toString()); + } consistencyLevelOverrideApplicable = this.options.getReadConsistencyStrategy() == ReadConsistencyStrategy.DEFAULT; } if (consistencyLevelOverrideApplicable && this.client.getReadConsistencyStrategy() != null) { - String readConsistencyStrategyName = this.client.getReadConsistencyStrategy().toString(); - this.client.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName); - headers.put( - HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, - readConsistencyStrategyName); + this.client.validateReadConsistencyStrategy(this.client.getReadConsistencyStrategy()); + + if (this.client.getReadConsistencyStrategy() != ReadConsistencyStrategy.DEFAULT) { + headers.put( + HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, + this.client.getReadConsistencyStrategy().toString()); + } consistencyLevelOverrideApplicable = this.client.getReadConsistencyStrategy() == ReadConsistencyStrategy.DEFAULT; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 25c81ed4e2a0..02146b83f96e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -1927,16 +1927,18 @@ private static void validateResource(Resource resource) { } } - public void validateAndLogNonDefaultReadConsistencyStrategy(String readConsistencyStrategyName) { - if (this.connectionPolicy.getConnectionMode() != ConnectionMode.DIRECT - && readConsistencyStrategyName != null - && ! readConsistencyStrategyName.equalsIgnoreCase(ReadConsistencyStrategy.DEFAULT.toString())) { - - logger.warn( - "ReadConsistencyStrategy {} defined in Gateway mode. " - + "This version of the SDK only supports ReadConsistencyStrategy in DIRECT mode. " - + "This setting will be ignored.", - readConsistencyStrategyName); + public void validateReadConsistencyStrategy(ReadConsistencyStrategy readConsistencyStrategy) { + if (readConsistencyStrategy == ReadConsistencyStrategy.GLOBAL_STRONG) { + ConsistencyLevel accountConsistency = this.getDefaultConsistencyLevelOfAccount(); + if (accountConsistency != ConsistencyLevel.STRONG) { + throw new BadRequestException( + String.format( + RMResources.ReadConsistencyStrategyGlobalStrongOnlyAllowedForGlobalStrongAccount, + readConsistencyStrategy.toString(), + HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, + ConsistencyLevel.STRONG, + accountConsistency)); + } } } @@ -1963,8 +1965,12 @@ private Map getRequestHeaders(RequestOptions options, ResourceTy && operationType.isReadOnlyOperation()) { String readConsistencyStrategyName = readConsistencyStrategy.toString(); - this.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName); + this.validateReadConsistencyStrategy(readConsistencyStrategy); headers.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, readConsistencyStrategyName); + // Compute gateway rejects requests with both x-ms-consistency-level and + // x-ms-cosmos-read-consistency-strategy headers. When readConsistencyStrategy is set, remove + // consistency-level — readConsistencyStrategy takes precedence. + headers.remove(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); } if (options == null) { @@ -2008,13 +2014,20 @@ private Map getRequestHeaders(RequestOptions options, ResourceTy && operationType.isReadOnlyOperation()) { String readConsistencyStrategyName = options.getReadConsistencyStrategy().toString(); - this.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName); + this.validateReadConsistencyStrategy(options.getReadConsistencyStrategy()); headers.put( HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, readConsistencyStrategyName); + // Compute gateway rejects requests with both x-ms-consistency-level and + // x-ms-cosmos-read-consistency-strategy headers. When readConsistencyStrategy is set, remove + // consistency-level — readConsistencyStrategy takes precedence. + headers.remove(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); } - if (options.getConsistencyLevel() != null) { + if (options.getConsistencyLevel() != null + && !headers.containsKey(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY)) { + // Only set ConsistencyLevel when ReadConsistencyStrategy is NOT already present. + // readConsistencyStrategy takes precedence — setting both causes gateway rejection. headers.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, options.getConsistencyLevel().toString()); } @@ -5432,8 +5445,8 @@ public ConsistencyLevel getConsistencyLevel() { } @Override - public void validateAndLogNonDefaultReadConsistencyStrategy(String readConsistencyStrategyName) { - RxDocumentClientImpl.this.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName); + public void validateReadConsistencyStrategy(ReadConsistencyStrategy readConsistencyStrategy) { + RxDocumentClientImpl.this.validateReadConsistencyStrategy(readConsistencyStrategy); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 9106ab8a1abf..f5eee2100de0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -12,7 +12,6 @@ import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader; import com.azure.cosmos.implementation.directconnectivity.HttpUtils; -import com.azure.cosmos.implementation.directconnectivity.RequestHelper; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility; @@ -320,9 +319,77 @@ public Mono performRequestInternal(RxDocumentServiceR }); } + /** + * Resolves contention between ConsistencyLevel and ReadConsistencyStrategy headers. + * Gateways (V1 HTTP and V2 RNTBD) reject requests carrying both headers. + * + * Rules: + * 1. Request-level readConsistencyStrategy (requestContext) > client-level readConsistencyStrategy (header) + * 2. readConsistencyStrategy > ConsistencyLevel — strip CL when non-DEFAULT readConsistencyStrategy is effective + * 3. DEFAULT readConsistencyStrategy is transparent — CL stays + * + * After this method, the request headers contain at most ONE of the two consistency headers. + * GW V1 serializes the surviving header as HTTP; GW V2 (ThinClientStoreModel) encodes it as RNTBD. + * + * Thread safety: availability-strategy clones share the same header map (shallow copy). + * The mutation here (remove CL when readConsistencyStrategy present) is idempotent — concurrent clones + * performing the same removal is safe. + */ + private void resolveEffectiveConsistencyHeaders(RxDocumentServiceRequest request) { + resolveEffectiveConsistencyHeaders( + request.getHeaders(), + request.requestContext != null ? request.requestContext.readConsistencyStrategy : null); + } + + /** + * Core resolution logic — public for direct unit testing from cross-package test classes. + * Avoids test drift from duplicated simulation logic. + */ + public static void resolveEffectiveConsistencyHeaders( + Map headers, + ReadConsistencyStrategy requestContextReadConsistencyStrategy) { + + // Determine effective readConsistencyStrategy: requestContext (request-level) takes priority over header (client-level) + ReadConsistencyStrategy effectiveReadConsistencyStrategy = null; + if (requestContextReadConsistencyStrategy != null + && requestContextReadConsistencyStrategy != ReadConsistencyStrategy.DEFAULT) { + effectiveReadConsistencyStrategy = requestContextReadConsistencyStrategy; + } + + if (effectiveReadConsistencyStrategy == null) { + String readConsistencyStrategyHeaderValue = headers.get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY); + if (!Strings.isNullOrEmpty(readConsistencyStrategyHeaderValue)) { + effectiveReadConsistencyStrategy = ReadConsistencyStrategy.DEFAULT; // non-null marker; actual value is in header + // Re-parse only to check non-DEFAULT — the header string is authoritative for serialization + for (ReadConsistencyStrategy candidate : ReadConsistencyStrategy.values()) { + if (candidate != ReadConsistencyStrategy.DEFAULT + && candidate.toString().equals(readConsistencyStrategyHeaderValue)) { + effectiveReadConsistencyStrategy = candidate; + break; + } + } + } + } + + if (effectiveReadConsistencyStrategy != null && effectiveReadConsistencyStrategy != ReadConsistencyStrategy.DEFAULT) { + // readConsistencyStrategy wins — strip ConsistencyLevel to prevent dual-header rejection + headers.remove(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); + // Ensure the readConsistencyStrategy header is set (requestContext-level may not have been written to headers yet) + headers.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, effectiveReadConsistencyStrategy.toString()); + } else if (effectiveReadConsistencyStrategy == ReadConsistencyStrategy.DEFAULT) { + // DEFAULT is transparent — remove the sentinel header, let ConsistencyLevel govern + headers.remove(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY); + } + } + private Mono performRequestInternalCore(RxDocumentServiceRequest request, URI requestUri) { try { + // Canonicalize consistency headers before wire serialization. + // Both GW V1 (HTTP) and GW V2 (RNTBD via ThinClientStoreModel) read from + // request.getHeaders() — this ensures only the winning header reaches the wire. + resolveEffectiveConsistencyHeaders(request); + HttpRequest httpRequest = request .getEffectiveHttpTransportSerializer(this) .wrapInHttpRequest(request, requestUri); @@ -969,6 +1036,39 @@ private Mono resolvePartitionKeyRangeByPkRangeIdCore( }); } + /** + * Determines if the effective consistency for this request is Session — needed by + * {@link #applySessionToken} to decide whether to attach/remove session tokens. + *

+ * Pure read — no side effects, no header mutation, no HashMap copy. + * Resolution order: request-level readConsistencyStrategy > client-level readConsistencyStrategy + * (header) > consistency-level header > account default. + */ + private boolean isEffectiveSessionConsistency(RxDocumentServiceRequest request) { + // Request-level readConsistencyStrategy takes priority + if (request.requestContext != null + && request.requestContext.readConsistencyStrategy != null + && request.requestContext.readConsistencyStrategy != ReadConsistencyStrategy.DEFAULT) { + return request.requestContext.readConsistencyStrategy == ReadConsistencyStrategy.SESSION; + } + + // Client-level readConsistencyStrategy from header + String rcsHeader = request.getHeaders().get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY); + if (!Strings.isNullOrEmpty(rcsHeader) + && !ReadConsistencyStrategy.DEFAULT.toString().equals(rcsHeader)) { + return ReadConsistencyStrategy.SESSION.toString().equals(rcsHeader); + } + + // Explicit consistency level header + String clHeader = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); + if (!Strings.isNullOrEmpty(clHeader)) { + return ConsistencyLevel.SESSION.toString().equalsIgnoreCase(clHeader); + } + + // Fall back to account default + return this.gatewayServiceConfigurationReader.getDefaultConsistencyLevel() == ConsistencyLevel.SESSION; + } + private Mono applySessionToken(RxDocumentServiceRequest request) { Map headers = request.getHeaders(); Objects.requireNonNull(headers, "RxDocumentServiceRequest::headers is required and cannot be null"); @@ -981,8 +1081,11 @@ private Mono applySessionToken(RxDocumentServiceRequest request) { return Mono.empty(); } - boolean sessionConsistency = (RequestHelper.getReadConsistencyStrategyToUse(this.gatewayServiceConfigurationReader, - request) == ReadConsistencyStrategy.SESSION); + // Determine if the effective consistency is Session — needed to decide whether to + // attach/remove session tokens. This is a pure read with no side-effects; it does NOT + // call RequestHelper.getReadConsistencyStrategyToUse() which mutates x-ms-consistency-level + // (a Direct-mode telemetry concern that is harmful in Gateway mode). + boolean sessionConsistency = isEffectiveSessionConsistency(request); if (!Strings.isNullOrEmpty(request.getHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN))) { if (!sessionConsistency || diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java index 40b9b82f6e3d..ca5ed4432212 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java @@ -27,6 +27,24 @@ public static class RntbdHealthCheckResults { public static final String SuccessValue = "Success"; } + public enum RntbdReadConsistencyStrategy { + + Eventual((byte) 0x01), + Session((byte) 0x02), + LatestCommitted((byte) 0x03), + GlobalStrong((byte) 0x04); + + private final byte id; + + RntbdReadConsistencyStrategy(final byte id) { + this.id = id; + } + + public byte id() { + return this.id; + } + } + public enum RntbdConsistencyLevel { Strong((byte) 0x00), @@ -599,7 +617,8 @@ public enum RntbdRequestHeader implements RntbdHeader { GlobalDatabaseAccountName((short) 0x00CE, RntbdTokenType.String, false), ThroughputBucket((short)0x00DB, RntbdTokenType.Byte, false), PopulateQueryAdvice((short) 0x00DA, RntbdTokenType.Byte, false), - HubRegionProcessingOnly((short)0x00EF, RntbdTokenType.Byte , false); + HubRegionProcessingOnly((short)0x00EF, RntbdTokenType.Byte , false), + ReadConsistencyStrategy((short)0x00FE, RntbdTokenType.Byte, false); public static final List thinClientHeadersInOrderList = Arrays.asList( EffectivePartitionKey, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java index 41b820a1ad95..96a472c45763 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java @@ -18,6 +18,8 @@ import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.apachecommons.lang.EnumUtils; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.azure.cosmos.models.IndexingDirective; import com.azure.cosmos.models.PriorityLevel; import com.fasterxml.jackson.annotation.JsonFilter; @@ -48,6 +50,7 @@ @JsonFilter("RntbdToken") final class RntbdRequestHeaders extends RntbdTokenStream { + private static final Logger logger = LoggerFactory.getLogger(RntbdRequestHeaders.class); private static ImplementationBridgeHelpers.PriorityLevelHelper.PriorityLevelAccessor priorityLevelAccessor() { return ImplementationBridgeHelpers.PriorityLevelHelper.getPriorityLevelAccessor(); @@ -139,6 +142,7 @@ private static ImplementationBridgeHelpers.PriorityLevelHelper.PriorityLevelAcce this.addThroughputBucket(headers); this.addPopulateQueryAdvice(headers); this.addHubRegionProcessingOnly(headers); + this.addReadConsistencyStrategy(headers); // Normal headers (Strings, Ints, Longs, etc.) @@ -828,6 +832,37 @@ private void addHubRegionProcessingOnly(final Map headers) { } } + private RntbdToken getReadConsistencyStrategy() { + return this.get(RntbdRequestHeader.ReadConsistencyStrategy); + } + + private void addReadConsistencyStrategy(final Map headers) { + final String value = headers.get(HttpHeaders.READ_CONSISTENCY_STRATEGY); + + if (StringUtils.isNotEmpty(value)) { + switch (value) { + case "Eventual": + this.getReadConsistencyStrategy().setValue(RntbdConstants.RntbdReadConsistencyStrategy.Eventual.id()); + break; + case "Session": + this.getReadConsistencyStrategy().setValue(RntbdConstants.RntbdReadConsistencyStrategy.Session.id()); + break; + case "LatestCommitted": + this.getReadConsistencyStrategy().setValue(RntbdConstants.RntbdReadConsistencyStrategy.LatestCommitted.id()); + break; + case "GlobalStrong": + this.getReadConsistencyStrategy().setValue(RntbdConstants.RntbdReadConsistencyStrategy.GlobalStrong.id()); + break; + default: + if (!"Default".equals(value)) { + throw new IllegalArgumentException( + "Unknown ReadConsistencyStrategy value '" + value + "' — cannot encode in RNTBD frame"); + } + break; + } + } + } + private void addGlobalDatabaseAccountName(final Map headers) { final String value = headers.get(HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java index 519dfaecebaf..4e8d26112316 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java @@ -300,20 +300,26 @@ public Map createCommonHeadersAsync(CosmosQueryRequestOptions co if (cosmosQueryRequestOptions.getReadConsistencyStrategy() != null) { - String readConsistencyStrategyName = cosmosQueryRequestOptions.getReadConsistencyStrategy().toString(); - this.client.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName); - requestHeaders.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, readConsistencyStrategyName); + this.client.validateReadConsistencyStrategy(cosmosQueryRequestOptions.getReadConsistencyStrategy()); + + if (cosmosQueryRequestOptions.getReadConsistencyStrategy() != ReadConsistencyStrategy.DEFAULT) { + requestHeaders.put( + HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, + cosmosQueryRequestOptions.getReadConsistencyStrategy().toString()); + } consistencyLevelOverrideApplicable = cosmosQueryRequestOptions.getReadConsistencyStrategy() == ReadConsistencyStrategy.DEFAULT; } if (consistencyLevelOverrideApplicable && this.client.getReadConsistencyStrategy() != null) { - String readConsistencyStrategyName = this.client.getReadConsistencyStrategy().toString(); - this.client.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName); - requestHeaders.put( - HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, - readConsistencyStrategyName); + this.client.validateReadConsistencyStrategy(this.client.getReadConsistencyStrategy()); + + if (this.client.getReadConsistencyStrategy() != ReadConsistencyStrategy.DEFAULT) { + requestHeaders.put( + HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, + this.client.getReadConsistencyStrategy().toString()); + } consistencyLevelOverrideApplicable = this.client.getReadConsistencyStrategy() == ReadConsistencyStrategy.DEFAULT; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java index 8555efc5487a..91679d40490e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java @@ -74,7 +74,7 @@ Mono executeFeedOperationWithAvailabilityStrategy( ConsistencyLevel getConsistencyLevel(); - void validateAndLogNonDefaultReadConsistencyStrategy(String readConsistencyStrategyName); + void validateReadConsistencyStrategy(ReadConsistencyStrategy readConsistencyStrategy); ///

/// A client query compatibility mode when making query request.