Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
50c10b1
Adding more scenario-based tests
FabianMeiswinkel Jun 19, 2026
716e229
Potential fix for pull request finding
FabianMeiswinkel Jun 19, 2026
e68f653
Update live-platform-matrix.json
FabianMeiswinkel Jun 19, 2026
74ade95
Merge branch 'users/fabianm/test' of https://github.com/FabianMeiswin…
FabianMeiswinkel Jun 19, 2026
0b236f1
Addressing code review feedback
FabianMeiswinkel Jun 19, 2026
e3f3f23
Merge branch 'main' into users/fabianm/test
FabianMeiswinkel Jun 19, 2026
e01dbdb
Potential fix for pull request finding
FabianMeiswinkel Jun 19, 2026
837f3da
Register created items for cleanup in customer workflow create paths
FabianMeiswinkel Jun 19, 2026
fb9e0db
Update CustomerWorkflowHighE2ETimeoutTest.java
FabianMeiswinkel Jun 19, 2026
a37f517
Delete CUSTOMER_WORKFLOW_COVERAGE_MAP.md
FabianMeiswinkel Jun 19, 2026
9f6c651
Refactoring customer tests into separate pipelines
FabianMeiswinkel Jun 19, 2026
b946a65
Fix single master tests
FabianMeiswinkel Jun 19, 2026
c422d86
Update CustomerWorkflowChangeFeedProcessorTest.java
FabianMeiswinkel Jun 19, 2026
3ff626d
Update CustomerWorkflowTestBase.java
FabianMeiswinkel Jun 19, 2026
fd17a99
Merge branch 'main' into users/fabianm/test
FabianMeiswinkel Jun 19, 2026
81f1de9
Fixing test flakiness due to async container creation
FabianMeiswinkel Jun 22, 2026
f1d585e
Merge branch 'users/fabianm/test' of https://github.com/FabianMeiswin…
FabianMeiswinkel Jun 22, 2026
0bf70bd
Merge branch 'main' into users/fabianm/test
FabianMeiswinkel Jun 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions sdk/cosmos/azure-cosmos-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,60 @@ Licensed under the MIT License.
</plugins>
</build>
</profile>
<profile>
<!-- Customer workflow integration tests; requires a Cosmos DB endpoint with multi-master support. -->
<id>fi-customer-workflows</id>
<properties>
<test.groups>fi-customer-workflows</test.groups>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.5.3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
<configuration>
<suiteXmlFiles>
<suiteXmlFile>src/test/resources/fi-customer-workflows-testng.xml</suiteXmlFile>
</suiteXmlFiles>
<!-- Turns on Cosmos client leak detection and configures Netty leak detection (paranoid level, sampling interval 1, 256 target records) for this suite. -->
<systemPropertyVariables>
<COSMOS.CLIENT_LEAK_DETECTION_ENABLED>true</COSMOS.CLIENT_LEAK_DETECTION_ENABLED>
<io.netty.leakDetection.samplingInterval>1</io.netty.leakDetection.samplingInterval>
<io.netty.leakDetection.targetRecords>256</io.netty.leakDetection.targetRecords>
<io.netty.leakDetection.level>paranoid</io.netty.leakDetection.level>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<!-- Customer workflow integration tests; requires a Cosmos DB endpoint with single-master, multi-region support. -->
<id>fi-sm-customer-workflows</id>
<properties>
<test.groups>fi-sm-customer-workflows</test.groups>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.5.3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
<configuration>
<suiteXmlFiles>
<suiteXmlFile>src/test/resources/fi-sm-customer-workflows-testng.xml</suiteXmlFile>
</suiteXmlFiles>
<!-- Turns on Cosmos client leak detection and configures Netty leak detection (paranoid level, sampling interval 1, 256 target records) for this suite. -->
<systemPropertyVariables>
<COSMOS.CLIENT_LEAK_DETECTION_ENABLED>true</COSMOS.CLIENT_LEAK_DETECTION_ENABLED>
<io.netty.leakDetection.samplingInterval>1</io.netty.leakDetection.samplingInterval>
<io.netty.leakDetection.targetRecords>256</io.netty.leakDetection.targetRecords>
<io.netty.leakDetection.level>paranoid</io.netty.leakDetection.level>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<!-- integration tests, requires Cosmos DB endpoint with multi region support -->
<id>multi-region</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void before_NonStreamingOrderByQueryVectorSearchTest() {
database.createContainer(containerProperties).block();
largeDataContainer = database.getContainer(largeDataContainerId);

waitForCollectionToBeAvailableToRead();
waitForCollectionToBeAvailableToRead(largeDataContainer);

for (Document doc : getVectorDocs()) {
flatIndexContainer.createItem(doc).block();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ public void before_OrderbyDocumentQueryTest() throws Exception {
}))
.block();
roundTripsContainer = createdDatabase.getContainer(containerName);
waitForCollectionToBeAvailableToRead();
waitForCollectionToBeAvailableToRead(roundTripsContainer);
setupRoundTripContainer();

