xds: Use tracing GC in XdsDepManager

Reference counting doesn't release cycles, so swap to a tracing garbage
collector. This greatly simplifies the code as well, as diffing is no
longer necessary. (If vanilla reference counting was used, diffing
wouldn't have been necessary either as you just increment all the new
objects and decrement the old ones. But that doesn't work when use a set
instead of an integer.)
This commit is contained in:
Eric Anderson 2025-05-29 17:08:33 -07:00
parent dc192f5c5e
commit 1fd29bc804
3 changed files with 212 additions and 268 deletions

View File

@ -19,12 +19,9 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.XdsClient.ResourceUpdate; import static io.grpc.xds.client.XdsClient.ResourceUpdate;
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import io.grpc.InternalLogId;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StatusOr; import io.grpc.StatusOr;
@ -36,7 +33,6 @@ import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.client.XdsResourceType;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -61,22 +57,21 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance(); public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance(); public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++ private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
private final String listenerName;
private final XdsClient xdsClient; private final XdsClient xdsClient;
private final XdsConfigWatcher xdsConfigWatcher; private final XdsConfigWatcher xdsConfigWatcher;
private final SynchronizationContext syncContext; private final SynchronizationContext syncContext;
private final String dataPlaneAuthority; private final String dataPlaneAuthority;
private final InternalLogId logId;
private final XdsLogger logger;
private StatusOr<XdsConfig> lastUpdate = null; private StatusOr<XdsConfig> lastUpdate = null;
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>(); private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher, XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
SynchronizationContext syncContext, String dataPlaneAuthority, SynchronizationContext syncContext, String dataPlaneAuthority,
String listenerName, NameResolver.Args nameResolverArgs, String listenerName, NameResolver.Args nameResolverArgs,
ScheduledExecutorService scheduler) { ScheduledExecutorService scheduler) {
logId = InternalLogId.allocate("xds-dependency-manager", listenerName); this.listenerName = checkNotNull(listenerName, "listenerName");
logger = XdsLogger.withLogId(logId);
this.xdsClient = checkNotNull(xdsClient, "xdsClient"); this.xdsClient = checkNotNull(xdsClient, "xdsClient");
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher"); this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
this.syncContext = checkNotNull(syncContext, "syncContext"); this.syncContext = checkNotNull(syncContext, "syncContext");
@ -102,7 +97,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
subscription.closed = true; subscription.closed = true;
return; // shutdown() called return; // shutdown() called
} }
addClusterWatcher(clusterName, subscription); subscriptions.add(subscription);
addClusterWatcher(clusterName);
}); });
return subscription; return subscription;
@ -117,73 +113,13 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
xdsClient.watchXdsResource(type, resourceName, watcher, syncContext); xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
} }
private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) {
if (watcher == null) {
return;
}
watcher.parentContexts.remove(parentContext);
if (watcher.parentContexts.isEmpty()) {
cancelWatcher(watcher);
}
}
private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
if (watcher == null) {
return;
}
watcher.parentContexts.remove(parentContext);
if (watcher.parentContexts.isEmpty()) {
cancelWatcher(watcher);
}
}
private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
syncContext.throwIfNotInThisSynchronizationContext();
if (watcher == null) {
return;
}
if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) {
throwIfParentContextsNotEmpty(watcher);
}
watcher.cancelled = true;
XdsResourceType<T> type = watcher.type;
String resourceName = watcher.resourceName;
if (getWatchers(type).remove(resourceName) == null) {
logger.log(DEBUG, "Trying to cancel watcher {0}, but it isn't watched", watcher);
return;
}
xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
}
private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
if (watcher instanceof CdsWatcher) {
CdsWatcher cdsWatcher = (CdsWatcher) watcher;
if (!cdsWatcher.parentContexts.isEmpty()) {
String msg = String.format("CdsWatcher %s has parent contexts %s",
cdsWatcher.resourceName(), cdsWatcher.parentContexts);
throw new IllegalStateException(msg);
}
} else if (watcher instanceof EdsWatcher) {
EdsWatcher edsWatcher = (EdsWatcher) watcher;
if (!edsWatcher.parentContexts.isEmpty()) {
String msg = String.format("CdsWatcher %s has parent contexts %s",
edsWatcher.resourceName(), edsWatcher.parentContexts);
throw new IllegalStateException(msg);
}
}
}
public void shutdown() { public void shutdown() {
syncContext.execute(() -> { syncContext.execute(() -> {
for (TypeWatchers<?> watchers : resourceWatchers.values()) { for (TypeWatchers<?> watchers : resourceWatchers.values()) {
shutdownWatchersForType(watchers); shutdownWatchersForType(watchers);
} }
resourceWatchers.clear(); resourceWatchers.clear();
subscriptions.clear();
}); });
} }
@ -197,54 +133,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
private void releaseSubscription(ClusterSubscription subscription) { private void releaseSubscription(ClusterSubscription subscription) {
checkNotNull(subscription, "subscription"); checkNotNull(subscription, "subscription");
String clusterName = subscription.getClusterName();
syncContext.execute(() -> { syncContext.execute(() -> {
if (subscription.closed) { if (subscription.closed) {
return; return;
} }
subscription.closed = true; subscription.closed = true;
XdsWatcherBase<XdsClusterResource.CdsUpdate> cdsWatcher if (!subscriptions.remove(subscription)) {
= getWatchers(CLUSTER_RESOURCE).get(clusterName);
if (cdsWatcher == null) {
return; // shutdown() called return; // shutdown() called
} }
cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
maybePublishConfig(); maybePublishConfig();
}); });
} }
private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) {
checkNotNull(root, "root");
cancelCdsWatcher(root, parentContext);
if (!root.hasDataValue() || !root.parentContexts.isEmpty()) {
return;
}
XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
switch (cdsUpdate.clusterType()) {
case EDS:
String edsServiceName = root.getEdsServiceName();
EdsWatcher edsWatcher = (EdsWatcher) getWatchers(ENDPOINT_RESOURCE).get(edsServiceName);
cancelEdsWatcher(edsWatcher, root);
break;
case AGGREGATE:
for (String cluster : cdsUpdate.prioritizedClusterNames()) {
CdsWatcher clusterWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(cluster);
if (clusterWatcher != null) {
cancelClusterWatcherTree(clusterWatcher, root);
}
}
break;
case LOGICAL_DNS:
// no eds needed, so everything happens in cancelCdsWatcher()
break;
default:
throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
}
}
/** /**
* Check if all resources have results, and if so, generate a new XdsConfig and send it to all * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
* the watchers. * the watchers.
@ -274,27 +174,40 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
@VisibleForTesting @VisibleForTesting
StatusOr<XdsConfig> buildUpdate() { StatusOr<XdsConfig> buildUpdate() {
// Create a config and discard any watchers not accessed
WatcherTracer tracer = new WatcherTracer(resourceWatchers);
StatusOr<XdsConfig> config = buildUpdate(
tracer, listenerName, dataPlaneAuthority, subscriptions);
tracer.closeUnusedWatchers();
return config;
}
private static StatusOr<XdsConfig> buildUpdate(
WatcherTracer tracer,
String listenerName,
String dataPlaneAuthority,
Set<ClusterSubscription> subscriptions) {
XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder(); XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
// Iterate watchers and build the XdsConfig // Iterate watchers and build the XdsConfig
// Will only be 1 listener and 1 route resource XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
RdsUpdateSupplier routeSource = null; = tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
for (XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher : if (ldsWatcher == null) {
getWatchers(XdsListenerResource.getInstance()).values()) { return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
if (!ldsWatcher.getData().hasValue()) { "Bug: No listener watcher found for " + listenerName));
return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
}
XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
builder.setListener(ldsUpdate);
routeSource = ((LdsWatcher) ldsWatcher).getRouteSource();
} }
if (!ldsWatcher.getData().hasValue()) {
return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
}
XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
builder.setListener(ldsUpdate);
RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
if (routeSource == null) { if (routeSource == null) {
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"Bug: No route source found for listener " + dataPlaneAuthority)); "Bug: No route source found for listener " + dataPlaneAuthority));
} }
StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate(); StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
if (!statusOrRdsUpdate.hasValue()) { if (!statusOrRdsUpdate.hasValue()) {
return StatusOr.fromStatus(statusOrRdsUpdate.getStatus()); return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
@ -312,8 +225,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>(); Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
LinkedHashSet<String> ancestors = new LinkedHashSet<>(); LinkedHashSet<String> ancestors = new LinkedHashSet<>();
for (String cluster : getWatchers(CLUSTER_RESOURCE).keySet()) { for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
addConfigForCluster(clusters, cluster, ancestors); addConfigForCluster(clusters, cluster, ancestors, tracer);
}
for (ClusterSubscription subscription : subscriptions) {
addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
} }
for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) { for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
builder.addCluster(me.getKey(), me.getValue()); builder.addCluster(me.getKey(), me.getValue());
@ -335,11 +251,12 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return tTypeWatchers.watchers; return tTypeWatchers.watchers;
} }
private void addConfigForCluster( private static void addConfigForCluster(
Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters, Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
String clusterName, String clusterName,
@SuppressWarnings("NonApiType") // Need order-preserving set for errors @SuppressWarnings("NonApiType") // Need order-preserving set for errors
LinkedHashSet<String> ancestors) { LinkedHashSet<String> ancestors,
WatcherTracer tracer) {
if (clusters.containsKey(clusterName)) { if (clusters.containsKey(clusterName)) {
return; return;
} }
@ -355,7 +272,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return; return;
} }
CdsWatcher cdsWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData(); StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
if (!cdsWatcherDataOr.hasValue()) { if (!cdsWatcherDataOr.hasValue()) {
clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus())); clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
@ -371,7 +288,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
LinkedHashSet<String> leafNames = new LinkedHashSet<String>(); LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
ancestors.add(clusterName); ancestors.add(clusterName);
for (String childCluster : cdsUpdate.prioritizedClusterNames()) { for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
addConfigForCluster(clusters, childCluster, ancestors); addConfigForCluster(clusters, childCluster, ancestors, tracer);
StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster); StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
if (!config.hasValue()) { if (!config.hasValue()) {
clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription( clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
@ -392,7 +309,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
break; break;
case EDS: case EDS:
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher = XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
getWatchers(ENDPOINT_RESOURCE).get(cdsWatcher.getEdsServiceName()); tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
if (edsWatcher != null) { if (edsWatcher != null) {
child = new EndpointConfig(edsWatcher.getData()); child = new EndpointConfig(edsWatcher.getData());
} else { } else {
@ -412,53 +329,35 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
} }
@Override private void addRdsWatcher(String resourceName) {
public String toString() { if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
return logId.toString();
}
private void addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
EdsWatcher watcher
= (EdsWatcher) getWatchers(XdsEndpointResource.getInstance()).get(edsServiceName);
if (watcher != null) {
watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence
return; return;
} }
addWatcher(new EdsWatcher(edsServiceName, parentContext)); addWatcher(new RdsWatcher(resourceName));
} }
private void addClusterWatcher(String clusterName, Object parentContext) { private void addEdsWatcher(String edsServiceName) {
CdsWatcher watcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
if (watcher != null) {
watcher.parentContexts.add(parentContext);
return; return;
} }
addWatcher(new CdsWatcher(clusterName, parentContext)); addWatcher(new EdsWatcher(edsServiceName));
} }
private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext, private void addClusterWatcher(String clusterName) {
List<VirtualHost> oldVirtualHosts, boolean sameParentContext) { if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
VirtualHost oldVirtualHost = return;
RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority); }
addWatcher(new CdsWatcher(clusterName));
}
private void updateRoutes(List<VirtualHost> virtualHosts) {
VirtualHost virtualHost = VirtualHost virtualHost =
RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority); RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost); Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost); newClusters.forEach((cluster) -> addClusterWatcher(cluster));
if (sameParentContext) {
// Calculate diffs.
Set<String> addedClusters = Sets.difference(newClusters, oldClusters);
Set<String> deletedClusters = Sets.difference(oldClusters, newClusters);
deletedClusters.forEach(watcher ->
cancelClusterWatcherTree(getCluster(watcher), newParentContext));
addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext));
} else {
newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext));
}
} }
private String nodeInfo() { private String nodeInfo() {
@ -489,10 +388,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return clusters; return clusters;
} }
private CdsWatcher getCluster(String clusterName) {
return (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName);
}
private static class TypeWatchers<T extends ResourceUpdate> { private static class TypeWatchers<T extends ResourceUpdate> {
// Key is resource name // Key is resource name
final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>(); final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
@ -529,6 +424,64 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
} }
} }
/** State for tracing garbage collector. */
private static final class WatcherTracer {
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
this.resourceWatchers = resourceWatchers;
this.usedWatchers = new HashMap<>();
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
usedWatchers.put(type, newTypeWatchers(type));
}
}
private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
XdsResourceType<T> type) {
return new TypeWatchers<T>(type);
}
public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
XdsResourceType<T> resourceType, String name) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
if (typeWatchers == null) {
return null;
}
assert typeWatchers.resourceType == resourceType;
@SuppressWarnings("unchecked")
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
if (watcher == null) {
return null;
}
@SuppressWarnings("unchecked")
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
usedTypeWatchers.watchers.put(name, watcher);
return watcher;
}
/** Shut down unused watchers. */
public void closeUnusedWatchers() {
boolean changed = false; // Help out the GC by preferring old objects
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
TypeWatchers<?> orig = resourceWatchers.get(type);
TypeWatchers<?> used = usedWatchers.get(type);
for (String name : orig.watchers.keySet()) {
if (used.watchers.containsKey(name)) {
continue;
}
orig.watchers.get(name).close();
changed = true;
}
}
if (changed) {
resourceWatchers.putAll(usedWatchers);
}
}
}
private abstract class XdsWatcherBase<T extends ResourceUpdate> private abstract class XdsWatcherBase<T extends ResourceUpdate>
implements ResourceWatcher<T> { implements ResourceWatcher<T> {
private final XdsResourceType<T> type; private final XdsResourceType<T> type;
@ -571,6 +524,10 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
maybePublishConfig(); maybePublishConfig();
} }
public void close() {
xdsClient.cancelXdsResourceWatch(type, resourceName, this);
}
boolean missingResult() { boolean missingResult() {
return data == null; return data == null;
} }
@ -633,24 +590,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
virtualHosts = httpConnectionManager.virtualHosts(); virtualHosts = httpConnectionManager.virtualHosts();
rdsName = httpConnectionManager.rdsName(); rdsName = httpConnectionManager.rdsName();
} }
StatusOr<RdsUpdate> activeRdsUpdate = getRouteSource().getRdsUpdate();
List<VirtualHost> activeVirtualHosts = activeRdsUpdate.hasValue()
? activeRdsUpdate.getValue().virtualHosts
: Collections.emptyList();
boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
if (changedRdsName) {
cleanUpRdsWatcher();
}
if (virtualHosts != null) { if (virtualHosts != null) {
// No RDS watcher since we are getting RDS updates via LDS // No RDS watcher since we are getting RDS updates via LDS
updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null); updateRoutes(virtualHosts);
this.rdsName = null; this.rdsName = null;
} else if (changedRdsName) { } else {
this.rdsName = rdsName; this.rdsName = rdsName;
addWatcher(new RdsWatcher(rdsName)); addRdsWatcher(rdsName);
logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
} }
setData(update); setData(update);
@ -666,36 +613,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
checkArgument(resourceName().equals(resourceName), "Resource name does not match"); checkArgument(resourceName().equals(resourceName), "Resource name does not match");
setDataAsStatus(Status.UNAVAILABLE.withDescription( setDataAsStatus(Status.UNAVAILABLE.withDescription(
toContextString() + " does not exist" + nodeInfo())); toContextString() + " does not exist" + nodeInfo()));
cleanUpRdsWatcher();
rdsName = null; rdsName = null;
maybePublishConfig(); maybePublishConfig();
} }
private void cleanUpRdsWatcher() { private RdsWatcher getRdsWatcher(WatcherTracer tracer) {
RdsWatcher oldRdsWatcher = getRdsWatcher();
if (oldRdsWatcher != null) {
cancelWatcher(oldRdsWatcher);
logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
// Cleanup clusters (as appropriate) that had the old rds watcher as a parent
if (!oldRdsWatcher.hasDataValue()) {
return;
}
for (XdsWatcherBase<XdsClusterResource.CdsUpdate> watcher :
getWatchers(CLUSTER_RESOURCE).values()) {
cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher);
}
}
}
private RdsWatcher getRdsWatcher() {
if (rdsName == null) { if (rdsName == null) {
return null; return null;
} }
return (RdsWatcher) getWatchers(XdsRouteConfigureResource.getInstance()).get(rdsName); return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
} }
public RdsUpdateSupplier getRouteSource() { public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
if (!hasDataValue()) { if (!hasDataValue()) {
return this; return this;
} }
@ -707,7 +636,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
if (virtualHosts != null) { if (virtualHosts != null) {
return this; return this;
} }
RdsWatcher rdsWatcher = getRdsWatcher(); RdsWatcher rdsWatcher = getRdsWatcher(tracer);
assert rdsWatcher != null; assert rdsWatcher != null;
return rdsWatcher; return rdsWatcher;
} }
@ -748,11 +677,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
if (cancelled) { if (cancelled) {
return; return;
} }
List<VirtualHost> oldVirtualHosts = hasDataValue()
? getData().getValue().virtualHosts
: Collections.emptyList();
setData(update); setData(update);
updateRoutes(update.virtualHosts, this, oldVirtualHosts, true); updateRoutes(update.virtualHosts);
maybePublishConfig(); maybePublishConfig();
} }
@ -766,11 +692,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
} }
private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> { private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
Set<Object> parentContexts = new HashSet<>(); CdsWatcher(String resourceName) {
CdsWatcher(String resourceName, Object parentContext) {
super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName")); super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
this.parentContexts.add(checkNotNull(parentContext, "parentContext"));
} }
@Override @Override
@ -782,32 +705,16 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
switch (update.clusterType()) { switch (update.clusterType()) {
case EDS: case EDS:
setData(update); setData(update);
addEdsWatcher(getEdsServiceName(), this); addEdsWatcher(getEdsServiceName());
break; break;
case LOGICAL_DNS: case LOGICAL_DNS:
setData(update); setData(update);
// no eds needed // no eds needed
break; break;
case AGGREGATE: case AGGREGATE:
Object parentContext = this; setData(update);
if (hasDataValue()) { update.prioritizedClusterNames()
Set<String> oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE .forEach(name -> addClusterWatcher(name));
? new HashSet<>(getData().getValue().prioritizedClusterNames())
: Collections.emptySet();
Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
Set<String> deletedClusters = Sets.difference(oldNames, newNames);
deletedClusters.forEach((cluster)
-> cancelClusterWatcherTree(getCluster(cluster), parentContext));
setData(update);
Set<String> addedClusters = Sets.difference(newNames, oldNames);
addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext));
} else {
setData(update);
update.prioritizedClusterNames()
.forEach(name -> addClusterWatcher(name, parentContext));
}
break; break;
default: default:
Status error = Status.UNAVAILABLE.withDescription( Status error = Status.UNAVAILABLE.withDescription(
@ -829,11 +736,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
} }
private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> { private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
private final Set<CdsWatcher> parentContexts = new HashSet<>(); private EdsWatcher(String resourceName) {
private EdsWatcher(String resourceName, CdsWatcher parentContext) {
super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName")); super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
parentContexts.add(checkNotNull(parentContext, "parentContext"));
} }
@Override @Override
@ -844,9 +748,5 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
setData(checkNotNull(update, "update")); setData(checkNotNull(update, "update"));
maybePublishConfig(); maybePublishConfig();
} }
void addParentContext(CdsWatcher parentContext) {
parentContexts.add(checkNotNull(parentContext, "parentContext"));
}
} }
} }

