diff --git a/sdk/cosmos/azure-cosmos-tests/pom.xml b/sdk/cosmos/azure-cosmos-tests/pom.xml index 680f9470e6bf..18cb8e9cd2d1 100644 --- a/sdk/cosmos/azure-cosmos-tests/pom.xml +++ b/sdk/cosmos/azure-cosmos-tests/pom.xml @@ -659,6 +659,60 @@ Licensed under the MIT License. + + + fi-customer-workflows + + fi-customer-workflows + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.5.3 + + + src/test/resources/fi-customer-workflows-testng.xml + + + true + 1 + 256 + paranoid + + + + + + + + + fi-sm-customer-workflows + + fi-sm-customer-workflows + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.5.3 + + + src/test/resources/fi-sm-customer-workflows-testng.xml + + + true + 1 + 256 + paranoid + + + + + + multi-region diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java index 19bd22d6f570..2761269269c0 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java @@ -276,8 +276,11 @@ public class PerPartitionAutomaticFailoverE2ETests extends TestSuiteBase { if (expectedResponseCharacteristics.shouldFinalResponseHaveSuccess) { assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull(); - assertThat(cosmosDiagnostics.getDiagnosticsContext().getStatusCode() >= HttpConstants.StatusCodes.OK - && cosmosDiagnostics.getDiagnosticsContext().getStatusCode() <= HttpConstants.StatusCodes.NOT_MODIFIED).isTrue(); + int finalStatusCode = cosmosDiagnostics.getDiagnosticsContext().getStatusCode(); + assertThat(finalStatusCode) + .as("final response status code should indicate success (2xx/304) but was %d (sub-status %d)", + finalStatusCode, cosmosDiagnostics.getDiagnosticsContext().getSubStatusCode()) + .isBetween(HttpConstants.StatusCodes.OK, HttpConstants.StatusCodes.NOT_MODIFIED); } }; @@ -1818,7 +1821,7 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodesWithPpafDynamic *

Dynamic enablement is achieved by overriding GlobalEndpointManager's owner to * inject the PPAF flag into DatabaseAccount snapshots.

