Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,36 @@ public void getThroughputControlConfigForFeedRange() {
assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold()/allLeases.size());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
}

@Test(groups = "unit")
public void getOrCreateThroughputControlConfigForFeedRange_withThroughputBucket() {

ThroughputControlGroupConfig throughputControlGroupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("test-" + UUID.randomUUID())
.priorityLevel(PriorityLevel.LOW)
.throughputBucket(2)
.build();

ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any());

FeedRangeThroughputControlConfigManager throughputControlConfigManager =
new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);

FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false));

ThroughputControlGroupConfig pkRangeThroughputControlConfig =
throughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange(feedRangeEpk).block();

assertThat(pkRangeThroughputControlConfig).isNotNull();
assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);

Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,35 @@ public void getThroughputControlConfigForFeedRange() {
assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
}

@Test(groups = "unit")
public void getThroughputControlConfigForFeedRange_withThroughputBucket() {

ThroughputControlGroupConfig throughputControlGroupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("test-" + UUID.randomUUID())
.priorityLevel(PriorityLevel.LOW)
.throughputBucket(2)
.build();

ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any());

FeedRangeThroughputControlConfigManager throughputControlConfigManager =
new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);

FeedRange feedRange = new FeedRangePartitionKeyRangeImpl("1");
ThroughputControlGroupConfig pkRangeThroughputControlConfig =
throughputControlConfigManager.getThroughputControlConfigForFeedRange(feedRange);

assertThat(pkRangeThroughputControlConfig).isNotNull();
assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);

Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
}
}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.82.0-beta.1 (Unreleased)

#### Features Added
* Added throughput bucket support for Change Feed Processor feed polling throughput control. - See [Issue 49487](https://github.com/Azure/azure-sdk-for-java/issues/49487)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand All @@ -34,6 +35,7 @@ public class FeedRangeThroughputControlConfigManager {
private final AtomicReference<List<FeedRangeEpkImpl>> leaseTokens; // epk leases
private final Map<PartitionKeyRange, List<FeedRange>> pkRangeToFeedRangeMap;
private final Map<FeedRange, ThroughputControlGroupConfig> feedRangeToThroughputControlGroupConfigMap;
private final AtomicBoolean throughputControlGroupEnabled;

public FeedRangeThroughputControlConfigManager(
ThroughputControlGroupConfig throughputControlGroupConfig,
Expand All @@ -47,6 +49,7 @@ public FeedRangeThroughputControlConfigManager(
this.leaseTokens = new AtomicReference<>();
this.pkRangeToFeedRangeMap = new ConcurrentHashMap<>();
this.feedRangeToThroughputControlGroupConfigMap = new ConcurrentHashMap<>();
this.throughputControlGroupEnabled = new AtomicBoolean(false);
}

/**
Expand Down Expand Up @@ -88,6 +91,14 @@ public Mono<Void> refresh(List<Lease> leases) {

public Mono<ThroughputControlGroupConfig> getOrCreateThroughputControlConfigForFeedRange(FeedRangeEpkImpl feedRange) {
checkNotNull(feedRange, "Argument 'feedRange' can not be null");

if (this.throughputControlGroupConfig.getThroughputBucket() != null) {
if (this.throughputControlGroupEnabled.compareAndSet(false, true)) {
this.documentClient.getContainerClient().enableServerThroughputControlGroup(this.throughputControlGroupConfig);
}
return Mono.just(this.throughputControlGroupConfig);
}

ThroughputControlGroupConfig throughputControlGroupConfigForFeedRange =
this.feedRangeToThroughputControlGroupConfigMap.get(feedRange);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public ThroughputControlGroupConfig getThroughputControlConfigForFeedRange(FeedR
// Note: if global throughput control be added in future, then we will need to create one group per pkRange

if (this.throughputControlGroupEnabled.compareAndSet(false, true)) {
this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig);
if (this.throughputControlGroupConfig.getThroughputBucket() != null) {
this.documentClient.getContainerClient().enableServerThroughputControlGroup(this.throughputControlGroupConfig);
} else {
this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig);
}
}

return this.throughputControlGroupConfig;
Expand Down
Loading