xds: Do RLS fallback policy eagar start (#12211)

The resource subscription to the fallback target was done only at the time of falling back, which can cause rpcs to fail. This change makes the fallback target to be subscribed and cached earlier, similar to C++ and go gRPC implementations.
This commit is contained in:
Kannan J 2025-07-24 11:28:32 +00:00 committed by GitHub
parent c4256add4d
commit 42e1829b37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 49 additions and 27 deletions

View File

@ -132,6 +132,7 @@ final class CachingRlsLbClient {
@GuardedBy("lock")
private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
private final ChannelLogger logger;
private final ChildPolicyWrapper fallbackChildPolicyWrapper;
static {
MetricInstrumentRegistry metricInstrumentRegistry
@ -226,6 +227,13 @@ final class CachingRlsLbClient {
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
childLbHelperProvider,
new BackoffRefreshListener());
// TODO(creamsoup) wait until lb is ready
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
if (defaultTarget != null && !defaultTarget.isEmpty()) {
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
} else {
fallbackChildPolicyWrapper = null;
}
gaugeRegistration = helper.getMetricRecorder()
.registerBatchCallback(new BatchCallback() {
@ -1022,12 +1030,8 @@ final class CachingRlsLbClient {
}
}
private ChildPolicyWrapper fallbackChildPolicyWrapper;
/** Uses Subchannel connected to default target. */
private PickResult useFallback(PickSubchannelArgs args) {
// TODO(creamsoup) wait until lb is ready
startFallbackChildPolicy();
SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
if (picker == null) {
return PickResult.withNoResult();
@ -1052,17 +1056,6 @@ final class CachingRlsLbClient {
}
}
private void startFallbackChildPolicy() {
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
synchronized (lock) {
if (fallbackChildPolicyWrapper != null) {
return;
}
logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
}
}
// GuardedBy CachingRlsLbClient.lock
void close() {
synchronized (lock) { // Lock is already held, but ErrorProne can't tell

View File

@ -191,7 +191,9 @@ public class CachingRlsLbClientTest {
@After
public void tearDown() throws Exception {
rlsLbClient.close();
if (rlsLbClient != null) {
rlsLbClient.close();
}
assertWithMessage(
"On client shut down, RlsLoadBalancer must shut down with all its child loadbalancers.")
.that(lbProvider.loadBalancers).isEmpty();
@ -372,12 +374,14 @@ public class CachingRlsLbClientTest {
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
ArgumentCaptor<ConnectivityState> stateCaptor =
ArgumentCaptor.forClass(ConnectivityState.class);
inOrder.verify(helper, times(2))
inOrder.verify(helper, times(3))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
assertThat(new HashSet<>(pickerCaptor.getAllValues())).hasSize(1);
// TRANSIENT_FAILURE is because the test setup pretends fallback is not available.
assertThat(stateCaptor.getAllValues())
.containsExactly(ConnectivityState.CONNECTING, ConnectivityState.READY);
.containsExactly(ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.CONNECTING,
ConnectivityState.READY);
Metadata headers = new Metadata();
PickResult pickResult = getPickResultForCreate(pickerCaptor, headers);
assertThat(pickResult.getStatus().isOk()).isTrue();
@ -439,7 +443,7 @@ public class CachingRlsLbClientTest {
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
ArgumentCaptor<ConnectivityState> stateCaptor =
ArgumentCaptor.forClass(ConnectivityState.class);
verify(helper, times(4)).updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
verify(helper, times(5)).updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Metadata headers = new Metadata();
PickResult pickResult = getPickResultForCreate(pickerCaptor, headers);
@ -509,7 +513,7 @@ public class CachingRlsLbClientTest {
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
ArgumentCaptor<ConnectivityState> stateCaptor =
ArgumentCaptor.forClass(ConnectivityState.class);
inOrder.verify(helper, times(2))
inOrder.verify(helper, times(3))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Metadata headers = new Metadata();
@ -699,6 +703,7 @@ public class CachingRlsLbClientTest {
// Shutdown
rlsLbClient.close();
rlsLbClient = null;
verify(mockGaugeRegistration).close();
}

View File

@ -201,7 +201,13 @@ public class RlsLoadBalancerTest {
@Test
public void lb_serverStatusCodeConversion() throws Exception {
deliverResolvedAddresses();
helper.getSynchronizationContext().execute(() -> {
try {
deliverResolvedAddresses();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
InOrder inOrder = inOrder(helper);
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@ -236,7 +242,13 @@ public class RlsLoadBalancerTest {
@Test
public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
deliverResolvedAddresses();
helper.getSynchronizationContext().execute(() -> {
try {
deliverResolvedAddresses();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
InOrder inOrder = inOrder(helper);
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@ -257,7 +269,7 @@ public class RlsLoadBalancerTest {
inOrder.verifyNoMoreInteractions();
assertThat(res.getStatus().isOk()).isTrue();
assertThat(subchannels).hasSize(1);
assertThat(subchannels).hasSize(2); // includes fallback sub-channel
FakeSubchannel searchSubchannel = subchannels.getLast();
assertThat(subchannelIsReady(searchSubchannel)).isFalse();
@ -277,7 +289,7 @@ public class RlsLoadBalancerTest {
// other rls picker itself is ready due to first channel.
assertThat(res.getStatus().isOk()).isTrue();
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
assertThat(subchannels).hasSize(2);
assertThat(subchannels).hasSize(3); // includes fallback sub-channel
FakeSubchannel rescueSubchannel = subchannels.getLast();
// search subchannel is down, rescue subchannel is connecting
@ -393,7 +405,13 @@ public class RlsLoadBalancerTest {
public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
fakeThrottler.nextResult = true;
deliverResolvedAddresses();
helper.getSynchronizationContext().execute(() -> {
try {
deliverResolvedAddresses();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
InOrder inOrder = inOrder(helper);
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@ -535,7 +553,13 @@ public class RlsLoadBalancerTest {
@Test
public void lb_nameResolutionFailed() throws Exception {
deliverResolvedAddresses();
helper.getSynchronizationContext().execute(() -> {
try {
deliverResolvedAddresses();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
InOrder inOrder = inOrder(helper);
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@ -545,7 +569,7 @@ public class RlsLoadBalancerTest {
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
assertThat(subchannels).hasSize(1);
assertThat(subchannels).hasSize(2); // includes fallback sub-channel
FakeSubchannel searchSubchannel = subchannels.getLast();
searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));