xds: Change XdsClusterConfig to have children field instead of endpoint (#11888)

* Change XdsConfig to match spec with a `children` object holding either `a list of leaf cluster names` or `an EdsUpdate`.  Removed intermediate aggregate nodes from `XdsConfig.clusters`.
This commit is contained in:
Larry Safran 2025-02-11 12:38:52 -08:00 committed by GitHub
parent fc8571a0e5
commit ade2dd2038
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 208 additions and 46 deletions

View File

@ -26,6 +26,7 @@ import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import java.io.Closeable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -103,19 +104,17 @@ final class XdsConfig {
static final class XdsClusterConfig {
private final String clusterName;
private final CdsUpdate clusterResource;
private final StatusOr<EdsUpdate> endpoint; //Will be null for non-EDS clusters
private final ClusterChild children; // holds details
XdsClusterConfig(String clusterName, CdsUpdate clusterResource,
StatusOr<EdsUpdate> endpoint) {
XdsClusterConfig(String clusterName, CdsUpdate clusterResource, ClusterChild details) {
this.clusterName = checkNotNull(clusterName, "clusterName");
this.clusterResource = checkNotNull(clusterResource, "clusterResource");
this.endpoint = endpoint;
this.children = checkNotNull(details, "details");
}
@Override
public int hashCode() {
int endpointHash = (endpoint != null) ? endpoint.hashCode() : 0;
return clusterName.hashCode() + clusterResource.hashCode() + endpointHash;
return clusterName.hashCode() + clusterResource.hashCode() + children.hashCode();
}
@Override
@ -126,7 +125,7 @@ final class XdsConfig {
XdsClusterConfig o = (XdsClusterConfig) obj;
return Objects.equals(clusterName, o.clusterName)
&& Objects.equals(clusterResource, o.clusterResource)
&& Objects.equals(endpoint, o.endpoint);
&& Objects.equals(children, o.children);
}
@Override
@ -134,7 +133,8 @@ final class XdsConfig {
StringBuilder builder = new StringBuilder();
builder.append("XdsClusterConfig{clusterName=").append(clusterName)
.append(", clusterResource=").append(clusterResource)
.append(", endpoint=").append(endpoint).append("}");
.append(", children={").append(children)
.append("}");
return builder.toString();
}
@ -146,8 +146,60 @@ final class XdsConfig {
return clusterResource;
}
public StatusOr<EdsUpdate> getEndpoint() {
return endpoint;
public ClusterChild getChildren() {
return children;
}
interface ClusterChild {}
/** Endpoint info for EDS and LOGICAL_DNS clusters. If there was an
* error, endpoints will be null and resolution_note will be set.
*/
static final class EndpointConfig implements ClusterChild {
private final StatusOr<EdsUpdate> endpoint;
public EndpointConfig(StatusOr<EdsUpdate> endpoint) {
this.endpoint = checkNotNull(endpoint, "endpoint");
}
@Override
public int hashCode() {
return endpoint.hashCode();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof EndpointConfig)) {
return false;
}
return Objects.equals(endpoint, ((EndpointConfig)obj).endpoint);
}
public StatusOr<EdsUpdate> getEndpoint() {
return endpoint;
}
}
// The list of leaf clusters for an aggregate cluster.
static final class AggregateConfig implements ClusterChild {
private final List<String> leafNames;
public AggregateConfig(List<String> leafNames) {
this.leafNames = checkNotNull(leafNames, "leafNames");
}
@Override
public int hashCode() {
return leafNames.hashCode();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof AggregateConfig)) {
return false;
}
return Objects.equals(leafNames, ((AggregateConfig) obj).leafNames);
}
}
}

View File

