mirror of https://github.com/grpc/grpc-java.git
xds: Fix XdsDepManager aggregate cluster child ordering and loop detection
The children of aggregate clusters have a priority order, so we can't ever throw them in an ordinary set for later iteration. This now detects recusion limits only after subscribing, but that matches our existing behavior in CdsLoadBalancer2. We don't get much value detecting the limit before subscribing and doing so makes watcher types more different. Loops are still a bit broken as they won't be unwatched when orphaned, as they will form a reference loop. In CdsLoadBalancer2, duplicate clusters had duplicate watchers so there was single-ownership and reference cycles couldn't form. Fixing that is a bigger change. Intermediate aggregate clusters are now included in XdsConfig, just for simplicity. It doesn't hurt anything whether they are present or missing. but it required updates to some tests.
This commit is contained in:
parent
4c73999102
commit
4cd7881086
|
@ -18,6 +18,7 @@ package io.grpc.xds;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import io.grpc.StatusOr;
|
import io.grpc.StatusOr;
|
||||||
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
||||||
|
@ -26,9 +27,9 @@ import io.grpc.xds.XdsListenerResource.LdsUpdate;
|
||||||
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
|
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents the xDS configuration tree for a specified Listener.
|
* Represents the xDS configuration tree for a specified Listener.
|
||||||
|
@ -191,10 +192,14 @@ final class XdsConfig {
|
||||||
|
|
||||||
// The list of leaf clusters for an aggregate cluster.
|
// The list of leaf clusters for an aggregate cluster.
|
||||||
static final class AggregateConfig implements ClusterChild {
|
static final class AggregateConfig implements ClusterChild {
|
||||||
private final Set<String> leafNames;
|
private final List<String> leafNames;
|
||||||
|
|
||||||
public AggregateConfig(Set<String> leafNames) {
|
public AggregateConfig(List<String> leafNames) {
|
||||||
this.leafNames = checkNotNull(leafNames, "leafNames");
|
this.leafNames = ImmutableList.copyOf(checkNotNull(leafNames, "leafNames"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getLeafNames() {
|
||||||
|
return leafNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static io.grpc.xds.client.XdsClient.ResourceUpdate;
|
||||||
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
|
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import io.grpc.InternalLogId;
|
import io.grpc.InternalLogId;
|
||||||
import io.grpc.NameResolver;
|
import io.grpc.NameResolver;
|
||||||
|
@ -42,12 +43,12 @@ import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -101,7 +102,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
subscription.closed = true;
|
subscription.closed = true;
|
||||||
return; // shutdown() called
|
return; // shutdown() called
|
||||||
}
|
}
|
||||||
addClusterWatcher(clusterName, subscription, 1);
|
addClusterWatcher(clusterName, subscription);
|
||||||
});
|
});
|
||||||
|
|
||||||
return subscription;
|
return subscription;
|
||||||
|
@ -164,7 +165,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
CdsWatcher cdsWatcher = (CdsWatcher) watcher;
|
CdsWatcher cdsWatcher = (CdsWatcher) watcher;
|
||||||
if (!cdsWatcher.parentContexts.isEmpty()) {
|
if (!cdsWatcher.parentContexts.isEmpty()) {
|
||||||
String msg = String.format("CdsWatcher %s has parent contexts %s",
|
String msg = String.format("CdsWatcher %s has parent contexts %s",
|
||||||
cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet());
|
cdsWatcher.resourceName(), cdsWatcher.parentContexts);
|
||||||
throw new IllegalStateException(msg);
|
throw new IllegalStateException(msg);
|
||||||
}
|
}
|
||||||
} else if (watcher instanceof EdsWatcher) {
|
} else if (watcher instanceof EdsWatcher) {
|
||||||
|
@ -309,24 +310,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
}
|
}
|
||||||
builder.setVirtualHost(activeVirtualHost);
|
builder.setVirtualHost(activeVirtualHost);
|
||||||
|
|
||||||
Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers =
|
Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
|
||||||
getWatchers(ENDPOINT_RESOURCE);
|
LinkedHashSet<String> ancestors = new LinkedHashSet<>();
|
||||||
Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers =
|
for (String cluster : getWatchers(CLUSTER_RESOURCE).keySet()) {
|
||||||
getWatchers(CLUSTER_RESOURCE);
|
addConfigForCluster(clusters, cluster, ancestors);
|
||||||
|
}
|
||||||
// Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
|
for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
|
||||||
List<String> topLevelClusters =
|
builder.addCluster(me.getKey(), me.getValue());
|
||||||
cdsWatchers.values().stream()
|
}
|
||||||
.filter(XdsDependencyManager::isTopLevelCluster)
|
|
||||||
.map(XdsWatcherBase<?>::resourceName)
|
|
||||||
.distinct()
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
// Flatten multi-level aggregates into lists of leaf clusters
|
|
||||||
Set<String> leafNames =
|
|
||||||
addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters);
|
|
||||||
|
|
||||||
addLeavesToBuilder(builder, edsWatchers, leafNames);
|
|
||||||
|
|
||||||
return StatusOr.fromValue(builder.build());
|
return StatusOr.fromValue(builder.build());
|
||||||
}
|
}
|
||||||
|
@ -344,111 +335,81 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
return tTypeWatchers.watchers;
|
return tTypeWatchers.watchers;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addLeavesToBuilder(
|
private void addConfigForCluster(
|
||||||
XdsConfig.XdsConfigBuilder builder,
|
Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
|
||||||
Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
|
String clusterName,
|
||||||
Set<String> leafNames) {
|
@SuppressWarnings("NonApiType") // Need order-preserving set for errors
|
||||||
for (String clusterName : leafNames) {
|
LinkedHashSet<String> ancestors) {
|
||||||
CdsWatcher cdsWatcher = getCluster(clusterName);
|
if (clusters.containsKey(clusterName)) {
|
||||||
StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData();
|
return;
|
||||||
|
}
|
||||||
|
if (ancestors.contains(clusterName)) {
|
||||||
|
clusters.put(clusterName, StatusOr.fromStatus(
|
||||||
|
Status.INTERNAL.withDescription(
|
||||||
|
"Aggregate cluster cycle detected: " + ancestors)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
|
||||||
|
clusters.put(clusterName, StatusOr.fromStatus(
|
||||||
|
Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!cdsUpdateOr.hasValue()) {
|
CdsWatcher cdsWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName);
|
||||||
builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus()));
|
StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
|
||||||
continue;
|
if (!cdsWatcherDataOr.hasValue()) {
|
||||||
}
|
clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
|
XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
|
||||||
if (cdsUpdate.clusterType() == ClusterType.EDS) {
|
XdsConfig.XdsClusterConfig.ClusterChild child;
|
||||||
|
switch (cdsUpdate.clusterType()) {
|
||||||
|
case AGGREGATE:
|
||||||
|
// Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
|
||||||
|
// preserves the priority across all aggregate clusters
|
||||||
|
LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
|
||||||
|
ancestors.add(clusterName);
|
||||||
|
for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
|
||||||
|
addConfigForCluster(clusters, childCluster, ancestors);
|
||||||
|
StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
|
||||||
|
if (!config.hasValue()) {
|
||||||
|
clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
|
||||||
|
"Unable to get leaves for " + clusterName + ": "
|
||||||
|
+ config.getStatus().getDescription())));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
|
||||||
|
if (children instanceof AggregateConfig) {
|
||||||
|
leafNames.addAll(((AggregateConfig) children).getLeafNames());
|
||||||
|
} else {
|
||||||
|
leafNames.add(childCluster);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ancestors.remove(clusterName);
|
||||||
|
|
||||||
|
child = new AggregateConfig(ImmutableList.copyOf(leafNames));
|
||||||
|
break;
|
||||||
|
case EDS:
|
||||||
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
|
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
|
||||||
edsWatchers.get(cdsWatcher.getEdsServiceName());
|
getWatchers(ENDPOINT_RESOURCE).get(cdsWatcher.getEdsServiceName());
|
||||||
EndpointConfig child;
|
|
||||||
if (edsWatcher != null) {
|
if (edsWatcher != null) {
|
||||||
child = new EndpointConfig(edsWatcher.getData());
|
child = new EndpointConfig(edsWatcher.getData());
|
||||||
} else {
|
} else {
|
||||||
child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
|
child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
|
||||||
"EDS resource not found for cluster " + clusterName)));
|
"EDS resource not found for cluster " + clusterName)));
|
||||||
}
|
}
|
||||||
builder.addCluster(clusterName, StatusOr.fromValue(
|
break;
|
||||||
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
|
case LOGICAL_DNS:
|
||||||
} else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
|
// TODO get the resolved endpoint configuration
|
||||||
builder.addCluster(clusterName, StatusOr.fromStatus(
|
child = new EndpointConfig(StatusOr.fromStatus(
|
||||||
Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
|
Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
|
||||||
}
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
|
||||||
}
|
}
|
||||||
}
|
clusters.put(clusterName, StatusOr.fromValue(
|
||||||
|
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
|
||||||
// Adds the top-level clusters to the builder and returns the leaf cluster names
|
|
||||||
private Set<String> addTopLevelClustersToBuilder(
|
|
||||||
XdsConfig.XdsConfigBuilder builder,
|
|
||||||
Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
|
|
||||||
Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers,
|
|
||||||
List<String> topLevelClusters) {
|
|
||||||
|
|
||||||
Set<String> leafClusterNames = new HashSet<>();
|
|
||||||
for (String clusterName : topLevelClusters) {
|
|
||||||
CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName);
|
|
||||||
StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
|
|
||||||
if (!cdsWatcher.hasDataValue()) {
|
|
||||||
builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
|
|
||||||
XdsConfig.XdsClusterConfig.ClusterChild child;
|
|
||||||
switch (cdsUpdate.clusterType()) {
|
|
||||||
case AGGREGATE:
|
|
||||||
Set<String> leafNames = new HashSet<>();
|
|
||||||
addLeafNames(leafNames, cdsUpdate);
|
|
||||||
child = new AggregateConfig(leafNames);
|
|
||||||
leafClusterNames.addAll(leafNames);
|
|
||||||
break;
|
|
||||||
case EDS:
|
|
||||||
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
|
|
||||||
edsWatchers.get(cdsWatcher.getEdsServiceName());
|
|
||||||
if (edsWatcher != null) {
|
|
||||||
child = new EndpointConfig(edsWatcher.getData());
|
|
||||||
} else {
|
|
||||||
child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
|
|
||||||
"EDS resource not found for cluster " + clusterName)));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case LOGICAL_DNS:
|
|
||||||
// TODO get the resolved endpoint configuration
|
|
||||||
child = new EndpointConfig(StatusOr.fromStatus(
|
|
||||||
Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
|
|
||||||
}
|
|
||||||
builder.addCluster(clusterName, StatusOr.fromValue(
|
|
||||||
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
|
|
||||||
}
|
|
||||||
|
|
||||||
return leafClusterNames;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addLeafNames(Set<String> leafNames, XdsClusterResource.CdsUpdate cdsUpdate) {
|
|
||||||
for (String cluster : cdsUpdate.prioritizedClusterNames()) {
|
|
||||||
if (leafNames.contains(cluster)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData();
|
|
||||||
if (data == null || !data.hasValue() || data.getValue() == null) {
|
|
||||||
leafNames.add(cluster);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (data.getValue().clusterType() == ClusterType.AGGREGATE) {
|
|
||||||
addLeafNames(leafNames, data.getValue());
|
|
||||||
} else {
|
|
||||||
leafNames.add(cluster);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isTopLevelCluster(
|
|
||||||
XdsWatcherBase<XdsClusterResource.CdsUpdate> cdsWatcher) {
|
|
||||||
return ((CdsWatcher)cdsWatcher).parentContexts.values().stream()
|
|
||||||
.anyMatch(depth -> depth == 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -467,14 +428,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
addWatcher(new EdsWatcher(edsServiceName, parentContext));
|
addWatcher(new EdsWatcher(edsServiceName, parentContext));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addClusterWatcher(String clusterName, Object parentContext, int depth) {
|
private void addClusterWatcher(String clusterName, Object parentContext) {
|
||||||
CdsWatcher watcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName);
|
CdsWatcher watcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName);
|
||||||
if (watcher != null) {
|
if (watcher != null) {
|
||||||
watcher.parentContexts.put(parentContext, depth);
|
watcher.parentContexts.add(parentContext);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
addWatcher(new CdsWatcher(clusterName, parentContext, depth));
|
addWatcher(new CdsWatcher(clusterName, parentContext));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
|
private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
|
||||||
|
@ -494,9 +455,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
|
|
||||||
deletedClusters.forEach(watcher ->
|
deletedClusters.forEach(watcher ->
|
||||||
cancelClusterWatcherTree(getCluster(watcher), newParentContext));
|
cancelClusterWatcherTree(getCluster(watcher), newParentContext));
|
||||||
addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
|
addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext));
|
||||||
} else {
|
} else {
|
||||||
newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
|
newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -805,11 +766,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
}
|
}
|
||||||
|
|
||||||
private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
|
private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
|
||||||
Map<Object, Integer> parentContexts = new HashMap<>();
|
Set<Object> parentContexts = new HashSet<>();
|
||||||
|
|
||||||
CdsWatcher(String resourceName, Object parentContext, int depth) {
|
CdsWatcher(String resourceName, Object parentContext) {
|
||||||
super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
|
super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
|
||||||
this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth);
|
this.parentContexts.add(checkNotNull(parentContext, "parentContext"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -829,14 +790,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
break;
|
break;
|
||||||
case AGGREGATE:
|
case AGGREGATE:
|
||||||
Object parentContext = this;
|
Object parentContext = this;
|
||||||
int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1;
|
|
||||||
if (depth > MAX_CLUSTER_RECURSION_DEPTH) {
|
|
||||||
logger.log(XdsLogger.XdsLogLevel.WARNING,
|
|
||||||
"Cluster recursion depth limit exceeded for cluster {0}", resourceName());
|
|
||||||
Status error = Status.UNAVAILABLE.withDescription(
|
|
||||||
"aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
|
|
||||||
setDataAsStatus(error);
|
|
||||||
}
|
|
||||||
if (hasDataValue()) {
|
if (hasDataValue()) {
|
||||||
Set<String> oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE
|
Set<String> oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE
|
||||||
? new HashSet<>(getData().getValue().prioritizedClusterNames())
|
? new HashSet<>(getData().getValue().prioritizedClusterNames())
|
||||||
|
@ -847,21 +800,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
deletedClusters.forEach((cluster)
|
deletedClusters.forEach((cluster)
|
||||||
-> cancelClusterWatcherTree(getCluster(cluster), parentContext));
|
-> cancelClusterWatcherTree(getCluster(cluster), parentContext));
|
||||||
|
|
||||||
if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
|
setData(update);
|
||||||
setData(update);
|
Set<String> addedClusters = Sets.difference(newNames, oldNames);
|
||||||
Set<String> addedClusters = Sets.difference(newNames, oldNames);
|
addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext));
|
||||||
addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth));
|
} else {
|
||||||
}
|
|
||||||
|
|
||||||
} else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
|
|
||||||
setData(update);
|
setData(update);
|
||||||
update.prioritizedClusterNames()
|
update.prioritizedClusterNames()
|
||||||
.forEach(name -> addClusterWatcher(name, parentContext, depth));
|
.forEach(name -> addClusterWatcher(name, parentContext));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
Status error = Status.UNAVAILABLE.withDescription(
|
Status error = Status.UNAVAILABLE.withDescription(
|
||||||
"aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
|
"unknown cluster type in " + resourceName() + " " + update.clusterType());
|
||||||
setDataAsStatus(error);
|
setDataAsStatus(error);
|
||||||
}
|
}
|
||||||
maybePublishConfig();
|
maybePublishConfig();
|
||||||
|
|
|
@ -18,7 +18,6 @@ package io.grpc.xds;
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
import static io.grpc.StatusMatcher.statusHasCode;
|
import static io.grpc.StatusMatcher.statusHasCode;
|
||||||
import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.AGGREGATE;
|
|
||||||
import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.EDS;
|
import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.EDS;
|
||||||
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS;
|
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS;
|
||||||
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS;
|
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS;
|
||||||
|
@ -239,9 +238,10 @@ public class XdsDependencyManagerTest {
|
||||||
testWatcher.lastConfig.getClusters();
|
testWatcher.lastConfig.getClusters();
|
||||||
assertThat(lastConfigClusters).hasSize(childNames.size() + 1);
|
assertThat(lastConfigClusters).hasSize(childNames.size() + 1);
|
||||||
StatusOr<XdsClusterConfig> rootC = lastConfigClusters.get(rootName);
|
StatusOr<XdsClusterConfig> rootC = lastConfigClusters.get(rootName);
|
||||||
CdsUpdate rootUpdate = rootC.getValue().getClusterResource();
|
assertThat(rootC.getValue().getChildren()).isInstanceOf(XdsClusterConfig.AggregateConfig.class);
|
||||||
assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE);
|
XdsClusterConfig.AggregateConfig aggConfig =
|
||||||
assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames);
|
(XdsClusterConfig.AggregateConfig) rootC.getValue().getChildren();
|
||||||
|
assertThat(aggConfig.getLeafNames()).isEqualTo(childNames);
|
||||||
|
|
||||||
for (String childName : childNames) {
|
for (String childName : childNames) {
|
||||||
assertThat(lastConfigClusters).containsKey(childName);
|
assertThat(lastConfigClusters).containsKey(childName);
|
||||||
|
@ -552,7 +552,7 @@ public class XdsDependencyManagerTest {
|
||||||
controlPlaneService.setXdsConfig(
|
controlPlaneService.setXdsConfig(
|
||||||
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig));
|
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig));
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet().size()).isEqualTo(4);
|
assertThat(xdsUpdateCaptor.getValue().getValue().getClusters()).hasSize(8);
|
||||||
|
|
||||||
// Now that it is released, we should only have A11
|
// Now that it is released, we should only have A11
|
||||||
rootSub.close();
|
rootSub.close();
|
||||||
|
@ -561,6 +561,29 @@ public class XdsDependencyManagerTest {
|
||||||
.containsExactly("clusterA11");
|
.containsExactly("clusterA11");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCdsCycle() throws Exception {
|
||||||
|
RouteConfiguration routeConfig =
|
||||||
|
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA");
|
||||||
|
Map<String, Message> clusterMap = new HashMap<>();
|
||||||
|
Map<String, Message> edsMap = new HashMap<>();
|
||||||
|
clusterMap.put("clusterA", XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterB")));
|
||||||
|
clusterMap.put("clusterB", XdsTestUtils.buildAggCluster("clusterB", Arrays.asList("clusterA")));
|
||||||
|
XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterC");
|
||||||
|
controlPlaneService.setXdsConfig(
|
||||||
|
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
|
||||||
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
|
||||||
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
||||||
|
|
||||||
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
|
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
||||||
|
assertThat(config.getClusters().get("clusterA").hasValue()).isFalse();
|
||||||
|
assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultipleCdsReferToSameEds() {
|
public void testMultipleCdsReferToSameEds() {
|
||||||
// Create the maps and Update the config to have 2 clusters that refer to the same EDS resource
|
// Create the maps and Update the config to have 2 clusters that refer to the same EDS resource
|
||||||
|
@ -646,7 +669,7 @@ public class XdsDependencyManagerTest {
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
||||||
assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName);
|
assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName);
|
||||||
assertThat(config.getClusters().size()).isEqualTo(4);
|
assertThat(config.getClusters()).hasSize(8);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -697,8 +720,8 @@ public class XdsDependencyManagerTest {
|
||||||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
||||||
|
|
||||||
// Verify that the config is updated as expected
|
// Verify that the config is updated as expected
|
||||||
ClusterNameMatcher nameMatcher
|
ClusterNameMatcher nameMatcher = new ClusterNameMatcher(Arrays.asList(
|
||||||
= new ClusterNameMatcher(Arrays.asList("root", "clusterA21", "clusterA22"));
|
"root", "clusterA", "clusterA2", "clusterA21", "clusterA22"));
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(argThat(nameMatcher));
|
inOrder.verify(xdsConfigWatcher).onUpdate(argThat(nameMatcher));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue