From c1954035f2f73d95ece8467625bfc331817a30b2 Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Mon, 22 Jun 2026 01:34:02 +0530 Subject: [PATCH] Add throughput bucket support for Change Feed Processor feed polling --- ...geThroughputControlConfigManagerTests.java | 32 +++++++++++++++++++ ...geThroughputControlConfigManagerTests.java | 31 ++++++++++++++++++ sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + ...edRangeThroughputControlConfigManager.java | 11 +++++++ ...edRangeThroughputControlConfigManager.java | 6 +++- 5 files changed, 80 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java index 566ff7043336..1cc753e834a6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java @@ -66,4 +66,36 @@ public void getThroughputControlConfigForFeedRange() { assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold()/allLeases.size()); assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); } + + @Test(groups = "unit") + public void getOrCreateThroughputControlConfigForFeedRange_withThroughputBucket() { + + ThroughputControlGroupConfig throughputControlGroupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName("test-" + UUID.randomUUID()) + .priorityLevel(PriorityLevel.LOW) + .throughputBucket(2) + .build(); + + ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class); + CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class); + Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient(); + Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any()); + + FeedRangeThroughputControlConfigManager throughputControlConfigManager = + new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock); + + FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false)); + + ThroughputControlGroupConfig pkRangeThroughputControlConfig = + throughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange(feedRangeEpk).block(); + + assertThat(pkRangeThroughputControlConfig).isNotNull(); + assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName()); + assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); + assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2); + + Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig); + Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any()); + } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java index 115d060e0d06..4331a7f074c7 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java @@ -48,4 +48,35 @@ public void getThroughputControlConfigForFeedRange() { assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold()); assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); } + + @Test(groups = "unit") + public void getThroughputControlConfigForFeedRange_withThroughputBucket() { + + ThroughputControlGroupConfig throughputControlGroupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName("test-" + UUID.randomUUID()) + .priorityLevel(PriorityLevel.LOW) + .throughputBucket(2) + .build(); + + ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class); + CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class); + Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient(); + Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any()); + + FeedRangeThroughputControlConfigManager throughputControlConfigManager = + new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock); + + FeedRange feedRange = new FeedRangePartitionKeyRangeImpl("1"); + ThroughputControlGroupConfig pkRangeThroughputControlConfig = + throughputControlConfigManager.getThroughputControlConfigForFeedRange(feedRange); + + assertThat(pkRangeThroughputControlConfig).isNotNull(); + assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName()); + assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); + assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2); + + Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig); + Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any()); + } } diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index eee4f87437f9..d7ed7c883c6b 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.82.0-beta.1 (Unreleased) #### Features Added +* Added throughput bucket support for Change Feed Processor feed polling throughput control. - See [Issue 49487](https://github.com/Azure/azure-sdk-for-java/issues/49487) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java index 636f99ba6581..8725e7a5516c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -34,6 +35,7 @@ public class FeedRangeThroughputControlConfigManager { private final AtomicReference> leaseTokens; // epk leases private final Map> pkRangeToFeedRangeMap; private final Map feedRangeToThroughputControlGroupConfigMap; + private final AtomicBoolean throughputControlGroupEnabled; public FeedRangeThroughputControlConfigManager( ThroughputControlGroupConfig throughputControlGroupConfig, @@ -47,6 +49,7 @@ public FeedRangeThroughputControlConfigManager( this.leaseTokens = new AtomicReference<>(); this.pkRangeToFeedRangeMap = new ConcurrentHashMap<>(); this.feedRangeToThroughputControlGroupConfigMap = new ConcurrentHashMap<>(); + this.throughputControlGroupEnabled = new AtomicBoolean(false); } /** @@ -88,6 +91,14 @@ public Mono refresh(List leases) { public Mono getOrCreateThroughputControlConfigForFeedRange(FeedRangeEpkImpl feedRange) { checkNotNull(feedRange, "Argument 'feedRange' can not be null"); + + if (this.throughputControlGroupConfig.getThroughputBucket() != null) { + if (this.throughputControlGroupEnabled.compareAndSet(false, true)) { + this.documentClient.getContainerClient().enableServerThroughputControlGroup(this.throughputControlGroupConfig); + } + return Mono.just(this.throughputControlGroupConfig); + } + ThroughputControlGroupConfig throughputControlGroupConfigForFeedRange = this.feedRangeToThroughputControlGroupConfigMap.get(feedRange); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java index f0ebbe1db4af..1d3120c39f6e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java @@ -41,7 +41,11 @@ public ThroughputControlGroupConfig getThroughputControlConfigForFeedRange(FeedR // Note: if global throughput control be added in future, then we will need to create one group per pkRange if (this.throughputControlGroupEnabled.compareAndSet(false, true)) { - this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig); + if (this.throughputControlGroupConfig.getThroughputBucket() != null) { + this.documentClient.getContainerClient().enableServerThroughputControlGroup(this.throughputControlGroupConfig); + } else { + this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig); + } } return this.throughputControlGroupConfig;