Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion java-iam-policy/.repo-metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"repo": "googleapis/google-cloud-java",
"repo_short": "java-iam-policy",
"distribution_name": "com.google.cloud:google-iam-policy",
"api_id": "iam.googleapis.com",
Comment thread
lqiu96 marked this conversation as resolved.
"library_type": "GAPIC_AUTO",
"requires_billing": true,
"excluded_dependencies": "google-iam-policy",
Expand Down
2 changes: 1 addition & 1 deletion java-iam-policy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[code-of-conduct]: https://github.com/googleapis/google-cloud-java/blob/main/CODE_OF_CONDUCT.md#contributor-code-of-conduct
[license]: https://github.com/googleapis/google-cloud-java/blob/main/LICENSE
[enable-billing]: https://cloud.google.com/apis/docs/getting-started#enabling_billing
[enable-api]: https://console.cloud.google.com/flows/enableapi?apiid=iam.googleapis.com
Comment thread
lqiu96 marked this conversation as resolved.
Comment thread
lqiu96 marked this conversation as resolved.

[libraries-bom]: https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google-Cloud-Platform-Libraries-BOM
[shell_img]: https://gstatic.com/cloudssh/images/open-btn.png

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
* <p>Package-private for internal use.
*/
class ChannelPool extends ManagedChannel {
private static final String CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING =
"Channel pool is repeatedly resizing. "
+ "Consider adjusting `initialChannelCount` or `maxResizeDelta` to a more reasonable value. "
+ "See https://docs.cloud.google.com/java/docs/troubleshooting to enable logging "
+ "and set `com.google.api.gax.grpc.ChannelPool.level=FINEST` to log the channel pool resize behavior.";
@VisibleForTesting static final Logger LOG = Logger.getLogger(ChannelPool.class.getName());
private static final java.time.Duration REFRESH_PERIOD = java.time.Duration.ofMinutes(50);

Expand All @@ -84,6 +89,16 @@ class ChannelPool extends ManagedChannel {
private final AtomicInteger indexTicker = new AtomicInteger();
private final String authority;

// The number of consecutive resize cycles to wait before logging a warning about repeated
// resizing. This is an arbitrary value chosen to detect repeated requests for changes
// (multiple continuous increase or decrease attempts) without being too sensitive.
private static final int CONSECUTIVE_RESIZE_THRESHOLD = 5;

// Tracks the number of consecutive resize cycles where a resize actually occurred (either expand
// or shrink). Used to detect repeated resizing activity and log a warning.
// Note: This field is only accessed safely within resizeSafely() and does not need to be atomic.
private int consecutiveResizes = 0;
Comment thread
lqiu96 marked this conversation as resolved.

static ChannelPool create(
ChannelPoolSettings settings,
ChannelFactory channelFactory,
Expand Down Expand Up @@ -313,9 +328,24 @@ void resize() {
int currentSize = localEntries.size();
int delta = tentativeTarget - currentSize;
int dampenedTarget = tentativeTarget;
if (Math.abs(delta) > ChannelPoolSettings.MAX_RESIZE_DELTA) {
dampenedTarget =
currentSize + (int) Math.copySign(ChannelPoolSettings.MAX_RESIZE_DELTA, delta);
if (Math.abs(delta) > settings.getMaxResizeDelta()) {
dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta);
}

// We only count as "resized" if the thresholds are crossed and we actually attempt to scale.
// Checking (dampenedTarget != currentSize) would cause false positives when the pool is within
// bounds but not at the target, because the target aims for the middle of the bounds.
boolean resized = (currentSize < minChannels || currentSize > maxChannels);
if (resized) {
consecutiveResizes++;
} else {
consecutiveResizes = 0;
}

// Log warning only once when the threshold is reached to avoid spamming logs.
// Using == instead of >= ensures we don't log on every subsequent resize cycle.
if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) {
LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING);
}

// Only resize the pool when thresholds are crossed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ public abstract class ChannelPoolSettings {
*/
public abstract int getMaxChannelCount();

/**
* The maximum number of channels that can be added or removed at a time.
*
* <p>This setting limits the rate at which the channel pool can grow or shrink in a single resize
* period. The default value is {@value #MAX_RESIZE_DELTA}. Increasing this value can help the
* pool better handle sudden bursts or spikes in requests by allowing it to scale up faster.
* Regardless of this setting, the number of channels will never exceed {@link
* #getMaxChannelCount()}.
*/
public abstract int getMaxResizeDelta();

/**
* The initial size of the channel pool.
*
Expand Down Expand Up @@ -132,6 +143,7 @@ public static ChannelPoolSettings staticallySized(int size) {
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
.setMinChannelCount(size)
.setMaxChannelCount(size)
.setMaxResizeDelta(Math.min(MAX_RESIZE_DELTA, size))
.build();
}

Expand All @@ -142,7 +154,8 @@ public static Builder builder() {
.setMaxChannelCount(200)
.setMinRpcsPerChannel(0)
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
.setPreemptiveRefreshEnabled(false);
.setPreemptiveRefreshEnabled(false)
.setMaxResizeDelta(MAX_RESIZE_DELTA);
}

@AutoValue.Builder
Expand All @@ -159,6 +172,8 @@ public abstract static class Builder {

public abstract Builder setPreemptiveRefreshEnabled(boolean enabled);

public abstract Builder setMaxResizeDelta(int count);

abstract ChannelPoolSettings autoBuild();

public ChannelPoolSettings build() {
Expand All @@ -178,6 +193,11 @@ public ChannelPoolSettings build() {
"initial channel count must be less than maxChannelCount");
Preconditions.checkState(
s.getInitialChannelCount() > 0, "Initial channel count must be greater than 0");
Preconditions.checkState(
s.getMaxResizeDelta() > 0, "Max resize delta must be greater than 0");
Preconditions.checkState(
s.getMaxResizeDelta() <= s.getMaxChannelCount(),
"Max resize delta cannot be greater than max channel count");
return s;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,53 @@ void channelCountShouldNotChangeWhenOutstandingRpcsAreWithinLimits() throws Exce
assertThat(pool.entries.get()).hasSize(2);
}

@Test
void customResizeDeltaIsRespected() throws Exception {
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);

List<ManagedChannel> channels = new ArrayList<>();
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();

ChannelFactory channelFactory =
() -> {
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
.thenAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
startedCalls.add(clientCall);
return clientCall;
});

channels.add(channel);
return channel;
};

pool =
new ChannelPool(
ChannelPoolSettings.builder()
.setInitialChannelCount(2)
.setMinRpcsPerChannel(1)
.setMaxRpcsPerChannel(2)
.setMaxResizeDelta(5)
.build(),
channelFactory,
provider);
assertThat(pool.entries.get()).hasSize(2);

// Add 20 RPCs to push expansion
for (int i = 0; i < 20; i++) {
ClientCalls.futureUnaryCall(
pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance());
}
pool.resize();
// delta is 15 - 2 = 13. Capped at maxResizeDelta = 5.
// Expected size = 2 + 5 = 7.
assertThat(pool.entries.get()).hasSize(7);
}

@Test
void removedIdleChannelsAreShutdown() throws Exception {
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
Expand Down Expand Up @@ -679,6 +726,125 @@ public void onComplete() {}
assertThat(e.getMessage()).isEqualTo("Call is already cancelled");
}

@Test
void repeatedResizingLogsWarningOnExpand() throws Exception {
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);

List<ManagedChannel> channels = new ArrayList<>();
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();

ChannelFactory channelFactory =
() -> {
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
.thenAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
startedCalls.add(clientCall);
return clientCall;
});

channels.add(channel);
return channel;
};

pool =
new ChannelPool(
ChannelPoolSettings.builder()
.setInitialChannelCount(1)
.setMinRpcsPerChannel(1)
.setMaxRpcsPerChannel(2)
.setMaxResizeDelta(1)
.setMinChannelCount(1)
.setMaxChannelCount(10)
.build(),
channelFactory,
provider);
assertThat(pool.entries.get()).hasSize(1);

FakeLogHandler logHandler = new FakeLogHandler();
ChannelPool.LOG.addHandler(logHandler);

try {
// Add 20 RPCs to push expansion
for (int i = 0; i < 20; i++) {
ClientCalls.futureUnaryCall(
pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance());
}

// Resize 4 times, should not log warning yet
for (int i = 0; i < 4; i++) {
pool.resize();
}
assertThat(logHandler.getAllMessages()).isEmpty();

// 5th resize, should log warning
pool.resize();
assertThat(logHandler.getAllMessages()).hasSize(1);
assertThat(logHandler.getAllMessages())
.contains(
"Channel pool is repeatedly resizing. Consider adjusting `initialChannelCount` or `maxResizeDelta` to a more reasonable value. "
+ "See https://docs.cloud.google.com/java/docs/troubleshooting to enable logging and set `com.google.api.gax.grpc.ChannelPool.level=FINEST` to log the channel pool resize behavior.");

// 6th resize, should not log again
pool.resize();
assertThat(logHandler.getAllMessages()).hasSize(1);
} finally {
ChannelPool.LOG.removeHandler(logHandler);
}
}

@Test
void repeatedResizingLogsWarningOnShrink() throws Exception {
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);

List<ManagedChannel> channels = new ArrayList<>();
ChannelFactory channelFactory =
() -> {
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
channels.add(channel);
return channel;
};

pool =
new ChannelPool(
ChannelPoolSettings.builder()
.setInitialChannelCount(10)
.setMinRpcsPerChannel(1)
.setMaxRpcsPerChannel(2)
.setMaxResizeDelta(1)
.setMinChannelCount(1)
.setMaxChannelCount(10)
.build(),
channelFactory,
provider);
assertThat(pool.entries.get()).hasSize(10);

FakeLogHandler logHandler = new FakeLogHandler();
ChannelPool.LOG.addHandler(logHandler);

try {
// 0 RPCs, should shrink every cycle
// Resize 4 times, should not log warning yet
for (int i = 0; i < 4; i++) {
pool.resize();
}
assertThat(logHandler.getAllMessages()).isEmpty();

// 5th resize, should log warning
pool.resize();
assertThat(logHandler.getAllMessages())
.contains(
"Channel pool is repeatedly resizing. Consider adjusting `initialChannelCount` or `maxResizeDelta` to a more reasonable value. "
+ "See https://docs.cloud.google.com/java/docs/troubleshooting to enable logging and set `com.google.api.gax.grpc.ChannelPool.level=FINEST` to log the channel pool resize behavior.");
} finally {
ChannelPool.LOG.removeHandler(logHandler);
}
}

@Test
void testDoubleRelease() throws Exception {
FakeLogHandler logHandler = new FakeLogHandler();
Expand Down
Loading