From 297ab05efeb0565c195518a819dfa851d1c0d62b Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 11 Jun 2025 18:56:13 +0000 Subject: [PATCH] xds: Convert CdsLb to XdsDepManager I noticed we deviated from gRFC A37 in some ways. It turned out those were added to the gRFC later in https://github.com/grpc/proposal/pull/344: - NACKing empty aggregate clusters - Failing aggregate cluster when children could not be loaded - Recusion limit of 16. We had this behavior already, but it was ascribed to matching C++ There's disagreement on whether we should actually fail the aggregate cluster for bad children, so I'm preserving the pre-existing behavior for now. The code is now doing a depth-first leaf traversal, not breadth-first. This was odd to see, but the code was also pretty old, so the reasoning seems lost to history. Since we haven't seen more than a single level of aggregate clusters in practice, this wouldn't have been noticed by users. XdsDependencyManager.start() was created to guarantee that the callback could not be called before returning from the constructor. Otherwise XDS_CLUSTER_SUBSCRIPT_REGISTRY could potentially be null. --- .../java/io/grpc/xds/CdsLoadBalancer2.java | 463 ++------ .../io/grpc/xds/CdsLoadBalancerProvider.java | 26 +- .../xds/RingHashLoadBalancerProvider.java | 2 +- .../java/io/grpc/xds/XdsClusterResource.java | 8 +- xds/src/main/java/io/grpc/xds/XdsConfig.java | 8 +- .../io/grpc/xds/XdsDependencyManager.java | 47 +- .../java/io/grpc/xds/XdsNameResolver.java | 9 +- .../io/grpc/xds/CdsLoadBalancer2Test.java | 1041 +++++++---------- .../grpc/xds/GrpcXdsClientImplTestBase.java | 17 + .../xds/RingHashLoadBalancerProviderTest.java | 8 +- .../io/grpc/xds/XdsDependencyManagerTest.java | 116 +- .../test/java/io/grpc/xds/XdsTestUtils.java | 32 + 12 files changed, 736 insertions(+), 1041 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index bb44071a48..c50f844d38 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -21,36 +21,30 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.CheckReturnValue; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver; import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.internal.ObjectPool; +import io.grpc.StatusOr; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType; -import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsClient.ResourceWatcher; +import io.grpc.xds.XdsConfig.Subscription; +import io.grpc.xds.XdsConfig.XdsClusterConfig; +import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig; +import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.Nullable; /** * Load balancer for cds_experimental LB policy. One instance per top-level cluster. @@ -60,13 +54,11 @@ import javax.annotation.Nullable; final class CdsLoadBalancer2 extends LoadBalancer { private final XdsLogger logger; private final Helper helper; - private final SynchronizationContext syncContext; private final LoadBalancerRegistry lbRegistry; // Following fields are effectively final. - private ObjectPool xdsClientPool; - private XdsClient xdsClient; - private CdsLbState cdsLbState; - private ResolvedAddresses resolvedAddresses; + private String clusterName; + private Subscription clusterSubscription; + private LoadBalancer childLb; CdsLoadBalancer2(Helper helper) { this(helper, LoadBalancerRegistry.getDefaultRegistry()); @@ -75,7 +67,6 @@ final class CdsLoadBalancer2 extends LoadBalancer { @VisibleForTesting CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) { this.helper = checkNotNull(helper, "helper"); - this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority())); logger.log(XdsLogLevel.INFO, "Created"); @@ -83,25 +74,115 @@ final class CdsLoadBalancer2 extends LoadBalancer { @Override public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { - if (this.resolvedAddresses != null) { + logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); + if (this.clusterName == null) { + CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + logger.log(XdsLogLevel.INFO, "Config: {0}", config); + if (config.isDynamic) { + clusterSubscription = resolvedAddresses.getAttributes() + .get(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY) + .subscribeToCluster(config.name); + } + this.clusterName = config.name; + } + XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG); + StatusOr clusterConfigOr = xdsConfig.getClusters().get(clusterName); + if (clusterConfigOr == null) { + if (clusterSubscription == null) { + // Should be impossible, because XdsDependencyManager wouldn't have generated this + return fail(Status.INTERNAL.withDescription( + errorPrefix() + "Unable to find non-dynamic root cluster")); + } + // The dynamic cluster must not have loaded yet return Status.OK; } - logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); - this.resolvedAddresses = resolvedAddresses; - xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL); - xdsClient = xdsClientPool.getObject(); - CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - logger.log(XdsLogLevel.INFO, "Config: {0}", config); - cdsLbState = new CdsLbState(config.name); - cdsLbState.start(); - return Status.OK; + if (!clusterConfigOr.hasValue()) { + return fail(clusterConfigOr.getStatus()); + } + XdsClusterConfig clusterConfig = clusterConfigOr.getValue(); + List leafNames; + if (clusterConfig.getChildren() instanceof AggregateConfig) { + leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames(); + } else if (clusterConfig.getChildren() instanceof EndpointConfig) { + leafNames = ImmutableList.of(clusterName); + } else { + return fail(Status.INTERNAL.withDescription( + errorPrefix() + "Unexpected cluster children type: " + + clusterConfig.getChildren().getClass())); + } + if (leafNames.isEmpty()) { + // Should be impossible, because XdsClusterResource validated this + return fail(Status.UNAVAILABLE.withDescription( + errorPrefix() + "Zero leaf clusters for root cluster " + clusterName)); + } + + Status noneFoundError = Status.INTERNAL + .withDescription(errorPrefix() + "No leaves and no error; this is a bug"); + List instances = new ArrayList<>(); + for (String leafName : leafNames) { + StatusOr leafConfigOr = xdsConfig.getClusters().get(leafName); + if (!leafConfigOr.hasValue()) { + noneFoundError = leafConfigOr.getStatus(); + continue; + } + if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) { + noneFoundError = Status.INTERNAL.withDescription( + errorPrefix() + "Unexpected child " + leafName + " cluster children type: " + + leafConfigOr.getValue().getChildren().getClass()); + continue; + } + CdsUpdate result = leafConfigOr.getValue().getClusterResource(); + DiscoveryMechanism instance; + if (result.clusterType() == ClusterType.EDS) { + instance = DiscoveryMechanism.forEds( + leafName, + result.edsServiceName(), + result.lrsServerInfo(), + result.maxConcurrentRequests(), + result.upstreamTlsContext(), + result.filterMetadata(), + result.outlierDetection()); + } else { + instance = DiscoveryMechanism.forLogicalDns( + leafName, + result.dnsHostName(), + result.lrsServerInfo(), + result.maxConcurrentRequests(), + result.upstreamTlsContext(), + result.filterMetadata()); + } + instances.add(instance); + } + if (instances.isEmpty()) { + return fail(noneFoundError); + } + + // The LB policy config is provided in service_config.proto/JSON format. + NameResolver.ConfigOrError configOrError = + GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig( + Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry); + if (configOrError.getError() != null) { + // Should be impossible, because XdsClusterResource validated this + return fail(Status.INTERNAL.withDescription( + errorPrefix() + "Unable to parse the LB config: " + configOrError.getError())); + } + + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.unmodifiableList(instances), + configOrError.getConfig(), + clusterConfig.getClusterResource().isHttp11ProxyAvailable()); + if (childLb == null) { + childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper); + } + return childLb.acceptResolvedAddresses( + resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build()); } @Override public void handleNameResolutionError(Status error) { logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); - if (cdsLbState != null && cdsLbState.childLb != null) { - cdsLbState.childLb.handleNameResolutionError(error); + if (childLb != null) { + childLb.handleNameResolutionError(error); } else { helper.updateBalancingState( TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); @@ -111,314 +192,28 @@ final class CdsLoadBalancer2 extends LoadBalancer { @Override public void shutdown() { logger.log(XdsLogLevel.INFO, "Shutdown"); - if (cdsLbState != null) { - cdsLbState.shutdown(); + if (childLb != null) { + childLb.shutdown(); + childLb = null; } - if (xdsClientPool != null) { - xdsClientPool.returnObject(xdsClient); + if (clusterSubscription != null) { + clusterSubscription.close(); + clusterSubscription = null; } } - /** - * The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when - * receiving the CDS LB policy config with the top-level cluster name. - */ - private final class CdsLbState { - - private final ClusterState root; - private final Map clusterStates = new ConcurrentHashMap<>(); - private LoadBalancer childLb; - - private CdsLbState(String rootCluster) { - root = new ClusterState(rootCluster); + @CheckReturnValue // don't forget to return up the stack after the fail call + private Status fail(Status error) { + if (childLb != null) { + childLb.shutdown(); + childLb = null; } + helper.updateBalancingState( + TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); + return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter + } - private void start() { - root.start(); - } - - private void shutdown() { - root.shutdown(); - if (childLb != null) { - childLb.shutdown(); - } - } - - private void handleClusterDiscovered() { - List instances = new ArrayList<>(); - - // Used for loop detection to break the infinite recursion that loops would cause - Map> parentClusters = new HashMap<>(); - Status loopStatus = null; - - // Level-order traversal. - // Collect configurations for all non-aggregate (leaf) clusters. - Queue queue = new ArrayDeque<>(); - queue.add(root); - while (!queue.isEmpty()) { - int size = queue.size(); - for (int i = 0; i < size; i++) { - ClusterState clusterState = queue.remove(); - if (!clusterState.discovered) { - return; // do not proceed until all clusters discovered - } - if (clusterState.result == null) { // resource revoked or not exists - continue; - } - if (clusterState.isLeaf) { - if (instances.stream().map(inst -> inst.cluster).noneMatch(clusterState.name::equals)) { - DiscoveryMechanism instance; - if (clusterState.result.clusterType() == ClusterType.EDS) { - instance = DiscoveryMechanism.forEds( - clusterState.name, clusterState.result.edsServiceName(), - clusterState.result.lrsServerInfo(), - clusterState.result.maxConcurrentRequests(), - clusterState.result.upstreamTlsContext(), - clusterState.result.filterMetadata(), - clusterState.result.outlierDetection()); - } else { // logical DNS - instance = DiscoveryMechanism.forLogicalDns( - clusterState.name, clusterState.result.dnsHostName(), - clusterState.result.lrsServerInfo(), - clusterState.result.maxConcurrentRequests(), - clusterState.result.upstreamTlsContext(), - clusterState.result.filterMetadata()); - } - instances.add(instance); - } - } else { - if (clusterState.childClusterStates == null) { - continue; - } - // Do loop detection and break recursion if detected - List namesCausingLoops = identifyLoops(clusterState, parentClusters); - if (namesCausingLoops.isEmpty()) { - queue.addAll(clusterState.childClusterStates.values()); - } else { - // Do cleanup - if (childLb != null) { - childLb.shutdown(); - childLb = null; - } - if (loopStatus != null) { - logger.log(XdsLogLevel.WARNING, - "Multiple loops in CDS config. Old msg: " + loopStatus.getDescription()); - } - loopStatus = Status.UNAVAILABLE.withDescription(String.format( - "CDS error: circular aggregate clusters directly under %s for " - + "root cluster %s, named %s, xDS node ID: %s", - clusterState.name, root.name, namesCausingLoops, - xdsClient.getBootstrapInfo().node().getId())); - } - } - } - } - - if (loopStatus != null) { - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus))); - return; - } - - if (instances.isEmpty()) { // none of non-aggregate clusters exists - if (childLb != null) { - childLb.shutdown(); - childLb = null; - } - Status unavailable = Status.UNAVAILABLE.withDescription(String.format( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster %s" - + " xDS node ID: %s", root.name, xdsClient.getBootstrapInfo().node().getId())); - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable))); - return; - } - - // The LB policy config is provided in service_config.proto/JSON format. - NameResolver.ConfigOrError configOrError = - GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig( - Arrays.asList(root.result.lbPolicyConfig()), lbRegistry); - if (configOrError.getError() != null) { - throw configOrError.getError().augmentDescription("Unable to parse the LB config") - .asRuntimeException(); - } - - ClusterResolverConfig config = new ClusterResolverConfig( - Collections.unmodifiableList(instances), - configOrError.getConfig(), - root.result.isHttp11ProxyAvailable()); - if (childLb == null) { - childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper); - } - childLb.handleResolvedAddresses( - resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build()); - } - - /** - * Returns children that would cause loops and builds up the parentClusters map. - **/ - - private List identifyLoops(ClusterState clusterState, - Map> parentClusters) { - Set ancestors = new HashSet<>(); - ancestors.add(clusterState.name); - addAncestors(ancestors, clusterState, parentClusters); - - List namesCausingLoops = new ArrayList<>(); - for (ClusterState state : clusterState.childClusterStates.values()) { - if (ancestors.contains(state.name)) { - namesCausingLoops.add(state.name); - } - } - - // Update parent map with entries from remaining children to clusterState - clusterState.childClusterStates.values().stream() - .filter(child -> !namesCausingLoops.contains(child.name)) - .forEach( - child -> parentClusters.computeIfAbsent(child, k -> new ArrayList<>()) - .add(clusterState)); - - return namesCausingLoops; - } - - /** Recursively add all parents to the ancestors list. **/ - private void addAncestors(Set ancestors, ClusterState clusterState, - Map> parentClusters) { - List directParents = parentClusters.get(clusterState); - if (directParents != null) { - directParents.stream().map(c -> c.name).forEach(ancestors::add); - directParents.forEach(p -> addAncestors(ancestors, p, parentClusters)); - } - } - - private void handleClusterDiscoveryError(Status error) { - String description = error.getDescription() == null ? "" : error.getDescription() + " "; - Status errorWithNodeId = error.withDescription( - description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId()); - if (childLb != null) { - childLb.handleNameResolutionError(errorWithNodeId); - } else { - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(errorWithNodeId))); - } - } - - private final class ClusterState implements ResourceWatcher { - private final String name; - @Nullable - private Map childClusterStates; - @Nullable - private CdsUpdate result; - // Following fields are effectively final. - private boolean isLeaf; - private boolean discovered; - private boolean shutdown; - - private ClusterState(String name) { - this.name = name; - } - - private void start() { - shutdown = false; - xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext); - } - - void shutdown() { - shutdown = true; - xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this); - if (childClusterStates != null) { - // recursively shut down all descendants - childClusterStates.values().stream() - .filter(state -> !state.shutdown) - .forEach(ClusterState::shutdown); - } - } - - @Override - public void onError(Status error) { - Status status = Status.UNAVAILABLE - .withDescription( - String.format("Unable to load CDS %s. xDS server returned: %s: %s", - name, error.getCode(), error.getDescription())) - .withCause(error.getCause()); - if (shutdown) { - return; - } - // All watchers should receive the same error, so we only propagate it once. - if (ClusterState.this == root) { - handleClusterDiscoveryError(status); - } - } - - @Override - public void onResourceDoesNotExist(String resourceName) { - if (shutdown) { - return; - } - discovered = true; - result = null; - if (childClusterStates != null) { - for (ClusterState state : childClusterStates.values()) { - state.shutdown(); - } - childClusterStates = null; - } - handleClusterDiscovered(); - } - - @Override - public void onChanged(final CdsUpdate update) { - if (shutdown) { - return; - } - logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); - discovered = true; - result = update; - if (update.clusterType() == ClusterType.AGGREGATE) { - isLeaf = false; - logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", - update.clusterName(), update.prioritizedClusterNames()); - Map newChildStates = new LinkedHashMap<>(); - for (String cluster : update.prioritizedClusterNames()) { - if (newChildStates.containsKey(cluster)) { - logger.log(XdsLogLevel.WARNING, - String.format("duplicate cluster name %s in aggregate %s is being ignored", - cluster, update.clusterName())); - continue; - } - if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { - ClusterState childState; - if (clusterStates.containsKey(cluster)) { - childState = clusterStates.get(cluster); - if (childState.shutdown) { - childState.start(); - } - } else { - childState = new ClusterState(cluster); - clusterStates.put(cluster, childState); - childState.start(); - } - newChildStates.put(cluster, childState); - } else { - newChildStates.put(cluster, childClusterStates.remove(cluster)); - } - } - if (childClusterStates != null) { // stop subscribing to revoked child clusters - for (ClusterState watcher : childClusterStates.values()) { - watcher.shutdown(); - } - } - childClusterStates = newChildStates; - } else if (update.clusterType() == ClusterType.EDS) { - isLeaf = true; - logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", - update.clusterName(), update.edsServiceName()); - } else { // logical DNS - isLeaf = true; - logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName()); - } - handleClusterDiscovered(); - } - - } + private String errorPrefix() { + return "CdsLb for " + clusterName + ": "; } } diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java index 01bd2ab27f..9b242822f6 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java @@ -36,8 +36,6 @@ import java.util.Map; @Internal public class CdsLoadBalancerProvider extends LoadBalancerProvider { - private static final String CLUSTER_KEY = "cluster"; - @Override public boolean isAvailable() { return true; @@ -70,9 +68,12 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider { */ static ConfigOrError parseLoadBalancingConfigPolicy(Map rawLoadBalancingPolicyConfig) { try { - String cluster = - JsonUtil.getString(rawLoadBalancingPolicyConfig, CLUSTER_KEY); - return ConfigOrError.fromConfig(new CdsConfig(cluster)); + String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster"); + Boolean isDynamic = JsonUtil.getBoolean(rawLoadBalancingPolicyConfig, "is_dynamic"); + if (isDynamic == null) { + isDynamic = Boolean.FALSE; + } + return ConfigOrError.fromConfig(new CdsConfig(cluster, isDynamic)); } catch (RuntimeException e) { return ConfigOrError.fromError( Status.UNAVAILABLE.withCause(e).withDescription( @@ -89,15 +90,28 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider { * Name of cluster to query CDS for. */ final String name; + /** + * Whether this cluster was dynamically chosen, so the XdsDependencyManager may be unaware of + * it without an explicit cluster subscription. + */ + final boolean isDynamic; CdsConfig(String name) { + this(name, false); + } + + CdsConfig(String name, boolean isDynamic) { checkArgument(name != null && !name.isEmpty(), "name is null or empty"); this.name = name; + this.isDynamic = isDynamic; } @Override public String toString() { - return MoreObjects.toStringHelper(this).add("name", name).toString(); + return MoreObjects.toStringHelper(this) + .add("name", name) + .add("isDynamic", isDynamic) + .toString(); } } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java index 035ff76c58..bb4f8de5a5 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java @@ -104,7 +104,7 @@ public final class RingHashLoadBalancerProvider extends LoadBalancerProvider { } if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) { return ConfigOrError.fromError(Status.UNAVAILABLE.withDescription( - "Invalid 'mingRingSize'/'maxRingSize'")); + "Invalid 'minRingSize'/'maxRingSize'")); } return ConfigOrError.fromConfig( new RingHashConfig(minRingSize, maxRingSize, requestHashHeader)); diff --git a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java index 3f2b2d8fd7..a5220515b6 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java @@ -172,7 +172,9 @@ class XdsClusterResource extends XdsResourceType { lbConfig.getPolicyName()).parseLoadBalancingPolicyConfig( lbConfig.getRawConfigValue()); if (configOrError.getError() != null) { - throw new ResourceInvalidException(structOrError.getErrorDetail()); + throw new ResourceInvalidException( + "Failed to parse lb config for cluster '" + cluster.getName() + "': " + + configOrError.getError()); } updateBuilder.lbPolicyConfig(lbPolicyConfig); @@ -209,6 +211,10 @@ class XdsClusterResource extends XdsResourceType { } catch (InvalidProtocolBufferException e) { return StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e); } + if (clusterConfig.getClustersList().isEmpty()) { + return StructOrError.fromError("Cluster " + clusterName + + ": aggregate ClusterConfig.clusters must not be empty"); + } return StructOrError.fromStruct(CdsUpdate.forAggregate( clusterName, clusterConfig.getClustersList())); } diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java index 1f464aa132..d184f08de5 100644 --- a/xds/src/main/java/io/grpc/xds/XdsConfig.java +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -254,6 +254,12 @@ final class XdsConfig { } public interface XdsClusterSubscriptionRegistry { - Closeable subscribeToCluster(String clusterName); + Subscription subscribeToCluster(String clusterName); + } + + public interface Subscription extends Closeable { + /** Release resources without throwing exceptions. */ + @Override + void close(); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 45ae7074d1..0428852648 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -18,6 +18,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static io.grpc.xds.client.XdsClient.ResourceUpdate; import com.google.common.annotations.VisibleForTesting; @@ -34,8 +35,6 @@ import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsResourceType; -import java.io.Closeable; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -56,39 +55,43 @@ import javax.annotation.Nullable; final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry { public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance(); public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance(); - private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++ + private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37 private final String listenerName; private final XdsClient xdsClient; - private final XdsConfigWatcher xdsConfigWatcher; private final SynchronizationContext syncContext; private final String dataPlaneAuthority; + private XdsConfigWatcher xdsConfigWatcher; private StatusOr lastUpdate = null; private final Map, TypeWatchers> resourceWatchers = new HashMap<>(); private final Set subscriptions = new HashSet<>(); - XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher, + XdsDependencyManager(XdsClient xdsClient, SynchronizationContext syncContext, String dataPlaneAuthority, String listenerName, NameResolver.Args nameResolverArgs, ScheduledExecutorService scheduler) { this.listenerName = checkNotNull(listenerName, "listenerName"); this.xdsClient = checkNotNull(xdsClient, "xdsClient"); - this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority"); checkNotNull(nameResolverArgs, "nameResolverArgs"); checkNotNull(scheduler, "scheduler"); - - // start the ball rolling - syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName))); } public static String toContextStr(String typeName, String resourceName) { return typeName + " resource " + resourceName; } + public void start(XdsConfigWatcher xdsConfigWatcher) { + checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted"); + this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher"); + // start the ball rolling + syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName))); + } + @Override - public Closeable subscribeToCluster(String clusterName) { + public XdsConfig.Subscription subscribeToCluster(String clusterName) { + checkState(this.xdsConfigWatcher != null, "dep manager must first be started"); checkNotNull(clusterName, "clusterName"); ClusterSubscription subscription = new ClusterSubscription(clusterName); @@ -291,10 +294,17 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi addConfigForCluster(clusters, childCluster, ancestors, tracer); StatusOr config = clusters.get(childCluster); if (!config.hasValue()) { - clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription( - "Unable to get leaves for " + clusterName + ": " - + config.getStatus().getDescription()))); - return; + // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not + // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the + // watchers reports a transient ADS stream error, the policy should report that it is in + // TRANSIENT_FAILURE if it has never passed a config to its child. + // + // But there's currently disagreement about whether that is actually what we want, and + // that was not originally implemented in gRPC Java. So we're keeping Java's old + // behavior for now and only failing the "leaves" (which is a bit arbitrary for a + // cycle). + leafNames.add(childCluster); + continue; } XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren(); if (children instanceof AggregateConfig) { @@ -325,6 +335,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi default: throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType()); } + if (clusters.containsKey(clusterName)) { + // If a cycle is detected, we'll have detected it while recursing, so now there will be a key + // present. We don't want to overwrite it with a non-error value. + return; + } clusters.put(clusterName, StatusOr.fromValue( new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); } @@ -406,7 +421,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi void onUpdate(StatusOr config); } - private final class ClusterSubscription implements Closeable { + private final class ClusterSubscription implements XdsConfig.Subscription { private final String clusterName; boolean closed; // Accessed from syncContext @@ -419,7 +434,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } @Override - public void close() throws IOException { + public void close() { releaseSubscription(this); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index a14abf95f4..37a8e19ef3 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -230,7 +230,8 @@ final class XdsNameResolver extends NameResolver { ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName); callCounterProvider = SharedCallCounterMap.getInstance(); - resolveState = new ResolveState(ldsResourceName); // auto starts + resolveState = new ResolveState(ldsResourceName); + resolveState.start(); } private static String expandPercentS(String template, String replacement) { @@ -653,10 +654,14 @@ final class XdsNameResolver extends NameResolver { private ResolveState(String ldsResourceName) { authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority; xdsDependencyManager = - new XdsDependencyManager(xdsClient, this, syncContext, authority, ldsResourceName, + new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName, nameResolverArgs, scheduler); } + void start() { + xdsDependencyManager.start(this); + } + private void shutdown() { if (stopped) { return; diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 479bde76ce..f9a09f704a 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -18,28 +18,50 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME; -import static org.junit.Assert.fail; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_LDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableList; +import com.github.xds.type.v3.TypedStruct; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.protobuf.Any; +import com.google.protobuf.Struct; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.Value; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy; +import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy; +import io.envoyproxy.envoy.config.cluster.v3.OutlierDetection; +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.RoutingPriority; +import io.envoyproxy.envoy.config.core.v3.SelfConfigSource; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; +import io.envoyproxy.envoy.config.core.v3.TypedExtensionConfig; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; import io.grpc.Attributes; +import io.grpc.ChannelLogger; import io.grpc.ConnectivityState; -import io.grpc.EquivalentAddressGroup; -import io.grpc.InsecureChannelCredentials; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; -import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; @@ -47,31 +69,25 @@ import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; -import io.grpc.internal.ObjectPool; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.FakeClock; +import io.grpc.testing.GrpcCleanupRule; import io.grpc.util.GracefulSwitchLoadBalancerAccessor; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; -import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; +import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection; import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; -import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; -import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestConfig; -import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; -import io.grpc.xds.XdsClusterResource.CdsUpdate; -import io.grpc.xds.client.Bootstrapper.BootstrapInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo; -import io.grpc.xds.client.EnvoyProtoData; import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; -import javax.annotation.Nullable; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -90,601 +106,446 @@ import org.mockito.junit.MockitoRule; @RunWith(JUnit4.class) public class CdsLoadBalancer2Test { @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + private static final String SERVER_NAME = "example.com"; private static final String CLUSTER = "cluster-foo.googleapis.com"; private static final String EDS_SERVICE_NAME = "backend-service-1.googleapis.com"; - private static final String DNS_HOST_NAME = "backend-service-dns.googleapis.com:443"; - private static final ServerInfo LRS_SERVER_INFO = - ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create()); - private static final String SERVER_URI = "trafficdirector.googleapis.com"; - private static final String NODE_ID = - "projects/42/networks/default/nodes/5c85b298-6f5b-4722-b74a-f7d1f0ccf5ad"; - private static final EnvoyProtoData.Node BOOTSTRAP_NODE = - EnvoyProtoData.Node.newBuilder().setId(NODE_ID).build(); - private static final BootstrapInfo BOOTSTRAP_INFO = BootstrapInfo.builder() - .servers(ImmutableList.of( - ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create()))) - .node(BOOTSTRAP_NODE) + private static final String NODE_ID = "node-id"; + private final io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = + CommonTlsContextTestsUtil.buildUpstreamTlsContext("cert-instance-name", true); + private static final Cluster EDS_CLUSTER = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) .build(); - private final UpstreamTlsContext upstreamTlsContext = - CommonTlsContextTestsUtil.buildUpstreamTlsContext("google_cloud_private_spiffe", true); - private final OutlierDetection outlierDetection = OutlierDetection.create( - null, null, null, null, SuccessRateEjection.create(null, null, null, null), null); - - private static final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - throw new RuntimeException(e); - //throw new AssertionError(e); - } - }); + private final FakeClock fakeClock = new FakeClock(); private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); private final List childBalancers = new ArrayList<>(); - private final FakeXdsClient xdsClient = new FakeXdsClient(); - private final ObjectPool xdsClientPool = new ObjectPool() { - @Override - public XdsClient getObject() { - xdsClientRefs++; - return xdsClient; - } - - @Override - public XdsClient returnObject(Object object) { - xdsClientRefs--; - return null; - } - }; + private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService(); + private final XdsClient xdsClient = XdsTestUtils.createXdsClient( + Arrays.asList("control-plane.example.com"), + serverInfo -> new GrpcXdsTransportFactory.GrpcXdsTransport( + InProcessChannelBuilder + .forName(serverInfo.target()) + .directExecutor() + .build()), + fakeClock); + private final ServerInfo lrsServerInfo = xdsClient.getBootstrapInfo().servers().get(0); + private XdsDependencyManager xdsDepManager; @Mock private Helper helper; @Captor private ArgumentCaptor pickerCaptor; - private int xdsClientRefs; - private CdsLoadBalancer2 loadBalancer; + private CdsLoadBalancer2 loadBalancer; + private XdsConfig lastXdsConfig; @Before - public void setUp() { - when(helper.getSynchronizationContext()).thenReturn(syncContext); + public void setUp() throws Exception { lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_RESOLVER_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider("round_robin")); lbRegistry.register( new FakeLoadBalancerProvider("ring_hash_experimental", new RingHashLoadBalancerProvider())); lbRegistry.register(new FakeLoadBalancerProvider("least_request_experimental", new LeastRequestLoadBalancerProvider())); + lbRegistry.register(new FakeLoadBalancerProvider("wrr_locality_experimental", + new WrrLocalityLoadBalancerProvider())); loadBalancer = new CdsLoadBalancer2(helper, lbRegistry); - loadBalancer.acceptResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setAttributes( - // Other attributes not used by cluster_resolver LB are omitted. - Attributes.newBuilder() - .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) - .build()) - .setLoadBalancingPolicyConfig(new CdsConfig(CLUSTER)) - .build()); - assertThat(Iterables.getOnlyElement(xdsClient.watchers.keySet())).isEqualTo(CLUSTER); + + cleanupRule.register(InProcessServerBuilder + .forName("control-plane.example.com") + .addService(controlPlaneService) + .directExecutor() + .build() + .start()); + + SynchronizationContext syncContext = new SynchronizationContext((t, e) -> { + throw new AssertionError(e); + }); + + NameResolver.Args nameResolverArgs = NameResolver.Args.newBuilder() + .setDefaultPort(8080) + .setProxyDetector((address) -> null) + .setSynchronizationContext(syncContext) + .setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class)) + .setChannelLogger(mock(ChannelLogger.class)) + .setScheduledExecutorService(fakeClock.getScheduledExecutorService()) + .build(); + + xdsDepManager = new XdsDependencyManager( + xdsClient, + syncContext, + SERVER_NAME, + SERVER_NAME, + nameResolverArgs, + fakeClock.getScheduledExecutorService()); + + controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of( + SERVER_NAME, ControlPlaneRule.buildClientListener(SERVER_NAME, "my-route"))); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of( + "my-route", XdsTestUtils.buildRouteConfiguration(SERVER_NAME, "my-route", CLUSTER))); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, ControlPlaneRule.buildClusterLoadAssignment( + "127.0.0.1", "", 1234, EDS_SERVICE_NAME))); } @After public void tearDown() { - loadBalancer.shutdown(); - assertThat(xdsClient.watchers).isEmpty(); - assertThat(xdsClientRefs).isEqualTo(0); + if (loadBalancer != null) { + shutdownLoadBalancer(); + } assertThat(childBalancers).isEmpty(); + + if (xdsDepManager != null) { + xdsDepManager.shutdown(); + } + xdsClient.shutdown(); + } + + private void shutdownLoadBalancer() { + LoadBalancer lb = this.loadBalancer; + this.loadBalancer = null; // Must avoid calling acceptResolvedAddresses after shutdown + lb.shutdown(); } @Test public void discoverTopLevelEdsCluster() { - CdsUpdate update = - CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - outlierDetection, false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN) + .setLrsServer(ConfigSource.newBuilder() + .setSelf(SelfConfigSource.getDefaultInstance())) + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(100)))) + .setTransportSocket(TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(UpstreamTlsContext.newBuilder() + .setCommonTlsContext(upstreamTlsContext.getCommonTlsContext()) + .build()))) + .setOutlierDetection(OutlierDetection.getDefaultInstance()) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, - null, LRS_SERVER_INFO, 100L, upstreamTlsContext, outlierDetection); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + CLUSTER, EDS_SERVICE_NAME, lrsServerInfo, 100L, upstreamTlsContext, + Collections.emptyMap(), io.grpc.xds.EnvoyServerProtoData.OutlierDetection.create( + null, null, null, null, SuccessRateEjection.create(null, null, null, null), + FailurePercentageEjection.create(null, null, null, null))))); assertThat( GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName()) - .isEqualTo("round_robin"); + .isEqualTo("wrr_locality_experimental"); } @Test public void discoverTopLevelLogicalDnsCluster() { - CdsUpdate update = - CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - false) - .leastRequestLbPolicy(3).build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.LOGICAL_DNS) + .setLoadAssignment(ClusterLoadAssignment.newBuilder() + .addEndpoints(LocalityLbEndpoints.newBuilder() + .addLbEndpoints(LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("dns.example.com") + .setPortValue(1111))))))) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .setLbPolicy(Cluster.LbPolicy.LEAST_REQUEST) + .setLrsServer(ConfigSource.newBuilder() + .setSelf(SelfConfigSource.getDefaultInstance())) + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(100)))) + .setTransportSocket(TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(UpstreamTlsContext.newBuilder() + .setCommonTlsContext(upstreamTlsContext.getCommonTlsContext()) + .build()))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null, - DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, null); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forLogicalDns( + CLUSTER, "dns.example.com:1111", lrsServerInfo, 100L, upstreamTlsContext, + Collections.emptyMap()))); assertThat( GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName()) - .isEqualTo("least_request_experimental"); - LeastRequestConfig lrConfig = (LeastRequestConfig) - GracefulSwitchLoadBalancerAccessor.getChildConfig(childLbConfig.lbConfig); - assertThat(lrConfig.choiceCount).isEqualTo(3); + .isEqualTo("wrr_locality_experimental"); } @Test public void nonAggregateCluster_resourceNotExist_returnErrorPicker() { - xdsClient.deliverResourceNotExist(CLUSTER); + startXdsDepManager(); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); + "CDS resource " + CLUSTER + " does not exist nodeID: " + NODE_ID); + assertPickerStatus(pickerCaptor.getValue(), unavailable); assertThat(childBalancers).isEmpty(); } @Test public void nonAggregateCluster_resourceUpdate() { - CdsUpdate update = - CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection, false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + Cluster cluster = EDS_CLUSTER.toBuilder() + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(100)))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, null, null, null, - 100L, upstreamTlsContext, outlierDetection); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + CLUSTER, EDS_SERVICE_NAME, null, 100L, null, Collections.emptyMap(), null))); - update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, null, - outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + cluster = EDS_CLUSTER.toBuilder() + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(200)))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); + assertThat(childBalancers).hasSize(1); + childBalancer = Iterables.getOnlyElement(childBalancers); childLbConfig = (ClusterResolverConfig) childBalancer.config; - instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, - null, LRS_SERVER_INFO, 200L, null, outlierDetection); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + CLUSTER, EDS_SERVICE_NAME, null, 200L, null, Collections.emptyMap(), null))); } @Test public void nonAggregateCluster_resourceRevoked() { - CdsUpdate update = - CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext, - false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, EDS_CLUSTER)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null, - DNS_HOST_NAME, null, 100L, upstreamTlsContext, null); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + CLUSTER, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null))); + + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of()); - xdsClient.deliverResourceNotExist(CLUSTER); assertThat(childBalancer.shutdown).isTrue(); Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); + "CDS resource " + CLUSTER + " does not exist nodeID: " + NODE_ID); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker(pickerCaptor.getValue(), unavailable, null); + assertPickerStatus(pickerCaptor.getValue(), unavailable); assertThat(childBalancer.shutdown).isTrue(); assertThat(childBalancers).isEmpty(); } + @Test + public void dynamicCluster() { + String clusterName = "cluster2"; + Cluster cluster = EDS_CLUSTER.toBuilder() + .setName(clusterName) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + clusterName, cluster, + CLUSTER, Cluster.newBuilder().setName(CLUSTER).build())); + startXdsDepManager(new CdsConfig(clusterName, /*dynamic=*/ true)); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); + assertThat(childBalancers).hasSize(1); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + clusterName, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null))); + + assertThat(this.lastXdsConfig.getClusters()).containsKey(clusterName); + shutdownLoadBalancer(); + assertThat(this.lastXdsConfig.getClusters()).doesNotContainKey(clusterName); + } + @Test public void discoverAggregateCluster() { String cluster1 = "cluster-01.googleapis.com"; String cluster2 = "cluster-02.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2)) - .ringHashLbPolicy(100L, 1000L).build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - assertThat(childBalancers).isEmpty(); String cluster3 = "cluster-03.googleapis.com"; String cluster4 = "cluster-04.googleapis.com"; - // cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)] - CdsUpdate update1 = - CdsUpdate.forAggregate(cluster1, Arrays.asList(cluster3, cluster4)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - assertThat(xdsClient.watchers.keySet()).containsExactly( - CLUSTER, cluster1, cluster2, cluster3, cluster4); - assertThat(childBalancers).isEmpty(); - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - assertThat(childBalancers).isEmpty(); - CdsUpdate update2 = - CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null, false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - assertThat(childBalancers).isEmpty(); - CdsUpdate update4 = - CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection, false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster4, update4); - assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + // CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS), cluster3 (EDS)] + CLUSTER, Cluster.newBuilder() + .setName(CLUSTER) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .addClusters(cluster1) + .addClusters(cluster2) + .addClusters(cluster3) + .build()))) + .setLbPolicy(Cluster.LbPolicy.RING_HASH) + .build(), + // cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)] + cluster1, Cluster.newBuilder() + .setName(cluster1) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .addClusters(cluster3) + .addClusters(cluster4) + .build()))) + .build(), + cluster2, Cluster.newBuilder() + .setName(cluster2) + .setType(Cluster.DiscoveryType.LOGICAL_DNS) + .setLoadAssignment(ClusterLoadAssignment.newBuilder() + .addEndpoints(LocalityLbEndpoints.newBuilder() + .addLbEndpoints(LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("dns.example.com") + .setPortValue(1111))))))) + .build(), + cluster3, EDS_CLUSTER.toBuilder() + .setName(cluster3) + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(100)))) + .build(), + cluster4, EDS_CLUSTER.toBuilder().setName(cluster4).build())); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); + assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(3); - // Clusters on higher level has higher priority: [cluster2, cluster3, cluster4] - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(0), cluster2, - DiscoveryMechanism.Type.LOGICAL_DNS, null, DNS_HOST_NAME, null, 100L, null, null); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(1), cluster3, - DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, null, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(2), cluster4, - DiscoveryMechanism.Type.EDS, null, null, LRS_SERVER_INFO, 300L, null, outlierDetection); + // Clusters are resolved recursively, later duplicates removed: [cluster3, cluster4, cluster2] + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + cluster3, EDS_SERVICE_NAME, null, 100L, null, Collections.emptyMap(), null), + DiscoveryMechanism.forEds( + cluster4, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null), + DiscoveryMechanism.forLogicalDns( + cluster2, "dns.example.com:1111", null, null, null, Collections.emptyMap()))); assertThat( GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName()) .isEqualTo("ring_hash_experimental"); // dominated by top-level cluster's config - RingHashConfig ringHashConfig = (RingHashConfig) - GracefulSwitchLoadBalancerAccessor.getChildConfig(childLbConfig.lbConfig); - assertThat(ringHashConfig.minRingSize).isEqualTo(100L); - assertThat(ringHashConfig.maxRingSize).isEqualTo(1000L); + } + + @Test + public void aggregateCluster_noChildren() { + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + // CLUSTER (aggr.) -> [] + CLUSTER, Cluster.newBuilder() + .setName(CLUSTER) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .build()))) + .build())); + startXdsDepManager(); + + verify(helper) + .updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + Status actualStatus = result.getStatus(); + assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(actualStatus.getDescription()) + .contains("aggregate ClusterConfig.clusters must not be empty"); + assertThat(childBalancers).isEmpty(); } @Test public void aggregateCluster_noNonAggregateClusterExits_returnErrorPicker() { String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (EDS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - xdsClient.deliverResourceNotExist(cluster1); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + // CLUSTER (aggr.) -> [cluster1 (missing)] + CLUSTER, Cluster.newBuilder() + .setName(CLUSTER) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .addClusters(cluster1) + .build()))) + .setLbPolicy(Cluster.LbPolicy.RING_HASH) + .build())); + startXdsDepManager(); + verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); + Status status = Status.UNAVAILABLE.withDescription( + "CDS resource " + cluster1 + " does not exist nodeID: " + NODE_ID); + assertPickerStatus(pickerCaptor.getValue(), status); assertThat(childBalancers).isEmpty(); } @Test - public void aggregateCluster_descendantClustersRevoked() { - String cluster1 = "cluster-01.googleapis.com"; - String cluster2 = "cluster-02.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - CdsUpdate update2 = - CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(2); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(0), cluster1, - DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, null, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(1), cluster2, - DiscoveryMechanism.Type.LOGICAL_DNS, null, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - null); - - // Revoke cluster1, should still be able to proceed with cluster2. - xdsClient.deliverResourceNotExist(cluster1); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - assertDiscoveryMechanism(Iterables.getOnlyElement(childLbConfig.discoveryMechanisms), cluster2, - DiscoveryMechanism.Type.LOGICAL_DNS, null, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - null); - verify(helper, never()).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), any(SubchannelPicker.class)); - - // All revoked. - xdsClient.deliverResourceNotExist(cluster2); + public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_failingPicker() { + Status status = Status.UNAVAILABLE.withDescription("unreachable"); + loadBalancer.handleNameResolutionError(status); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - assertThat(childBalancer.shutdown).isTrue(); - assertThat(childBalancers).isEmpty(); - } - - @Test - public void aggregateCluster_rootClusterRevoked() { - String cluster1 = "cluster-01.googleapis.com"; - String cluster2 = "cluster-02.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - CdsUpdate update2 = - CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(2); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(0), cluster1, - DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, null, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(1), cluster2, - DiscoveryMechanism.Type.LOGICAL_DNS, null, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - null); - - xdsClient.deliverResourceNotExist(CLUSTER); - assertThat(xdsClient.watchers.keySet()) - .containsExactly(CLUSTER); // subscription to all descendant clusters cancelled - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - assertThat(childBalancer.shutdown).isTrue(); - assertThat(childBalancers).isEmpty(); - } - - @Test - public void aggregateCluster_intermediateClusterChanges() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - - // CLUSTER (aggr.) -> [cluster2 (aggr.)] - String cluster2 = "cluster-02.googleapis.com"; - update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2); - - // cluster2 (aggr.) -> [cluster3 (EDS)] - String cluster3 = "cluster-03.googleapis.com"; - CdsUpdate update2 = - CdsUpdate.forAggregate(cluster2, Collections.singletonList(cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3); - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, cluster3, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, - null, LRS_SERVER_INFO, 100L, upstreamTlsContext, outlierDetection); - - // cluster2 revoked - xdsClient.deliverResourceNotExist(cluster2); - assertThat(xdsClient.watchers.keySet()) - .containsExactly(CLUSTER, cluster2); // cancelled subscription to cluster3 - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - assertThat(childBalancer.shutdown).isTrue(); - assertThat(childBalancers).isEmpty(); - } - - @Test - public void aggregateCluster_withLoops() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - - // CLUSTER (aggr.) -> [cluster2 (aggr.)] - String cluster2 = "cluster-02.googleapis.com"; - update = - CdsUpdate.forAggregate(cluster1, Collections.singletonList(cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - - // cluster2 (aggr.) -> [cluster3 (EDS), cluster1 (parent), cluster2 (self), cluster3 (dup)] - String cluster3 = "cluster-03.googleapis.com"; - CdsUpdate update2 = - CdsUpdate.forAggregate(cluster2, Arrays.asList(cluster3, cluster1, cluster2, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2, cluster3); - - reset(helper); - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: circular aggregate clusters directly under cluster-02.googleapis.com for root" - + " cluster cluster-foo.googleapis.com, named [cluster-01.googleapis.com," - + " cluster-02.googleapis.com], xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - } - - @Test - public void aggregateCluster_withLoops_afterEds() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - - // CLUSTER (aggr.) -> [cluster2 (aggr.)] - String cluster2 = "cluster-02.googleapis.com"; - update = - CdsUpdate.forAggregate(cluster1, Collections.singletonList(cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - - String cluster3 = "cluster-03.googleapis.com"; - CdsUpdate update2 = - CdsUpdate.forAggregate(cluster2, Arrays.asList(cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - - // cluster2 (aggr.) -> [cluster3 (EDS)] - CdsUpdate update2a = - CdsUpdate.forAggregate(cluster2, Arrays.asList(cluster3, cluster1, cluster2, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2a); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2, cluster3); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: circular aggregate clusters directly under cluster-02.googleapis.com for root" - + " cluster cluster-foo.googleapis.com, named [cluster-01.googleapis.com," - + " cluster-02.googleapis.com], xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - } - - @Test - public void aggregateCluster_duplicateChildren() { - String cluster1 = "cluster-01.googleapis.com"; - String cluster2 = "cluster-02.googleapis.com"; - String cluster3 = "cluster-03.googleapis.com"; - String cluster4 = "cluster-04.googleapis.com"; - - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - - // cluster1 (aggr) -> [cluster3 (EDS), cluster2 (aggr), cluster4 (aggr)] - CdsUpdate update1 = - CdsUpdate.forAggregate(cluster1, Arrays.asList(cluster3, cluster2, cluster4, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - assertThat(xdsClient.watchers.keySet()).containsExactly( - cluster3, cluster4, cluster2, cluster1, CLUSTER); - xdsClient.watchers.values().forEach(list -> assertThat(list.size()).isEqualTo(1)); - - // cluster2 (agg) -> [cluster3 (EDS), cluster4 {agg}] with dups - CdsUpdate update2 = - CdsUpdate.forAggregate(cluster2, Arrays.asList(cluster3, cluster4, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - - // Define EDS cluster - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - - // cluster4 (agg) -> [cluster3 (EDS)] with dups (3 copies) - CdsUpdate update4 = - CdsUpdate.forAggregate(cluster4, Arrays.asList(cluster3, cluster3, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster4, update4); - xdsClient.watchers.values().forEach(list -> assertThat(list.size()).isEqualTo(1)); - - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, cluster3, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, - null, LRS_SERVER_INFO, 100L, upstreamTlsContext, outlierDetection); - } - - @Test - public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicker() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM"); - xdsClient.deliverError(error); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status expectedError = Status.UNAVAILABLE.withDescription( - "Unable to load CDS cluster-foo.googleapis.com. xDS server returned: " - + "RESOURCE_EXHAUSTED: OOM xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), expectedError, null); - assertThat(childBalancers).isEmpty(); - } - - @Test - public void aggregateCluster_discoveryErrorAfterChildLbCreated_propagateToChildLb() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (logical DNS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - CdsUpdate update1 = - CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null, - false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childLb.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - - Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM"); - xdsClient.deliverError(error); - assertThat(childLb.upstreamError.getCode()).isEqualTo(Status.Code.UNAVAILABLE); - assertThat(childLb.upstreamError.getDescription()).contains("RESOURCE_EXHAUSTED: OOM"); - assertThat(childLb.shutdown).isFalse(); // child LB may choose to keep working - } - - @Test - public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() { - Status upstreamError = Status.UNAVAILABLE.withDescription( - "unreachable xDS node ID: " + NODE_ID); - loadBalancer.handleNameResolutionError(upstreamError); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker(pickerCaptor.getValue(), upstreamError, null); + assertPickerStatus(pickerCaptor.getValue(), status); } @Test public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() { - CdsUpdate update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.shutdown).isFalse(); + loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable")); assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(childBalancer.upstreamError.getDescription()).isEqualTo("unreachable"); @@ -694,56 +555,91 @@ public class CdsLoadBalancer2Test { @Test public void unknownLbProvider() { - try { - xdsClient.deliverCdsUpdate(CLUSTER, - CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - outlierDetection, false) - .lbPolicyConfig(ImmutableMap.of("unknownLb", ImmutableMap.of("foo", "bar"))).build()); - } catch (Exception e) { - assertThat(e).hasMessageThat().contains("unknownLb"); - return; - } - fail("Expected the unknown LB to cause an exception"); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .setLoadBalancingPolicy(LoadBalancingPolicy.newBuilder() + .addPolicies(Policy.newBuilder() + .setTypedExtensionConfig(TypedExtensionConfig.newBuilder() + .setTypedConfig(Any.pack(TypedStruct.newBuilder() + .setTypeUrl("type.googleapis.com/unknownLb") + .setValue(Struct.getDefaultInstance()) + .build()))))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + verify(helper).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + Status actualStatus = result.getStatus(); + assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(actualStatus.getDescription()).contains("Invalid LoadBalancingPolicy"); } @Test public void invalidLbConfig() { - try { - xdsClient.deliverCdsUpdate(CLUSTER, - CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - outlierDetection, false).lbPolicyConfig( - ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1"))) - .build()); - } catch (Exception e) { - assertThat(e).hasMessageThat().contains("Unable to parse"); - return; - } - fail("Expected the invalid config to cause an exception"); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .setLoadBalancingPolicy(LoadBalancingPolicy.newBuilder() + .addPolicies(Policy.newBuilder() + .setTypedExtensionConfig(TypedExtensionConfig.newBuilder() + .setTypedConfig(Any.pack(TypedStruct.newBuilder() + .setTypeUrl("type.googleapis.com/ring_hash_experimental") + .setValue(Struct.newBuilder() + .putFields("minRingSize", Value.newBuilder().setNumberValue(-1).build())) + .build()))))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + verify(helper).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + Status actualStatus = result.getStatus(); + assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(actualStatus.getDescription()).contains("Invalid 'minRingSize'"); } - private static void assertPicker(SubchannelPicker picker, Status expectedStatus, - @Nullable Subchannel expectedSubchannel) { + private void startXdsDepManager() { + startXdsDepManager(new CdsConfig(CLUSTER)); + } + + private void startXdsDepManager(final CdsConfig cdsConfig) { + xdsDepManager.start( + xdsConfig -> { + if (!xdsConfig.hasValue()) { + throw new AssertionError("" + xdsConfig.getStatus()); + } + this.lastXdsConfig = xdsConfig.getValue(); + if (loadBalancer == null) { + return; + } + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, xdsConfig.getValue()) + .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, xdsDepManager) + .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + }); + // trigger does not exist timer, so broken config is more obvious + fakeClock.forwardTime(10, TimeUnit.MINUTES); + } + + private static void assertPickerStatus(SubchannelPicker picker, Status expectedStatus) { PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode()); assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription()); - if (actualStatus.isOk()) { - assertThat(result.getSubchannel()).isSameInstanceAs(expectedSubchannel); - } - } - - private static void assertDiscoveryMechanism(DiscoveryMechanism instance, String name, - DiscoveryMechanism.Type type, @Nullable String edsServiceName, @Nullable String dnsHostName, - @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection) { - assertThat(instance.cluster).isEqualTo(name); - assertThat(instance.type).isEqualTo(type); - assertThat(instance.edsServiceName).isEqualTo(edsServiceName); - assertThat(instance.dnsHostName).isEqualTo(dnsHostName); - assertThat(instance.lrsServerInfo).isEqualTo(lrsServerInfo); - assertThat(instance.maxConcurrentRequests).isEqualTo(maxConcurrentRequests); - assertThat(instance.tlsContext).isEqualTo(tlsContext); - assertThat(instance.outlierDetection).isEqualTo(outlierDetection); } private final class FakeLoadBalancerProvider extends LoadBalancerProvider { @@ -802,8 +698,9 @@ public class CdsLoadBalancer2Test { } @Override - public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { config = resolvedAddresses.getLoadBalancingPolicyConfig(); + return Status.OK; } @Override @@ -817,58 +714,4 @@ public class CdsLoadBalancer2Test { childBalancers.remove(this); } } - - private static final class FakeXdsClient extends XdsClient { - // watchers needs to support any non-cyclic shaped graphs - private final Map>> watchers = new HashMap<>(); - - @Override - @SuppressWarnings("unchecked") - public void watchXdsResource(XdsResourceType type, - String resourceName, - ResourceWatcher watcher, Executor syncContext) { - assertThat(type.typeName()).isEqualTo("CDS"); - watchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) - .add((ResourceWatcher)watcher); - } - - @Override - public void cancelXdsResourceWatch(XdsResourceType type, - String resourceName, - ResourceWatcher watcher) { - assertThat(type.typeName()).isEqualTo("CDS"); - assertThat(watchers).containsKey(resourceName); - List> watcherList = watchers.get(resourceName); - assertThat(watcherList.remove(watcher)).isTrue(); - if (watcherList.isEmpty()) { - watchers.remove(resourceName); - } - } - - @Override - public BootstrapInfo getBootstrapInfo() { - return BOOTSTRAP_INFO; - } - - private void deliverCdsUpdate(String clusterName, CdsUpdate update) { - if (watchers.containsKey(clusterName)) { - List> resourceWatchers = - ImmutableList.copyOf(watchers.get(clusterName)); - resourceWatchers.forEach(w -> w.onChanged(update)); - } - } - - private void deliverResourceNotExist(String clusterName) { - if (watchers.containsKey(clusterName)) { - ImmutableList.copyOf(watchers.get(clusterName)) - .forEach(w -> w.onResourceDoesNotExist(clusterName)); - } - } - - private void deliverError(Status error) { - watchers.values().stream() - .flatMap(List::stream) - .forEach(w -> w.onError(error)); - } - } } diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 006440be6c..32361684a6 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -2211,6 +2211,23 @@ public abstract class GrpcXdsClientImplTestBase { verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); } + @Test + public void cdsResponseWithEmptyAggregateCluster() { + DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE, + cdsResourceWatcher); + List candidates = Arrays.asList(); + Any clusterAggregate = + Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, "round_robin", null, null, candidates)); + call.sendResponse(CDS, clusterAggregate, VERSION_1, "0000"); + + // Client sent an ACK CDS request. + String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: " + + "Cluster cluster.googleapis.com: aggregate ClusterConfig.clusters must not be empty"; + call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg)); + verify(cdsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); + } + @Test public void cdsResponseWithCircuitBreakers() { DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE, diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java index 3036db5b09..66c9c5c537 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java @@ -106,7 +106,7 @@ public class RingHashLoadBalancerProviderTest { assertThat(configOrError.getError()).isNotNull(); assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(configOrError.getError().getDescription()) - .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + .isEqualTo("Invalid 'minRingSize'/'maxRingSize'"); } @Test @@ -117,7 +117,7 @@ public class RingHashLoadBalancerProviderTest { assertThat(configOrError.getError()).isNotNull(); assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(configOrError.getError().getDescription()) - .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + .isEqualTo("Invalid 'minRingSize'/'maxRingSize'"); } @Test @@ -214,7 +214,7 @@ public class RingHashLoadBalancerProviderTest { assertThat(configOrError.getError()).isNotNull(); assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(configOrError.getError().getDescription()) - .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + .isEqualTo("Invalid 'minRingSize'/'maxRingSize'"); } @Test @@ -225,7 +225,7 @@ public class RingHashLoadBalancerProviderTest { assertThat(configOrError.getError()).isNotNull(); assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(configOrError.getError().getDescription()) - .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + .isEqualTo("Invalid 'minRingSize'/'maxRingSize'"); } @Test diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index aea1ad66d7..8ac0f8c9d8 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -28,7 +28,6 @@ import static io.grpc.xds.XdsTestUtils.ENDPOINT_HOSTNAME; import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT; import static io.grpc.xds.XdsTestUtils.RDS_NAME; import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster; -import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; @@ -48,28 +47,22 @@ import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.grpc.BindableService; import io.grpc.ChannelLogger; -import io.grpc.ManagedChannel; import io.grpc.NameResolver; -import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusOr; import io.grpc.StatusOrMatcher; import io.grpc.SynchronizationContext; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.FakeClock; import io.grpc.internal.GrpcUtil; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; -import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceMetadata; -import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsResourceType; -import io.grpc.xds.client.XdsTransportFactory; import java.io.Closeable; import java.io.IOException; import java.util.ArrayDeque; @@ -96,7 +89,6 @@ import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatchers; import org.mockito.Captor; import org.mockito.InOrder; -import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -108,21 +100,20 @@ public class XdsDependencyManagerTest { public static final String CLUSTER_TYPE_NAME = XdsClusterResource.getInstance().typeName(); public static final String ENDPOINT_TYPE_NAME = XdsEndpointResource.getInstance().typeName(); - @Mock - private XdsClientMetricReporter xdsClientMetricReporter; - private final SynchronizationContext syncContext = new SynchronizationContext((t, e) -> { throw new AssertionError(e); }); - - private ManagedChannel channel; - private XdsClient xdsClient; - private XdsDependencyManager xdsDependencyManager; - private TestWatcher xdsConfigWatcher; - private Server xdsServer; - private final FakeClock fakeClock = new FakeClock(); + + private XdsClient xdsClient = XdsTestUtils.createXdsClient( + Collections.singletonList("control-plane"), + serverInfo -> new GrpcXdsTransportFactory.GrpcXdsTransport( + InProcessChannelBuilder.forName(serverInfo.target()).directExecutor().build()), + fakeClock); + + private TestWatcher xdsConfigWatcher; + private final String serverName = "the-service-name"; private final Queue loadReportCalls = new ArrayDeque<>(); private final AtomicBoolean adsEnded = new AtomicBoolean(true); @@ -150,10 +141,12 @@ public class XdsDependencyManagerTest { .build(); private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService(); + private XdsDependencyManager xdsDependencyManager = new XdsDependencyManager( + xdsClient, syncContext, serverName, serverName, nameResolverArgs, scheduler); @Before public void setUp() throws Exception { - xdsServer = cleanupRule.register(InProcessServerBuilder + cleanupRule.register(InProcessServerBuilder .forName("control-plane") .addService(controlPlaneService) .addService(lrsService) @@ -163,15 +156,6 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName); - channel = cleanupRule.register( - InProcessChannelBuilder.forName("control-plane").directExecutor().build()); - XdsTransportFactory xdsTransportFactory = - ignore -> new GrpcXdsTransportFactory.GrpcXdsTransport(channel); - - xdsClient = CommonBootstrapperTestUtils.createXdsClient( - Collections.singletonList(SERVER_URI), xdsTransportFactory, fakeClock, - new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter); - testWatcher = new TestWatcher(); xdsConfigWatcher = mock(TestWatcher.class, delegatesTo(testWatcher)); defaultXdsConfig = XdsTestUtils.getDefaultXdsConfig(serverName); @@ -183,9 +167,6 @@ public class XdsDependencyManagerTest { xdsDependencyManager.shutdown(); } xdsClient.shutdown(); - channel.shutdown(); // channel not owned by XdsClient - - xdsServer.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); assertThat(adsEnded.get()).isTrue(); assertThat(lrsEnded.get()).isTrue(); @@ -194,8 +175,7 @@ public class XdsDependencyManagerTest { @Test public void verify_basic_config() { - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); testWatcher.verifyStats(1, 0); @@ -203,8 +183,7 @@ public class XdsDependencyManagerTest { @Test public void verify_config_update() { - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); @@ -221,8 +200,7 @@ public class XdsDependencyManagerTest { @Test public void verify_simple_aggregate() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); @@ -281,8 +259,7 @@ public class XdsDependencyManagerTest { List childNames2 = Arrays.asList("clusterA", "clusterX"); XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(any()); Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1); @@ -313,8 +290,7 @@ public class XdsDependencyManagerTest { @Test public void testDelayedSubscription() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); String rootName1 = "root_c"; @@ -360,8 +336,7 @@ public class XdsDependencyManagerTest { edsMap.put("garbageEds", clusterLoadAssignment); controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); fakeClock.forwardTime(16, TimeUnit.SECONDS); verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); @@ -393,8 +368,9 @@ public class XdsDependencyManagerTest { @Test public void testMissingLds() { String ldsName = "badLdsName"; - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext, serverName, ldsName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); fakeClock.forwardTime(16, TimeUnit.SECONDS); verify(xdsConfigWatcher).onUpdate( @@ -409,8 +385,7 @@ public class XdsDependencyManagerTest { Listener serverListener = ControlPlaneRule.buildServerListener().toBuilder().setName(serverName).build(); controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, serverListener)); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); fakeClock.forwardTime(16, TimeUnit.SECONDS); verify(xdsConfigWatcher).onUpdate( @@ -427,8 +402,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, clientListener)); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); fakeClock.forwardTime(16, TimeUnit.SECONDS); verify(xdsConfigWatcher).onUpdate( @@ -444,8 +418,7 @@ public class XdsDependencyManagerTest { "wrong-virtual-host", XdsTestUtils.RDS_NAME, XdsTestUtils.CLUSTER_NAME); controlPlaneService.setXdsConfig( ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); // Update with a config that has a virtual host that doesn't match the server name verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); @@ -460,8 +433,9 @@ public class XdsDependencyManagerTest { String ldsResourceName = "xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1"; - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext, serverName, ldsResourceName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate( argThat(StatusOrMatcher.hasStatus( @@ -474,8 +448,7 @@ public class XdsDependencyManagerTest { @Test public void testChangeRdsName_fromLds() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); String newRdsName = "newRdsName1"; @@ -530,8 +503,7 @@ public class XdsDependencyManagerTest { // Start the actual test InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue(); @@ -576,8 +548,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig config = xdsUpdateCaptor.getValue().getValue(); assertThat(config.getClusters().get("clusterA").hasValue()).isTrue(); @@ -611,12 +582,12 @@ public class XdsDependencyManagerTest { // The cycle is loaded and detected InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig config = xdsUpdateCaptor.getValue().getValue(); assertThat(config.getClusters().get("clusterA").hasValue()).isFalse(); assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle"); + assertThat(config.getClusters().get("clusterB").hasValue()).isTrue(); // Orphan the cycle and it is discarded routeConfig = @@ -657,8 +628,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); // Start the actual test - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue(); assertThat(initialConfig.getClusters().keySet()) @@ -675,8 +645,7 @@ public class XdsDependencyManagerTest { @Test public void testChangeRdsName_FromLds_complexTree() { - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); // Create the same tree as in testMultipleParentsInCdsTree Cluster rootCluster = @@ -721,8 +690,7 @@ public class XdsDependencyManagerTest { public void testChangeAggCluster() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(any()); // Setup initial config A -> A1 -> (A11, A12) @@ -775,8 +743,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig( ADS_TYPE_URL_CDS, ImmutableMap.of(XdsTestUtils.CLUSTER_NAME, Cluster.newBuilder().setName(XdsTestUtils.CLUSTER_NAME).build())); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); Status status = xdsUpdateCaptor.getValue().getValue() @@ -789,8 +756,7 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", ENDPOINT_HOSTNAME, ENDPOINT_PORT); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(any()); @@ -822,8 +788,7 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", ENDPOINT_HOSTNAME, ENDPOINT_PORT); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(any()); @@ -855,8 +820,7 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", ENDPOINT_HOSTNAME, ENDPOINT_PORT); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(any()); @@ -888,8 +852,7 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", ENDPOINT_HOSTNAME, ENDPOINT_PORT); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(any()); @@ -922,8 +885,7 @@ public class XdsDependencyManagerTest { ENDPOINT_HOSTNAME, ENDPOINT_PORT); InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(any()); xdsDependencyManager.shutdown(); diff --git a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java index ec1ccd7c6d..e85058f0f3 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java @@ -52,14 +52,20 @@ import io.grpc.BindableService; import io.grpc.Context; import io.grpc.Context.CancellationListener; import io.grpc.StatusOr; +import io.grpc.internal.ExponentialBackoffPolicy; +import io.grpc.internal.FakeClock; import io.grpc.internal.JsonParser; import io.grpc.stub.StreamObserver; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig; import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.Locality; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsResourceType; +import io.grpc.xds.client.XdsTransportFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -364,6 +370,32 @@ public class XdsTestUtils { .setApiListener(clientListenerBuilder.build()).build(); } + public static XdsClient createXdsClient( + List serverUris, + XdsTransportFactory xdsTransportFactory, + FakeClock fakeClock) { + return createXdsClient( + CommonBootstrapperTestUtils.buildBootStrap(serverUris), + xdsTransportFactory, + fakeClock, + new XdsClientMetricReporter() {}); + } + + /** Calls {@link CommonBootstrapperTestUtils#createXdsClient} with gRPC-specific values. */ + public static XdsClient createXdsClient( + Bootstrapper.BootstrapInfo bootstrapInfo, + XdsTransportFactory xdsTransportFactory, + FakeClock fakeClock, + XdsClientMetricReporter xdsClientMetricReporter) { + return CommonBootstrapperTestUtils.createXdsClient( + bootstrapInfo, + xdsTransportFactory, + fakeClock, + new ExponentialBackoffPolicy.Provider(), + MessagePrinter.INSTANCE, + xdsClientMetricReporter); + } + /** * Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with * the same list of clusterName:clusterServiceName pair.