diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java index ec8f3dc076..1f464aa132 100644 --- a/xds/src/main/java/io/grpc/xds/XdsConfig.java +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -18,6 +18,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.grpc.StatusOr; import io.grpc.xds.XdsClusterResource.CdsUpdate; @@ -26,9 +27,9 @@ import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import java.io.Closeable; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; /** * Represents the xDS configuration tree for a specified Listener. @@ -191,10 +192,14 @@ final class XdsConfig { // The list of leaf clusters for an aggregate cluster. static final class AggregateConfig implements ClusterChild { - private final Set leafNames; + private final List leafNames; - public AggregateConfig(Set leafNames) { - this.leafNames = checkNotNull(leafNames, "leafNames"); + public AggregateConfig(List leafNames) { + this.leafNames = ImmutableList.copyOf(checkNotNull(leafNames, "leafNames")); + } + + public List getLeafNames() { + return leafNames; } @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index d786e525c0..7bbd0064ed 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -22,6 +22,7 @@ import static io.grpc.xds.client.XdsClient.ResourceUpdate; import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import io.grpc.InternalLogId; import io.grpc.NameResolver; @@ -42,12 +43,12 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; -import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -101,7 +102,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi subscription.closed = true; return; // shutdown() called } - addClusterWatcher(clusterName, subscription, 1); + addClusterWatcher(clusterName, subscription); }); return subscription; @@ -164,7 +165,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi CdsWatcher cdsWatcher = (CdsWatcher) watcher; if (!cdsWatcher.parentContexts.isEmpty()) { String msg = String.format("CdsWatcher %s has parent contexts %s", - cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet()); + cdsWatcher.resourceName(), cdsWatcher.parentContexts); throw new IllegalStateException(msg); } } else if (watcher instanceof EdsWatcher) { @@ -309,24 +310,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } builder.setVirtualHost(activeVirtualHost); - Map> edsWatchers = - getWatchers(ENDPOINT_RESOURCE); - Map> cdsWatchers = - getWatchers(CLUSTER_RESOURCE); - - // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters - List topLevelClusters = - cdsWatchers.values().stream() - .filter(XdsDependencyManager::isTopLevelCluster) - .map(XdsWatcherBase::resourceName) - .distinct() - .collect(Collectors.toList()); - - // Flatten multi-level aggregates into lists of leaf clusters - Set leafNames = - addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters); - - addLeavesToBuilder(builder, edsWatchers, leafNames); + Map> clusters = new HashMap<>(); + LinkedHashSet ancestors = new LinkedHashSet<>(); + for (String cluster : getWatchers(CLUSTER_RESOURCE).keySet()) { + addConfigForCluster(clusters, cluster, ancestors); + } + for (Map.Entry> me : clusters.entrySet()) { + builder.addCluster(me.getKey(), me.getValue()); + } return StatusOr.fromValue(builder.build()); } @@ -344,111 +335,81 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return tTypeWatchers.watchers; } - private void addLeavesToBuilder( - XdsConfig.XdsConfigBuilder builder, - Map> edsWatchers, - Set leafNames) { - for (String clusterName : leafNames) { - CdsWatcher cdsWatcher = getCluster(clusterName); - StatusOr cdsUpdateOr = cdsWatcher.getData(); + private void addConfigForCluster( + Map> clusters, + String clusterName, + @SuppressWarnings("NonApiType") // Need order-preserving set for errors + LinkedHashSet ancestors) { + if (clusters.containsKey(clusterName)) { + return; + } + if (ancestors.contains(clusterName)) { + clusters.put(clusterName, StatusOr.fromStatus( + Status.INTERNAL.withDescription( + "Aggregate cluster cycle detected: " + ancestors))); + return; + } + if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) { + clusters.put(clusterName, StatusOr.fromStatus( + Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors))); + return; + } - if (!cdsUpdateOr.hasValue()) { - builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus())); - continue; - } + CdsWatcher cdsWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); + StatusOr cdsWatcherDataOr = cdsWatcher.getData(); + if (!cdsWatcherDataOr.hasValue()) { + clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus())); + return; + } - XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue(); - if (cdsUpdate.clusterType() == ClusterType.EDS) { + XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue(); + XdsConfig.XdsClusterConfig.ClusterChild child; + switch (cdsUpdate.clusterType()) { + case AGGREGATE: + // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it + // preserves the priority across all aggregate clusters + LinkedHashSet leafNames = new LinkedHashSet(); + ancestors.add(clusterName); + for (String childCluster : cdsUpdate.prioritizedClusterNames()) { + addConfigForCluster(clusters, childCluster, ancestors); + StatusOr config = clusters.get(childCluster); + if (!config.hasValue()) { + clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription( + "Unable to get leaves for " + clusterName + ": " + + config.getStatus().getDescription()))); + return; + } + XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren(); + if (children instanceof AggregateConfig) { + leafNames.addAll(((AggregateConfig) children).getLeafNames()); + } else { + leafNames.add(childCluster); + } + } + ancestors.remove(clusterName); + + child = new AggregateConfig(ImmutableList.copyOf(leafNames)); + break; + case EDS: XdsWatcherBase edsWatcher = - edsWatchers.get(cdsWatcher.getEdsServiceName()); - EndpointConfig child; + getWatchers(ENDPOINT_RESOURCE).get(cdsWatcher.getEdsServiceName()); if (edsWatcher != null) { child = new EndpointConfig(edsWatcher.getData()); } else { child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription( "EDS resource not found for cluster " + clusterName))); } - builder.addCluster(clusterName, StatusOr.fromValue( - new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); - } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) { - builder.addCluster(clusterName, StatusOr.fromStatus( + break; + case LOGICAL_DNS: + // TODO get the resolved endpoint configuration + child = new EndpointConfig(StatusOr.fromStatus( Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported"))); - } + break; + default: + throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType()); } - } - - // Adds the top-level clusters to the builder and returns the leaf cluster names - private Set addTopLevelClustersToBuilder( - XdsConfig.XdsConfigBuilder builder, - Map> edsWatchers, - Map> cdsWatchers, - List topLevelClusters) { - - Set leafClusterNames = new HashSet<>(); - for (String clusterName : topLevelClusters) { - CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName); - StatusOr cdsWatcherDataOr = cdsWatcher.getData(); - if (!cdsWatcher.hasDataValue()) { - builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus())); - continue; - } - - XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue(); - XdsConfig.XdsClusterConfig.ClusterChild child; - switch (cdsUpdate.clusterType()) { - case AGGREGATE: - Set leafNames = new HashSet<>(); - addLeafNames(leafNames, cdsUpdate); - child = new AggregateConfig(leafNames); - leafClusterNames.addAll(leafNames); - break; - case EDS: - XdsWatcherBase edsWatcher = - edsWatchers.get(cdsWatcher.getEdsServiceName()); - if (edsWatcher != null) { - child = new EndpointConfig(edsWatcher.getData()); - } else { - child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription( - "EDS resource not found for cluster " + clusterName))); - } - break; - case LOGICAL_DNS: - // TODO get the resolved endpoint configuration - child = new EndpointConfig(StatusOr.fromStatus( - Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported"))); - break; - default: - throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType()); - } - builder.addCluster(clusterName, StatusOr.fromValue( - new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); - } - - return leafClusterNames; - } - - private void addLeafNames(Set leafNames, XdsClusterResource.CdsUpdate cdsUpdate) { - for (String cluster : cdsUpdate.prioritizedClusterNames()) { - if (leafNames.contains(cluster)) { - continue; - } - StatusOr data = getCluster(cluster).getData(); - if (data == null || !data.hasValue() || data.getValue() == null) { - leafNames.add(cluster); - continue; - } - if (data.getValue().clusterType() == ClusterType.AGGREGATE) { - addLeafNames(leafNames, data.getValue()); - } else { - leafNames.add(cluster); - } - } - } - - private static boolean isTopLevelCluster( - XdsWatcherBase cdsWatcher) { - return ((CdsWatcher)cdsWatcher).parentContexts.values().stream() - .anyMatch(depth -> depth == 1); + clusters.put(clusterName, StatusOr.fromValue( + new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); } @Override @@ -467,14 +428,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi addWatcher(new EdsWatcher(edsServiceName, parentContext)); } - private void addClusterWatcher(String clusterName, Object parentContext, int depth) { + private void addClusterWatcher(String clusterName, Object parentContext) { CdsWatcher watcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); if (watcher != null) { - watcher.parentContexts.put(parentContext, depth); + watcher.parentContexts.add(parentContext); return; } - addWatcher(new CdsWatcher(clusterName, parentContext, depth)); + addWatcher(new CdsWatcher(clusterName, parentContext)); } private void updateRoutes(List virtualHosts, Object newParentContext, @@ -494,9 +455,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi deletedClusters.forEach(watcher -> cancelClusterWatcherTree(getCluster(watcher), newParentContext)); - addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1)); + addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext)); } else { - newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1)); + newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext)); } } @@ -805,11 +766,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } private class CdsWatcher extends XdsWatcherBase { - Map parentContexts = new HashMap<>(); + Set parentContexts = new HashSet<>(); - CdsWatcher(String resourceName, Object parentContext, int depth) { + CdsWatcher(String resourceName, Object parentContext) { super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName")); - this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth); + this.parentContexts.add(checkNotNull(parentContext, "parentContext")); } @Override @@ -829,14 +790,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi break; case AGGREGATE: Object parentContext = this; - int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1; - if (depth > MAX_CLUSTER_RECURSION_DEPTH) { - logger.log(XdsLogger.XdsLogLevel.WARNING, - "Cluster recursion depth limit exceeded for cluster {0}", resourceName()); - Status error = Status.UNAVAILABLE.withDescription( - "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo()); - setDataAsStatus(error); - } if (hasDataValue()) { Set oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE ? new HashSet<>(getData().getValue().prioritizedClusterNames()) @@ -847,21 +800,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi deletedClusters.forEach((cluster) -> cancelClusterWatcherTree(getCluster(cluster), parentContext)); - if (depth <= MAX_CLUSTER_RECURSION_DEPTH) { - setData(update); - Set addedClusters = Sets.difference(newNames, oldNames); - addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth)); - } - - } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) { + setData(update); + Set addedClusters = Sets.difference(newNames, oldNames); + addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext)); + } else { setData(update); update.prioritizedClusterNames() - .forEach(name -> addClusterWatcher(name, parentContext, depth)); + .forEach(name -> addClusterWatcher(name, parentContext)); } break; default: Status error = Status.UNAVAILABLE.withDescription( - "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo()); + "unknown cluster type in " + resourceName() + " " + update.clusterType()); setDataAsStatus(error); } maybePublishConfig(); diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index 14f554412a..da960e1e13 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -18,7 +18,6 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; import static io.grpc.StatusMatcher.statusHasCode; -import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.AGGREGATE; import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.EDS; import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS; @@ -239,9 +238,10 @@ public class XdsDependencyManagerTest { testWatcher.lastConfig.getClusters(); assertThat(lastConfigClusters).hasSize(childNames.size() + 1); StatusOr rootC = lastConfigClusters.get(rootName); - CdsUpdate rootUpdate = rootC.getValue().getClusterResource(); - assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE); - assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames); + assertThat(rootC.getValue().getChildren()).isInstanceOf(XdsClusterConfig.AggregateConfig.class); + XdsClusterConfig.AggregateConfig aggConfig = + (XdsClusterConfig.AggregateConfig) rootC.getValue().getChildren(); + assertThat(aggConfig.getLeafNames()).isEqualTo(childNames); for (String childName : childNames) { assertThat(lastConfigClusters).containsKey(childName); @@ -552,7 +552,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig( ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig)); inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); - assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet().size()).isEqualTo(4); + assertThat(xdsUpdateCaptor.getValue().getValue().getClusters()).hasSize(8); // Now that it is released, we should only have A11 rootSub.close(); @@ -561,6 +561,29 @@ public class XdsDependencyManagerTest { .containsExactly("clusterA11"); } + @Test + public void testCdsCycle() throws Exception { + RouteConfiguration routeConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA"); + Map clusterMap = new HashMap<>(); + Map edsMap = new HashMap<>(); + clusterMap.put("clusterA", XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterB"))); + clusterMap.put("clusterB", XdsTestUtils.buildAggCluster("clusterB", Arrays.asList("clusterA"))); + XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterC"); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); + XdsConfig config = xdsUpdateCaptor.getValue().getValue(); + assertThat(config.getClusters().get("clusterA").hasValue()).isFalse(); + assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle"); + } + @Test public void testMultipleCdsReferToSameEds() { // Create the maps and Update the config to have 2 clusters that refer to the same EDS resource @@ -646,7 +669,7 @@ public class XdsDependencyManagerTest { inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig config = xdsUpdateCaptor.getValue().getValue(); assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName); - assertThat(config.getClusters().size()).isEqualTo(4); + assertThat(config.getClusters()).hasSize(8); } @Test @@ -697,8 +720,8 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); // Verify that the config is updated as expected - ClusterNameMatcher nameMatcher - = new ClusterNameMatcher(Arrays.asList("root", "clusterA21", "clusterA22")); + ClusterNameMatcher nameMatcher = new ClusterNameMatcher(Arrays.asList( + "root", "clusterA", "clusterA2", "clusterA21", "clusterA22")); inOrder.verify(xdsConfigWatcher).onUpdate(argThat(nameMatcher)); }