diff --git a/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md b/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md new file mode 100644 index 000000000000..c304c6e06657 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md @@ -0,0 +1,119 @@ +# Http2ConnectionLifecycleTests — Network Delay Testing + +## What This Tests + +`Http2ConnectionLifecycleTests` validates that HTTP/2 parent TCP connections (NioSocketChannel) survive +stream-level ReadTimeoutExceptions triggered by real network delay. Uses Linux `tc netem` to inject +kernel-level packet delay inside a Docker container. + +**Key invariant proven:** A real netty `ReadTimeoutException` on an `Http2StreamChannel` does NOT close +the parent `NioSocketChannel` — the connection pool reuses it for subsequent requests. + +## Why Not SDK Fault Injection? + +SDK `RESPONSE_DELAY` adds a `Mono.delay()` at the HTTP layer — bytes still flow normally on the wire. +Netty's `ReadTimeoutHandler` never fires because it monitors actual socket I/O, not application-layer delays. +Only `tc netem` creates real kernel-level packet delay that triggers the handler. + +## Prerequisites + +- Docker Desktop with Linux containers +- Docker memory: **8 GB+** +- A Cosmos DB account with thin client enabled +- Credentials in `sdk/cosmos/cosmos-v4.properties`: + ```properties + ACCOUNT_HOST=https://.documents.azure.com:443/ + ACCOUNT_KEY= + ``` + +## Build + +```bash +cd sdk/cosmos + +# Build SDK +mvn clean install -pl azure-cosmos,azure-cosmos-test,azure-cosmos-tests -am \ + -DskipTests -Dgpg.skip -Dcheckstyle.skip -Dspotbugs.skip \ + -Drevapi.skip -Dmaven.javadoc.skip -Denforcer.skip -Djacoco.skip + +# Build Docker image +docker build -t cosmos-netem-test -f azure-cosmos-tests/Dockerfile.netem . 
+ +# Generate Linux classpath +mvn dependency:build-classpath -f azure-cosmos-tests/pom.xml -DincludeScope=test +# Convert Windows paths → Linux paths, save to azure-cosmos-tests/target/cp-linux.txt +``` + +## Run + +```bash +cd sdk/cosmos + +ACCOUNT_HOST=$(grep "^ACCOUNT_HOST" cosmos-v4.properties | cut -d= -f2- | tr -d ' ') +ACCOUNT_KEY=$(grep "^ACCOUNT_KEY" cosmos-v4.properties | cut -d= -f2- | tr -d ' ') + +docker run --rm --cap-add=NET_ADMIN --memory 8g \ + -v "$(pwd):/workspace" \ + -v "$HOME/.m2:/root/.m2" \ + -e "ACCOUNT_HOST=$ACCOUNT_HOST" \ + -e "ACCOUNT_KEY=$ACCOUNT_KEY" \ + cosmos-netem-test bash -c ' + cd /workspace && + CP=$(cat azure-cosmos-tests/target/cp-linux.txt) && + java --add-opens java.base/java.lang=ALL-UNNAMED \ + --add-opens java.base/java.util=ALL-UNNAMED \ + --add-opens java.base/java.net=ALL-UNNAMED \ + --add-opens java.base/java.io=ALL-UNNAMED \ + --add-opens java.base/java.nio=ALL-UNNAMED \ + --add-opens java.base/java.util.concurrent=ALL-UNNAMED \ + --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED \ + --add-opens java.base/sun.nio.ch=ALL-UNNAMED \ + --add-opens java.base/sun.nio.cs=ALL-UNNAMED \ + --add-opens java.base/sun.security.action=ALL-UNNAMED \ + --add-opens java.base/sun.util.calendar=ALL-UNNAMED \ + -cp "$CP" \ + -DACCOUNT_HOST=$ACCOUNT_HOST \ + -DACCOUNT_KEY=$ACCOUNT_KEY \ + -DCOSMOS.THINCLIENT_ENABLED=true \ + -DCOSMOS.HTTP2_ENABLED=true \ + org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-thinclient-network-delay-testng.xml \ + -verbose 2 + ' +``` + +## tc netem Commands Used + +### Add Global Delay + +```bash +tc qdisc add dev eth0 root netem delay 8000ms +``` + +Delays ALL outbound packets by 8 seconds. This includes TCP SYN, data, ACKs. +The delay causes Netty's `ReadTimeoutHandler` to fire because the client's requests and +ACKs reach the server late, so no response bytes arrive within the read-timeout window. 
+ +### Remove Delay + +```bash +tc qdisc del dev eth0 root netem +``` + +Restores normal networking. Called in `@AfterMethod` and `@AfterClass` as a safety net. + +## Tests + +| Test | What It Proves | +|------|---------------| +| `connectionReuseAfterRealNettyTimeout` | Parent NioSocketChannel survives ReadTimeoutException; recovery read uses same `parentChannelId` | +| `multiParentChannelConnectionReuse` | Under concurrent load (>30 streams), multiple parent channels are created and at least one pre-delay parent channel is reused after the timeout | +| `retryUsesConsistentParentChannelId` | Retry attempts (6s→6s→10s) use consistent parent channel(s); pool recovers post-delay | +| `connectionSurvivesE2ETimeoutWithRealDelay` | Parent survives when e2e timeout (7s) AND ReadTimeoutHandler both fire | +| `parentChannelSurvivesE2ECancelWithoutReadTimeout` | Parent survives when e2e cancel (3s) fires BEFORE ReadTimeoutHandler (6s) — stream RST only | + +## Important Notes + +- Tests run **sequentially** (`parallel="false" thread-count="1"`) — tc netem is interface-global +- `--cap-add=NET_ADMIN` is required for `tc` commands (Linux `CAP_NET_ADMIN` capability) +- Each test creates/closes its own client (`@BeforeMethod`/`@AfterMethod`) for connection pool isolation +- Delay cleanup runs in `finally` blocks AND `@AfterMethod` for reliability diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java index 33ded4f1e386..36854e42bec6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java @@ -11,7 +11,6 @@ import com.azure.cosmos.CosmosDiagnosticsContext; import 
com.azure.cosmos.CosmosDiagnosticsRequestInfo; import com.azure.cosmos.CosmosException; -import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.FlakyTestRetryAnalyzer; import com.azure.cosmos.TestObject; import com.azure.cosmos.implementation.AsyncDocumentClient; @@ -147,6 +146,16 @@ public static Object[] preferredRegionsConfigProvider() { return new Object[] {true, false}; } + @DataProvider(name = "responseDelayOperationTypeProvider") + public static Object[][] responseDelayOperationTypeProvider() { + return new Object[][]{ + // operationType, faultInjectionOperationType + { OperationType.Read, FaultInjectionOperationType.READ_ITEM }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM } + }; + } + @Test(groups = {"fi-thinclient-multi-master"}, dataProvider = "faultInjectionServerErrorResponseProvider", timeOut = TIMEOUT) public void faultInjectionServerErrorRuleTests_ServerErrorResponse( FaultInjectionServerErrorType serverErrorType, @@ -481,8 +490,11 @@ public void faultInjectionServerErrorRuleTests_Partition() throws JsonProcessing } - @Test(groups = {"fi-thinclient-multi-master"}, timeOut = 4 * TIMEOUT) - public void faultInjectionServerErrorRuleTests_ServerResponseDelay() throws JsonProcessingException { + @Test(groups = {"fi-thinclient-multi-master"}, dataProvider = "responseDelayOperationTypeProvider", timeOut = 4 * TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void faultInjectionServerErrorRuleTests_ServerResponseDelay( + OperationType operationType, + FaultInjectionOperationType faultInjectionOperationType) throws JsonProcessingException { + // define another rule which can simulate timeout String timeoutRuleId = "serverErrorRule-responseDelay-" + UUID.randomUUID(); FaultInjectionRule timeoutRule = @@ -490,43 +502,53 @@ public void faultInjectionServerErrorRuleTests_ServerResponseDelay() throws Json .condition( new 
FaultInjectionConditionBuilder() .connectionType(FaultInjectionConnectionType.GATEWAY) - .operationType(FaultInjectionOperationType.READ_ITEM) + .operationType(faultInjectionOperationType) .build() ) .result( FaultInjectionResultBuilders .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY) .times(1) - .delay(Duration.ofSeconds(61)) // the default time out is 60s + .delay(Duration.ofSeconds(61)) // the default time out is 60s, but Gateway V2 uses 6s .build() ) .duration(Duration.ofMinutes(5)) .build(); try { - DirectConnectionConfig directConnectionConfig = DirectConnectionConfig.getDefaultConfig(); - directConnectionConfig.setConnectTimeout(Duration.ofSeconds(1)); - - // create a new item to be used by read operations - TestItem createdItem = TestItem.createNewItem(); + // create a new item to be used by operations + TestObject createdItem = TestObject.create(); this.cosmosAsyncContainer.createItem(createdItem).block(); CosmosFaultInjectionHelper.configureFaultInjectionRules(this.cosmosAsyncContainer, Arrays.asList(timeoutRule)).block(); - CosmosItemResponse itemResponse = - this.cosmosAsyncContainer.readItem(createdItem.getId(), new PartitionKey(createdItem.getId()), TestItem.class).block(); + + // With HttpTimeoutPolicyForGatewayV2, the first attempt times out at 6s, + // but since delay is only injected once (times=1), the retry succeeds + CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation( + this.cosmosAsyncContainer, + operationType, + createdItem, + false); AssertionsForClassTypes.assertThat(timeoutRule.getHitCount()).isEqualTo(1); - this.validateHitCount(timeoutRule, 1, OperationType.Read, ResourceType.Document); + this.validateHitCount(timeoutRule, 1, operationType, ResourceType.Document); this.validateFaultInjectionRuleApplied( - itemResponse.getDiagnostics(), - OperationType.Read, + cosmosDiagnostics, + operationType, HttpConstants.StatusCodes.REQUEST_TIMEOUT, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT, 
timeoutRuleId, true ); - assertThinClientEndpointUsed(itemResponse.getDiagnostics()); + assertThinClientEndpointUsed(cosmosDiagnostics); + + // Validate end-to-end latency and final status code from CosmosDiagnosticsContext + CosmosDiagnosticsContext diagnosticsContext = cosmosDiagnostics.getDiagnosticsContext(); + AssertionsForClassTypes.assertThat(diagnosticsContext).isNotNull(); + AssertionsForClassTypes.assertThat(diagnosticsContext.getDuration()).isNotNull(); + AssertionsForClassTypes.assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(8)); + AssertionsForClassTypes.assertThat(diagnosticsContext.getStatusCode()).isBetween(HttpConstants.StatusCodes.OK, HttpConstants.StatusCodes.NOT_MODIFIED); } finally { timeoutRule.disable(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java new file mode 100644 index 000000000000..9c849a8ce044 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java @@ -0,0 +1,760 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.cosmos.faultinjection; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnostics; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.assertj.core.api.AssertionsForClassTypes; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import reactor.core.publisher.Flux; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.AssertJUnit.fail; + +/** + * Connection lifecycle tests using tc netem to inject REAL network delay. + *

