From 2604ce8a551244849f5d22dcefe99a7b2b474117 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 13 Jun 2025 13:26:51 -0700 Subject: [PATCH] xds: XdsNR should be subscribing to clusters with XdsDepManager This is missing behavior defined in gRFC A74: > As per gRFC A31, the ConfigSelector gives each RPC a ref to the > cluster that was selected for it to ensure that the cluster is not > removed from the xds_cluster_manager LB policy config before the RPC > is done with its LB picks. These cluster refs will also hold a > subscription for the cluster from the XdsDependencyManager, so that > the XdsDependencyManager will not stop watching the cluster resource > until the cluster is removed from the xds_cluster_manager LB policy > config. Without the logic, RPCs can race and see the error: > INTERNAL: CdsLb for cluster0: Unable to find non-dynamic root cluster Fixes #12152. This fixes the regression introduced in 297ab05e --- .../java/io/grpc/xds/XdsNameResolver.java | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 37a8e19ef3..25918facf7 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -548,7 +548,7 @@ final class XdsNameResolver extends NameResolver { if (clusterRefs.get(cluster).refCount.get() != 0) { throw new AssertionError(); } - clusterRefs.remove(cluster); + clusterRefs.remove(cluster).close(); if (resolveState.lastConfigOrStatus.hasValue()) { updateResolutionResult(resolveState.lastConfigOrStatus.getValue()); } else { @@ -793,9 +793,13 @@ final class XdsNameResolver extends NameResolver { clusterRefs.get(cluster).refCount.incrementAndGet(); } else { if (clusterNameMap.containsKey(cluster)) { + assert cluster.startsWith("cluster:"); + XdsConfig.Subscription subscription = + xdsDependencyManager.subscribeToCluster(cluster.substring("cluster:".length())); clusterRefs.put( cluster, - ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster))); + ClusterRefState.forCluster( + new AtomicInteger(1), clusterNameMap.get(cluster), subscription)); } if (rlsPluginConfigMap.containsKey(cluster)) { clusterRefs.put( @@ -826,7 +830,7 @@ final class XdsNameResolver extends NameResolver { for (String cluster : deletedClusters) { int count = clusterRefs.get(cluster).refCount.decrementAndGet(); if (count == 0) { - clusterRefs.remove(cluster); + clusterRefs.remove(cluster).close(); shouldUpdateResult = true; } } @@ -879,7 +883,7 @@ final class XdsNameResolver extends NameResolver { for (String cluster : existingClusters) { int count = clusterRefs.get(cluster).refCount.decrementAndGet(); if (count == 0) { - clusterRefs.remove(cluster); + clusterRefs.remove(cluster).close(); } } existingClusters = null; @@ -965,15 +969,18 @@ final class XdsNameResolver extends NameResolver { final String traditionalCluster; @Nullable final RlsPluginConfig rlsPluginConfig; + @Nullable + final XdsConfig.Subscription subscription; private ClusterRefState( AtomicInteger refCount, @Nullable String traditionalCluster, - @Nullable RlsPluginConfig rlsPluginConfig) { + @Nullable RlsPluginConfig rlsPluginConfig, @Nullable XdsConfig.Subscription subscription) { this.refCount = refCount; checkArgument(traditionalCluster == null ^ rlsPluginConfig == null, "There must be exactly one non-null value in traditionalCluster and pluginConfig"); this.traditionalCluster = traditionalCluster; this.rlsPluginConfig = rlsPluginConfig; + this.subscription = subscription; } private Map toLbPolicy() { @@ -993,12 +1000,21 @@ final class XdsNameResolver extends NameResolver { } } - static ClusterRefState forCluster(AtomicInteger refCount, String name) { - return new ClusterRefState(refCount, name, null); + private void close() { + if (subscription != null) { + subscription.close(); + } } - static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) { - return new ClusterRefState(refCount, null, rlsPluginConfig); + static ClusterRefState forCluster( + AtomicInteger refCount, String name, XdsConfig.Subscription subscription) { + return new ClusterRefState(refCount, name, null, checkNotNull(subscription, "subscription")); + } + + static ClusterRefState forRlsPlugin( + AtomicInteger refCount, + RlsPluginConfig rlsPluginConfig) { + return new ClusterRefState(refCount, null, rlsPluginConfig, null); } } }