List<Map<String, Object>> keyValuePropsList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void orderByQueryForLargeCollection() {
).block();

CosmosAsyncContainer container = createdDatabase.getContainer(containerProperties.getId());
waitForCollectionToBeAvailableToRead();
waitForCollectionToBeAvailableToRead(container);

int partitionDocCount = 5;
int pageSize = partitionDocCount + 1;
Expand Down Expand Up @@ -382,7 +382,7 @@ public void splitQueryContinuationToken() throws Exception {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/mypk");
CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties).block();
CosmosAsyncContainer container = createdDatabase.getContainer(containerId);
waitForCollectionToBeAvailableToRead();
waitForCollectionToBeAvailableToRead(container);
AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(this.client);

//Insert some documents
Expand Down Expand Up @@ -494,7 +494,7 @@ public void orderbyContinuationOnUndefinedAndNull() throws Exception {
createdDatabase.createContainer(containerProperties, new CosmosContainerRequestOptions()).block();

CosmosAsyncContainer container = createdDatabase.getContainer(containerProperties.getId());
waitForCollectionToBeAvailableToRead();
waitForCollectionToBeAvailableToRead(container);
CosmosContainerResponse containerResponse = container.read().block();
assert (containerResponse != null);
CosmosContainerProperties properties = containerResponse.getProperties();
Expand Down Expand Up @@ -582,7 +582,7 @@ public void queryLargePartitionKeyOn100BPKCollection() throws Exception {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/id");
CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties).block();
CosmosAsyncContainer container = createdDatabase.getContainer(containerId);
waitForCollectionToBeAvailableToRead();
waitForCollectionToBeAvailableToRead(container);
//id as partitionkey > 100bytes
String itemID1 = "cosmosdb" +
"-drWarm4Z60GkknMfHLo5BwuiH7w6AffzSb9jKbvwAQwaRZd10oxnLeCueuyZ5gbm9dwVVAqJLdzrB38Dk73Q6xMErv-0";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public CosmosAsyncContainer createCollections(CosmosAsyncDatabase database) {
partitionKeyDef.setPaths(paths);
CosmosContainerProperties containerProperties = new CosmosContainerProperties(UUID.randomUUID().toString(), partitionKeyDef);
database.createContainer(containerProperties, new CosmosContainerRequestOptions()).block();
waitForCollectionToBeAvailableToRead();
waitForCollectionToBeAvailableToRead(database.getContainer(containerProperties.getId()));
return database.getContainer(containerProperties.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.Database;
import com.azure.cosmos.implementation.DatabaseAccount;
import com.azure.cosmos.implementation.DatabaseAccountLocation;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.FailureValidator;
import com.azure.cosmos.implementation.FeedResponseListValidator;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.InternalObjectNode;
Expand All @@ -43,6 +46,7 @@
import com.azure.cosmos.implementation.QueryFeedOperationState;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.ResourceResponse;
import com.azure.cosmos.implementation.ResourceResponseValidator;
import com.azure.cosmos.implementation.TestConfigurations;
Expand All @@ -51,6 +55,7 @@
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.guava25.base.CaseFormat;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
Expand Down Expand Up @@ -302,7 +307,7 @@ public CosmosAsyncDatabase getDatabase(String id) {

@BeforeSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator",
"emulator-vnext", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct",
"circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SETUP_TIMEOUT)
"circuit-breaker-read-all-read-many", "fi-multi-master", "fi-customer-workflows", "fi-sm-customer-workflows", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SETUP_TIMEOUT)
public void beforeSuite() {

logger.info("beforeSuite Started");
Expand Down Expand Up @@ -353,7 +358,7 @@ private static DocumentCollection getInternalDocumentCollection(CosmosAsyncConta

@AfterSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master",
"emulator", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct",
"circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SHUTDOWN_TIMEOUT)
"circuit-breaker-read-all-read-many", "fi-multi-master", "fi-customer-workflows", "fi-sm-customer-workflows", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SHUTDOWN_TIMEOUT)
public void afterSuite() {

logger.info("afterSuite Started");
Expand Down Expand Up @@ -566,24 +571,143 @@ public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database
.getCosmosAsyncClientAccessor()
.getPreferredRegions(client).size() > 1;
if (throughput > 6000 || isMultiRegional) {
waitForCollectionToBeAvailableToRead();
waitForCollectionToBeAvailableToRead(database.getContainer(cosmosContainerProperties.getId()));
}

return database.getContainer(cosmosContainerProperties.getId());
}

protected static void waitForCollectionToBeAvailableToRead() {
// Creating a container is an async task - especially with multiple regions it can
// take some time until the container is available in the remote regions as well.
// When the container does not exist yet, metadata reads or item operations can
// fail with 404/1013 "Collection is not yet available for read".
// So, adding this delay after container creation to minimize risk of hitting these errors.
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
/**
 * Blocks until the given container can be read, probing every non-primary region of the account.
 *
 * <p>Container creation is asynchronous; on multi-region accounts reads routed to regions other than the
 * write region can fail with 404/1013 ("Collection is not yet available for read") until the new
 * collection has propagated. This method resolves the account's readable regions, treats the first
 * writable location as the primary, and then probes each remaining region in turn by excluding all other
 * regions. All probes share one two-minute deadline.
 *
 * @param container the freshly created container to wait for
 * @throws AssertionError if the account information or the container does not become available in time
 */
protected static void waitForCollectionToBeAvailableToRead(CosmosAsyncContainer container) {
    CosmosAsyncClient owningClient = ImplementationBridgeHelpers
        .CosmosAsyncDatabaseHelper
        .getCosmosAsyncDatabaseAccessor()
        .getCosmosAsyncClient(container.getDatabase());

    DatabaseAccount account = getLatestDatabaseAccount(owningClient);

    // Region names come from the account itself; the client's preferred regions may only be a subset.
    List<String> accountRegions = new ArrayList<>();
    for (DatabaseAccountLocation readable : account.getReadableLocations()) {
        accountRegions.add(readable.getName());
    }

    // Only the first writable location is treated as primary; stays null when none is reported.
    String primaryRegion = null;
    for (DatabaseAccountLocation writable : account.getWritableLocations()) {
        primaryRegion = writable.getName();
        break;
    }

    // Every region other than the primary is a candidate for propagation lag.
    List<String> secondaryRegions = new ArrayList<>();
    for (String candidate : accountRegions) {
        if (primaryRegion == null || !candidate.equalsIgnoreCase(primaryRegion)) {
            secondaryRegions.add(candidate);
        }
    }

    Duration maxWait = Duration.ofMinutes(2);
    long deadlineNanos = System.nanoTime() + maxWait.toNanos();

    if (secondaryRegions.isEmpty()) {
        // Nothing but the primary region: still confirm the collection is readable at all
        // (for example while physical partitions are provisioned), without excluding any region.
        awaitContainerReadableInRegion(container, null, Collections.emptyList(), deadlineNanos, maxWait);
        return;
    }

    for (String targetRegion : secondaryRegions) {
        // Exclude everything except the target so the probe is directed at that one region.
        List<String> allOtherRegions = new ArrayList<>();
        for (String other : accountRegions) {
            if (!other.equalsIgnoreCase(targetRegion)) {
                allOtherRegions.add(other);
            }
        }
        awaitContainerReadableInRegion(container, targetRegion, allOtherRegions, deadlineNanos, maxWait);
    }
}

/**
 * Repeatedly probes the container with a cheap query until it succeeds or the shared deadline passes.
 *
 * <p>Each attempt runs {@code SELECT TOP 1 c.id FROM c} and waits for the first page; a successful
 * (possibly empty) page is taken as proof that the collection is readable. Failed attempts are retried
 * with exponential back-off starting at 100 ms, doubling up to 5 s, and never sleeping past the deadline.
 *
 * <p>Every attempt is bounded by the time left until the deadline (with a one-second floor), so a
 * stalled query cannot hold the caller beyond the overall wait budget.
 *
 * @param container the container to probe
 * @param targetRegion name of the region being verified, used only in the failure message; may be
 *     {@code null} when no specific region is targeted
 * @param excludedRegions regions to exclude on the query options; left unset when empty
 * @param deadlineNanos absolute deadline expressed on the {@link System#nanoTime()} clock
 * @param maxWait the overall wait budget, used only for the failure message
 * @throws AssertionError if no attempt succeeded before the deadline; the last failure is the cause
 * @throws RuntimeException if the thread is interrupted while backing off
 */
private static void awaitContainerReadableInRegion(
    CosmosAsyncContainer container,
    String targetRegion,
    List<String> excludedRegions,
    long deadlineNanos,
    Duration maxWait) {

    final long maxBackoffMillis = 5000;
    // An attempt started at (or just after) the deadline still gets a realistic chance to complete.
    final long minAttemptTimeoutNanos = TimeUnit.SECONDS.toNanos(1);

    long backoffMillis = 100;
    int attempts = 0;
    Throwable lastError = null;

    while (true) {
        attempts++;
        try {
            CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
            if (!excludedRegions.isEmpty()) {
                options.setExcludedRegions(excludedRegions);
            }

            // Bound the blocking wait by the remaining budget; without a timeout a hung query would
            // defeat the deadline, which is otherwise only checked between attempts.
            long attemptTimeoutNanos = Math.max(minAttemptTimeoutNanos, deadlineNanos - System.nanoTime());

            // A successful (possibly empty) page proves the collection is resolvable/readable in the
            // targeted region.
            container.queryItems("SELECT TOP 1 c.id FROM c", options, Object.class)
                .byPage(1)
                .blockFirst(Duration.ofNanos(attemptTimeoutNanos));
            return;
        } catch (Exception error) {
            // Includes the timeout raised by blockFirst(Duration); remembered as the failure cause.
            lastError = error;
        }

        long remainingNanos = deadlineNanos - System.nanoTime();
        if (remainingNanos <= 0) {
            break;
        }

        // Sleep for the current back-off, but never past the deadline and never for less than 1 ms.
        long sleepMillis = Math.max(1, Math.min(backoffMillis, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
        try {
            TimeUnit.MILLISECONDS.sleep(sleepMillis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for collection to be available to read.", interrupted);
        }
        backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis);
    }

    throw new AssertionError(
        String.format(
            "Container '%s' was not available to read%s within %d seconds (%d attempts).",
            container.getId(),
            targetRegion != null ? " in region '" + targetRegion + "'" : "",
            maxWait.getSeconds(),
            attempts),
        lastError);
}

/**
 * Resolves the most recent {@link DatabaseAccount} known to the client's global endpoint manager.
 *
 * <p>The endpoint manager is obtained reflectively from the client's underlying
 * {@link RxDocumentClientImpl}. The account is populated during client initialization, so the value is
 * polled every 200 ms for up to ten seconds to defend against an initialization race.
 *
 * @param client the client whose account information is wanted
 * @return the latest database account; never {@code null}
 * @throws AssertionError if the account is still unavailable once the polling window has elapsed
 * @throws RuntimeException if the thread is interrupted while polling
 */
private static DatabaseAccount getLatestDatabaseAccount(CosmosAsyncClient client) {
    RxDocumentClientImpl documentClient = (RxDocumentClientImpl) BridgeInternal.getContextClient(client);
    GlobalEndpointManager endpointManager = ReflectionUtils.getGlobalEndpointManager(documentClient);

    final long giveUpAtNanos = System.nanoTime() + Duration.ofSeconds(10).toNanos();

    while (true) {
        DatabaseAccount account = endpointManager.getLatestDatabaseAccount();
        if (account != null) {
            return account;
        }

        if (System.nanoTime() >= giveUpAtNanos) {
            throw new AssertionError("Database account was not available to determine the account's regions.");
        }

        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while resolving the database account.", interrupted);
        }
    }
}

public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database, CosmosContainerProperties cosmosContainerProperties,
Expand Down
Loading