diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index bb44071a48..c50f844d38 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -21,36 +21,30 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.CheckReturnValue; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver; import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.internal.ObjectPool; +import io.grpc.StatusOr; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType; -import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsClient.ResourceWatcher; +import io.grpc.xds.XdsConfig.Subscription; +import io.grpc.xds.XdsConfig.XdsClusterConfig; +import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig; +import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.Nullable; /** * Load balancer for cds_experimental LB policy. One instance per top-level cluster. @@ -60,13 +54,11 @@ import javax.annotation.Nullable; final class CdsLoadBalancer2 extends LoadBalancer { private final XdsLogger logger; private final Helper helper; - private final SynchronizationContext syncContext; private final LoadBalancerRegistry lbRegistry; // Following fields are effectively final. - private ObjectPool xdsClientPool; - private XdsClient xdsClient; - private CdsLbState cdsLbState; - private ResolvedAddresses resolvedAddresses; + private String clusterName; + private Subscription clusterSubscription; + private LoadBalancer childLb; CdsLoadBalancer2(Helper helper) { this(helper, LoadBalancerRegistry.getDefaultRegistry()); @@ -75,7 +67,6 @@ final class CdsLoadBalancer2 extends LoadBalancer { @VisibleForTesting CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) { this.helper = checkNotNull(helper, "helper"); - this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority())); logger.log(XdsLogLevel.INFO, "Created"); @@ -83,25 +74,115 @@ final class CdsLoadBalancer2 extends LoadBalancer { @Override public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { - if (this.resolvedAddresses != null) { + logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); + if (this.clusterName == null) { + CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + logger.log(XdsLogLevel.INFO, "Config: {0}", config); + if (config.isDynamic) { + clusterSubscription = resolvedAddresses.getAttributes() + .get(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY) + .subscribeToCluster(config.name); + } + this.clusterName = config.name; + } + XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG); + StatusOr clusterConfigOr = xdsConfig.getClusters().get(clusterName); + if (clusterConfigOr == null) { + if (clusterSubscription == null) { + // Should be impossible, because XdsDependencyManager wouldn't have generated this + return fail(Status.INTERNAL.withDescription( + errorPrefix() + "Unable to find non-dynamic root cluster")); + } + // The dynamic cluster must not have loaded yet return Status.OK; } - logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); - this.resolvedAddresses = resolvedAddresses; - xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL); - xdsClient = xdsClientPool.getObject(); - CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - logger.log(XdsLogLevel.INFO, "Config: {0}", config); - cdsLbState = new CdsLbState(config.name); - cdsLbState.start(); - return Status.OK; + if (!clusterConfigOr.hasValue()) { + return fail(clusterConfigOr.getStatus()); + } + XdsClusterConfig clusterConfig = clusterConfigOr.getValue(); + List leafNames; + if (clusterConfig.getChildren() instanceof AggregateConfig) { + leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames(); + } else if (clusterConfig.getChildren() instanceof EndpointConfig) { + leafNames = ImmutableList.of(clusterName); + } else { + return fail(Status.INTERNAL.withDescription( + errorPrefix() + "Unexpected cluster children type: " + + clusterConfig.getChildren().getClass())); + } + if (leafNames.isEmpty()) { + // Should be impossible, because XdsClusterResource validated this + return fail(Status.UNAVAILABLE.withDescription( + errorPrefix() + "Zero leaf clusters for root cluster " + clusterName)); + } + + Status noneFoundError = Status.INTERNAL + .withDescription(errorPrefix() + "No leaves and no error; this is a bug"); + List instances = new ArrayList<>(); + for (String leafName : leafNames) { + StatusOr leafConfigOr = xdsConfig.getClusters().get(leafName); + if (!leafConfigOr.hasValue()) { + noneFoundError = leafConfigOr.getStatus(); + continue; + } + if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) { + noneFoundError = Status.INTERNAL.withDescription( + errorPrefix() + "Unexpected child " + leafName + " cluster children type: " + + leafConfigOr.getValue().getChildren().getClass()); + continue; + } + CdsUpdate result = leafConfigOr.getValue().getClusterResource(); + DiscoveryMechanism instance; + if (result.clusterType() == ClusterType.EDS) { + instance = DiscoveryMechanism.forEds( + leafName, + result.edsServiceName(), + result.lrsServerInfo(), + result.maxConcurrentRequests(), + result.upstreamTlsContext(), + result.filterMetadata(), + result.outlierDetection()); + } else { + instance = DiscoveryMechanism.forLogicalDns( + leafName, + result.dnsHostName(), + result.lrsServerInfo(), + result.maxConcurrentRequests(), + result.upstreamTlsContext(), + result.filterMetadata()); + } + instances.add(instance); + } + if (instances.isEmpty()) { + return fail(noneFoundError); + } + + // The LB policy config is provided in service_config.proto/JSON format. + NameResolver.ConfigOrError configOrError = + GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig( + Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry); + if (configOrError.getError() != null) { + // Should be impossible, because XdsClusterResource validated this + return fail(Status.INTERNAL.withDescription( + errorPrefix() + "Unable to parse the LB config: " + configOrError.getError())); + } + + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.unmodifiableList(instances), + configOrError.getConfig(), + clusterConfig.getClusterResource().isHttp11ProxyAvailable()); + if (childLb == null) { + childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper); + } + return childLb.acceptResolvedAddresses( + resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build()); } @Override public void handleNameResolutionError(Status error) { logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); - if (cdsLbState != null && cdsLbState.childLb != null) { - cdsLbState.childLb.handleNameResolutionError(error); + if (childLb != null) { + childLb.handleNameResolutionError(error); } else { helper.updateBalancingState( TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); @@ -111,314 +192,28 @@ final class CdsLoadBalancer2 extends LoadBalancer { @Override public void shutdown() { logger.log(XdsLogLevel.INFO, "Shutdown"); - if (cdsLbState != null) { - cdsLbState.shutdown(); + if (childLb != null) { + childLb.shutdown(); + childLb = null; } - if (xdsClientPool != null) { - xdsClientPool.returnObject(xdsClient); + if (clusterSubscription != null) { + clusterSubscription.close(); + clusterSubscription = null; } } - /** - * The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when - * receiving the CDS LB policy config with the top-level cluster name. - */ - private final class CdsLbState { - - private final ClusterState root; - private final Map clusterStates = new ConcurrentHashMap<>(); - private LoadBalancer childLb; - - private CdsLbState(String rootCluster) { - root = new ClusterState(rootCluster); + @CheckReturnValue // don't forget to return up the stack after the fail call + private Status fail(Status error) { + if (childLb != null) { + childLb.shutdown(); + childLb = null; } + helper.updateBalancingState( + TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); + return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter + } - private void start() { - root.start(); - } - - private void shutdown() { - root.shutdown(); - if (childLb != null) { - childLb.shutdown(); - } - } - - private void handleClusterDiscovered() { - List instances = new ArrayList<>(); - - // Used for loop detection to break the infinite recursion that loops would cause - Map> parentClusters = new HashMap<>(); - Status loopStatus = null; - - // Level-order traversal. - // Collect configurations for all non-aggregate (leaf) clusters. - Queue queue = new ArrayDeque<>(); - queue.add(root); - while (!queue.isEmpty()) { - int size = queue.size(); - for (int i = 0; i < size; i++) { - ClusterState clusterState = queue.remove(); - if (!clusterState.discovered) { - return; // do not proceed until all clusters discovered - } - if (clusterState.result == null) { // resource revoked or not exists - continue; - } - if (clusterState.isLeaf) { - if (instances.stream().map(inst -> inst.cluster).noneMatch(clusterState.name::equals)) { - DiscoveryMechanism instance; - if (clusterState.result.clusterType() == ClusterType.EDS) { - instance = DiscoveryMechanism.forEds( - clusterState.name, clusterState.result.edsServiceName(), - clusterState.result.lrsServerInfo(), - clusterState.result.maxConcurrentRequests(), - clusterState.result.upstreamTlsContext(), - clusterState.result.filterMetadata(), - clusterState.result.outlierDetection()); - } else { // logical DNS - instance = DiscoveryMechanism.forLogicalDns( - clusterState.name, clusterState.result.dnsHostName(), - clusterState.result.lrsServerInfo(), - clusterState.result.maxConcurrentRequests(), - clusterState.result.upstreamTlsContext(), - clusterState.result.filterMetadata()); - } - instances.add(instance); - } - } else { - if (clusterState.childClusterStates == null) { - continue; - } - // Do loop detection and break recursion if detected - List namesCausingLoops = identifyLoops(clusterState, parentClusters); - if (namesCausingLoops.isEmpty()) { - queue.addAll(clusterState.childClusterStates.values()); - } else { - // Do cleanup - if (childLb != null) { - childLb.shutdown(); - childLb = null; - } - if (loopStatus != null) { - logger.log(XdsLogLevel.WARNING, - "Multiple loops in CDS config. Old msg: " + loopStatus.getDescription()); - } - loopStatus = Status.UNAVAILABLE.withDescription(String.format( - "CDS error: circular aggregate clusters directly under %s for " - + "root cluster %s, named %s, xDS node ID: %s", - clusterState.name, root.name, namesCausingLoops, - xdsClient.getBootstrapInfo().node().getId())); - } - } - } - } - - if (loopStatus != null) { - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus))); - return; - } - - if (instances.isEmpty()) { // none of non-aggregate clusters exists - if (childLb != null) { - childLb.shutdown(); - childLb = null; - } - Status unavailable = Status.UNAVAILABLE.withDescription(String.format( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster %s" - + " xDS node ID: %s", root.name, xdsClient.getBootstrapInfo().node().getId())); - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable))); - return; - } - - // The LB policy config is provided in service_config.proto/JSON format. - NameResolver.ConfigOrError configOrError = - GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig( - Arrays.asList(root.result.lbPolicyConfig()), lbRegistry); - if (configOrError.getError() != null) { - throw configOrError.getError().augmentDescription("Unable to parse the LB config") - .asRuntimeException(); - } - - ClusterResolverConfig config = new ClusterResolverConfig( - Collections.unmodifiableList(instances), - configOrError.getConfig(), - root.result.isHttp11ProxyAvailable()); - if (childLb == null) { - childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper); - } - childLb.handleResolvedAddresses( - resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build()); - } - - /** - * Returns children that would cause loops and builds up the parentClusters map. - **/ - - private List identifyLoops(ClusterState clusterState, - Map> parentClusters) { - Set ancestors = new HashSet<>(); - ancestors.add(clusterState.name); - addAncestors(ancestors, clusterState, parentClusters); - - List namesCausingLoops = new ArrayList<>(); - for (ClusterState state : clusterState.childClusterStates.values()) { - if (ancestors.contains(state.name)) { - namesCausingLoops.add(state.name); - } - } - - // Update parent map with entries from remaining children to clusterState - clusterState.childClusterStates.values().stream() - .filter(child -> !namesCausingLoops.contains(child.name)) - .forEach( - child -> parentClusters.computeIfAbsent(child, k -> new ArrayList<>()) - .add(clusterState)); - - return namesCausingLoops; - } - - /** Recursively add all parents to the ancestors list. **/ - private void addAncestors(Set ancestors, ClusterState clusterState, - Map> parentClusters) { - List directParents = parentClusters.get(clusterState); - if (directParents != null) { - directParents.stream().map(c -> c.name).forEach(ancestors::add); - directParents.forEach(p -> addAncestors(ancestors, p, parentClusters)); - } - } - - private void handleClusterDiscoveryError(Status error) { - String description = error.getDescription() == null ? "" : error.getDescription() + " "; - Status errorWithNodeId = error.withDescription( - description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId()); - if (childLb != null) { - childLb.handleNameResolutionError(errorWithNodeId); - } else { - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(errorWithNodeId))); - } - } - - private final class ClusterState implements ResourceWatcher { - private final String name; - @Nullable - private Map childClusterStates; - @Nullable - private CdsUpdate result; - // Following fields are effectively final. - private boolean isLeaf; - private boolean discovered; - private boolean shutdown; - - private ClusterState(String name) { - this.name = name; - } - - private void start() { - shutdown = false; - xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext); - } - - void shutdown() { - shutdown = true; - xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this); - if (childClusterStates != null) { - // recursively shut down all descendants - childClusterStates.values().stream() - .filter(state -> !state.shutdown) - .forEach(ClusterState::shutdown); - } - } - - @Override - public void onError(Status error) { - Status status = Status.UNAVAILABLE - .withDescription( - String.format("Unable to load CDS %s. xDS server returned: %s: %s", - name, error.getCode(), error.getDescription())) - .withCause(error.getCause()); - if (shutdown) { - return; - } - // All watchers should receive the same error, so we only propagate it once. - if (ClusterState.this == root) { - handleClusterDiscoveryError(status); - } - } - - @Override - public void onResourceDoesNotExist(String resourceName) { - if (shutdown) { - return; - } - discovered = true; - result = null; - if (childClusterStates != null) { - for (ClusterState state : childClusterStates.values()) { - state.shutdown(); - } - childClusterStates = null; - } - handleClusterDiscovered(); - } - - @Override - public void onChanged(final CdsUpdate update) { - if (shutdown) { - return; - } - logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); - discovered = true; - result = update; - if (update.clusterType() == ClusterType.AGGREGATE) { - isLeaf = false; - logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", - update.clusterName(), update.prioritizedClusterNames()); - Map newChildStates = new LinkedHashMap<>(); - for (String cluster : update.prioritizedClusterNames()) { - if (newChildStates.containsKey(cluster)) { - logger.log(XdsLogLevel.WARNING, - String.format("duplicate cluster name %s in aggregate %s is being ignored", - cluster, update.clusterName())); - continue; - } - if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { - ClusterState childState; - if (clusterStates.containsKey(cluster)) { - childState = clusterStates.get(cluster); - if (childState.shutdown) { - childState.start(); - } - } else { - childState = new ClusterState(cluster); - clusterStates.put(cluster, childState); - childState.start(); - } - newChildStates.put(cluster, childState); - } else { - newChildStates.put(cluster, childClusterStates.remove(cluster)); - } - } - if (childClusterStates != null) { // stop subscribing to revoked child clusters - for (ClusterState watcher : childClusterStates.values()) { - watcher.shutdown(); - } - } - childClusterStates = newChildStates; - } else if (update.clusterType() == ClusterType.EDS) { - isLeaf = true; - logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", - update.clusterName(), update.edsServiceName()); - } else { // logical DNS - isLeaf = true; - logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName()); - } - handleClusterDiscovered(); - } - - } + private String errorPrefix() { + return "CdsLb for " + clusterName + ": "; } } diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java index 01bd2ab27f..9b242822f6 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java @@ -36,8 +36,6 @@ import java.util.Map; @Internal public class CdsLoadBalancerProvider extends LoadBalancerProvider { - private static final String CLUSTER_KEY = "cluster"; - @Override public boolean isAvailable() { return true; @@ -70,9 +68,12 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider { */ static ConfigOrError parseLoadBalancingConfigPolicy(Map rawLoadBalancingPolicyConfig) { try { - String cluster = - JsonUtil.getString(rawLoadBalancingPolicyConfig, CLUSTER_KEY); - return ConfigOrError.fromConfig(new CdsConfig(cluster)); + String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster"); + Boolean isDynamic = JsonUtil.getBoolean(rawLoadBalancingPolicyConfig, "is_dynamic"); + if (isDynamic == null) { + isDynamic = Boolean.FALSE; + } + return ConfigOrError.fromConfig(new CdsConfig(cluster, isDynamic)); } catch (RuntimeException e) { return ConfigOrError.fromError( Status.UNAVAILABLE.withCause(e).withDescription( @@ -89,15 +90,28 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider { * Name of cluster to query CDS for. */ final String name; + /** + * Whether this cluster was dynamically chosen, so the XdsDependencyManager may be unaware of + * it without an explicit cluster subscription. + */ + final boolean isDynamic; CdsConfig(String name) { + this(name, false); + } + + CdsConfig(String name, boolean isDynamic) { checkArgument(name != null && !name.isEmpty(), "name is null or empty"); this.name = name; + this.isDynamic = isDynamic; } @Override public String toString() { - return MoreObjects.toStringHelper(this).add("name", name).toString(); + return MoreObjects.toStringHelper(this) + .add("name", name) + .add("isDynamic", isDynamic) + .toString(); } } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java index 035ff76c58..bb4f8de5a5 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java @@ -104,7 +104,7 @@ public final class RingHashLoadBalancerProvider extends LoadBalancerProvider { } if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) { return ConfigOrError.fromError(Status.UNAVAILABLE.withDescription( - "Invalid 'mingRingSize'/'maxRingSize'")); + "Invalid 'minRingSize'/'maxRingSize'")); } return ConfigOrError.fromConfig( new RingHashConfig(minRingSize, maxRingSize, requestHashHeader)); diff --git a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java index 3f2b2d8fd7..a5220515b6 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java @@ -172,7 +172,9 @@ class XdsClusterResource extends XdsResourceType { lbConfig.getPolicyName()).parseLoadBalancingPolicyConfig( lbConfig.getRawConfigValue()); if (configOrError.getError() != null) { - throw new ResourceInvalidException(structOrError.getErrorDetail()); + throw new ResourceInvalidException( + "Failed to parse lb config for cluster '" + cluster.getName() + "': " + + configOrError.getError()); } updateBuilder.lbPolicyConfig(lbPolicyConfig); @@ -209,6 +211,10 @@ class XdsClusterResource extends XdsResourceType { } catch (InvalidProtocolBufferException e) { return StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e); } + if (clusterConfig.getClustersList().isEmpty()) { + return StructOrError.fromError("Cluster " + clusterName + + ": aggregate ClusterConfig.clusters must not be empty"); + } return StructOrError.fromStruct(CdsUpdate.forAggregate( clusterName, clusterConfig.getClustersList())); } diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java index 1f464aa132..d184f08de5 100644 --- a/xds/src/main/java/io/grpc/xds/XdsConfig.java +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -254,6 +254,12 @@ final class XdsConfig { } public interface XdsClusterSubscriptionRegistry { - Closeable subscribeToCluster(String clusterName); + Subscription subscribeToCluster(String clusterName); + } + + public interface Subscription extends Closeable { + /** Release resources without throwing exceptions. */ + @Override + void close(); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 45ae7074d1..0428852648 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -18,6 +18,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static io.grpc.xds.client.XdsClient.ResourceUpdate; import com.google.common.annotations.VisibleForTesting; @@ -34,8 +35,6 @@ import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsResourceType; -import java.io.Closeable; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -56,39 +55,43 @@ import javax.annotation.Nullable; final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry { public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance(); public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance(); - private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++ + private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37 private final String listenerName; private final XdsClient xdsClient; - private final XdsConfigWatcher xdsConfigWatcher; private final SynchronizationContext syncContext; private final String dataPlaneAuthority; + private XdsConfigWatcher xdsConfigWatcher; private StatusOr lastUpdate = null; private final Map, TypeWatchers> resourceWatchers = new HashMap<>(); private final Set subscriptions = new HashSet<>(); - XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher, + XdsDependencyManager(XdsClient xdsClient, SynchronizationContext syncContext, String dataPlaneAuthority, String listenerName, NameResolver.Args nameResolverArgs, ScheduledExecutorService scheduler) { this.listenerName = checkNotNull(listenerName, "listenerName"); this.xdsClient = checkNotNull(xdsClient, "xdsClient"); - this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority"); checkNotNull(nameResolverArgs, "nameResolverArgs"); checkNotNull(scheduler, "scheduler"); - - // start the ball rolling - syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName))); } public static String toContextStr(String typeName, String resourceName) { return typeName + " resource " + resourceName; } + public void start(XdsConfigWatcher xdsConfigWatcher) { + checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted"); + this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher"); + // start the ball rolling + syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName))); + } + @Override - public Closeable subscribeToCluster(String clusterName) { + public XdsConfig.Subscription subscribeToCluster(String clusterName) { + checkState(this.xdsConfigWatcher != null, "dep manager must first be started"); checkNotNull(clusterName, "clusterName"); ClusterSubscription subscription = new ClusterSubscription(clusterName); @@ -291,10 +294,17 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi addConfigForCluster(clusters, childCluster, ancestors, tracer); StatusOr config = clusters.get(childCluster); if (!config.hasValue()) { - clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription( - "Unable to get leaves for " + clusterName + ": " - + config.getStatus().getDescription()))); - return; + // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not + // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the + // watchers reports a transient ADS stream error, the policy should report that it is in + // TRANSIENT_FAILURE if it has never passed a config to its child. + // + // But there's currently disagreement about whether that is actually what we want, and + // that was not originally implemented in gRPC Java. So we're keeping Java's old + // behavior for now and only failing the "leaves" (which is a bit arbitrary for a + // cycle). + leafNames.add(childCluster); + continue; } XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren(); if (children instanceof AggregateConfig) { @@ -325,6 +335,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi default: throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType()); } + if (clusters.containsKey(clusterName)) { + // If a cycle is detected, we'll have detected it while recursing, so now there will be a key + // present. We don't want to overwrite it with a non-error value. + return; + } clusters.put(clusterName, StatusOr.fromValue( new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); } @@ -406,7 +421,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi void onUpdate(StatusOr config); } - private final class ClusterSubscription implements Closeable { + private final class ClusterSubscription implements XdsConfig.Subscription { private final String clusterName; boolean closed; // Accessed from syncContext @@ -419,7 +434,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } @Override - public void close() throws IOException { + public void close() { releaseSubscription(this); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index a14abf95f4..37a8e19ef3 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -230,7 +230,8 @@ final class XdsNameResolver extends NameResolver { ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName); callCounterProvider = SharedCallCounterMap.getInstance(); - resolveState = new ResolveState(ldsResourceName); // auto starts + resolveState = new ResolveState(ldsResourceName); + resolveState.start(); } private static String expandPercentS(String template, String replacement) { @@ -653,10 +654,14 @@ final class XdsNameResolver extends NameResolver { private ResolveState(String ldsResourceName) { authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority; xdsDependencyManager = - new XdsDependencyManager(xdsClient, this, syncContext, authority, ldsResourceName, + new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName, nameResolverArgs, scheduler); } + void start() { + xdsDependencyManager.start(this); + } + private void shutdown() { if (stopped) { return; diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 479bde76ce..f9a09f704a 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -18,28 +18,50 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME; -import static org.junit.Assert.fail; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_LDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableList; +import com.github.xds.type.v3.TypedStruct; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.protobuf.Any; +import com.google.protobuf.Struct; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.Value; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy; +import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy; +import io.envoyproxy.envoy.config.cluster.v3.OutlierDetection; +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.RoutingPriority; +import io.envoyproxy.envoy.config.core.v3.SelfConfigSource; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; +import io.envoyproxy.envoy.config.core.v3.TypedExtensionConfig; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; import io.grpc.Attributes; +import io.grpc.ChannelLogger; import io.grpc.ConnectivityState; -import io.grpc.EquivalentAddressGroup; -import io.grpc.InsecureChannelCredentials; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; -import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; @@ -47,31 +69,25 @@ import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; -import io.grpc.internal.ObjectPool; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.FakeClock; +import io.grpc.testing.GrpcCleanupRule; import io.grpc.util.GracefulSwitchLoadBalancerAccessor; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; -import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; +import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection; import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; -import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; -import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestConfig; -import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; -import io.grpc.xds.XdsClusterResource.CdsUpdate; -import io.grpc.xds.client.Bootstrapper.BootstrapInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo; -import io.grpc.xds.client.EnvoyProtoData; import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; -import javax.annotation.Nullable; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -90,601 +106,446 @@ import org.mockito.junit.MockitoRule; @RunWith(JUnit4.class) public class CdsLoadBalancer2Test { @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + private static final String SERVER_NAME = "example.com"; private static final String CLUSTER = "cluster-foo.googleapis.com"; private static final String EDS_SERVICE_NAME = "backend-service-1.googleapis.com"; - private static final String DNS_HOST_NAME = "backend-service-dns.googleapis.com:443"; - private static final ServerInfo LRS_SERVER_INFO = - ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create()); - private static final String SERVER_URI = "trafficdirector.googleapis.com"; - private static final String NODE_ID = - "projects/42/networks/default/nodes/5c85b298-6f5b-4722-b74a-f7d1f0ccf5ad"; - private static final EnvoyProtoData.Node BOOTSTRAP_NODE = - EnvoyProtoData.Node.newBuilder().setId(NODE_ID).build(); - private static final BootstrapInfo BOOTSTRAP_INFO = BootstrapInfo.builder() - .servers(ImmutableList.of( - ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create()))) - .node(BOOTSTRAP_NODE) + private static final String NODE_ID = "node-id"; + private final io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = + CommonTlsContextTestsUtil.buildUpstreamTlsContext("cert-instance-name", true); + private static final Cluster EDS_CLUSTER = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) .build(); - private final UpstreamTlsContext upstreamTlsContext = - CommonTlsContextTestsUtil.buildUpstreamTlsContext("google_cloud_private_spiffe", true); - private final OutlierDetection outlierDetection = OutlierDetection.create( - null, null, null, null, SuccessRateEjection.create(null, null, null, null), null); - - private static final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - throw new RuntimeException(e); - //throw new AssertionError(e); - } - }); + private final FakeClock fakeClock = new FakeClock(); private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); private final List childBalancers = new ArrayList<>(); - private final FakeXdsClient xdsClient = new FakeXdsClient(); - private final ObjectPool xdsClientPool = new ObjectPool() { - @Override - public XdsClient getObject() { - xdsClientRefs++; - return xdsClient; - } - - @Override - public XdsClient returnObject(Object object) { - xdsClientRefs--; - return null; - } - }; + private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService(); + private final XdsClient xdsClient = XdsTestUtils.createXdsClient( + Arrays.asList("control-plane.example.com"), + serverInfo -> new GrpcXdsTransportFactory.GrpcXdsTransport( + InProcessChannelBuilder + .forName(serverInfo.target()) + .directExecutor() + .build()), + fakeClock); + private final ServerInfo lrsServerInfo = xdsClient.getBootstrapInfo().servers().get(0); + private XdsDependencyManager xdsDepManager; @Mock private Helper helper; @Captor private ArgumentCaptor pickerCaptor; - private int xdsClientRefs; - private CdsLoadBalancer2 loadBalancer; + private CdsLoadBalancer2 loadBalancer; + private XdsConfig lastXdsConfig; @Before - public void setUp() { - when(helper.getSynchronizationContext()).thenReturn(syncContext); + public void setUp() throws Exception { lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_RESOLVER_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider("round_robin")); lbRegistry.register( new FakeLoadBalancerProvider("ring_hash_experimental", new RingHashLoadBalancerProvider())); lbRegistry.register(new FakeLoadBalancerProvider("least_request_experimental", new LeastRequestLoadBalancerProvider())); + lbRegistry.register(new FakeLoadBalancerProvider("wrr_locality_experimental", + new WrrLocalityLoadBalancerProvider())); loadBalancer = new CdsLoadBalancer2(helper, lbRegistry); - loadBalancer.acceptResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setAttributes( - // Other attributes not used by cluster_resolver LB are omitted. - Attributes.newBuilder() - .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) - .build()) - .setLoadBalancingPolicyConfig(new CdsConfig(CLUSTER)) - .build()); - assertThat(Iterables.getOnlyElement(xdsClient.watchers.keySet())).isEqualTo(CLUSTER); + + cleanupRule.register(InProcessServerBuilder + .forName("control-plane.example.com") + .addService(controlPlaneService) + .directExecutor() + .build() + .start()); + + SynchronizationContext syncContext = new SynchronizationContext((t, e) -> { + throw new AssertionError(e); + }); + + NameResolver.Args nameResolverArgs = NameResolver.Args.newBuilder() + .setDefaultPort(8080) + .setProxyDetector((address) -> null) + .setSynchronizationContext(syncContext) + .setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class)) + .setChannelLogger(mock(ChannelLogger.class)) + .setScheduledExecutorService(fakeClock.getScheduledExecutorService()) + .build(); + + xdsDepManager = new XdsDependencyManager( + xdsClient, + syncContext, + SERVER_NAME, + SERVER_NAME, + nameResolverArgs, + fakeClock.getScheduledExecutorService()); + + controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of( + SERVER_NAME, ControlPlaneRule.buildClientListener(SERVER_NAME, "my-route"))); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of( + "my-route", XdsTestUtils.buildRouteConfiguration(SERVER_NAME, "my-route", CLUSTER))); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, ControlPlaneRule.buildClusterLoadAssignment( + "127.0.0.1", "", 1234, EDS_SERVICE_NAME))); } @After public void tearDown() { - loadBalancer.shutdown(); - assertThat(xdsClient.watchers).isEmpty(); - assertThat(xdsClientRefs).isEqualTo(0); + if (loadBalancer != null) { + shutdownLoadBalancer(); + } assertThat(childBalancers).isEmpty(); + + if (xdsDepManager != null) { + xdsDepManager.shutdown(); + } + xdsClient.shutdown(); + } + + private void shutdownLoadBalancer() { + LoadBalancer lb = this.loadBalancer; + this.loadBalancer = null; // Must avoid calling acceptResolvedAddresses after shutdown + lb.shutdown(); } @Test public void discoverTopLevelEdsCluster() { - CdsUpdate update = - CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - outlierDetection, false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN) + .setLrsServer(ConfigSource.newBuilder() + .setSelf(SelfConfigSource.getDefaultInstance())) + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(100)))) + .setTransportSocket(TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(UpstreamTlsContext.newBuilder() + .setCommonTlsContext(upstreamTlsContext.getCommonTlsContext()) + .build()))) + .setOutlierDetection(OutlierDetection.getDefaultInstance()) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, - null, LRS_SERVER_INFO, 100L, upstreamTlsContext, outlierDetection); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + CLUSTER, EDS_SERVICE_NAME, lrsServerInfo, 100L, upstreamTlsContext, + Collections.emptyMap(), io.grpc.xds.EnvoyServerProtoData.OutlierDetection.create( + null, null, null, null, SuccessRateEjection.create(null, null, null, null), + FailurePercentageEjection.create(null, null, null, null))))); assertThat( GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName()) - .isEqualTo("round_robin"); + .isEqualTo("wrr_locality_experimental"); } @Test public void discoverTopLevelLogicalDnsCluster() { - CdsUpdate update = - CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - false) - .leastRequestLbPolicy(3).build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.LOGICAL_DNS) + .setLoadAssignment(ClusterLoadAssignment.newBuilder() + .addEndpoints(LocalityLbEndpoints.newBuilder() + .addLbEndpoints(LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("dns.example.com") + .setPortValue(1111))))))) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .setLbPolicy(Cluster.LbPolicy.LEAST_REQUEST) + .setLrsServer(ConfigSource.newBuilder() + .setSelf(SelfConfigSource.getDefaultInstance())) + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(100)))) + .setTransportSocket(TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(UpstreamTlsContext.newBuilder() + .setCommonTlsContext(upstreamTlsContext.getCommonTlsContext()) + .build()))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null, - DNS_HOST_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, null); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forLogicalDns( + CLUSTER, "dns.example.com:1111", lrsServerInfo, 100L, upstreamTlsContext, + Collections.emptyMap()))); assertThat( GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName()) - .isEqualTo("least_request_experimental"); - LeastRequestConfig lrConfig = (LeastRequestConfig) - GracefulSwitchLoadBalancerAccessor.getChildConfig(childLbConfig.lbConfig); - assertThat(lrConfig.choiceCount).isEqualTo(3); + .isEqualTo("wrr_locality_experimental"); } @Test public void nonAggregateCluster_resourceNotExist_returnErrorPicker() { - xdsClient.deliverResourceNotExist(CLUSTER); + startXdsDepManager(); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); + "CDS resource " + CLUSTER + " does not exist nodeID: " + NODE_ID); + assertPickerStatus(pickerCaptor.getValue(), unavailable); assertThat(childBalancers).isEmpty(); } @Test public void nonAggregateCluster_resourceUpdate() { - CdsUpdate update = - CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext, outlierDetection, false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + Cluster cluster = EDS_CLUSTER.toBuilder() + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(100)))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, null, null, null, - 100L, upstreamTlsContext, outlierDetection); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + CLUSTER, EDS_SERVICE_NAME, null, 100L, null, Collections.emptyMap(), null))); - update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, null, - outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + cluster = EDS_CLUSTER.toBuilder() + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(200)))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); + assertThat(childBalancers).hasSize(1); + childBalancer = Iterables.getOnlyElement(childBalancers); childLbConfig = (ClusterResolverConfig) childBalancer.config; - instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, - null, LRS_SERVER_INFO, 200L, null, outlierDetection); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + CLUSTER, EDS_SERVICE_NAME, null, 200L, null, Collections.emptyMap(), null))); } @Test public void nonAggregateCluster_resourceRevoked() { - CdsUpdate update = - CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext, - false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, EDS_CLUSTER)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null, - DNS_HOST_NAME, null, 100L, upstreamTlsContext, null); + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + CLUSTER, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null))); + + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of()); - xdsClient.deliverResourceNotExist(CLUSTER); assertThat(childBalancer.shutdown).isTrue(); Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); + "CDS resource " + CLUSTER + " does not exist nodeID: " + NODE_ID); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker(pickerCaptor.getValue(), unavailable, null); + assertPickerStatus(pickerCaptor.getValue(), unavailable); assertThat(childBalancer.shutdown).isTrue(); assertThat(childBalancers).isEmpty(); } + @Test + public void dynamicCluster() { + String clusterName = "cluster2"; + Cluster cluster = EDS_CLUSTER.toBuilder() + .setName(clusterName) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + clusterName, cluster, + CLUSTER, Cluster.newBuilder().setName(CLUSTER).build())); + startXdsDepManager(new CdsConfig(clusterName, /*dynamic=*/ true)); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); + assertThat(childBalancers).hasSize(1); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + clusterName, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null))); + + assertThat(this.lastXdsConfig.getClusters()).containsKey(clusterName); + shutdownLoadBalancer(); + assertThat(this.lastXdsConfig.getClusters()).doesNotContainKey(clusterName); + } + @Test public void discoverAggregateCluster() { String cluster1 = "cluster-01.googleapis.com"; String cluster2 = "cluster-02.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2)) - .ringHashLbPolicy(100L, 1000L).build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - assertThat(childBalancers).isEmpty(); String cluster3 = "cluster-03.googleapis.com"; String cluster4 = "cluster-04.googleapis.com"; - // cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)] - CdsUpdate update1 = - CdsUpdate.forAggregate(cluster1, Arrays.asList(cluster3, cluster4)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - assertThat(xdsClient.watchers.keySet()).containsExactly( - CLUSTER, cluster1, cluster2, cluster3, cluster4); - assertThat(childBalancers).isEmpty(); - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - assertThat(childBalancers).isEmpty(); - CdsUpdate update2 = - CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null, false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - assertThat(childBalancers).isEmpty(); - CdsUpdate update4 = - CdsUpdate.forEds(cluster4, null, LRS_SERVER_INFO, 300L, null, outlierDetection, false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster4, update4); - assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + // CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS), cluster3 (EDS)] + CLUSTER, Cluster.newBuilder() + .setName(CLUSTER) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .addClusters(cluster1) + .addClusters(cluster2) + .addClusters(cluster3) + .build()))) + .setLbPolicy(Cluster.LbPolicy.RING_HASH) + .build(), + // cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)] + cluster1, Cluster.newBuilder() + .setName(cluster1) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .addClusters(cluster3) + .addClusters(cluster4) + .build()))) + .build(), + cluster2, Cluster.newBuilder() + .setName(cluster2) + .setType(Cluster.DiscoveryType.LOGICAL_DNS) + .setLoadAssignment(ClusterLoadAssignment.newBuilder() + .addEndpoints(LocalityLbEndpoints.newBuilder() + .addLbEndpoints(LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("dns.example.com") + .setPortValue(1111))))))) + .build(), + cluster3, EDS_CLUSTER.toBuilder() + .setName(cluster3) + .setCircuitBreakers(CircuitBreakers.newBuilder() + .addThresholds(CircuitBreakers.Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(100)))) + .build(), + cluster4, EDS_CLUSTER.toBuilder().setName(cluster4).build())); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); + assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(3); - // Clusters on higher level has higher priority: [cluster2, cluster3, cluster4] - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(0), cluster2, - DiscoveryMechanism.Type.LOGICAL_DNS, null, DNS_HOST_NAME, null, 100L, null, null); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(1), cluster3, - DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, null, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(2), cluster4, - DiscoveryMechanism.Type.EDS, null, null, LRS_SERVER_INFO, 300L, null, outlierDetection); + // Clusters are resolved recursively, later duplicates removed: [cluster3, cluster4, cluster2] + assertThat(childLbConfig.discoveryMechanisms).isEqualTo( + Arrays.asList( + DiscoveryMechanism.forEds( + cluster3, EDS_SERVICE_NAME, null, 100L, null, Collections.emptyMap(), null), + DiscoveryMechanism.forEds( + cluster4, EDS_SERVICE_NAME, null, null, null, Collections.emptyMap(), null), + DiscoveryMechanism.forLogicalDns( + cluster2, "dns.example.com:1111", null, null, null, Collections.emptyMap()))); assertThat( GracefulSwitchLoadBalancerAccessor.getChildProvider(childLbConfig.lbConfig).getPolicyName()) .isEqualTo("ring_hash_experimental"); // dominated by top-level cluster's config - RingHashConfig ringHashConfig = (RingHashConfig) - GracefulSwitchLoadBalancerAccessor.getChildConfig(childLbConfig.lbConfig); - assertThat(ringHashConfig.minRingSize).isEqualTo(100L); - assertThat(ringHashConfig.maxRingSize).isEqualTo(1000L); + } + + @Test + public void aggregateCluster_noChildren() { + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + // CLUSTER (aggr.) -> [] + CLUSTER, Cluster.newBuilder() + .setName(CLUSTER) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .build()))) + .build())); + startXdsDepManager(); + + verify(helper) + .updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + Status actualStatus = result.getStatus(); + assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(actualStatus.getDescription()) + .contains("aggregate ClusterConfig.clusters must not be empty"); + assertThat(childBalancers).isEmpty(); } @Test public void aggregateCluster_noNonAggregateClusterExits_returnErrorPicker() { String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (EDS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - xdsClient.deliverResourceNotExist(cluster1); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + // CLUSTER (aggr.) -> [cluster1 (missing)] + CLUSTER, Cluster.newBuilder() + .setName(CLUSTER) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .addClusters(cluster1) + .build()))) + .setLbPolicy(Cluster.LbPolicy.RING_HASH) + .build())); + startXdsDepManager(); + verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); + Status status = Status.UNAVAILABLE.withDescription( + "CDS resource " + cluster1 + " does not exist nodeID: " + NODE_ID); + assertPickerStatus(pickerCaptor.getValue(), status); assertThat(childBalancers).isEmpty(); } @Test - public void aggregateCluster_descendantClustersRevoked() { - String cluster1 = "cluster-01.googleapis.com"; - String cluster2 = "cluster-02.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - CdsUpdate update2 = - CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(2); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(0), cluster1, - DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, null, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(1), cluster2, - DiscoveryMechanism.Type.LOGICAL_DNS, null, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - null); - - // Revoke cluster1, should still be able to proceed with cluster2. - xdsClient.deliverResourceNotExist(cluster1); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - assertDiscoveryMechanism(Iterables.getOnlyElement(childLbConfig.discoveryMechanisms), cluster2, - DiscoveryMechanism.Type.LOGICAL_DNS, null, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - null); - verify(helper, never()).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), any(SubchannelPicker.class)); - - // All revoked. - xdsClient.deliverResourceNotExist(cluster2); + public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_failingPicker() { + Status status = Status.UNAVAILABLE.withDescription("unreachable"); + loadBalancer.handleNameResolutionError(status); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - assertThat(childBalancer.shutdown).isTrue(); - assertThat(childBalancers).isEmpty(); - } - - @Test - public void aggregateCluster_rootClusterRevoked() { - String cluster1 = "cluster-01.googleapis.com"; - String cluster2 = "cluster-02.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - CdsUpdate update1 = CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - CdsUpdate update2 = - CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(2); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(0), cluster1, - DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, null, LRS_SERVER_INFO, 200L, - upstreamTlsContext, outlierDetection); - assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(1), cluster2, - DiscoveryMechanism.Type.LOGICAL_DNS, null, DNS_HOST_NAME, LRS_SERVER_INFO, 100L, null, - null); - - xdsClient.deliverResourceNotExist(CLUSTER); - assertThat(xdsClient.watchers.keySet()) - .containsExactly(CLUSTER); // subscription to all descendant clusters cancelled - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - assertThat(childBalancer.shutdown).isTrue(); - assertThat(childBalancers).isEmpty(); - } - - @Test - public void aggregateCluster_intermediateClusterChanges() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - - // CLUSTER (aggr.) -> [cluster2 (aggr.)] - String cluster2 = "cluster-02.googleapis.com"; - update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2); - - // cluster2 (aggr.) -> [cluster3 (EDS)] - String cluster3 = "cluster-03.googleapis.com"; - CdsUpdate update2 = - CdsUpdate.forAggregate(cluster2, Collections.singletonList(cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3); - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, cluster3, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, - null, LRS_SERVER_INFO, 100L, upstreamTlsContext, outlierDetection); - - // cluster2 revoked - xdsClient.deliverResourceNotExist(cluster2); - assertThat(xdsClient.watchers.keySet()) - .containsExactly(CLUSTER, cluster2); // cancelled subscription to cluster3 - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + CLUSTER - + " xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - assertThat(childBalancer.shutdown).isTrue(); - assertThat(childBalancers).isEmpty(); - } - - @Test - public void aggregateCluster_withLoops() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - - // CLUSTER (aggr.) -> [cluster2 (aggr.)] - String cluster2 = "cluster-02.googleapis.com"; - update = - CdsUpdate.forAggregate(cluster1, Collections.singletonList(cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - - // cluster2 (aggr.) -> [cluster3 (EDS), cluster1 (parent), cluster2 (self), cluster3 (dup)] - String cluster3 = "cluster-03.googleapis.com"; - CdsUpdate update2 = - CdsUpdate.forAggregate(cluster2, Arrays.asList(cluster3, cluster1, cluster2, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2, cluster3); - - reset(helper); - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: circular aggregate clusters directly under cluster-02.googleapis.com for root" - + " cluster cluster-foo.googleapis.com, named [cluster-01.googleapis.com," - + " cluster-02.googleapis.com], xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - } - - @Test - public void aggregateCluster_withLoops_afterEds() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - - // CLUSTER (aggr.) -> [cluster2 (aggr.)] - String cluster2 = "cluster-02.googleapis.com"; - update = - CdsUpdate.forAggregate(cluster1, Collections.singletonList(cluster2)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - - String cluster3 = "cluster-03.googleapis.com"; - CdsUpdate update2 = - CdsUpdate.forAggregate(cluster2, Arrays.asList(cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - - // cluster2 (aggr.) -> [cluster3 (EDS)] - CdsUpdate update2a = - CdsUpdate.forAggregate(cluster2, Arrays.asList(cluster3, cluster1, cluster2, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2a); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2, cluster3); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status unavailable = Status.UNAVAILABLE.withDescription( - "CDS error: circular aggregate clusters directly under cluster-02.googleapis.com for root" - + " cluster cluster-foo.googleapis.com, named [cluster-01.googleapis.com," - + " cluster-02.googleapis.com], xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), unavailable, null); - } - - @Test - public void aggregateCluster_duplicateChildren() { - String cluster1 = "cluster-01.googleapis.com"; - String cluster2 = "cluster-02.googleapis.com"; - String cluster3 = "cluster-03.googleapis.com"; - String cluster4 = "cluster-04.googleapis.com"; - - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - - // cluster1 (aggr) -> [cluster3 (EDS), cluster2 (aggr), cluster4 (aggr)] - CdsUpdate update1 = - CdsUpdate.forAggregate(cluster1, Arrays.asList(cluster3, cluster2, cluster4, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - assertThat(xdsClient.watchers.keySet()).containsExactly( - cluster3, cluster4, cluster2, cluster1, CLUSTER); - xdsClient.watchers.values().forEach(list -> assertThat(list.size()).isEqualTo(1)); - - // cluster2 (agg) -> [cluster3 (EDS), cluster4 {agg}] with dups - CdsUpdate update2 = - CdsUpdate.forAggregate(cluster2, Arrays.asList(cluster3, cluster4, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster2, update2); - - // Define EDS cluster - CdsUpdate update3 = CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster3, update3); - - // cluster4 (agg) -> [cluster3 (EDS)] with dups (3 copies) - CdsUpdate update4 = - CdsUpdate.forAggregate(cluster4, Arrays.asList(cluster3, cluster3, cluster3)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster4, update4); - xdsClient.watchers.values().forEach(list -> assertThat(list.size()).isEqualTo(1)); - - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); - assertDiscoveryMechanism(instance, cluster3, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, - null, LRS_SERVER_INFO, 100L, upstreamTlsContext, outlierDetection); - } - - @Test - public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicker() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); - Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM"); - xdsClient.deliverError(error); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status expectedError = Status.UNAVAILABLE.withDescription( - "Unable to load CDS cluster-foo.googleapis.com. xDS server returned: " - + "RESOURCE_EXHAUSTED: OOM xDS node ID: " + NODE_ID); - assertPicker(pickerCaptor.getValue(), expectedError, null); - assertThat(childBalancers).isEmpty(); - } - - @Test - public void aggregateCluster_discoveryErrorAfterChildLbCreated_propagateToChildLb() { - String cluster1 = "cluster-01.googleapis.com"; - // CLUSTER (aggr.) -> [cluster1 (logical DNS)] - CdsUpdate update = - CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); - CdsUpdate update1 = - CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_INFO, 200L, null, - false) - .roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(cluster1, update1); - FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers); - ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childLb.config; - assertThat(childLbConfig.discoveryMechanisms).hasSize(1); - - Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM"); - xdsClient.deliverError(error); - assertThat(childLb.upstreamError.getCode()).isEqualTo(Status.Code.UNAVAILABLE); - assertThat(childLb.upstreamError.getDescription()).contains("RESOURCE_EXHAUSTED: OOM"); - assertThat(childLb.shutdown).isFalse(); // child LB may choose to keep working - } - - @Test - public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() { - Status upstreamError = Status.UNAVAILABLE.withDescription( - "unreachable xDS node ID: " + NODE_ID); - loadBalancer.handleNameResolutionError(upstreamError); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker(pickerCaptor.getValue(), upstreamError, null); + assertPickerStatus(pickerCaptor.getValue(), status); } @Test public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() { - CdsUpdate update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, - upstreamTlsContext, outlierDetection, false).roundRobinLbPolicy().build(); - xdsClient.deliverCdsUpdate(CLUSTER, update); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.shutdown).isFalse(); + loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable")); assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(childBalancer.upstreamError.getDescription()).isEqualTo("unreachable"); @@ -694,56 +555,91 @@ public class CdsLoadBalancer2Test { @Test public void unknownLbProvider() { - try { - xdsClient.deliverCdsUpdate(CLUSTER, - CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - outlierDetection, false) - .lbPolicyConfig(ImmutableMap.of("unknownLb", ImmutableMap.of("foo", "bar"))).build()); - } catch (Exception e) { - assertThat(e).hasMessageThat().contains("unknownLb"); - return; - } - fail("Expected the unknown LB to cause an exception"); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .setLoadBalancingPolicy(LoadBalancingPolicy.newBuilder() + .addPolicies(Policy.newBuilder() + .setTypedExtensionConfig(TypedExtensionConfig.newBuilder() + .setTypedConfig(Any.pack(TypedStruct.newBuilder() + .setTypeUrl("type.googleapis.com/unknownLb") + .setValue(Struct.getDefaultInstance()) + .build()))))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + verify(helper).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + Status actualStatus = result.getStatus(); + assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(actualStatus.getDescription()).contains("Invalid LoadBalancingPolicy"); } @Test public void invalidLbConfig() { - try { - xdsClient.deliverCdsUpdate(CLUSTER, - CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext, - outlierDetection, false).lbPolicyConfig( - ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1"))) - .build()); - } catch (Exception e) { - assertThat(e).hasMessageThat().contains("Unable to parse"); - return; - } - fail("Expected the invalid config to cause an exception"); + Cluster cluster = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .setLoadBalancingPolicy(LoadBalancingPolicy.newBuilder() + .addPolicies(Policy.newBuilder() + .setTypedExtensionConfig(TypedExtensionConfig.newBuilder() + .setTypedConfig(Any.pack(TypedStruct.newBuilder() + .setTypeUrl("type.googleapis.com/ring_hash_experimental") + .setValue(Struct.newBuilder() + .putFields("minRingSize", Value.newBuilder().setNumberValue(-1).build())) + .build()))))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, cluster)); + startXdsDepManager(); + verify(helper).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + Status actualStatus = result.getStatus(); + assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(actualStatus.getDescription()).contains("Invalid 'minRingSize'"); } - private static void assertPicker(SubchannelPicker picker, Status expectedStatus, - @Nullable Subchannel expectedSubchannel) { + private void startXdsDepManager() { + startXdsDepManager(new CdsConfig(CLUSTER)); + } + + private void startXdsDepManager(final CdsConfig cdsConfig) { + xdsDepManager.start( + xdsConfig -> { + if (!xdsConfig.hasValue()) { + throw new AssertionError("" + xdsConfig.getStatus()); + } + this.lastXdsConfig = xdsConfig.getValue(); + if (loadBalancer == null) { + return; + } + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, xdsConfig.getValue()) + .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, xdsDepManager) + .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + }); + // trigger does not exist timer, so broken config is more obvious + fakeClock.forwardTime(10, TimeUnit.MINUTES); + } + + private static void assertPickerStatus(SubchannelPicker picker, Status expectedStatus) { PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode()); assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription()); - if (actualStatus.isOk()) { - assertThat(result.getSubchannel()).isSameInstanceAs(expectedSubchannel); - } - } - - private static void assertDiscoveryMechanism(DiscoveryMechanism instance, String name, - DiscoveryMechanism.Type type, @Nullable String edsServiceName, @Nullable String dnsHostName, - @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection) { - assertThat(instance.cluster).isEqualTo(name); - assertThat(instance.type).isEqualTo(type); - assertThat(instance.edsServiceName).isEqualTo(edsServiceName); - assertThat(instance.dnsHostName).isEqualTo(dnsHostName); - assertThat(instance.lrsServerInfo).isEqualTo(lrsServerInfo); - assertThat(instance.maxConcurrentRequests).isEqualTo(maxConcurrentRequests); - assertThat(instance.tlsContext).isEqualTo(tlsContext); - assertThat(instance.outlierDetection).isEqualTo(outlierDetection); } private final class FakeLoadBalancerProvider extends LoadBalancerProvider { @@ -802,8 +698,9 @@ public class CdsLoadBalancer2Test { } @Override - public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { config = resolvedAddresses.getLoadBalancingPolicyConfig(); + return Status.OK; } @Override @@ -817,58 +714,4 @@ public class CdsLoadBalancer2Test { childBalancers.remove(this); } } - - private static final class FakeXdsClient extends XdsClient { - // watchers needs to support any non-cyclic shaped graphs - private final Map>> watchers = new HashMap<>(); - - @Override - @SuppressWarnings("unchecked") - public void watchXdsResource(XdsResourceType type, - String resourceName, - ResourceWatcher watcher, Executor syncContext) { - assertThat(type.typeName()).isEqualTo("CDS"); - watchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) - .add((ResourceWatcher)watcher); - } - - @Override - public void cancelXdsResourceWatch(XdsResourceType type, - String resourceName, - ResourceWatcher watcher) { - assertThat(type.typeName()).isEqualTo("CDS"); - assertThat(watchers).containsKey(resourceName); - List> watcherList = watchers.get(resourceName); - assertThat(watcherList.remove(watcher)).isTrue(); - if (watcherList.isEmpty()) { - watchers.remove(resourceName); - } - } - - @Override - public BootstrapInfo getBootstrapInfo() { - return BOOTSTRAP_INFO; - } - - private void deliverCdsUpdate(String clusterName, CdsUpdate update) { - if (watchers.containsKey(clusterName)) { - List> resourceWatchers = - ImmutableList.copyOf(watchers.get(clusterName)); - resourceWatchers.forEach(w -> w.onChanged(update)); - } - } - - private void deliverResourceNotExist(String clusterName) { - if (watchers.containsKey(clusterName)) { - ImmutableList.copyOf(watchers.get(clusterName)) - .forEach(w -> w.onResourceDoesNotExist(clusterName)); - } - } - - private void deliverError(Status error) { - watchers.values().stream() - .flatMap(List::stream) - .forEach(w -> w.onError(error)); - } - } } diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 006440be6c..32361684a6 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -2211,6 +2211,23 @@ public abstract class GrpcXdsClientImplTestBase { verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); } + @Test + public void cdsResponseWithEmptyAggregateCluster() { + DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE, + cdsResourceWatcher); + List candidates = Arrays.asList(); + Any clusterAggregate = + Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, "round_robin", null, null, candidates)); + call.sendResponse(CDS, clusterAggregate, VERSION_1, "0000"); + + // Client sent an ACK CDS request. + String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: " + + "Cluster cluster.googleapis.com: aggregate ClusterConfig.clusters must not be empty"; + call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg)); + verify(cdsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); + } + @Test public void cdsResponseWithCircuitBreakers() { DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE, diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java index 3036db5b09..66c9c5c537 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java @@ -106,7 +106,7 @@ public class RingHashLoadBalancerProviderTest { assertThat(configOrError.getError()).isNotNull(); assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(configOrError.getError().getDescription()) - .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + .isEqualTo("Invalid 'minRingSize'/'maxRingSize'"); } @Test @@ -117,7 +117,7 @@ public class RingHashLoadBalancerProviderTest { assertThat(configOrError.getError()).isNotNull(); assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(configOrError.getError().getDescription()) - .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + .isEqualTo("Invalid 'minRingSize'/'maxRingSize'"); } @Test @@ -214,7 +214,7 @@ public class RingHashLoadBalancerProviderTest { assertThat(configOrError.getError()).isNotNull(); assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(configOrError.getError().getDescription()) - .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + .isEqualTo("Invalid 'minRingSize'/'maxRingSize'"); } @Test @@ -225,7 +225,7 @@ public class RingHashLoadBalancerProviderTest { assertThat(configOrError.getError()).isNotNull(); assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(configOrError.getError().getDescription()) - .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + .isEqualTo("Invalid 'minRingSize'/'maxRingSize'"); } @Test diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index aea1ad66d7..8ac0f8c9d8 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -28,7 +28,6 @@ import static io.grpc.xds.XdsTestUtils.ENDPOINT_HOSTNAME; import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT; import static io.grpc.xds.XdsTestUtils.RDS_NAME; import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster; -import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; @@ -48,28 +47,22 @@ import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.grpc.BindableService; import io.grpc.ChannelLogger; -import io.grpc.ManagedChannel; import io.grpc.NameResolver; -import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusOr; import io.grpc.StatusOrMatcher; import io.grpc.SynchronizationContext; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.FakeClock; import io.grpc.internal.GrpcUtil; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; -import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceMetadata; -import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsResourceType; -import io.grpc.xds.client.XdsTransportFactory; import java.io.Closeable; import java.io.IOException; import java.util.ArrayDeque; @@ -96,7 +89,6 @@ import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatchers; import org.mockito.Captor; import org.mockito.InOrder; -import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -108,21 +100,20 @@ public class XdsDependencyManagerTest { public static final String CLUSTER_TYPE_NAME = XdsClusterResource.getInstance().typeName(); public static final String ENDPOINT_TYPE_NAME = XdsEndpointResource.getInstance().typeName(); - @Mock - private XdsClientMetricReporter xdsClientMetricReporter; - private final SynchronizationContext syncContext = new SynchronizationContext((t, e) -> { throw new AssertionError(e); }); - - private ManagedChannel channel; - private XdsClient xdsClient; - private XdsDependencyManager xdsDependencyManager; - private TestWatcher xdsConfigWatcher; - private Server xdsServer; - private final FakeClock fakeClock = new FakeClock(); + + private XdsClient xdsClient = XdsTestUtils.createXdsClient( + Collections.singletonList("control-plane"), + serverInfo -> new GrpcXdsTransportFactory.GrpcXdsTransport( + InProcessChannelBuilder.forName(serverInfo.target()).directExecutor().build()), + fakeClock); + + private TestWatcher xdsConfigWatcher; + private final String serverName = "the-service-name"; private final Queue loadReportCalls = new ArrayDeque<>(); private final AtomicBoolean adsEnded = new AtomicBoolean(true); @@ -150,10 +141,12 @@ public class XdsDependencyManagerTest { .build(); private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService(); + private XdsDependencyManager xdsDependencyManager = new XdsDependencyManager( + xdsClient, syncContext, serverName, serverName, nameResolverArgs, scheduler); @Before public void setUp() throws Exception { - xdsServer = cleanupRule.register(InProcessServerBuilder + cleanupRule.register(InProcessServerBuilder .forName("control-plane") .addService(controlPlaneService) .addService(lrsService) @@ -163,15 +156,6 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName); - channel = cleanupRule.register( - InProcessChannelBuilder.forName("control-plane").directExecutor().build()); - XdsTransportFactory xdsTransportFactory = - ignore -> new GrpcXdsTransportFactory.GrpcXdsTransport(channel); - - xdsClient = CommonBootstrapperTestUtils.createXdsClient( - Collections.singletonList(SERVER_URI), xdsTransportFactory, fakeClock, - new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter); - testWatcher = new TestWatcher(); xdsConfigWatcher = mock(TestWatcher.class, delegatesTo(testWatcher)); defaultXdsConfig = XdsTestUtils.getDefaultXdsConfig(serverName); @@ -183,9 +167,6 @@ public class XdsDependencyManagerTest { xdsDependencyManager.shutdown(); } xdsClient.shutdown(); - channel.shutdown(); // channel not owned by XdsClient - - xdsServer.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); assertThat(adsEnded.get()).isTrue(); assertThat(lrsEnded.get()).isTrue(); @@ -194,8 +175,7 @@ public class XdsDependencyManagerTest { @Test public void verify_basic_config() { - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); testWatcher.verifyStats(1, 0); @@ -203,8 +183,7 @@ public class XdsDependencyManagerTest { @Test public void verify_config_update() { - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); @@ -221,8 +200,7 @@ public class XdsDependencyManagerTest { @Test public void verify_simple_aggregate() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); @@ -281,8 +259,7 @@ public class XdsDependencyManagerTest { List childNames2 = Arrays.asList("clusterA", "clusterX"); XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(any()); Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1); @@ -313,8 +290,7 @@ public class XdsDependencyManagerTest { @Test public void testDelayedSubscription() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); String rootName1 = "root_c"; @@ -360,8 +336,7 @@ public class XdsDependencyManagerTest { edsMap.put("garbageEds", clusterLoadAssignment); controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); fakeClock.forwardTime(16, TimeUnit.SECONDS); verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); @@ -393,8 +368,9 @@ public class XdsDependencyManagerTest { @Test public void testMissingLds() { String ldsName = "badLdsName"; - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext, serverName, ldsName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); fakeClock.forwardTime(16, TimeUnit.SECONDS); verify(xdsConfigWatcher).onUpdate( @@ -409,8 +385,7 @@ public class XdsDependencyManagerTest { Listener serverListener = ControlPlaneRule.buildServerListener().toBuilder().setName(serverName).build(); controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, serverListener)); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); fakeClock.forwardTime(16, TimeUnit.SECONDS); verify(xdsConfigWatcher).onUpdate( @@ -427,8 +402,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, clientListener)); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); fakeClock.forwardTime(16, TimeUnit.SECONDS); verify(xdsConfigWatcher).onUpdate( @@ -444,8 +418,7 @@ public class XdsDependencyManagerTest { "wrong-virtual-host", XdsTestUtils.RDS_NAME, XdsTestUtils.CLUSTER_NAME); controlPlaneService.setXdsConfig( ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); // Update with a config that has a virtual host that doesn't match the server name verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); @@ -460,8 +433,9 @@ public class XdsDependencyManagerTest { String ldsResourceName = "xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1"; - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext, serverName, ldsResourceName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate( argThat(StatusOrMatcher.hasStatus( @@ -474,8 +448,7 @@ public class XdsDependencyManagerTest { @Test public void testChangeRdsName_fromLds() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig)); String newRdsName = "newRdsName1"; @@ -530,8 +503,7 @@ public class XdsDependencyManagerTest { // Start the actual test InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue(); @@ -576,8 +548,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig config = xdsUpdateCaptor.getValue().getValue(); assertThat(config.getClusters().get("clusterA").hasValue()).isTrue(); @@ -611,12 +582,12 @@ public class XdsDependencyManagerTest { // The cycle is loaded and detected InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig config = xdsUpdateCaptor.getValue().getValue(); assertThat(config.getClusters().get("clusterA").hasValue()).isFalse(); assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle"); + assertThat(config.getClusters().get("clusterB").hasValue()).isTrue(); // Orphan the cycle and it is discarded routeConfig = @@ -657,8 +628,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); // Start the actual test - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue(); assertThat(initialConfig.getClusters().keySet()) @@ -675,8 +645,7 @@ public class XdsDependencyManagerTest { @Test public void testChangeRdsName_FromLds_complexTree() { - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); // Create the same tree as in testMultipleParentsInCdsTree Cluster rootCluster = @@ -721,8 +690,7 @@ public class XdsDependencyManagerTest { public void testChangeAggCluster() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(any()); // Setup initial config A -> A1 -> (A11, A12) @@ -775,8 +743,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig( ADS_TYPE_URL_CDS, ImmutableMap.of(XdsTestUtils.CLUSTER_NAME, Cluster.newBuilder().setName(XdsTestUtils.CLUSTER_NAME).build())); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); Status status = xdsUpdateCaptor.getValue().getValue() @@ -789,8 +756,7 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", ENDPOINT_HOSTNAME, ENDPOINT_PORT); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(any()); @@ -822,8 +788,7 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", ENDPOINT_HOSTNAME, ENDPOINT_PORT); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(any()); @@ -855,8 +820,7 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", ENDPOINT_HOSTNAME, ENDPOINT_PORT); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(any()); @@ -888,8 +852,7 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", ENDPOINT_HOSTNAME, ENDPOINT_PORT); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); verify(xdsConfigWatcher).onUpdate(any()); @@ -922,8 +885,7 @@ public class XdsDependencyManagerTest { ENDPOINT_HOSTNAME, ENDPOINT_PORT); InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, - serverName, serverName, nameResolverArgs, scheduler); + xdsDependencyManager.start(xdsConfigWatcher); inOrder.verify(xdsConfigWatcher).onUpdate(any()); xdsDependencyManager.shutdown(); diff --git a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java index ec1ccd7c6d..e85058f0f3 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java @@ -52,14 +52,20 @@ import io.grpc.BindableService; import io.grpc.Context; import io.grpc.Context.CancellationListener; import io.grpc.StatusOr; +import io.grpc.internal.ExponentialBackoffPolicy; +import io.grpc.internal.FakeClock; import io.grpc.internal.JsonParser; import io.grpc.stub.StreamObserver; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig; import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.Locality; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsResourceType; +import io.grpc.xds.client.XdsTransportFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -364,6 +370,32 @@ public class XdsTestUtils { .setApiListener(clientListenerBuilder.build()).build(); } + public static XdsClient createXdsClient( + List serverUris, + XdsTransportFactory xdsTransportFactory, + FakeClock fakeClock) { + return createXdsClient( + CommonBootstrapperTestUtils.buildBootStrap(serverUris), + xdsTransportFactory, + fakeClock, + new XdsClientMetricReporter() {}); + } + + /** Calls {@link CommonBootstrapperTestUtils#createXdsClient} with gRPC-specific values. */ + public static XdsClient createXdsClient( + Bootstrapper.BootstrapInfo bootstrapInfo, + XdsTransportFactory xdsTransportFactory, + FakeClock fakeClock, + XdsClientMetricReporter xdsClientMetricReporter) { + return CommonBootstrapperTestUtils.createXdsClient( + bootstrapInfo, + xdsTransportFactory, + fakeClock, + new ExponentialBackoffPolicy.Provider(), + MessagePrinter.INSTANCE, + xdsClientMetricReporter); + } + /** * Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with * the same list of clusterName:clusterServiceName pair.