xds: ClusterResolverLoadBalancer handle update for both resolved addresses and errors via ResolutionResult (#11997)

This commit is contained in:
Kannan J 2025-04-04 16:38:29 +00:00 committed by GitHub
parent edc2bf7346
commit a13fca2bf2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 140 additions and 81 deletions

View File

@ -33,6 +33,7 @@ import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.NameResolver.ResolutionResult; import io.grpc.NameResolver.ResolutionResult;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy; import io.grpc.internal.BackoffPolicy;
@ -657,79 +658,84 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
@Override @Override
public void onResult(final ResolutionResult resolutionResult) { public void onResult(final ResolutionResult resolutionResult) {
class NameResolved implements Runnable { syncContext.execute(() -> onResult2(resolutionResult));
@Override }
public void run() {
if (shutdown) {
return;
}
backoffPolicy = null; // reset backoff sequence if succeeded
// Arbitrary priority notation for all DNS-resolved endpoints.
String priorityName = priorityName(name, 0); // value doesn't matter
List<EquivalentAddressGroup> addresses = new ArrayList<>();
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
// No weight attribute is attached, all endpoint-level LB policy should be able
// to handle such it.
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
Attributes attr = eag.getAttributes().toBuilder()
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
.build();
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
addresses.add(eag);
}
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
lbRegistry, Collections.<DropOverload>emptyList());
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
handleEndpointResourceUpdate();
}
}
syncContext.execute(new NameResolved()); @Override
public Status onResult2(final ResolutionResult resolutionResult) {
if (shutdown) {
return Status.OK;
}
// Arbitrary priority notation for all DNS-resolved endpoints.
String priorityName = priorityName(name, 0); // value doesn't matter
List<EquivalentAddressGroup> addresses = new ArrayList<>();
StatusOr<List<EquivalentAddressGroup>> addressesOrError =
resolutionResult.getAddressesOrError();
if (addressesOrError.hasValue()) {
backoffPolicy = null; // reset backoff sequence if succeeded
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
// No weight attribute is attached, all endpoint-level LB policy should be able
// to handle such it.
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
Attributes attr = eag.getAttributes().toBuilder()
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
.build();
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
addresses.add(eag);
}
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
lbRegistry, Collections.<DropOverload>emptyList());
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
handleEndpointResourceUpdate();
return Status.OK;
} else {
handleErrorInSyncContext(addressesOrError.getStatus());
return addressesOrError.getStatus();
}
} }
@Override @Override
public void onError(final Status error) { public void onError(final Status error) {
syncContext.execute(new Runnable() { syncContext.execute(() -> handleErrorInSyncContext(error));
@Override }
public void run() {
if (shutdown) { private void handleErrorInSyncContext(final Status error) {
return; if (shutdown) {
} return;
status = error; }
// NameResolver.Listener API cannot distinguish between address-not-found and status = error;
// transient errors. If the error occurs in the first resolution, treat it as // NameResolver.Listener API cannot distinguish between address-not-found and
// address not found. Otherwise, either there is previously resolved addresses // transient errors. If the error occurs in the first resolution, treat it as
// previously encountered error, propagate the error to downstream/upstream and // address not found. Otherwise, either there is previously resolved addresses
// let downstream/upstream handle it. // previously encountered error, propagate the error to downstream/upstream and
if (!resolved) { // let downstream/upstream handle it.
resolved = true; if (!resolved) {
handleEndpointResourceUpdate(); resolved = true;
} else { handleEndpointResourceUpdate();
handleEndpointResolutionError(); } else {
} handleEndpointResolutionError();
if (scheduledRefresh != null && scheduledRefresh.isPending()) { }
return; if (scheduledRefresh != null && scheduledRefresh.isPending()) {
} return;
if (backoffPolicy == null) { }
backoffPolicy = backoffPolicyProvider.get(); if (backoffPolicy == null) {
} backoffPolicy = backoffPolicyProvider.get();
long delayNanos = backoffPolicy.nextBackoffNanos(); }
logger.log(XdsLogLevel.DEBUG, long delayNanos = backoffPolicy.nextBackoffNanos();
logger.log(XdsLogLevel.DEBUG,
"Logical DNS resolver for cluster {0} encountered name resolution " "Logical DNS resolver for cluster {0} encountered name resolution "
+ "error: {1}, scheduling DNS resolution backoff for {2} ns", + "error: {1}, scheduling DNS resolution backoff for {2} ns",
name, error, delayNanos); name, error, delayNanos);
scheduledRefresh = scheduledRefresh =
syncContext.schedule( syncContext.schedule(
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
timeService); timeService);
}
});
} }
} }
} }