+ * These exercise the REAL netty ReadTimeoutHandler on HTTP/2 stream channels, + * unlike SDK fault injection which creates synthetic ReadTimeoutExceptions. + * They prove that a real netty-level ReadTimeoutException on an H2 stream + * does NOT close the parent TCP connection. + *

+ * HOW TO RUN: + * 1. Group "manual-thinclient-network-delay" — NOT included in CI. + * 2. Docker container with --cap-add=NET_ADMIN, JDK 21, .m2 mounted. + * 3. Tests self-manage tc netem (add/remove delay) — no manual intervention. + * 4. See NETWORK_DELAY_TESTING_README.md for full setup and run instructions. + *

+ * DESIGN: + * - No creates during tests. One seed item created in beforeClass (via shared container). + * - Each test: warm-up read (must succeed) → tc delay → timed-out read → remove delay → verify read. + * - Assertions compare parentChannelId before/after to prove connection survival. + * - Tests run sequentially (thread-count=1) to avoid tc interference between tests. + */ +public class Http2ConnectionLifecycleTests extends FaultInjectionTestBase { + + private CosmosAsyncClient client; + private CosmosAsyncContainer cosmosAsyncContainer; + private TestObject seedItem; + + private static final String TEST_GROUP = "manual-thinclient-network-delay"; + // 3 minutes per test — enough for warmup + delay + retries + cross-region failover + recovery read + private static final long TEST_TIMEOUT = 180_000; + // Hardcode eth0 — Docker always uses eth0. detectNetworkInterface() fails during active delay + // because `tc qdisc show dev eth0` hangs, and the fallback returns `eth0@if23` which tc rejects. + private static final String NETWORK_INTERFACE = "eth0"; + + @Factory(dataProvider = "clientBuildersWithGatewayAndHttp2") + public Http2ConnectionLifecycleTests(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + this.subscriberValidationTimeout = TIMEOUT; + } + + @BeforeClass(groups = {TEST_GROUP}, timeOut = TIMEOUT) + public void beforeClass() { + System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); + + // Seed one item using a temporary client. The shared container is created by @BeforeSuite. 
+ CosmosAsyncClient seedClient = getClientBuilder().buildAsyncClient(); + try { + CosmosAsyncContainer seedContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(seedClient); + this.seedItem = TestObject.create(); + seedContainer.createItem(this.seedItem).block(); + logger.info("Seeded test item: id={}, pk={}", seedItem.getId(), seedItem.getId()); + seedContainer.readItem(seedItem.getId(), new PartitionKey(seedItem.getId()), TestObject.class).block(); + logger.info("Seed item read verified."); + } finally { + safeClose(seedClient); + } + } + + /** + * Creates a fresh CosmosAsyncClient and container before each test method, ensuring + * an isolated connection pool (no parent channels carried over from prior tests). + */ + @BeforeMethod(groups = {TEST_GROUP}, timeOut = TIMEOUT) + public void beforeMethod() { + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + logger.info("Fresh client and connection pool created for test method."); + } + + /** + * Closes the per-test client after each test method, fully disposing the connection pool. + * Also removes any residual tc netem delay as a safety net. 
+ */ + @AfterMethod(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterMethod() { + removeNetworkDelay(); + safeClose(this.client); + this.client = null; + this.cosmosAsyncContainer = null; + logger.info("Client closed and connection pool disposed after test method."); + } + + @AfterClass(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + removeNetworkDelay(); + System.clearProperty("COSMOS.THINCLIENT_ENABLED"); + } + + // ======================================================================== + // Helpers + // ======================================================================== + + /** + * Extracts parentChannelIds from all entries in the diagnostics' gatewayStatisticsList. + * Each entry corresponds to a gateway request attempt (initial + retries). The parentChannelId + * identifies the H2 parent TCP connection (NioSocketChannel) that the Http2StreamChannel was + * multiplexed on. + * + * @param diagnostics the CosmosDiagnostics from a completed or failed request + * @return list of parentChannelIds from all gateway stats entries (may be empty, never null); + * null/empty/"null" values are filtered out + */ + private List extractAllParentChannelIds(CosmosDiagnostics diagnostics) throws JsonProcessingException { + List parentChannelIds = new ArrayList<>(); + ObjectNode node = (ObjectNode) Utils.getSimpleObjectMapper().readTree(diagnostics.toString()); + JsonNode gwStats = node.get("gatewayStatisticsList"); + if (gwStats != null && gwStats.isArray()) { + for (JsonNode stat : gwStats) { + if (stat.has("parentChannelId")) { + String id = stat.get("parentChannelId").asText(); + if (id != null && !id.isEmpty() && !"null".equals(id)) { + parentChannelIds.add(id); + } + } + } + } + return parentChannelIds; + } + + /** + * Convenience method: extracts the parentChannelId from the first gateway stats entry. 
+ * Equivalent to {@code extractAllParentChannelIds(diagnostics).stream().findFirst().orElse(null)}. + * + * @param diagnostics the CosmosDiagnostics from a completed or failed request + * @return the first parentChannelId, or null if none present + */ + private String extractParentChannelId(CosmosDiagnostics diagnostics) throws JsonProcessingException { + List all = extractAllParentChannelIds(diagnostics); + return all.isEmpty() ? null : all.get(0); + } + + /** + * Establishes an HTTP/2 connection by performing a read, and returns the parent channel ID. + * The parent channel ID identifies the TCP connection shared across all multiplexed H2 streams. + * This value is captured by the ConnectionObserver in ReactorNettyClient on CONNECTED/ACQUIRED. + * + * @return the H2 parent channel ID (NioSocketChannel short text ID, e.g., "819e4658") + */ + private String establishH2ConnectionAndGetParentChannelId() throws Exception { + CosmosDiagnostics diagnostics = this.performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, seedItem, false); + String h2ParentChannelId = extractParentChannelId(diagnostics); + logger.info("Established H2 parent channel (TCP connection): parentChannelId={}", h2ParentChannelId); + AssertionsForClassTypes.assertThat(h2ParentChannelId) + .as("Initial read must succeed and report H2 parentChannelId (NioSocketChannel identity)") + .isNotNull() + .isNotEmpty(); + return h2ParentChannelId; + } + + /** + * Performs a point read and returns the parent channel ID from the response diagnostics. + * Used both for warmup reads and post-delay recovery reads. Comparing the returned value + * with a prior channel ID proves whether the H2 TCP connection (the NioSocketChannel that + * multiplexes all Http2StreamChannels) survived the timeout. 
+ * + * @return the H2 parent channel ID (NioSocketChannel short text ID, e.g., "934ab673") + */ + private String readAndGetParentChannelId() throws Exception { + CosmosDiagnostics diagnostics = this.performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, seedItem, false); + String h2ParentChannelId = extractParentChannelId(diagnostics); + logger.info("Read completed: parentChannelId={}", h2ParentChannelId); + logger.info("Read diagnostics: {}", diagnostics.toString()); + AssertionsForClassTypes.assertThat(h2ParentChannelId) + .as("Read must succeed and report H2 parentChannelId") + .isNotNull() + .isNotEmpty(); + return h2ParentChannelId; + } + + /** + * Asserts that the given diagnostics contain at least one gateway stats entry with + * statusCode 408 and subStatusCode 10002 (GATEWAY_ENDPOINT_READ_TIMEOUT), proving + * that a real ReadTimeoutException was triggered by the network delay. + * + * @param diagnostics the CosmosDiagnostics from the failed request + * @param context description for assertion message (e.g., "delayed read") + */ + private void assertContainsGatewayTimeout(CosmosDiagnostics diagnostics, String context) throws JsonProcessingException { + ObjectNode node = (ObjectNode) Utils.getSimpleObjectMapper().readTree(diagnostics.toString()); + JsonNode gwStats = node.get("gatewayStatisticsList"); + assertThat(gwStats).as(context + ": gatewayStatisticsList should not be null").isNotNull(); + assertThat(gwStats.isArray()).as(context + ": gatewayStatisticsList should be an array").isTrue(); + boolean foundGatewayReadTimeout = false; + for (JsonNode stat : gwStats) { + int status = stat.has("statusCode") ? stat.get("statusCode").asInt() : 0; + int subStatus = stat.has("subStatusCode") ? 
stat.get("subStatusCode").asInt() : 0; + if (status == HttpConstants.StatusCodes.REQUEST_TIMEOUT && subStatus == HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT) { + foundGatewayReadTimeout = true; + break; + } + } + assertThat(foundGatewayReadTimeout) + .as(context + ": should contain at least one 408/10002 (GATEWAY_ENDPOINT_READ_TIMEOUT) entry") + .isTrue(); + } + + /** + * Asserts that the given diagnostics do NOT contain any gateway stats entry with + * statusCode 408 and subStatusCode 10002, proving that no ReadTimeoutException + * occurred on the recovery read after the network delay was removed. + * + * @param diagnostics the CosmosDiagnostics from the recovery read + * @param context description for assertion message (e.g., "recovery read") + */ + private void assertNoGatewayTimeout(CosmosDiagnostics diagnostics, String context) throws JsonProcessingException { + ObjectNode node = (ObjectNode) Utils.getSimpleObjectMapper().readTree(diagnostics.toString()); + JsonNode gwStats = node.get("gatewayStatisticsList"); + if (gwStats == null || !gwStats.isArray()) { + return; // no stats = no timeout, assertion passes + } + for (JsonNode stat : gwStats) { + int status = stat.has("statusCode") ? stat.get("statusCode").asInt() : 0; + int subStatus = stat.has("subStatusCode") ? stat.get("subStatusCode").asInt() : 0; + assertThat(status == HttpConstants.StatusCodes.REQUEST_TIMEOUT && subStatus == HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT) + .as(context + ": should NOT contain 408/10002 (GATEWAY_ENDPOINT_READ_TIMEOUT) — delay should be removed") + .isFalse(); + } + } + + /** + * Applies a tc netem delay to all outbound traffic on the Docker container's network interface. + * This delays ALL packets (including TCP handshake, HTTP/2 frames, and TLS records) by the + * specified duration, causing reactor-netty's ReadTimeoutHandler to fire on H2 stream channels + * when the delay exceeds the configured responseTimeout. + * + *

Requires {@code --cap-add=NET_ADMIN} on the Docker container. Fails the test immediately + * if the {@code tc} command is not available or returns a non-zero exit code.

+ * + * @param delayMs the delay in milliseconds to inject (e.g., 8000 for an 8-second delay) + */ + private void addNetworkDelay(int delayMs) { + String iface = NETWORK_INTERFACE; + String cmd = String.format("tc qdisc add dev %s root netem delay %dms", iface, delayMs); + logger.info(">>> Adding network delay: {}", cmd); + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); + int exit = p.waitFor(); + if (exit != 0) { + try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { + String errMsg = err.readLine(); + logger.warn("tc add failed (exit={}): {}", exit, errMsg); + } + } else { + logger.info(">>> Network delay active: {}ms on {}", delayMs, iface); + } + } catch (Exception e) { + logger.error("Failed to add network delay", e); + fail("Could not add network delay via tc: " + e.getMessage()); + } + } + + /** + * Removes any tc netem qdisc from the Docker container's network interface, restoring + * normal network behavior. This is called in {@code finally} blocks after each test and + * in {@code @AfterMethod} and {@code @AfterClass} as a safety net. + * + *

Best-effort: logs a warning if the qdisc was already removed or if tc fails. + * Does not fail the test on error — the priority is cleanup, not assertion.

+ */ + private void removeNetworkDelay() { + String iface = NETWORK_INTERFACE; + String cmd = String.format("tc qdisc del dev %s root netem", iface); + logger.info(">>> Removing network delay: {}", cmd); + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); + int exit = p.waitFor(); + if (exit == 0) { + logger.info(">>> Network delay removed"); + } else { + logger.warn("tc del returned exit={} (may already be removed)", exit); + } + } catch (Exception e) { + logger.warn("Failed to remove network delay: {}", e.getMessage()); + } + } + + // ======================================================================== + // Tests — each one: warmup read → tc delay → timed-out read → remove → verify + // ======================================================================== + + /** + * Proves that after a real netty ReadTimeoutException (fired by ReadTimeoutHandler on the + * Http2StreamChannel pipeline), the parent NioSocketChannel (TCP connection) remains in the + * ConnectionProvider pool and is reused for the next request with low recovery latency. + *

