From 4cd78810865fa8703b8a69753cf8d85291e34c6b Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 29 May 2025 13:12:48 -0700 Subject: [PATCH] xds: Fix XdsDepManager aggregate cluster child ordering and loop detection The children of aggregate clusters have a priority order, so we can't ever throw them in an ordinary set for later iteration. This now detects recusion limits only after subscribing, but that matches our existing behavior in CdsLoadBalancer2. We don't get much value detecting the limit before subscribing and doing so makes watcher types more different. Loops are still a bit broken as they won't be unwatched when orphaned, as they will form a reference loop. In CdsLoadBalancer2, duplicate clusters had duplicate watchers so there was single-ownership and reference cycles couldn't form. Fixing that is a bigger change. Intermediate aggregate clusters are now included in XdsConfig, just for simplicity. It doesn't hurt anything whether they are present or missing. but it required updates to some tests. --- xds/src/main/java/io/grpc/xds/XdsConfig.java | 13 +- .../io/grpc/xds/XdsDependencyManager.java | 230 +++++++----------- .../io/grpc/xds/XdsDependencyManagerTest.java | 39 ++- 3 files changed, 130 insertions(+), 152 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java index ec8f3dc076..1f464aa132 100644 --- a/xds/src/main/java/io/grpc/xds/XdsConfig.java +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -18,6 +18,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.grpc.StatusOr; import io.grpc.xds.XdsClusterResource.CdsUpdate; @@ -26,9 +27,9 @@ import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import java.io.Closeable; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; /** * Represents the xDS configuration tree for a specified Listener. @@ -191,10 +192,14 @@ final class XdsConfig { // The list of leaf clusters for an aggregate cluster. static final class AggregateConfig implements ClusterChild { - private final Set leafNames; + private final List leafNames; - public AggregateConfig(Set leafNames) { - this.leafNames = checkNotNull(leafNames, "leafNames"); + public AggregateConfig(List leafNames) { + this.leafNames = ImmutableList.copyOf(checkNotNull(leafNames, "leafNames")); + } + + public List getLeafNames() { + return leafNames; } @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index d786e525c0..7bbd0064ed 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -22,6 +22,7 @@ import static io.grpc.xds.client.XdsClient.ResourceUpdate; import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import io.grpc.InternalLogId; import io.grpc.NameResolver; @@ -42,12 +43,12 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; -import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -101,7 +102,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi subscription.closed = true; return; // shutdown() called } - addClusterWatcher(clusterName, subscription, 1); + addClusterWatcher(clusterName, subscription); }); return subscription; @@ -164,7 +165,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi CdsWatcher cdsWatcher = (CdsWatcher) watcher; if (!cdsWatcher.parentContexts.isEmpty()) { String msg = String.format("CdsWatcher %s has parent contexts %s", - cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet()); + cdsWatcher.resourceName(), cdsWatcher.parentContexts); throw new IllegalStateException(msg); } } else if (watcher instanceof EdsWatcher) { @@ -309,24 +310,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } builder.setVirtualHost(activeVirtualHost); - Map> edsWatchers = - getWatchers(ENDPOINT_RESOURCE); - Map> cdsWatchers = - getWatchers(CLUSTER_RESOURCE); - - // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters - List topLevelClusters = - cdsWatchers.values().stream() - .filter(XdsDependencyManager::isTopLevelCluster) - .map(XdsWatcherBase::resourceName) - .distinct() - .collect(Collectors.toList()); - - // Flatten multi-level aggregates into lists of leaf clusters - Set leafNames = - addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters); - - addLeavesToBuilder(builder, edsWatchers, leafNames); + Map> clusters = new HashMap<>(); + LinkedHashSet ancestors = new LinkedHashSet<>(); + for (String cluster : getWatchers(CLUSTER_RESOURCE).keySet()) { + addConfigForCluster(clusters, cluster, ancestors); + } + for (Map.Entry> me : clusters.entrySet()) { + builder.addCluster(me.getKey(), me.getValue()); + } return StatusOr.fromValue(builder.build()); } @@ -344,111 +335,81 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return tTypeWatchers.watchers; } - private void addLeavesToBuilder( - XdsConfig.XdsConfigBuilder builder, - Map> edsWatchers, - Set leafNames) { - for (String clusterName : leafNames) { - CdsWatcher cdsWatcher = getCluster(clusterName); - StatusOr cdsUpdateOr = cdsWatcher.getData(); + private void addConfigForCluster( + Map> clusters, + String clusterName, + @SuppressWarnings("NonApiType") // Need order-preserving set for errors + LinkedHashSet ancestors) { + if (clusters.containsKey(clusterName)) { + return; + } + if (ancestors.contains(clusterName)) { + clusters.put(clusterName, StatusOr.fromStatus( + Status.INTERNAL.withDescription( + "Aggregate cluster cycle detected: " + ancestors))); + return; + } + if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) { + clusters.put(clusterName, StatusOr.fromStatus( + Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors))); + return; + } - if (!cdsUpdateOr.hasValue()) { - builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus())); - continue; - } + CdsWatcher cdsWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); + StatusOr cdsWatcherDataOr = cdsWatcher.getData(); + if (!cdsWatcherDataOr.hasValue()) { + clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus())); + return; + } - XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue(); - if (cdsUpdate.clusterType() == ClusterType.EDS) { + XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue(); + XdsConfig.XdsClusterConfig.ClusterChild child; + switch (cdsUpdate.clusterType()) { + case AGGREGATE: + // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it + // preserves the priority across all aggregate clusters + LinkedHashSet leafNames = new LinkedHashSet(); + ancestors.add(clusterName); + for (String childCluster : cdsUpdate.prioritizedClusterNames()) { + addConfigForCluster(clusters, childCluster, ancestors); + StatusOr config = clusters.get(childCluster); + if (!config.hasValue()) { + clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription( + "Unable to get leaves for " + clusterName + ": " + + config.getStatus().getDescription()))); + return; + } + XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren(); + if (children instanceof AggregateConfig) { + leafNames.addAll(((AggregateConfig) children).getLeafNames()); + } else { + leafNames.add(childCluster); + } + } + ancestors.remove(clusterName); + + child = new AggregateConfig(ImmutableList.copyOf(leafNames)); + break; + case EDS: XdsWatcherBase edsWatcher = - edsWatchers.get(cdsWatcher.getEdsServiceName()); - EndpointConfig child; + getWatchers(ENDPOINT_RESOURCE).get(cdsWatcher.getEdsServiceName()); if (edsWatcher != null) { child = new EndpointConfig(edsWatcher.getData()); } else { child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription( "EDS resource not found for cluster " + clusterName))); } - builder.addCluster(clusterName, StatusOr.fromValue( - new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); - } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) { - builder.addCluster(clusterName, StatusOr.fromStatus( + break; + case LOGICAL_DNS: + // TODO get the resolved endpoint configuration + child = new EndpointConfig(StatusOr.fromStatus( Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported"))); - } + break; + default: + throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType()); } - } - - // Adds the top-level clusters to the builder and returns the leaf cluster names - private Set addTopLevelClustersToBuilder( - XdsConfig.XdsConfigBuilder builder, - Map> edsWatchers, - Map> cdsWatchers, - List topLevelClusters) { - - Set leafClusterNames = new HashSet<>(); - for (String clusterName : topLevelClusters) { - CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName); - StatusOr cdsWatcherDataOr = cdsWatcher.getData(); - if (!cdsWatcher.hasDataValue()) { - builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus())); - continue; - } - - XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue(); - XdsConfig.XdsClusterConfig.ClusterChild child; - switch (cdsUpdate.clusterType()) { - case AGGREGATE: - Set leafNames = new HashSet<>(); - addLeafNames(leafNames, cdsUpdate); - child = new AggregateConfig(leafNames); - leafClusterNames.addAll(leafNames); - break; - case EDS: - XdsWatcherBase edsWatcher = - edsWatchers.get(cdsWatcher.getEdsServiceName()); - if (edsWatcher != null) { - child = new EndpointConfig(edsWatcher.getData()); - } else { - child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription( - "EDS resource not found for cluster " + clusterName))); - } - break; - case LOGICAL_DNS: - // TODO get the resolved endpoint configuration - child = new EndpointConfig(StatusOr.fromStatus( - Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported"))); - break; - default: - throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType()); - } - builder.addCluster(clusterName, StatusOr.fromValue( - new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); - } - - return leafClusterNames; - } - - private void addLeafNames(Set leafNames, XdsClusterResource.CdsUpdate cdsUpdate) { - for (String cluster : cdsUpdate.prioritizedClusterNames()) { - if (leafNames.contains(cluster)) { - continue; - } - StatusOr data = getCluster(cluster).getData(); - if (data == null || !data.hasValue() || data.getValue() == null) { - leafNames.add(cluster); - continue; - } - if (data.getValue().clusterType() == ClusterType.AGGREGATE) { - addLeafNames(leafNames, data.getValue()); - } else { - leafNames.add(cluster); - } - } - } - - private static boolean isTopLevelCluster( - XdsWatcherBase cdsWatcher) { - return ((CdsWatcher)cdsWatcher).parentContexts.values().stream() - .anyMatch(depth -> depth == 1); + clusters.put(clusterName, StatusOr.fromValue( + new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); } @Override @@ -467,14 +428,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi addWatcher(new EdsWatcher(edsServiceName, parentContext)); } - private void addClusterWatcher(String clusterName, Object parentContext, int depth) { + private void addClusterWatcher(String clusterName, Object parentContext) { CdsWatcher watcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); if (watcher != null) { - watcher.parentContexts.put(parentContext, depth); + watcher.parentContexts.add(parentContext); return; } - addWatcher(new CdsWatcher(clusterName, parentContext, depth)); + addWatcher(new CdsWatcher(clusterName, parentContext)); } private void updateRoutes(List virtualHosts, Object newParentContext, @@ -494,9 +455,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi deletedClusters.forEach(watcher -> cancelClusterWatcherTree(getCluster(watcher), newParentContext)); - addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1)); + addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext)); } else { - newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1)); + newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext)); } } @@ -805,11 +766,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } private class CdsWatcher extends XdsWatcherBase { - Map parentContexts = new HashMap<>(); + Set parentContexts = new HashSet<>(); - CdsWatcher(String resourceName, Object parentContext, int depth) { + CdsWatcher(String resourceName, Object parentContext) { super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName")); - this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth); + this.parentContexts.add(checkNotNull(parentContext, "parentContext")); } @Override @@ -829,14 +790,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi break; case AGGREGATE: Object parentContext = this; - int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1; - if (depth > MAX_CLUSTER_RECURSION_DEPTH) { - logger.log(XdsLogger.XdsLogLevel.WARNING, - "Cluster recursion depth limit exceeded for cluster {0}", resourceName()); - Status error = Status.UNAVAILABLE.withDescription( - "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo()); - setDataAsStatus(error); - } if (hasDataValue()) { Set oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE ? new HashSet<>(getData().getValue().prioritizedClusterNames()) @@ -847,21 +800,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi deletedClusters.forEach((cluster) -> cancelClusterWatcherTree(getCluster(cluster), parentContext)); - if (depth <= MAX_CLUSTER_RECURSION_DEPTH) { - setData(update); - Set addedClusters = Sets.difference(newNames, oldNames); - addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth)); - } - - } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) { + setData(update); + Set addedClusters = Sets.difference(newNames, oldNames); + addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext)); + } else { setData(update); update.prioritizedClusterNames() - .forEach(name -> addClusterWatcher(name, parentContext, depth)); + .forEach(name -> addClusterWatcher(name, parentContext)); } break; default: Status error = Status.UNAVAILABLE.withDescription( - "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo()); + "unknown cluster type in " + resourceName() + " " + update.clusterType()); setDataAsStatus(error); } maybePublishConfig(); diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index 14f554412a..da960e1e13 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -18,7 +18,6 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; import static io.grpc.StatusMatcher.statusHasCode; -import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.AGGREGATE; import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.EDS; import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS; @@ -239,9 +238,10 @@ public class XdsDependencyManagerTest { testWatcher.lastConfig.getClusters(); assertThat(lastConfigClusters).hasSize(childNames.size() + 1); StatusOr rootC = lastConfigClusters.get(rootName); - CdsUpdate rootUpdate = rootC.getValue().getClusterResource(); - assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE); - assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames); + assertThat(rootC.getValue().getChildren()).isInstanceOf(XdsClusterConfig.AggregateConfig.class); + XdsClusterConfig.AggregateConfig aggConfig = + (XdsClusterConfig.AggregateConfig) rootC.getValue().getChildren(); + assertThat(aggConfig.getLeafNames()).isEqualTo(childNames); for (String childName : childNames) { assertThat(lastConfigClusters).containsKey(childName); @@ -552,7 +552,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig( ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig)); inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); - assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet().size()).isEqualTo(4); + assertThat(xdsUpdateCaptor.getValue().getValue().getClusters()).hasSize(8); // Now that it is released, we should only have A11 rootSub.close(); @@ -561,6 +561,29 @@ public class XdsDependencyManagerTest { .containsExactly("clusterA11"); } + @Test + public void testCdsCycle() throws Exception { + RouteConfiguration routeConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA"); + Map clusterMap = new HashMap<>(); + Map edsMap = new HashMap<>(); + clusterMap.put("clusterA", XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterB"))); + clusterMap.put("clusterB", XdsTestUtils.buildAggCluster("clusterB", Arrays.asList("clusterA"))); + XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterC"); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); + XdsConfig config = xdsUpdateCaptor.getValue().getValue(); + assertThat(config.getClusters().get("clusterA").hasValue()).isFalse(); + assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle"); + } + @Test public void testMultipleCdsReferToSameEds() { // Create the maps and Update the config to have 2 clusters that refer to the same EDS resource @@ -646,7 +669,7 @@ public class XdsDependencyManagerTest { inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig config = xdsUpdateCaptor.getValue().getValue(); assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName); - assertThat(config.getClusters().size()).isEqualTo(4); + assertThat(config.getClusters()).hasSize(8); } @Test @@ -697,8 +720,8 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); // Verify that the config is updated as expected - ClusterNameMatcher nameMatcher - = new ClusterNameMatcher(Arrays.asList("root", "clusterA21", "clusterA22")); + ClusterNameMatcher nameMatcher = new ClusterNameMatcher(Arrays.asList( + "root", "clusterA", "clusterA2", "clusterA21", "clusterA22")); inOrder.verify(xdsConfigWatcher).onUpdate(argThat(nameMatcher)); }