View File

@ -66,7 +66,9 @@ import io.grpc.xds.XdsConfig.XdsClusterConfig;
import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.CommonBootstrapperTestUtils;
import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceMetadata;
import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsResourceType;
import io.grpc.xds.client.XdsTransportFactory; import io.grpc.xds.client.XdsTransportFactory;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -562,7 +564,39 @@ public class XdsDependencyManagerTest {
} }
@Test @Test
public void testCdsCycle() throws Exception { public void testCdsDeleteUnsubscribesChild() throws Exception {
RouteConfiguration routeConfig =
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA");
Map<String, Message> clusterMap = new HashMap<>();
Map<String, Message> edsMap = new HashMap<>();
XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA");
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
assertThat(config.getClusters().get("clusterA").hasValue()).isTrue();
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> watches =
xdsClient.getSubscribedResourcesMetadataSnapshot().get();
assertThat(watches.get(XdsEndpointResource.getInstance()).keySet())
.containsExactly("eds_clusterA");
// Delete cluster
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of());
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
config = xdsUpdateCaptor.getValue().getValue();
assertThat(config.getClusters().get("clusterA").hasValue()).isFalse();
watches = xdsClient.getSubscribedResourcesMetadataSnapshot().get();
assertThat(watches).doesNotContainKey(XdsEndpointResource.getInstance());
}
@Test
public void testCdsCycleReclaimed() throws Exception {
RouteConfiguration routeConfig = RouteConfiguration routeConfig =
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA"); XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA");
Map<String, Message> clusterMap = new HashMap<>(); Map<String, Message> clusterMap = new HashMap<>();
@ -575,6 +609,7 @@ public class XdsDependencyManagerTest {
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
// The cycle is loaded and detected
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler); serverName, serverName, nameResolverArgs, scheduler);
@ -582,6 +617,16 @@ public class XdsDependencyManagerTest {
XdsConfig config = xdsUpdateCaptor.getValue().getValue(); XdsConfig config = xdsUpdateCaptor.getValue().getValue();
assertThat(config.getClusters().get("clusterA").hasValue()).isFalse(); assertThat(config.getClusters().get("clusterA").hasValue()).isFalse();
assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle"); assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle");
// Orphan the cycle and it is discarded
routeConfig =
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterC");
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
inOrder.verify(xdsConfigWatcher).onUpdate(any());
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> watches =
xdsClient.getSubscribedResourcesMetadataSnapshot().get();
assertThat(watches.get(XdsClusterResource.getInstance()).keySet()).containsExactly("clusterC");
} }
@Test @Test