+ * Asserts: + * 1. The delayed read produces a 408/10002 (GATEWAY_ENDPOINT_READ_TIMEOUT) in diagnostics + * 2. The recovery read after delay removal succeeds with NO 408/10002 + * 3. The recovery read completes within 10s (allows one 6s ReadTimeout retry + TCP stabilization) + * 4. The parentChannelId is identical before and after the delay + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void connectionReuseAfterRealNettyTimeout() throws Exception { + String h2ParentChannelIdBeforeDelay = establishH2ConnectionAndGetParentChannelId(); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(15)).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosDiagnostics delayedDiagnostics = null; + addNetworkDelay(8000); + try { + this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + logger.info("Request succeeded unexpectedly (delay may not be fully active)"); + } catch (CosmosException e) { + delayedDiagnostics = e.getDiagnostics(); + logger.info("ReadTimeoutException triggered: statusCode={}, subStatusCode={}", e.getStatusCode(), e.getSubStatusCode()); + } finally { + removeNetworkDelay(); + } + + // Assert: the delayed read must have produced a 408/10002 + assertThat(delayedDiagnostics) + .as("Delayed read should have failed with diagnostics") + .isNotNull(); + assertContainsGatewayTimeout(delayedDiagnostics, "delayed read"); + + // Brief pause to let TCP retransmission settle after netem qdisc deletion + Thread.sleep(1000); + + // Recovery read — assert no timeout, low latency, and same parent channel + CosmosDiagnostics recoveryDiagnostics = this.performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, seedItem, false); + CosmosDiagnosticsContext recoveryCtx = 
recoveryDiagnostics.getDiagnosticsContext(); + String h2ParentChannelIdAfterDelay = extractParentChannelId(recoveryDiagnostics); + assertNoGatewayTimeout(recoveryDiagnostics, "recovery read"); + + logger.info("RESULT: before={}, after={}, SAME_TCP_CONNECTION={}, recoveryLatency={}ms", + h2ParentChannelIdBeforeDelay, h2ParentChannelIdAfterDelay, + h2ParentChannelIdBeforeDelay.equals(h2ParentChannelIdAfterDelay), + recoveryCtx.getDuration().toMillis()); + AssertionsForClassTypes.assertThat(recoveryCtx.getDuration()) + .as("Recovery read should complete within 10s (allows one 6s ReadTimeout retry + TCP stabilization)") + .isLessThan(Duration.ofSeconds(10)); + assertThat(h2ParentChannelIdAfterDelay) + .as("H2 parent NioSocketChannel should survive ReadTimeoutException on Http2StreamChannel") + .isEqualTo(h2ParentChannelIdBeforeDelay); + } + + /** + * Proves that under concurrent load, the Http2AllocationStrategy can allocate multiple + * H2 parent channels (TCP connections) to the same endpoint, and that ALL parent channels + * survive a ReadTimeoutException on their stream channels. + *

+ * With strictConnectionReuse=false (default) and concurrent requests exceeding + * maxConcurrentStreams (30), reactor-netty's connection pool opens additional parent + * channels. This test sends up to 3 waves of 100 concurrent reads to force >1 parent channel, + * then injects network delay and confirms at least one pre-delay parent channel is reused. + * 

+ * If the pool only creates 1 parent channel (possible under low concurrency), the test + * still verifies that single channel survives — the multi-parent case is validated when + * the pool allocates >1. + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void multiParentChannelConnectionReuse() throws Exception { + // Step 1: Force multiple parent H2 channels by saturating the pool. + // maxConcurrentStreams=30 per parent. To guarantee >1 parent, we need >30 requests + // truly in-flight simultaneously. A single flatMap wave may complete too fast. + // Strategy: fire multiple waves of high-concurrency bursts until >1 parent is observed. + int concurrentRequests = 100; + int maxWaves = 3; + Set preDelayParentChannelIds = ConcurrentHashMap.newKeySet(); + + for (int wave = 0; wave < maxWaves; wave++) { + Flux.range(0, concurrentRequests) + .flatMap(i -> this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), TestObject.class) + .doOnSuccess(response -> { + try { + String parentId = extractParentChannelId(response.getDiagnostics()); + if (parentId != null) { + preDelayParentChannelIds.add(parentId); + } + } catch (Exception e) { + logger.warn("Failed to extract parentChannelId from concurrent read", e); + } + }), concurrentRequests) // concurrency = all at once + .collectList() + .block(); + logger.info("Wave {} complete: parent channels so far = {} (count={})", + wave + 1, preDelayParentChannelIds, preDelayParentChannelIds.size()); + if (preDelayParentChannelIds.size() > 1) { + break; // >1 parent achieved, no need for more waves + } + } + + logger.info("Pre-delay parent channels observed: {} (count={})", + preDelayParentChannelIds, preDelayParentChannelIds.size()); + assertThat(preDelayParentChannelIds) + .as("Concurrent reads with concurrency exceeding maxConcurrentStreams (30) should force >1 parent H2 channels") + .hasSizeGreaterThan(1); + + // Step 2: Inject network delay causing timeouts across all parent 
channels + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(15)).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + addNetworkDelay(8000); + try { + this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + } catch (CosmosException e) { + logger.info("ReadTimeoutException during delay: statusCode={}", e.getStatusCode()); + } finally { + removeNetworkDelay(); + } + + // Step 3: Send concurrent requests again, collect parentChannelIds post-delay + Set postDelayParentChannelIds = ConcurrentHashMap.newKeySet(); + + Flux.range(0, concurrentRequests) + .flatMap(i -> this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), TestObject.class) + .doOnSuccess(response -> { + try { + String parentId = extractParentChannelId(response.getDiagnostics()); + if (parentId != null) { + postDelayParentChannelIds.add(parentId); + } + } catch (Exception e) { + logger.warn("Failed to extract parentChannelId from post-delay read", e); + } + }), concurrentRequests) + .collectList() + .block(); + + logger.info("Post-delay parent channels: {} (count={})", + postDelayParentChannelIds, postDelayParentChannelIds.size()); + + // Step 4: Assert that pre-delay parent channels survived + Set survivedChannels = new HashSet<>(preDelayParentChannelIds); + survivedChannels.retainAll(postDelayParentChannelIds); + + logger.info("RESULT: pre-delay={}, post-delay={}, survived={}, survivalRate={}/{}", + preDelayParentChannelIds, postDelayParentChannelIds, survivedChannels, + survivedChannels.size(), preDelayParentChannelIds.size()); + + assertThat(survivedChannels) + .as("At least one pre-delay H2 parent channel should survive the timeout and be reused post-delay") + .isNotEmpty(); + } + + /** + * Verifies the parentChannelId behavior 
across retry attempts under network delay. + * <p>

+ * When the SDK retries (6s → 6s → 10s via HttpTimeoutPolicyForGatewayV2), each retry + * opens a new Http2StreamChannel. This test captures the parentChannelId from EVERY + * retry attempt in gatewayStatisticsList and verifies: + * 1. Multiple retry attempts were recorded (at least 2 gatewayStatistics entries) + * 2. A post-delay recovery read succeeds with a valid parentChannelId (possibly a new parent) + * <p>

+ * Uses a 25s e2e timeout to allow all 3 retry attempts (6+6+10=22s) to fire + * before the e2e budget is exhausted. + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void retryUsesConsistentParentChannelId() throws Exception { + // Warmup — establish connection and get baseline parentChannelId + String warmupParentChannelId = establishH2ConnectionAndGetParentChannelId(); + + // Inject delay that triggers ReadTimeoutException on every retry attempt + // 8s > 6s (first and second retry timeout), and with RTT doubling to ~16s, + // all three retry attempts (6s/6s/10s) should fire ReadTimeoutException. + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(25)).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosDiagnostics failedDiagnostics = null; + addNetworkDelay(8000); + try { + // The 25s e2e timeout budget may be exhausted by retries (6+6+10=22s) + // or the request may succeed if delay propagation is slow. Either way, + // we need diagnostics from the attempt. + CosmosItemResponse response = this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + // If the read succeeded, it means retries eventually got through. + // Still extract diagnostics from the successful response. 
+ if (response != null) { + failedDiagnostics = response.getDiagnostics(); + logger.info("Request succeeded under delay — extracting diagnostics from success response"); + } + } catch (CosmosException e) { + failedDiagnostics = e.getDiagnostics(); + logger.info("All retries timed out: statusCode={}, subStatusCode={}", + e.getStatusCode(), e.getSubStatusCode()); + if (failedDiagnostics != null) { + logger.info("Exception diagnostics: {}", failedDiagnostics.toString()); + } else { + logger.warn("CosmosException.getDiagnostics() returned null (e2e timeout may fire before any retry completes)"); + } + } catch (Exception e) { + logger.warn("Non-CosmosException caught: {}", e.getClass().getName(), e); + } finally { + removeNetworkDelay(); + } + + // If diagnostics are still null (e2e timeout fired before any retry produced diagnostics), + // do a recovery read and verify the parent channel survived instead + if (failedDiagnostics == null) { + logger.info("No diagnostics from failed request — performing recovery read to verify parent channel survival"); + String postDelayParentChannelId = readAndGetParentChannelId(); + logger.info("RESULT (fallback): warmup={}, postDelay={}, warmupSurvived={}", + warmupParentChannelId, postDelayParentChannelId, + warmupParentChannelId.equals(postDelayParentChannelId)); + assertThat(postDelayParentChannelId) + .as("Parent channel should survive even when e2e timeout fires before retry diagnostics are available") + .isEqualTo(warmupParentChannelId); + return; + } + + // Extract parentChannelIds from ALL retry attempts in the diagnostics + List retryParentChannelIds = extractAllParentChannelIds(failedDiagnostics); + + logger.info("Retry parentChannelIds across attempts: {} (count={})", + retryParentChannelIds, retryParentChannelIds.size()); + logger.info("Full failed diagnostics: {}", failedDiagnostics.toString()); + + assertThat(retryParentChannelIds) + .as("Should have parentChannelIds from at least 2 retry attempts") + 
.hasSizeGreaterThanOrEqualTo(2); + + // Analyze retry parentChannelId consistency + Set uniqueRetryParentChannelIds = new HashSet<>(retryParentChannelIds); + logger.info("Unique parentChannelIds across retries: {} (allSame={})", + uniqueRetryParentChannelIds, uniqueRetryParentChannelIds.size() == 1); + + // Verify that ALL parent channels used during retries survive post-delay + String postDelayParentChannelId = readAndGetParentChannelId(); + + logger.info("RESULT: warmup={}, retryChannels={}, postDelay={}, warmupSurvived={}", + warmupParentChannelId, uniqueRetryParentChannelIds, postDelayParentChannelId, + warmupParentChannelId.equals(postDelayParentChannelId)); + + // Under tc netem delay, the kernel's TCP retransmission timeout may RST connections + // that had queued/delayed packets. This means the post-delay read may use an entirely + // NEW parent channel that was never seen during warmup or retries. + // This is expected behavior for real network disruption — NOT an SDK bug. + // + // The key invariants we DO validate: + // 1. Multiple retry attempts were observed (at least 2 gatewayStatistics entries) + // 2. The post-delay recovery read SUCCEEDS (pool creates/reuses a connection) + // 3. The retry channels are logged for observability + // + // What we intentionally do NOT assert: that postDelayParentChannelId matches + // any known channel, because tc netem can kill TCP connections at the kernel level. + assertThat(postDelayParentChannelId) + .as("Post-delay recovery read must succeed and return a valid parentChannelId") + .isNotNull() + .isNotEmpty(); + } + + /** + * Proves that when both the SDK's e2e timeout (7s) and the network delay (8s) are active, + * the H2 parent NioSocketChannel survives. The e2e cancel fires RST_STREAM on the + * Http2StreamChannel before ReadTimeoutHandler, but the parent TCP connection is unaffected. 
+ */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void connectionSurvivesE2ETimeoutWithRealDelay() throws Exception { + String h2ParentChannelIdBeforeDelay = establishH2ConnectionAndGetParentChannelId(); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(7)).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + addNetworkDelay(8000); + try { + this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + fail("Should have failed due to e2e timeout"); + } catch (CosmosException e) { + logger.info("E2E timeout: statusCode={}, subStatusCode={}", e.getStatusCode(), e.getSubStatusCode()); + logger.info("E2E timeout diagnostics: {}", e.getDiagnostics() != null ? e.getDiagnostics().toString() : "null"); + } finally { + removeNetworkDelay(); + } + + String h2ParentChannelIdAfterDelay = readAndGetParentChannelId(); + + logger.info("RESULT: before={}, after={}, SAME_TCP_CONNECTION={}", + h2ParentChannelIdBeforeDelay, h2ParentChannelIdAfterDelay, + h2ParentChannelIdBeforeDelay.equals(h2ParentChannelIdAfterDelay)); + assertThat(h2ParentChannelIdAfterDelay) + .as("H2 parent NioSocketChannel should survive e2e cancel + real network delay") + .isEqualTo(h2ParentChannelIdBeforeDelay); + } + + /** + * Proves that when the e2e timeout (3s) fires well before ReadTimeoutHandler (6s), the + * e2e cancel path (RST_STREAM) does not close the parent NioSocketChannel. After delay + * removal and a 5s stabilization wait, a new Http2StreamChannel is allocated on the SAME + * parent TCP connection. + * <p>

+ * This is distinct from connectionSurvivesE2ETimeoutWithRealDelay (7s e2e) because here + * ReadTimeoutHandler NEVER fires — the e2e cancel is the sole cancellation mechanism. + * HTTP/2 stream channels are never reused (RFC 9113 §5.1.1 — stream IDs are monotonically + * increasing), so the stream channel ID will be different; only the parent channel should match. + * <p>

+ * Asserts: + * 1. The delayed read fails (e2e cancel fires at 3s, before the 6s ReadTimeoutHandler) + * 2. No 408/10002 (GATEWAY_ENDPOINT_READ_TIMEOUT) in diagnostics — only e2e cancel (408/20008) + * 3. After 5s wait + delay removal, the recovery read succeeds — proving the pool + * recovered gracefully (either reusing an existing parent or creating a new one + * if kernel TCP RST closed the old parents during the tc netem window) + * 4. The recovery read's stream channel ID differs from the warmup stream channel ID + * (HTTP/2 streams are never reused — RFC 9113 §5.1.1) + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void parentChannelSurvivesE2ECancelWithoutReadTimeout() throws Exception { + // Warmup — discover all parent channels currently in the pool by reading multiple times. + // The pool may already have multiple parents from prior tests. We need the full set + // to report whether the recovery read reused an EXISTING parent or needed a new connection. + Set knownParentChannelIds = new HashSet<>(); + String warmupStreamChannelId = null; + for (int i = 0; i < 5; i++) { + CosmosDiagnostics diag = this.performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, seedItem, false); + String parentId = extractParentChannelId(diag); + if (parentId != null) { + knownParentChannelIds.add(parentId); + } + if (i == 0) { + ObjectNode node = (ObjectNode) Utils.getSimpleObjectMapper().readTree(diag.toString()); + JsonNode gwStats = node.get("gatewayStatisticsList"); + warmupStreamChannelId = (gwStats != null && gwStats.isArray() && gwStats.size() > 0 + && gwStats.get(0).has("channelId")) + ?
gwStats.get(0).get("channelId").asText() : null; + } + } + + logger.info("Warmup: knownParentChannels={}, firstStreamChannelId={}", knownParentChannelIds, warmupStreamChannelId); + assertThat(knownParentChannelIds).as("Warmup must discover at least one parent channel").isNotEmpty(); + assertThat(warmupStreamChannelId).as("Warmup must report a stream channel").isNotNull().isNotEmpty(); + + // 3s e2e — fires before the 6s ReadTimeoutHandler + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(3)).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosDiagnostics delayedDiagnostics = null; + addNetworkDelay(8000); + try { + this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + fail("Should have failed due to 3s e2e timeout"); + } catch (CosmosException e) { + delayedDiagnostics = e.getDiagnostics(); + logger.info("E2E cancel: statusCode={}, subStatusCode={}", e.getStatusCode(), e.getSubStatusCode()); + } finally { + removeNetworkDelay(); + } + + // Assert: NO ReadTimeoutException (408/10002) — only e2e cancel should have fired + if (delayedDiagnostics != null) { + ObjectNode delayedNode = (ObjectNode) Utils.getSimpleObjectMapper().readTree(delayedDiagnostics.toString()); + JsonNode delayedGwStats = delayedNode.get("gatewayStatisticsList"); + if (delayedGwStats != null && delayedGwStats.isArray()) { + for (JsonNode stat : delayedGwStats) { + int status = stat.has("statusCode") ? stat.get("statusCode").asInt() : 0; + int subStatus = stat.has("subStatusCode") ? 
stat.get("subStatusCode").asInt() : 0; + assertThat(status == HttpConstants.StatusCodes.REQUEST_TIMEOUT && subStatus == HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT) + .as("3s e2e should cancel BEFORE ReadTimeoutHandler (6s) fires — should NOT see 408/10002") + .isFalse(); + } + } + } + + // Wait 5s for TCP stabilization after delay removal + Thread.sleep(5000); + + // Recovery read — verify parent is from the known set (no new connections), different stream + CosmosDiagnostics recoveryDiag = this.performDocumentOperation( + this.cosmosAsyncContainer, OperationType.Read, seedItem, false); + String recoveryParentChannelId = extractParentChannelId(recoveryDiag); + ObjectNode recoveryNode = (ObjectNode) Utils.getSimpleObjectMapper().readTree(recoveryDiag.toString()); + JsonNode recoveryGwStats = recoveryNode.get("gatewayStatisticsList"); + String recoveryStreamChannelId = (recoveryGwStats != null && recoveryGwStats.isArray() && recoveryGwStats.size() > 0 + && recoveryGwStats.get(0).has("channelId")) + ? recoveryGwStats.get(0).get("channelId").asText() : null; + + assertNoGatewayTimeout(recoveryDiag, "recovery read after 3s e2e cancel"); + + logger.info("RESULT: knownParents={}, recoveryParent={}, IN_KNOWN_SET={}, " + + "warmupStream={}, recoveryStream={}, DIFFERENT_STREAM={}", + knownParentChannelIds, recoveryParentChannelId, + knownParentChannelIds.contains(recoveryParentChannelId), + warmupStreamChannelId, recoveryStreamChannelId, + !Objects.equals(warmupStreamChannelId, recoveryStreamChannelId)); + + // The recovery read must succeed with a valid parent channel. Under tc netem, the kernel's + // TCP retransmission timeout may RST old parents during the delay window, so the pool may + // need to create a new parent. This is expected kernel behavior, not an SDK bug. + // The key invariant: the pool recovers gracefully and hands out a working connection. 
+ assertThat(recoveryParentChannelId) + .as("Recovery read must succeed with a valid parentChannelId after 3s e2e cancel") + .isNotNull() + .isNotEmpty(); + if (knownParentChannelIds.contains(recoveryParentChannelId)) { + logger.info("Recovery used an existing parent channel from the pool — no new connection needed"); + } else { + logger.info("Recovery used a NEW parent channel {} (not in pre-delay set {}) — " + + "kernel TCP RST likely closed old parents during tc netem window", + recoveryParentChannelId, knownParentChannelIds); + } + assertThat(recoveryStreamChannelId) + .as("H2 stream channels are never reused (RFC 9113 §5.1.1) — stream ID should differ from warmup") + .isNotEqualTo(warmupStreamChannelId); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/WebExceptionRetryPolicyTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/WebExceptionRetryPolicyTest.java index a5272f7b7253..29f4f31b99f3 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/WebExceptionRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/WebExceptionRetryPolicyTest.java @@ -35,8 +35,31 @@ public static Object[][] operationTypeProvider() { }; } - @Test(groups = {"unit"}) - public void shouldRetryOnTimeoutForReadOperations() throws Exception { + @DataProvider(name = "nonWriteOperationTypeProvider") + public static Object[][] nonWriteOperationTypeProvider() { + return new Object[][]{ + // OperationType, useThinClientMode, expectedTimeout1, expectedTimeout2, expectedTimeout3, backoff1, backoff2 + + // Regular Gateway mode - Read (uses HttpTimeoutPolicyDefault) + { OperationType.Read, false, Duration.ofSeconds(60), Duration.ofSeconds(60), Duration.ofSeconds(60), Duration.ZERO, Duration.ofSeconds(1) }, + + // Thin Client mode - Point Read (uses HttpTimeoutPolicyForGatewayV2.INSTANCE_FOR_POINT_READ) + { OperationType.Read, true, 
Duration.ofSeconds(6), Duration.ofSeconds(6), Duration.ofSeconds(10), Duration.ZERO, Duration.ZERO }, + + // Thin Client mode - Query (uses HttpTimeoutPolicyForGatewayV2.INSTANCE_FOR_QUERY_AND_CHANGE_FEED) + { OperationType.Query, true, Duration.ofSeconds(6), Duration.ofSeconds(6), Duration.ofSeconds(10), Duration.ZERO, Duration.ZERO } + }; + } + + @Test(groups = {"unit"}, dataProvider = "nonWriteOperationTypeProvider") + public void shouldRetryOnTimeoutForReadOperations( + OperationType operationType, + boolean useThinClientMode, + Duration expectedTimeout1, + Duration expectedTimeout2, + Duration expectedTimeout3, + Duration backoff1, + Duration backoff2) throws Exception { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); RegionalRoutingContext regionalRoutingContext = new RegionalRoutingContext(new URI("http://localhost:")); @@ -53,36 +76,36 @@ public void shouldRetryOnTimeoutForReadOperations() throws Exception { RxDocumentServiceRequest dsr; Mono shouldRetry; - //Default HttpTimeout Policy dsr = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(), - OperationType.Read, "/dbs/db/colls/col/docs/doc", ResourceType.Document); + operationType, "/dbs/db/colls/col/docs/doc", ResourceType.Document); + dsr.useThinClientMode = useThinClientMode; dsr.requestContext.regionalRoutingContextToRoute = regionalRoutingContext; // 1st Attempt webExceptionRetryPolicy.onBeforeSendRequest(dsr); - assertThat(dsr.getResponseTimeout()).isEqualTo(Duration.ofSeconds(60)); + assertThat(dsr.getResponseTimeout()).isEqualTo(expectedTimeout1); shouldRetry = webExceptionRetryPolicy.shouldRetry(cosmosException); validateSuccess(shouldRetry, ShouldRetryValidator.builder(). nullException(). shouldRetry(true). - backOffTime(Duration.ofSeconds(0)). + backOffTime(backoff1). 
build()); // 2nd Attempt retryContext.addStatusAndSubStatusCode(408, 10002); - assertThat(dsr.getResponseTimeout()).isEqualTo(Duration.ofSeconds(60)); + assertThat(dsr.getResponseTimeout()).isEqualTo(expectedTimeout2); shouldRetry = webExceptionRetryPolicy.shouldRetry(cosmosException); validateSuccess(shouldRetry, ShouldRetryValidator.builder(). nullException(). shouldRetry(true). - backOffTime(Duration.ofSeconds(1)). + backOffTime(backoff2). build()); // 3rd Attempt retryContext.addStatusAndSubStatusCode(408, 10002); - assertThat(dsr.getResponseTimeout()).isEqualTo(Duration.ofSeconds(60)); + assertThat(dsr.getResponseTimeout()).isEqualTo(expectedTimeout3); shouldRetry = webExceptionRetryPolicy.shouldRetry(cosmosException); validateSuccess(shouldRetry, ShouldRetryValidator.builder(). @@ -245,6 +268,21 @@ public void shouldNotRetryOnTimeoutForWriteOperations() throws Exception { webExceptionRetryPolicy.onBeforeSendRequest(dsr); shouldRetry = webExceptionRetryPolicy.shouldRetry(cosmosException); + validateSuccess(shouldRetry, ShouldRetryValidator.builder() + .nullException() + .shouldRetry(false) + .build()); + + //Data Plane Write with Thin Client Mode - Should still not retry + dsr = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(), + OperationType.Create, "/dbs/db/colls/col/docs/doc", ResourceType.Document); + dsr.useThinClientMode = true; + dsr.requestContext.regionalRoutingContextToRoute = regionalRoutingContext; + + webExceptionRetryPolicy = new WebExceptionRetryPolicy(new RetryContext()); + webExceptionRetryPolicy.onBeforeSendRequest(dsr); + shouldRetry = webExceptionRetryPolicy.shouldRetry(cosmosException); + validateSuccess(shouldRetry, ShouldRetryValidator.builder() .nullException() .shouldRetry(false) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index af894f913052..5d9151d6b0d7 100644 --- 
a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -27,7 +27,6 @@ import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; import com.azure.cosmos.implementation.Database; -import com.azure.cosmos.implementation.DatabaseForTest; import com.azure.cosmos.implementation.Document; import com.azure.cosmos.implementation.DocumentCollection; import com.azure.cosmos.implementation.FailureValidator; @@ -35,7 +34,6 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; -import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionKeyHelper; import com.azure.cosmos.implementation.Permission; import com.azure.cosmos.implementation.QueryFeedOperationState; @@ -43,7 +41,6 @@ import com.azure.cosmos.implementation.Resource; import com.azure.cosmos.implementation.ResourceResponse; import com.azure.cosmos.implementation.ResourceResponseValidator; -import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.TestUtils; import com.azure.cosmos.implementation.User; @@ -776,7 +773,7 @@ public Flux> bulkInsert(CosmosAsyncConta return Mono.just(response); }); } - + private Flux> insertUsingPointOperations(CosmosAsyncContainer cosmosContainer, List documentDefinitionList) { CosmosItemRequestOptions options = new CosmosItemRequestOptions() diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index cd5510ae024c..1d3cf27ecc66 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -11,6 +11,7 @@ * Fixed Remote Code Execution (RCE) vulnerability (CWE-502) by replacing Java deserialization 
with JSON-based serialization in `CosmosClientMetadataCachesSnapshot`, `AsyncCache`, and `DocumentCollection`. The metadata cache snapshot now uses Jackson for serialization/deserialization, eliminating the entire class of Java deserialization attacks. - [PR 47971](https://github.com/Azure/azure-sdk-for-java/pull/47971) #### Other Changes +* Added aggressive HTTP timeout policies for document operations routed to Gateway V2. - [PR 47879](https://github.com/Azure/azure-sdk-for-java/pull/47879) ### 4.78.0 (2026-02-10) @@ -22,7 +23,7 @@ * Fixed an issue where operation failed with `400` when configured with pre-trigger or post-trigger with non-ascii character. Only impact for gateway mode. See [PR 47881](https://github.com/Azure/azure-sdk-for-java/pull/47881) #### Other Changes -* Added `x-ms-hub-region-processing-only` header to allow hub-region stickiness when 404 `READ SESSION NOT AVAILABLE` is hit for Single-Writer accounts. - [PR 47631](https://github.com/Azure/azure-sdk-for-java/pull/47631) +* Added `x-ms-hub-region-processing-only` header to allow hub-region stickiness when 404 `READ SESSION NOT AVAILABLE` is hit for Single-Writer accounts.
- [PR 47631](https://github.com/Azure/azure-sdk-for-java/pull/47631) ### 4.77.0 (2026-01-26) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java index cdd97adcf18c..2d1856fcf8ed 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java @@ -8,7 +8,7 @@ import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor; import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; import com.azure.cosmos.implementation.directconnectivity.StoreResultDiagnostics; -import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext; +import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; @@ -53,7 +53,6 @@ public class ClientSideRequestStatistics { private NavigableSet regionsContactedWithContext; private Set locationEndpointsContacted; private RetryContext retryContext; - private FaultInjectionRequestContext requestContext; private List gatewayStatisticsList; private MetadataDiagnosticsContext metadataDiagnosticsContext; private SerializationDiagnosticsContext serializationDiagnosticsContext; @@ -277,8 +276,16 @@ public void recordGatewayResponse( gatewayStatistics.isHubRegionProcessingOnly = "true"; } } + + if (rxDocumentServiceRequest.requestContext.getEndToEndOperationLatencyPolicyConfig() != null) { + gatewayStatistics.e2ePolicyCfg = + rxDocumentServiceRequest.requestContext.getEndToEndOperationLatencyPolicyConfig().toString(); + } } + + gatewayStatistics.httpResponseTimeout = 
rxDocumentServiceRequest.getResponseTimeout(); } + gatewayStatistics.statusCode = storeResponseDiagnostics.getStatusCode(); gatewayStatistics.subStatusCode = storeResponseDiagnostics.getSubStatusCode(); gatewayStatistics.sessionToken = storeResponseDiagnostics.getSessionTokenAsString(); @@ -294,6 +301,18 @@ public void recordGatewayResponse( gatewayStatistics.requestThroughputControlGroupName = storeResponseDiagnostics.getRequestThroughputControlGroupName(); gatewayStatistics.requestThroughputControlGroupConfig = storeResponseDiagnostics.getRequestThroughputControlGroupConfig(); + // Channel IDs are captured from requestContext.reactorNettyRequestRecord, + // which is set by RxGatewayStoreModel on both success and error paths. + if (rxDocumentServiceRequest != null + && rxDocumentServiceRequest.requestContext != null + && rxDocumentServiceRequest.requestContext.reactorNettyRequestRecord != null) { + + ReactorNettyRequestRecord record = rxDocumentServiceRequest.requestContext.reactorNettyRequestRecord; + gatewayStatistics.channelId = record.getChannelId(); + gatewayStatistics.parentChannelId = record.getParentChannelId(); + gatewayStatistics.http2 = record.isHttp2(); + } + this.activityId = storeResponseDiagnostics.getActivityId() != null ? 
storeResponseDiagnostics.getActivityId() : rxDocumentServiceRequest.getActivityId().toString(); @@ -947,6 +966,11 @@ public static class GatewayStatistics { private String requestThroughputControlGroupName; private String requestThroughputControlGroupConfig; private String isHubRegionProcessingOnly; + private Duration httpResponseTimeout; + private String channelId; + private String parentChannelId; + private boolean http2; + private String e2ePolicyCfg; public String getSessionToken() { return sessionToken; @@ -1024,6 +1048,31 @@ public String getRequestThroughputControlGroupConfig() { return this.requestThroughputControlGroupConfig; } + public String getChannelId() { + return this.channelId; + } + + public String getParentChannelId() { + return this.parentChannelId; + } + + public boolean isHttp2() { + return this.http2; + } + + public String getE2ePolicyCfg() { + return this.e2ePolicyCfg; + } + + private String getHttpNetworkResponseTimeout() { + + if (this.httpResponseTimeout != null) { + return this.httpResponseTimeout.toString(); + } + + return "n/a"; + } + public static class GatewayStatisticsSerializer extends StdSerializer { private static final long serialVersionUID = 1L; @@ -1045,6 +1094,7 @@ public void serialize(GatewayStatistics gatewayStatistics, jsonGenerator.writeObjectField("requestTimeline", gatewayStatistics.getRequestTimeline()); jsonGenerator.writeStringField("partitionKeyRangeId", gatewayStatistics.getPartitionKeyRangeId()); jsonGenerator.writeNumberField("responsePayloadSizeInBytes", gatewayStatistics.getResponsePayloadSizeInBytes()); + jsonGenerator.writeStringField("httpNetworkResponseTimeout", gatewayStatistics.getHttpNetworkResponseTimeout()); this.writeNonNullStringField(jsonGenerator, "exceptionMessage", gatewayStatistics.getExceptionMessage()); this.writeNonNullStringField(jsonGenerator, "exceptionResponseHeaders", gatewayStatistics.getExceptionResponseHeaders()); this.writeNonNullStringField(jsonGenerator, "faultInjectionRuleId", 
gatewayStatistics.getFaultInjectionRuleId()); @@ -1064,6 +1114,12 @@ public void serialize(GatewayStatistics gatewayStatistics, this.writeNonNullStringField(jsonGenerator, "requestTCG", gatewayStatistics.getRequestThroughputControlGroupName()); this.writeNonNullStringField(jsonGenerator, "requestTCGConfig", gatewayStatistics.getRequestThroughputControlGroupConfig()); + this.writeNonNullStringField(jsonGenerator, "channelId", gatewayStatistics.getChannelId()); + this.writeNonNullStringField(jsonGenerator, "parentChannelId", gatewayStatistics.getParentChannelId()); + if (gatewayStatistics.isHttp2()) { + jsonGenerator.writeBooleanField("isHttp2", true); + } + this.writeNonNullStringField(jsonGenerator, "e2ePolicyCfg", gatewayStatistics.getE2ePolicyCfg()); jsonGenerator.writeEndObject(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index bec5e8c2dda8..d49d4d24b3f4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -17,6 +17,7 @@ import com.azure.cosmos.implementation.directconnectivity.StoreResult; import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper; import com.azure.cosmos.implementation.directconnectivity.Uri; +import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import com.azure.cosmos.implementation.throughputControl.ThroughputControlRequestContext; @@ -60,6 +61,7 @@ public class DocumentServiceRequestContext implements Cloneable { public volatile boolean replicaAddressValidationEnabled = Configs.isReplicaAddressValidationEnabled(); 
private final Set failedEndpoints = ConcurrentHashMap.newKeySet(); private CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig; + public volatile ReactorNettyRequestRecord reactorNettyRequestRecord; private AtomicBoolean isRequestCancelledOnTimeout = null; private volatile List excludeRegions; private volatile Set keywordIdentifiers; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 725e22c3a253..191b5a969cd3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -6520,6 +6520,7 @@ private RxStoreModel getStoreProxy(RxDocumentServiceRequest request) { } if (useThinClientStoreModel(request)) { + request.useThinClientMode = true; return this.thinProxy; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index 786e08199d19..e2b18537c7ac 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -77,6 +77,7 @@ public class RxDocumentServiceRequest implements Cloneable { // so it means most likely the corresponding features are also missing from the main sdk // we need to wire this up. 
public boolean useGatewayMode; + public boolean useThinClientMode; private volatile boolean isDisposed = false; public volatile String entityId; @@ -1051,6 +1052,7 @@ public RxDocumentServiceRequest clone() { rxDocumentServiceRequest.forceCollectionRoutingMapRefresh = this.forceCollectionRoutingMapRefresh; rxDocumentServiceRequest.forcePartitionKeyRangeRefresh = this.forcePartitionKeyRangeRefresh; rxDocumentServiceRequest.useGatewayMode = this.useGatewayMode; + rxDocumentServiceRequest.useThinClientMode = this.useThinClientMode; rxDocumentServiceRequest.requestContext = this.requestContext; rxDocumentServiceRequest.faultInjectionRequestContext = new FaultInjectionRequestContext(this.faultInjectionRequestContext); rxDocumentServiceRequest.nonIdempotentWriteRetriesEnabled = this.nonIdempotentWriteRetriesEnabled; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index b3b3fc4f807e..691c3abfeb06 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -323,6 +323,10 @@ private Mono performRequestInternalCore(RxDocumentSer .getEffectiveHttpTransportSerializer(this) .wrapInHttpRequest(request, requestUri); + // Capture the request record early so it's available on both success and error paths. + // Each retry creates a new HttpRequest with a new record, so this is per-attempt. 
+ request.requestContext.reactorNettyRequestRecord = httpRequest.reactorNettyRequestRecord(); + Mono httpResponseMono = this.httpClient.send(httpRequest, request.getResponseTimeout()); if (this.gatewayServerErrorInjector != null) { @@ -662,6 +666,13 @@ private Mono toDocumentServiceResponse(Mono= ResourceLeakDetector.Level.ADVANCED.ordinal(); @@ -104,9 +107,15 @@ public StoreResponse unwrapToStoreResponse( return super.unwrapToStoreResponse(endpoint, request, statusCode, headers, Unpooled.EMPTY_BUFFER); } - if (content.readableBytes() == 0) { + if (content.refCnt() == 0) { + // ByteBuf was already released (e.g., stream RST due to responseTimeout on HTTP/2). + // Treat as empty response to avoid IllegalReferenceCountException during decoding. + logger.debug("Content ByteBuf already released (refCnt=0) in unwrapToStoreResponse, treating as empty"); + return super.unwrapToStoreResponse(endpoint, request, statusCode, headers, Unpooled.EMPTY_BUFFER); + } - ReferenceCountUtil.safeRelease(content); + if (content.readableBytes() == 0) { + safeSilentRelease(content); return super.unwrapToStoreResponse(endpoint, request, statusCode, headers, Unpooled.EMPTY_BUFFER); } @@ -135,33 +144,35 @@ public StoreResponse unwrapToStoreResponse( payloadBuf ); - if (payloadBuf == Unpooled.EMPTY_BUFFER) { - // payload is a slice/derived view; super() owns payload, we still own the container - // this includes scenarios where payloadBuf == EMPTY_BUFFER - ReferenceCountUtil.safeRelease(content); + if (payloadBuf == Unpooled.EMPTY_BUFFER && content.refCnt() > 0) { + safeSilentRelease(content); } return storeResponse; - } catch (Throwable t){ - if (payloadBuf == Unpooled.EMPTY_BUFFER) { - // payload is a slice/derived view; super() owns payload, we still own the container - // this includes scenarios where payloadBuf == EMPTY_BUFFER - ReferenceCountUtil.safeRelease(content); + } catch (Throwable t) { + if (payloadBuf == Unpooled.EMPTY_BUFFER && content.refCnt() > 0) { + 
safeSilentRelease(content); } throw t; } } - ReferenceCountUtil.safeRelease(content); + if (content.refCnt() > 0) { + safeSilentRelease(content); + } return super.unwrapToStoreResponse(endpoint, request, statusCode, headers, Unpooled.EMPTY_BUFFER); } - ReferenceCountUtil.safeRelease(content); + if (content.refCnt() > 0) { + safeSilentRelease(content); + } throw new IllegalStateException("Invalid rntbd response"); } catch (Throwable t) { // Ensure container is not leaked on any unexpected path - ReferenceCountUtil.safeRelease(content); + if (content.refCnt() > 0) { + safeSilentRelease(content); + } throw t; } } @@ -225,7 +236,7 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque headers, Flux.just(contentAsByteArray)); } finally { - ReferenceCountUtil.safeRelease(byteBuf); + safeSilentRelease(byteBuf); } } @@ -234,6 +245,15 @@ public Map getDefaultHeaders() { return this.defaultHeaders; } + private static void safeSilentRelease(Object msg) { + try { + ReferenceCountUtil.release(msg); + } catch (Throwable t) { + // ReferenceCountUtil.safeRelease would always log a WARN on double-release. + // In this class we only need this for a rare race condition — swallow silently. 
+ } + } + private HttpHeaders getHttpHeaders() { HttpHeaders httpHeaders = new HttpHeaders(); // todo: select only required headers from defaults diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/WebExceptionRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/WebExceptionRetryPolicy.java index 0314d4dea86c..6ea14dcf68c2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/WebExceptionRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/WebExceptionRetryPolicy.java @@ -74,7 +74,7 @@ public Mono shouldRetry(Exception e) { if (this.isReadRequest || request.isAddressRefresh() || WebExceptionUtility.isWebExceptionRetriable(e)) { - int delayInSeconds = this.timeoutPolicy.getTimeoutAndDelaysList().get(this.retryCount).getDelayForNextRequestInSeconds(); + Duration delay = this.timeoutPolicy.getTimeoutAndDelaysList().get(this.retryCount).getDelayForNextRequest(); // Increase the retry count after calculating the delay retryCount++; logger @@ -88,7 +88,7 @@ public Mono shouldRetry(Exception e) { this.request.forceCollectionRoutingMapRefresh); this.request.setResponseTimeout(this.timeoutPolicy.getTimeoutAndDelaysList().get(this.retryCount).getResponseTimeout()); - return Mono.just(ShouldRetryResult.retryAfter(Duration.ofSeconds(delayInSeconds))); + return Mono.just(ShouldRetryResult.retryAfter(delay)); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicy.java index 9a4af11f500c..64aa3c025cd7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicy.java @@ -20,6 +20,18 @@ public static final HttpTimeoutPolicy 
getTimeoutPolicy(RxDocumentServiceRequest if (OperationType.Read.equals(request.getOperationType()) && request.getResourceType() == ResourceType.DatabaseAccount) { return HttpTimeoutPolicyControlPlaneRead.INSTANCE; } + // Use Gateway V2 timeout policies when Thin Client mode is enabled + if (request.useThinClientMode && request.getResourceType() == ResourceType.Document) { + OperationType operationType = request.getOperationType(); + // Point read operations + if (OperationType.Read.equals(operationType)) { + return HttpTimeoutPolicyForGatewayV2.INSTANCE_FOR_POINT_READ; + } + // Query and Change Feed operations + if (OperationType.Query.equals(operationType) || request.isChangeFeedRequest()) { + return HttpTimeoutPolicyForGatewayV2.INSTANCE_FOR_QUERY_AND_CHANGE_FEED; + } + } return HttpTimeoutPolicyDefault.INSTANCE; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicyForGatewayV2.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicyForGatewayV2.java new file mode 100644 index 000000000000..18e350e329c6 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTimeoutPolicyForGatewayV2.java @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.http; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Timeout policy for Gateway V2 (Thin Client) requests. + * This policy has separate configurations for point read operations vs query/change feed operations. 
+ */ +public class HttpTimeoutPolicyForGatewayV2 extends HttpTimeoutPolicy { + + public static final HttpTimeoutPolicy INSTANCE_FOR_POINT_READ = new HttpTimeoutPolicyForGatewayV2(true); + public static final HttpTimeoutPolicy INSTANCE_FOR_QUERY_AND_CHANGE_FEED = new HttpTimeoutPolicyForGatewayV2(false); + + private final boolean isPointRead; + + private HttpTimeoutPolicyForGatewayV2(boolean isPointRead) { + this.isPointRead = isPointRead; + this.timeoutAndDelaysList = getTimeoutList(); + } + + public List getTimeoutList() { + if (this.isPointRead) { + return Collections.unmodifiableList( + Arrays.asList( + new ResponseTimeoutAndDelays(Duration.ofSeconds(6), Duration.ZERO), + new ResponseTimeoutAndDelays(Duration.ofSeconds(6), Duration.ZERO), + new ResponseTimeoutAndDelays(Duration.ofSeconds(10), Duration.ZERO))); + } else { + return Collections.unmodifiableList( + Arrays.asList( + new ResponseTimeoutAndDelays(Duration.ofSeconds(6), Duration.ZERO), + new ResponseTimeoutAndDelays(Duration.ofSeconds(6), Duration.ZERO), + new ResponseTimeoutAndDelays(Duration.ofSeconds(10), Duration.ZERO))); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java index 44e2d0045edb..6206a4d64db9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java @@ -6,6 +6,8 @@ import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelId; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpMethod; @@ -247,26 +249,23 @@ private static ConnectionObserver 
getConnectionObserver() { logger.trace("STATE {}, Connection {}, Time {}", state, conn, time); if (state.equals(HttpClientState.CONNECTED)) { - if (conn instanceof ConnectionObserver) { - ConnectionObserver observer = (ConnectionObserver) conn; - ReactorNettyRequestRecord requestRecord = - observer.currentContext().getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, null); - if (requestRecord == null) { - throw new IllegalStateException("ReactorNettyRequestRecord not found in context"); - } - requestRecord.setTimeConnected(time); + ReactorNettyRequestRecord requestRecord = getRequestRecordFromConnection(conn); + if (requestRecord == null) { + throw new IllegalStateException("ReactorNettyRequestRecord not found in context"); } + requestRecord.setTimeConnected(time); + captureChannelIds(conn.channel(), requestRecord, true); } else if (state.equals(HttpClientState.ACQUIRED)) { - if (conn instanceof ConnectionObserver) { - ConnectionObserver observer = (ConnectionObserver) conn; - ReactorNettyRequestRecord requestRecord = - observer.currentContext().getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, null); - if (requestRecord == null) { - throw new IllegalStateException("ReactorNettyRequestRecord not found in context"); - } - requestRecord.setTimeAcquired(time); + ReactorNettyRequestRecord requestRecord = getRequestRecordFromConnection(conn); + if (requestRecord == null) { + throw new IllegalStateException("ReactorNettyRequestRecord not found in context"); } + requestRecord.setTimeAcquired(time); + captureChannelIds(conn.channel(), requestRecord, false); } else if (state.equals(HttpClientState.STREAM_CONFIGURED)) { + // STREAM_CONFIGURED fires for HTTP/2 streams on every request (unlike CONNECTED + // which only fires once when the TCP connection is established). + // For H2, conn.channel() here is the stream channel; conn.channel().parent() is the TCP connection. 
if (conn instanceof HttpClientRequest) { HttpClientRequest httpClientRequest = (HttpClientRequest) conn; ReactorNettyRequestRecord requestRecord = @@ -275,6 +274,8 @@ private static ConnectionObserver getConnectionObserver() { throw new IllegalStateException("ReactorNettyRequestRecord not found in context"); } requestRecord.setTimeAcquired(time); + requestRecord.setHttp2(true); + captureChannelIds(conn.channel(), requestRecord, true); } } else if (state.equals(HttpClientState.CONFIGURED) || state.equals(HttpClientState.REQUEST_PREPARED)) { if (conn instanceof HttpClientRequest) { @@ -318,6 +319,40 @@ private static ConnectionObserver getConnectionObserver() { }; } + /** + * Extracts the ReactorNettyRequestRecord from the connection's context. + * Returns null if the connection is not a ConnectionObserver or if the record is not in context. + */ + private static ReactorNettyRequestRecord getRequestRecordFromConnection(Connection conn) { + if (conn instanceof ConnectionObserver) { + return ((ConnectionObserver) conn) + .currentContext().getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, null); + } + return null; + } + + /** + * Captures channelId and parentChannelId from the given channel onto the request record. + * For HTTP/2, channel.parent() is the TCP connection; for HTTP/1.1, parent is null. + * + * @param channel the netty channel (stream channel for H2, connection channel for H1) + * @param requestRecord the record to populate + * @param overwrite if true, always writes channel IDs; if false, only writes when not already set + */ + private static void captureChannelIds(Channel channel, ReactorNettyRequestRecord requestRecord, boolean overwrite) { + ChannelId id = channel.id(); + String channelId = id.asShortText(); + Channel parent = channel.parent(); + String parentChannelId = parent != null + ? 
parent.id().asShortText() + : channelId; + + if (overwrite || requestRecord.getParentChannelId() == null) { + requestRecord.setChannelId(channelId); + requestRecord.setParentChannelId(parentChannelId); + } + } + private static class ReactorNettyHttpResponse extends HttpResponse { private final AtomicReference state = new AtomicReference<>(ReactorNettyResponseState.NOT_SUBSCRIBED); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyRequestRecord.java index 675a217b36bc..021e15b01038 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyRequestRecord.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyRequestRecord.java @@ -39,6 +39,9 @@ public final class ReactorNettyRequestRecord { private volatile Instant timeSent; private volatile Instant timeReceived; private volatile Instant timeCompleted; + private volatile String channelId; + private volatile String parentChannelId; + private volatile boolean http2; private final long transportRequestId; public ReactorNettyRequestRecord() { @@ -204,4 +207,56 @@ public RequestTimeline takeTimelineSnapshot() { public long getTransportRequestId() { return transportRequestId; } + + /** + * Gets the channel ID (netty channel short text ID) used for this request. + * For HTTP/2, this is the stream channel ID. For HTTP/1.1, same as parentChannelId. + * @return channelId + */ + public String getChannelId() { + return this.channelId; + } + + /** + * Sets the channel ID. + * @param channelId the netty channel ID + */ + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + /** + * Gets the parent channel ID — the TCP connection identity. + * For HTTP/2: this is the parent TCP channel (shared across multiplexed streams). 
+ * For HTTP/1.1: same as channelId (no multiplexing). + * Use this to determine if two requests shared the same physical TCP connection. + * @return parentChannelId + */ + public String getParentChannelId() { + return this.parentChannelId; + } + + /** + * Sets the parent channel ID. + * @param parentChannelId the parent TCP connection channel ID + */ + public void setParentChannelId(String parentChannelId) { + this.parentChannelId = parentChannelId; + } + + /** + * Returns true if this request was sent over HTTP/2. + * @return true if HTTP/2, false if HTTP/1.1 + */ + public boolean isHttp2() { + return this.http2; + } + + /** + * Sets whether this request used HTTP/2. + * @param isHttp2 true if HTTP/2 + */ + public void setHttp2(boolean isHttp2) { + this.http2 = isHttp2; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ResponseTimeoutAndDelays.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ResponseTimeoutAndDelays.java index f91a0b3a1428..a81e34ac048a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ResponseTimeoutAndDelays.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ResponseTimeoutAndDelays.java @@ -8,10 +8,18 @@ public class ResponseTimeoutAndDelays { private final Duration responseTimeout; private final int delayForNextRequestInSeconds; + private final Duration delayForNextRequest; ResponseTimeoutAndDelays(Duration requestTimeout, int delayForNextRequest) { this.responseTimeout = requestTimeout; this.delayForNextRequestInSeconds = delayForNextRequest; + this.delayForNextRequest = Duration.ofSeconds(delayForNextRequest); + } + + ResponseTimeoutAndDelays(Duration requestTimeout, Duration delayForNextRequest) { + this.responseTimeout = requestTimeout; + this.delayForNextRequest = delayForNextRequest; + this.delayForNextRequestInSeconds = (int) delayForNextRequest.getSeconds(); } public Duration 
getResponseTimeout() { @@ -22,4 +30,8 @@ public int getDelayForNextRequestInSeconds() { return delayForNextRequestInSeconds; } + public Duration getDelayForNextRequest() { + return delayForNextRequest; + } + }