diff --git a/sdk/cosmos/azure-cosmos-tests/CUSTOMER_WORKFLOW_COVERAGE_MAP.md b/sdk/cosmos/azure-cosmos-tests/CUSTOMER_WORKFLOW_COVERAGE_MAP.md new file mode 100644 index 000000000000..52d55708292f --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/CUSTOMER_WORKFLOW_COVERAGE_MAP.md @@ -0,0 +1,83 @@ +# Customer Workflow Coverage Map + +This file tracks the customer release-validation workflows from `e:\benchmark-tests` against the runnable Cosmos SDK tests in this module. Keep it updated as customer-derived workflows are ported, disabled, or marked duplicate. + +## Classification + +| Classification | Meaning | +|---|---| +| `gap` | Customer workflow has no equivalent SDK scenario coverage. | +| `partial` | SDK covers the primitives, but not the customer-style operation chain or diagnostics assertion. | +| `duplicate` | Existing SDK tests already cover the behavior with enough release-signal fidelity. | +| `wrapper-specific` | Assertion belongs to the customer's wrapper defaults, not an SDK contract. | +| `deferred` | Candidate workflow, but not enabled until runtime/flakiness/account-shape trade-offs are reviewed. | + +## Initial Coverage Triage + +| Customer source area | Customer workflow signal | Existing SDK references | Classification | Initial action | +|---|---|---|---|---| +| `test/CosmosDaoTest.java` | Create/read/query/upsert/delete, readAll, bulk/batch, custom serializer through a DAO wrapper | `CosmosItemTest`, `DocumentCrudTest`, `CosmosBulkTest`, `CosmosBulkAsyncTest`, `TransactionalBatchTest`, `CosmosItemSerializerTest` | `partial` / `wrapper-specific` | Port only customer-style operation chains with diagnostics assertions; mark DAO cache/session-map/default policy checks wrapper-specific. | +| `test/CosmosMultiFeatureTests.java` | App-style create/read/query/upsert/delete, keyword identifiers, invalid session token, no preferred region default routing | `CosmosDiagnosticsTest`, `ExcludeRegionTests`, `SessionConsistencyWithRegionScopingTests` | `partial` | Start with keyword identifier and region-routing workflows in `fi-customer-workflows`. | +| `test/CosmosDriverDynamicRequestOptionTest.java` | Dynamic operation policy changes request options per operation and validates request options through diagnostics | `OperationPoliciesTest`, `GatewayReadConsistencyStrategyE2ETest`, `CosmosLatestCommittedItemTests` | `partial` | Add customer workflow tests that combine create/read/query/readMany/upsert chains and diagnostics request-option validation. | +| `test/Latest_Committed_Tests.java` | Latest-committed with excluded regions, consistency combinations, RU and contacted-region expectations | `CosmosLatestCommittedItemTests`, `GatewayReadConsistencyStrategyE2ETest`, `ClientRetryPolicyE2ETests` | `partial` | Add focused live multi-region workflow rows; keep primitive latest-committed behavior as duplicate references. | +| `regression/direct/*.java` | Latest-committed direct-mode regression matrix for change feed, read, query, readMany, session/eventual combinations | `CosmosLatestCommittedItemTests`, change feed processor tests | `partial` | Port only variants that add multi-region workflow signal beyond existing latest-committed tests. | +| `regression/gateway/*.java` | Gateway latest-committed regression matrix | `GatewayReadConsistencyStrategyE2ETest`, `GatewayReadConsistencyStrategySpyWireTest` | `duplicate` / `partial` | Keep as duplicate unless the coverage table identifies a workflow assertion not present in gateway tests. | +| `test/CosmosHighE2ETimeoutTest.java` | E2E timeout behavior under response delay and partition migrating faults for create/query/readMany/batch | `EndToEndTimeOutValidationTests`, `EndToEndTimeOutWithAvailabilityTest`, `FaultInjectionWithAvailabilityStrategyTestsBase` | `partial` | Add one customer-style chain after request-option workflows stabilize. | +| `test/CosmosStoredProcedureTest.java` | Stored procedure create/read/update diagnostics under response delay and read-session-not-available faults | `StoredProcedureCrudTest`, `StoredProcedureQueryTest`, `StoredProcedureUpsertReplaceTest`, `CosmosSyncStoredProcTest` | `gap` / `partial` | Add targeted stored-procedure fault workflow that deploys scripts in setup. | +| `test/ChangeFeedProcessorTest.java` | CFP start/stop, latest-version handler, current state, restart, and fault-injected read feed | `IncrementalChangeFeedProcessorTest`, `FullFidelityChangeFeedProcessorTest`, `CosmosContainerChangeFeedTest` | `partial` | Add a small CFP workflow and replace fixed sleeps with polling. | +| `test/PartitionLevelCircuitBreakerTests.java` | PCLB app chain and query-plan behavior under regional faults | `PerPartitionCircuitBreakerE2ETests`, `PerPartitionAutomaticFailoverE2ETests` | `partial` | Add one PCLB-enabled workflow row after first live suite run. | +| `test/CosmosConflictResolutionTest.java` | Multi-client conflict detection and conflict query | `CosmosConflictsTest`, `ConflictTests`, `MultiMasterConflictResolutionTest` | `duplicate` / `partial` | Document existing coverage first; port only if customer ordering/diagnostics differs. | +| `test/Cosmos429test.java` | 429 and connection delay behavior in app-shaped calls | `RetryThrottleTest`, `ResourceThrottleRetryPolicyTest`, `FaultInjectionServerErrorRuleOnDirectTests`, `FaultInjectionServerErrorRuleOnGatewayTests` | `duplicate` / `partial` | Prefer parameterized FI rows; do not create a standalone clone. | +| `singlemaster/direct/*.java` | Single-write account availability strategies in direct mode | `EndToEndTimeOutWithAvailabilityTest`, `ExcludeRegionTests`, `FITests_*` | `deferred` | Document rows first; add a single-write multi-region matrix only if unique customer coverage remains. | +| `singlemaster/gateway/*.java` | Single-write account availability strategies in gateway mode | Gateway retry/fault-injection tests | `deferred` | Same as singlemaster/direct. | +| `multimaster/direct/*.java` | Multi-write direct availability strategy matrix across fault/status/operation combinations | `FaultInjectionWithAvailabilityStrategyTestsBase`, `FITests_*`, `PerPartitionAutomaticFailoverE2ETests` | `partial` | Port representative workflow matrix with TestNG data providers instead of one class per customer file. | +| `multimaster/gateway/*.java` | Multi-write gateway availability strategy matrix | `FaultInjectionServerErrorRuleOnGatewayTests`, `FaultInjectionServerErrorRuleOnGatewayV2Tests`, `FITests_*` | `partial` | Port selected gateway workflow rows after direct-mode baseline. | + +## Enabled Suite + +The initial implementation adds TestNG group `fi-customer-workflows`, Maven profile `-Pfi-customer-workflows`, and live matrix display name `FaultInjectionCustomerWorkflows`. The suite is intended to run only through the existing on-demand Cosmos live test path. + +Single-write multi-region customer workflows use TestNG group `fi-sm-customer-workflows`, Maven profile `-Pfi-sm-customer-workflows`, and live matrix display name `FaultInjectionSingleMasterCustomerWorkflows`. + +## Implemented Workflow Classes + +| Workflow class | Customer coverage areas represented | +|---|---| +| `CustomerWorkflowRequestOptionsTest` | Dynamic request options, keyword identifiers, excluded regions, create/read/query/readMany/upsert/delete diagnostics. | +| `CustomerWorkflowDaoStyleOperationsTest` | DAO-style CRUD chain, readAll, patch, transactional batch, bulk read/patch with max micro-batch sizing, and request-level serializer propagation. | +| `CustomerWorkflowLatestCommittedTest` | Latest-committed point read, query, readMany, change feed, excluded regions, diagnostics request-option propagation, regional lease-not-found fault coverage, and direct/gateway client variants. | +| `CustomerWorkflowSessionTokenTest` | ReadMany with valid and advanced user session tokens, validating read-session-not-available behavior. | +| `CustomerWorkflowStoredProcedureTest` | Stored procedure create/read/execute with script logging and metadata fault-rule coverage. | +| `CustomerWorkflowChangeFeedProcessorTest` | Latest-version CFP start, restart, current state/lag, and read-feed fault recovery. | +| `CustomerWorkflowAvailabilityFaultMatrixTest` | Expanded multi-master direct/gateway fault matrix for read, query, readMany, create, upsert, replace, delete, and patch operations across representative 404/408/410/429/449/500/503 families. | +| `CustomerWorkflowHighE2ETimeoutTest` | Response-delay workflow with E2E timeout and availability strategy for create, read, query, readMany, upsert, batch, patch, and partition-migrating read. | +| `CustomerWorkflowPartitionLevelCircuitBreakerTest` | PCLB-oriented point read, query-plan diagnostics/query, and patch app-chain workflow under the PCLB-enabled live matrix leg. | +| `CustomerWorkflowSingleMasterAvailabilityTest` | Single-write multi-region excluded-readable-region reads, local readable-region read faults, write faults constrained to the single writable region, and representative direct/gateway read/create fault matrices. | + +## Remaining Gap Summary + +| Remaining area | Current status after `fi-customer-workflows` | Importance of adding more | +|---|---|---| +| Exhaustive dynamic request-option matrix | Core app-style create/read/query/readMany/upsert/delete request-option propagation is covered; the exhaustive per-option matrix remains in existing SDK primitive tests. | `nice to have` / mostly duplicate. Add only if release owners want customer-style chaining for every option combination. | +| Latest-committed RU comparison variants | Point read, query, readMany, change feed, excluded regions, diagnostics propagation, and a regional lease-not-found fault are covered; strict RU comparison checks remain. | `nice to have`. RU comparisons are service-sensitive and less valuable than the diagnostics/routing checks now covered. | +| Gateway latest-committed regression variants | Direct and gateway latest-committed workflow variants are covered by `CustomerWorkflowLatestCommittedTest`; existing gateway read-consistency tests remain the primitive anchor. | `covered enough`. No further action unless strict one-class-per-customer-file parity is required. | +| Stored procedure exact fault parity | Stored procedure create/read/execute/script-log and metadata fault-rule coverage are added; exact response-delay/read-session-not-available stored-procedure fault parity is not fully represented because fault injection has no stored-procedure-specific operation type. | `addressing significant partial gap`, but may require deeper test-infra support or a carefully scoped metadata/data-plane proxy scenario. | +| CFP full customer matrix | Latest-version CFP start, restart, current state/lag, and read-feed fault recovery are covered; full-fidelity/all-versions, side-cart, and deeper lease recovery variants remain. | `nice to have`. Current workflow covers the highest-signal CFP behavior without copying the large CFP matrix. | +| Full multi-write availability matrix | Expanded direct/gateway multi-write fault rows now cover read/query/readMany/create/upsert/replace/delete/patch across representative 404/408/410/429/449/500/503 families. The only unported portion is exact one-class-per-customer-file parity and every operation/error permutation. | `runtime-heavy duplicate`. Stop here unless parity is required over runtime. | +| Single-write direct/gateway availability matrix | Dedicated single-write multi-region live leg and representative direct/gateway read/create fault matrices are added through `fi-sm-customer-workflows`; exact one-class-per-error-file parity remains. | `runtime-heavy duplicate`. Stop here unless strict customer-suite parity is required. | +| High E2E timeout extended fault variants | Response-delay E2E timeout with availability strategy now covers create/read/query/readMany/upsert/batch/patch plus partition-migrating read; deeper customer-specific timing/RU assertions remain. | `nice to have`. The main workflow gap is covered; remaining work is runtime-sensitive strict parity. | +| PCLB exact regional circuit-breaker assertions | PCLB-oriented read/query-plan diagnostics/query/patch app-chain workflow is added; exact circuit-breaker state transitions remain in existing PCLB tests. | `nice to have` for customer parity; existing SDK PCLB tests already cover the lower-level behavior. | +| 429 and connection-delay app-shaped calls | 429-style rows are now represented in multi-write and single-write matrices; connection-delay/connect-reset style network transport variants remain in existing transport/FI tests and selected timeout workflows. | `runtime-heavy duplicate`. Add only if network-fault parity is explicitly required. | +| Conflict resolution and conflict query | Not added to the new workflow suite; existing conflict tests cover core SDK behavior. | `nice to have` / duplicate. Add only if customer multi-client ordering or diagnostics are materially different. | +| Basic multi-write behavior and feature-validation classes | Covered indirectly by CRUD/request-options/latest-committed/session-token workflows and existing multi-master tests. | `completely duplicate` for this suite unless a specific uncovered assertion is identified. | +| Custom serializer standalone tests | Request-level serializer propagation is represented in the DAO-style workflow; existing serializer tests cover normal and exception behavior. | `completely duplicate`. Keep deeper standalone serializer tests out of this workflow suite. | +| Customer wrapper defaults, caches, DAO session maps, and configuration defaults | Not ported by design because these are not SDK contracts. | `completely useless for SDK coverage` / wrapper-specific. Keep documented only. | + +## Porting Rules + +- Use SDK-native tests in `azure-cosmos-tests`; do not copy customer-specific package dependencies. +- Do not copy hardcoded customer endpoints, account keys, database names, or container names. +- Prefer dynamic account-region discovery over hardcoded region order. +- Replace fixed sleeps with polling or retry loops. +- Preserve customer workflow shape where it adds release signal: operation chains, contacted-region diagnostics, effective consistency, effective read-consistency strategy, retry counts, and request-option propagation. +- Mark wrapper default assertions as `wrapper-specific` unless the SDK owns the behavior. diff --git a/sdk/cosmos/azure-cosmos-tests/pom.xml b/sdk/cosmos/azure-cosmos-tests/pom.xml index 680f9470e6bf..18cb8e9cd2d1 100644 --- a/sdk/cosmos/azure-cosmos-tests/pom.xml +++ b/sdk/cosmos/azure-cosmos-tests/pom.xml @@ -659,6 +659,60 @@ Licensed under the MIT License. + + + fi-customer-workflows + + fi-customer-workflows + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.5.3 + + + src/test/resources/fi-customer-workflows-testng.xml + + + true + 1 + 256 + paranoid + + + + + + + + + fi-sm-customer-workflows + + fi-sm-customer-workflows + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.5.3 + + + src/test/resources/fi-sm-customer-workflows-testng.xml + + + true + 1 + 256 + paranoid + + + + + + multi-region diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 66b6e6314ad0..4f9f54f2fdb9 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -302,7 +302,7 @@ public CosmosAsyncDatabase getDatabase(String id) { @BeforeSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator", "emulator-vnext", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", - "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SETUP_TIMEOUT) + "circuit-breaker-read-all-read-many", "fi-multi-master", "fi-customer-workflows", "fi-sm-customer-workflows", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SETUP_TIMEOUT) public void beforeSuite() { logger.info("beforeSuite Started"); @@ -353,7 +353,7 @@ private static DocumentCollection getInternalDocumentCollection(CosmosAsyncConta @AfterSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", - "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SHUTDOWN_TIMEOUT) + "circuit-breaker-read-all-read-many", "fi-multi-master", "fi-customer-workflows", "fi-sm-customer-workflows", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SHUTDOWN_TIMEOUT) public void afterSuite() { logger.info("afterSuite Started"); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowAvailabilityFaultMatrixTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowAvailabilityFaultMatrixTest.java new file mode 100644 index 000000000000..1d5300249d66 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowAvailabilityFaultMatrixTest.java @@ -0,0 +1,194 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowAvailabilityFaultMatrixTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithSessionConsistency") + public CustomerWorkflowAvailabilityFaultMatrixTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer availability fault workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @DataProvider(name = "availabilityFaultScenarios") + public Object[][] availabilityFaultScenarios() { + return new Object[][]{ + {"read", FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.GONE}, + {"read", FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"read", FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE}, + {"read", FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {"query", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"query", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.GONE}, + {"query", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"query", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {"readMany", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.GONE}, + {"readMany", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE}, + {"readMany", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.TOO_MANY_REQUEST}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.TOO_MANY_REQUEST}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.RETRY_WITH}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.PARTITION_IS_MIGRATING}, + {"upsert", FaultInjectionOperationType.UPSERT_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"upsert", FaultInjectionOperationType.UPSERT_ITEM, FaultInjectionServerErrorType.PARTITION_IS_MIGRATING}, + {"upsert", FaultInjectionOperationType.UPSERT_ITEM, FaultInjectionServerErrorType.TOO_MANY_REQUEST}, + {"replace", FaultInjectionOperationType.REPLACE_ITEM, FaultInjectionServerErrorType.GONE}, + {"replace", FaultInjectionOperationType.REPLACE_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"replace", FaultInjectionOperationType.REPLACE_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"delete", FaultInjectionOperationType.DELETE_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"delete", FaultInjectionOperationType.DELETE_ITEM, FaultInjectionServerErrorType.GONE}, + {"delete", FaultInjectionOperationType.DELETE_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"patch", FaultInjectionOperationType.PATCH_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {"patch", FaultInjectionOperationType.PATCH_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"patch", FaultInjectionOperationType.PATCH_ITEM, FaultInjectionServerErrorType.GONE} + }; + } + + @Test(groups = {"fi-customer-workflows"}, dataProvider = "availabilityFaultScenarios", timeOut = TIMEOUT) + public void representativeDirectMultiMasterFaultWorkflow( + String operation, + FaultInjectionOperationType faultInjectionOperationType, + FaultInjectionServerErrorType errorType) { + + TestObject item = TestObject.create(); + if (!"create".equals(operation)) { + this.container.createItem(item).block(); + } + + FaultInjectionRule faultRule = configureServerErrorRule( + this.container, + faultInjectionOperationType, + errorType, + this.writableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + try { + CosmosDiagnosticsContext diagnosticsContext = executeOperation(operation, item); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getStatusCode()).isGreaterThan(0); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + assertThat(diagnosticsContext.getDuration()).isNotNull(); + } finally { + faultRule.disable(); + } + } + + private CosmosDiagnosticsContext executeOperation(String operation, TestObject item) { + try { + if ("read".equals(operation)) { + CosmosItemResponse response = this.container + .readItem(item.getId(), partitionKey(item), new CosmosItemRequestOptions(), TestObject.class) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + if ("query".equals(operation)) { + FeedResponse response = this.container + .queryItems( + String.format("SELECT * FROM c WHERE c.id = '%s'", item.getId()), + new CosmosQueryRequestOptions().setQueryName("AvailabilityFaultWorkflowQuery"), + TestObject.class) + .byPage() + .blockFirst(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } + + if ("readMany".equals(operation)) { + FeedResponse response = this.container + .readMany( + Collections.singletonList(new CosmosItemIdentity(partitionKey(item), item.getId())), + new CosmosReadManyRequestOptions(), + TestObject.class) + .block(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } + + if ("upsert".equals(operation)) { + item.setStringProp("fault-upsert-" + item.getStringProp()); + CosmosItemResponse response = this.container + .upsertItem(item, new CosmosItemRequestOptions().setContentResponseOnWriteEnabled(true)) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + if ("replace".equals(operation)) { + item.setStringProp("fault-replace-" + item.getStringProp()); + CosmosItemResponse response = this.container + .replaceItem(item, item.getId(), partitionKey(item), new CosmosItemRequestOptions()) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + if ("delete".equals(operation)) { + CosmosItemResponse response = this.container + .deleteItem(item.getId(), partitionKey(item), new CosmosItemRequestOptions()) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + if ("patch".equals(operation)) { + CosmosItemResponse response = this.container + .patchItem( + item.getId(), + partitionKey(item), + CosmosPatchOperations.create().set("/stringProp", "fault-patch-" + item.getStringProp()), + TestObject.class) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + CosmosItemResponse response = this.container + .createItem(TestObject.create(), new CosmosItemRequestOptions().setContentResponseOnWriteEnabled(true)) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } catch (CosmosException error) { + CosmosDiagnosticsContext diagnosticsContext = error.getDiagnostics().getDiagnosticsContext(); + assertThat(error.getStatusCode()).isGreaterThanOrEqualTo(HttpConstants.StatusCodes.BADREQUEST); + return diagnosticsContext; + } + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowChangeFeedProcessorTest.java new file mode 100644 index 000000000000..ef564a7fefdb --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowChangeFeedProcessorTest.java @@ -0,0 +1,149 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.ChangeFeedProcessor; +import com.azure.cosmos.ChangeFeedProcessorBuilder; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.models.ChangeFeedProcessorItem; +import com.azure.cosmos.models.ChangeFeedProcessorOptions; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.fasterxml.jackson.databind.JsonNode; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowChangeFeedProcessorTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowChangeFeedProcessorTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer change feed processor workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = 2 * TIMEOUT) + public void latestVersionProcessorRestartAndReadFeedFaultWorkflow() throws InterruptedException { + CosmosAsyncContainer feedContainer = createTemporaryContainer("customer-cfp-feed", "/mypk"); + CosmosAsyncContainer leaseContainer = createTemporaryContainer("customer-cfp-lease", "/id"); + ChangeFeedProcessor processor = null; + FaultInjectionRule readFeedDelayRule = null; + + try { + Set expectedIds = Collections.newSetFromMap(new ConcurrentHashMap()); + Set receivedIds = Collections.newSetFromMap(new ConcurrentHashMap()); + CountDownLatch initialLatch = new CountDownLatch(2); + + createFeedItem(feedContainer, expectedIds, "cfp-initial-1"); + createFeedItem(feedContainer, expectedIds, "cfp-initial-2"); + + processor = createLatestVersionProcessor(feedContainer, leaseContainer, expectedIds, receivedIds, initialLatch, "initial"); + processor.start().block(); + ChangeFeedProcessor initialProcessor = processor; + + assertThat(processor.isStarted()).isTrue(); + assertThat(initialLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(receivedIds).containsAll(expectedIds); + + awaitCondition( + () -> !initialProcessor.getCurrentState().block().isEmpty(), + Duration.ofSeconds(20), + "Change feed processor did not acquire leases."); + + processor.stop().block(); + assertThat(processor.isStarted()).isFalse(); + + CountDownLatch restartLatch = new CountDownLatch(1); + TestObject restartedItem = createFeedItem(feedContainer, expectedIds, "cfp-restart"); + readFeedDelayRule = configureResponseDelayRule(feedContainer, FaultInjectionOperationType.READ_FEED_ITEM, Duration.ofMillis(100), 1); + + processor = createLatestVersionProcessor(feedContainer, leaseContainer, expectedIds, receivedIds, restartLatch, "restart"); + processor.start().block(); + + assertThat(processor.isStarted()).isTrue(); + assertThat(restartLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(receivedIds).contains(restartedItem.getId()); + assertThat(processor.getEstimatedLag().block()).isNotNull(); + } finally { + if (readFeedDelayRule != null) { + readFeedDelayRule.disable(); + } + if (processor != null && processor.isStarted()) { + processor.stop().block(); + } + deleteTemporaryContainer(feedContainer); + deleteTemporaryContainer(leaseContainer); + } + } + + private TestObject createFeedItem(CosmosAsyncContainer feedContainer, Set expectedIds, String partitionKey) { + TestObject item = TestObject.create(partitionKey + "-" + UUID.randomUUID()); + feedContainer.createItem(item).block(); + expectedIds.add(item.getId()); + return item; + } + + private ChangeFeedProcessor createLatestVersionProcessor( + CosmosAsyncContainer feedContainer, + CosmosAsyncContainer leaseContainer, + Set expectedIds, + Set receivedIds, + CountDownLatch latch, + String leasePrefix) { + + return new ChangeFeedProcessorBuilder() + .hostName("customer-workflow-" + leasePrefix + "-" + UUID.randomUUID()) + .feedContainer(feedContainer) + .leaseContainer(leaseContainer) + .handleLatestVersionChanges(items -> recordLatestVersionItems(items, expectedIds, receivedIds, latch)) + .options(new ChangeFeedProcessorOptions() + .setStartFromBeginning(true) + .setFeedPollDelay(Duration.ofMillis(500)) + .setLeaseAcquireInterval(Duration.ofSeconds(1)) + .setLeaseRenewInterval(Duration.ofSeconds(2)) + .setLeaseExpirationInterval(Duration.ofSeconds(6)) + .setMaxItemCount(10) + .setLeasePrefix("customer-" + leasePrefix)) + .buildChangeFeedProcessor(); + } + + private static void recordLatestVersionItems( + List items, + Set expectedIds, + Set receivedIds, + CountDownLatch latch) { + + for (ChangeFeedProcessorItem item : items) { + JsonNode current = item.getCurrent(); + if (current != null && current.has("id")) { + String id = current.get("id").asText(); + if (expectedIds.contains(id) && receivedIds.add(id)) { + latch.countDown(); + } + } + } + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowDaoStyleOperationsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowDaoStyleOperationsTest.java new file mode 100644 index 000000000000..bc55ec5681f8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowDaoStyleOperationsTest.java @@ -0,0 +1,147 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosItemSerializer; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosBatch; +import com.azure.cosmos.models.CosmosBatchResponse; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; +import com.azure.cosmos.models.CosmosItemOperation; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowDaoStyleOperationsTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowDaoStyleOperationsTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer DAO-style workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void crudReadAllPatchBatchAndBulkWorkflow() { + List excludedRegions = excludeFirstWritableRegion(); + TestObject item = TestObject.create(); + + CosmosItemRequestOptions createOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("workflow-crud-create"))) + .setExcludedRegions(excludedRegions) + .setCustomItemSerializer(CosmosItemSerializer.DEFAULT_SERIALIZER) + .setContentResponseOnWriteEnabled(true); + + CosmosItemResponse createResponse = this.container + .createItem(item, createOptions) + .block(); + + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + assertKeywordIdentifier(createResponse.getDiagnostics().getDiagnosticsContext(), "workflow-crud-create"); + assertThat(getRequestOptions(createResponse.getDiagnostics().getDiagnosticsContext()).getCustomItemSerializer()) + .isSameAs(CosmosItemSerializer.DEFAULT_SERIALIZER); + assertDidNotContactExcludedRegions(createResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + + CosmosItemResponse readResponse = this.container + .readItem(item.getId(), partitionKey(item), new CosmosItemRequestOptions().setExcludedRegions(excludedRegions), TestObject.class) + .block(); + + assertThat(readResponse).isNotNull(); + assertThat(readResponse.getItem()).isEqualTo(item); + + FeedResponse readAllResponse = this.container + .readAllItems( + partitionKey(item), + new CosmosQueryRequestOptions() + .setExcludedRegions(excludedRegions) + .setCustomItemSerializer(CosmosItemSerializer.DEFAULT_SERIALIZER), + TestObject.class) + .byPage() + .blockFirst(); + + assertThat(readAllResponse).isNotNull(); + assertThat(readAllResponse.getResults()).extracting(TestObject::getId).contains(item.getId()); + assertExcludedRegions(readAllResponse.getCosmosDiagnostics().getDiagnosticsContext(), excludedRegions); + assertThat(getRequestOptions(readAllResponse.getCosmosDiagnostics().getDiagnosticsContext()).getCustomItemSerializer()) + .isSameAs(CosmosItemSerializer.DEFAULT_SERIALIZER); + + CosmosPatchOperations patchOperations = CosmosPatchOperations.create() + .set("/stringProp", "patched-" + item.getStringProp()); + + CosmosItemResponse patchResponse = this.container + .patchItem(item.getId(), partitionKey(item), patchOperations, TestObject.class) + .block(); + + assertThat(patchResponse).isNotNull(); + assertThat(patchResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertThat(patchResponse.getItem().getStringProp()).startsWith("patched-"); + + String batchPk = "batch-" + UUID.randomUUID(); + TestObject batchItem = TestObject.create(batchPk); + CosmosBatch batch = CosmosBatch.createCosmosBatch(partitionKey(batchItem)); + batch.createItemOperation(batchItem); + batch.readItemOperation(batchItem.getId()); + + CosmosBatchResponse batchResponse = this.container.executeCosmosBatch(batch).block(); + + assertThat(batchResponse).isNotNull(); + assertThat(batchResponse.isSuccessStatusCode()).isTrue(); + assertThat(batchResponse.size()).isEqualTo(2); + assertThat(batchResponse.getDiagnostics()).isNotNull(); + + TestObject bulkItem = TestObject.create(); + this.container.createItem(bulkItem).block(); + CosmosPatchOperations bulkPatchOperations = CosmosPatchOperations.create() + .set("/stringProp", "bulk-patched-" + bulkItem.getStringProp()); + + List bulkOperations = new ArrayList<>(); + bulkOperations.add(CosmosBulkOperations.getReadItemOperation(bulkItem.getId(), partitionKey(bulkItem))); + bulkOperations.add(CosmosBulkOperations.getPatchItemOperation(bulkItem.getId(), partitionKey(bulkItem), bulkPatchOperations)); + + CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions() + .setMaxMicroBatchSize(2) + .setExcludedRegions(excludedRegions) + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("workflow-bulk"))); + + List> bulkResponses = this.container + .executeBulkOperations(Flux.fromIterable(bulkOperations), bulkExecutionOptions) + .collectList() + .block(); + + assertThat(bulkResponses).isNotNull(); + assertThat(bulkResponses).hasSize(2); + assertThat(bulkResponses).allSatisfy(response -> { + assertThat(response.getException()).isNull(); + assertThat(response.getResponse().getStatusCode()).isIn(HttpConstants.StatusCodes.OK, HttpConstants.StatusCodes.CREATED); + assertThat(response.getResponse().getCosmosDiagnostics()).isNotNull(); + }); + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowHighE2ETimeoutTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowHighE2ETimeoutTest.java new file mode 100644 index 000000000000..5624fafdd732 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowHighE2ETimeoutTest.java @@ -0,0 +1,217 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.ThresholdBasedAvailabilityStrategy; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.models.CosmosBatch; +import com.azure.cosmos.models.CosmosBatchRequestOptions; +import com.azure.cosmos.models.CosmosBatchResponse; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchItemRequestOptions; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowHighE2ETimeoutTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowHighE2ETimeoutTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer high E2E timeout workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @DataProvider(name = "timeoutWorkflowOperations") + public Object[][] timeoutWorkflowOperations() { + return new Object[][]{ + {"create", FaultInjectionOperationType.CREATE_ITEM}, + {"read", FaultInjectionOperationType.READ_ITEM}, + {"query", FaultInjectionOperationType.QUERY_ITEM}, + {"readMany", FaultInjectionOperationType.QUERY_ITEM}, + {"upsert", FaultInjectionOperationType.UPSERT_ITEM}, + {"batch", FaultInjectionOperationType.BATCH_ITEM}, + {"patch", FaultInjectionOperationType.PATCH_ITEM} + }; + } + + @Test(groups = {"fi-customer-workflows"}, dataProvider = "timeoutWorkflowOperations", timeOut = 2 * TIMEOUT) + public void responseDelayWithAvailabilityStrategyWorkflow(String operation, FaultInjectionOperationType faultInjectionOperationType) { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(4)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + + FaultInjectionRule delayRule = configureResponseDelayRule(this.container, faultInjectionOperationType, Duration.ofMillis(1500), 1); + + try { + CosmosDiagnosticsContext diagnosticsContext = executeWithE2EPolicy(operation, item, e2ePolicy); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getStatusCode()).isGreaterThan(0); + assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(10)); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + } finally { + delayRule.disable(); + } + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = 2 * TIMEOUT) + public void partitionMigratingFaultWithE2EPolicyWorkflow() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(4)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + + FaultInjectionRule migratingRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, + 1); + + try { + CosmosDiagnosticsContext diagnosticsContext = executeWithE2EPolicy("read", item, e2ePolicy); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getStatusCode()).isGreaterThan(0); + assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(10)); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + } finally { + migratingRule.disable(); + } + } + + private CosmosDiagnosticsContext executeWithE2EPolicy( + String operation, + TestObject item, + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy) { + + try { + if ("create".equals(operation)) { + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setContentResponseOnWriteEnabled(true) + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + return this.container + .createItem(TestObject.create(), options) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } + + if ("read".equals(operation)) { + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + return this.container + .readItem(item.getId(), partitionKey(item), options, TestObject.class) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } + + if ("query".equals(operation)) { + CosmosQueryRequestOptions options = new CosmosQueryRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy) + .setQueryName("HighE2ETimeoutWorkflowQuery"); + + FeedResponse response = this.container + .queryItems(String.format("SELECT * FROM c WHERE c.id = '%s'", item.getId()), options, TestObject.class) + .byPage() + .blockFirst(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } + + if ("readMany".equals(operation)) { + CosmosReadManyRequestOptions options = new CosmosReadManyRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + FeedResponse response = this.container + .readMany(Collections.singletonList(new CosmosItemIdentity(partitionKey(item), item.getId())), options, TestObject.class) + .block(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } + + if ("upsert".equals(operation)) { + item.setStringProp("timeout-upsert-" + item.getStringProp()); + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setContentResponseOnWriteEnabled(true) + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + return this.container + .upsertItem(item, options) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } + + if ("batch".equals(operation)) { + TestObject batchItem = TestObject.create("timeout-batch"); + CosmosBatch batch = CosmosBatch.createCosmosBatch(partitionKey(batchItem)); + batch.createItemOperation(batchItem); + batch.readItemOperation(batchItem.getId()); + + CosmosBatchRequestOptions batchOptions = new CosmosBatchRequestOptions(); + ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper + .getCosmosBatchRequestOptionsAccessor() + .setEndToEndOperationLatencyPolicyConfig(batchOptions, e2ePolicy); + + CosmosBatchResponse response = this.container.executeCosmosBatch(batch, batchOptions).block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + CosmosPatchItemRequestOptions options = new CosmosPatchItemRequestOptions(); + options.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosItemResponse response = this.container + .patchItem( + item.getId(), + partitionKey(item), + CosmosPatchOperations.create().set("/stringProp", "timeout-patched-" + item.getStringProp()), + options, + TestObject.class) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowLatestCommittedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowLatestCommittedTest.java new file mode 100644 index 000000000000..cd1b9cb7a05f --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowLatestCommittedTest.java @@ -0,0 +1,169 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowLatestCommittedTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithSessionConsistency") + public CustomerWorkflowLatestCommittedTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer latest-committed workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void latestCommittedAndExcludedRegionsFlowAcrossReadOperations() { + List excludedRegions = excludeFirstWritableRegion(); + TestObject item = TestObject.create(); + + CosmosItemResponse createResponse = this.container + .createItem(item, new CosmosItemRequestOptions().setExcludedRegions(excludedRegions)) + .block(); + + assertThat(createResponse).isNotNull(); + CosmosDiagnosticsContext createDiagnostics = createResponse.getDiagnostics().getDiagnosticsContext(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + assertThat(createDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.DEFAULT); + assertExcludedRegions(createDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(createDiagnostics, excludedRegions); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setExcludedRegions(excludedRegions) + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("latest-committed-read"))) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + CosmosItemResponse readResponse = this.container + .readItem(item.getId(), partitionKey(item), readOptions, TestObject.class) + .block(); + + assertThat(readResponse).isNotNull(); + CosmosDiagnosticsContext readDiagnostics = readResponse.getDiagnostics().getDiagnosticsContext(); + assertThat(readResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertThat(readDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertThat(readDiagnostics.getTotalRequestCharge()).isGreaterThan(0); + assertKeywordIdentifier(readDiagnostics, "latest-committed-read"); + assertExcludedRegions(readDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(readDiagnostics, excludedRegions); + + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .setQueryName("LatestCommittedCustomerWorkflowQuery"); + + FeedResponse queryResponse = this.container + .queryItems(String.format("SELECT * FROM c WHERE c.id = '%s'", item.getId()), queryOptions, TestObject.class) + .byPage() + .blockFirst(); + + assertThat(queryResponse).isNotNull(); + assertThat(queryResponse.getResults()).hasSize(1); + CosmosDiagnosticsContext queryDiagnostics = queryResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertThat(queryDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertExcludedRegions(queryDiagnostics, excludedRegions); + + CosmosReadManyRequestOptions readManyOptions = new CosmosReadManyRequestOptions() + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + FeedResponse readManyResponse = this.container + .readMany(Collections.singletonList(new CosmosItemIdentity(partitionKey(item), item.getId())), readManyOptions, TestObject.class) + .block(); + + assertThat(readManyResponse).isNotNull(); + assertThat(readManyResponse.getResults()).hasSize(1); + CosmosDiagnosticsContext readManyDiagnostics = readManyResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertThat(readManyDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertExcludedRegions(readManyDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(readManyDiagnostics, excludedRegions); + + CosmosChangeFeedRequestOptions changeFeedOptions = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forLogicalPartition(partitionKey(item))) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .setExcludedRegions(excludedRegions); + + FeedResponse changeFeedResponse = this.container + .queryChangeFeed(changeFeedOptions, TestObject.class) + .byPage() + .blockFirst(); + + assertThat(changeFeedResponse).isNotNull(); + CosmosDiagnosticsContext changeFeedDiagnostics = changeFeedResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertThat(changeFeedDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertExcludedRegions(changeFeedDiagnostics, excludedRegions); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void latestCommittedReadWithRegionalLeaseNotFoundFault() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + + FaultInjectionRule leaseNotFoundRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + FaultInjectionServerErrorType.LEASE_NOT_FOUND, + this.writableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + try { + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("latest-committed-fault-read"))); + + CosmosDiagnosticsContext diagnosticsContext; + try { + CosmosItemResponse readResponse = this.container + .readItem(item.getId(), partitionKey(item), readOptions, TestObject.class) + .block(); + + assertThat(readResponse).isNotNull(); + diagnosticsContext = readResponse.getDiagnostics().getDiagnosticsContext(); + } catch (CosmosException error) { + diagnosticsContext = error.getDiagnostics().getDiagnosticsContext(); + } + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + assertThat(diagnosticsContext.getStatusCode()).isGreaterThan(0); + } finally { + leaseNotFoundRule.disable(); + } + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowPartitionLevelCircuitBreakerTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowPartitionLevelCircuitBreakerTest.java new file mode 100644 index 000000000000..35ae4799940d --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowPartitionLevelCircuitBreakerTest.java @@ -0,0 +1,124 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.ThresholdBasedAvailabilityStrategy; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchItemRequestOptions; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowPartitionLevelCircuitBreakerTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowPartitionLevelCircuitBreakerTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer PCLB workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = 2 * TIMEOUT) + public void pointOperationCircuitBreakerAndQueryPlanWorkflow() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(3)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + + FaultInjectionRule readFaultRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, + 1); + + try { + CosmosDiagnosticsContext readDiagnostics = readWithPolicy(item, e2ePolicy); + + assertThat(readDiagnostics).isNotNull(); + assertThat(readDiagnostics.getStatusCode()).isGreaterThan(0); + assertThat(readDiagnostics.getContactedRegionNames()).isNotNull(); + } finally { + readFaultRule.disable(); + } + + CosmosDiagnosticsContext queryDiagnostics = queryWithPolicy(item, e2ePolicy); + assertThat(queryDiagnostics).isNotNull(); + assertThat(queryDiagnostics.getStatusCode()).isGreaterThan(0); + assertThat(queryDiagnostics.getContactedRegionNames()).isNotNull(); + assertThat(queryDiagnostics.toJson()).contains("queryPlanDiagnosticsContext"); + + CosmosPatchItemRequestOptions patchOptions = new CosmosPatchItemRequestOptions(); + patchOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + CosmosItemResponse patchResponse = this.container + .patchItem( + item.getId(), + partitionKey(item), + CosmosPatchOperations.create().set("/stringProp", "pclb-patched-" + item.getStringProp()), + patchOptions, + TestObject.class) + .block(); + + assertThat(patchResponse).isNotNull(); + assertThat(patchResponse.getDiagnostics()).isNotNull(); + } + + private CosmosDiagnosticsContext readWithPolicy(TestObject item, CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy) { + try { + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + return this.container + .readItem(item.getId(), partitionKey(item), options, TestObject.class) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } + + private CosmosDiagnosticsContext queryWithPolicy(TestObject item, CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy) { + try { + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy) + .setQueryName("PclbCustomerWorkflowQuery"); + + FeedResponse response = this.container + .queryItems(String.format("SELECT * FROM c WHERE c.id = '%s'", item.getId()), queryOptions, TestObject.class) + .byPage() + .blockFirst(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowRequestOptionsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowRequestOptionsTest.java new file mode 100644 index 000000000000..4eb7d1f819d4 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowRequestOptionsTest.java @@ -0,0 +1,158 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.OverridableRequestOptions; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.PartitionKey; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowRequestOptionsTest extends CustomerWorkflowTestBase { + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowRequestOptionsTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer workflow request option tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void excludedRegionAndKeywordIdentifiersFlowAcrossOperations() { + String excludedRegion = this.writableRegions.get(0); + List excludedRegions = Collections.singletonList(excludedRegion); + TestObject item = TestObject.create(); + + CosmosItemRequestOptions createOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("customer-create"))) + .setContentResponseOnWriteEnabled(true) + .setExcludedRegions(excludedRegions); + + CosmosItemResponse createResponse = this.container + .createItem(item, createOptions) + .block(); + + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + assertKeywordIdentifier(createResponse.getDiagnostics().getDiagnosticsContext(), "customer-create"); + assertExcludedRegions(createResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + assertDidNotContactExcludedRegions(createResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("customer-read"))) + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + CosmosItemResponse readResponse = this.container + .readItem(item.getId(), new PartitionKey(item.getMypk()), readOptions, TestObject.class) + .block(); + + assertThat(readResponse).isNotNull(); + CosmosDiagnosticsContext readDiagnostics = readResponse.getDiagnostics().getDiagnosticsContext(); + assertThat(readResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertThat(readDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertKeywordIdentifier(readDiagnostics, "customer-read"); + assertExcludedRegions(readDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(readDiagnostics, excludedRegions); + + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("customer-query"))) + .setExcludedRegions(excludedRegions) + .setConsistencyLevel(ConsistencyLevel.EVENTUAL) + .setQueryMetricsEnabled(true) + .setQueryName("CustomerWorkflowQuery"); + + String query = String.format("SELECT * FROM c WHERE c.id = '%s'", item.getId()); + FeedResponse queryResponse = this.container + .queryItems(query, queryOptions, TestObject.class) + .byPage() + .blockFirst(); + + assertThat(queryResponse).isNotNull(); + assertThat(queryResponse.getResults()).hasSize(1); + CosmosDiagnosticsContext queryDiagnostics = queryResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertKeywordIdentifier(queryDiagnostics, "customer-query"); + assertExcludedRegions(queryDiagnostics, excludedRegions); + OverridableRequestOptions queryRequestOptions = getRequestOptions(queryDiagnostics); + assertThat(queryRequestOptions.getConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); + assertThat(queryRequestOptions.isQueryMetricsEnabled()).isTrue(); + assertThat(queryRequestOptions.getQueryNameOrDefault(null)).isEqualTo("CustomerWorkflowQuery"); + + CosmosReadManyRequestOptions readManyOptions = new CosmosReadManyRequestOptions() + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("customer-read-many"))) + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + FeedResponse readManyResponse = this.container + .readMany( + Arrays.asList(new CosmosItemIdentity(new PartitionKey(item.getMypk()), item.getId())), + readManyOptions, + TestObject.class) + .block(); + + assertThat(readManyResponse).isNotNull(); + assertThat(readManyResponse.getResults()).hasSize(1); + CosmosDiagnosticsContext readManyDiagnostics = readManyResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertThat(readManyDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertKeywordIdentifier(readManyDiagnostics, "customer-read-many"); + assertExcludedRegions(readManyDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(readManyDiagnostics, excludedRegions); + + item.setStringProp("updated-" + item.getStringProp()); + CosmosItemRequestOptions upsertOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("customer-upsert"))) + .setExcludedRegions(excludedRegions) + .setContentResponseOnWriteEnabled(true); + + CosmosItemResponse upsertResponse = this.container + .upsertItem(item, upsertOptions) + .block(); + + assertThat(upsertResponse).isNotNull(); + assertThat(upsertResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertKeywordIdentifier(upsertResponse.getDiagnostics().getDiagnosticsContext(), "customer-upsert"); + assertExcludedRegions(upsertResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + assertDidNotContactExcludedRegions(upsertResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + + CosmosItemRequestOptions deleteOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(new HashSet<>(Collections.singletonList("customer-delete"))) + .setExcludedRegions(excludedRegions); + + CosmosItemResponse deleteResponse = this.container + .deleteItem(item.getId(), new PartitionKey(item.getMypk()), deleteOptions) + .block(); + + assertThat(deleteResponse).isNotNull(); + assertThat(deleteResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.NO_CONTENT); + assertKeywordIdentifier(deleteResponse.getDiagnostics().getDiagnosticsContext(), "customer-delete"); + assertExcludedRegions(deleteResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + assertDidNotContactExcludedRegions(deleteResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSessionTokenTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSessionTokenTest.java new file mode 100644 index 000000000000..57bc2c5b6a21 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSessionTokenTest.java @@ -0,0 +1,91 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.ConsistencyTestsBase; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ISessionToken; +import com.azure.cosmos.implementation.SessionTokenHelper; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.FeedResponse; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +public class CustomerWorkflowSessionTokenTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowSessionTokenTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer session-token workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void readManyWithAdvancedSessionTokenReturnsReadSessionNotAvailable() throws Exception { + List itemIdentities = new ArrayList<>(); + String lastSessionToken = null; + + for (int index = 0; index < 3; index++) { + TestObject item = TestObject.create("session-token-workflow"); + CosmosItemResponse createResponse = this.container.createItem(item).block(); + + assertThat(createResponse).isNotNull(); + lastSessionToken = createResponse.getSessionToken(); + itemIdentities.add(new CosmosItemIdentity(partitionKey(item), item.getId())); + } + + FeedResponse validReadManyResponse = this.container + .readMany(itemIdentities, lastSessionToken, TestObject.class) + .block(); + + assertThat(validReadManyResponse).isNotNull(); + assertThat(validReadManyResponse.getResults()).hasSize(3); + + String advancedSessionToken = advanceSessionToken(lastSessionToken); + + try { + this.container + .readMany(itemIdentities, advancedSessionToken, TestObject.class) + .block(); + + fail("Should have hit read session not available error."); + } catch (Exception error) { + CosmosException cosmosException = Utils.as(error, CosmosException.class); + + assertThat(cosmosException).isNotNull(); + assertThat(cosmosException.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.NOTFOUND); + assertThat(cosmosException.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE); + assertThat(cosmosException.getDiagnostics()).isNotNull(); + } + } + + private static String advanceSessionToken(String originalSessionToken) throws Exception { + String[] tokenParts = StringUtils.split(originalSessionToken, ":"); + ISessionToken sessionToken = SessionTokenHelper.parse(tokenParts[1]); + ISessionToken modifiedSessionToken = ConsistencyTestsBase.createSessionToken(sessionToken, sessionToken.getLSN() + 1000000); + + return tokenParts[0] + ":" + modifiedSessionToken.convertToString(); + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSingleMasterAvailabilityTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSingleMasterAvailabilityTest.java new file mode 100644 index 000000000000..c9ab2d761932 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSingleMasterAvailabilityTest.java @@ -0,0 +1,249 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.ThresholdBasedAvailabilityStrategy; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowSingleMasterAvailabilityTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithSessionConsistency") + public CustomerWorkflowSingleMasterAvailabilityTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-sm-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSingleWriteMultiRegionContainer("Customer single-master workflow tests"); + } + + @AfterClass(groups = {"fi-sm-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-sm-customer-workflows"}, timeOut = TIMEOUT) + public void excludedReadableRegionRoutesReadToRemainingReadableRegion() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + + List excludedRegions = excludeFirstReadableRegion(); + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + CosmosItemResponse readResponse = this.container + .readItem(item.getId(), partitionKey(item), readOptions, TestObject.class) + .block(); + + assertThat(readResponse).isNotNull(); + CosmosDiagnosticsContext diagnosticsContext = readResponse.getDiagnostics().getDiagnosticsContext(); + assertThat(readResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertThat(diagnosticsContext.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertExcludedRegions(diagnosticsContext, excludedRegions); + assertDidNotContactExcludedRegions(diagnosticsContext, excludedRegions); + } + + @Test(groups = {"fi-sm-customer-workflows"}, timeOut = TIMEOUT) + public void readFaultInPreferredReadableRegionCanUseRemoteReadableRegion() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + + FaultInjectionRule readSessionNotAvailableRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE, + this.readableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(5)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + + try { + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosDiagnosticsContext diagnosticsContext = readWithDiagnostics(item, readOptions); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getStatusCode()).isGreaterThan(0); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + if (diagnosticsContext.getStatusCode() < HttpConstants.StatusCodes.BADREQUEST) { + assertThat(diagnosticsContext.getContactedRegionNames()).isNotEmpty(); + } else { + assertThat(diagnosticsContext.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.NOTFOUND); + assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE); + } + } finally { + readSessionNotAvailableRule.disable(); + } + } + + @Test(groups = {"fi-sm-customer-workflows"}, timeOut = TIMEOUT) + public void writeFaultStaysOnSingleWritableRegion() { + FaultInjectionRule partitionMigratingRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.CREATE_ITEM, + FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, + this.writableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + try { + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(5)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + CosmosItemRequestOptions createOptions = new CosmosItemRequestOptions() + .setContentResponseOnWriteEnabled(true) + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosDiagnosticsContext diagnosticsContext = createWithDiagnostics(TestObject.create(), createOptions); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getStatusCode()).isGreaterThan(0); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + + Set readOnlyRegions = this.readableRegions + .stream() + .map(region -> region.toLowerCase(Locale.ROOT)) + .filter(region -> !region.equals(this.writableRegions.get(0).toLowerCase(Locale.ROOT))) + .collect(Collectors.toSet()); + assertThat(diagnosticsContext.getContactedRegionNames()).doesNotContainAnyElementsOf(readOnlyRegions); + } finally { + partitionMigratingRule.disable(); + } + } + + @DataProvider(name = "singleWriteReadFaultScenarios") + public Object[][] singleWriteReadFaultScenarios() { + return new Object[][]{ + {FaultInjectionServerErrorType.GONE}, + {FaultInjectionServerErrorType.TIMEOUT}, + {FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE}, + {FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {FaultInjectionServerErrorType.SERVICE_UNAVAILABLE} + }; + } + + @Test(groups = {"fi-sm-customer-workflows"}, dataProvider = "singleWriteReadFaultScenarios", timeOut = TIMEOUT) + public void singleWriteReadFaultMatrix(FaultInjectionServerErrorType errorType) { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + + FaultInjectionRule faultRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + errorType, + this.readableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + try { + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(5)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build()); + + CosmosDiagnosticsContext diagnosticsContext = readWithDiagnostics(item, readOptions); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getStatusCode()).isGreaterThan(0); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + } finally { + faultRule.disable(); + } + } + + @DataProvider(name = "singleWriteMutationFaultScenarios") + public Object[][] singleWriteMutationFaultScenarios() { + return new Object[][]{ + {FaultInjectionServerErrorType.PARTITION_IS_MIGRATING}, + {FaultInjectionServerErrorType.TIMEOUT}, + {FaultInjectionServerErrorType.TOO_MANY_REQUEST}, + {FaultInjectionServerErrorType.RETRY_WITH}, + {FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {FaultInjectionServerErrorType.SERVICE_UNAVAILABLE} + }; + } + + @Test(groups = {"fi-sm-customer-workflows"}, dataProvider = "singleWriteMutationFaultScenarios", timeOut = TIMEOUT) + public void singleWriteCreateFaultMatrix(FaultInjectionServerErrorType errorType) { + FaultInjectionRule faultRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.CREATE_ITEM, + errorType, + this.writableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + try { + CosmosItemRequestOptions createOptions = new CosmosItemRequestOptions() + .setContentResponseOnWriteEnabled(true) + .setCosmosEndToEndOperationLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(5)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build()); + + CosmosDiagnosticsContext diagnosticsContext = createWithDiagnostics(TestObject.create(), createOptions); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getStatusCode()).isGreaterThan(0); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + } finally { + faultRule.disable(); + } + } + + private CosmosDiagnosticsContext readWithDiagnostics(TestObject item, CosmosItemRequestOptions options) { + try { + return this.container + .readItem(item.getId(), partitionKey(item), options, TestObject.class) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } + + private CosmosDiagnosticsContext createWithDiagnostics(TestObject item, CosmosItemRequestOptions options) { + try { + return this.container + .createItem(item, new PartitionKey(item.getMypk()), options) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowStoredProcedureTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowStoredProcedureTest.java new file mode 100644 index 000000000000..d68cd5981daf --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowStoredProcedureTest.java @@ -0,0 +1,96 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosStoredProcedureProperties; +import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions; +import com.azure.cosmos.models.CosmosStoredProcedureResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowStoredProcedureTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowStoredProcedureTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer stored procedure workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void storedProcedureCreateReadExecuteWithMetadataFaultRule() { + String storedProcedureId = "customer-sproc-" + UUID.randomUUID(); + CosmosStoredProcedureProperties storedProcedureProperties = new CosmosStoredProcedureProperties( + storedProcedureId, + "function(input) {" + + " var value = input || 'workflow';" + + " console.log('stored procedure workflow ' + value);" + + " getContext().getResponse().setBody('sproc-ok:' + value);" + + "}"); + + CosmosStoredProcedureResponse createResponse = this.container + .getScripts() + .createStoredProcedure(storedProcedureProperties) + .block(); + + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + assertThat(createResponse.getDiagnostics()).isNotNull(); + + FaultInjectionRule metadataDelayRule = configureResponseDelayRule( + this.container, + FaultInjectionOperationType.METADATA_REQUEST_CONTAINER, + Duration.ofMillis(100), + 1); + + try { + CosmosStoredProcedureRequestOptions options = new CosmosStoredProcedureRequestOptions(); + options.setPartitionKey(new PartitionKey("sproc-workflow")); + options.setScriptLoggingEnabled(true); + + CosmosStoredProcedureResponse readResponse = this.container + .getScripts() + .getStoredProcedure(storedProcedureId) + .read() + .block(); + + assertThat(readResponse).isNotNull(); + assertThat(readResponse.getProperties().getId()).isEqualTo(storedProcedureId); + + CosmosStoredProcedureResponse executeResponse = this.container + .getScripts() + .getStoredProcedure(storedProcedureId) + .execute(Collections.singletonList("workflow"), options) + .block(); + + assertThat(executeResponse).isNotNull(); + assertThat(executeResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertThat(executeResponse.getResponseAsString()).contains("sproc-ok:workflow"); + assertThat(executeResponse.getScriptLog()).contains("stored procedure workflow workflow"); + assertThat(executeResponse.getDiagnostics()).isNotNull(); + } finally { + metadataDelayRule.disable(); + } + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowTestBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowTestBase.java new file mode 100644 index 000000000000..7a150b19de10 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowTestBase.java @@ -0,0 +1,320 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.ConnectionMode; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.DatabaseAccount; +import com.azure.cosmos.implementation.DatabaseAccountLocation; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.OverridableRequestOptions; +import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; +import com.azure.cosmos.models.ThroughputProperties; +import com.azure.cosmos.rx.TestSuiteBase; +import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; +import com.azure.cosmos.test.faultinjection.FaultInjectionCondition; +import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import com.azure.cosmos.test.faultinjection.IFaultInjectionResult; +import org.testng.SkipException; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.UUID; +import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class CustomerWorkflowTestBase extends TestSuiteBase { + protected CosmosAsyncClient client; + protected CosmosAsyncContainer container; + protected List writableRegions; + protected List readableRegions; + + protected CustomerWorkflowTestBase(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + protected final void initializeSharedSinglePartitionContainer(String scenarioName) { + CosmosAsyncClient discoveryClient = null; + + try { + discoveryClient = getClientBuilder().buildAsyncClient(); + this.writableRegions = discoverWritableRegions(discoveryClient); + skipIfInsufficientRegions(this.writableRegions, scenarioName); + + this.client = getClientBuilder() + .preferredRegions(this.writableRegions) + .multipleWriteRegionsEnabled(true) + .contentResponseOnWriteEnabled(true) + .buildAsyncClient(); + this.container = getSharedSinglePartitionCosmosContainer(this.client); + } finally { + safeClose(discoveryClient); + } + } + + protected final void closeClient() { + safeClose(this.client); + this.client = null; + this.container = null; + this.writableRegions = null; + this.readableRegions = null; + } + + protected final void initializeSharedSingleWriteMultiRegionContainer(String scenarioName) { + CosmosAsyncClient discoveryClient = null; + + try { + CosmosClientBuilder clientBuilder = getClientBuilder() + .multipleWriteRegionsEnabled(false) + .contentResponseOnWriteEnabled(true); + + discoveryClient = clientBuilder.buildAsyncClient(); + this.writableRegions = discoverWritableRegions(discoveryClient); + this.readableRegions = discoverReadableRegions(discoveryClient); + skipIfInsufficientReadableRegions(this.readableRegions, scenarioName); + skipIfNotSingleWriteRegion(this.writableRegions, scenarioName); + + this.client = clientBuilder + .preferredRegions(this.readableRegions) + .multipleWriteRegionsEnabled(false) + .buildAsyncClient(); + this.container = getSharedSinglePartitionCosmosContainer(this.client); + } finally { + safeClose(discoveryClient); + } + } + + protected final List excludeFirstWritableRegion() { + return Collections.singletonList(this.writableRegions.get(0)); + } + + protected final List excludeFirstReadableRegion() { + return Collections.singletonList(this.readableRegions.get(0)); + } + + protected static com.azure.cosmos.models.PartitionKey partitionKey(TestObject item) { + return new com.azure.cosmos.models.PartitionKey(item.getMypk()); + } + + protected final CosmosAsyncContainer createTemporaryContainer(String prefix, String partitionKeyPath) { + CosmosAsyncDatabase database = getSharedCosmosDatabase(this.client); + String containerId = prefix + "-" + UUID.randomUUID(); + + database + .createContainerIfNotExists(containerId, partitionKeyPath, ThroughputProperties.createManualThroughput(400)) + .block(); + + return database.getContainer(containerId); + } + + protected static void deleteTemporaryContainer(CosmosAsyncContainer container) { + safeDeleteCollection(container); + } + + protected static void awaitCondition(BooleanSupplier condition, Duration timeout, String failureMessage) { + long deadline = System.nanoTime() + timeout.toNanos(); + + while (System.nanoTime() < deadline) { + if (condition.getAsBoolean()) { + return; + } + + try { + Thread.sleep(250); + } catch (InterruptedException error) { + Thread.currentThread().interrupt(); + throw new AssertionError(failureMessage, error); + } + } + + throw new AssertionError(failureMessage); + } + + protected final FaultInjectionRule configureServerErrorRule( + CosmosAsyncContainer targetContainer, + FaultInjectionOperationType operationType, + FaultInjectionServerErrorType errorType, + int hitLimit) { + + return configureServerErrorRule(targetContainer, operationType, errorType, this.writableRegions.get(0), hitLimit); + } + + protected final FaultInjectionRule configureServerErrorRule( + CosmosAsyncContainer targetContainer, + FaultInjectionOperationType operationType, + FaultInjectionServerErrorType errorType, + String region, + int hitLimit) { + + return configureServerErrorRule(targetContainer, operationType, errorType, region, currentFaultInjectionConnectionType(), hitLimit); + } + + protected final FaultInjectionRule configureServerErrorRule( + CosmosAsyncContainer targetContainer, + FaultInjectionOperationType operationType, + FaultInjectionServerErrorType errorType, + String region, + FaultInjectionConnectionType connectionType, + int hitLimit) { + + FaultInjectionConditionBuilder conditionBuilder = new FaultInjectionConditionBuilder() + .operationType(operationType) + .connectionType(connectionType); + + if (region != null) { + conditionBuilder.region(region); + } + + FaultInjectionRule rule = new FaultInjectionRuleBuilder("customer-workflow-" + errorType + "-" + UUID.randomUUID()) + .condition(conditionBuilder.build()) + .result(FaultInjectionResultBuilders.getResultBuilder(errorType).build()) + .duration(Duration.ofMinutes(5)) + .hitLimit(hitLimit) + .build(); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(targetContainer, Collections.singletonList(rule)).block(); + return rule; + } + + protected final FaultInjectionConnectionType currentFaultInjectionConnectionType() { + if (getConnectionPolicy().getConnectionMode() == ConnectionMode.GATEWAY) { + return FaultInjectionConnectionType.GATEWAY; + } + + return FaultInjectionConnectionType.DIRECT; + } + + protected final FaultInjectionRule configureResponseDelayRule( + CosmosAsyncContainer targetContainer, + FaultInjectionOperationType operationType, + Duration delay, + int hitLimit) { + + FaultInjectionCondition condition = new FaultInjectionConditionBuilder() + .operationType(operationType) + .connectionType(currentFaultInjectionConnectionType()) + .build(); + + IFaultInjectionResult result = FaultInjectionResultBuilders + .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY) + .delay(delay) + .times(hitLimit) + .build(); + + FaultInjectionRule rule = new FaultInjectionRuleBuilder("customer-workflow-response-delay-" + UUID.randomUUID()) + .condition(condition) + .result(result) + .duration(Duration.ofMinutes(5)) + .hitLimit(hitLimit) + .build(); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(targetContainer, Collections.singletonList(rule)).block(); + return rule; + } + + protected static List discoverWritableRegions(CosmosAsyncClient client) { + AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(client); + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) asyncDocumentClient; + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + + List writableRegions = new ArrayList<>(); + for (DatabaseAccountLocation accountLocation : databaseAccount.getWritableLocations()) { + writableRegions.add(accountLocation.getName()); + } + + return writableRegions; + } + + protected static List discoverReadableRegions(CosmosAsyncClient client) { + AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(client); + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) asyncDocumentClient; + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + + List readableRegions = new ArrayList<>(); + for (DatabaseAccountLocation accountLocation : databaseAccount.getReadableLocations()) { + readableRegions.add(accountLocation.getName()); + } + + return readableRegions; + } + + protected static void skipIfInsufficientRegions(List regions, String scenarioName) { + if (regions == null || regions.size() < 2) { + throw new SkipException(scenarioName + " requires a live multi-region account."); + } + } + + protected static void skipIfInsufficientReadableRegions(List regions, String scenarioName) { + if (regions == null || regions.size() < 2) { + throw new SkipException(scenarioName + " requires a live multi-region single-write account."); + } + } + + protected static void skipIfNotSingleWriteRegion(List regions, String scenarioName) { + if (regions == null || regions.size() != 1) { + throw new SkipException(scenarioName + " requires exactly one write region."); + } + } + + protected static OverridableRequestOptions getRequestOptions(CosmosDiagnosticsContext diagnosticsContext) { + assertThat(diagnosticsContext).isNotNull(); + return ImplementationBridgeHelpers + .CosmosDiagnosticsContextHelper + .getCosmosDiagnosticsContextAccessor() + .getRequestOptions(diagnosticsContext); + } + + protected static void assertKeywordIdentifier(CosmosDiagnosticsContext diagnosticsContext, String expectedKeywordIdentifier) { + OverridableRequestOptions requestOptions = getRequestOptions(diagnosticsContext); + + assertThat(requestOptions.getKeywordIdentifiers()) + .containsExactlyInAnyOrder(expectedKeywordIdentifier); + } + + protected static void assertExcludedRegions( + CosmosDiagnosticsContext diagnosticsContext, + List expectedExcludedRegions) { + + OverridableRequestOptions requestOptions = getRequestOptions(diagnosticsContext); + + assertThat(requestOptions.getExcludedRegions()) + .containsExactlyElementsOf(expectedExcludedRegions); + } + + protected static void assertDidNotContactExcludedRegions( + CosmosDiagnosticsContext diagnosticsContext, + Collection excludedRegions) { + + Set contactedRegionNames = diagnosticsContext.getContactedRegionNames(); + Set normalizedExcludedRegions = excludedRegions + .stream() + .map(region -> region.toLowerCase(Locale.ROOT)) + .collect(Collectors.toSet()); + + assertThat(contactedRegionNames).isNotNull(); + assertThat(contactedRegionNames).doesNotContainAnyElementsOf(normalizedExcludedRegions); + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-customer-workflows-testng.xml b/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-customer-workflows-testng.xml new file mode 100644 index 000000000000..edfa8a57770f --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-customer-workflows-testng.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-sm-customer-workflows-testng.xml b/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-sm-customer-workflows-testng.xml new file mode 100644 index 000000000000..976b8fbdc204 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-sm-customer-workflows-testng.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/sdk/cosmos/live-platform-matrix.json b/sdk/cosmos/live-platform-matrix.json index e5771c249c1c..8c9144b412e0 100644 --- a/sdk/cosmos/live-platform-matrix.json +++ b/sdk/cosmos/live-platform-matrix.json @@ -8,6 +8,8 @@ "-Pdirect": "Direct", "-Pmulti-master": "MultiMaster", "-Pfi-multi-master": "FaultInjectionMultiMaster", + "-Pfi-customer-workflows": "FaultInjectionCustomerWorkflows", + "-Pfi-sm-customer-workflows": "FaultInjectionSingleMasterCustomerWorkflows", "-Pflaky-multi-master": "FlakyMultiMaster", "-Pcircuit-breaker-misc-direct": "CircuitBreakerMiscDirect", "-Pcircuit-breaker-misc-gateway": "CircuitBreakerMiscGateway", @@ -22,6 +24,7 @@ "@{ enableMultipleWriteLocations = $true; defaultConsistencyLevel = 'Session' }": "", "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Strong' }": "", "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enablePartitionMerge = $true }": "", + "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }": "", "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true; enablePartitionMerge = $true}": "", "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Strong'; enableMultipleRegions = $true }": "" }, @@ -126,7 +129,7 @@ } }, "PROTOCOLS": "[\"Tcp\"]", - "ProfileFlag": [ "-Pfi-multi-master" ], + "ProfileFlag": [ "-Pfi-multi-master", "-Pfi-customer-workflows" ], "AdditionalArgs": "\"-DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=TRUE\"", "Agent": { "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } @@ -164,6 +167,21 @@ "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } } }, + { + "DESIRED_CONSISTENCIES": "[\"Session\"]", + "ACCOUNT_CONSISTENCY": "Session", + "ArmConfig": { + "SingleMaster_MultiRegion_FI_CustomerWorkflows": { + "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }", + "PREFERRED_LOCATIONS": "[\"East US 2\"]" + } + }, + "PROTOCOLS": "[\"Tcp\"]", + "ProfileFlag": [ "-Pfi-sm-customer-workflows" ], + "Agent": { + "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } + } + }, { "DESIRED_CONSISTENCIES": "[\"Session\"]", "ACCOUNT_CONSISTENCY": "Session",