*/ - @Test(groups = {"multi-region", "fi-thinclient-multi-region"}, dataProvider = "ppafNonWriteDynamicEnablementScenarios") + @Test(groups = {"multi-region", "fi-thinclient-multi-region"}, dataProvider = "ppafNonWriteDynamicEnablementScenarios", retryAnalyzer = FlakyTestRetryAnalyzer.class) public void testFailoverBehaviorForNonWriteOperationsWithPpafDynamicEnablement( String testType, OperationType operationType, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java index fc7cbdde87a5..f0fe5fbb64f2 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java @@ -5,6 +5,7 @@ import com.azure.cosmos.ConnectionMode; import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.FlakyTestRetryAnalyzer; import com.azure.cosmos.ReadConsistencyStrategy; import com.azure.cosmos.implementation.batch.ItemBatchOperation; import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest; @@ -338,7 +339,7 @@ public void partitionedSessionToken(boolean isNameBased) throws NoSuchMethodExce safeClose(dummyState); } - @Test(groups = { "fast" }, timeOut = TIMEOUT, dataProvider = "sessionTestArgProvider") + @Test(groups = { "fast" }, timeOut = TIMEOUT, dataProvider = "sessionTestArgProvider", retryAnalyzer = FlakyTestRetryAnalyzer.class) public void sessionTokenNotRequired(boolean isNameBased) { spyClient.readCollection(getCollectionLink(isNameBased), null).block(); // No session token set for the master resource related request diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java index a6423d69bb03..e1013fe00a06 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ContainerCreateDeleteWithSameNameTest.java @@ -889,6 +889,9 @@ private void createDeleteContainerWithSameName( String pkPathAfterRecreate, Function getPkAfterRecreate) throws InterruptedException { CosmosAsyncContainer container = null; + // A throwaway client runs the post-create readiness probe so it does not warm this test's main client + // collection cache - the test relies on that cache being stale after the container is recreated. + CosmosAsyncClient probeClient = getClientBuilder().buildAsyncClient(); try { // step1: create container String testContainerId = UUID.randomUUID().toString(); @@ -899,7 +902,7 @@ private void createDeleteContainerWithSameName( partitionKeyDef.setPaths(paths); CosmosContainerProperties containerProperties = getCollectionDefinition(testContainerId, partitionKeyDef); - container = createCollection(this.createdDatabase, containerProperties, new CosmosContainerRequestOptions(), ruBeforeDelete); + container = createCollection(this.createdDatabase, containerProperties, new CosmosContainerRequestOptions(), ruBeforeDelete, probeClient); // Step2: execute func validateFunc.accept(container, getPkBeforeDelete, false); @@ -912,13 +915,14 @@ private void createDeleteContainerWithSameName( partitionKeyDef.setPaths(Arrays.asList(pkPathAfterRecreate)); containerProperties = getCollectionDefinition(testContainerId, partitionKeyDef); - container = createCollection(this.createdDatabase, containerProperties, new CosmosContainerRequestOptions(), ruAfterRecreate); + container = createCollection(this.createdDatabase, containerProperties, new CosmosContainerRequestOptions(), ruAfterRecreate, probeClient); // step5: same as step2. // This part will confirm the cache refreshed correctly validateFunc.accept(container, getPkAfterRecreate, true); } finally { safeDeleteCollection(container); + safeClose(probeClient); } } @@ -930,6 +934,9 @@ private void changeFeedCreateDeleteContainerWithSameName( String pkPathAfterRecreate) throws InterruptedException { CosmosAsyncContainer feedContainer = null; CosmosAsyncContainer leaseContainer = null; + // A throwaway client runs the post-create readiness probe so it does not warm this test's main client + // collection cache - the test relies on that cache being stale after the feed container is recreated. + CosmosAsyncClient probeClient = getClientBuilder().buildAsyncClient(); try { // step1: create feed container and lease container @@ -937,7 +944,7 @@ private void changeFeedCreateDeleteContainerWithSameName( PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition(); partitionKeyDefinition.setPaths(Arrays.asList(pkPathBeforeDelete)); CosmosContainerProperties feedContainerProperties = getCollectionDefinition(feedContainerId, partitionKeyDefinition); - feedContainer = createCollection(this.createdDatabase, feedContainerProperties, new CosmosContainerRequestOptions(), ruBeforeDelete); + feedContainer = createCollection(this.createdDatabase, feedContainerProperties, new CosmosContainerRequestOptions(), ruBeforeDelete, probeClient); String leaseContainerId = UUID.randomUUID().toString(); CosmosContainerProperties leaseContainerProperties = getCollectionDefinition(leaseContainerId); @@ -954,7 +961,7 @@ private void changeFeedCreateDeleteContainerWithSameName( // step 4: recreate the feed container with same id as step 1 partitionKeyDefinition.setPaths(Arrays.asList(pkPathAfterRecreate)); feedContainerProperties = getCollectionDefinition(feedContainerId, partitionKeyDefinition); - feedContainer = createCollection(this.createdDatabase, feedContainerProperties, new CosmosContainerRequestOptions(), ruAfterRecreate); + feedContainer = createCollection(this.createdDatabase, feedContainerProperties, new CosmosContainerRequestOptions(), ruAfterRecreate, probeClient); // step5: recreate the lease container and lease container with same ids as step1 leaseContainer = createLeaseContainer(leaseContainerProperties.getId()); @@ -965,6 +972,7 @@ private void changeFeedCreateDeleteContainerWithSameName( } finally { safeDeleteCollection(feedContainer); safeDeleteCollection(leaseContainer); + safeClose(probeClient); } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java index 9a6427952ca6..4166dc386944 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java @@ -120,6 +120,7 @@ public void before_MultiOrderByQueryTests() throws Exception { documents = new ArrayList<>(); client = getClientBuilder().buildAsyncClient(); documentCollection = getSharedMultiPartitionCosmosContainerWithCompositeAndSpatialIndexes(client); + waitForCollectionToBeAvailableToRead(documentCollection, /* probeClient */ null); cleanUpContainer(documentCollection); expectCount(documentCollection, 0); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/NonStreamingOrderByQueryVectorSearchTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/NonStreamingOrderByQueryVectorSearchTest.java index c632fd06e315..bcfc6cb66bf4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/NonStreamingOrderByQueryVectorSearchTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/NonStreamingOrderByQueryVectorSearchTest.java @@ -102,7 +102,7 @@ public void before_NonStreamingOrderByQueryVectorSearchTest() { database.createContainer(containerProperties).block(); largeDataContainer = database.getContainer(largeDataContainerId); - waitForCollectionToBeAvailableToRead(); + waitForCollectionToBeAvailableToRead(largeDataContainer, /* probeClient */ null); for (Document doc : getVectorDocs()) { flatIndexContainer.createItem(doc).block(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index 16722fd466bd..1bf138fc529e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -688,7 +688,7 @@ public void before_OrderbyDocumentQueryTest() throws Exception { })) .block(); roundTripsContainer = createdDatabase.getContainer(containerName); - waitForCollectionToBeAvailableToRead(); + waitForCollectionToBeAvailableToRead(roundTripsContainer, /* probeClient */ null); setupRoundTripContainer(); List> keyValuePropsList = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java index 966890825de4..8658d3c4a265 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java @@ -126,7 +126,7 @@ public void orderByQueryForLargeCollection() { ).block(); CosmosAsyncContainer container = createdDatabase.getContainer(containerProperties.getId()); - waitForCollectionToBeAvailableToRead(); + waitForCollectionToBeAvailableToRead(container, /* probeClient */ null); int partitionDocCount = 5; int pageSize = partitionDocCount + 1; @@ -382,7 +382,7 @@ public void splitQueryContinuationToken() throws Exception { CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/mypk"); CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties).block(); CosmosAsyncContainer container = createdDatabase.getContainer(containerId); - waitForCollectionToBeAvailableToRead(); + waitForCollectionToBeAvailableToRead(container, /* probeClient */ null); AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(this.client); //Insert some documents @@ -494,7 +494,7 @@ public void orderbyContinuationOnUndefinedAndNull() throws Exception { createdDatabase.createContainer(containerProperties, new CosmosContainerRequestOptions()).block(); CosmosAsyncContainer container = createdDatabase.getContainer(containerProperties.getId()); - waitForCollectionToBeAvailableToRead(); + waitForCollectionToBeAvailableToRead(container, /* probeClient */ null); CosmosContainerResponse containerResponse = container.read().block(); assert (containerResponse != null); CosmosContainerProperties properties = containerResponse.getProperties(); @@ -582,7 +582,7 @@ public void queryLargePartitionKeyOn100BPKCollection() throws Exception { CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/id"); CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties).block(); CosmosAsyncContainer container = createdDatabase.getContainer(containerId); - waitForCollectionToBeAvailableToRead(); + waitForCollectionToBeAvailableToRead(container, /* probeClient */ null); //id as partitionkey > 100bytes String itemID1 = "cosmosdb" + "-drWarm4Z60GkknMfHLo5BwuiH7w6AffzSb9jKbvwAQwaRZd10oxnLeCueuyZ5gbm9dwVVAqJLdzrB38Dk73Q6xMErv-0"; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedCollectionsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedCollectionsTest.java index e081fee801b4..d0f9c7dee219 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedCollectionsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ReadFeedCollectionsTest.java @@ -88,7 +88,7 @@ public CosmosAsyncContainer createCollections(CosmosAsyncDatabase database) { partitionKeyDef.setPaths(paths); CosmosContainerProperties containerProperties = new CosmosContainerProperties(UUID.randomUUID().toString(), partitionKeyDef); database.createContainer(containerProperties, new CosmosContainerRequestOptions()).block(); - waitForCollectionToBeAvailableToRead(); + waitForCollectionToBeAvailableToRead(database.getContainer(containerProperties.getId()), /* probeClient */ null); return database.getContainer(containerProperties.getId()); } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 66b6e6314ad0..a13bfe770730 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -31,10 +31,13 @@ import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; import com.azure.cosmos.implementation.Database; +import com.azure.cosmos.implementation.DatabaseAccount; +import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.Document; import com.azure.cosmos.implementation.DocumentCollection; import com.azure.cosmos.implementation.FailureValidator; import com.azure.cosmos.implementation.FeedResponseListValidator; +import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; @@ -43,6 +46,7 @@ import com.azure.cosmos.implementation.QueryFeedOperationState; import com.azure.cosmos.implementation.RequestOptions; import com.azure.cosmos.implementation.Resource; +import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.ResourceResponse; import com.azure.cosmos.implementation.ResourceResponseValidator; import com.azure.cosmos.implementation.TestConfigurations; @@ -51,6 +55,7 @@ import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; import com.azure.cosmos.implementation.directconnectivity.Protocol; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; import com.azure.cosmos.implementation.guava25.base.CaseFormat; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import com.azure.cosmos.models.CosmosClientTelemetryConfig; @@ -302,7 +307,7 @@ public CosmosAsyncDatabase getDatabase(String id) { @BeforeSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator", "emulator-vnext", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", - "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SETUP_TIMEOUT) + "circuit-breaker-read-all-read-many", "fi-multi-master", "fi-customer-workflows", "fi-sm-customer-workflows", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SETUP_TIMEOUT) public void beforeSuite() { logger.info("beforeSuite Started"); @@ -353,7 +358,7 @@ private static DocumentCollection getInternalDocumentCollection(CosmosAsyncConta @AfterSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", - "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SHUTDOWN_TIMEOUT) + "circuit-breaker-read-all-read-many", "fi-multi-master", "fi-customer-workflows", "fi-sm-customer-workflows", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SHUTDOWN_TIMEOUT) public void afterSuite() { logger.info("afterSuite Started"); @@ -547,6 +552,18 @@ protected static void waitIfNeededForReplicasToCatchUp(CosmosClientBuilder clien public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database, CosmosContainerProperties cosmosContainerProperties, CosmosContainerRequestOptions options, int throughput) { + return createCollection(database, cosmosContainerProperties, options, throughput, /* probeClient */ null); + } + + /** + * Overload of {@link #createCollection(CosmosAsyncDatabase, CosmosContainerProperties, CosmosContainerRequestOptions, int)} + * that runs the post-creation collection-readiness probe using {@code probeClient} instead of the caller's + * client. Tests that depend on the caller's collection cache remaining stale after a recreate (for example + * {@code ContainerCreateDeleteWithSameNameTest}) pass a throwaway client here so the probe does not refresh + * their main client's cache. When {@code probeClient} is null the caller's client is used. + */ + public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database, CosmosContainerProperties cosmosContainerProperties, + CosmosContainerRequestOptions options, int throughput, CosmosAsyncClient probeClient) { database.createContainer(cosmosContainerProperties, ThroughputProperties.createManualThroughput(throughput), options) .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(5)) .filter(TestSuiteBase::isTransientCreateFailure)) @@ -566,24 +583,148 @@ public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database .getCosmosAsyncClientAccessor() .getPreferredRegions(client).size() > 1; if (throughput > 6000 || isMultiRegional) { - waitForCollectionToBeAvailableToRead(); + waitForCollectionToBeAvailableToRead(database.getContainer(cosmosContainerProperties.getId()), probeClient); } return database.getContainer(cosmosContainerProperties.getId()); } - protected static void waitForCollectionToBeAvailableToRead() { - // Creating a container is an async task - especially with multiple regions it can - // take some time until the container is available in the remote regions as well. - // When the container does not exist yet, metadata reads or item operations can - // fail with 404/1013 "Collection is not yet available for read". - // So, adding this delay after container creation to minimize risk of hitting these errors. - try { - TimeUnit.SECONDS.sleep(3); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); + protected static void waitForCollectionToBeAvailableToRead(CosmosAsyncContainer container, CosmosAsyncClient probeClient) { + // Creating a container is asynchronous - especially on multi-region accounts the new collection can + // take time to become readable in the non-write regions. Until then, reads routed to those regions fail + // with 404/1013 ("Collection is not yet available for read"). Instead of a fixed sleep, verify - against + // every non-primary region of the account - that the collection is readable, probing each region (by + // excluding all other regions) with exponential back-off until it succeeds, bounded to two minutes total. + // The probe is issued through probeClient when provided (so a throwaway client does not warm the caller's + // caches); otherwise the container's own client is used. + CosmosAsyncClient client = probeClient != null + ? probeClient + : ImplementationBridgeHelpers + .CosmosAsyncDatabaseHelper + .getCosmosAsyncDatabaseAccessor() + .getCosmosAsyncClient(container.getDatabase()); + DatabaseAccount databaseAccount = getLatestDatabaseAccount(client); + CosmosAsyncContainer probeContainer = + client.getDatabase(container.getDatabase().getId()).getContainer(container.getId()); + + // Use the account's regions (not the client's preferred regions, which may be a subset). + List allRegions = new ArrayList<>(); + for (DatabaseAccountLocation location : databaseAccount.getReadableLocations()) { + allRegions.add(location.getName()); + } + + // The primary region is the first writable location; propagation lag manifests in the other regions. + String primaryRegion = null; + for (DatabaseAccountLocation location : databaseAccount.getWritableLocations()) { + primaryRegion = location.getName(); + break; + } + final String primary = primaryRegion; + + List nonPrimaryRegions = allRegions + .stream() + .filter(region -> primary == null || !region.equalsIgnoreCase(primary)) + .collect(Collectors.toList()); + + Duration maxWait = Duration.ofMinutes(2); + long deadlineNanos = System.nanoTime() + maxWait.toNanos(); + + if (nonPrimaryRegions.isEmpty()) { + // Single-region account: there is no non-primary region to verify, but the collection still needs + // to be readable (for example while physical partitions are provisioned). + awaitContainerReadableInRegion(probeContainer, null, Collections.emptyList(), deadlineNanos, maxWait); + return; } + + // Verify the collection is readable in each non-primary region. + for (String region : nonPrimaryRegions) { + final String target = region; + List excludedRegions = allRegions + .stream() + .filter(other -> !other.equalsIgnoreCase(target)) + .collect(Collectors.toList()); + awaitContainerReadableInRegion(probeContainer, region, excludedRegions, deadlineNanos, maxWait); + } + } + + private static void awaitContainerReadableInRegion( + CosmosAsyncContainer container, + String targetRegion, + List excludedRegions, + long deadlineNanos, + Duration maxWait) { + + long backoffMillis = 100; + long maxBackoffMillis = 5000; + int attempts = 0; + Throwable lastError = null; + + while (true) { + attempts++; + try { + CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); + if (!excludedRegions.isEmpty()) { + options.setExcludedRegions(excludedRegions); + } + // A successful (possibly empty) page proves the collection is resolvable/readable in the + // targeted region. + container.queryItems("SELECT TOP 1 c.id FROM c", options, Object.class) + .byPage(1) + .blockFirst(); + return; + } catch (Exception error) { + lastError = error; + } + + long remainingNanos = deadlineNanos - System.nanoTime(); + if (remainingNanos <= 0) { + break; + } + + long sleepMillis = Math.max(1, Math.min(backoffMillis, TimeUnit.NANOSECONDS.toMillis(remainingNanos))); + try { + TimeUnit.MILLISECONDS.sleep(sleepMillis); + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for collection to be available to read.", interrupted); + } + backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis); + } + + throw new AssertionError( + String.format( + "Container '%s' was not available to read%s within %d seconds (%d attempts).", + container.getId(), + targetRegion != null ? " in region '" + targetRegion + "'" : "", + maxWait.getSeconds(), + attempts), + lastError); + } + + private static DatabaseAccount getLatestDatabaseAccount(CosmosAsyncClient client) { + AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(client); + GlobalEndpointManager globalEndpointManager = + ReflectionUtils.getGlobalEndpointManager((RxDocumentClientImpl) asyncDocumentClient); + + // The latest database account is populated during client initialization; poll briefly to defend against + // an initialization race. + DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + long deadlineNanos = System.nanoTime() + Duration.ofSeconds(10).toNanos(); + while (databaseAccount == null && System.nanoTime() < deadlineNanos) { + try { + TimeUnit.MILLISECONDS.sleep(200); + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while resolving the database account.", interrupted); + } + databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + } + + if (databaseAccount == null) { + throw new AssertionError("Database account was not available to determine the account's regions."); + } + + return databaseAccount; } public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database, CosmosContainerProperties cosmosContainerProperties, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowAvailabilityFaultMatrixTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowAvailabilityFaultMatrixTest.java new file mode 100644 index 000000000000..176715cd5a2a --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowAvailabilityFaultMatrixTest.java @@ -0,0 +1,199 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowAvailabilityFaultMatrixTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithSessionConsistency") + public CustomerWorkflowAvailabilityFaultMatrixTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer availability fault workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @DataProvider(name = "availabilityFaultScenarios") + public Object[][] availabilityFaultScenarios() { + return new Object[][]{ + {"read", FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.GONE}, + {"read", FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"read", FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE}, + {"read", FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {"query", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"query", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.GONE}, + {"query", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"query", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {"readMany", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.GONE}, + {"readMany", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE}, + {"readMany", FaultInjectionOperationType.QUERY_ITEM, FaultInjectionServerErrorType.TOO_MANY_REQUEST}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.TOO_MANY_REQUEST}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.RETRY_WITH}, + {"create", FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.PARTITION_IS_MIGRATING}, + {"upsert", FaultInjectionOperationType.UPSERT_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"upsert", FaultInjectionOperationType.UPSERT_ITEM, FaultInjectionServerErrorType.PARTITION_IS_MIGRATING}, + {"upsert", FaultInjectionOperationType.UPSERT_ITEM, FaultInjectionServerErrorType.TOO_MANY_REQUEST}, + {"replace", FaultInjectionOperationType.REPLACE_ITEM, FaultInjectionServerErrorType.GONE}, + {"replace", FaultInjectionOperationType.REPLACE_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"replace", FaultInjectionOperationType.REPLACE_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"delete", FaultInjectionOperationType.DELETE_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"delete", FaultInjectionOperationType.DELETE_ITEM, FaultInjectionServerErrorType.GONE}, + {"delete", FaultInjectionOperationType.DELETE_ITEM, FaultInjectionServerErrorType.TIMEOUT}, + {"patch", FaultInjectionOperationType.PATCH_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {"patch", FaultInjectionOperationType.PATCH_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE}, + {"patch", FaultInjectionOperationType.PATCH_ITEM, FaultInjectionServerErrorType.GONE} + }; + } + + @Test(groups = {"fi-customer-workflows"}, dataProvider = "availabilityFaultScenarios", timeOut = TIMEOUT) + public void representativeDirectMultiMasterFaultWorkflow( + String operation, + FaultInjectionOperationType faultInjectionOperationType, + FaultInjectionServerErrorType errorType) { + + skipIfNotDirectMode("Customer availability fault workflow (direct multi-master)"); + + TestObject item = TestObject.create(); + if (!"create".equals(operation)) { + this.container.createItem(item).block(); + registerForCleanup(item); + } + + List faultRules = "readMany".equals(operation) + ? configureReadManyServerErrorRules(this.container, errorType, this.writableRegions.get(0), 1) + : Collections.singletonList(configureServerErrorRule( + this.container, + faultInjectionOperationType, + errorType, + this.writableRegions.get(0), + currentFaultInjectionConnectionType(), + 1)); + + try { + CosmosDiagnosticsContext diagnosticsContext = executeOperation(operation, item); + + assertFaultInjectedOperation(diagnosticsContext, faultRules); + assertThat(diagnosticsContext.getDuration()).isNotNull(); + } finally { + faultRules.forEach(FaultInjectionRule::disable); + } + } + + private CosmosDiagnosticsContext executeOperation(String operation, TestObject item) { + try { + if ("read".equals(operation)) { + CosmosItemResponse response = this.container + .readItem(item.getId(), partitionKey(item), new CosmosItemRequestOptions(), TestObject.class) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + if ("query".equals(operation)) { + FeedResponse response = this.container + .queryItems( + String.format("SELECT * FROM c WHERE c.id = '%s'", item.getId()), + new CosmosQueryRequestOptions().setQueryName("AvailabilityFaultWorkflowQuery"), + TestObject.class) + .byPage() + .blockFirst(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } + + if ("readMany".equals(operation)) { + FeedResponse response = this.container + .readMany( + Collections.singletonList(new CosmosItemIdentity(partitionKey(item), item.getId())), + new CosmosReadManyRequestOptions(), + TestObject.class) + .block(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } + + if ("upsert".equals(operation)) { + item.setStringProp("fault-upsert-" + item.getStringProp()); + CosmosItemResponse response = this.container + .upsertItem(item, new CosmosItemRequestOptions().setContentResponseOnWriteEnabled(true)) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + if ("replace".equals(operation)) { + item.setStringProp("fault-replace-" + item.getStringProp()); + CosmosItemResponse response = this.container + .replaceItem(item, item.getId(), partitionKey(item), new CosmosItemRequestOptions()) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + if ("delete".equals(operation)) { + CosmosItemResponse response = this.container + .deleteItem(item.getId(), partitionKey(item), new CosmosItemRequestOptions()) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + if ("patch".equals(operation)) { + CosmosItemResponse response = this.container + .patchItem( + item.getId(), + partitionKey(item), + CosmosPatchOperations.create().set("/stringProp", "fault-patch-" + item.getStringProp()), + TestObject.class) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } + + CosmosItemResponse response = this.container + .createItem(item, new CosmosItemRequestOptions().setContentResponseOnWriteEnabled(true)) + .block(); + + registerForCleanup(item); + return response.getDiagnostics().getDiagnosticsContext(); + } catch (CosmosException error) { + CosmosDiagnosticsContext diagnosticsContext = error.getDiagnostics().getDiagnosticsContext(); + assertThat(error.getStatusCode()).isGreaterThanOrEqualTo(HttpConstants.StatusCodes.BADREQUEST); + return diagnosticsContext; + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowChangeFeedProcessorTest.java new file mode 100644 index 000000000000..2d5b88cacd65 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowChangeFeedProcessorTest.java @@ -0,0 +1,219 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.ChangeFeedProcessor; +import com.azure.cosmos.ChangeFeedProcessorBuilder; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.models.ChangeFeedProcessorItem; +import com.azure.cosmos.models.ChangeFeedProcessorOptions; +import com.azure.cosmos.models.ChangeFeedProcessorState; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.fasterxml.jackson.databind.JsonNode; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowChangeFeedProcessorTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowChangeFeedProcessorTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer change feed processor workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = 2 * TIMEOUT) + public void latestVersionProcessorRestartResumesFromLeasesWorkflow() throws InterruptedException { + CosmosAsyncContainer feedContainer = createTemporaryContainer("customer-cfp-feed", "/mypk"); + CosmosAsyncContainer leaseContainer = createTemporaryContainer("customer-cfp-lease", "/id"); + ChangeFeedProcessor processor = null; + FaultInjectionRule readFeedDelayRule = null; + + try { + Set expectedIds = Collections.newSetFromMap(new ConcurrentHashMap()); + Set receivedIds = Collections.newSetFromMap(new ConcurrentHashMap()); + CountDownLatch initialLatch = new CountDownLatch(2); + + createFeedItem(feedContainer, expectedIds, "cfp-initial-1"); + createFeedItem(feedContainer, expectedIds, "cfp-initial-2"); + + // Use a single, stable lease prefix so the second processor instance resumes from the persisted + // continuation instead of reprocessing from the beginning - this validates a genuine restart. + String leasePrefix = "resume"; + processor = createLatestVersionProcessor(feedContainer, leaseContainer, expectedIds, receivedIds, initialLatch, leasePrefix); + processor.start().block(); + ChangeFeedProcessor initialProcessor = processor; + + assertThat(processor.isStarted()).isTrue(); + assertThat(initialLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(receivedIds).containsAll(expectedIds); + + awaitCondition( + () -> hasAcquiredLeases(initialProcessor), + Duration.ofSeconds(20), + "Change feed processor did not acquire leases."); + + processor.stop().block(); + assertThat(processor.isStarted()).isFalse(); + + CountDownLatch restartLatch = new CountDownLatch(1); + TestObject restartedItem = createFeedItem(feedContainer, expectedIds, "cfp-restart"); + readFeedDelayRule = configureResponseDelayRule(feedContainer, FaultInjectionOperationType.READ_FEED_ITEM, Duration.ofMillis(100), 1); + + processor = createLatestVersionProcessor(feedContainer, leaseContainer, expectedIds, receivedIds, restartLatch, leasePrefix); + processor.start().block(); + + assertThat(processor.isStarted()).isTrue(); + assertThat(restartLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(receivedIds).contains(restartedItem.getId()); + + // getEstimatedLag() is not supported for a latest-version processor; query the per-lease state + // (which exposes the estimated lag) via the supported getCurrentState() API instead. + List currentState = processor.getCurrentState().block(); + assertThat(currentState).isNotNull().isNotEmpty(); + assertThat(currentState).allSatisfy(state -> assertThat(state.getEstimatedLag()).isGreaterThanOrEqualTo(0)); + } finally { + if (readFeedDelayRule != null) { + readFeedDelayRule.disable(); + } + if (processor != null && processor.isStarted()) { + processor.stop().block(); + } + deleteTemporaryContainer(feedContainer); + deleteTemporaryContainer(leaseContainer); + } + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = 2 * TIMEOUT) + public void latestVersionProcessorWithNewLeasePrefixReprocessesFromBeginningWorkflow() throws InterruptedException { + CosmosAsyncContainer feedContainer = createTemporaryContainer("customer-cfp-feed", "/mypk"); + CosmosAsyncContainer leaseContainer = createTemporaryContainer("customer-cfp-lease", "/id"); + ChangeFeedProcessor processor = null; + + try { + Set expectedIds = Collections.newSetFromMap(new ConcurrentHashMap()); + Set initialReceivedIds = Collections.newSetFromMap(new ConcurrentHashMap()); + CountDownLatch initialLatch = new CountDownLatch(2); + + createFeedItem(feedContainer, expectedIds, "cfp-initial-1"); + createFeedItem(feedContainer, expectedIds, "cfp-initial-2"); + + processor = createLatestVersionProcessor(feedContainer, leaseContainer, expectedIds, initialReceivedIds, initialLatch, "initial"); + processor.start().block(); + ChangeFeedProcessor initialProcessor = processor; + + assertThat(processor.isStarted()).isTrue(); + assertThat(initialLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(initialReceivedIds).containsAll(expectedIds); + + awaitCondition( + () -> hasAcquiredLeases(initialProcessor), + Duration.ofSeconds(20), + "Change feed processor did not acquire leases."); + + processor.stop().block(); + assertThat(processor.isStarted()).isFalse(); + + // A different lease prefix creates a fresh lease set, so a from-beginning processor reprocesses all + // existing items. A separate received-id set is required because the original set already contains them. + Set reprocessedIds = Collections.newSetFromMap(new ConcurrentHashMap()); + CountDownLatch reprocessLatch = new CountDownLatch(expectedIds.size()); + + processor = createLatestVersionProcessor(feedContainer, leaseContainer, expectedIds, reprocessedIds, reprocessLatch, "fresh"); + processor.start().block(); + + assertThat(processor.isStarted()).isTrue(); + assertThat(reprocessLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(reprocessedIds).containsAll(expectedIds); + + // getEstimatedLag() is not supported for a latest-version processor; query the per-lease state + // (which exposes the estimated lag) via the supported getCurrentState() API instead. + List currentState = processor.getCurrentState().block(); + assertThat(currentState).isNotNull().isNotEmpty(); + assertThat(currentState).allSatisfy(state -> assertThat(state.getEstimatedLag()).isGreaterThanOrEqualTo(0)); + } finally { + if (processor != null && processor.isStarted()) { + processor.stop().block(); + } + deleteTemporaryContainer(feedContainer); + deleteTemporaryContainer(leaseContainer); + } + } + + private static boolean hasAcquiredLeases(ChangeFeedProcessor processor) { + List currentState = processor.getCurrentState().block(); + return currentState != null && !currentState.isEmpty(); + } + + private TestObject createFeedItem(CosmosAsyncContainer feedContainer, Set expectedIds, String partitionKey) { + TestObject item = TestObject.create(partitionKey + "-" + UUID.randomUUID()); + feedContainer.createItem(item).block(); + expectedIds.add(item.getId()); + return item; + } + + private ChangeFeedProcessor createLatestVersionProcessor( + CosmosAsyncContainer feedContainer, + CosmosAsyncContainer leaseContainer, + Set expectedIds, + Set receivedIds, + CountDownLatch latch, + String leasePrefix) { + + return new ChangeFeedProcessorBuilder() + .hostName("customer-workflow-" + leasePrefix + "-" + UUID.randomUUID()) + .feedContainer(feedContainer) + .leaseContainer(leaseContainer) + .handleLatestVersionChanges(items -> recordLatestVersionItems(items, expectedIds, receivedIds, latch)) + .options(new ChangeFeedProcessorOptions() + .setStartFromBeginning(true) + .setFeedPollDelay(Duration.ofMillis(500)) + .setLeaseAcquireInterval(Duration.ofSeconds(1)) + .setLeaseRenewInterval(Duration.ofSeconds(2)) + .setLeaseExpirationInterval(Duration.ofSeconds(6)) + .setMaxItemCount(10) + .setLeasePrefix("customer-" + leasePrefix)) + .buildChangeFeedProcessor(); + } + + private static void recordLatestVersionItems( + List items, + Set expectedIds, + Set receivedIds, + CountDownLatch latch) { + + for (ChangeFeedProcessorItem item : items) { + JsonNode current = item.getCurrent(); + if (current != null && current.has("id")) { + String id = current.get("id").asText(); + if (expectedIds.contains(id) && receivedIds.add(id)) { + latch.countDown(); + } + } + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowDaoStyleOperationsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowDaoStyleOperationsTest.java new file mode 100644 index 000000000000..2356468005ce --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowDaoStyleOperationsTest.java @@ -0,0 +1,149 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosItemSerializer; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosBatch; +import com.azure.cosmos.models.CosmosBatchResponse; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; +import com.azure.cosmos.models.CosmosItemOperation; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowDaoStyleOperationsTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowDaoStyleOperationsTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer DAO-style workflow tests", true); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void crudReadAllPatchBatchAndBulkWorkflow() { + List excludedRegions = excludeFirstWritableRegion(); + TestObject item = TestObject.create(); + + CosmosItemRequestOptions createOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(Collections.singleton("workflow-crud-create")) + .setExcludedRegions(excludedRegions) + .setCustomItemSerializer(CosmosItemSerializer.DEFAULT_SERIALIZER) + .setContentResponseOnWriteEnabled(true); + + CosmosItemResponse createResponse = this.container + .createItem(item, createOptions) + .block(); + + assertThat(createResponse).isNotNull(); + registerForCleanup(item); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + assertKeywordIdentifier(createResponse.getDiagnostics().getDiagnosticsContext(), "workflow-crud-create"); + assertThat(getRequestOptions(createResponse.getDiagnostics().getDiagnosticsContext()).getCustomItemSerializer()) + .isSameAs(CosmosItemSerializer.DEFAULT_SERIALIZER); + assertDidNotContactExcludedRegions(createResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + + CosmosItemResponse readResponse = this.container + .readItem(item.getId(), partitionKey(item), new CosmosItemRequestOptions().setExcludedRegions(excludedRegions), TestObject.class) + .block(); + + assertThat(readResponse).isNotNull(); + assertThat(readResponse.getItem()).isEqualTo(item); + + FeedResponse readAllResponse = this.container + .readAllItems( + partitionKey(item), + new CosmosQueryRequestOptions() + .setExcludedRegions(excludedRegions) + .setCustomItemSerializer(CosmosItemSerializer.DEFAULT_SERIALIZER), + TestObject.class) + .byPage() + .blockFirst(); + + assertThat(readAllResponse).isNotNull(); + assertThat(readAllResponse.getResults()).extracting(TestObject::getId).contains(item.getId()); + assertExcludedRegions(readAllResponse.getCosmosDiagnostics().getDiagnosticsContext(), excludedRegions); + assertThat(getRequestOptions(readAllResponse.getCosmosDiagnostics().getDiagnosticsContext()).getCustomItemSerializer()) + .isSameAs(CosmosItemSerializer.DEFAULT_SERIALIZER); + + CosmosPatchOperations patchOperations = CosmosPatchOperations.create() + .set("/stringProp", "patched-" + item.getStringProp()); + + CosmosItemResponse patchResponse = this.container + .patchItem(item.getId(), partitionKey(item), patchOperations, TestObject.class) + .block(); + + assertThat(patchResponse).isNotNull(); + assertThat(patchResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertThat(patchResponse.getItem().getStringProp()).startsWith("patched-"); + + String batchPk = "batch-" + UUID.randomUUID(); + TestObject batchItem = TestObject.create(batchPk); + CosmosBatch batch = CosmosBatch.createCosmosBatch(partitionKey(batchItem)); + batch.createItemOperation(batchItem); + batch.readItemOperation(batchItem.getId()); + + CosmosBatchResponse batchResponse = this.container.executeCosmosBatch(batch).block(); + + assertThat(batchResponse).isNotNull(); + registerForCleanup(batchItem); + assertThat(batchResponse.isSuccessStatusCode()).isTrue(); + assertThat(batchResponse.size()).isEqualTo(2); + assertThat(batchResponse.getDiagnostics()).isNotNull(); + + TestObject bulkItem = TestObject.create(); + this.container.createItem(bulkItem).block(); + registerForCleanup(bulkItem); + CosmosPatchOperations bulkPatchOperations = CosmosPatchOperations.create() + .set("/stringProp", "bulk-patched-" + bulkItem.getStringProp()); + + List bulkOperations = new ArrayList<>(); + bulkOperations.add(CosmosBulkOperations.getReadItemOperation(bulkItem.getId(), partitionKey(bulkItem))); + bulkOperations.add(CosmosBulkOperations.getPatchItemOperation(bulkItem.getId(), partitionKey(bulkItem), bulkPatchOperations)); + + CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions() + .setMaxMicroBatchSize(2) + .setExcludedRegions(excludedRegions) + .setKeywordIdentifiers(Collections.singleton("workflow-bulk")); + + List> bulkResponses = this.container + .executeBulkOperations(Flux.fromIterable(bulkOperations), bulkExecutionOptions) + .collectList() + .block(); + + assertThat(bulkResponses).isNotNull(); + assertThat(bulkResponses).hasSize(2); + assertThat(bulkResponses).allSatisfy(response -> { + assertThat(response.getException()).isNull(); + assertThat(response.getResponse().getStatusCode()).isIn(HttpConstants.StatusCodes.OK, HttpConstants.StatusCodes.CREATED); + assertThat(response.getResponse().getCosmosDiagnostics()).isNotNull(); + }); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowHighE2ETimeoutTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowHighE2ETimeoutTest.java new file mode 100644 index 000000000000..3946d0324b1e --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowHighE2ETimeoutTest.java @@ -0,0 +1,230 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.ThresholdBasedAvailabilityStrategy; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.models.CosmosBatch; +import com.azure.cosmos.models.CosmosBatchRequestOptions; +import com.azure.cosmos.models.CosmosBatchResponse; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchItemRequestOptions; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowHighE2ETimeoutTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowHighE2ETimeoutTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer high E2E timeout workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @DataProvider(name = "timeoutWorkflowOperations") + public Object[][] timeoutWorkflowOperations() { + return new Object[][]{ + {"create", FaultInjectionOperationType.CREATE_ITEM}, + {"read", FaultInjectionOperationType.READ_ITEM}, + {"query", FaultInjectionOperationType.QUERY_ITEM}, + {"readMany", FaultInjectionOperationType.QUERY_ITEM}, + {"upsert", FaultInjectionOperationType.UPSERT_ITEM}, + {"batch", FaultInjectionOperationType.BATCH_ITEM}, + {"patch", FaultInjectionOperationType.PATCH_ITEM} + }; + } + + @Test(groups = {"fi-customer-workflows"}, dataProvider = "timeoutWorkflowOperations", timeOut = 2 * TIMEOUT) + public void responseDelayWithAvailabilityStrategyWorkflow(String operation, FaultInjectionOperationType faultInjectionOperationType) { + TestObject item = TestObject.create(); + if (!"create".equals(operation)) { + this.container.createItem(item).block(); + registerForCleanup(item); + } + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(4)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + + // readMany resolves to a point read for a single item, so the QUERY_ITEM data-provider value alone would not + // exercise the fault - inject the delay for both the point-read and query operation types. + List delayRules = new ArrayList<>(); + if ("readMany".equals(operation)) { + delayRules.add(configureResponseDelayRule(this.container, FaultInjectionOperationType.READ_ITEM, Duration.ofMillis(1500), 1)); + delayRules.add(configureResponseDelayRule(this.container, FaultInjectionOperationType.QUERY_ITEM, Duration.ofMillis(1500), 1)); + } else { + delayRules.add(configureResponseDelayRule(this.container, faultInjectionOperationType, Duration.ofMillis(1500), 1)); + } + + try { + CosmosDiagnosticsContext diagnosticsContext = executeWithE2EPolicy(operation, item, e2ePolicy); + + assertFaultInjectedOperation(diagnosticsContext, delayRules); + assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(10)); + } finally { + delayRules.forEach(FaultInjectionRule::disable); + } + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = 2 * TIMEOUT) + public void partitionMigratingFaultWithE2EPolicyWorkflow() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + registerForCleanup(item); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(4)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + + FaultInjectionRule migratingRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, + 1); + + try { + CosmosDiagnosticsContext diagnosticsContext = executeWithE2EPolicy("read", item, e2ePolicy); + + assertFaultInjectedOperation(diagnosticsContext, migratingRule); + assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(10)); + } finally { + migratingRule.disable(); + } + } + + private CosmosDiagnosticsContext executeWithE2EPolicy( + String operation, + TestObject item, + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy) { + + try { + if ("create".equals(operation)) { + TestObject createdItem = TestObject.create(); + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setContentResponseOnWriteEnabled(true) + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosItemResponse response = this.container + .createItem(createdItem, options) + .block(); + + registerForCleanup(createdItem); + return response.getDiagnostics().getDiagnosticsContext(); + } + + if ("read".equals(operation)) { + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + return this.container + .readItem(item.getId(), partitionKey(item), options, TestObject.class) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } + + if ("query".equals(operation)) { + CosmosQueryRequestOptions options = new CosmosQueryRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy) + .setQueryName("HighE2ETimeoutWorkflowQuery"); + + FeedResponse response = this.container + .queryItems(String.format("SELECT * FROM c WHERE c.id = '%s'", item.getId()), options, TestObject.class) + .byPage() + .blockFirst(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } + + if ("readMany".equals(operation)) { + CosmosReadManyRequestOptions options = new CosmosReadManyRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + FeedResponse response = this.container + .readMany(Collections.singletonList(new CosmosItemIdentity(partitionKey(item), item.getId())), options, TestObject.class) + .block(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } + + if ("upsert".equals(operation)) { + item.setStringProp("timeout-upsert-" + item.getStringProp()); + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setContentResponseOnWriteEnabled(true) + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + return this.container + .upsertItem(item, options) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } + + if ("batch".equals(operation)) { + TestObject batchItem = TestObject.create("timeout-batch"); + CosmosBatch batch = CosmosBatch.createCosmosBatch(partitionKey(batchItem)); + batch.createItemOperation(batchItem); + batch.readItemOperation(batchItem.getId()); + + CosmosBatchRequestOptions batchOptions = new CosmosBatchRequestOptions(); + ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper + .getCosmosBatchRequestOptionsAccessor() + .setEndToEndOperationLatencyPolicyConfig(batchOptions, e2ePolicy); + + CosmosBatchResponse response = this.container.executeCosmosBatch(batch, batchOptions).block(); + + registerForCleanup(batchItem); + return response.getDiagnostics().getDiagnosticsContext(); + } + + CosmosPatchItemRequestOptions options = new CosmosPatchItemRequestOptions(); + options.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosItemResponse response = this.container + .patchItem( + item.getId(), + partitionKey(item), + CosmosPatchOperations.create().set("/stringProp", "timeout-patched-" + item.getStringProp()), + options, + TestObject.class) + .block(); + + return response.getDiagnostics().getDiagnosticsContext(); + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowLatestCommittedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowLatestCommittedTest.java new file mode 100644 index 000000000000..a5eeaca9822d --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowLatestCommittedTest.java @@ -0,0 +1,171 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowLatestCommittedTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithSessionConsistency") + public CustomerWorkflowLatestCommittedTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer latest-committed workflow tests", true); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void latestCommittedAndExcludedRegionsFlowAcrossReadOperations() { + List excludedRegions = excludeFirstWritableRegion(); + TestObject item = TestObject.create(); + + CosmosItemResponse createResponse = this.container + .createItem(item, new CosmosItemRequestOptions().setExcludedRegions(excludedRegions)) + .block(); + + assertThat(createResponse).isNotNull(); + registerForCleanup(item); + CosmosDiagnosticsContext createDiagnostics = createResponse.getDiagnostics().getDiagnosticsContext(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + assertThat(createDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.DEFAULT); + assertExcludedRegions(createDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(createDiagnostics, excludedRegions); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setExcludedRegions(excludedRegions) + .setKeywordIdentifiers(Collections.singleton("latest-committed-read")) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + CosmosItemResponse readResponse = this.container + .readItem(item.getId(), partitionKey(item), readOptions, TestObject.class) + .block(); + + assertThat(readResponse).isNotNull(); + CosmosDiagnosticsContext readDiagnostics = readResponse.getDiagnostics().getDiagnosticsContext(); + assertThat(readResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertThat(readDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertThat(readDiagnostics.getTotalRequestCharge()).isGreaterThan(0); + assertKeywordIdentifier(readDiagnostics, "latest-committed-read"); + assertExcludedRegions(readDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(readDiagnostics, excludedRegions); + + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .setQueryName("LatestCommittedCustomerWorkflowQuery"); + + FeedResponse queryResponse = this.container + .queryItems(String.format("SELECT * FROM c WHERE c.id = '%s'", item.getId()), queryOptions, TestObject.class) + .byPage() + .blockFirst(); + + assertThat(queryResponse).isNotNull(); + assertThat(queryResponse.getResults()).hasSize(1); + CosmosDiagnosticsContext queryDiagnostics = queryResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertThat(queryDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertExcludedRegions(queryDiagnostics, excludedRegions); + + CosmosReadManyRequestOptions readManyOptions = new CosmosReadManyRequestOptions() + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + FeedResponse readManyResponse = this.container + .readMany(Collections.singletonList(new CosmosItemIdentity(partitionKey(item), item.getId())), readManyOptions, TestObject.class) + .block(); + + assertThat(readManyResponse).isNotNull(); + assertThat(readManyResponse.getResults()).hasSize(1); + CosmosDiagnosticsContext readManyDiagnostics = readManyResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertThat(readManyDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertExcludedRegions(readManyDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(readManyDiagnostics, excludedRegions); + + CosmosChangeFeedRequestOptions changeFeedOptions = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forLogicalPartition(partitionKey(item))) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .setExcludedRegions(excludedRegions); + + FeedResponse changeFeedResponse = this.container + .queryChangeFeed(changeFeedOptions, TestObject.class) + .byPage() + .blockFirst(); + + assertThat(changeFeedResponse) + .as("change feed query should return at least one page before reading diagnostics") + .isNotNull(); + CosmosDiagnosticsContext changeFeedDiagnostics = changeFeedResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertThat(changeFeedDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertExcludedRegions(changeFeedDiagnostics, excludedRegions); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void latestCommittedReadWithRegionalLeaseNotFoundFault() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + registerForCleanup(item); + + FaultInjectionRule leaseNotFoundRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + FaultInjectionServerErrorType.LEASE_NOT_FOUND, + this.writableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + try { + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .setKeywordIdentifiers(Collections.singleton("latest-committed-fault-read")); + + CosmosDiagnosticsContext diagnosticsContext; + try { + CosmosItemResponse readResponse = this.container + .readItem(item.getId(), partitionKey(item), readOptions, TestObject.class) + .block(); + + assertThat(readResponse).isNotNull(); + diagnosticsContext = readResponse.getDiagnostics().getDiagnosticsContext(); + } catch (CosmosException error) { + diagnosticsContext = error.getDiagnostics().getDiagnosticsContext(); + } + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertFaultInjectedOperation(diagnosticsContext, leaseNotFoundRule); + } finally { + leaseNotFoundRule.disable(); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowPartitionLevelCircuitBreakerTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowPartitionLevelCircuitBreakerTest.java new file mode 100644 index 000000000000..8ffaf5571295 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowPartitionLevelCircuitBreakerTest.java @@ -0,0 +1,128 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.ThresholdBasedAvailabilityStrategy; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchItemRequestOptions; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowPartitionLevelCircuitBreakerTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowPartitionLevelCircuitBreakerTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer PCLB workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = 2 * TIMEOUT) + public void pointOperationCircuitBreakerAndQueryPlanWorkflow() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + registerForCleanup(item); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(3)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + + FaultInjectionRule readFaultRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, + 1); + + try { + CosmosDiagnosticsContext readDiagnostics = readWithPolicy(item, e2ePolicy); + + assertFaultInjectedOperation(readDiagnostics, readFaultRule); + } finally { + readFaultRule.disable(); + } + + CosmosDiagnosticsContext queryDiagnostics = queryWithPolicy(item, e2ePolicy); + assertThat(queryDiagnostics).isNotNull(); + assertThat(queryDiagnostics.getStatusCode()).isBetween(200, 599); + assertThat(queryDiagnostics.getContactedRegionNames()).isNotNull(); + assertThat(queryDiagnostics.toJson()).contains("queryPlanDiagnosticsContext"); + + CosmosPatchItemRequestOptions patchOptions = new CosmosPatchItemRequestOptions(); + patchOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + CosmosItemResponse patchResponse = this.container + .patchItem( + item.getId(), + partitionKey(item), + CosmosPatchOperations.create().set("/stringProp", "pclb-patched-" + item.getStringProp()), + patchOptions, + TestObject.class) + .block(); + + assertThat(patchResponse).isNotNull(); + assertThat(patchResponse.getDiagnostics()).isNotNull(); + } + + private CosmosDiagnosticsContext readWithPolicy(TestObject item, CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy) { + try { + CosmosItemRequestOptions options = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + return this.container + .readItem(item.getId(), partitionKey(item), options, TestObject.class) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } + + private CosmosDiagnosticsContext queryWithPolicy(TestObject item, CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy) { + try { + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy) + .setQueryName("PclbCustomerWorkflowQuery"); + + // ORDER BY forces the gateway query-plan round-trip so the queryPlanDiagnosticsContext is always present, + // independent of single-partition / ServiceInterop query-plan optimizations. + FeedResponse response = this.container + .queryItems( + String.format("SELECT * FROM c WHERE c.id = '%s' ORDER BY c.id", item.getId()), + queryOptions, + TestObject.class) + .byPage() + .blockFirst(); + + return response.getCosmosDiagnostics().getDiagnosticsContext(); + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowRequestOptionsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowRequestOptionsTest.java new file mode 100644 index 000000000000..f46b517813a6 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowRequestOptionsTest.java @@ -0,0 +1,157 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.OverridableRequestOptions; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.PartitionKey; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowRequestOptionsTest extends CustomerWorkflowTestBase { + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowRequestOptionsTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer workflow request option tests", true); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void excludedRegionAndKeywordIdentifiersFlowAcrossOperations() { + String excludedRegion = this.writableRegions.get(0); + List excludedRegions = Collections.singletonList(excludedRegion); + TestObject item = TestObject.create(); + + CosmosItemRequestOptions createOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(Collections.singleton("customer-create")) + .setContentResponseOnWriteEnabled(true) + .setExcludedRegions(excludedRegions); + + CosmosItemResponse createResponse = this.container + .createItem(item, createOptions) + .block(); + + assertThat(createResponse).isNotNull(); + registerForCleanup(item); + assertThat(createResponse.getStatusCode()).isEqualTo(201); + assertKeywordIdentifier(createResponse.getDiagnostics().getDiagnosticsContext(), "customer-create"); + assertExcludedRegions(createResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + assertDidNotContactExcludedRegions(createResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(Collections.singleton("customer-read")) + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + CosmosItemResponse readResponse = this.container + .readItem(item.getId(), new PartitionKey(item.getMypk()), readOptions, TestObject.class) + .block(); + + assertThat(readResponse).isNotNull(); + CosmosDiagnosticsContext readDiagnostics = readResponse.getDiagnostics().getDiagnosticsContext(); + assertThat(readResponse.getStatusCode()).isEqualTo(200); + assertThat(readDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertKeywordIdentifier(readDiagnostics, "customer-read"); + assertExcludedRegions(readDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(readDiagnostics, excludedRegions); + + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions() + .setKeywordIdentifiers(Collections.singleton("customer-query")) + .setExcludedRegions(excludedRegions) + .setConsistencyLevel(ConsistencyLevel.EVENTUAL) + .setQueryMetricsEnabled(true) + .setQueryName("CustomerWorkflowQuery"); + + String query = String.format("SELECT * FROM c WHERE c.id = '%s'", item.getId()); + FeedResponse queryResponse = this.container + .queryItems(query, queryOptions, TestObject.class) + .byPage() + .blockFirst(); + + assertThat(queryResponse).isNotNull(); + assertThat(queryResponse.getResults()).hasSize(1); + CosmosDiagnosticsContext queryDiagnostics = queryResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertKeywordIdentifier(queryDiagnostics, "customer-query"); + assertExcludedRegions(queryDiagnostics, excludedRegions); + OverridableRequestOptions queryRequestOptions = getRequestOptions(queryDiagnostics); + assertThat(queryRequestOptions.getConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); + assertThat(queryRequestOptions.isQueryMetricsEnabled()).isTrue(); + assertThat(queryRequestOptions.getQueryNameOrDefault(null)).isEqualTo("CustomerWorkflowQuery"); + + CosmosReadManyRequestOptions readManyOptions = new CosmosReadManyRequestOptions() + .setKeywordIdentifiers(Collections.singleton("customer-read-many")) + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + FeedResponse readManyResponse = this.container + .readMany( + Arrays.asList(new CosmosItemIdentity(new PartitionKey(item.getMypk()), item.getId())), + readManyOptions, + TestObject.class) + .block(); + + assertThat(readManyResponse).isNotNull(); + assertThat(readManyResponse.getResults()).hasSize(1); + CosmosDiagnosticsContext readManyDiagnostics = readManyResponse.getCosmosDiagnostics().getDiagnosticsContext(); + assertThat(readManyDiagnostics.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertKeywordIdentifier(readManyDiagnostics, "customer-read-many"); + assertExcludedRegions(readManyDiagnostics, excludedRegions); + assertDidNotContactExcludedRegions(readManyDiagnostics, excludedRegions); + + item.setStringProp("updated-" + item.getStringProp()); + CosmosItemRequestOptions upsertOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(Collections.singleton("customer-upsert")) + .setExcludedRegions(excludedRegions) + .setContentResponseOnWriteEnabled(true); + + CosmosItemResponse upsertResponse = this.container + .upsertItem(item, upsertOptions) + .block(); + + assertThat(upsertResponse).isNotNull(); + assertThat(upsertResponse.getStatusCode()).isEqualTo(200); + assertKeywordIdentifier(upsertResponse.getDiagnostics().getDiagnosticsContext(), "customer-upsert"); + assertExcludedRegions(upsertResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + assertDidNotContactExcludedRegions(upsertResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + + CosmosItemRequestOptions deleteOptions = new CosmosItemRequestOptions() + .setKeywordIdentifiers(Collections.singleton("customer-delete")) + .setExcludedRegions(excludedRegions); + + CosmosItemResponse deleteResponse = this.container + .deleteItem(item.getId(), new PartitionKey(item.getMypk()), deleteOptions) + .block(); + + assertThat(deleteResponse).isNotNull(); + assertThat(deleteResponse.getStatusCode()).isEqualTo(204); + assertKeywordIdentifier(deleteResponse.getDiagnostics().getDiagnosticsContext(), "customer-delete"); + assertExcludedRegions(deleteResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + assertDidNotContactExcludedRegions(deleteResponse.getDiagnostics().getDiagnosticsContext(), excludedRegions); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSessionTokenTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSessionTokenTest.java new file mode 100644 index 000000000000..2ad18a9c09a7 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSessionTokenTest.java @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.ConsistencyTestsBase; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ISessionToken; +import com.azure.cosmos.implementation.SessionTokenHelper; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.FeedResponse; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +public class CustomerWorkflowSessionTokenTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowSessionTokenTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer session-token workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void readManyWithAdvancedSessionTokenReturnsReadSessionNotAvailable() throws Exception { + List itemIdentities = new ArrayList<>(); + String lastSessionToken = null; + + for (int index = 0; index < 3; index++) { + TestObject item = TestObject.create("session-token-workflow"); + CosmosItemResponse createResponse = this.container.createItem(item).block(); + + assertThat(createResponse).isNotNull(); + registerForCleanup(item); + lastSessionToken = createResponse.getSessionToken(); + itemIdentities.add(new CosmosItemIdentity(partitionKey(item), item.getId())); + } + + FeedResponse validReadManyResponse = this.container + .readMany(itemIdentities, lastSessionToken, TestObject.class) + .block(); + + assertThat(validReadManyResponse).isNotNull(); + assertThat(validReadManyResponse.getResults()).hasSize(3); + + String advancedSessionToken = advanceSessionToken(lastSessionToken); + + try { + this.container + .readMany(itemIdentities, advancedSessionToken, TestObject.class) + .block(); + + fail("Should have hit read session not available error."); + } catch (Exception error) { + CosmosException cosmosException = Utils.as(error, CosmosException.class); + + assertThat(cosmosException).isNotNull(); + assertThat(cosmosException.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.NOTFOUND); + assertThat(cosmosException.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE); + assertThat(cosmosException.getDiagnostics()).isNotNull(); + } + } + + private static String advanceSessionToken(String originalSessionToken) throws Exception { + String[] tokenParts = StringUtils.split(originalSessionToken, ":"); + ISessionToken sessionToken = SessionTokenHelper.parse(tokenParts[1]); + ISessionToken modifiedSessionToken = ConsistencyTestsBase.createSessionToken(sessionToken, sessionToken.getLSN() + 1000000); + + return tokenParts[0] + ":" + modifiedSessionToken.convertToString(); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSingleMasterAvailabilityTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSingleMasterAvailabilityTest.java new file mode 100644 index 000000000000..d3249ae10940 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowSingleMasterAvailabilityTest.java @@ -0,0 +1,291 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.ReadConsistencyStrategy; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.ThresholdBasedAvailabilityStrategy; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowSingleMasterAvailabilityTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithSessionConsistency") + public CustomerWorkflowSingleMasterAvailabilityTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-sm-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSingleWriteMultiRegionContainer("Customer single-master workflow tests"); + } + + @AfterClass(groups = {"fi-sm-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-sm-customer-workflows"}, timeOut = TIMEOUT) + public void excludedReadableRegionRoutesReadToRemainingReadableRegion() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + registerForCleanup(item); + + List excludedRegions = excludeFirstReadableRegion(); + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setExcludedRegions(excludedRegions) + .setReadConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED); + + // Excluding the preferred readable region forces the read onto the remaining readable region, which may + // lag behind the just-completed write. Retry until cross-region replication catches up before asserting. + CosmosItemResponse readResponse = readWithReplicationRetry(item, readOptions); + + assertThat(readResponse).isNotNull(); + CosmosDiagnosticsContext diagnosticsContext = readResponse.getDiagnostics().getDiagnosticsContext(); + assertThat(readResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertThat(diagnosticsContext.getEffectiveReadConsistencyStrategy()).isEqualTo(ReadConsistencyStrategy.LATEST_COMMITTED); + assertExcludedRegions(diagnosticsContext, excludedRegions); + assertDidNotContactExcludedRegions(diagnosticsContext, excludedRegions); + } + + @Test(groups = {"fi-sm-customer-workflows"}, timeOut = TIMEOUT) + public void readFaultInPreferredReadableRegionCanUseRemoteReadableRegion() { + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + registerForCleanup(item); + + FaultInjectionRule readSessionNotAvailableRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE, + this.readableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(5)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + + try { + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosDiagnosticsContext diagnosticsContext = readWithDiagnostics(item, readOptions); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(readSessionNotAvailableRule.getHitCount()) + .as("the injected read-session-not-available fault should have been hit in the preferred readable region") + .isGreaterThanOrEqualTo(1); + assertThat(diagnosticsContext.getStatusCode()).isBetween(HttpConstants.StatusCodes.OK, 599); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + if (diagnosticsContext.getStatusCode() < HttpConstants.StatusCodes.BADREQUEST) { + assertThat(diagnosticsContext.getContactedRegionNames()).isNotEmpty(); + } else { + assertThat(diagnosticsContext.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.NOTFOUND); + assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE); + } + } finally { + readSessionNotAvailableRule.disable(); + } + } + + @Test(groups = {"fi-sm-customer-workflows"}, timeOut = TIMEOUT) + public void writeFaultStaysOnSingleWritableRegion() { + FaultInjectionRule partitionMigratingRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.CREATE_ITEM, + FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, + this.writableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + try { + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(5)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + CosmosItemRequestOptions createOptions = new CosmosItemRequestOptions() + .setContentResponseOnWriteEnabled(true) + .setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosDiagnosticsContext diagnosticsContext = createWithDiagnostics(TestObject.create(), createOptions); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(partitionMigratingRule.getHitCount()) + .as("the injected write fault should have been hit in the single writable region") + .isGreaterThanOrEqualTo(1); + assertThat(diagnosticsContext.getStatusCode()).isBetween(HttpConstants.StatusCodes.OK, 599); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + + // A single-write account cannot hedge writes to another region, so even with an availability strategy + // configured the write must never be routed to a read-only region. + Set readOnlyRegions = this.readableRegions + .stream() + .map(region -> region.toLowerCase(Locale.ROOT)) + .filter(region -> !region.equals(this.writableRegions.get(0).toLowerCase(Locale.ROOT))) + .collect(Collectors.toSet()); + assertThat(diagnosticsContext.getContactedRegionNames()).doesNotContainAnyElementsOf(readOnlyRegions); + } finally { + partitionMigratingRule.disable(); + } + } + + @DataProvider(name = "singleWriteReadFaultScenarios") + public Object[][] singleWriteReadFaultScenarios() { + return new Object[][]{ + {FaultInjectionServerErrorType.GONE}, + {FaultInjectionServerErrorType.TIMEOUT}, + {FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE}, + {FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {FaultInjectionServerErrorType.SERVICE_UNAVAILABLE} + }; + } + + @Test(groups = {"fi-sm-customer-workflows"}, dataProvider = "singleWriteReadFaultScenarios", timeOut = TIMEOUT) + public void singleWriteReadFaultMatrix(FaultInjectionServerErrorType errorType) { + skipIfFaultTypeUnsupportedOnGateway(errorType, "Customer single-master read fault matrix"); + + TestObject item = TestObject.create(); + this.container.createItem(item).block(); + registerForCleanup(item); + + FaultInjectionRule faultRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.READ_ITEM, + errorType, + this.readableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + try { + CosmosItemRequestOptions readOptions = new CosmosItemRequestOptions() + .setCosmosEndToEndOperationLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(5)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build()); + + CosmosDiagnosticsContext diagnosticsContext = readWithDiagnostics(item, readOptions); + + assertFaultInjectedOperation(diagnosticsContext, faultRule); + } finally { + faultRule.disable(); + } + } + + @DataProvider(name = "singleWriteMutationFaultScenarios") + public Object[][] singleWriteMutationFaultScenarios() { + return new Object[][]{ + {FaultInjectionServerErrorType.PARTITION_IS_MIGRATING}, + {FaultInjectionServerErrorType.TIMEOUT}, + {FaultInjectionServerErrorType.TOO_MANY_REQUEST}, + {FaultInjectionServerErrorType.RETRY_WITH}, + {FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR}, + {FaultInjectionServerErrorType.SERVICE_UNAVAILABLE} + }; + } + + @Test(groups = {"fi-sm-customer-workflows"}, dataProvider = "singleWriteMutationFaultScenarios", timeOut = TIMEOUT) + public void singleWriteCreateFaultMatrix(FaultInjectionServerErrorType errorType) { + FaultInjectionRule faultRule = configureServerErrorRule( + this.container, + FaultInjectionOperationType.CREATE_ITEM, + errorType, + this.writableRegions.get(0), + currentFaultInjectionConnectionType(), + 1); + + try { + CosmosItemRequestOptions createOptions = new CosmosItemRequestOptions() + .setContentResponseOnWriteEnabled(true) + .setCosmosEndToEndOperationLatencyPolicyConfig( + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(5)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build()); + + CosmosDiagnosticsContext diagnosticsContext = createWithDiagnostics(TestObject.create(), createOptions); + + // The availability strategy cannot hedge writes on a single-write account; the assertion below confirms + // the injected write fault was still exercised and produced a real HTTP outcome. + assertFaultInjectedOperation(diagnosticsContext, faultRule); + } finally { + faultRule.disable(); + } + } + + private CosmosDiagnosticsContext readWithDiagnostics(TestObject item, CosmosItemRequestOptions options) { + try { + return this.container + .readItem(item.getId(), partitionKey(item), options, TestObject.class) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } + + private CosmosItemResponse readWithReplicationRetry(TestObject item, CosmosItemRequestOptions options) { + Duration deadline = Duration.ofSeconds(30); + long deadlineNanos = System.nanoTime() + deadline.toNanos(); + CosmosException lastNotFound = null; + + while (System.nanoTime() < deadlineNanos) { + try { + return this.container + .readItem(item.getId(), partitionKey(item), options, TestObject.class) + .block(); + } catch (CosmosException error) { + if (error.getStatusCode() != HttpConstants.StatusCodes.NOTFOUND) { + throw error; + } + // Item not yet replicated to the remaining readable region - wait and retry. + lastNotFound = error; + try { + Thread.sleep(500); + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + throw new AssertionError("Interrupted while waiting for cross-region replication.", interrupted); + } + } + } + + throw new AssertionError("Item was not replicated to the remaining readable region within " + deadline, lastNotFound); + } + + private CosmosDiagnosticsContext createWithDiagnostics(TestObject item, CosmosItemRequestOptions options) { + try { + CosmosDiagnosticsContext diagnosticsContext = this.container + .createItem(item, new PartitionKey(item.getMypk()), options) + .block() + .getDiagnostics() + .getDiagnosticsContext(); + + registerForCleanup(item); + return diagnosticsContext; + } catch (CosmosException error) { + return error.getDiagnostics().getDiagnosticsContext(); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowStoredProcedureTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowStoredProcedureTest.java new file mode 100644 index 000000000000..762ae05b0a72 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowStoredProcedureTest.java @@ -0,0 +1,134 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosStoredProcedureProperties; +import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions; +import com.azure.cosmos.models.CosmosStoredProcedureResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.UUID; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CustomerWorkflowStoredProcedureTest extends CustomerWorkflowTestBase { + + @Factory(dataProvider = "clientBuildersWithDirectTcpSession") + public CustomerWorkflowStoredProcedureTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"fi-customer-workflows"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + initializeSharedSinglePartitionContainer("Customer stored procedure workflow tests"); + } + + @AfterClass(groups = {"fi-customer-workflows"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + closeClient(); + } + + @Test(groups = {"fi-customer-workflows"}, timeOut = TIMEOUT) + public void storedProcedureCreateReadExecuteWithMetadataFaultRule() { + String storedProcedureId = "customer-sproc-" + UUID.randomUUID(); + CosmosStoredProcedureProperties storedProcedureProperties = new CosmosStoredProcedureProperties( + storedProcedureId, + "function(input) {" + + " var value = input || 'workflow';" + + " console.log('stored procedure workflow ' + value);" + + " getContext().getResponse().setBody('sproc-ok:' + value);" + + "}"); + + CosmosStoredProcedureResponse createResponse = this.container + .getScripts() + .createStoredProcedure(storedProcedureProperties) + .block(); + + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + assertThat(createResponse.getDiagnostics()).isNotNull(); + + FaultInjectionRule metadataDelayRule = configureResponseDelayRule( + this.container, + FaultInjectionOperationType.METADATA_REQUEST_CONTAINER, + Duration.ofMillis(100), + 1); + + try { + CosmosStoredProcedureRequestOptions options = new CosmosStoredProcedureRequestOptions(); + options.setPartitionKey(new PartitionKey("sproc-workflow")); + options.setScriptLoggingEnabled(true); + + CosmosStoredProcedureResponse readResponse = withStoredProcedureReplicationRetry(() -> this.container + .getScripts() + .getStoredProcedure(storedProcedureId) + .read() + .block()); + + assertThat(readResponse).isNotNull(); + assertThat(readResponse.getProperties().getId()).isEqualTo(storedProcedureId); + + CosmosStoredProcedureResponse executeResponse = withStoredProcedureReplicationRetry(() -> this.container + .getScripts() + .getStoredProcedure(storedProcedureId) + .execute(Collections.singletonList("workflow"), options) + .block()); + + assertThat(executeResponse).isNotNull(); + assertThat(executeResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + assertThat(executeResponse.getResponseAsString()).contains("sproc-ok:workflow"); + assertThat(executeResponse.getScriptLog()).contains("stored procedure workflow workflow"); + assertThat(executeResponse.getDiagnostics()).isNotNull(); + } finally { + metadataDelayRule.disable(); + try { + this.container.getScripts().getStoredProcedure(storedProcedureId).delete().block(); + } catch (Exception error) { + // best-effort cleanup of the stored procedure created by this test + } + } + } + + /** + * Retries a stored-procedure operation while it returns 404. A stored procedure that was just created can + * be temporarily not found when the request is routed to a region the metadata has not yet replicated to + * (possible on a multi-write account, where stored-procedure metadata is not covered by session + * read-your-write the way document operations are). + */ + private CosmosStoredProcedureResponse withStoredProcedureReplicationRetry(Supplier operation) { + Duration deadline = Duration.ofSeconds(30); + long deadlineNanos = System.nanoTime() + deadline.toNanos(); + CosmosException lastNotFound = null; + + while (System.nanoTime() < deadlineNanos) { + try { + return operation.get(); + } catch (CosmosException error) { + if (error.getStatusCode() != HttpConstants.StatusCodes.NOTFOUND) { + throw error; + } + lastNotFound = error; + try { + Thread.sleep(500); + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + throw new AssertionError("Interrupted while waiting for stored procedure replication.", interrupted); + } + } + } + + throw new AssertionError("Stored procedure was not available to read within " + deadline, lastNotFound); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowTestBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowTestBase.java new file mode 100644 index 000000000000..e8d57ca887e2 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/workflows/customer/CustomerWorkflowTestBase.java @@ -0,0 +1,479 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.workflows.customer; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDiagnosticsContext; +import com.azure.cosmos.ConnectionMode; +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.DatabaseAccount; +import com.azure.cosmos.implementation.DatabaseAccountLocation; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.OverridableRequestOptions; +import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.ThroughputProperties; +import com.azure.cosmos.rx.TestSuiteBase; +import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; +import com.azure.cosmos.test.faultinjection.FaultInjectionCondition; +import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import com.azure.cosmos.test.faultinjection.IFaultInjectionResult; +import org.testng.SkipException; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.UUID; +import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class CustomerWorkflowTestBase extends TestSuiteBase { + protected CosmosAsyncClient client; + protected CosmosAsyncContainer container; + protected List writableRegions; + protected List readableRegions; + private final List itemsToCleanup = Collections.synchronizedList(new ArrayList<>()); + + protected CustomerWorkflowTestBase(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + protected final void initializeSharedSinglePartitionContainer(String scenarioName) { + initializeSharedSinglePartitionContainer(scenarioName, false); + } + + protected final void initializeSharedSinglePartitionContainer(String scenarioName, boolean forceSessionConsistency) { + if (forceSessionConsistency) { + skipIfAccountConsistencyWeakerThanSession(scenarioName); + } + + CosmosAsyncClient discoveryClient = null; + + try { + discoveryClient = getClientBuilder().buildAsyncClient(); + this.writableRegions = discoverWritableRegions(discoveryClient); + skipIfInsufficientRegions(this.writableRegions, scenarioName); + + CosmosClientBuilder clientBuilder = getClientBuilder() + .preferredRegions(this.writableRegions) + .multipleWriteRegionsEnabled(true) + .contentResponseOnWriteEnabled(true); + + if (forceSessionConsistency) { + // Read-your-write across an excluded write region is only deterministic with session (or + // stronger) consistency, so pin the client to session consistency for these scenarios. + clientBuilder.consistencyLevel(ConsistencyLevel.SESSION); + } + + this.client = clientBuilder.buildAsyncClient(); + this.container = getSharedSinglePartitionCosmosContainer(this.client); + waitForCollectionToBeAvailableToRead(this.container, /* probeClient */ null); + } finally { + safeClose(discoveryClient); + } + } + + protected final void closeClient() { + cleanupRegisteredItems(); + safeClose(this.client); + this.client = null; + this.container = null; + this.writableRegions = null; + this.readableRegions = null; + } + + protected final void initializeSharedSingleWriteMultiRegionContainer(String scenarioName) { + CosmosAsyncClient discoveryClient = null; + + try { + discoveryClient = getClientBuilder() + .multipleWriteRegionsEnabled(false) + .contentResponseOnWriteEnabled(true) + .buildAsyncClient(); + this.writableRegions = discoverWritableRegions(discoveryClient); + this.readableRegions = discoverReadableRegions(discoveryClient); + skipIfInsufficientReadableRegions(this.readableRegions, scenarioName); + skipIfNotSingleWriteRegion(this.writableRegions, scenarioName); + + this.client = getClientBuilder() + .preferredRegions(this.readableRegions) + .multipleWriteRegionsEnabled(false) + .contentResponseOnWriteEnabled(true) + .buildAsyncClient(); + this.container = getSharedSinglePartitionCosmosContainer(this.client); + waitForCollectionToBeAvailableToRead(this.container, /* probeClient */ null); + } finally { + safeClose(discoveryClient); + } + } + + /** + * Registers an item to be best-effort deleted from the shared container when the test class finishes, + * so the shared single-partition container does not accumulate items across runs. + */ + protected final void registerForCleanup(TestObject item) { + if (item != null) { + this.itemsToCleanup.add(new CosmosItemIdentity(partitionKey(item), item.getId())); + } + } + + private void cleanupRegisteredItems() { + CosmosAsyncContainer cleanupContainer = this.container; + List snapshot; + synchronized (this.itemsToCleanup) { + snapshot = new ArrayList<>(this.itemsToCleanup); + this.itemsToCleanup.clear(); + } + + if (cleanupContainer == null) { + return; + } + + for (CosmosItemIdentity identity : snapshot) { + try { + cleanupContainer + .deleteItem(identity.getId(), identity.getPartitionKey(), new CosmosItemRequestOptions()) + .block(); + } catch (Exception error) { + // best-effort cleanup - ignore (for example item already deleted by the test itself) + } + } + } + + protected final List excludeFirstWritableRegion() { + return Collections.singletonList(this.writableRegions.get(0)); + } + + protected final List excludeFirstReadableRegion() { + return Collections.singletonList(this.readableRegions.get(0)); + } + + protected static com.azure.cosmos.models.PartitionKey partitionKey(TestObject item) { + return new com.azure.cosmos.models.PartitionKey(item.getMypk()); + } + + protected final CosmosAsyncContainer createTemporaryContainer(String prefix, String partitionKeyPath) { + CosmosAsyncDatabase database = getSharedCosmosDatabase(this.client); + String containerId = prefix + "-" + UUID.randomUUID(); + + database + .createContainerIfNotExists(containerId, partitionKeyPath, ThroughputProperties.createManualThroughput(400)) + .block(); + + return database.getContainer(containerId); + } + + protected static void deleteTemporaryContainer(CosmosAsyncContainer container) { + safeDeleteCollection(container); + } + + protected static void awaitCondition(BooleanSupplier condition, Duration timeout, String failureMessage) { + long deadline = System.nanoTime() + timeout.toNanos(); + + while (System.nanoTime() < deadline) { + if (condition.getAsBoolean()) { + return; + } + + try { + Thread.sleep(250); + } catch (InterruptedException error) { + Thread.currentThread().interrupt(); + throw new AssertionError("Interrupted while waiting for condition: " + failureMessage, error); + } + } + + throw new AssertionError(failureMessage); + } + + protected final FaultInjectionRule configureServerErrorRule( + CosmosAsyncContainer targetContainer, + FaultInjectionOperationType operationType, + FaultInjectionServerErrorType errorType, + int hitLimit) { + + return configureServerErrorRule(targetContainer, operationType, errorType, this.writableRegions.get(0), hitLimit); + } + + protected final FaultInjectionRule configureServerErrorRule( + CosmosAsyncContainer targetContainer, + FaultInjectionOperationType operationType, + FaultInjectionServerErrorType errorType, + String region, + int hitLimit) { + + return configureServerErrorRule(targetContainer, operationType, errorType, region, currentFaultInjectionConnectionType(), hitLimit); + } + + protected final FaultInjectionRule configureServerErrorRule( + CosmosAsyncContainer targetContainer, + FaultInjectionOperationType operationType, + FaultInjectionServerErrorType errorType, + String region, + FaultInjectionConnectionType connectionType, + int hitLimit) { + + FaultInjectionConditionBuilder conditionBuilder = new FaultInjectionConditionBuilder() + .operationType(operationType) + .connectionType(connectionType); + + if (region != null) { + conditionBuilder.region(region); + } + + FaultInjectionRule rule = new FaultInjectionRuleBuilder("customer-workflow-" + errorType + "-" + UUID.randomUUID()) + .condition(conditionBuilder.build()) + .result(FaultInjectionResultBuilders.getResultBuilder(errorType).build()) + .duration(Duration.ofMinutes(5)) + .hitLimit(hitLimit) + .build(); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(targetContainer, Collections.singletonList(rule)).block(); + return rule; + } + + protected final FaultInjectionConnectionType currentFaultInjectionConnectionType() { + if (getConnectionPolicy().getConnectionMode() == ConnectionMode.GATEWAY) { + return FaultInjectionConnectionType.GATEWAY; + } + + return FaultInjectionConnectionType.DIRECT; + } + + protected final FaultInjectionRule configureResponseDelayRule( + CosmosAsyncContainer targetContainer, + FaultInjectionOperationType operationType, + Duration delay, + int hitLimit) { + + FaultInjectionCondition condition = new FaultInjectionConditionBuilder() + .operationType(operationType) + .connectionType(currentFaultInjectionConnectionType()) + .build(); + + IFaultInjectionResult result = FaultInjectionResultBuilders + .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY) + .delay(delay) + .times(hitLimit) + .build(); + + FaultInjectionRule rule = new FaultInjectionRuleBuilder("customer-workflow-response-delay-" + UUID.randomUUID()) + .condition(condition) + .result(result) + .duration(Duration.ofMinutes(5)) + .hitLimit(hitLimit) + .build(); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(targetContainer, Collections.singletonList(rule)).block(); + return rule; + } + + protected static List discoverWritableRegions(CosmosAsyncClient client) { + DatabaseAccount databaseAccount = readDatabaseAccount(client); + + List writableRegions = new ArrayList<>(); + for (DatabaseAccountLocation accountLocation : databaseAccount.getWritableLocations()) { + writableRegions.add(accountLocation.getName()); + } + + return writableRegions; + } + + protected static List discoverReadableRegions(CosmosAsyncClient client) { + DatabaseAccount databaseAccount = readDatabaseAccount(client); + + List readableRegions = new ArrayList<>(); + for (DatabaseAccountLocation accountLocation : databaseAccount.getReadableLocations()) { + readableRegions.add(accountLocation.getName()); + } + + return readableRegions; + } + + private static DatabaseAccount readDatabaseAccount(CosmosAsyncClient client) { + AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(client); + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) asyncDocumentClient; + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + + // The latest database account is populated during client initialization. Poll briefly to defend against + // an initialization race instead of forcing a synthetic database-account read (which is not routable in + // direct connection mode). + DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + long deadlineNanos = System.nanoTime() + Duration.ofSeconds(10).toNanos(); + while (databaseAccount == null && System.nanoTime() < deadlineNanos) { + try { + Thread.sleep(200); + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + throw new AssertionError("Interrupted while waiting for the database account to be available.", interrupted); + } + databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + } + + assertThat(databaseAccount) + .as("database account must be available for region discovery") + .isNotNull(); + + return databaseAccount; + } + + protected static void skipIfInsufficientRegions(List regions, String scenarioName) { + if (regions == null || regions.size() < 2) { + throw new SkipException(scenarioName + " requires a live multi-region account."); + } + } + + protected static void skipIfInsufficientReadableRegions(List regions, String scenarioName) { + if (regions == null || regions.size() < 2) { + throw new SkipException(scenarioName + " requires a live multi-region single-write account."); + } + } + + protected static void skipIfNotSingleWriteRegion(List regions, String scenarioName) { + if (regions == null || regions.size() != 1) { + throw new SkipException(scenarioName + " requires exactly one write region."); + } + } + + protected static void skipIfAccountConsistencyWeakerThanSession(String scenarioName) { + if (accountConsistency == ConsistencyLevel.EVENTUAL || accountConsistency == ConsistencyLevel.CONSISTENT_PREFIX) { + throw new SkipException( + scenarioName + " requires an account with session or stronger default consistency for deterministic read-your-write."); + } + } + + protected final void skipIfNotDirectMode(String scenarioName) { + if (getConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) { + throw new SkipException(scenarioName + " only applies to the direct connection mode client builder."); + } + } + + protected final void skipIfNotGatewayMode(String scenarioName) { + if (getConnectionPolicy().getConnectionMode() != ConnectionMode.GATEWAY) { + throw new SkipException(scenarioName + " only applies to the gateway connection mode client builder."); + } + } + + /** + * Skips fault-injection scenarios that cannot be injected for the gateway connection type. The gateway + * internally retries 410/0, so {@code GONE} and {@code STALED_ADDRESSES_SERVER_GONE} rules are rejected at + * configuration time for gateway-mode clients. + */ + protected final void skipIfFaultTypeUnsupportedOnGateway(FaultInjectionServerErrorType errorType, String scenarioName) { + if (currentFaultInjectionConnectionType() == FaultInjectionConnectionType.GATEWAY + && (errorType == FaultInjectionServerErrorType.GONE + || errorType == FaultInjectionServerErrorType.STALED_ADDRESSES_SERVER_GONE)) { + + throw new SkipException( + scenarioName + " cannot inject " + errorType + " for the gateway connection type."); + } + } + + /** + * Configures the same server-error fault for both the point-read ({@code READ_ITEM}) and query + * ({@code QUERY_ITEM}) operation types. {@code readMany} resolves to a point read for a single item in a + * partition and to a query for multiple items, so both rules are needed for the fault to reliably apply. + */ + protected final List configureReadManyServerErrorRules( + CosmosAsyncContainer targetContainer, + FaultInjectionServerErrorType errorType, + String region, + int hitLimit) { + + List rules = new ArrayList<>(); + rules.add(configureServerErrorRule( + targetContainer, FaultInjectionOperationType.READ_ITEM, errorType, region, currentFaultInjectionConnectionType(), hitLimit)); + rules.add(configureServerErrorRule( + targetContainer, FaultInjectionOperationType.QUERY_ITEM, errorType, region, currentFaultInjectionConnectionType(), hitLimit)); + return rules; + } + + /** + * Asserts that a fault-injected operation produced a real HTTP outcome and that at least one of the supplied + * fault rules was actually hit, so the scenario cannot silently pass without exercising the injected fault. + */ + protected static void assertFaultInjectedOperation( + CosmosDiagnosticsContext diagnosticsContext, + FaultInjectionRule... rules) { + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getStatusCode()).isBetween(HttpConstants.StatusCodes.OK, 599); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + + long totalHits = 0; + for (FaultInjectionRule rule : rules) { + totalHits += rule.getHitCount(); + } + + assertThat(totalHits) + .as("expected at least one injected fault to be hit") + .isGreaterThanOrEqualTo(1); + } + + protected static void assertFaultInjectedOperation( + CosmosDiagnosticsContext diagnosticsContext, + List rules) { + + assertFaultInjectedOperation(diagnosticsContext, rules.toArray(new FaultInjectionRule[0])); + } + + protected static OverridableRequestOptions getRequestOptions(CosmosDiagnosticsContext diagnosticsContext) { + assertThat(diagnosticsContext).isNotNull(); + return ImplementationBridgeHelpers + .CosmosDiagnosticsContextHelper + .getCosmosDiagnosticsContextAccessor() + .getRequestOptions(diagnosticsContext); + } + + protected static void assertKeywordIdentifier(CosmosDiagnosticsContext diagnosticsContext, String expectedKeywordIdentifier) { + OverridableRequestOptions requestOptions = getRequestOptions(diagnosticsContext); + + assertThat(requestOptions.getKeywordIdentifiers()) + .contains(expectedKeywordIdentifier); + } + + protected static void assertExcludedRegions( + CosmosDiagnosticsContext diagnosticsContext, + List expectedExcludedRegions) { + + OverridableRequestOptions requestOptions = getRequestOptions(diagnosticsContext); + + assertThat(requestOptions.getExcludedRegions()) + .containsExactlyElementsOf(expectedExcludedRegions); + } + + protected static void assertDidNotContactExcludedRegions( + CosmosDiagnosticsContext diagnosticsContext, + Collection excludedRegions) { + + Set contactedRegionNames = diagnosticsContext.getContactedRegionNames(); + Set normalizedExcludedRegions = excludedRegions + .stream() + .map(region -> region.toLowerCase(Locale.ROOT)) + .collect(Collectors.toSet()); + + assertThat(contactedRegionNames).isNotNull(); + assertThat(contactedRegionNames).doesNotContainAnyElementsOf(normalizedExcludedRegions); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-customer-workflows-testng.xml b/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-customer-workflows-testng.xml new file mode 100644 index 000000000000..edfa8a57770f --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-customer-workflows-testng.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-sm-customer-workflows-testng.xml b/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-sm-customer-workflows-testng.xml new file mode 100644 index 000000000000..976b8fbdc204 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/resources/fi-sm-customer-workflows-testng.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/sdk/cosmos/live-fi-customer-workflows-platform-matrix.json b/sdk/cosmos/live-fi-customer-workflows-platform-matrix.json new file mode 100644 index 000000000000..c3b1e2b841fe --- /dev/null +++ b/sdk/cosmos/live-fi-customer-workflows-platform-matrix.json @@ -0,0 +1,42 @@ +{ + "displayNames": { + "-Pfi-customer-workflows": "FaultInjectionCustomerWorkflows", + "Session": "", + "ubuntu": "", + "@{ enableMultipleWriteLocations = $true; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }": "" + }, + "include": [ + { + "DESIRED_CONSISTENCIES": "[\"Session\"]", + "ACCOUNT_CONSISTENCY": "Session", + "ArmConfig": { + "MultiMaster_MultiRegion_FI_CustomerWorkflows": { + "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $true; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }", + "PREFERRED_LOCATIONS": "[\"East US 2\"]" + } + }, + "PROTOCOLS": "[\"Tcp\"]", + "ProfileFlag": [ "-Pfi-customer-workflows" ], + "AdditionalArgs": "\"-DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=TRUE\"", + "Agent": { + "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } + } + }, + { + "DESIRED_CONSISTENCIES": "[\"Session\"]", + "ACCOUNT_CONSISTENCY": "Session", + "ArmConfig": { + "MultiMaster_MultiRegion_FI_CustomerWorkflows_ThinClient_Http2": { + "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $true; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }", + "PREFERRED_LOCATIONS": "[]" + } + }, + "PROTOCOLS": "[\"Tcp\"]", + "ProfileFlag": [ "-Pfi-customer-workflows" ], + "AdditionalArgs": "-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-writer-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-writer-session-key) -DCOSMOS.THINCLIENT_ENABLED=true -DCOSMOS.HTTP2_ENABLED=true", + "Agent": { + "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } + } + } + ] +} diff --git a/sdk/cosmos/live-fi-sm-customer-workflows-platform-matrix.json b/sdk/cosmos/live-fi-sm-customer-workflows-platform-matrix.json new file mode 100644 index 000000000000..cd56d0a830a5 --- /dev/null +++ b/sdk/cosmos/live-fi-sm-customer-workflows-platform-matrix.json @@ -0,0 +1,41 @@ +{ + "displayNames": { + "-Pfi-sm-customer-workflows": "FaultInjectionSingleMasterCustomerWorkflows", + "Session": "", + "ubuntu": "", + "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }": "" + }, + "include": [ + { + "DESIRED_CONSISTENCIES": "[\"Session\"]", + "ACCOUNT_CONSISTENCY": "Session", + "ArmConfig": { + "SingleMaster_MultiRegion_FI_CustomerWorkflows": { + "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }", + "PREFERRED_LOCATIONS": "[\"East US 2\"]" + } + }, + "PROTOCOLS": "[\"Tcp\"]", + "ProfileFlag": [ "-Pfi-sm-customer-workflows" ], + "Agent": { + "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } + } + }, + { + "DESIRED_CONSISTENCIES": "[\"Session\"]", + "ACCOUNT_CONSISTENCY": "Session", + "ArmConfig": { + "SingleMaster_MultiRegion_FI_CustomerWorkflows_ThinClient_Http2": { + "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }", + "PREFERRED_LOCATIONS": "[]" + } + }, + "PROTOCOLS": "[\"Tcp\"]", + "ProfileFlag": [ "-Pfi-sm-customer-workflows" ], + "AdditionalArgs": "-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-region-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-region-session-key) -DCOSMOS.THINCLIENT_ENABLED=true -DCOSMOS.HTTP2_ENABLED=true", + "Agent": { + "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } + } + } + ] +} diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml index 0eecf42166ea..a03d1638ab89 100644 --- a/sdk/cosmos/tests.yml +++ b/sdk/cosmos/tests.yml @@ -233,6 +233,70 @@ extends: - name: AdditionalArgs value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(gsi-pipeline-uri) -DACCOUNT_KEY=$(gsi-pipeline-key)' + - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml + parameters: + TestName: 'Cosmos_Live_Test_FaultInjectionCustomerWorkflows' + CloudConfig: + Public: + ServiceConnection: azure-sdk-tests-cosmos + MatrixConfigs: + - Name: Cosmos_live_test_fi_customer_workflows + Path: sdk/cosmos/live-fi-customer-workflows-platform-matrix.json + Selection: all + GenerateVMJobs: true + MatrixReplace: + - .*Version=1.2(1|5)/1.17 + ServiceDirectory: cosmos + Artifacts: + - name: azure-cosmos + groupId: com.azure + safeName: azurecosmos + AdditionalModules: + - name: azure-cosmos-tests + groupId: com.azure + - name: azure-cosmos-benchmark + groupId: com.azure + TimeoutInMinutes: 210 + MaxParallel: 20 + TestGoals: 'verify' + TestOptions: '$(ProfileFlag) $(AdditionalArgs) -DskipCompile=true -DskipTestCompile=true -DcreateSourcesJar=false' + TestResultsFiles: '**/junitreports/TEST-*.xml' + AdditionalVariables: + - name: AdditionalArgs + value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true' + + - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml + parameters: + TestName: 'Cosmos_Live_Test_FaultInjectionSingleMasterCustomerWorkflows' + CloudConfig: + Public: + ServiceConnection: azure-sdk-tests-cosmos + MatrixConfigs: + - Name: Cosmos_live_test_fi_sm_customer_workflows + Path: sdk/cosmos/live-fi-sm-customer-workflows-platform-matrix.json + Selection: all + GenerateVMJobs: true + MatrixReplace: + - .*Version=1.2(1|5)/1.17 + ServiceDirectory: cosmos + Artifacts: + - name: azure-cosmos + groupId: com.azure + safeName: azurecosmos + AdditionalModules: + - name: azure-cosmos-tests + groupId: com.azure + - name: azure-cosmos-benchmark + groupId: com.azure + TimeoutInMinutes: 210 + MaxParallel: 20 + TestGoals: 'verify' + TestOptions: '$(ProfileFlag) $(AdditionalArgs) -DskipCompile=true -DskipTestCompile=true -DcreateSourcesJar=false' + TestResultsFiles: '**/junitreports/TEST-*.xml' + AdditionalVariables: + - name: AdditionalArgs + value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true' + - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml parameters: TestName: 'Spring_Data_Cosmos_Integration'