View File

@ -241,7 +241,7 @@ public class XdsNameResolverTest {
resolver.shutdown(); resolver.shutdown();
if (xdsClient != null) { if (xdsClient != null) {
assertThat(xdsClient.ldsWatcher).isNull(); assertThat(xdsClient.ldsWatcher).isNull();
assertThat(xdsClient.rdsWatcher).isNull(); assertThat(xdsClient.rdsWatchers).isEmpty();
} }
} }
@ -421,7 +421,7 @@ public class XdsNameResolverTest {
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME);
assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME);
VirtualHost virtualHost = VirtualHost virtualHost =
VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY), VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY),
Collections.singletonList(route1), Collections.singletonList(route1),
@ -438,13 +438,14 @@ public class XdsNameResolverTest {
ArgumentCaptor.forClass(ResolutionResult.class); ArgumentCaptor.forClass(ResolutionResult.class);
String alternativeRdsResource = "route-configuration-alter.googleapis.com"; String alternativeRdsResource = "route-configuration-alter.googleapis.com";
xdsClient.deliverLdsUpdateForRdsName(alternativeRdsResource); xdsClient.deliverLdsUpdateForRdsName(alternativeRdsResource);
assertThat(xdsClient.rdsResource).isEqualTo(alternativeRdsResource); assertThat(xdsClient.rdsWatchers.keySet()).contains(alternativeRdsResource);
virtualHost = virtualHost =
VirtualHost.create("virtualhost-alter", Collections.singletonList(AUTHORITY), VirtualHost.create("virtualhost-alter", Collections.singletonList(AUTHORITY),
Collections.singletonList(route2), Collections.singletonList(route2),
ImmutableMap.of()); ImmutableMap.of());
xdsClient.deliverRdsUpdate(alternativeRdsResource, Collections.singletonList(virtualHost)); xdsClient.deliverRdsUpdate(alternativeRdsResource, Collections.singletonList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, cluster2); createAndDeliverClusterUpdates(xdsClient, cluster2);
assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(alternativeRdsResource);
// Two new service config updates triggered: // Two new service config updates triggered:
// - with load balancing config being able to select cluster1 and cluster2 // - with load balancing config being able to select cluster1 and cluster2
// - with load balancing config being able to select cluster2 only // - with load balancing config being able to select cluster2 only
@ -477,7 +478,7 @@ public class XdsNameResolverTest {
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME);
assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME);
VirtualHost virtualHost = VirtualHost virtualHost =
VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY), VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY),
Collections.singletonList(route), Collections.singletonList(route),
@ -491,14 +492,14 @@ public class XdsNameResolverTest {
reset(mockListener); reset(mockListener);
xdsClient.deliverLdsResourceNotFound(); // revoke LDS resource xdsClient.deliverLdsResourceNotFound(); // revoke LDS resource
assertThat(xdsClient.rdsResource).isNull(); // stop subscribing to stale RDS resource assertThat(xdsClient.rdsWatchers.keySet()).isEmpty(); // stop subscribing to stale RDS resource
assertEmptyResolutionResult(expectedLdsResourceName); assertEmptyResolutionResult(expectedLdsResourceName);
reset(mockListener); reset(mockListener);
xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME);
// No name resolution result until new RDS resource update is received. Do not use stale config // No name resolution result until new RDS resource update is received. Do not use stale config
verifyNoInteractions(mockListener); verifyNoInteractions(mockListener);
assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME);
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, cluster1); createAndDeliverClusterUpdates(xdsClient, cluster1);
verify(mockListener).onResult2(resolutionResultCaptor.capture()); verify(mockListener).onResult2(resolutionResultCaptor.capture());
@ -518,7 +519,7 @@ public class XdsNameResolverTest {
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME);
assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME);
VirtualHost virtualHost = VirtualHost virtualHost =
VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY), VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY),
Collections.singletonList(route), Collections.singletonList(route),
@ -537,6 +538,7 @@ public class XdsNameResolverTest {
// Simulate management server adds back the previously used RDS resource. // Simulate management server adds back the previously used RDS resource.
reset(mockListener); reset(mockListener);
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, cluster1);
verify(mockListener).onResult2(resolutionResultCaptor.capture()); verify(mockListener).onResult2(resolutionResultCaptor.capture());
assertServiceConfigForLoadBalancingConfig( assertServiceConfigForLoadBalancingConfig(
Collections.singletonList(cluster1), Collections.singletonList(cluster1),
@ -2430,9 +2432,8 @@ public class XdsNameResolverTest {
private class FakeXdsClient extends XdsClient { private class FakeXdsClient extends XdsClient {
// Should never be subscribing to more than one LDS and RDS resource at any point of time. // Should never be subscribing to more than one LDS and RDS resource at any point of time.
private String ldsResource; // should always be AUTHORITY private String ldsResource; // should always be AUTHORITY
private String rdsResource;
private ResourceWatcher<LdsUpdate> ldsWatcher; private ResourceWatcher<LdsUpdate> ldsWatcher;
private ResourceWatcher<RdsUpdate> rdsWatcher; private final Map<String, List<ResourceWatcher<RdsUpdate>>> rdsWatchers = new HashMap<>();
private final Map<String, List<ResourceWatcher<CdsUpdate>>> cdsWatchers = new HashMap<>(); private final Map<String, List<ResourceWatcher<CdsUpdate>>> cdsWatchers = new HashMap<>();
private final Map<String, List<ResourceWatcher<EdsUpdate>>> edsWatchers = new HashMap<>(); private final Map<String, List<ResourceWatcher<EdsUpdate>>> edsWatchers = new HashMap<>();
@ -2457,10 +2458,8 @@ public class XdsNameResolverTest {
ldsWatcher = (ResourceWatcher<LdsUpdate>) watcher; ldsWatcher = (ResourceWatcher<LdsUpdate>) watcher;
break; break;
case "RDS": case "RDS":
assertThat(rdsResource).isNull(); rdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>())
assertThat(rdsWatcher).isNull(); .add((ResourceWatcher<RdsUpdate>) watcher);
rdsResource = resourceName;
rdsWatcher = (ResourceWatcher<RdsUpdate>) watcher;
break; break;
case "CDS": case "CDS":
cdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) cdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>())
@ -2488,10 +2487,12 @@ public class XdsNameResolverTest {
ldsWatcher = null; ldsWatcher = null;
break; break;
case "RDS": case "RDS":
assertThat(rdsResource).isNotNull(); assertThat(rdsWatchers).containsKey(resourceName);
assertThat(rdsWatcher).isNotNull(); assertThat(rdsWatchers.get(resourceName)).contains(watcher);
rdsResource = null; rdsWatchers.get(resourceName).remove((ResourceWatcher<RdsUpdate>) watcher);
rdsWatcher = null; if (rdsWatchers.get(resourceName).isEmpty()) {
rdsWatchers.remove(resourceName);
}
break; break;
case "CDS": case "CDS":
assertThat(cdsWatchers).containsKey(resourceName); assertThat(cdsWatchers).containsKey(resourceName);
@ -2659,9 +2660,6 @@ public class XdsNameResolverTest {
void deliverRdsUpdateWithFaultInjection( void deliverRdsUpdateWithFaultInjection(
String resourceName, @Nullable FaultConfig virtualHostFaultConfig, String resourceName, @Nullable FaultConfig virtualHostFaultConfig,
@Nullable FaultConfig routFaultConfig, @Nullable FaultConfig weightedClusterFaultConfig) { @Nullable FaultConfig routFaultConfig, @Nullable FaultConfig weightedClusterFaultConfig) {
if (!resourceName.equals(rdsResource)) {
return;
}
ImmutableMap<String, FilterConfig> overrideConfig = weightedClusterFaultConfig == null ImmutableMap<String, FilterConfig> overrideConfig = weightedClusterFaultConfig == null
? ImmutableMap.of() ? ImmutableMap.of()
: ImmutableMap.of( : ImmutableMap.of(
@ -2690,18 +2688,19 @@ public class XdsNameResolverTest {
Collections.singletonList(expectedLdsResourceName), Collections.singletonList(expectedLdsResourceName),
Collections.singletonList(route), Collections.singletonList(route),
overrideConfig); overrideConfig);
syncContext.execute(() -> { deliverRdsUpdate(resourceName, virtualHost);
rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); createAndDeliverClusterUpdates(this, cluster1);
createAndDeliverClusterUpdates(this, cluster1);
});
} }
void deliverRdsUpdate(String resourceName, List<VirtualHost> virtualHosts) { void deliverRdsUpdate(String resourceName, List<VirtualHost> virtualHosts) {
if (!resourceName.equals(rdsResource)) { if (!rdsWatchers.containsKey(resourceName)) {
return; return;
} }
syncContext.execute(() -> { syncContext.execute(() -> {
rdsWatcher.onChanged(new RdsUpdate(virtualHosts)); RdsUpdate update = new RdsUpdate(virtualHosts);
List<ResourceWatcher<RdsUpdate>> resourceWatchers =
ImmutableList.copyOf(rdsWatchers.get(resourceName));
resourceWatchers.forEach(w -> w.onChanged(update));
}); });
} }
@ -2710,11 +2709,13 @@ public class XdsNameResolverTest {
} }
void deliverRdsResourceNotFound(String resourceName) { void deliverRdsResourceNotFound(String resourceName) {
if (!resourceName.equals(rdsResource)) { if (!rdsWatchers.containsKey(resourceName)) {
return; return;
} }
syncContext.execute(() -> { syncContext.execute(() -> {
rdsWatcher.onResourceDoesNotExist(rdsResource); List<ResourceWatcher<RdsUpdate>> resourceWatchers =
ImmutableList.copyOf(rdsWatchers.get(resourceName));
resourceWatchers.forEach(w -> w.onResourceDoesNotExist(resourceName));
}); });
} }
@ -2747,12 +2748,10 @@ public class XdsNameResolverTest {
ldsWatcher.onError(error); ldsWatcher.onError(error);
}); });
} }
if (rdsWatcher != null) {
syncContext.execute(() -> {
rdsWatcher.onError(error);
});
}
syncContext.execute(() -> { syncContext.execute(() -> {
rdsWatchers.values().stream()
.flatMap(List::stream)
.forEach(w -> w.onError(error));
cdsWatchers.values().stream() cdsWatchers.values().stream()
.flatMap(List::stream) .flatMap(List::stream)
.forEach(w -> w.onError(error)); .forEach(w -> w.onError(error));