xds: XdsNR should be subscribing to clusters with XdsDepManager

This is missing behavior defined in gRFC A74:

> As per gRFC A31, the ConfigSelector gives each RPC a ref to the
> cluster that was selected for it to ensure that the cluster is not
> removed from the xds_cluster_manager LB policy config before the RPC
> is done with its LB picks. These cluster refs will also hold a
> subscription for the cluster from the XdsDependencyManager, so that
> the XdsDependencyManager will not stop watching the cluster resource
> until the cluster is removed from the xds_cluster_manager LB policy
> config.

Without the logic, RPCs can race and see the error:

> INTERNAL: CdsLb for cluster0: Unable to find non-dynamic root cluster

Fixes #12152. This fixes the regression introduced in 297ab05e
This commit is contained in:
Eric Anderson 2025-06-13 13:26:51 -07:00
parent 1c43098990
commit 2604ce8a55
1 changed files with 25 additions and 9 deletions

View File

@ -548,7 +548,7 @@ final class XdsNameResolver extends NameResolver {
if (clusterRefs.get(cluster).refCount.get() != 0) {
throw new AssertionError();
}
clusterRefs.remove(cluster);
clusterRefs.remove(cluster).close();
if (resolveState.lastConfigOrStatus.hasValue()) {
updateResolutionResult(resolveState.lastConfigOrStatus.getValue());
} else {
@ -793,9 +793,13 @@ final class XdsNameResolver extends NameResolver {
clusterRefs.get(cluster).refCount.incrementAndGet();
} else {
if (clusterNameMap.containsKey(cluster)) {
assert cluster.startsWith("cluster:");
XdsConfig.Subscription subscription =
xdsDependencyManager.subscribeToCluster(cluster.substring("cluster:".length()));
clusterRefs.put(
cluster,
ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
ClusterRefState.forCluster(
new AtomicInteger(1), clusterNameMap.get(cluster), subscription));
}
if (rlsPluginConfigMap.containsKey(cluster)) {
clusterRefs.put(
@ -826,7 +830,7 @@ final class XdsNameResolver extends NameResolver {
for (String cluster : deletedClusters) {
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
if (count == 0) {
clusterRefs.remove(cluster);
clusterRefs.remove(cluster).close();
shouldUpdateResult = true;
}
}
@ -879,7 +883,7 @@ final class XdsNameResolver extends NameResolver {
for (String cluster : existingClusters) {
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
if (count == 0) {
clusterRefs.remove(cluster);
clusterRefs.remove(cluster).close();
}
}
existingClusters = null;
@ -965,15 +969,18 @@ final class XdsNameResolver extends NameResolver {
final String traditionalCluster;
@Nullable
final RlsPluginConfig rlsPluginConfig;
@Nullable
final XdsConfig.Subscription subscription;
private ClusterRefState(
AtomicInteger refCount, @Nullable String traditionalCluster,
@Nullable RlsPluginConfig rlsPluginConfig) {
@Nullable RlsPluginConfig rlsPluginConfig, @Nullable XdsConfig.Subscription subscription) {
this.refCount = refCount;
checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
"There must be exactly one non-null value in traditionalCluster and pluginConfig");
this.traditionalCluster = traditionalCluster;
this.rlsPluginConfig = rlsPluginConfig;
this.subscription = subscription;
}
private Map<String, ?> toLbPolicy() {
@ -993,12 +1000,21 @@ final class XdsNameResolver extends NameResolver {
}
}
static ClusterRefState forCluster(AtomicInteger refCount, String name) {
return new ClusterRefState(refCount, name, null);
private void close() {
if (subscription != null) {
subscription.close();
}
}
static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
return new ClusterRefState(refCount, null, rlsPluginConfig);
static ClusterRefState forCluster(
AtomicInteger refCount, String name, XdsConfig.Subscription subscription) {
return new ClusterRefState(refCount, name, null, checkNotNull(subscription, "subscription"));
}
static ClusterRefState forRlsPlugin(
AtomicInteger refCount,
RlsPluginConfig rlsPluginConfig) {
return new ClusterRefState(refCount, null, rlsPluginConfig, null);
}
}
}