Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@
import com.google.common.primitives.UnsignedInts;
import com.google.errorprone.annotations.CheckReturnValue;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.InternalEquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.internal.GrpcUtil;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
Expand Down Expand Up @@ -83,15 +86,17 @@ final class CdsLoadBalancer2 extends LoadBalancer {
private final Helper helper;
private final LoadBalancerRegistry lbRegistry;
private final ClusterState clusterState = new ClusterState();
private final CdsLbHelper cdsLbHelper = new CdsLbHelper();
private GracefulSwitchLoadBalancer delegate;
private boolean addBackendServicePickDetailsLabel;
// Following fields are effectively final.
private String clusterName;
private Subscription clusterSubscription;

CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
this.helper = checkNotNull(helper, "helper");
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
this.delegate = new GracefulSwitchLoadBalancer(helper);
this.delegate = new GracefulSwitchLoadBalancer(cdsLbHelper);
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
}
Expand Down Expand Up @@ -126,6 +131,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
XdsClusterConfig clusterConfig = clusterConfigOr.getValue();

if (clusterConfig.getChildren() instanceof EndpointConfig) {
addBackendServicePickDetailsLabel = true;
StatusOr<EdsUpdate> edsUpdate = getEdsUpdate(xdsConfig, clusterName);
StatusOr<ClusterResolutionResult> statusOrResult = clusterState.edsUpdateToResult(
clusterName,
Expand Down Expand Up @@ -156,8 +162,12 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(gracefulConfig)
.setAddresses(Collections.unmodifiableList(addresses))
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, clusterName)
.build())
.build());
} else if (clusterConfig.getChildren() instanceof AggregateConfig) {
addBackendServicePickDetailsLabel = false;
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
List<String> leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
for (String childCluster: leafClusters) {
Expand Down Expand Up @@ -196,7 +206,8 @@ public void handleNameResolutionError(Status error) {
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
delegate.shutdown();
delegate = new GracefulSwitchLoadBalancer(helper);
delegate = new GracefulSwitchLoadBalancer(cdsLbHelper);
addBackendServicePickDetailsLabel = false;
if (clusterSubscription != null) {
clusterSubscription.close();
clusterSubscription = null;
Expand All @@ -206,6 +217,7 @@ public void shutdown() {
@CheckReturnValue // don't forget to return up the stack after the fail call
private Status fail(Status error) {
delegate.shutdown();
addBackendServicePickDetailsLabel = false;
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
Expand All @@ -215,6 +227,38 @@ private String errorPrefix() {
return "CdsLb for " + clusterName + ": ";
}

private final class CdsLbHelper extends ForwardingLoadBalancerHelper {
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
if (addBackendServicePickDetailsLabel) {
newPicker = new BackendServiceMetricLabelSubchannelPicker(newPicker, clusterName);
}
delegate().updateBalancingState(newState, newPicker);
}

@Override
protected Helper delegate() {
return helper;
}
}

private static final class BackendServiceMetricLabelSubchannelPicker extends SubchannelPicker {
private final SubchannelPicker delegate;
private final String backendService;

private BackendServiceMetricLabelSubchannelPicker(
SubchannelPicker delegate, String backendService) {
this.delegate = checkNotNull(delegate, "delegate");
this.backendService = checkNotNull(backendService, "backendService");
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", backendService);
return delegate.pickSubchannel(args);
}
}

/**
* The number of bits assigned to the fractional part of fixed-point values. We normalize weights
* to a fixed-point number between 0 and 1, representing that item's proportion of traffic (1 ==
Expand Down
5 changes: 0 additions & 5 deletions xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.ForwardingClientStreamTracer;
import io.grpc.internal.GrpcUtil;
Expand Down Expand Up @@ -154,9 +153,6 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {

return childSwitchLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setAttributes(attributes.toBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
.build())
.setLoadBalancingPolicyConfig(config.childConfig)
.build());
}
Expand Down Expand Up @@ -409,7 +405,6 @@ private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
public PickResult pickSubchannel(PickSubchannelArgs args) {
args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
.accept(filterMetadata);
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster);
for (DropOverload dropOverload : dropPolicies) {
int rand = random.nextInt(1_000_000);
if (rand < dropOverload.dropsPerMillion()) {
Expand Down
125 changes: 120 additions & 5 deletions xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
Expand Down Expand Up @@ -296,6 +297,24 @@ public void nonAggregateCluster_resourceUpdate() {
assertThat(childLbConfig.maxConcurrentRequests).isEqualTo(200L);
}

@Test
public void nonAggregateCluster_addsBackendServiceAttributeAndPickDetailsLabel() {
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, EDS_CLUSTER));
startXdsDepManager();

FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY);

verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
PickResult result =
pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer));

assertThat(result.getStatus().isOk()).isTrue();
verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER);
}

@Test
public void nonAggregateCluster_resourceRevoked() {
lbRegistry.register(new PriorityLoadBalancerProvider());
Expand Down Expand Up @@ -429,6 +448,73 @@ public void discoverAggregateCluster_createsPriorityLbPolicy() {
.isEqualTo("cds_experimental");
}

@Test
public void aggregateCluster_doesNotAddBackendServiceAttributeAndPickDetailsLabelFromRoot() {
String cluster1 = "cluster-01.googleapis.com";
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(
// CLUSTER (aggr.) -> [cluster1 (EDS)]
CLUSTER, Cluster.newBuilder()
.setName(CLUSTER)
.setClusterType(Cluster.CustomClusterType.newBuilder()
.setName("envoy.clusters.aggregate")
.setTypedConfig(Any.pack(ClusterConfig.newBuilder()
.addClusters(cluster1)
.build())))
.build(),
cluster1, EDS_CLUSTER.toBuilder().setName(cluster1).build()));
startXdsDepManager();

FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isNull();
childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY);

verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
PickResult result =
pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer));

assertThat(result.getStatus().isOk()).isTrue();
verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any());
}

@Test
public void aggregateCluster_leafAddsBackendServicePickDetailsLabel() {
lbRegistry.register(new PriorityLoadBalancerProvider());
CdsLoadBalancerProvider cdsLoadBalancerProvider = new CdsLoadBalancerProvider(lbRegistry);
lbRegistry.register(cdsLoadBalancerProvider);
loadBalancer = (CdsLoadBalancer2) cdsLoadBalancerProvider.newLoadBalancer(helper);

String cluster1 = "cluster-01.googleapis.com";
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(
// CLUSTER (aggr.) -> [cluster1 (EDS)]
CLUSTER, Cluster.newBuilder()
.setName(CLUSTER)
.setClusterType(Cluster.CustomClusterType.newBuilder()
.setName("envoy.clusters.aggregate")
.setTypedConfig(Any.pack(ClusterConfig.newBuilder()
.addClusters(cluster1)
.build())))
.build(),
cluster1, EDS_CLUSTER.toBuilder().setName(cluster1).build()));
startXdsDepManager();

FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterImplConfig clusterImplConfig = (ClusterImplConfig) childBalancer.config;
assertThat(clusterImplConfig.cluster).isEqualTo(cluster1);
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE))
.isEqualTo(cluster1);
childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY);

verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
PickResult result =
pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer));

assertThat(result.getStatus().isOk()).isTrue();
verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", cluster1);
verify(detailsConsumer, never()).addOptionalLabel("grpc.lb.backend_service", CLUSTER);
}

@Test
// Both priorities will get tried using real priority LB policy.
public void discoverAggregateCluster_testChildCdsLbPolicyParsing() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also test triggering a subchannel pick using a mock PickDetailsConsumer for the aggregate cluster and verify that the leaf instance actually adds the rpc.lb.backend_service label during a pick?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review.

Done. I added a test that uses the real priority policy to instantiate the leaf CDS path under an aggregate cluster, then triggers a pick through the top-level picker with a mock PickDetailsConsumer.

The test verifies that grpc.lb.backend_service is added with the leaf cluster name and not the aggregate root cluster name.

I also renamed the aggregate-root negative test from "Or" to "And" since it checks both the backend service attribute and the pick-details label.

Expand Down Expand Up @@ -462,12 +548,16 @@ public void discoverAggregateCluster_testChildCdsLbPolicyParsing() {
.isEqualTo("cluster-01.googleapis.com");
assertThat(cluster1ImplConfig.edsServiceName)
.isEqualTo("backend-service-1.googleapis.com");
assertThat(childBalancers.get(0).attributes.get(NameResolver.ATTR_BACKEND_SERVICE))
.isEqualTo(cluster1);
ClusterImplConfig cluster2ImplConfig =
(ClusterImplConfig) childBalancers.get(1).config;
assertThat(cluster2ImplConfig.cluster)
.isEqualTo("cluster-02.googleapis.com");
assertThat(cluster2ImplConfig.edsServiceName)
.isEqualTo("backend-service-1.googleapis.com");
assertThat(childBalancers.get(1).attributes.get(NameResolver.ATTR_BACKEND_SERVICE))
.isEqualTo(cluster2);
}

@Test
Expand Down Expand Up @@ -577,7 +667,9 @@ public void unknownLbProvider() {
startXdsDepManager();
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result =
pickerCaptor.getValue().pickSubchannel(
newPickSubchannelArgs(mock(PickDetailsConsumer.class)));
Status actualStatus = result.getStatus();
assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(actualStatus.getDescription()).contains("Invalid LoadBalancingPolicy");
Expand Down Expand Up @@ -605,7 +697,9 @@ public void invalidLbConfig() {
startXdsDepManager();
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result =
pickerCaptor.getValue().pickSubchannel(
newPickSubchannelArgs(mock(PickDetailsConsumer.class)));
Status actualStatus = result.getStatus();
assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(actualStatus.getDescription()).contains("Invalid 'minRingSize'");
Expand Down Expand Up @@ -639,12 +733,19 @@ private void startXdsDepManager(final CdsConfig cdsConfig) {
}

private static void assertPickerStatus(SubchannelPicker picker, Status expectedStatus) {
PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result = picker.pickSubchannel(
newPickSubchannelArgs(mock(PickDetailsConsumer.class)));
Status actualStatus = result.getStatus();
assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode());
assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription());
}

private static PickSubchannelArgs newPickSubchannelArgs(PickDetailsConsumer pickDetailsConsumer) {
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
when(args.getPickDetailsConsumer()).thenReturn(pickDetailsConsumer);
return args;
}

private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
private final String policyName;
private final LoadBalancerProvider configParsingDelegate;
Expand All @@ -660,7 +761,7 @@ private final class FakeLoadBalancerProvider extends LoadBalancerProvider {

@Override
public LoadBalancer newLoadBalancer(Helper helper) {
FakeLoadBalancer balancer = new FakeLoadBalancer(policyName);
FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper);
childBalancers.add(balancer);
return balancer;
}
Expand Down Expand Up @@ -692,17 +793,21 @@ public NameResolver.ConfigOrError parseLoadBalancingPolicyConfig(

private final class FakeLoadBalancer extends LoadBalancer {
private final String name;
private final Helper helper;
private Object config;
private Attributes attributes;
private Status upstreamError;
private boolean shutdown;

FakeLoadBalancer(String name) {
FakeLoadBalancer(String name, Helper helper) {
this.name = name;
this.helper = helper;
}

@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
config = resolvedAddresses.getLoadBalancingPolicyConfig();
attributes = resolvedAddresses.getAttributes();
return Status.OK;
}

Expand All @@ -716,5 +821,15 @@ public void shutdown() {
shutdown = true;
childBalancers.remove(this);
}

void deliverSubchannelState(final PickResult result, ConnectivityState state) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return result;
}
};
helper.updateBalancingState(state, picker);
}
}
}
Loading
Loading