Fix RLS regressions from XdsDepMan conversion

297ab05ef converted CDS to XdsDependencyManager. This caused three
regressions:

 * CdsLB2 as a RLS child would always fail with "Unable to find
   non-dynamic root cluster" because is_dynamic=true was missing in
   its service config
 * XdsNameResolver only propagated resolution updates when the clusters
   changed, so a CdsUpdate change would be ignored. This caused a hang
   for RLS even with is_dynamic=true. For non-RLS the lack config update
   broke the circuit breaking psm interop test. This would have been
   more severe if ClusterResolverLb had been converted to
   XdsDependenceManager, as it would have ignored EDS updates
 * RLS did not propagate resolution updates, so CdsLB2 even with
   is_dynamic=true the CdsUpdate for the new cluster would never arrive,
   causing a hang

b/428120265
b/427912384
This commit is contained in:
Eric Anderson 2025-06-29 20:05:03 -07:00
parent 2ee4f9b488
commit ca99a8c478
5 changed files with 76 additions and 25 deletions

View File

@ -255,6 +255,13 @@ final class CachingRlsLbClient {
}
}
Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) {
synchronized (lock) {
return refCountedChildPolicyWrapperFactory.acceptResolvedAddressFactory(
childLbResolvedAddressFactory);
}
}
/**
* Convert the status to UNAVAILABLE and enhance the error message.
* @param status status as provided by server

View File

@ -31,6 +31,7 @@ import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.ObjectPool;
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
@ -211,7 +212,7 @@ final class LbPolicyConfiguration {
private final ChildLoadBalancerHelperProvider childLbHelperProvider;
private final ChildLbStatusListener childLbStatusListener;
private final ChildLoadBalancingPolicy childPolicy;
private final ResolvedAddressFactory childLbResolvedAddressFactory;
private ResolvedAddressFactory childLbResolvedAddressFactory;
public RefCountedChildPolicyWrapperFactory(
ChildLoadBalancingPolicy childPolicy,
@ -229,6 +230,19 @@ final class LbPolicyConfiguration {
childLbHelperProvider.init();
}
Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) {
this.childLbResolvedAddressFactory = childLbResolvedAddressFactory;
Status status = Status.OK;
for (RefCountedChildPolicyWrapper wrapper : childPolicyMap.values()) {
Status newStatus =
wrapper.childPolicyWrapper.acceptResolvedAddressFactory(childLbResolvedAddressFactory);
if (!newStatus.isOk()) {
status = newStatus;
}
}
return status;
}
ChildPolicyWrapper createOrGet(String target) {
// TODO(creamsoup) check if the target is valid or not
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
@ -277,6 +291,7 @@ final class LbPolicyConfiguration {
private final String target;
private final ChildPolicyReportingHelper helper;
private final LoadBalancer lb;
private final Object childLbConfig;
private volatile SubchannelPicker picker;
private ConnectivityState state;
@ -295,14 +310,14 @@ final class LbPolicyConfiguration {
.parseLoadBalancingPolicyConfig(
childPolicy.getEffectiveChildPolicy(target));
this.lb = lbProvider.newLoadBalancer(helper);
this.childLbConfig = lbConfig.getConfig();
helper.getChannelLogger().log(
ChannelLogLevel.DEBUG, "RLS child lb created. config: {0}", lbConfig.getConfig());
ChannelLogLevel.DEBUG, "RLS child lb created. config: {0}", childLbConfig);
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
if (!lb.acceptResolvedAddresses(
childLbResolvedAddressFactory.create(lbConfig.getConfig())).isOk()) {
if (!acceptResolvedAddressFactory(childLbResolvedAddressFactory).isOk()) {
helper.refreshNameResolution();
}
lb.requestConnection();
@ -310,6 +325,11 @@ final class LbPolicyConfiguration {
});
}
Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) {
helper.getSynchronizationContext().throwIfNotInThisSynchronizationContext();
return lb.acceptResolvedAddresses(childLbResolvedAddressFactory.create(childLbConfig));
}
String getTarget() {
return target;
}

View File

@ -79,7 +79,9 @@ final class RlsLoadBalancer extends LoadBalancer {
// not required.
this.lbPolicyConfiguration = lbPolicyConfiguration;
}
return Status.OK;
return routeLookupClient.acceptResolvedAddressFactory(
new ChildLbResolvedAddressFactory(
resolvedAddresses.getAddresses(), resolvedAddresses.getAttributes()));
}
@Override

View File

@ -823,6 +823,9 @@ final class XdsNameResolver extends NameResolver {
if (shouldUpdateResult && routingConfig != null) {
updateResolutionResult(xdsConfig);
shouldUpdateResult = false;
} else {
// Need to update at least once
shouldUpdateResult = true;
}
// Make newly added clusters selectable by config selector and deleted clusters no longer
// selectable.
@ -993,7 +996,8 @@ final class XdsNameResolver extends NameResolver {
.put("routeLookupConfig", rlsPluginConfig.config())
.put(
"childPolicy",
ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of())))
ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of(
"is_dynamic", true))))
.put("childPolicyConfigTargetFieldName", "cluster")
.buildOrThrow();
return ImmutableMap.of("rls_experimental", rlsConfig);

View File

@ -28,6 +28,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -449,7 +450,7 @@ public class XdsNameResolverTest {
// Two new service config updates triggered:
// - with load balancing config being able to select cluster1 and cluster2
// - with load balancing config being able to select cluster2 only
verify(mockListener, times(2)).onResult2(resultCaptor.capture());
verify(mockListener, times(3)).onResult2(resultCaptor.capture());
assertServiceConfigForLoadBalancingConfig(
Arrays.asList(cluster1, cluster2),
(Map<String, ?>) resultCaptor.getAllValues().get(0).getServiceConfig().getConfig());
@ -1070,7 +1071,9 @@ public class XdsNameResolverTest {
assertCallSelectClusterResult(call1, configSelector, "another-cluster", 20.0);
firstCall.deliverErrorStatus(); // completes previous call
verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture());
// Two updates: one for XdsNameResolver releasing the cluster, and another for
// XdsDependencyManager updating the XdsConfig
verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
assertServiceConfigForLoadBalancingConfig(
Arrays.asList(cluster2, "another-cluster"),
@ -1100,7 +1103,7 @@ public class XdsNameResolverTest {
ImmutableMap.of())));
// Two consecutive service config updates: one for removing clcuster1,
// one for adding "another=cluster".
verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
assertServiceConfigForLoadBalancingConfig(
Arrays.asList(cluster2, "another-cluster"),
@ -1155,7 +1158,7 @@ public class XdsNameResolverTest {
cluster2, Collections.emptyList(),
TimeUnit.SECONDS.toNanos(15L), null, false),
ImmutableMap.of())));
verifyNoMoreInteractions(mockListener); // no cluster added/deleted
verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture());
assertCallSelectClusterResult(call1, configSelector, "another-cluster", 15.0);
}
@ -1187,7 +1190,13 @@ public class XdsNameResolverTest {
null, false),
ImmutableMap.of())));
testCall.deliverErrorStatus();
verifyNoMoreInteractions(mockListener);
verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture());
assertServiceConfigForLoadBalancingConfig(
Arrays.asList(cluster1, cluster2), resolutionResultCaptor.getAllValues().get(1));
assertServiceConfigForLoadBalancingConfig(
Arrays.asList(cluster1, cluster2), resolutionResultCaptor.getAllValues().get(2));
assertServiceConfigForLoadBalancingConfig(
Arrays.asList(cluster1, cluster2), resolutionResultCaptor.getAllValues().get(3));
}
@SuppressWarnings("unchecked")
@ -1268,7 +1277,7 @@ public class XdsNameResolverTest {
"routeLookupConfig",
ImmutableMap.of("lookupService", "rls-cbt.googleapis.com"),
"childPolicy",
ImmutableList.of(ImmutableMap.of("cds_experimental", ImmutableMap.of())),
ImmutableList.of(ImmutableMap.of("cds_experimental", ImmutableMap.of("is_dynamic", true))),
"childPolicyConfigTargetFieldName",
"cluster");
Map<String, ?> expectedClusterManagerLbConfig = ImmutableMap.of(
@ -1315,7 +1324,7 @@ public class XdsNameResolverTest {
"routeLookupConfig",
ImmutableMap.of("lookupService", "rls-cbt-2.googleapis.com"),
"childPolicy",
ImmutableList.of(ImmutableMap.of("cds_experimental", ImmutableMap.of())),
ImmutableList.of(ImmutableMap.of("cds_experimental", ImmutableMap.of("is_dynamic", true))),
"childPolicyConfigTargetFieldName",
"cluster");
Map<String, ?> expectedClusterManagerLbConfig2 = ImmutableMap.of(
@ -1656,7 +1665,7 @@ public class XdsNameResolverTest {
}
private void assertClusterResolutionResult(CallInfo call, String expectedCluster) {
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, atLeast(1)).onResult2(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
assertCallSelectClusterResult(call, configSelector, expectedCluster, null);
@ -1744,6 +1753,13 @@ public class XdsNameResolverTest {
return result.getAttributes().get(InternalConfigSelector.KEY);
}
private static void assertServiceConfigForLoadBalancingConfig(
List<String> clusters, ResolutionResult result) {
@SuppressWarnings("unchecked")
Map<String, ?> config = (Map<String, ?>) result.getServiceConfig().getConfig();
assertServiceConfigForLoadBalancingConfig(clusters, config);
}
/**
* Verifies the raw service config contains an xDS load balancing config for the given clusters.
*/
@ -1847,7 +1863,9 @@ public class XdsNameResolverTest {
+ " \"lookupService\": \"rls.bigtable.google.com\"\n"
+ " },\n"
+ " \"childPolicy\": [\n"
+ " {\"cds_experimental\": {}}\n"
+ " {\"cds_experimental\": {\n"
+ " \"is_dynamic\": true\n"
+ " }}\n"
+ " ],\n"
+ " \"childPolicyConfigTargetFieldName\": \"cluster\"\n"
+ " }\n"
@ -2035,7 +2053,7 @@ public class XdsNameResolverTest {
FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
@ -2051,7 +2069,7 @@ public class XdsNameResolverTest {
FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(0)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
@ -2066,7 +2084,7 @@ public class XdsNameResolverTest {
FaultConfig.FractionalPercent.perMillion(600_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(4)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
@ -2084,7 +2102,7 @@ public class XdsNameResolverTest {
FaultConfig.FractionalPercent.perMillion(400_000)),
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(5)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
@ -2119,7 +2137,7 @@ public class XdsNameResolverTest {
httpFilterFaultConfig = FaultConfig.create(
FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)), null, null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
@ -2130,7 +2148,7 @@ public class XdsNameResolverTest {
httpFilterFaultConfig = FaultConfig.create(
FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(0)), null, null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
@ -2143,7 +2161,7 @@ public class XdsNameResolverTest {
null,
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(4)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
@ -2156,7 +2174,7 @@ public class XdsNameResolverTest {
null,
null);
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(5)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
@ -2281,7 +2299,7 @@ public class XdsNameResolverTest {
null);
xdsClient.deliverLdsUpdateWithFaultInjection(
cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig, null);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
@ -2298,7 +2316,7 @@ public class XdsNameResolverTest {
xdsClient.deliverLdsUpdateWithFaultInjection(
cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig,
weightedClusterFaultConfig);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,