@ -29,6 +29,9 @@ import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
@ -36,6 +39,7 @@ import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsResourceType;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -43,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
/**
@ -299,27 +304,123 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
Map<String, ? extends XdsWatcherBase<?>> cdsWatchers =
resourceWatchers.get(CLUSTER_RESOURCE).watchers;
// Iterate CDS watchers
for (XdsWatcherBase<?> watcher : cdsWatchers.values()) {
CdsWatcher cdsWatcher = (CdsWatcher) watcher;
String clusterName = cdsWatcher.resourceName();
StatusOr<XdsClusterResource.CdsUpdate> cdsUpdate = cdsWatcher.getData();
if (cdsUpdate.hasValue()) {
XdsConfig.XdsClusterConfig clusterConfig;
String edsName = cdsUpdate.getValue().edsServiceName();
EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(edsName);
// Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
List<String> topLevelClusters =
cdsWatchers.values().stream()
.filter(XdsDependencyManager::isTopLevelCluster)
.map(w -> w.resourceName())
.collect(Collectors.toList());
// Only EDS type clusters have endpoint data
StatusOr<XdsEndpointResource.EdsUpdate> data =
edsWatcher != null ? edsWatcher.getData() : null;
clusterConfig = new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate.getValue(), data);
builder.addCluster(clusterName, StatusOr.fromValue(clusterConfig));
// Flatten multi-level aggregates into lists of leaf clusters
Set<String> leafNames =
addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters);
addLeavesToBuilder(builder, edsWatchers, leafNames);
return builder.build();
}
private void addLeavesToBuilder(XdsConfig.XdsConfigBuilder builder,
Map<String, ? extends XdsWatcherBase<?>> edsWatchers,
Set<String> leafNames) {
for (String clusterName : leafNames) {
CdsWatcher cdsWatcher = getCluster(clusterName);
StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData();
if (cdsUpdateOr.hasValue()) {
XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
if (cdsUpdate.clusterType() == ClusterType.EDS) {
EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName());
if (edsWatcher != null) {
EndpointConfig child = new EndpointConfig(edsWatcher.getData());
builder.addCluster(clusterName, StatusOr.fromValue(
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
} else {
builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"EDS resource not found for cluster " + clusterName)));
}
} else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
// TODO get the resolved endpoint configuration
builder.addCluster(clusterName, StatusOr.fromValue(
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, new EndpointConfig(null))));
}
} else {
builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdate.getStatus()));
builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus()));
}
}
}
// Adds the top-level clusters to the builder and returns the leaf cluster names
private Set<String> addTopLevelClustersToBuilder(
XdsConfig.XdsConfigBuilder builder, Map<String, ? extends XdsWatcherBase<?>> edsWatchers,
Map<String, ? extends XdsWatcherBase<?>> cdsWatchers, List<String> topLevelClusters) {
Set<String> leafClusterNames = new HashSet<>();
for (String clusterName : topLevelClusters) {
CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName);
StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
if (!cdsWatcher.hasDataValue()) {
builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
continue;
}
XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
XdsConfig.XdsClusterConfig.ClusterChild child;
switch (cdsUpdate.clusterType()) {
case AGGREGATE:
List<String> leafNames = getLeafNames(cdsUpdate);
child = new AggregateConfig(leafNames);
leafClusterNames.addAll(leafNames);
break;
case EDS:
EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName());
if (edsWatcher != null) {
child = new EndpointConfig(edsWatcher.getData());
} else {
builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"EDS resource not found for cluster " + clusterName)));
continue;
}
break;
case LOGICAL_DNS:
// TODO get the resolved endpoint configuration
child = new EndpointConfig(null);
break;
default:
throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
}
builder.addCluster(clusterName, StatusOr.fromValue(
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
}
return leafClusterNames;
}
private List<String> getLeafNames(XdsClusterResource.CdsUpdate cdsUpdate) {
List<String> childNames = new ArrayList<>();
for (String cluster : cdsUpdate.prioritizedClusterNames()) {
StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData();
if (data == null || !data.hasValue() || data.getValue() == null) {
childNames.add(cluster);
continue;
}
if (data.getValue().clusterType() == ClusterType.AGGREGATE) {
childNames.addAll(getLeafNames(data.getValue()));
} else {
childNames.add(cluster);
}
}
return builder.build();
return childNames;
}
private static boolean isTopLevelCluster(XdsWatcherBase<?> cdsWatcher) {
if (! (cdsWatcher instanceof CdsWatcher)) {
return false;
}
return ((CdsWatcher)cdsWatcher).parentContexts.values().stream()
.anyMatch(depth -> depth == 1);
}
@Override

View File

@ -57,6 +57,8 @@ import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.XdsConfig.XdsClusterConfig;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.client.CommonBootstrapperTestUtils;
import io.grpc.xds.client.XdsClient;
@ -219,28 +221,37 @@ public class XdsDependencyManagerTest {
XdsTestUtils.setAggregateCdsConfig(controlPlaneService, serverName, rootName, childNames);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
Map<String, StatusOr<XdsConfig.XdsClusterConfig>> lastConfigClusters =
Map<String, StatusOr<XdsClusterConfig>> lastConfigClusters =
testWatcher.lastConfig.getClusters();
assertThat(lastConfigClusters).hasSize(childNames.size() + 1);
StatusOr<XdsConfig.XdsClusterConfig> rootC = lastConfigClusters.get(rootName);
StatusOr<XdsClusterConfig> rootC = lastConfigClusters.get(rootName);
XdsClusterResource.CdsUpdate rootUpdate = rootC.getValue().getClusterResource();
assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE);
assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames);
for (String childName : childNames) {
assertThat(lastConfigClusters).containsKey(childName);
StatusOr<XdsClusterConfig> childConfigOr = lastConfigClusters.get(childName);
XdsClusterResource.CdsUpdate childResource =
lastConfigClusters.get(childName).getValue().getClusterResource();
childConfigOr.getValue().getClusterResource();
assertThat(childResource.clusterType()).isEqualTo(EDS);
assertThat(childResource.edsServiceName()).isEqualTo(getEdsNameForCluster(childName));
StatusOr<XdsEndpointResource.EdsUpdate> endpoint =
lastConfigClusters.get(childName).getValue().getEndpoint();
StatusOr<EdsUpdate> endpoint = getEndpoint(childConfigOr);
assertThat(endpoint.hasValue()).isTrue();
assertThat(endpoint.getValue().clusterName).isEqualTo(getEdsNameForCluster(childName));
}
}
private static StatusOr<EdsUpdate> getEndpoint(StatusOr<XdsClusterConfig> childConfigOr) {
XdsClusterConfig.ClusterChild clusterChild = childConfigOr.getValue()
.getChildren();
assertThat(clusterChild).isInstanceOf(XdsClusterConfig.EndpointConfig.class);
StatusOr<EdsUpdate> endpoint = ((XdsClusterConfig.EndpointConfig) clusterChild).getEndpoint();
assertThat(endpoint).isNotNull();
return endpoint;
}
@Test
public void testComplexRegisteredAggregate() throws IOException {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
@ -289,7 +300,6 @@ public class XdsDependencyManagerTest {
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
String rootName1 = "root_c";
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
assertThat(subscription1).isNotNull();
@ -299,6 +309,7 @@ public class XdsDependencyManagerTest {
StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"No " + toContextStr(CLUSTER_TYPE_NAME, rootName1))).toString());
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).hasValue()).isTrue();
@ -336,7 +347,7 @@ public class XdsDependencyManagerTest {
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
List<StatusOr<XdsConfig.XdsClusterConfig>> returnedClusters = new ArrayList<>();
List<StatusOr<XdsClusterConfig>> returnedClusters = new ArrayList<>();
for (String childName : childNames) {
returnedClusters.add(xdsConfigCaptor.getValue().getClusters().get(childName));
}
@ -344,7 +355,7 @@ public class XdsDependencyManagerTest {
// Check that missing cluster reported Status and the other 2 are present
Status expectedClusterStatus = Status.UNAVAILABLE.withDescription(
"No " + toContextStr(CLUSTER_TYPE_NAME, childNames.get(2)));
StatusOr<XdsConfig.XdsClusterConfig> missingCluster = returnedClusters.get(2);
StatusOr<XdsClusterConfig> missingCluster = returnedClusters.get(2);
assertThat(missingCluster.getStatus().toString()).isEqualTo(expectedClusterStatus.toString());
assertThat(returnedClusters.get(0).hasValue()).isTrue();
assertThat(returnedClusters.get(1).hasValue()).isTrue();
@ -352,9 +363,9 @@ public class XdsDependencyManagerTest {
// Check that missing EDS reported Status, the other one is present and the garbage EDS is not
Status expectedEdsStatus = Status.UNAVAILABLE.withDescription(
"No " + toContextStr(ENDPOINT_TYPE_NAME, XdsTestUtils.EDS_NAME + 1));
assertThat(returnedClusters.get(0).getValue().getEndpoint().hasValue()).isTrue();
assertThat(returnedClusters.get(1).getValue().getEndpoint().hasValue()).isFalse();
assertThat(returnedClusters.get(1).getValue().getEndpoint().getStatus().toString())
assertThat(getEndpoint(returnedClusters.get(0)).hasValue()).isTrue();
assertThat(getEndpoint(returnedClusters.get(1)).hasValue()).isFalse();
assertThat(getEndpoint(returnedClusters.get(1)).getStatus().toString())
.isEqualTo(expectedEdsStatus.toString());
verify(xdsConfigWatcher, never()).onResourceDoesNotExist(any());
@ -539,7 +550,7 @@ public class XdsDependencyManagerTest {
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig));
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().keySet().size()).isEqualTo(8);
assertThat(xdsConfigCaptor.getValue().getClusters().keySet().size()).isEqualTo(4);
// Now that it is released, we should only have A11
rootSub.close();
@ -582,11 +593,9 @@ public class XdsDependencyManagerTest {
assertThat(initialConfig.getClusters().keySet())
.containsExactly("root", "clusterA", "clusterB");
XdsEndpointResource.EdsUpdate edsForA =
initialConfig.getClusters().get("clusterA").getValue().getEndpoint().getValue();
EdsUpdate edsForA = getEndpoint(initialConfig.getClusters().get("clusterA")).getValue();
assertThat(edsForA.clusterName).isEqualTo(edsName);
XdsEndpointResource.EdsUpdate edsForB =
initialConfig.getClusters().get("clusterB").getValue().getEndpoint().getValue();
EdsUpdate edsForB = getEndpoint(initialConfig.getClusters().get("clusterB")).getValue();
assertThat(edsForB.clusterName).isEqualTo(edsName);
assertThat(edsForA).isEqualTo(edsForB);
edsForA.localityLbEndpointsMap.values().forEach(
@ -635,7 +644,7 @@ public class XdsDependencyManagerTest {
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
XdsConfig config = xdsConfigCaptor.getValue();
assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName);
assertThat(config.getClusters().size()).isEqualTo(8);
assertThat(config.getClusters().size()).isEqualTo(4);
}
@Test
@ -689,8 +698,7 @@ public class XdsDependencyManagerTest {
// Verify that the config is updated as expected
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
XdsConfig config = xdsConfigCaptor.getValue();
assertThat(config.getClusters().keySet()).containsExactly("root", "clusterA", "clusterA2",
"clusterA21", "clusterA22");
assertThat(config.getClusters().keySet()).containsExactly("root", "clusterA21", "clusterA22");
}
private Listener buildInlineClientListener(String rdsName, String clusterName) {

View File

@ -57,6 +57,7 @@ import io.grpc.internal.JsonParser;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
import io.grpc.xds.client.Bootstrapper;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsResourceType;
@ -269,7 +270,7 @@ public class XdsTestUtils {
CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null)
.lbPolicyConfig(getWrrLbConfigAsMap()).build();
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
CLUSTER_NAME, cdsUpdate, StatusOr.fromValue(edsUpdate));
CLUSTER_NAME, cdsUpdate, new EndpointConfig(StatusOr.fromValue(edsUpdate)));
builder
.setListener(ldsUpdate)