From 1fd29bc804d0820a8f72c061bd671811adf85546 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 29 May 2025 17:08:33 -0700 Subject: [PATCH] xds: Use tracing GC in XdsDepManager Reference counting doesn't release cycles, so swap to a tracing garbage collector. This greatly simplifies the code as well, as diffing is no longer necessary. (If vanilla reference counting was used, diffing wouldn't have been necessary either as you just increment all the new objects and decrement the old ones. But that doesn't work when use a set instead of an integer.) --- .../io/grpc/xds/XdsDependencyManager.java | 368 +++++++----------- .../io/grpc/xds/XdsDependencyManagerTest.java | 47 ++- .../java/io/grpc/xds/XdsNameResolverTest.java | 65 ++-- 3 files changed, 212 insertions(+), 268 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 7bbd0064ed..78d4dbb198 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -19,12 +19,9 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.xds.client.XdsClient.ResourceUpdate; -import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; -import io.grpc.InternalLogId; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StatusOr; @@ -36,7 +33,6 @@ import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceWatcher; -import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsResourceType; import java.io.Closeable; import java.io.IOException; @@ -61,22 +57,21 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance(); public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance(); private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++ + private final String listenerName; private final XdsClient xdsClient; private final XdsConfigWatcher xdsConfigWatcher; private final SynchronizationContext syncContext; private final String dataPlaneAuthority; - private final InternalLogId logId; - private final XdsLogger logger; private StatusOr lastUpdate = null; private final Map, TypeWatchers> resourceWatchers = new HashMap<>(); + private final Set subscriptions = new HashSet<>(); XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher, SynchronizationContext syncContext, String dataPlaneAuthority, String listenerName, NameResolver.Args nameResolverArgs, ScheduledExecutorService scheduler) { - logId = InternalLogId.allocate("xds-dependency-manager", listenerName); - logger = XdsLogger.withLogId(logId); + this.listenerName = checkNotNull(listenerName, "listenerName"); this.xdsClient = checkNotNull(xdsClient, "xdsClient"); this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher"); this.syncContext = checkNotNull(syncContext, "syncContext"); @@ -102,7 +97,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi subscription.closed = true; return; // shutdown() called } - addClusterWatcher(clusterName, subscription); + subscriptions.add(subscription); + addClusterWatcher(clusterName); }); return subscription; @@ -117,73 +113,13 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi xdsClient.watchXdsResource(type, resourceName, watcher, syncContext); } - private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) { - if (watcher == null) { - return; - } - watcher.parentContexts.remove(parentContext); - if (watcher.parentContexts.isEmpty()) { - cancelWatcher(watcher); - } - } - - private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) { - if (watcher == null) { - return; - } - watcher.parentContexts.remove(parentContext); - if (watcher.parentContexts.isEmpty()) { - cancelWatcher(watcher); - } - } - - private void cancelWatcher(XdsWatcherBase watcher) { - syncContext.throwIfNotInThisSynchronizationContext(); - - if (watcher == null) { - return; - } - - if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) { - throwIfParentContextsNotEmpty(watcher); - } - - watcher.cancelled = true; - XdsResourceType type = watcher.type; - String resourceName = watcher.resourceName; - - if (getWatchers(type).remove(resourceName) == null) { - logger.log(DEBUG, "Trying to cancel watcher {0}, but it isn't watched", watcher); - return; - } - - xdsClient.cancelXdsResourceWatch(type, resourceName, watcher); - } - - private static void throwIfParentContextsNotEmpty(XdsWatcherBase watcher) { - if (watcher instanceof CdsWatcher) { - CdsWatcher cdsWatcher = (CdsWatcher) watcher; - if (!cdsWatcher.parentContexts.isEmpty()) { - String msg = String.format("CdsWatcher %s has parent contexts %s", - cdsWatcher.resourceName(), cdsWatcher.parentContexts); - throw new IllegalStateException(msg); - } - } else if (watcher instanceof EdsWatcher) { - EdsWatcher edsWatcher = (EdsWatcher) watcher; - if (!edsWatcher.parentContexts.isEmpty()) { - String msg = String.format("CdsWatcher %s has parent contexts %s", - edsWatcher.resourceName(), edsWatcher.parentContexts); - throw new IllegalStateException(msg); - } - } - } - public void shutdown() { syncContext.execute(() -> { for (TypeWatchers watchers : resourceWatchers.values()) { shutdownWatchersForType(watchers); } resourceWatchers.clear(); + subscriptions.clear(); }); } @@ -197,54 +133,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi private void releaseSubscription(ClusterSubscription subscription) { checkNotNull(subscription, "subscription"); - String clusterName = subscription.getClusterName(); syncContext.execute(() -> { if (subscription.closed) { return; } subscription.closed = true; - XdsWatcherBase cdsWatcher - = getWatchers(CLUSTER_RESOURCE).get(clusterName); - if (cdsWatcher == null) { + if (!subscriptions.remove(subscription)) { return; // shutdown() called } - cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription); maybePublishConfig(); }); } - private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) { - checkNotNull(root, "root"); - - cancelCdsWatcher(root, parentContext); - - if (!root.hasDataValue() || !root.parentContexts.isEmpty()) { - return; - } - - XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue(); - switch (cdsUpdate.clusterType()) { - case EDS: - String edsServiceName = root.getEdsServiceName(); - EdsWatcher edsWatcher = (EdsWatcher) getWatchers(ENDPOINT_RESOURCE).get(edsServiceName); - cancelEdsWatcher(edsWatcher, root); - break; - case AGGREGATE: - for (String cluster : cdsUpdate.prioritizedClusterNames()) { - CdsWatcher clusterWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(cluster); - if (clusterWatcher != null) { - cancelClusterWatcherTree(clusterWatcher, root); - } - } - break; - case LOGICAL_DNS: - // no eds needed, so everything happens in cancelCdsWatcher() - break; - default: - throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType()); - } - } - /** * Check if all resources have results, and if so, generate a new XdsConfig and send it to all * the watchers. @@ -274,27 +174,40 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @VisibleForTesting StatusOr buildUpdate() { + // Create a config and discard any watchers not accessed + WatcherTracer tracer = new WatcherTracer(resourceWatchers); + StatusOr config = buildUpdate( + tracer, listenerName, dataPlaneAuthority, subscriptions); + tracer.closeUnusedWatchers(); + return config; + } + + private static StatusOr buildUpdate( + WatcherTracer tracer, + String listenerName, + String dataPlaneAuthority, + Set subscriptions) { XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder(); // Iterate watchers and build the XdsConfig - // Will only be 1 listener and 1 route resource - RdsUpdateSupplier routeSource = null; - for (XdsWatcherBase ldsWatcher : - getWatchers(XdsListenerResource.getInstance()).values()) { - if (!ldsWatcher.getData().hasValue()) { - return StatusOr.fromStatus(ldsWatcher.getData().getStatus()); - } - XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue(); - builder.setListener(ldsUpdate); - routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(); + XdsWatcherBase ldsWatcher + = tracer.getWatcher(XdsListenerResource.getInstance(), listenerName); + if (ldsWatcher == null) { + return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( + "Bug: No listener watcher found for " + listenerName)); } + if (!ldsWatcher.getData().hasValue()) { + return StatusOr.fromStatus(ldsWatcher.getData().getStatus()); + } + XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue(); + builder.setListener(ldsUpdate); + RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer); if (routeSource == null) { return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( "Bug: No route source found for listener " + dataPlaneAuthority)); } - StatusOr statusOrRdsUpdate = routeSource.getRdsUpdate(); if (!statusOrRdsUpdate.hasValue()) { return StatusOr.fromStatus(statusOrRdsUpdate.getStatus()); @@ -312,8 +225,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi Map> clusters = new HashMap<>(); LinkedHashSet ancestors = new LinkedHashSet<>(); - for (String cluster : getWatchers(CLUSTER_RESOURCE).keySet()) { - addConfigForCluster(clusters, cluster, ancestors); + for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) { + addConfigForCluster(clusters, cluster, ancestors, tracer); + } + for (ClusterSubscription subscription : subscriptions) { + addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer); } for (Map.Entry> me : clusters.entrySet()) { builder.addCluster(me.getKey(), me.getValue()); @@ -335,11 +251,12 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return tTypeWatchers.watchers; } - private void addConfigForCluster( + private static void addConfigForCluster( Map> clusters, String clusterName, @SuppressWarnings("NonApiType") // Need order-preserving set for errors - LinkedHashSet ancestors) { + LinkedHashSet ancestors, + WatcherTracer tracer) { if (clusters.containsKey(clusterName)) { return; } @@ -355,7 +272,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return; } - CdsWatcher cdsWatcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); + CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName); StatusOr cdsWatcherDataOr = cdsWatcher.getData(); if (!cdsWatcherDataOr.hasValue()) { clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus())); @@ -371,7 +288,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi LinkedHashSet leafNames = new LinkedHashSet(); ancestors.add(clusterName); for (String childCluster : cdsUpdate.prioritizedClusterNames()) { - addConfigForCluster(clusters, childCluster, ancestors); + addConfigForCluster(clusters, childCluster, ancestors, tracer); StatusOr config = clusters.get(childCluster); if (!config.hasValue()) { clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription( @@ -392,7 +309,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi break; case EDS: XdsWatcherBase edsWatcher = - getWatchers(ENDPOINT_RESOURCE).get(cdsWatcher.getEdsServiceName()); + tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName()); if (edsWatcher != null) { child = new EndpointConfig(edsWatcher.getData()); } else { @@ -412,53 +329,35 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); } - @Override - public String toString() { - return logId.toString(); - } - - private void addEdsWatcher(String edsServiceName, CdsWatcher parentContext) { - EdsWatcher watcher - = (EdsWatcher) getWatchers(XdsEndpointResource.getInstance()).get(edsServiceName); - if (watcher != null) { - watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence + private void addRdsWatcher(String resourceName) { + if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) { return; } - addWatcher(new EdsWatcher(edsServiceName, parentContext)); + addWatcher(new RdsWatcher(resourceName)); } - private void addClusterWatcher(String clusterName, Object parentContext) { - CdsWatcher watcher = (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); - if (watcher != null) { - watcher.parentContexts.add(parentContext); + private void addEdsWatcher(String edsServiceName) { + if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) { return; } - addWatcher(new CdsWatcher(clusterName, parentContext)); + addWatcher(new EdsWatcher(edsServiceName)); } - private void updateRoutes(List virtualHosts, Object newParentContext, - List oldVirtualHosts, boolean sameParentContext) { - VirtualHost oldVirtualHost = - RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority); + private void addClusterWatcher(String clusterName) { + if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) { + return; + } + + addWatcher(new CdsWatcher(clusterName)); + } + + private void updateRoutes(List virtualHosts) { VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority); - Set newClusters = getClusterNamesFromVirtualHost(virtualHost); - Set oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost); - - if (sameParentContext) { - // Calculate diffs. - Set addedClusters = Sets.difference(newClusters, oldClusters); - Set deletedClusters = Sets.difference(oldClusters, newClusters); - - deletedClusters.forEach(watcher -> - cancelClusterWatcherTree(getCluster(watcher), newParentContext)); - addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext)); - } else { - newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext)); - } + newClusters.forEach((cluster) -> addClusterWatcher(cluster)); } private String nodeInfo() { @@ -489,10 +388,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return clusters; } - private CdsWatcher getCluster(String clusterName) { - return (CdsWatcher) getWatchers(CLUSTER_RESOURCE).get(clusterName); - } - private static class TypeWatchers { // Key is resource name final Map> watchers = new HashMap<>(); @@ -529,6 +424,64 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } } + /** State for tracing garbage collector. */ + private static final class WatcherTracer { + private final Map, TypeWatchers> resourceWatchers; + private final Map, TypeWatchers> usedWatchers; + + public WatcherTracer(Map, TypeWatchers> resourceWatchers) { + this.resourceWatchers = resourceWatchers; + + this.usedWatchers = new HashMap<>(); + for (XdsResourceType type : resourceWatchers.keySet()) { + usedWatchers.put(type, newTypeWatchers(type)); + } + } + + private static TypeWatchers newTypeWatchers( + XdsResourceType type) { + return new TypeWatchers(type); + } + + public XdsWatcherBase getWatcher( + XdsResourceType resourceType, String name) { + TypeWatchers typeWatchers = resourceWatchers.get(resourceType); + if (typeWatchers == null) { + return null; + } + assert typeWatchers.resourceType == resourceType; + @SuppressWarnings("unchecked") + TypeWatchers tTypeWatchers = (TypeWatchers) typeWatchers; + XdsWatcherBase watcher = tTypeWatchers.watchers.get(name); + if (watcher == null) { + return null; + } + @SuppressWarnings("unchecked") + TypeWatchers usedTypeWatchers = (TypeWatchers) usedWatchers.get(resourceType); + usedTypeWatchers.watchers.put(name, watcher); + return watcher; + } + + /** Shut down unused watchers. */ + public void closeUnusedWatchers() { + boolean changed = false; // Help out the GC by preferring old objects + for (XdsResourceType type : resourceWatchers.keySet()) { + TypeWatchers orig = resourceWatchers.get(type); + TypeWatchers used = usedWatchers.get(type); + for (String name : orig.watchers.keySet()) { + if (used.watchers.containsKey(name)) { + continue; + } + orig.watchers.get(name).close(); + changed = true; + } + } + if (changed) { + resourceWatchers.putAll(usedWatchers); + } + } + } + private abstract class XdsWatcherBase implements ResourceWatcher { private final XdsResourceType type; @@ -571,6 +524,10 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi maybePublishConfig(); } + public void close() { + xdsClient.cancelXdsResourceWatch(type, resourceName, this); + } + boolean missingResult() { return data == null; } @@ -633,24 +590,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi virtualHosts = httpConnectionManager.virtualHosts(); rdsName = httpConnectionManager.rdsName(); } - StatusOr activeRdsUpdate = getRouteSource().getRdsUpdate(); - List activeVirtualHosts = activeRdsUpdate.hasValue() - ? activeRdsUpdate.getValue().virtualHosts - : Collections.emptyList(); - - boolean changedRdsName = !Objects.equals(rdsName, this.rdsName); - if (changedRdsName) { - cleanUpRdsWatcher(); - } if (virtualHosts != null) { // No RDS watcher since we are getting RDS updates via LDS - updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null); + updateRoutes(virtualHosts); this.rdsName = null; - } else if (changedRdsName) { + } else { this.rdsName = rdsName; - addWatcher(new RdsWatcher(rdsName)); - logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); + addRdsWatcher(rdsName); } setData(update); @@ -666,36 +613,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi checkArgument(resourceName().equals(resourceName), "Resource name does not match"); setDataAsStatus(Status.UNAVAILABLE.withDescription( toContextString() + " does not exist" + nodeInfo())); - cleanUpRdsWatcher(); rdsName = null; maybePublishConfig(); } - private void cleanUpRdsWatcher() { - RdsWatcher oldRdsWatcher = getRdsWatcher(); - if (oldRdsWatcher != null) { - cancelWatcher(oldRdsWatcher); - logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName); - - // Cleanup clusters (as appropriate) that had the old rds watcher as a parent - if (!oldRdsWatcher.hasDataValue()) { - return; - } - for (XdsWatcherBase watcher : - getWatchers(CLUSTER_RESOURCE).values()) { - cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher); - } - } - } - - private RdsWatcher getRdsWatcher() { + private RdsWatcher getRdsWatcher(WatcherTracer tracer) { if (rdsName == null) { return null; } - return (RdsWatcher) getWatchers(XdsRouteConfigureResource.getInstance()).get(rdsName); + return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName); } - public RdsUpdateSupplier getRouteSource() { + public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) { if (!hasDataValue()) { return this; } @@ -707,7 +636,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi if (virtualHosts != null) { return this; } - RdsWatcher rdsWatcher = getRdsWatcher(); + RdsWatcher rdsWatcher = getRdsWatcher(tracer); assert rdsWatcher != null; return rdsWatcher; } @@ -748,11 +677,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi if (cancelled) { return; } - List oldVirtualHosts = hasDataValue() - ? getData().getValue().virtualHosts - : Collections.emptyList(); setData(update); - updateRoutes(update.virtualHosts, this, oldVirtualHosts, true); + updateRoutes(update.virtualHosts); maybePublishConfig(); } @@ -766,11 +692,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } private class CdsWatcher extends XdsWatcherBase { - Set parentContexts = new HashSet<>(); - - CdsWatcher(String resourceName, Object parentContext) { + CdsWatcher(String resourceName) { super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName")); - this.parentContexts.add(checkNotNull(parentContext, "parentContext")); } @Override @@ -782,32 +705,16 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi switch (update.clusterType()) { case EDS: setData(update); - addEdsWatcher(getEdsServiceName(), this); + addEdsWatcher(getEdsServiceName()); break; case LOGICAL_DNS: setData(update); // no eds needed break; case AGGREGATE: - Object parentContext = this; - if (hasDataValue()) { - Set oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE - ? new HashSet<>(getData().getValue().prioritizedClusterNames()) - : Collections.emptySet(); - Set newNames = new HashSet<>(update.prioritizedClusterNames()); - - Set deletedClusters = Sets.difference(oldNames, newNames); - deletedClusters.forEach((cluster) - -> cancelClusterWatcherTree(getCluster(cluster), parentContext)); - - setData(update); - Set addedClusters = Sets.difference(newNames, oldNames); - addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext)); - } else { - setData(update); - update.prioritizedClusterNames() - .forEach(name -> addClusterWatcher(name, parentContext)); - } + setData(update); + update.prioritizedClusterNames() + .forEach(name -> addClusterWatcher(name)); break; default: Status error = Status.UNAVAILABLE.withDescription( @@ -829,11 +736,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } private class EdsWatcher extends XdsWatcherBase { - private final Set parentContexts = new HashSet<>(); - - private EdsWatcher(String resourceName, CdsWatcher parentContext) { + private EdsWatcher(String resourceName) { super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName")); - parentContexts.add(checkNotNull(parentContext, "parentContext")); } @Override @@ -844,9 +748,5 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi setData(checkNotNull(update, "update")); maybePublishConfig(); } - - void addParentContext(CdsWatcher parentContext) { - parentContexts.add(checkNotNull(parentContext, "parentContext")); - } } } diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index da960e1e13..aea1ad66d7 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -66,7 +66,9 @@ import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClient.ResourceMetadata; import io.grpc.xds.client.XdsClientMetricReporter; +import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.client.XdsTransportFactory; import java.io.Closeable; import java.io.IOException; @@ -562,7 +564,39 @@ public class XdsDependencyManagerTest { } @Test - public void testCdsCycle() throws Exception { + public void testCdsDeleteUnsubscribesChild() throws Exception { + RouteConfiguration routeConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA"); + Map clusterMap = new HashMap<>(); + Map edsMap = new HashMap<>(); + XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA"); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); + XdsConfig config = xdsUpdateCaptor.getValue().getValue(); + assertThat(config.getClusters().get("clusterA").hasValue()).isTrue(); + Map, Map> watches = + xdsClient.getSubscribedResourcesMetadataSnapshot().get(); + assertThat(watches.get(XdsEndpointResource.getInstance()).keySet()) + .containsExactly("eds_clusterA"); + + // Delete cluster + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of()); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); + config = xdsUpdateCaptor.getValue().getValue(); + assertThat(config.getClusters().get("clusterA").hasValue()).isFalse(); + watches = xdsClient.getSubscribedResourcesMetadataSnapshot().get(); + assertThat(watches).doesNotContainKey(XdsEndpointResource.getInstance()); + } + + @Test + public void testCdsCycleReclaimed() throws Exception { RouteConfiguration routeConfig = XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA"); Map clusterMap = new HashMap<>(); @@ -575,6 +609,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + // The cycle is loaded and detected InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, serverName, serverName, nameResolverArgs, scheduler); @@ -582,6 +617,16 @@ public class XdsDependencyManagerTest { XdsConfig config = xdsUpdateCaptor.getValue().getValue(); assertThat(config.getClusters().get("clusterA").hasValue()).isFalse(); assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle"); + + // Orphan the cycle and it is discarded + routeConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterC"); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + inOrder.verify(xdsConfigWatcher).onUpdate(any()); + Map, Map> watches = + xdsClient.getSubscribedResourcesMetadataSnapshot().get(); + assertThat(watches.get(XdsClusterResource.getInstance()).keySet()).containsExactly("clusterC"); } @Test diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index ab966ce002..f5c40fa211 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -241,7 +241,7 @@ public class XdsNameResolverTest { resolver.shutdown(); if (xdsClient != null) { assertThat(xdsClient.ldsWatcher).isNull(); - assertThat(xdsClient.rdsWatcher).isNull(); + assertThat(xdsClient.rdsWatchers).isEmpty(); } } @@ -421,7 +421,7 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); - assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME); VirtualHost virtualHost = VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY), Collections.singletonList(route1), @@ -438,13 +438,14 @@ public class XdsNameResolverTest { ArgumentCaptor.forClass(ResolutionResult.class); String alternativeRdsResource = "route-configuration-alter.googleapis.com"; xdsClient.deliverLdsUpdateForRdsName(alternativeRdsResource); - assertThat(xdsClient.rdsResource).isEqualTo(alternativeRdsResource); + assertThat(xdsClient.rdsWatchers.keySet()).contains(alternativeRdsResource); virtualHost = VirtualHost.create("virtualhost-alter", Collections.singletonList(AUTHORITY), Collections.singletonList(route2), ImmutableMap.of()); xdsClient.deliverRdsUpdate(alternativeRdsResource, Collections.singletonList(virtualHost)); createAndDeliverClusterUpdates(xdsClient, cluster2); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(alternativeRdsResource); // Two new service config updates triggered: // - with load balancing config being able to select cluster1 and cluster2 // - with load balancing config being able to select cluster2 only @@ -477,7 +478,7 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); - assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME); VirtualHost virtualHost = VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY), Collections.singletonList(route), @@ -491,14 +492,14 @@ public class XdsNameResolverTest { reset(mockListener); xdsClient.deliverLdsResourceNotFound(); // revoke LDS resource - assertThat(xdsClient.rdsResource).isNull(); // stop subscribing to stale RDS resource + assertThat(xdsClient.rdsWatchers.keySet()).isEmpty(); // stop subscribing to stale RDS resource assertEmptyResolutionResult(expectedLdsResourceName); reset(mockListener); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); // No name resolution result until new RDS resource update is received. Do not use stale config verifyNoInteractions(mockListener); - assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); @@ -518,7 +519,7 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); - assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); + assertThat(xdsClient.rdsWatchers.keySet()).containsExactly(RDS_RESOURCE_NAME); VirtualHost virtualHost = VirtualHost.create("virtualhost", Collections.singletonList(AUTHORITY), Collections.singletonList(route), @@ -537,6 +538,7 @@ public class XdsNameResolverTest { // Simulate management server adds back the previously used RDS resource. reset(mockListener); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -2430,9 +2432,8 @@ public class XdsNameResolverTest { private class FakeXdsClient extends XdsClient { // Should never be subscribing to more than one LDS and RDS resource at any point of time. private String ldsResource; // should always be AUTHORITY - private String rdsResource; private ResourceWatcher ldsWatcher; - private ResourceWatcher rdsWatcher; + private final Map>> rdsWatchers = new HashMap<>(); private final Map>> cdsWatchers = new HashMap<>(); private final Map>> edsWatchers = new HashMap<>(); @@ -2457,10 +2458,8 @@ public class XdsNameResolverTest { ldsWatcher = (ResourceWatcher) watcher; break; case "RDS": - assertThat(rdsResource).isNull(); - assertThat(rdsWatcher).isNull(); - rdsResource = resourceName; - rdsWatcher = (ResourceWatcher) watcher; + rdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) + .add((ResourceWatcher) watcher); break; case "CDS": cdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) @@ -2488,10 +2487,12 @@ public class XdsNameResolverTest { ldsWatcher = null; break; case "RDS": - assertThat(rdsResource).isNotNull(); - assertThat(rdsWatcher).isNotNull(); - rdsResource = null; - rdsWatcher = null; + assertThat(rdsWatchers).containsKey(resourceName); + assertThat(rdsWatchers.get(resourceName)).contains(watcher); + rdsWatchers.get(resourceName).remove((ResourceWatcher) watcher); + if (rdsWatchers.get(resourceName).isEmpty()) { + rdsWatchers.remove(resourceName); + } break; case "CDS": assertThat(cdsWatchers).containsKey(resourceName); @@ -2659,9 +2660,6 @@ public class XdsNameResolverTest { void deliverRdsUpdateWithFaultInjection( String resourceName, @Nullable FaultConfig virtualHostFaultConfig, @Nullable FaultConfig routFaultConfig, @Nullable FaultConfig weightedClusterFaultConfig) { - if (!resourceName.equals(rdsResource)) { - return; - } ImmutableMap overrideConfig = weightedClusterFaultConfig == null ? ImmutableMap.of() : ImmutableMap.of( @@ -2690,18 +2688,19 @@ public class XdsNameResolverTest { Collections.singletonList(expectedLdsResourceName), Collections.singletonList(route), overrideConfig); - syncContext.execute(() -> { - rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); - createAndDeliverClusterUpdates(this, cluster1); - }); + deliverRdsUpdate(resourceName, virtualHost); + createAndDeliverClusterUpdates(this, cluster1); } void deliverRdsUpdate(String resourceName, List virtualHosts) { - if (!resourceName.equals(rdsResource)) { + if (!rdsWatchers.containsKey(resourceName)) { return; } syncContext.execute(() -> { - rdsWatcher.onChanged(new RdsUpdate(virtualHosts)); + RdsUpdate update = new RdsUpdate(virtualHosts); + List> resourceWatchers = + ImmutableList.copyOf(rdsWatchers.get(resourceName)); + resourceWatchers.forEach(w -> w.onChanged(update)); }); } @@ -2710,11 +2709,13 @@ public class XdsNameResolverTest { } void deliverRdsResourceNotFound(String resourceName) { - if (!resourceName.equals(rdsResource)) { + if (!rdsWatchers.containsKey(resourceName)) { return; } syncContext.execute(() -> { - rdsWatcher.onResourceDoesNotExist(rdsResource); + List> resourceWatchers = + ImmutableList.copyOf(rdsWatchers.get(resourceName)); + resourceWatchers.forEach(w -> w.onResourceDoesNotExist(resourceName)); }); } @@ -2747,12 +2748,10 @@ public class XdsNameResolverTest { ldsWatcher.onError(error); }); } - if (rdsWatcher != null) { - syncContext.execute(() -> { - rdsWatcher.onError(error); - }); - } syncContext.execute(() -> { + rdsWatchers.values().stream() + .flatMap(List::stream) + .forEach(w -> w.onError(error)); cdsWatchers.values().stream() .flatMap(List::stream) .forEach(w -> w.onError(error));