headers = new java.util.HashMap<>();
+ headers.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, "Session");
+ headers.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, "LatestCommitted");
+
+ RxGatewayStoreModel.resolveEffectiveConsistencyHeaders(headers, null);
+ RxGatewayStoreModel.resolveEffectiveConsistencyHeaders(headers, null);
+ RxGatewayStoreModel.resolveEffectiveConsistencyHeaders(headers, null);
+
+ assertThat(headers.containsKey(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL)).isFalse();
+ assertThat(headers.get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY))
+ .isEqualTo("LatestCommitted");
+ }
+
+ // endregion
+
+ // region Helpers
+
+ private static ThinClientStoreModel createMockThinClientStoreModel() {
+ DatabaseAccount mockAccount = Mockito.mock(DatabaseAccount.class);
+ Mockito.when(mockAccount.getId()).thenReturn("test-account");
+
+ GlobalEndpointManager mockGem = Mockito.mock(GlobalEndpointManager.class);
+ Mockito.when(mockGem.getLatestDatabaseAccount()).thenReturn(mockAccount);
+
+ return new ThinClientStoreModel(
+ null, // clientContext — not used in wrapInHttpRequest
+ Mockito.mock(ISessionContainer.class),
+ ConsistencyLevel.SESSION,
+ new UserAgentContainer(),
+ mockGem,
+ Mockito.mock(HttpClient.class));
+ }
+
+ private static RxDocumentServiceRequest createDocumentReadRequest() {
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(
+ null, OperationType.Read, "dbs/testdb/colls/testcoll/docs/testdoc", ResourceType.Document);
+ // Set resolved partition key range to avoid NPE in wrapInHttpRequest
+ request.requestContext.resolvedPartitionKeyRange = new PartitionKeyRange();
+ request.requestContext.resolvedPartitionKeyRange.setMinInclusive("00");
+ request.requestContext.resolvedPartitionKeyRange.setMaxExclusive("FF");
+ return request;
+ }
+
+ private static byte[] collectHttpBody(HttpRequest httpRequest) {
+ return httpRequest.body().reduce((a, b) -> {
+ byte[] merged = new byte[a.length + b.length];
+ System.arraycopy(a, 0, merged, 0, a.length);
+ System.arraycopy(b, 0, merged, a.length, b.length);
+ return merged;
+ }).block();
+ }
+
+ private static void resolveEffectiveConsistencyHeaders(RxDocumentServiceRequest request) {
+ RxGatewayStoreModel.resolveEffectiveConsistencyHeaders(
+ request.getHeaders(),
+ request.requestContext != null ? request.requestContext.readConsistencyStrategy : null);
+ }
+
+ // region RNTBD frame helpers
+
+ /**
+ * Decodes the RNTBD binary frame using the production decoder.
+ * Token presence/absence is determined by the actual RNTBD wire format.
+ */
+ private static RntbdRequest decodeRntbdFrame(byte[] rntbdFrame) {
+ ByteBuf buffer = Unpooled.wrappedBuffer(rntbdFrame);
+ return RntbdRequest.decode(buffer);
+ }
+
+ // endregion
+}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategyE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategyE2ETest.java
new file mode 100644
index 000000000000..ac2eb8d6d261
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategyE2ETest.java
@@ -0,0 +1,719 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.rx;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosAsyncClient;
+import com.azure.cosmos.CosmosAsyncContainer;
+import com.azure.cosmos.CosmosAsyncDatabase;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosDiagnostics;
+import com.azure.cosmos.FlakyTestRetryAnalyzer;
+import com.azure.cosmos.GatewayConnectionConfig;
+import com.azure.cosmos.Http2ConnectionConfig;
+import com.azure.cosmos.ReadConsistencyStrategy;
+import com.azure.cosmos.implementation.BadRequestException;
+import com.azure.cosmos.implementation.TestConfigurations;
+import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
+import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.CosmosItemIdentity;
+import com.azure.cosmos.models.CosmosItemRequestOptions;
+import com.azure.cosmos.models.CosmosItemResponse;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.CosmosReadManyRequestOptions;
+import com.azure.cosmos.models.CosmosRequestOptions;
+import com.azure.cosmos.models.FeedRange;
+import com.azure.cosmos.models.FeedResponse;
+import com.azure.cosmos.models.PartitionKey;
+import com.azure.cosmos.models.SqlParameter;
+import com.azure.cosmos.models.SqlQuerySpec;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Unified E2E tests for ReadConsistencyStrategy propagation across Gateway V1 and Gateway V2.
+ *
+ * Creates two {@link CosmosAsyncClient} instances — one for each transport:
+ *
+ * - Gateway V1 (HTTP/1): Standard gateway mode.
+ * - Gateway V2 (HTTP/2 + thin client): Gateway mode with thin client JVM flag enabled.
+ * V2 tests additionally assert that requests target the thin client endpoint ({@code :10250}).
+ *
+ *
+ * Does not extend {@link TestSuiteBase} to avoid {@code @BeforeSuite} shared container
+ * initialization which requires provisioned throughput (incompatible with serverless accounts).
+ * Uses {@link TestSuiteBase#createCollection(CosmosAsyncClient, String, CosmosContainerProperties)}
+ * as a static utility for serverless-safe container creation.
+ *
+ *
Run with test group "thinclient".
+ */
+public class GatewayReadConsistencyStrategyE2ETest {
+ private static final Logger logger = LoggerFactory.getLogger(GatewayReadConsistencyStrategyE2ETest.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final long TIMEOUT = 60_000L;
+
+ private static final String GATEWAY_V1 = "GatewayV1";
+ private static final String GATEWAY_V2 = "GatewayV2";
+ private static final String DIRECT = "Direct";
+
+ private CosmosAsyncClient gatewayV1Client;
+ private CosmosAsyncClient gatewayV2Client;
+ private CosmosAsyncClient directClient;
+ private CosmosAsyncDatabase database;
+ private CosmosAsyncContainer gatewayV1Container;
+ private CosmosAsyncContainer gatewayV2Container;
+ private CosmosAsyncContainer directContainer;
+ private String databaseId;
+ private String containerId;
+
+ @BeforeClass(groups = {"thinclient"}, timeOut = TIMEOUT)
+ public void beforeClass() {
+ System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
+
+ databaseId = "readConsistencyStrategy-e2e-" + UUID.randomUUID().toString().substring(0, 8);
+ containerId = "testcontainer";
+
+ gatewayV1Client = createGatewayV1Builder().buildAsyncClient();
+
+ gatewayV1Client.createDatabaseIfNotExists(databaseId).block();
+ database = gatewayV1Client.getDatabase(databaseId);
+
+ CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/pk");
+ TestSuiteBase.createCollection(gatewayV1Client, databaseId, containerProperties);
+
+ gatewayV1Container = gatewayV1Client.getDatabase(databaseId).getContainer(containerId);
+
+ // Gateway V2 client — HTTP/2 enabled, thin client flag set
+ gatewayV2Client = createGatewayV2Builder().buildAsyncClient();
+ gatewayV2Container = gatewayV2Client.getDatabase(databaseId).getContainer(containerId);
+
+ // Direct mode client
+ directClient = createDirectBuilder().buildAsyncClient();
+ directContainer = directClient.getDatabase(databaseId).getContainer(containerId);
+
+ logger.info("Created E2E test resources: db={}, container={}", databaseId, containerId);
+ }
+
+ @AfterClass(groups = {"thinclient"}, alwaysRun = true)
+ public void afterClass() {
+ if (database != null) {
+ try {
+ database.delete().block();
+ } catch (Exception e) {
+ logger.warn("Failed to delete test database", e);
+ }
+ }
+ safeClose(gatewayV1Client);
+ safeClose(gatewayV2Client);
+ safeClose(directClient);
+ }
+
+ @DataProvider(name = "transportModes")
+ public Object[][] transportModes() {
+ return new Object[][] { { GATEWAY_V1 }, { GATEWAY_V2 }, { DIRECT } };
+ }
+
+ // region ItemRequestOptions — point reads
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void readItem_withLatestCommitted(String mode) {
+ String id = seedDocument(mode);
+
+ CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ CosmosItemResponse response =
+ containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void readItem_withEventual(String mode) {
+ String id = seedDocument(mode);
+
+ CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.EVENTUAL);
+
+ CosmosItemResponse response =
+ containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.EVENTUAL);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void readItem_withSession(String mode) {
+ String id = seedDocument(mode);
+
+ CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION);
+
+ CosmosItemResponse response =
+ containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.SESSION);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ }
+
+ // endregion
+
+ // region QueryRequestOptions
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void queryItems_withLatestCommitted(String mode) {
+ String id = seedDocument(mode);
+
+ CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions()
+ .setPartitionKey(new PartitionKey(id))
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ SqlQuerySpec querySpec = new SqlQuerySpec("SELECT * FROM c WHERE c.id=@id");
+ querySpec.setParameters(Arrays.asList(new SqlParameter("@id", id)));
+
+ FeedResponse response = containerFor(mode)
+ .queryItems(querySpec, queryOptions, ObjectNode.class)
+ .byPage()
+ .blockFirst();
+
+ assertThat(response).isNotNull();
+ assertThat(response.getResults()).isNotNull();
+ assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, response.getCosmosDiagnostics());
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void queryItems_withSession(String mode) {
+ String id = seedDocument(mode);
+
+ CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions()
+ .setPartitionKey(new PartitionKey(id))
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION);
+
+ SqlQuerySpec querySpec = new SqlQuerySpec("SELECT * FROM c WHERE c.id=@id");
+ querySpec.setParameters(Arrays.asList(new SqlParameter("@id", id)));
+
+ FeedResponse response = containerFor(mode)
+ .queryItems(querySpec, queryOptions, ObjectNode.class)
+ .byPage()
+ .blockFirst();
+
+ assertThat(response).isNotNull();
+ assertThat(response.getResults()).isNotNull();
+ assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.SESSION);
+ assertEndpointForMode(mode, response.getCosmosDiagnostics());
+ }
+
+ // endregion
+
+ // region ReadAllItems
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void readAllItems_withLatestCommitted(String mode) {
+ String pk = UUID.randomUUID().toString();
+ seedDocument(mode, UUID.randomUUID().toString(), pk);
+
+ CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ FeedResponse response = containerFor(mode)
+ .readAllItems(new PartitionKey(pk), queryOptions, ObjectNode.class)
+ .byPage()
+ .blockFirst();
+
+ assertThat(response).isNotNull();
+ assertThat(response.getResults()).isNotNull();
+ assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, response.getCosmosDiagnostics());
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void readAllItems_withSession(String mode) {
+ String pk = UUID.randomUUID().toString();
+ seedDocument(mode, UUID.randomUUID().toString(), pk);
+
+ CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION);
+
+ FeedResponse response = containerFor(mode)
+ .readAllItems(new PartitionKey(pk), queryOptions, ObjectNode.class)
+ .byPage()
+ .blockFirst();
+
+ assertThat(response).isNotNull();
+ assertThat(response.getResults()).isNotNull();
+ assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.SESSION);
+ assertEndpointForMode(mode, response.getCosmosDiagnostics());
+ }
+
+ // endregion
+
+ // region ChangeFeedRequestOptions
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void changeFeed_withLatestCommitted(String mode) {
+ String pkValue = UUID.randomUUID().toString();
+ seedDocument(mode, pkValue, pkValue);
+
+ CosmosChangeFeedRequestOptions cfOptions = CosmosChangeFeedRequestOptions
+ .createForProcessingFromBeginning(
+ FeedRange.forLogicalPartition(new PartitionKey(pkValue)))
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ List> pages = containerFor(mode)
+ .queryChangeFeed(cfOptions, ObjectNode.class)
+ .byPage()
+ .collectList()
+ .block();
+
+ assertThat(pages).isNotNull();
+ assertThat(pages.isEmpty()).isFalse();
+
+ FeedResponse firstPage = pages.get(0);
+ assertEffectiveReadConsistencyStrategy(firstPage.getCosmosDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, firstPage.getCosmosDiagnostics());
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void changeFeed_withSession(String mode) {
+ String pkValue = UUID.randomUUID().toString();
+ seedDocument(mode, pkValue, pkValue);
+
+ CosmosChangeFeedRequestOptions cfOptions = CosmosChangeFeedRequestOptions
+ .createForProcessingFromBeginning(
+ FeedRange.forLogicalPartition(new PartitionKey(pkValue)))
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION);
+
+ List> pages = containerFor(mode)
+ .queryChangeFeed(cfOptions, ObjectNode.class)
+ .byPage()
+ .collectList()
+ .block();
+
+ assertThat(pages).isNotNull();
+ assertThat(pages.isEmpty()).isFalse();
+
+ FeedResponse firstPage = pages.get(0);
+ assertEffectiveReadConsistencyStrategy(firstPage.getCosmosDiagnostics(), ReadConsistencyStrategy.SESSION);
+ assertEndpointForMode(mode, firstPage.getCosmosDiagnostics());
+ }
+
+ // endregion
+
+ // region ReadManyRequestOptions
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void readMany_withLatestCommitted(String mode) {
+ String pkValue = UUID.randomUUID().toString();
+ String id1 = UUID.randomUUID().toString();
+ String id2 = UUID.randomUUID().toString();
+ seedDocument(mode, id1, pkValue);
+ seedDocument(mode, id2, pkValue);
+
+ List identities = Arrays.asList(
+ new CosmosItemIdentity(new PartitionKey(pkValue), id1),
+ new CosmosItemIdentity(new PartitionKey(pkValue), id2));
+
+ CosmosReadManyRequestOptions readManyOptions = new CosmosReadManyRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ FeedResponse response =
+ containerFor(mode).readMany(identities, readManyOptions, ObjectNode.class).block();
+
+ assertThat(response).isNotNull();
+ assertThat(response.getResults()).isNotNull();
+ assertThat(response.getResults().size()).isEqualTo(2);
+ assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, response.getCosmosDiagnostics());
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void readMany_withSession(String mode) {
+ String pkValue = UUID.randomUUID().toString();
+ String id1 = UUID.randomUUID().toString();
+ String id2 = UUID.randomUUID().toString();
+ seedDocument(mode, id1, pkValue);
+ seedDocument(mode, id2, pkValue);
+
+ List identities = Arrays.asList(
+ new CosmosItemIdentity(new PartitionKey(pkValue), id1),
+ new CosmosItemIdentity(new PartitionKey(pkValue), id2));
+
+ CosmosReadManyRequestOptions readManyOptions = new CosmosReadManyRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.SESSION);
+
+ FeedResponse response =
+ containerFor(mode).readMany(identities, readManyOptions, ObjectNode.class).block();
+
+ assertThat(response).isNotNull();
+ assertThat(response.getResults()).isNotNull();
+ assertThat(response.getResults().size()).isEqualTo(2);
+ assertEffectiveReadConsistencyStrategy(response.getCosmosDiagnostics(), ReadConsistencyStrategy.SESSION);
+ assertEndpointForMode(mode, response.getCosmosDiagnostics());
+ }
+
+ // endregion
+
+ // region Client-level ReadConsistencyStrategy
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void clientLevel_latestCommitted_readItem(String mode) {
+ CosmosAsyncClient clientWithReadConsistencyStrategy = null;
+ try {
+ clientWithReadConsistencyStrategy = builderFor(mode)
+ .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED)
+ .buildAsyncClient();
+ CosmosAsyncContainer targetContainer = clientWithReadConsistencyStrategy.getDatabase(databaseId).getContainer(containerId);
+
+ String id = UUID.randomUUID().toString();
+ createAndInsertDocument(targetContainer, id);
+
+ CosmosItemResponse response =
+ targetContainer.readItem(id, new PartitionKey(id), ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ } finally {
+ safeClose(clientWithReadConsistencyStrategy);
+ }
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void clientLevel_session_readItem(String mode) {
+ CosmosAsyncClient clientWithReadConsistencyStrategy = null;
+ try {
+ clientWithReadConsistencyStrategy = builderFor(mode)
+ .readConsistencyStrategy(ReadConsistencyStrategy.SESSION)
+ .buildAsyncClient();
+ CosmosAsyncContainer targetContainer = clientWithReadConsistencyStrategy.getDatabase(databaseId).getContainer(containerId);
+
+ String id = UUID.randomUUID().toString();
+ createAndInsertDocument(targetContainer, id);
+
+ CosmosItemResponse response =
+ targetContainer.readItem(id, new PartitionKey(id), ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.SESSION);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ } finally {
+ safeClose(clientWithReadConsistencyStrategy);
+ }
+ }
+
+ // endregion
+
+ // region Write operations— ReadConsistencyStrategy forced to DEFAULT
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void writeItem_readConsistencyStrategyIgnored(String mode) {
+ CosmosAsyncClient clientWithReadConsistencyStrategy = null;
+ try {
+ clientWithReadConsistencyStrategy = builderFor(mode)
+ .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED)
+ .buildAsyncClient();
+ CosmosAsyncContainer targetContainer = clientWithReadConsistencyStrategy.getDatabase(databaseId).getContainer(containerId);
+
+ String id = UUID.randomUUID().toString();
+ ObjectNode doc = createDocument(id);
+
+ CosmosItemResponse response =
+ targetContainer.createItem(doc, new PartitionKey(id), null).block();
+
+ assertThat(response).isNotNull();
+ assertThat(response.getStatusCode()).isEqualTo(201);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.DEFAULT);
+ } finally {
+ safeClose(clientWithReadConsistencyStrategy);
+ }
+ }
+
+ // endregion
+
+ // region Validation — GLOBAL_STRONG on Session account
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void readItem_globalStrong_invalidAccount_throwsBadRequest(String mode) {
+ String id = seedDocument(mode);
+
+ CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.GLOBAL_STRONG);
+
+ Throwable caughtError = null;
+ try {
+ containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block();
+ } catch (Throwable t) {
+ caughtError = t;
+ }
+
+ assertThat(caughtError)
+ .as("Expected BadRequestException for GLOBAL_STRONG on Session account")
+ .isNotNull()
+ .isInstanceOf(BadRequestException.class);
+ assertThat(caughtError.getMessage()).contains("read-consistency-strategy");
+ }
+
+ // endregion
+
+ // region Contention — both ConsistencyLevel and ReadConsistencyStrategy set
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void clientLevel_bothConsistencyLevelAndReadConsistencyStrategy_readConsistencyStrategyWins(String mode) {
+ CosmosAsyncClient clientWithBoth = null;
+ try {
+ clientWithBoth = builderFor(mode)
+ .consistencyLevel(ConsistencyLevel.EVENTUAL)
+ .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED)
+ .buildAsyncClient();
+ CosmosAsyncContainer targetContainer = clientWithBoth.getDatabase(databaseId).getContainer(containerId);
+
+ String id = UUID.randomUUID().toString();
+ createAndInsertDocument(targetContainer, id);
+
+ CosmosItemResponse response =
+ targetContainer.readItem(id, new PartitionKey(id), ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ } finally {
+ safeClose(clientWithBoth);
+ }
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void requestLevel_bothConsistencyLevelAndReadConsistencyStrategy_readConsistencyStrategyWins(String mode) {
+ String id = seedDocument(mode);
+
+ CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions()
+ .setConsistencyLevel(ConsistencyLevel.EVENTUAL)
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ CosmosItemResponse response =
+ containerFor(mode).readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void requestLevelReadConsistencyStrategy_overridesClientLevelReadConsistencyStrategy(String mode) {
+ CosmosAsyncClient clientWithClientReadConsistencyStrategy = null;
+ try {
+ clientWithClientReadConsistencyStrategy = builderFor(mode)
+ .readConsistencyStrategy(ReadConsistencyStrategy.EVENTUAL)
+ .buildAsyncClient();
+ CosmosAsyncContainer targetContainer = clientWithClientReadConsistencyStrategy.getDatabase(databaseId).getContainer(containerId);
+
+ String id = UUID.randomUUID().toString();
+ createAndInsertDocument(targetContainer, id);
+
+ CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ CosmosItemResponse response =
+ targetContainer.readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ } finally {
+ safeClose(clientWithClientReadConsistencyStrategy);
+ }
+ }
+
+ // endregion
+
+ // region Operation policy
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void operationPolicy_setsReadConsistencyStrategy(String mode) {
+ CosmosAsyncClient policyClient = null;
+ try {
+ policyClient = builderFor(mode)
+ .addOperationPolicy(cosmosOperationDetails -> {
+ CosmosRequestOptions overrides = new CosmosRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+ cosmosOperationDetails.setRequestOptions(overrides);
+ })
+ .buildAsyncClient();
+ CosmosAsyncContainer policyContainer = policyClient.getDatabase(databaseId).getContainer(containerId);
+
+ String id = UUID.randomUUID().toString();
+ createAndInsertDocument(policyContainer, id);
+
+ CosmosItemResponse response =
+ policyContainer.readItem(id, new PartitionKey(id), ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.LATEST_COMMITTED);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ } finally {
+ safeClose(policyClient);
+ }
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes", retryAnalyzer = FlakyTestRetryAnalyzer.class)
+ public void operationPolicy_readConsistencyStrategyOverridesRequestLevel(String mode) {
+ CosmosAsyncClient policyClient = null;
+ try {
+ policyClient = builderFor(mode)
+ .addOperationPolicy(cosmosOperationDetails -> {
+ CosmosRequestOptions overrides = new CosmosRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.EVENTUAL);
+ cosmosOperationDetails.setRequestOptions(overrides);
+ })
+ .buildAsyncClient();
+ CosmosAsyncContainer policyContainer = policyClient.getDatabase(databaseId).getContainer(containerId);
+
+ String id = UUID.randomUUID().toString();
+ createAndInsertDocument(policyContainer, id);
+
+ CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions()
+ .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ CosmosItemResponse response =
+ policyContainer.readItem(id, new PartitionKey(id), readOptions, ObjectNode.class).block();
+
+ assertSuccessfulRead(response);
+ assertEffectiveReadConsistencyStrategy(response.getDiagnostics(), ReadConsistencyStrategy.EVENTUAL);
+ assertEndpointForMode(mode, response.getDiagnostics());
+ } finally {
+ safeClose(policyClient);
+ }
+ }
+
+ // endregion
+
+ // region Helpers — builders
+
+ private static CosmosClientBuilder createGatewayV1Builder() {
+ return new CosmosClientBuilder()
+ .endpoint(TestConfigurations.HOST)
+ .key(TestConfigurations.MASTER_KEY)
+ .gatewayMode()
+ .consistencyLevel(ConsistencyLevel.SESSION);
+ }
+
+ private static CosmosClientBuilder createGatewayV2Builder() {
+ GatewayConnectionConfig gwConfig = new GatewayConnectionConfig();
+ gwConfig.setHttp2ConnectionConfig(new Http2ConnectionConfig().setEnabled(true));
+
+ return new CosmosClientBuilder()
+ .endpoint(TestConfigurations.HOST)
+ .key(TestConfigurations.MASTER_KEY)
+ .gatewayMode(gwConfig)
+ .consistencyLevel(ConsistencyLevel.SESSION);
+ }
+
+ private static CosmosClientBuilder createDirectBuilder() {
+ return new CosmosClientBuilder()
+ .endpoint(TestConfigurations.HOST)
+ .key(TestConfigurations.MASTER_KEY)
+ .directMode()
+ .consistencyLevel(ConsistencyLevel.SESSION);
+ }
+
+ private CosmosClientBuilder builderFor(String mode) {
+ switch (mode) {
+ case GATEWAY_V2: return createGatewayV2Builder();
+ case DIRECT: return createDirectBuilder();
+ default: return createGatewayV1Builder();
+ }
+ }
+
+ private CosmosAsyncContainer containerFor(String mode) {
+ switch (mode) {
+ case GATEWAY_V2: return gatewayV2Container;
+ case DIRECT: return directContainer;
+ default: return gatewayV1Container;
+ }
+ }
+
+ private static boolean isGatewayV2(String mode) {
+ return GATEWAY_V2.equals(mode);
+ }
+
+ // endregion
+
+ // region Helpers — documents
+
+ private ObjectNode createDocument(String id) {
+ return createDocument(id, id);
+ }
+
+ private ObjectNode createDocument(String id, String pk) {
+ ObjectNode doc = OBJECT_MAPPER.createObjectNode();
+ doc.put("id", id);
+ doc.put("pk", pk);
+ return doc;
+ }
+
+ private String seedDocument(String mode) {
+ String id = UUID.randomUUID().toString();
+ seedDocument(mode, id, id);
+ return id;
+ }
+
+ private void seedDocument(String mode, String id, String pk) {
+ ObjectNode doc = createDocument(id, pk);
+ containerFor(mode).createItem(doc, new PartitionKey(pk), null).block();
+ }
+
+ private void createAndInsertDocument(CosmosAsyncContainer targetContainer, String id) {
+ ObjectNode doc = createDocument(id);
+ targetContainer.createItem(doc, new PartitionKey(id), null).block();
+ }
+
+ // endregion
+
+ // region Helpers — assertions
+
+ private static void assertSuccessfulRead(CosmosItemResponse response) {
+ assertThat(response).isNotNull();
+ assertThat(response.getStatusCode()).isEqualTo(200);
+ }
+
+ private static void assertEffectiveReadConsistencyStrategy(CosmosDiagnostics diagnostics, ReadConsistencyStrategy expected) {
+ assertThat(diagnostics).isNotNull();
+ assertThat(diagnostics.getDiagnosticsContext()).isNotNull();
+ assertThat(diagnostics.getDiagnosticsContext().getEffectiveReadConsistencyStrategy())
+ .isEqualTo(expected);
+ }
+
+ private void assertEndpointForMode(String mode, CosmosDiagnostics diagnostics) {
+ if (isGatewayV2(mode)) {
+ TestSuiteBase.assertThinClientEndpointUsed(diagnostics);
+ }
+ }
+
+ private static void safeClose(CosmosAsyncClient clientToClose) {
+ if (clientToClose != null) {
+ try {
+ clientToClose.close();
+ } catch (Exception e) {
+ logger.warn("Failed to close client", e);
+ }
+ }
+ }
+
+ // endregion
+}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategySpyWireTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategySpyWireTest.java
new file mode 100644
index 000000000000..16b59d541a1e
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/GatewayReadConsistencyStrategySpyWireTest.java
@@ -0,0 +1,486 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.rx;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosAsyncClient;
+import com.azure.cosmos.CosmosAsyncContainer;
+import com.azure.cosmos.CosmosAsyncDatabase;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.GatewayConnectionConfig;
+import com.azure.cosmos.Http2ConnectionConfig;
+import com.azure.cosmos.ReadConsistencyStrategy;
+import com.azure.cosmos.implementation.Configs;
+import com.azure.cosmos.implementation.ConnectionPolicy;
+import com.azure.cosmos.implementation.Document;
+import com.azure.cosmos.implementation.HttpConstants;
+import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
+import com.azure.cosmos.implementation.RequestOptions;
+import com.azure.cosmos.implementation.SpyClientUnderTestFactory;
+import com.azure.cosmos.implementation.TestConfigurations;
+import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants;
+import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequest;
+import com.azure.cosmos.implementation.http.HttpRequest;
+import com.azure.cosmos.models.CosmosClientTelemetryConfig;
+import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.CosmosItemRequestOptions;
+import com.azure.cosmos.models.PartitionKey;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Spy-wire tests for ReadConsistencyStrategy header propagation across both transports.
+ *
+ * Creates two spy clients that intercept at the HTTP client layer:
+ *
+ * - Gateway V1 (HTTP/1): No thin client enablement. Requests flow through
+ * {@link RxGatewayStoreModel}. Assertions inspect HTTP headers.
+ * - Gateway V2 (HTTP/2 + thin client): {@code COSMOS.THINCLIENT_ENABLED=true} +
+ * {@link Http2ConnectionConfig} enabled. Requests flow through {@link ThinClientStoreModel}
+ * when thin client read locations are available. Assertions inspect the RNTBD binary frame
+ * in the HTTP body for the ReadConsistencyStrategy token (0x00F0, Byte type).
+ *
+ *
+ * V2 tests are skipped when the test account does not have thin client read locations
+ * (the spy client falls back to V1 silently — we detect this and skip rather than false-pass).
+ */
+public class GatewayReadConsistencyStrategySpyWireTest {
+ private static final Logger logger = LoggerFactory.getLogger(GatewayReadConsistencyStrategySpyWireTest.class);
+
+ private static ImplementationBridgeHelpers.CosmosItemRequestOptionsHelper.CosmosItemRequestOptionsAccessor getItemOptionsAccessor() {
+ return ImplementationBridgeHelpers.CosmosItemRequestOptionsHelper.getCosmosItemRequestOptionsAccessor();
+ }
+
+ private static final long TIMEOUT = 60_000L;
+ private static final String DOCUMENT_ID = UUID.randomUUID().toString();
+
+ // V1 transport — HTTP/1, gateway mode, no thin client
+ private static final String V1 = "GatewayV1";
+ // V2 transport — HTTP/2, thin client enabled
+ private static final String V2 = "GatewayV2";
+
+ private CosmosAsyncClient cosmosClient;
+ private CosmosAsyncDatabase database;
+ private CosmosAsyncContainer container;
+ private String databaseId;
+ private String containerId;
+
+ private SpyClientUnderTestFactory.ClientUnderTest v1SpyClient;
+ private SpyClientUnderTestFactory.ClientUnderTest v2SpyClient;
+
+ @BeforeClass(groups = {"thinclient"}, timeOut = TIMEOUT)
+ public void beforeClass() {
+ System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
+
+ cosmosClient = new CosmosClientBuilder()
+ .endpoint(TestConfigurations.HOST)
+ .key(TestConfigurations.MASTER_KEY)
+ .gatewayMode()
+ .buildAsyncClient();
+
+ databaseId = "ReadConsistencyStrategy-spy-" + UUID.randomUUID().toString().substring(0, 8);
+ containerId = "testcontainer";
+
+ cosmosClient.createDatabaseIfNotExists(databaseId).block();
+
+ CosmosContainerProperties props = new CosmosContainerProperties(containerId, "/mypk");
+ container = TestSuiteBase.createCollection(cosmosClient, databaseId, props);
+ database = cosmosClient.getDatabase(databaseId);
+
+ // Seed a document
+ ObjectNode doc = com.fasterxml.jackson.databind.node.JsonNodeFactory.instance.objectNode();
+ doc.put("id", DOCUMENT_ID);
+ doc.put("mypk", DOCUMENT_ID);
+ container.createItem(doc).block();
+
+ // V1 spy — HTTP/1, no Http2ConnectionConfig → useThinClient = false
+ v1SpyClient = createSpyClient(null, false);
+
+ // V2 spy — HTTP/2 enabled, thin client JVM flag set
+ v2SpyClient = createSpyClient(null, true);
+
+ logger.info("Created spy-wire test resources: db={}, container={}", databaseId, containerId);
+ }
+
+ @AfterClass(groups = {"thinclient"}, alwaysRun = true)
+ public void afterClass() {
+ if (database != null) {
+ try {
+ database.delete().block();
+ } catch (Exception e) {
+ logger.warn("Failed to delete test database", e);
+ }
+ }
+ if (cosmosClient != null) {
+ cosmosClient.close();
+ }
+ if (v1SpyClient != null) {
+ v1SpyClient.close();
+ }
+ if (v2SpyClient != null) {
+ v2SpyClient.close();
+ }
+ }
+
+ @DataProvider(name = "transportModes")
+ public Object[][] transportModes() {
+ return new Object[][] { { V1 }, { V2 } };
+ }
+
+ // region No contention — single header set
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes")
+ public void requestLevelReadConsistencyStrategy_headerOnWire_consistencyLevelStripped(String mode) {
+ CosmosItemRequestOptions options = new CosmosItemRequestOptions();
+ options.setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ assertReadConsistencyStrategyOnWire(mode, spyFor(mode), options, "LatestCommitted", (byte) 0x03, true);
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes")
+ public void clientLevelReadConsistencyStrategy_headerOnWire_consistencyLevelStripped(String mode) {
+ SpyClientUnderTestFactory.ClientUnderTest readConsistencyStrategyClient = createSpyClient(ReadConsistencyStrategy.LATEST_COMMITTED, isGatewayV2(mode));
+ try {
+ assertReadConsistencyStrategyOnWire(mode, readConsistencyStrategyClient, new RequestOptions(), "LatestCommitted", (byte) 0x03, true);
+ } finally {
+ readConsistencyStrategyClient.close();
+ }
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes")
+ public void defaultReadConsistencyStrategy_noReadConsistencyStrategyHeaderOnWire(String mode) {
+ CosmosItemRequestOptions options = new CosmosItemRequestOptions();
+ options.setReadConsistencyStrategy(ReadConsistencyStrategy.DEFAULT);
+
+ assertNoReadConsistencyStrategyOnWire(mode, spyFor(mode), options);
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes")
+ public void clientLeveldefaultReadConsistencyStrategy_noReadConsistencyStrategyHeaderOnWire(String mode) {
+ SpyClientUnderTestFactory.ClientUnderTest defaultReadConsistencyStrategyClient = createSpyClient(ReadConsistencyStrategy.DEFAULT, isGatewayV2(mode));
+ try {
+ assertNoReadConsistencyStrategyOnWire(mode, defaultReadConsistencyStrategyClient, new RequestOptions());
+ } finally {
+ defaultReadConsistencyStrategyClient.close();
+ }
+ }
+
+ // endregion
+
+ // region Contention — both ConsistencyLevel and ReadConsistencyStrategy present, resolution determines the winner
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes")
+ public void bothConsistencyLevelAndReadConsistencyStrategy_readConsistencyStrategyWins_consistencyLevelStripped(String mode) {
+ CosmosItemRequestOptions options = new CosmosItemRequestOptions();
+ options.setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+ options.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+
+ assertReadConsistencyStrategyOnWire(mode, spyFor(mode), options, "LatestCommitted", (byte) 0x03, true);
+ }
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT, dataProvider = "transportModes")
+ public void requestLevelReadConsistencyStrategy_overridesClientLevelReadConsistencyStrategy(String mode) {
+ // Client-level ReadConsistencyStrategy = EVENTUAL (applied to every request header via builder)
+ // Request-level ReadConsistencyStrategy = LATEST_COMMITTED (per-operation override via requestContext)
+ // Resolution: request-level wins.
+ SpyClientUnderTestFactory.ClientUnderTest eventualReadConsistencyStrategyClient = createSpyClient(ReadConsistencyStrategy.EVENTUAL, isGatewayV2(mode));
+ try {
+ CosmosItemRequestOptions options = new CosmosItemRequestOptions();
+ options.setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED);
+
+ assertReadConsistencyStrategyOnWire(mode, eventualReadConsistencyStrategyClient, options, "LatestCommitted", (byte) 0x03, true);
+ } finally {
+ eventualReadConsistencyStrategyClient.close();
+ }
+ }
+
+ // endregion
+
+ // region Write operations — ReadConsistencyStrategy should not appear on writes (V1 only, writes don't route to thin client)
+
+ @Test(groups = {"thinclient"}, timeOut = TIMEOUT)
+ public void writeWithClientLevelReadConsistencyStrategy_noReadConsistencyStrategyHeaderOnWire() {
+ SpyClientUnderTestFactory.ClientUnderTest readConsistencyStrategyClient = createSpyClient(ReadConsistencyStrategy.LATEST_COMMITTED, false);
+ try {
+ String writeDocId = UUID.randomUUID().toString();
+ Document writeDoc = new Document(String.format(
+ "{ \"id\": \"%s\", \"mypk\": \"%s\" }", writeDocId, writeDocId));
+
+ readConsistencyStrategyClient.clearCapturedRequests();
+ readConsistencyStrategyClient.createDocument(getCollectionLink(), writeDoc, null, false).block();
+
+ List requests = readConsistencyStrategyClient.getCapturedRequests();
+ assertThat(requests).isNotEmpty();
+
+ HttpRequest createRequest = requests.stream()
+ .filter(r -> "POST".equalsIgnoreCase(r.httpMethod().toString()))
+ .findFirst()
+ .orElse(null);
+ assertThat(createRequest).as("Expected a document create request").isNotNull();
+
+ Map headers = createRequest.headers().toMap();
+ assertThat(headers.containsKey(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY))
+ .as("Write operations should not have ReadConsistencyStrategy header")
+ .isFalse();
+ } finally {
+ readConsistencyStrategyClient.close();
+ }
+ }
+
+ // endregion
+
+ // region Assertion helpers — branch on transport mode
+
+ private void assertReadConsistencyStrategyOnWire(
+ String mode,
+ SpyClientUnderTestFactory.ClientUnderTest client,
+ CosmosItemRequestOptions cosmosOptions,
+ String expectedHeaderValue,
+ byte expectedRntbdByte,
+ boolean expectClStripped) {
+
+ HttpRequest captured = executeReadAndCapture(mode, client, cosmosOptions);
+
+ if (isGatewayV2(mode)) {
+ // Verify the request actually routed through thin client (port 10250)
+ assertThat(captured.uri().toString())
+ .as("V2 request should target thin client proxy endpoint (port 10250)")
+ .contains(":10250");
+
+ // V2: decode RNTBD frame and inspect typed tokens
+ RntbdRequest rntbdRequest = decodeRntbdFrame(collectHttpBody(captured));
+ Byte readConsistencyStrategyValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy);
+ assertThat(readConsistencyStrategyValue)
+ .as("ReadConsistencyStrategy token value should be 0x%02X", expectedRntbdByte)
+ .isNotNull()
+ .isEqualTo(expectedRntbdByte);
+ if (expectClStripped) {
+ Byte clValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ConsistencyLevel);
+ assertThat(clValue)
+ .as("ConsistencyLevel RNTBD token should not be set when ReadConsistencyStrategy is active (0 = unset)")
+ .isEqualTo((byte) 0);
+ }
+ } else {
+ // V1: inspect HTTP headers
+ Map headers = captured.headers().toMap();
+ assertThat(headers.get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY))
+ .isEqualTo(expectedHeaderValue);
+ if (expectClStripped) {
+ assertThat(headers.containsKey(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL))
+ .as("ConsistencyLevel header should be stripped when ReadConsistencyStrategy is set")
+ .isFalse();
+ }
+ }
+ }
+
+ private void assertReadConsistencyStrategyOnWire(
+ String mode,
+ SpyClientUnderTestFactory.ClientUnderTest client,
+ RequestOptions requestOptions,
+ String expectedHeaderValue,
+ byte expectedRntbdByte,
+ boolean expectClStripped) {
+
+ HttpRequest captured = executeReadAndCapture(mode, client, requestOptions);
+
+ if (isGatewayV2(mode)) {
+ assertThat(captured.uri().toString())
+ .as("V2 request should target thin client proxy endpoint (port 10250)")
+ .contains(":10250");
+
+ RntbdRequest rntbdRequest = decodeRntbdFrame(collectHttpBody(captured));
+ Byte readConsistencyStrategyValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy);
+ assertThat(readConsistencyStrategyValue)
+ .as("ReadConsistencyStrategy token value should be 0x%02X", expectedRntbdByte)
+ .isNotNull()
+ .isEqualTo(expectedRntbdByte);
+ if (expectClStripped) {
+ Byte clValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ConsistencyLevel);
+ assertThat(clValue)
+ .as("ConsistencyLevel RNTBD token should not be set when ReadConsistencyStrategy is active (0 = unset)")
+ .isEqualTo((byte) 0);
+ }
+ } else {
+ Map headers = captured.headers().toMap();
+ assertThat(headers.get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY))
+ .isEqualTo(expectedHeaderValue);
+ if (expectClStripped) {
+ assertThat(headers.containsKey(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL))
+ .as("ConsistencyLevel header should be stripped when ReadConsistencyStrategy is set")
+ .isFalse();
+ }
+ }
+ }
+
+ private void assertNoReadConsistencyStrategyOnWire(String mode, SpyClientUnderTestFactory.ClientUnderTest client, CosmosItemRequestOptions cosmosOptions) {
+ HttpRequest captured = executeReadAndCapture(mode, client, cosmosOptions);
+
+ if (isGatewayV2(mode)) {
+ assertThat(captured.uri().toString())
+ .as("V2 request should target thin client proxy endpoint (port 10250)")
+ .contains(":10250");
+
+ RntbdRequest rntbdRequest = decodeRntbdFrame(collectHttpBody(captured));
+ Byte readConsistencyStrategyValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy);
+ assertThat(readConsistencyStrategyValue)
+ .as("ReadConsistencyStrategy RNTBD token should not be set when DEFAULT (0 = unset)")
+ .isEqualTo((byte) 0);
+ } else {
+ Map headers = captured.headers().toMap();
+ assertThat(headers.containsKey(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY))
+ .as("DEFAULT ReadConsistencyStrategy should not emit a header")
+ .isFalse();
+ }
+ }
+
+ private void assertNoReadConsistencyStrategyOnWire(String mode, SpyClientUnderTestFactory.ClientUnderTest client, RequestOptions requestOptions) {
+ HttpRequest captured = executeReadAndCapture(mode, client, requestOptions);
+
+ if (isGatewayV2(mode)) {
+ assertThat(captured.uri().toString())
+ .as("V2 request should target thin client proxy endpoint (port 10250)")
+ .contains(":10250");
+
+ RntbdRequest rntbdRequest = decodeRntbdFrame(collectHttpBody(captured));
+ Byte readConsistencyStrategyValue = rntbdRequest.getHeader(RntbdConstants.RntbdRequestHeader.ReadConsistencyStrategy);
+ assertThat(readConsistencyStrategyValue)
+ .as("ReadConsistencyStrategy RNTBD token should not be set when DEFAULT (0 = unset)")
+ .isEqualTo((byte) 0);
+ } else {
+ Map headers = captured.headers().toMap();
+ assertThat(headers.containsKey(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY))
+ .as("DEFAULT ReadConsistencyStrategy should not emit a header")
+ .isFalse();
+ }
+ }
+
+ // endregion
+
+ // region Execution + capture helpers
+
+ private HttpRequest executeReadAndCapture(String mode, SpyClientUnderTestFactory.ClientUnderTest client, CosmosItemRequestOptions cosmosOptions) {
+ client.clearCapturedRequests();
+ RequestOptions requestOptions = getItemOptionsAccessor().toRequestOptions(cosmosOptions);
+ requestOptions.setPartitionKey(new PartitionKey(DOCUMENT_ID));
+ client.readDocument(getDocumentLink(), requestOptions).block();
+
+ List requests = client.getCapturedRequests();
+ assertThat(requests).isNotEmpty();
+
+ HttpRequest docRequest = findDocumentReadRequest(mode, requests);
+ assertThat(docRequest).as("Expected a document read request").isNotNull();
+ return docRequest;
+ }
+
+ private HttpRequest executeReadAndCapture(String mode, SpyClientUnderTestFactory.ClientUnderTest client, RequestOptions requestOptions) {
+ client.clearCapturedRequests();
+ requestOptions.setPartitionKey(new PartitionKey(DOCUMENT_ID));
+ client.readDocument(getDocumentLink(), requestOptions).block();
+
+ List requests = client.getCapturedRequests();
+ assertThat(requests).isNotEmpty();
+
+ HttpRequest docRequest = findDocumentReadRequest(mode, requests);
+ assertThat(docRequest).as("Expected a document read request").isNotNull();
+ return docRequest;
+ }
+
+ // endregion
+
+ // region Factory + utility helpers
+
+ private SpyClientUnderTestFactory.ClientUnderTest spyFor(String mode) {
+ return isGatewayV2(mode) ? v2SpyClient : v1SpyClient;
+ }
+
+ private static boolean isGatewayV2(String mode) {
+ return V2.equals(mode);
+ }
+
+ private SpyClientUnderTestFactory.ClientUnderTest createSpyClient(ReadConsistencyStrategy ReadConsistencyStrategy, boolean http2Enabled) {
+ ConnectionPolicy gwPolicy = new ConnectionPolicy(GatewayConnectionConfig.getDefaultConfig());
+ if (http2Enabled) {
+ gwPolicy.setHttp2ConnectionConfig(new Http2ConnectionConfig().setEnabled(true));
+ }
+ try {
+ return SpyClientUnderTestFactory.createClientUnderTest(
+ new URI(TestConfigurations.HOST),
+ TestConfigurations.MASTER_KEY,
+ gwPolicy,
+ ConsistencyLevel.SESSION,
+ ReadConsistencyStrategy,
+ new Configs(),
+ null,
+ true,
+ new CosmosClientTelemetryConfig());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getDocumentLink() {
+ return "dbs/" + databaseId + "/colls/" + containerId + "/docs/" + DOCUMENT_ID;
+ }
+
+ private String getCollectionLink() {
+ return "dbs/" + databaseId + "/colls/" + containerId;
+ }
+
+ private HttpRequest findDocumentReadRequest(String mode, List requests) {
+ for (HttpRequest request : requests) {
+ String uri = request.uri().toString();
+ if (isGatewayV2(mode)) {
+ // Thin client sends all requests as POST to proxy (:10250) with RNTBD frame body
+ if ("POST".equalsIgnoreCase(request.httpMethod().toString()) && uri.contains(":10250")) {
+ return request;
+ }
+ } else {
+ // Gateway V1 sends document reads as GET with the document link in the URI
+ if ("GET".equalsIgnoreCase(request.httpMethod().toString()) && uri.contains(getDocumentLink())) {
+ return request;
+ }
+ }
+ }
+ return null;
+ }
+
+ // endregion
+
+ // region RNTBD frame inspection helpers (for V2 assertions)
+
+ private static byte[] collectHttpBody(HttpRequest httpRequest) {
+ return httpRequest.body().reduce((a, b) -> {
+ byte[] merged = new byte[a.length + b.length];
+ System.arraycopy(a, 0, merged, 0, a.length);
+ System.arraycopy(b, 0, merged, a.length, b.length);
+ return merged;
+ }).block();
+ }
+
+ /**
+ * Decodes the RNTBD binary frame and returns the typed request object.
+ * Uses the production decoder (RntbdRequest.decode) so token presence/absence
+ * is determined by the actual RNTBD wire format, not brute-force byte scanning.
+ */
+ private static RntbdRequest decodeRntbdFrame(byte[] rntbdFrame) {
+ ByteBuf buffer = Unpooled.wrappedBuffer(rntbdFrame);
+ return RntbdRequest.decode(buffer);
+ }
+
+ // endregion
+}
diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md
index 89a22c2d1288..bf58280163fb 100644
--- a/sdk/cosmos/azure-cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -4,6 +4,7 @@
#### Features Added
* Added support for change feed with `startFrom` point-in-time on merged partitions by enabling the `CHANGE_FEED_WITH_START_TIME_POST_MERGE` SDK capability. - See [PR 48752](https://github.com/Azure/azure-sdk-for-java/pull/48752)
+* Enabled `ReadConsistencyStrategy` for Gateway V1 (compute gateway) and Gateway V2 (thin client proxy). Previously only supported in Direct mode. Also added client-side validation for `GLOBAL_STRONG` on non-Strong consistency accounts. - See [PR 48787](https://github.com/Azure/azure-sdk-for-java/pull/48787)
* Added new `readManyByPartitionKeys` API on `CosmosAsyncContainer` / `CosmosContainer` to bulk-query all documents matching a list of partition key values with better efficiency than issuing individual queries. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
* Added `CosmosReadManyByPartitionKeysRequestOptions` - a dedicated request-options type for `readManyByPartitionKeys` that exposes `setContinuationToken(String)` for resuming previous invocations and `setMaxConcurrentBatchPrefetch(int)` to bound per-call prefetch parallelism. See [PR 48801](https://github.com/Azure/azure-sdk-for-java/pull/48801)
* Added `CosmosReadManyByPartitionKeysRequestOptions.setMaxBatchSize(Integer)` to set the max. number of partition keys used for a single batch. See [PR 48930](https://github.com/Azure/azure-sdk-for-java/pull/48930)
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java
index c8bca12fcbf4..ce2f4961fb1b 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java
@@ -146,20 +146,26 @@ private RxDocumentServiceRequest createDocumentServiceRequest() {
if (this.options.getReadConsistencyStrategy() != null) {
- String readConsistencyStrategyName = options.getReadConsistencyStrategy().toString();
- this.client.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName);
- headers.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, readConsistencyStrategyName);
+ this.client.validateReadConsistencyStrategy(options.getReadConsistencyStrategy());
+
+ if (this.options.getReadConsistencyStrategy() != ReadConsistencyStrategy.DEFAULT) {
+ headers.put(
+ HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY,
+ options.getReadConsistencyStrategy().toString());
+ }
consistencyLevelOverrideApplicable =
this.options.getReadConsistencyStrategy() == ReadConsistencyStrategy.DEFAULT;
}
if (consistencyLevelOverrideApplicable && this.client.getReadConsistencyStrategy() != null) {
- String readConsistencyStrategyName = this.client.getReadConsistencyStrategy().toString();
- this.client.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName);
- headers.put(
- HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY,
- readConsistencyStrategyName);
+ this.client.validateReadConsistencyStrategy(this.client.getReadConsistencyStrategy());
+
+ if (this.client.getReadConsistencyStrategy() != ReadConsistencyStrategy.DEFAULT) {
+ headers.put(
+ HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY,
+ this.client.getReadConsistencyStrategy().toString());
+ }
consistencyLevelOverrideApplicable =
this.client.getReadConsistencyStrategy() == ReadConsistencyStrategy.DEFAULT;
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java
index 25c81ed4e2a0..02146b83f96e 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java
@@ -1927,16 +1927,18 @@ private static void validateResource(Resource resource) {
}
}
- public void validateAndLogNonDefaultReadConsistencyStrategy(String readConsistencyStrategyName) {
- if (this.connectionPolicy.getConnectionMode() != ConnectionMode.DIRECT
- && readConsistencyStrategyName != null
- && ! readConsistencyStrategyName.equalsIgnoreCase(ReadConsistencyStrategy.DEFAULT.toString())) {
-
- logger.warn(
- "ReadConsistencyStrategy {} defined in Gateway mode. "
- + "This version of the SDK only supports ReadConsistencyStrategy in DIRECT mode. "
- + "This setting will be ignored.",
- readConsistencyStrategyName);
+ public void validateReadConsistencyStrategy(ReadConsistencyStrategy readConsistencyStrategy) {
+ if (readConsistencyStrategy == ReadConsistencyStrategy.GLOBAL_STRONG) {
+ ConsistencyLevel accountConsistency = this.getDefaultConsistencyLevelOfAccount();
+ if (accountConsistency != ConsistencyLevel.STRONG) {
+ throw new BadRequestException(
+ String.format(
+ RMResources.ReadConsistencyStrategyGlobalStrongOnlyAllowedForGlobalStrongAccount,
+ readConsistencyStrategy.toString(),
+ HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY,
+ ConsistencyLevel.STRONG,
+ accountConsistency));
+ }
}
}
@@ -1963,8 +1965,12 @@ private Map getRequestHeaders(RequestOptions options, ResourceTy
&& operationType.isReadOnlyOperation()) {
String readConsistencyStrategyName = readConsistencyStrategy.toString();
- this.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName);
+ this.validateReadConsistencyStrategy(readConsistencyStrategy);
headers.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, readConsistencyStrategyName);
+ // Compute gateway rejects requests with both x-ms-consistency-level and
+ // x-ms-cosmos-read-consistency-strategy headers. When readConsistencyStrategy is set, remove
+ // consistency-level — readConsistencyStrategy takes precedence.
+ headers.remove(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL);
}
if (options == null) {
@@ -2008,13 +2014,20 @@ private Map getRequestHeaders(RequestOptions options, ResourceTy
&& operationType.isReadOnlyOperation()) {
String readConsistencyStrategyName = options.getReadConsistencyStrategy().toString();
- this.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName);
+ this.validateReadConsistencyStrategy(options.getReadConsistencyStrategy());
headers.put(
HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY,
readConsistencyStrategyName);
+ // Compute gateway rejects requests with both x-ms-consistency-level and
+ // x-ms-cosmos-read-consistency-strategy headers. When readConsistencyStrategy is set, remove
+ // consistency-level — readConsistencyStrategy takes precedence.
+ headers.remove(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL);
}
- if (options.getConsistencyLevel() != null) {
+ if (options.getConsistencyLevel() != null
+ && !headers.containsKey(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY)) {
+ // Only set ConsistencyLevel when ReadConsistencyStrategy is NOT already present.
+ // readConsistencyStrategy takes precedence — setting both causes gateway rejection.
headers.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, options.getConsistencyLevel().toString());
}
@@ -5432,8 +5445,8 @@ public ConsistencyLevel getConsistencyLevel() {
}
@Override
- public void validateAndLogNonDefaultReadConsistencyStrategy(String readConsistencyStrategyName) {
- RxDocumentClientImpl.this.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName);
+ public void validateReadConsistencyStrategy(ReadConsistencyStrategy readConsistencyStrategy) {
+ RxDocumentClientImpl.this.validateReadConsistencyStrategy(readConsistencyStrategy);
}
@Override
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java
index 9106ab8a1abf..f5eee2100de0 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java
@@ -12,7 +12,6 @@
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.implementation.directconnectivity.HttpUtils;
-import com.azure.cosmos.implementation.directconnectivity.RequestHelper;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility;
@@ -320,9 +319,77 @@ public Mono performRequestInternal(RxDocumentServiceR
});
}
+ /**
+ * Resolves contention between ConsistencyLevel and ReadConsistencyStrategy headers.
+ * Gateways (V1 HTTP and V2 RNTBD) reject requests carrying both headers.
+ *
+ * Rules:
+ * 1. Request-level readConsistencyStrategy (requestContext) > client-level readConsistencyStrategy (header)
+ * 2. readConsistencyStrategy > ConsistencyLevel — strip CL when non-DEFAULT readConsistencyStrategy is effective
+ * 3. DEFAULT readConsistencyStrategy is transparent — CL stays
+ *
+ * After this method, the request headers contain at most ONE of the two consistency headers.
+ * GW V1 serializes the surviving header as HTTP; GW V2 (ThinClientStoreModel) encodes it as RNTBD.
+ *
+ * Thread safety: availability-strategy clones share the same header map (shallow copy).
+ * The mutation here (remove CL when readConsistencyStrategy present) is idempotent — concurrent clones
+ * performing the same removal is safe.
+ */
+ private void resolveEffectiveConsistencyHeaders(RxDocumentServiceRequest request) {
+ resolveEffectiveConsistencyHeaders(
+ request.getHeaders(),
+ request.requestContext != null ? request.requestContext.readConsistencyStrategy : null);
+ }
+
+ /**
+ * Core resolution logic — public for direct unit testing from cross-package test classes.
+ * Avoids test drift from duplicated simulation logic.
+ */
+ public static void resolveEffectiveConsistencyHeaders(
+ Map headers,
+ ReadConsistencyStrategy requestContextReadConsistencyStrategy) {
+
+ // Determine effective readConsistencyStrategy: requestContext (request-level) takes priority over header (client-level)
+ ReadConsistencyStrategy effectiveReadConsistencyStrategy = null;
+ if (requestContextReadConsistencyStrategy != null
+ && requestContextReadConsistencyStrategy != ReadConsistencyStrategy.DEFAULT) {
+ effectiveReadConsistencyStrategy = requestContextReadConsistencyStrategy;
+ }
+
+ if (effectiveReadConsistencyStrategy == null) {
+ String readConsistencyStrategyHeaderValue = headers.get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY);
+ if (!Strings.isNullOrEmpty(readConsistencyStrategyHeaderValue)) {
+ effectiveReadConsistencyStrategy = ReadConsistencyStrategy.DEFAULT; // non-null marker; actual value is in header
+ // Re-parse only to check non-DEFAULT — the header string is authoritative for serialization
+ for (ReadConsistencyStrategy candidate : ReadConsistencyStrategy.values()) {
+ if (candidate != ReadConsistencyStrategy.DEFAULT
+ && candidate.toString().equals(readConsistencyStrategyHeaderValue)) {
+ effectiveReadConsistencyStrategy = candidate;
+ break;
+ }
+ }
+ }
+ }
+
+ if (effectiveReadConsistencyStrategy != null && effectiveReadConsistencyStrategy != ReadConsistencyStrategy.DEFAULT) {
+ // readConsistencyStrategy wins — strip ConsistencyLevel to prevent dual-header rejection
+ headers.remove(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL);
+ // Ensure the readConsistencyStrategy header is set (requestContext-level may not have been written to headers yet)
+ headers.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, effectiveReadConsistencyStrategy.toString());
+ } else if (effectiveReadConsistencyStrategy == ReadConsistencyStrategy.DEFAULT) {
+ // DEFAULT is transparent — remove the sentinel header, let ConsistencyLevel govern
+ headers.remove(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY);
+ }
+ }
+
private Mono performRequestInternalCore(RxDocumentServiceRequest request, URI requestUri) {
try {
+ // Canonicalize consistency headers before wire serialization.
+ // Both GW V1 (HTTP) and GW V2 (RNTBD via ThinClientStoreModel) read from
+ // request.getHeaders() — this ensures only the winning header reaches the wire.
+ resolveEffectiveConsistencyHeaders(request);
+
HttpRequest httpRequest = request
.getEffectiveHttpTransportSerializer(this)
.wrapInHttpRequest(request, requestUri);
@@ -969,6 +1036,39 @@ private Mono resolvePartitionKeyRangeByPkRangeIdCore(
});
}
+ /**
+ * Determines if the effective consistency for this request is Session — needed by
+ * {@link #applySessionToken} to decide whether to attach/remove session tokens.
+ *
+ * Pure read — no side effects, no header mutation, no HashMap copy.
+ * Resolution order: request-level readConsistencyStrategy > client-level readConsistencyStrategy
+ * (header) > consistency-level header > account default.
+ */
+ private boolean isEffectiveSessionConsistency(RxDocumentServiceRequest request) {
+ // Request-level readConsistencyStrategy takes priority
+ if (request.requestContext != null
+ && request.requestContext.readConsistencyStrategy != null
+ && request.requestContext.readConsistencyStrategy != ReadConsistencyStrategy.DEFAULT) {
+ return request.requestContext.readConsistencyStrategy == ReadConsistencyStrategy.SESSION;
+ }
+
+ // Client-level readConsistencyStrategy from header
+ String rcsHeader = request.getHeaders().get(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY);
+ if (!Strings.isNullOrEmpty(rcsHeader)
+ && !ReadConsistencyStrategy.DEFAULT.toString().equals(rcsHeader)) {
+ return ReadConsistencyStrategy.SESSION.toString().equals(rcsHeader);
+ }
+
+ // Explicit consistency level header
+ String clHeader = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL);
+ if (!Strings.isNullOrEmpty(clHeader)) {
+ return ConsistencyLevel.SESSION.toString().equalsIgnoreCase(clHeader);
+ }
+
+ // Fall back to account default
+ return this.gatewayServiceConfigurationReader.getDefaultConsistencyLevel() == ConsistencyLevel.SESSION;
+ }
+
private Mono applySessionToken(RxDocumentServiceRequest request) {
Map headers = request.getHeaders();
Objects.requireNonNull(headers, "RxDocumentServiceRequest::headers is required and cannot be null");
@@ -981,8 +1081,11 @@ private Mono applySessionToken(RxDocumentServiceRequest request) {
return Mono.empty();
}
- boolean sessionConsistency = (RequestHelper.getReadConsistencyStrategyToUse(this.gatewayServiceConfigurationReader,
- request) == ReadConsistencyStrategy.SESSION);
+ // Determine if the effective consistency is Session — needed to decide whether to
+ // attach/remove session tokens. This is a pure read with no side-effects; it does NOT
+ // call RequestHelper.getReadConsistencyStrategyToUse() which mutates x-ms-consistency-level
+ // (a Direct-mode telemetry concern that is harmful in Gateway mode).
+ boolean sessionConsistency = isEffectiveSessionConsistency(request);
if (!Strings.isNullOrEmpty(request.getHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN))) {
if (!sessionConsistency ||
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java
index 40b9b82f6e3d..ca5ed4432212 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java
@@ -27,6 +27,24 @@ public static class RntbdHealthCheckResults {
public static final String SuccessValue = "Success";
}
+ public enum RntbdReadConsistencyStrategy {
+
+ Eventual((byte) 0x01),
+ Session((byte) 0x02),
+ LatestCommitted((byte) 0x03),
+ GlobalStrong((byte) 0x04);
+
+ private final byte id;
+
+ RntbdReadConsistencyStrategy(final byte id) {
+ this.id = id;
+ }
+
+ public byte id() {
+ return this.id;
+ }
+ }
+
public enum RntbdConsistencyLevel {
Strong((byte) 0x00),
@@ -599,7 +617,8 @@ public enum RntbdRequestHeader implements RntbdHeader {
GlobalDatabaseAccountName((short) 0x00CE, RntbdTokenType.String, false),
ThroughputBucket((short)0x00DB, RntbdTokenType.Byte, false),
PopulateQueryAdvice((short) 0x00DA, RntbdTokenType.Byte, false),
- HubRegionProcessingOnly((short)0x00EF, RntbdTokenType.Byte , false);
+ HubRegionProcessingOnly((short)0x00EF, RntbdTokenType.Byte , false),
+ ReadConsistencyStrategy((short)0x00FE, RntbdTokenType.Byte, false);
public static final List thinClientHeadersInOrderList = Arrays.asList(
EffectivePartitionKey,
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java
index 41b820a1ad95..96a472c45763 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java
@@ -18,6 +18,8 @@
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.apachecommons.lang.EnumUtils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.azure.cosmos.models.IndexingDirective;
import com.azure.cosmos.models.PriorityLevel;
import com.fasterxml.jackson.annotation.JsonFilter;
@@ -48,6 +50,7 @@
@JsonFilter("RntbdToken")
final class RntbdRequestHeaders extends RntbdTokenStream {
+ private static final Logger logger = LoggerFactory.getLogger(RntbdRequestHeaders.class);
private static ImplementationBridgeHelpers.PriorityLevelHelper.PriorityLevelAccessor priorityLevelAccessor() {
return ImplementationBridgeHelpers.PriorityLevelHelper.getPriorityLevelAccessor();
@@ -139,6 +142,7 @@ private static ImplementationBridgeHelpers.PriorityLevelHelper.PriorityLevelAcce
this.addThroughputBucket(headers);
this.addPopulateQueryAdvice(headers);
this.addHubRegionProcessingOnly(headers);
+ this.addReadConsistencyStrategy(headers);
// Normal headers (Strings, Ints, Longs, etc.)
@@ -828,6 +832,37 @@ private void addHubRegionProcessingOnly(final Map headers) {
}
}
+ private RntbdToken getReadConsistencyStrategy() {
+ return this.get(RntbdRequestHeader.ReadConsistencyStrategy);
+ }
+
+ private void addReadConsistencyStrategy(final Map headers) {
+ final String value = headers.get(HttpHeaders.READ_CONSISTENCY_STRATEGY);
+
+ if (StringUtils.isNotEmpty(value)) {
+ switch (value) {
+ case "Eventual":
+ this.getReadConsistencyStrategy().setValue(RntbdConstants.RntbdReadConsistencyStrategy.Eventual.id());
+ break;
+ case "Session":
+ this.getReadConsistencyStrategy().setValue(RntbdConstants.RntbdReadConsistencyStrategy.Session.id());
+ break;
+ case "LatestCommitted":
+ this.getReadConsistencyStrategy().setValue(RntbdConstants.RntbdReadConsistencyStrategy.LatestCommitted.id());
+ break;
+ case "GlobalStrong":
+ this.getReadConsistencyStrategy().setValue(RntbdConstants.RntbdReadConsistencyStrategy.GlobalStrong.id());
+ break;
+ default:
+ if (!"Default".equals(value)) {
+ throw new IllegalArgumentException(
+ "Unknown ReadConsistencyStrategy value '" + value + "' — cannot encode in RNTBD frame");
+ }
+ break;
+ }
+ }
+ }
+
private void addGlobalDatabaseAccountName(final Map headers)
{
final String value = headers.get(HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME);
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java
index 519dfaecebaf..4e8d26112316 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java
@@ -300,20 +300,26 @@ public Map createCommonHeadersAsync(CosmosQueryRequestOptions co
if (cosmosQueryRequestOptions.getReadConsistencyStrategy() != null) {
- String readConsistencyStrategyName = cosmosQueryRequestOptions.getReadConsistencyStrategy().toString();
- this.client.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName);
- requestHeaders.put(HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY, readConsistencyStrategyName);
+ this.client.validateReadConsistencyStrategy(cosmosQueryRequestOptions.getReadConsistencyStrategy());
+
+ if (cosmosQueryRequestOptions.getReadConsistencyStrategy() != ReadConsistencyStrategy.DEFAULT) {
+ requestHeaders.put(
+ HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY,
+ cosmosQueryRequestOptions.getReadConsistencyStrategy().toString());
+ }
consistencyLevelOverrideApplicable =
cosmosQueryRequestOptions.getReadConsistencyStrategy() == ReadConsistencyStrategy.DEFAULT;
}
if (consistencyLevelOverrideApplicable && this.client.getReadConsistencyStrategy() != null) {
- String readConsistencyStrategyName = this.client.getReadConsistencyStrategy().toString();
- this.client.validateAndLogNonDefaultReadConsistencyStrategy(readConsistencyStrategyName);
- requestHeaders.put(
- HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY,
- readConsistencyStrategyName);
+ this.client.validateReadConsistencyStrategy(this.client.getReadConsistencyStrategy());
+
+ if (this.client.getReadConsistencyStrategy() != ReadConsistencyStrategy.DEFAULT) {
+ requestHeaders.put(
+ HttpConstants.HttpHeaders.READ_CONSISTENCY_STRATEGY,
+ this.client.getReadConsistencyStrategy().toString());
+ }
consistencyLevelOverrideApplicable =
this.client.getReadConsistencyStrategy() == ReadConsistencyStrategy.DEFAULT;
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java
index 8555efc5487a..91679d40490e 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java
@@ -74,7 +74,7 @@ Mono executeFeedOperationWithAvailabilityStrategy(
ConsistencyLevel getConsistencyLevel();
- void validateAndLogNonDefaultReadConsistencyStrategy(String readConsistencyStrategyName);
+ void validateReadConsistencyStrategy(ReadConsistencyStrategy readConsistencyStrategy);
///
/// A client query compatibility mode when making query request.