diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index c92f592ebc..0fb7cf1590 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -33,6 +33,7 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver; import io.grpc.NameResolver.ResolutionResult; import io.grpc.Status; +import io.grpc.StatusOr; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; @@ -657,79 +658,84 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { @Override public void onResult(final ResolutionResult resolutionResult) { - class NameResolved implements Runnable { - @Override - public void run() { - if (shutdown) { - return; - } - backoffPolicy = null; // reset backoff sequence if succeeded - // Arbitrary priority notation for all DNS-resolved endpoints. - String priorityName = priorityName(name, 0); // value doesn't matter - List addresses = new ArrayList<>(); - for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { - // No weight attribute is attached, all endpoint-level LB policy should be able - // to handle such it. - String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); - Attributes attr = eag.getAttributes().toBuilder() - .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) - .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) - .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) - .build(); - eag = new EquivalentAddressGroup(eag.getAddresses(), attr); - eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); - addresses.add(eag); - } - PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( - name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - lbRegistry, Collections.emptyList()); - status = Status.OK; - resolved = true; - result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); - handleEndpointResourceUpdate(); - } - } + syncContext.execute(() -> onResult2(resolutionResult)); + } - syncContext.execute(new NameResolved()); + @Override + public Status onResult2(final ResolutionResult resolutionResult) { + if (shutdown) { + return Status.OK; + } + // Arbitrary priority notation for all DNS-resolved endpoints. + String priorityName = priorityName(name, 0); // value doesn't matter + List addresses = new ArrayList<>(); + StatusOr> addressesOrError = + resolutionResult.getAddressesOrError(); + if (addressesOrError.hasValue()) { + backoffPolicy = null; // reset backoff sequence if succeeded + for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { + // No weight attribute is attached, all endpoint-level LB policy should be able + // to handle such it. + String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); + Attributes attr = eag.getAttributes().toBuilder() + .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) + .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) + .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) + .build(); + eag = new EquivalentAddressGroup(eag.getAddresses(), attr); + eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); + addresses.add(eag); + } + PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( + name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, + lbRegistry, Collections.emptyList()); + status = Status.OK; + resolved = true; + result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); + handleEndpointResourceUpdate(); + return Status.OK; + } else { + handleErrorInSyncContext(addressesOrError.getStatus()); + return addressesOrError.getStatus(); + } } @Override public void onError(final Status error) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (shutdown) { - return; - } - status = error; - // NameResolver.Listener API cannot distinguish between address-not-found and - // transient errors. If the error occurs in the first resolution, treat it as - // address not found. Otherwise, either there is previously resolved addresses - // previously encountered error, propagate the error to downstream/upstream and - // let downstream/upstream handle it. - if (!resolved) { - resolved = true; - handleEndpointResourceUpdate(); - } else { - handleEndpointResolutionError(); - } - if (scheduledRefresh != null && scheduledRefresh.isPending()) { - return; - } - if (backoffPolicy == null) { - backoffPolicy = backoffPolicyProvider.get(); - } - long delayNanos = backoffPolicy.nextBackoffNanos(); - logger.log(XdsLogLevel.DEBUG, + syncContext.execute(() -> handleErrorInSyncContext(error)); + } + + private void handleErrorInSyncContext(final Status error) { + if (shutdown) { + return; + } + status = error; + // NameResolver.Listener API cannot distinguish between address-not-found and + // transient errors. If the error occurs in the first resolution, treat it as + // address not found. Otherwise, either there is previously resolved addresses + // previously encountered error, propagate the error to downstream/upstream and + // let downstream/upstream handle it. + if (!resolved) { + resolved = true; + handleEndpointResourceUpdate(); + } else { + handleEndpointResolutionError(); + } + if (scheduledRefresh != null && scheduledRefresh.isPending()) { + return; + } + if (backoffPolicy == null) { + backoffPolicy = backoffPolicyProvider.get(); + } + long delayNanos = backoffPolicy.nextBackoffNanos(); + logger.log(XdsLogLevel.DEBUG, "Logical DNS resolver for cluster {0} encountered name resolution " - + "error: {1}, scheduling DNS resolution backoff for {2} ns", + + "error: {1}, scheduling DNS resolution backoff for {2} ns", name, error, delayNanos); - scheduledRefresh = + scheduledRefresh = syncContext.schedule( - new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, - timeService); - } - }); + new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, + timeService); } } } diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index d0176d7aa3..d701f281c0 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -200,6 +200,7 @@ public class ClusterResolverLoadBalancerTest { private ArgumentCaptor pickerCaptor; private int xdsClientRefs; private ClusterResolverLoadBalancer loadBalancer; + private NameResolverProvider fakeNameResolverProvider; @Before public void setUp() throws URISyntaxException { @@ -216,7 +217,8 @@ public class ClusterResolverLoadBalancerTest { .setServiceConfigParser(mock(ServiceConfigParser.class)) .setChannelLogger(mock(ChannelLogger.class)) .build(); - nsRegistry.register(new FakeNameResolverProvider()); + fakeNameResolverProvider = new FakeNameResolverProvider(false); + nsRegistry.register(fakeNameResolverProvider); when(helper.getNameResolverRegistry()).thenReturn(nsRegistry); when(helper.getNameResolverArgs()).thenReturn(args); when(helper.getSynchronizationContext()).thenReturn(syncContext); @@ -826,6 +828,17 @@ public class ClusterResolverLoadBalancerTest { @Test public void onlyLogicalDnsCluster_endpointsResolved() { + do_onlyLogicalDnsCluster_endpointsResolved(); + } + + @Test + public void oldListenerCallback_onlyLogicalDnsCluster_endpointsResolved() { + nsRegistry.deregister(fakeNameResolverProvider); + nsRegistry.register(new FakeNameResolverProvider(true)); + do_onlyLogicalDnsCluster_endpointsResolved(); + } + + void do_onlyLogicalDnsCluster_endpointsResolved() { ClusterResolverConfig config = new ClusterResolverConfig( Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); @@ -854,7 +867,6 @@ public class ClusterResolverLoadBalancerTest { .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME); assertThat(childBalancer.addresses.get(1).getAttributes() .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME); - } @Test @@ -874,37 +886,48 @@ public class ClusterResolverLoadBalancerTest { } @Test - public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { + public void resolutionError_backoffAndRefresh() { + do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh(); + } + + @Test + public void oldListenerCallback_resolutionError_backoffAndRefresh() { + nsRegistry.deregister(fakeNameResolverProvider); + nsRegistry.register(new FakeNameResolverProvider(true)); + do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh(); + } + + void do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider, - backoffPolicy1, backoffPolicy2); + backoffPolicy1, backoffPolicy2); ClusterResolverConfig config = new ClusterResolverConfig( - Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false); + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false); deliverLbConfig(config); FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); assertThat(childBalancers).isEmpty(); Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server"); resolver.deliverError(error); inOrder.verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); assertPicker(pickerCaptor.getValue(), error, null); assertThat(resolver.refreshCount).isEqualTo(0); inOrder.verify(backoffPolicyProvider).get(); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks()).hasSize(1); assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS)) - .isEqualTo(1L); + .isEqualTo(1L); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertThat(resolver.refreshCount).isEqualTo(1); error = Status.UNKNOWN.withDescription("I am lost"); resolver.deliverError(error); inOrder.verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertPicker(pickerCaptor.getValue(), error, null); assertThat(fakeClock.getPendingTasks()).hasSize(1); assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS)) - .isEqualTo(10L); + .isEqualTo(10L); fakeClock.forwardTime(10L, TimeUnit.SECONDS); assertThat(resolver.refreshCount).isEqualTo(2); @@ -914,7 +937,7 @@ public class ClusterResolverLoadBalancerTest { resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); assertThat(childBalancers).hasSize(1); assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), - Iterables.getOnlyElement(childBalancers).addresses); + Iterables.getOnlyElement(childBalancers).addresses); assertThat(fakeClock.getPendingTasks()).isEmpty(); inOrder.verifyNoMoreInteractions(); @@ -1319,10 +1342,18 @@ public class ClusterResolverLoadBalancerTest { } private class FakeNameResolverProvider extends NameResolverProvider { + private final boolean useOldListenerCallback; + + private FakeNameResolverProvider(boolean useOldListenerCallback) { + this.useOldListenerCallback = useOldListenerCallback; + } + @Override public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { assertThat(targetUri.getScheme()).isEqualTo("dns"); - FakeNameResolver resolver = new FakeNameResolver(targetUri); + FakeNameResolver resolver = useOldListenerCallback + ? new FakeNameResolverUsingOldListenerCallback(targetUri) + : new FakeNameResolver(targetUri); resolvers.add(resolver); return resolver; } @@ -1343,9 +1374,10 @@ public class ClusterResolverLoadBalancerTest { } } + private class FakeNameResolver extends NameResolver { private final URI targetUri; - private Listener2 listener; + protected Listener2 listener; private int refreshCount; private FakeNameResolver(URI targetUri) { @@ -1372,12 +1404,33 @@ public class ClusterResolverLoadBalancerTest { resolvers.remove(this); } - private void deliverEndpointAddresses(List addresses) { - listener.onResult(ResolutionResult.newBuilder() - .setAddressesOrError(StatusOr.fromValue(addresses)).build()); + protected void deliverEndpointAddresses(List addresses) { + syncContext.execute(() -> { + Status ret = listener.onResult2(ResolutionResult.newBuilder() + .setAddressesOrError(StatusOr.fromValue(addresses)).build()); + assertThat(ret.getCode()).isEqualTo(Status.Code.OK); + }); } - private void deliverError(Status error) { + protected void deliverError(Status error) { + syncContext.execute(() -> listener.onResult2(ResolutionResult.newBuilder() + .setAddressesOrError(StatusOr.fromStatus(error)).build())); + } + } + + private class FakeNameResolverUsingOldListenerCallback extends FakeNameResolver { + private FakeNameResolverUsingOldListenerCallback(URI targetUri) { + super(targetUri); + } + + @Override + protected void deliverEndpointAddresses(List addresses) { + listener.onResult(ResolutionResult.newBuilder() + .setAddressesOrError(StatusOr.fromValue(addresses)).build()); + } + + @Override + protected void deliverError(Status error) { listener.onError(error); } }