diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 7bbd0064ed..78d4dbb198 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -19,12 +19,9 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.xds.client.XdsClient.ResourceUpdate; -import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; -import io.grpc.InternalLogId; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StatusOr; @@ -36,7 +33,6 @@ import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceWatcher; -import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsResourceType; import java.io.Closeable; import java.io.IOException; @@ -61,22 +57,21 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance(); public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance(); private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++ + private final String listenerName; private final XdsClient xdsClient; private final XdsConfigWatcher xdsConfigWatcher; private final SynchronizationContext syncContext; private final String dataPlaneAuthority; - private final InternalLogId logId; - private final XdsLogger logger; private StatusOr lastUpdate = null; private final Map, TypeWatchers> resourceWatchers = new HashMap<>(); + private final Set subscriptions = new HashSet<>(); XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher, SynchronizationContext syncContext, String dataPlaneAuthority, String listenerName, NameResolver.Args nameResolverArgs, ScheduledExecutorService scheduler) { - logId = InternalLogId.allocate("xds-dependency-manager", listenerName); - logger = XdsLogger.withLogId(logId); + this.listenerName = checkNotNull(listenerName, "listenerName"); this.xdsClient = checkNotNull(xdsClient, "xdsClient"); this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher"); this.syncContext = checkNotNull(syncContext, "syncContext"); @@ -102,7 +97,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi subscription.closed = true; return; // shutdown() called } - addClusterWatcher(clusterName, subscription); + subscriptions.add(subscription); + addClusterWatcher(clusterName); }); return subscription; @@ -117,73 +113,13 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi xdsClient.watchXdsResource(type, resourceName, watcher, syncContext); } - private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) { - if (watcher == null) { - return; - } - watcher.parentContexts.remove(parentContext); - if (watcher.parentContexts.isEmpty()) { - cancelWatcher(watcher); - } - } - - private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) { - if (watcher == null) { - return; - } - watcher.parentContexts.remove(parentContext); - if (watcher.parentContexts.isEmpty()) { - cancelWatcher(watcher); - } - } - - private void cancelWatcher(XdsWatcherBase watcher) { - syncContext.throwIfNotInThisSynchronizationContext(); - - if (watcher == null) { - return; - } - - if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) { - throwIfParentContextsNotEmpty(watcher); - } - - watcher.cancelled = true; - XdsResourceType type = watcher.type; - String resourceName = watcher.resourceName; - - if (getWatchers(type).remove(resourceName) == null) { - logger.log(DEBUG, "Trying to cancel watcher {0}, but it isn't watched", watcher); - return; - } - - xdsClient.cancelXdsResourceWatch(type, resourceName, watcher); - } - - private static void throwIfParentContextsNotEmpty(XdsWatcherBase watcher) { - if (watcher instanceof CdsWatcher) { - CdsWatcher cdsWatcher = (CdsWatcher) watcher; - if (!cdsWatcher.parentContexts.isEmpty()) { - String msg = String.format("CdsWatcher %s has parent contexts %s", - cdsWatcher.resourceName(), cdsWatcher.parentContexts); - throw new IllegalStateException(msg); - } - } else if (watcher instanceof EdsWatcher) { - EdsWatcher edsWatcher = (EdsWatcher) watcher; - if (!edsWatcher.parentContexts.isEmpty()) { - String msg = String.format("CdsWatcher %s has parent contexts %s", - edsWatcher.resourceName(), edsWatcher.parentContexts); - throw new IllegalStateException(msg); - } - } - } - public void shutdown() { syncContext.execute(() -> { for (TypeWatchers watchers : resourceWatchers.values()) { shutdownWatchersForType(watchers); } resourceWatchers.clear(); + subscriptions.clear(); }); } @@ -197,54 +133,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi private void releaseSubscription(ClusterSubscription subscription) { checkNotNull(subscription, "subscription"); - String clusterName = subscription.getClusterName(); syncContext.execute(() -> { if (subscription.closed) { return; } subscription.closed = true; - XdsWatcherBase cdsWatcher - = getWatchers(CLUSTER_RESOURCE).get(clusterName); - if (cdsWatcher == null) { + if (!subscriptions.remove(subscription)) { return; // shutdown() called } - cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription); maybePublishConfig(); }); } - private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) { - checkNotNull(root, "root"); - - cancelCdsWatcher(root, parentContext); - - if (!root.hasDataValue() || !root.parentContexts.isEmpty()) { - return; - } - - XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue(); - switch (cdsUpdate.clusterType()) { - case EDS: - String edsServiceName = root.getEdsServiceName(); - EdsWatcher edsWatcher = (EdsWatcher) getWatchers(ENDPOINT_RESOURCE).get(edsServiceName); - cancelEdsWatcher(edsWatcher, root); - break; - case AGGREGATE: - for (String cluster : cdsUpdate.prioritizedClusterNames()) { - CdsWatcher clusterWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(cluster); - if (clusterWatcher != null) { - cancelClusterWatcherTree(clusterWatcher, root); - } - } - break; - case LOGICAL_DNS: - // no eds needed, so everything happens in cancelCdsWatcher() - break; - default: - throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType()); - } - } - /** * Check if all resources have results, and if so, generate a new XdsConfig and send it to all * the watchers. @@ -274,27 +174,40 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @VisibleForTesting StatusOr buildUpdate() { + // Create a config and discard any watchers not accessed + WatcherTracer tracer = new WatcherTracer(resourceWatchers); + StatusOr config = buildUpdate( + tracer, listenerName, dataPlaneAuthority, subscriptions); + tracer.closeUnusedWatchers(); + return config; + } + + private static StatusOr buildUpdate( + WatcherTracer tracer, + String listenerName, + String dataPlaneAuthority, + Set subscriptions) { XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder(); // Iterate watchers and build the XdsConfig - // Will only be 1 listener and 1 route resource - RdsUpdateSupplier routeSource = null; - for (XdsWatcherBase ldsWatcher : - getWatchers(XdsListenerResource.getInstance()).values()) { - if (!ldsWatcher.getData().hasValue()) { - return StatusOr.fromStatus(ldsWatcher.getData().getStatus()); - } - XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue(); - builder.setListener(ldsUpdate); - routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(); + XdsWatcherBase ldsWatcher + = tracer.getWatcher(XdsListenerResource.getInstance(), listenerName); + if (ldsWatcher == null) { + return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( + "Bug: No listener watcher found for " + listenerName)); } + if (!ldsWatcher.getData().hasValue()) { + return StatusOr.fromStatus(ldsWatcher.getData().getStatus()); + } + XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue(); + builder.setListener(ldsUpdate); + RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer); if (routeSource == null) { return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( "Bug: No route source found for listener " + dataPlaneAuthority)); } - StatusOr statusOrRdsUpdate = routeSource.getRdsUpdate(); if (!statusOrRdsUpdate.hasValue()) { return StatusOr.fromStatus(statusOrRdsUpdate.getStatus()); @@ -312,8 +225,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi Map> clusters = new HashMap<>(); LinkedHashSet ancestors = new LinkedHashSet<>(); - for (String cluster : getWatchers(CLUSTER_RESOURCE).keySet()) { - addConfigForCluster(clusters, cluster, ancestors); + for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) { + addConfigForCluster(clusters, cluster, ancestors, tracer); + } + for (ClusterSubscription subscription : subscriptions) { + addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer); } for (Map.Entry> me : clusters.entrySet()) { builder.addCluster(me.getKey(), me.getValue()); @@ -335,11 +251,12 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return tTypeWatchers.watchers; } - private void addConfigForCluster( + private static void addConfigForCluster( Map> clusters, String clusterName, @SuppressWarnings("NonApiType") // Need order-preserving set for errors - LinkedHashSet ancestors) { + LinkedHashSet ancestors, + WatcherTracer tracer) { if (clusters.containsKey(clusterName)) { return; } @@ -355,7 +272,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return; } - CdsWatcher cdsWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); + CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName); StatusOr cdsWatcherDataOr = cdsWatcher.getData(); if (!cdsWatcherDataOr.hasValue()) { clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus())); @@ -371,7 +288,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi LinkedHashSet leafNames = new LinkedHashSet(); ancestors.add(clusterName); for (String childCluster : cdsUpdate.prioritizedClusterNames()) { - addConfigForCluster(clusters, childCluster, ancestors); + addConfigForCluster(clusters, childCluster, ancestors, tracer); StatusOr config = clusters.get(childCluster); if (!config.hasValue()) { clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription( @@ -392,7 +309,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi break; case EDS: XdsWatcherBase edsWatcher = - getWatchers(ENDPOINT_RESOURCE).get(cdsWatcher.getEdsServiceName()); + tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName()); if (edsWatcher != null) { child = new EndpointConfig(edsWatcher.getData()); } else { @@ -412,53 +329,35 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); } - @Override - public String toString() { - return logId.toString(); - } - - private void addEdsWatcher(String edsServiceName, CdsWatcher parentContext) { - EdsWatcher watcher - = (EdsWatcher) getWatchers(XdsEndpointResource.getInstance()).get(edsServiceName); - if (watcher != null) { - watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence + private void addRdsWatcher(String resourceName) { + if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) { return; } - addWatcher(new EdsWatcher(edsServiceName, parentContext)); + addWatcher(new RdsWatcher(resourceName)); } - private void addClusterWatcher(String clusterName, Object parentContext) { - CdsWatcher watcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); - if (watcher != null) { - watcher.parentContexts.add(parentContext); + private void addEdsWatcher(String edsServiceName) { + if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) { return; } - addWatcher(new CdsWatcher(clusterName, parentContext)); + addWatcher(new EdsWatcher(edsServiceName)); } - private void updateRoutes(List virtualHosts, Object newParentContext, - List oldVirtualHosts, boolean sameParentContext) { - VirtualHost oldVirtualHost = - RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority); + private void addClusterWatcher(String clusterName) { + if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) { + return; + } + + addWatcher(new CdsWatcher(clusterName)); + } + + private void updateRoutes(List virtualHosts) { VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority); - Set newClusters = getClusterNamesFromVirtualHost(virtualHost); - Set oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost); - - if (sameParentContext) { - // Calculate diffs. - Set addedClusters = Sets.difference(newClusters, oldClusters); - Set deletedClusters = Sets.difference(oldClusters, newClusters); - - deletedClusters.forEach(watcher -> - cancelClusterWatcherTree(getCluster(watcher), newParentContext)); - addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext)); - } else { - newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext)); - } + newClusters.forEach((cluster) -> addClusterWatcher(cluster)); } private String nodeInfo() { @@ -489,10 +388,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return clusters; } - private CdsWatcher getCluster(String clusterName) { - return (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); - } - private static class TypeWatchers { // Key is resource name final Map> watchers = new HashMap<>(); @@ -529,6 +424,64 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } } + /** State for tracing garbage collector. */ + private static final class WatcherTracer { + private final Map, TypeWatchers> resourceWatchers; + private final Map, TypeWatchers> usedWatchers; + + public WatcherTracer(Map, TypeWatchers> resourceWatchers) { + this.resourceWatchers = resourceWatchers; + + this.usedWatchers = new HashMap<>(); + for (XdsResourceType type : resourceWatchers.keySet()) { + usedWatchers.put(type, newTypeWatchers(type)); + } + } + + private static TypeWatchers newTypeWatchers( + XdsResourceType type) { + return new TypeWatchers(type); + } + + public XdsWatcherBase getWatcher( + XdsResourceType resourceType, String name) { + TypeWatchers typeWatchers = resourceWatchers.get(resourceType); + if (typeWatchers == null) { + return null; + } + assert typeWatchers.resourceType == resourceType; + @SuppressWarnings("unchecked") + TypeWatchers tTypeWatchers = (TypeWatchers) typeWatchers; + XdsWatcherBase watcher = tTypeWatchers.watchers.get(name); + if (watcher == null) { + return null; + } + @SuppressWarnings("unchecked") + TypeWatchers usedTypeWatchers = (TypeWatchers) usedWatchers.get(resourceType); + usedTypeWatchers.watchers.put(name, watcher); + return watcher; + } + + /** Shut down unused watchers. */ + public void closeUnusedWatchers() { + boolean changed = false; // Help out the GC by preferring old objects + for (XdsResourceType type : resourceWatchers.keySet()) { + TypeWatchers orig = resourceWatchers.get(type); + TypeWatchers used = usedWatchers.get(type); + for (String name : orig.watchers.keySet()) { + if (used.watchers.containsKey(name)) { + continue; + } + orig.watchers.get(name).close(); + changed = true; + } + } + if (changed) { + resourceWatchers.putAll(usedWatchers); + } + } + } + private abstract class XdsWatcherBase implements ResourceWatcher { private final XdsResourceType type; @@ -571,6 +524,10 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi maybePublishConfig(); } + public void close() { + xdsClient.cancelXdsResourceWatch(type, resourceName, this); + } + boolean missingResult() { return data == null; } @@ -633,24 +590,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi virtualHosts = httpConnectionManager.virtualHosts(); rdsName = httpConnectionManager.rdsName(); } - StatusOr activeRdsUpdate = getRouteSource().getRdsUpdate(); - List activeVirtualHosts = activeRdsUpdate.hasValue() - ? activeRdsUpdate.getValue().virtualHosts - : Collections.emptyList(); - - boolean changedRdsName = !Objects.equals(rdsName, this.rdsName); - if (changedRdsName) { - cleanUpRdsWatcher(); - } if (virtualHosts != null) { // No RDS watcher since we are getting RDS updates via LDS - updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null); + updateRoutes(virtualHosts); this.rdsName = null; - } else if (changedRdsName) { + } else { this.rdsName = rdsName; - addWatcher(new RdsWatcher(rdsName)); - logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); + addRdsWatcher(rdsName); } setData(update); @@ -666,36 +613,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi checkArgument(resourceName().equals(resourceName), "Resource name does not match"); setDataAsStatus(Status.UNAVAILABLE.withDescription( toContextString() + " does not exist" + nodeInfo())); - cleanUpRdsWatcher(); rdsName = null; maybePublishConfig(); } - private void cleanUpRdsWatcher() { - RdsWatcher oldRdsWatcher = getRdsWatcher(); - if (oldRdsWatcher != null) { - cancelWatcher(oldRdsWatcher); - logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName); - - // Cleanup clusters (as appropriate) that had the old rds watcher as a parent - if (!oldRdsWatcher.hasDataValue()) { - return; - } - for (XdsWatcherBase watcher : - getWatchers(CLUSTER_RESOURCE).values()) { - cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher); - } - } - } - - private RdsWatcher getRdsWatcher() { + private RdsWatcher getRdsWatcher(WatcherTracer tracer) { if (rdsName == null) { return null; } - return (RdsWatcher) getWatchers(XdsRouteConfigureResource.getInstance()).get(rdsName); + return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName); } - public RdsUpdateSupplier getRouteSource() { + public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) { if (!hasDataValue()) { return this; } @@ -707,7 +636,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi if (virtualHosts != null) { return this; } - RdsWatcher rdsWatcher = getRdsWatcher(); + RdsWatcher rdsWatcher = getRdsWatcher(tracer); assert rdsWatcher != null; return rdsWatcher; } @@ -748,11 +677,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi if (cancelled) { return; } - List oldVirtualHosts = hasDataValue() - ? getData().getValue().virtualHosts - : Collections.emptyList(); setData(update); - updateRoutes(update.virtualHosts, this, oldVirtualHosts, true); + updateRoutes(update.virtualHosts); maybePublishConfig(); } @@ -766,11 +692,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } private class CdsWatcher extends XdsWatcherBase { - Set parentContexts = new HashSet<>(); - - CdsWatcher(String resourceName, Object parentContext) { + CdsWatcher(String resourceName) { super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName")); - this.parentContexts.add(checkNotNull(parentContext, "parentContext")); } @Override @@ -782,32 +705,16 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi switch (update.clusterType()) { case EDS: setData(update); - addEdsWatcher(getEdsServiceName(), this); + addEdsWatcher(getEdsServiceName()); break; case LOGICAL_DNS: setData(update); // no eds needed break; case AGGREGATE: - Object parentContext = this; - if (hasDataValue()) { - Set oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE - ? new HashSet<>(getData().getValue().prioritizedClusterNames()) - : Collections.emptySet(); - Set newNames = new HashSet<>(update.prioritizedClusterNames()); - - Set deletedClusters = Sets.difference(oldNames, newNames); - deletedClusters.forEach((cluster) - -> cancelClusterWatcherTree(getCluster(cluster), parentContext)); - - setData(update); - Set addedClusters = Sets.difference(newNames, oldNames); - addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext)); - } else { - setData(update); - update.prioritizedClusterNames() - .forEach(name -> addClusterWatcher(name, parentContext)); - } + setData(update); + update.prioritizedClusterNames() + .forEach(name -> addClusterWatcher(name)); break; default: Status error = Status.UNAVAILABLE.withDescription( @@ -829,11 +736,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } private class EdsWatcher extends XdsWatcherBase { - private final Set parentContexts = new HashSet<>(); - - private EdsWatcher(String resourceName, CdsWatcher parentContext) { + private EdsWatcher(String resourceName) { super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName")); - parentContexts.add(checkNotNull(parentContext, "parentContext")); } @Override @@ -844,9 +748,5 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi setData(checkNotNull(update, "update")); maybePublishConfig(); } - - void addParentContext(CdsWatcher parentContext) { - parentContexts.add(checkNotNull(parentContext, "parentContext")); - } } } diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index da960e1e13..aea1ad66d7 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -66,7 +66,9 @@ import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClient.ResourceMetadata; import io.grpc.xds.client.XdsClientMetricReporter; +import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.client.XdsTransportFactory; import java.io.Closeable; import java.io.IOException; @@ -562,7 +564,39 @@ public class XdsDependencyManagerTest { } @Test - public void testCdsCycle() throws Exception { + public void testCdsDeleteUnsubscribesChild() throws Exception { + RouteConfiguration routeConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA"); + Map clusterMap = new HashMap<>(); + Map edsMap = new HashMap<>(); + XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA"); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); + XdsConfig config = xdsUpdateCaptor.getValue().getValue(); + assertThat(config.getClusters().get("clusterA").hasValue()).isTrue(); + Map, Map> watches = + xdsClient.getSubscribedResourcesMetadataSnapshot().get(); + assertThat(watches.get(XdsEndpointResource.getInstance()).keySet()) + .containsExactly("eds_clusterA"); + + // Delete cluster + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of()); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); + config = xdsUpdateCaptor.getValue().getValue(); + assertThat(config.getClusters().get("clusterA").hasValue()).isFalse(); + watches = xdsClient.getSubscribedResourcesMetadataSnapshot().get(); + assertThat(watches).doesNotContainKey(XdsEndpointResource.getInstance()); + } + + @Test + public void testCdsCycleReclaimed() throws Exception { RouteConfiguration routeConfig = XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA"); Map clusterMap = new HashMap<>(); @@ -575,6 +609,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + // The cycle is loaded and detected InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, serverName, serverName, nameResolverArgs, scheduler); @@ -582,6 +617,16 @@ public class XdsDependencyManagerTest { XdsConfig config = xdsUpdateCaptor.getValue().getValue(); assertThat(config.getClusters().get("clusterA").hasValue()).isFalse(); assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle"); + + // Orphan the cycle and it is discarded + routeConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterC"); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + inOrder.verify(xdsConfigWatcher).onUpdate(any()); + Map, Map> watches = + xdsClient.getSubscribedResourcesMetadataSnapshot().get(); + assertThat(watches.get(XdsClusterResource.getInstance()).keySet()).containsExactly("clusterC"); } @Test diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index ab966ce002..f5c40fa211 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -241,7 +241,7 @@ public class XdsNameResolverTest { resolver.shutdown(); if (xdsClient != null) { assertThat(xdsClient.ldsWatcher).isNull(); - assertThat(xdsClient.rdsWatcher).isNull(); + assertThat(xdsClient.rdsWatchers).isEmpty(); } } @@ -421,7 +421,7 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); - assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME); VirtualHost virtualHost = VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY), Collections.singletonList(route1), @@ -438,13 +438,14 @@ public class XdsNameResolverTest { ArgumentCaptor.forClass(ResolutionResult.class); String alternativeRdsResource = "route-configuration-alter.googleapis.com"; xdsClient.deliverLdsUpdateForRdsName(alternativeRdsResource); - assertThat(xdsClient.rdsResource).isEqualTo(alternativeRdsResource); + assertThat(xdsClient.rdsWatchers.keySet()).contains(alternativeRdsResource); virtualHost = VirtualHost.create("virtualhost-alter", Collections.singletonList(AUTHORITY), Collections.singletonList(route2), ImmutableMap.of()); xdsClient.deliverRdsUpdate(alternativeRdsResource, Collections.singletonList(virtualHost)); createAndDeliverClusterUpdates(xdsClient, cluster2); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(alternativeRdsResource); // Two new service config updates triggered: // - with load balancing config being able to select cluster1 and cluster2 // - with load balancing config being able to select cluster2 only @@ -477,7 +478,7 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); - assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME); VirtualHost virtualHost = VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY), Collections.singletonList(route), @@ -491,14 +492,14 @@ public class XdsNameResolverTest { reset(mockListener); xdsClient.deliverLdsResourceNotFound(); // revoke LDS resource - assertThat(xdsClient.rdsResource).isNull(); // stop subscribing to stale RDS resource + assertThat(xdsClient.rdsWatchers.keySet()).isEmpty(); // stop subscribing to stale RDS resource assertEmptyResolutionResult(expectedLdsResourceName); reset(mockListener); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); // No name resolution result until new RDS resource update is received. Do not use stale config verifyNoInteractions(mockListener); - assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); @@ -518,7 +519,7 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); - assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME); VirtualHost virtualHost = VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY), Collections.singletonList(route), @@ -537,6 +538,7 @@ public class XdsNameResolverTest { // Simulate management server adds back the previously used RDS resource. reset(mockListener); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -2430,9 +2432,8 @@ public class XdsNameResolverTest { private class FakeXdsClient extends XdsClient { // Should never be subscribing to more than one LDS and RDS resource at any point of time. private String ldsResource; // should always be AUTHORITY - private String rdsResource; private ResourceWatcher ldsWatcher; - private ResourceWatcher rdsWatcher; + private final Map>> rdsWatchers = new HashMap<>(); private final Map>> cdsWatchers = new HashMap<>(); private final Map>> edsWatchers = new HashMap<>(); @@ -2457,10 +2458,8 @@ public class XdsNameResolverTest { ldsWatcher = (ResourceWatcher) watcher; break; case "RDS": - assertThat(rdsResource).isNull(); - assertThat(rdsWatcher).isNull(); - rdsResource = resourceName; - rdsWatcher = (ResourceWatcher) watcher; + rdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) + .add((ResourceWatcher) watcher); break; case "CDS": cdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) @@ -2488,10 +2487,12 @@ public class XdsNameResolverTest { ldsWatcher = null; break; case "RDS": - assertThat(rdsResource).isNotNull(); - assertThat(rdsWatcher).isNotNull(); - rdsResource = null; - rdsWatcher = null; + assertThat(rdsWatchers).containsKey(resourceName); + assertThat(rdsWatchers.get(resourceName)).contains(watcher); + rdsWatchers.get(resourceName).remove((ResourceWatcher) watcher); + if (rdsWatchers.get(resourceName).isEmpty()) { + rdsWatchers.remove(resourceName); + } break; case "CDS": assertThat(cdsWatchers).containsKey(resourceName); @@ -2659,9 +2660,6 @@ public class XdsNameResolverTest { void deliverRdsUpdateWithFaultInjection( String resourceName, @Nullable FaultConfig virtualHostFaultConfig, @Nullable FaultConfig routFaultConfig, @Nullable FaultConfig weightedClusterFaultConfig) { - if (!resourceName.equals(rdsResource)) { - return; - } ImmutableMap overrideConfig = weightedClusterFaultConfig == null ? ImmutableMap.of() : ImmutableMap.of( @@ -2690,18 +2688,19 @@ public class XdsNameResolverTest { Collections.singletonList(expectedLdsResourceName), Collections.singletonList(route), overrideConfig); - syncContext.execute(() -> { - rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); - createAndDeliverClusterUpdates(this, cluster1); - }); + deliverRdsUpdate(resourceName, virtualHost); + createAndDeliverClusterUpdates(this, cluster1); } void deliverRdsUpdate(String resourceName, List virtualHosts) { - if (!resourceName.equals(rdsResource)) { + if (!rdsWatchers.containsKey(resourceName)) { return; } syncContext.execute(() -> { - rdsWatcher.onChanged(new RdsUpdate(virtualHosts)); + RdsUpdate update = new RdsUpdate(virtualHosts); + List> resourceWatchers = + ImmutableList.copyOf(rdsWatchers.get(resourceName)); + resourceWatchers.forEach(w -> w.onChanged(update)); }); } @@ -2710,11 +2709,13 @@ public class XdsNameResolverTest { } void deliverRdsResourceNotFound(String resourceName) { - if (!resourceName.equals(rdsResource)) { + if (!rdsWatchers.containsKey(resourceName)) { return; } syncContext.execute(() -> { - rdsWatcher.onResourceDoesNotExist(rdsResource); + List> resourceWatchers = + ImmutableList.copyOf(rdsWatchers.get(resourceName)); + resourceWatchers.forEach(w -> w.onResourceDoesNotExist(resourceName)); }); } @@ -2747,12 +2748,10 @@ public class XdsNameResolverTest { ldsWatcher.onError(error); }); } - if (rdsWatcher != null) { - syncContext.execute(() -> { - rdsWatcher.onError(error); - }); - } syncContext.execute(() -> { + rdsWatchers.values().stream() + .flatMap(List::stream) + .forEach(w -> w.onError(error)); cdsWatchers.values().stream() .flatMap(List::stream) .forEach(w -> w.onError(error));