View File

@ -200,6 +200,7 @@ public class ClusterResolverLoadBalancerTest {
private ArgumentCaptor<SubchannelPicker> pickerCaptor; private ArgumentCaptor<SubchannelPicker> pickerCaptor;
private int xdsClientRefs; private int xdsClientRefs;
private ClusterResolverLoadBalancer loadBalancer; private ClusterResolverLoadBalancer loadBalancer;
private NameResolverProvider fakeNameResolverProvider;
@Before @Before
public void setUp() throws URISyntaxException { public void setUp() throws URISyntaxException {
@ -216,7 +217,8 @@ public class ClusterResolverLoadBalancerTest {
.setServiceConfigParser(mock(ServiceConfigParser.class)) .setServiceConfigParser(mock(ServiceConfigParser.class))
.setChannelLogger(mock(ChannelLogger.class)) .setChannelLogger(mock(ChannelLogger.class))
.build(); .build();
nsRegistry.register(new FakeNameResolverProvider()); fakeNameResolverProvider = new FakeNameResolverProvider(false);
nsRegistry.register(fakeNameResolverProvider);
when(helper.getNameResolverRegistry()).thenReturn(nsRegistry); when(helper.getNameResolverRegistry()).thenReturn(nsRegistry);
when(helper.getNameResolverArgs()).thenReturn(args); when(helper.getNameResolverArgs()).thenReturn(args);
when(helper.getSynchronizationContext()).thenReturn(syncContext); when(helper.getSynchronizationContext()).thenReturn(syncContext);
@ -826,6 +828,17 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void onlyLogicalDnsCluster_endpointsResolved() { public void onlyLogicalDnsCluster_endpointsResolved() {
do_onlyLogicalDnsCluster_endpointsResolved();
}
@Test
public void oldListenerCallback_onlyLogicalDnsCluster_endpointsResolved() {
nsRegistry.deregister(fakeNameResolverProvider);
nsRegistry.register(new FakeNameResolverProvider(true));
do_onlyLogicalDnsCluster_endpointsResolved();
}
void do_onlyLogicalDnsCluster_endpointsResolved() {
ClusterResolverConfig config = new ClusterResolverConfig( ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false); Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config); deliverLbConfig(config);
@ -854,7 +867,6 @@ public class ClusterResolverLoadBalancerTest {
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME); .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);
assertThat(childBalancer.addresses.get(1).getAttributes() assertThat(childBalancer.addresses.get(1).getAttributes()
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME); .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);
} }
@Test @Test
@ -874,37 +886,48 @@ public class ClusterResolverLoadBalancerTest {
} }
@Test @Test
public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { public void resolutionError_backoffAndRefresh() {
do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh();
}
@Test
public void oldListenerCallback_resolutionError_backoffAndRefresh() {
nsRegistry.deregister(fakeNameResolverProvider);
nsRegistry.register(new FakeNameResolverProvider(true));
do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh();
}
void do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider, InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
backoffPolicy1, backoffPolicy2); backoffPolicy1, backoffPolicy2);
ClusterResolverConfig config = new ClusterResolverConfig( ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false); Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config); deliverLbConfig(config);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
assertThat(childBalancers).isEmpty(); assertThat(childBalancers).isEmpty();
Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server"); Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server");
resolver.deliverError(error); resolver.deliverError(error);
inOrder.verify(helper).updateBalancingState( inOrder.verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
assertPicker(pickerCaptor.getValue(), error, null); assertPicker(pickerCaptor.getValue(), error, null);
assertThat(resolver.refreshCount).isEqualTo(0); assertThat(resolver.refreshCount).isEqualTo(0);
inOrder.verify(backoffPolicyProvider).get(); inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(backoffPolicy1).nextBackoffNanos(); inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks()).hasSize(1); assertThat(fakeClock.getPendingTasks()).hasSize(1);
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS)) assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
.isEqualTo(1L); .isEqualTo(1L);
fakeClock.forwardTime(1L, TimeUnit.SECONDS); fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertThat(resolver.refreshCount).isEqualTo(1); assertThat(resolver.refreshCount).isEqualTo(1);
error = Status.UNKNOWN.withDescription("I am lost"); error = Status.UNKNOWN.withDescription("I am lost");
resolver.deliverError(error); resolver.deliverError(error);
inOrder.verify(helper).updateBalancingState( inOrder.verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
inOrder.verify(backoffPolicy1).nextBackoffNanos(); inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertPicker(pickerCaptor.getValue(), error, null); assertPicker(pickerCaptor.getValue(), error, null);
assertThat(fakeClock.getPendingTasks()).hasSize(1); assertThat(fakeClock.getPendingTasks()).hasSize(1);
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS)) assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
.isEqualTo(10L); .isEqualTo(10L);
fakeClock.forwardTime(10L, TimeUnit.SECONDS); fakeClock.forwardTime(10L, TimeUnit.SECONDS);
assertThat(resolver.refreshCount).isEqualTo(2); assertThat(resolver.refreshCount).isEqualTo(2);
@ -914,7 +937,7 @@ public class ClusterResolverLoadBalancerTest {
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
assertThat(childBalancers).hasSize(1); assertThat(childBalancers).hasSize(1);
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), assertAddressesEqual(Arrays.asList(endpoint1, endpoint2),
Iterables.getOnlyElement(childBalancers).addresses); Iterables.getOnlyElement(childBalancers).addresses);
assertThat(fakeClock.getPendingTasks()).isEmpty(); assertThat(fakeClock.getPendingTasks()).isEmpty();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
@ -1319,10 +1342,18 @@ public class ClusterResolverLoadBalancerTest {
} }
private class FakeNameResolverProvider extends NameResolverProvider { private class FakeNameResolverProvider extends NameResolverProvider {
private final boolean useOldListenerCallback;
private FakeNameResolverProvider(boolean useOldListenerCallback) {
this.useOldListenerCallback = useOldListenerCallback;
}
@Override @Override
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
assertThat(targetUri.getScheme()).isEqualTo("dns"); assertThat(targetUri.getScheme()).isEqualTo("dns");
FakeNameResolver resolver = new FakeNameResolver(targetUri); FakeNameResolver resolver = useOldListenerCallback
? new FakeNameResolverUsingOldListenerCallback(targetUri)
: new FakeNameResolver(targetUri);
resolvers.add(resolver); resolvers.add(resolver);
return resolver; return resolver;
} }
@ -1343,9 +1374,10 @@ public class ClusterResolverLoadBalancerTest {
} }
} }
private class FakeNameResolver extends NameResolver { private class FakeNameResolver extends NameResolver {
private final URI targetUri; private final URI targetUri;
private Listener2 listener; protected Listener2 listener;
private int refreshCount; private int refreshCount;
private FakeNameResolver(URI targetUri) { private FakeNameResolver(URI targetUri) {
@ -1372,12 +1404,33 @@ public class ClusterResolverLoadBalancerTest {
resolvers.remove(this); resolvers.remove(this);
} }
private void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) { protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
listener.onResult(ResolutionResult.newBuilder() syncContext.execute(() -> {
.setAddressesOrError(StatusOr.fromValue(addresses)).build()); Status ret = listener.onResult2(ResolutionResult.newBuilder()
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
assertThat(ret.getCode()).isEqualTo(Status.Code.OK);
});
} }
private void deliverError(Status error) { protected void deliverError(Status error) {
syncContext.execute(() -> listener.onResult2(ResolutionResult.newBuilder()
.setAddressesOrError(StatusOr.fromStatus(error)).build()));
}
}
private class FakeNameResolverUsingOldListenerCallback extends FakeNameResolver {
private FakeNameResolverUsingOldListenerCallback(URI targetUri) {
super(targetUri);
}
@Override
protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
listener.onResult(ResolutionResult.newBuilder()
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
}
@Override
protected void deliverError(Status error) {
listener.onError(error); listener.onError(error);
} }
} }