diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index 70833416d5..7855468ee6 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -255,6 +255,13 @@ final class CachingRlsLbClient { } } + Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) { + synchronized (lock) { + return refCountedChildPolicyWrapperFactory.acceptResolvedAddressFactory( + childLbResolvedAddressFactory); + } + } + /** * Convert the status to UNAVAILABLE and enhance the error message. * @param status status as provided by server diff --git a/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java b/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java index 4d6ceed923..226176d25f 100644 --- a/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java +++ b/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java @@ -31,6 +31,7 @@ import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; import io.grpc.internal.ObjectPool; import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider; import io.grpc.rls.RlsProtoData.RouteLookupConfig; @@ -211,7 +212,7 @@ final class LbPolicyConfiguration { private final ChildLoadBalancerHelperProvider childLbHelperProvider; private final ChildLbStatusListener childLbStatusListener; private final ChildLoadBalancingPolicy childPolicy; - private final ResolvedAddressFactory childLbResolvedAddressFactory; + private ResolvedAddressFactory childLbResolvedAddressFactory; public RefCountedChildPolicyWrapperFactory( ChildLoadBalancingPolicy childPolicy, @@ -229,6 +230,19 @@ final class LbPolicyConfiguration { childLbHelperProvider.init(); } + Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) { + this.childLbResolvedAddressFactory = childLbResolvedAddressFactory; + Status status = Status.OK; + for (RefCountedChildPolicyWrapper wrapper : childPolicyMap.values()) { + Status newStatus = + wrapper.childPolicyWrapper.acceptResolvedAddressFactory(childLbResolvedAddressFactory); + if (!newStatus.isOk()) { + status = newStatus; + } + } + return status; + } + ChildPolicyWrapper createOrGet(String target) { // TODO(creamsoup) check if the target is valid or not RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target); @@ -277,6 +291,7 @@ final class LbPolicyConfiguration { private final String target; private final ChildPolicyReportingHelper helper; private final LoadBalancer lb; + private final Object childLbConfig; private volatile SubchannelPicker picker; private ConnectivityState state; @@ -295,14 +310,14 @@ final class LbPolicyConfiguration { .parseLoadBalancingPolicyConfig( childPolicy.getEffectiveChildPolicy(target)); this.lb = lbProvider.newLoadBalancer(helper); + this.childLbConfig = lbConfig.getConfig(); helper.getChannelLogger().log( - ChannelLogLevel.DEBUG, "RLS child lb created. config: {0}", lbConfig.getConfig()); + ChannelLogLevel.DEBUG, "RLS child lb created. config: {0}", childLbConfig); helper.getSynchronizationContext().execute( new Runnable() { @Override public void run() { - if (!lb.acceptResolvedAddresses( - childLbResolvedAddressFactory.create(lbConfig.getConfig())).isOk()) { + if (!acceptResolvedAddressFactory(childLbResolvedAddressFactory).isOk()) { helper.refreshNameResolution(); } lb.requestConnection(); @@ -310,6 +325,11 @@ final class LbPolicyConfiguration { }); } + Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) { + helper.getSynchronizationContext().throwIfNotInThisSynchronizationContext(); + return lb.acceptResolvedAddresses(childLbResolvedAddressFactory.create(childLbConfig)); + } + String getTarget() { return target; } diff --git a/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java b/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java index 81ef8fdb31..6e59e867e3 100644 --- a/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java +++ b/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java @@ -79,7 +79,9 @@ final class RlsLoadBalancer extends LoadBalancer { // not required. this.lbPolicyConfiguration = lbPolicyConfiguration; } - return Status.OK; + return routeLookupClient.acceptResolvedAddressFactory( + new ChildLbResolvedAddressFactory( + resolvedAddresses.getAddresses(), resolvedAddresses.getAttributes())); } @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index c71e4dc255..58d1ff769f 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -823,6 +823,9 @@ final class XdsNameResolver extends NameResolver { if (shouldUpdateResult && routingConfig != null) { updateResolutionResult(xdsConfig); shouldUpdateResult = false; + } else { + // Need to update at least once + shouldUpdateResult = true; } // Make newly added clusters selectable by config selector and deleted clusters no longer // selectable. @@ -993,7 +996,8 @@ final class XdsNameResolver extends NameResolver { .put("routeLookupConfig", rlsPluginConfig.config()) .put( "childPolicy", - ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of()))) + ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of( + "is_dynamic", true)))) .put("childPolicyConfigTargetFieldName", "cluster") .buildOrThrow(); return ImmutableMap.of("rls_experimental", rlsConfig); diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index f5c40fa211..7015a43f6e 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -28,6 +28,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -449,7 +450,7 @@ public class XdsNameResolverTest { // Two new service config updates triggered: // - with load balancing config being able to select cluster1 and cluster2 // - with load balancing config being able to select cluster2 only - verify(mockListener, times(2)).onResult2(resultCaptor.capture()); + verify(mockListener, times(3)).onResult2(resultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Arrays.asList(cluster1, cluster2), (Map) resultCaptor.getAllValues().get(0).getServiceConfig().getConfig()); @@ -1070,7 +1071,9 @@ public class XdsNameResolverTest { assertCallSelectClusterResult(call1, configSelector, "another-cluster", 20.0); firstCall.deliverErrorStatus(); // completes previous call - verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture()); + // Two updates: one for XdsNameResolver releasing the cluster, and another for + // XdsDependencyManager updating the XdsConfig + verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); assertServiceConfigForLoadBalancingConfig( Arrays.asList(cluster2, "another-cluster"), @@ -1100,7 +1103,7 @@ public class XdsNameResolverTest { ImmutableMap.of()))); // Two consecutive service config updates: one for removing clcuster1, // one for adding "another=cluster". - verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); assertServiceConfigForLoadBalancingConfig( Arrays.asList(cluster2, "another-cluster"), @@ -1155,7 +1158,7 @@ public class XdsNameResolverTest { cluster2, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L), null, false), ImmutableMap.of()))); - verifyNoMoreInteractions(mockListener); // no cluster added/deleted + verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture()); assertCallSelectClusterResult(call1, configSelector, "another-cluster", 15.0); } @@ -1187,7 +1190,13 @@ public class XdsNameResolverTest { null, false), ImmutableMap.of()))); testCall.deliverErrorStatus(); - verifyNoMoreInteractions(mockListener); + verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture()); + assertServiceConfigForLoadBalancingConfig( + Arrays.asList(cluster1, cluster2), resolutionResultCaptor.getAllValues().get(1)); + assertServiceConfigForLoadBalancingConfig( + Arrays.asList(cluster1, cluster2), resolutionResultCaptor.getAllValues().get(2)); + assertServiceConfigForLoadBalancingConfig( + Arrays.asList(cluster1, cluster2), resolutionResultCaptor.getAllValues().get(3)); } @SuppressWarnings("unchecked") @@ -1268,7 +1277,7 @@ public class XdsNameResolverTest { "routeLookupConfig", ImmutableMap.of("lookupService", "rls-cbt.googleapis.com"), "childPolicy", - ImmutableList.of(ImmutableMap.of("cds_experimental", ImmutableMap.of())), + ImmutableList.of(ImmutableMap.of("cds_experimental", ImmutableMap.of("is_dynamic", true))), "childPolicyConfigTargetFieldName", "cluster"); Map expectedClusterManagerLbConfig = ImmutableMap.of( @@ -1315,7 +1324,7 @@ public class XdsNameResolverTest { "routeLookupConfig", ImmutableMap.of("lookupService", "rls-cbt-2.googleapis.com"), "childPolicy", - ImmutableList.of(ImmutableMap.of("cds_experimental", ImmutableMap.of())), + ImmutableList.of(ImmutableMap.of("cds_experimental", ImmutableMap.of("is_dynamic", true))), "childPolicyConfigTargetFieldName", "cluster"); Map expectedClusterManagerLbConfig2 = ImmutableMap.of( @@ -1656,7 +1665,7 @@ public class XdsNameResolverTest { } private void assertClusterResolutionResult(CallInfo call, String expectedCluster) { - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, atLeast(1)).onResult2(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); assertCallSelectClusterResult(call, configSelector, expectedCluster, null); @@ -1744,6 +1753,13 @@ public class XdsNameResolverTest { return result.getAttributes().get(InternalConfigSelector.KEY); } + private static void assertServiceConfigForLoadBalancingConfig( + List clusters, ResolutionResult result) { + @SuppressWarnings("unchecked") + Map config = (Map) result.getServiceConfig().getConfig(); + assertServiceConfigForLoadBalancingConfig(clusters, config); + } + /** * Verifies the raw service config contains an xDS load balancing config for the given clusters. */ @@ -1847,7 +1863,9 @@ public class XdsNameResolverTest { + " \"lookupService\": \"rls.bigtable.google.com\"\n" + " },\n" + " \"childPolicy\": [\n" - + " {\"cds_experimental\": {}}\n" + + " {\"cds_experimental\": {\n" + + " \"is_dynamic\": true\n" + + " }}\n" + " ],\n" + " \"childPolicyConfigTargetFieldName\": \"cluster\"\n" + " }\n" @@ -2035,7 +2053,7 @@ public class XdsNameResolverTest { FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, @@ -2051,7 +2069,7 @@ public class XdsNameResolverTest { FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(0)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, @@ -2066,7 +2084,7 @@ public class XdsNameResolverTest { FaultConfig.FractionalPercent.perMillion(600_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(4)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, @@ -2084,7 +2102,7 @@ public class XdsNameResolverTest { FaultConfig.FractionalPercent.perMillion(400_000)), null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(5)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, @@ -2119,7 +2137,7 @@ public class XdsNameResolverTest { httpFilterFaultConfig = FaultConfig.create( FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)), null, null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, @@ -2130,7 +2148,7 @@ public class XdsNameResolverTest { httpFilterFaultConfig = FaultConfig.create( FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(0)), null, null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, @@ -2143,7 +2161,7 @@ public class XdsNameResolverTest { null, null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(4)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, @@ -2156,7 +2174,7 @@ public class XdsNameResolverTest { null, null); xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(5)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, @@ -2281,7 +2299,7 @@ public class XdsNameResolverTest { null); xdsClient.deliverLdsUpdateWithFaultInjection( cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig, null); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, @@ -2298,7 +2316,7 @@ public class XdsNameResolverTest { xdsClient.deliverLdsUpdateWithFaultInjection( cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig, weightedClusterFaultConfig); - verify(mockListener).onResult2(resolutionResultCaptor.capture()); + verify(mockListener, times(3)).onResult2(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,