mirror of https://github.com/grpc/grpc-java.git
xds: Convert CdsLb to XdsDepManager
I noticed we deviated from gRFC A37 in some ways. It turned out those were added to the gRFC later in https://github.com/grpc/proposal/pull/344: - NACKing empty aggregate clusters - Failing aggregate cluster when children could not be loaded - Recusion limit of 16. We had this behavior already, but it was ascribed to matching C++ There's disagreement on whether we should actually fail the aggregate cluster for bad children, so I'm preserving the pre-existing behavior for now. The code is now doing a depth-first leaf traversal, not breadth-first. This was odd to see, but the code was also pretty old, so the reasoning seems lost to history. Since we haven't seen more than a single level of aggregate clusters in practice, this wouldn't have been noticed by users. XdsDependencyManager.start() was created to guarantee that the callback could not be called before returning from the constructor. Otherwise XDS_CLUSTER_SUBSCRIPT_REGISTRY could potentially be null.
This commit is contained in:
parent
a16d655919
commit
297ab05efe
|
@ -21,36 +21,30 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
|||
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.errorprone.annotations.CheckReturnValue;
|
||||
import io.grpc.InternalLogId;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.StatusOr;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
|
||||
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
|
||||
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
|
||||
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
||||
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
|
||||
import io.grpc.xds.client.XdsClient;
|
||||
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
||||
import io.grpc.xds.XdsConfig.Subscription;
|
||||
import io.grpc.xds.XdsConfig.XdsClusterConfig;
|
||||
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
|
||||
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
|
||||
import io.grpc.xds.client.XdsLogger;
|
||||
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Load balancer for cds_experimental LB policy. One instance per top-level cluster.
|
||||
|
@ -60,13 +54,11 @@ import javax.annotation.Nullable;
|
|||
final class CdsLoadBalancer2 extends LoadBalancer {
|
||||
private final XdsLogger logger;
|
||||
private final Helper helper;
|
||||
private final SynchronizationContext syncContext;
|
||||
private final LoadBalancerRegistry lbRegistry;
|
||||
// Following fields are effectively final.
|
||||
private ObjectPool<XdsClient> xdsClientPool;
|
||||
private XdsClient xdsClient;
|
||||
private CdsLbState cdsLbState;
|
||||
private ResolvedAddresses resolvedAddresses;
|
||||
private String clusterName;
|
||||
private Subscription clusterSubscription;
|
||||
private LoadBalancer childLb;
|
||||
|
||||
CdsLoadBalancer2(Helper helper) {
|
||||
this(helper, LoadBalancerRegistry.getDefaultRegistry());
|
||||
|
@ -75,7 +67,6 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
|||
@VisibleForTesting
|
||||
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
|
||||
this.helper = checkNotNull(helper, "helper");
|
||||
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
|
||||
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
|
||||
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
|
||||
logger.log(XdsLogLevel.INFO, "Created");
|
||||
|
@ -83,25 +74,115 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
|||
|
||||
@Override
|
||||
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
if (this.resolvedAddresses != null) {
|
||||
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
||||
if (this.clusterName == null) {
|
||||
CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
logger.log(XdsLogLevel.INFO, "Config: {0}", config);
|
||||
if (config.isDynamic) {
|
||||
clusterSubscription = resolvedAddresses.getAttributes()
|
||||
.get(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY)
|
||||
.subscribeToCluster(config.name);
|
||||
}
|
||||
this.clusterName = config.name;
|
||||
}
|
||||
XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG);
|
||||
StatusOr<XdsClusterConfig> clusterConfigOr = xdsConfig.getClusters().get(clusterName);
|
||||
if (clusterConfigOr == null) {
|
||||
if (clusterSubscription == null) {
|
||||
// Should be impossible, because XdsDependencyManager wouldn't have generated this
|
||||
return fail(Status.INTERNAL.withDescription(
|
||||
errorPrefix() + "Unable to find non-dynamic root cluster"));
|
||||
}
|
||||
// The dynamic cluster must not have loaded yet
|
||||
return Status.OK;
|
||||
}
|
||||
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
||||
this.resolvedAddresses = resolvedAddresses;
|
||||
xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL);
|
||||
xdsClient = xdsClientPool.getObject();
|
||||
CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
logger.log(XdsLogLevel.INFO, "Config: {0}", config);
|
||||
cdsLbState = new CdsLbState(config.name);
|
||||
cdsLbState.start();
|
||||
return Status.OK;
|
||||
if (!clusterConfigOr.hasValue()) {
|
||||
return fail(clusterConfigOr.getStatus());
|
||||
}
|
||||
XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
|
||||
List<String> leafNames;
|
||||
if (clusterConfig.getChildren() instanceof AggregateConfig) {
|
||||
leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
|
||||
} else if (clusterConfig.getChildren() instanceof EndpointConfig) {
|
||||
leafNames = ImmutableList.of(clusterName);
|
||||
} else {
|
||||
return fail(Status.INTERNAL.withDescription(
|
||||
errorPrefix() + "Unexpected cluster children type: "
|
||||
+ clusterConfig.getChildren().getClass()));
|
||||
}
|
||||
if (leafNames.isEmpty()) {
|
||||
// Should be impossible, because XdsClusterResource validated this
|
||||
return fail(Status.UNAVAILABLE.withDescription(
|
||||
errorPrefix() + "Zero leaf clusters for root cluster " + clusterName));
|
||||
}
|
||||
|
||||
Status noneFoundError = Status.INTERNAL
|
||||
.withDescription(errorPrefix() + "No leaves and no error; this is a bug");
|
||||
List<DiscoveryMechanism> instances = new ArrayList<>();
|
||||
for (String leafName : leafNames) {
|
||||
StatusOr<XdsClusterConfig> leafConfigOr = xdsConfig.getClusters().get(leafName);
|
||||
if (!leafConfigOr.hasValue()) {
|
||||
noneFoundError = leafConfigOr.getStatus();
|
||||
continue;
|
||||
}
|
||||
if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) {
|
||||
noneFoundError = Status.INTERNAL.withDescription(
|
||||
errorPrefix() + "Unexpected child " + leafName + " cluster children type: "
|
||||
+ leafConfigOr.getValue().getChildren().getClass());
|
||||
continue;
|
||||
}
|
||||
CdsUpdate result = leafConfigOr.getValue().getClusterResource();
|
||||
DiscoveryMechanism instance;
|
||||
if (result.clusterType() == ClusterType.EDS) {
|
||||
instance = DiscoveryMechanism.forEds(
|
||||
leafName,
|
||||
result.edsServiceName(),
|
||||
result.lrsServerInfo(),
|
||||
result.maxConcurrentRequests(),
|
||||
result.upstreamTlsContext(),
|
||||
result.filterMetadata(),
|
||||
result.outlierDetection());
|
||||
} else {
|
||||
instance = DiscoveryMechanism.forLogicalDns(
|
||||
leafName,
|
||||
result.dnsHostName(),
|
||||
result.lrsServerInfo(),
|
||||
result.maxConcurrentRequests(),
|
||||
result.upstreamTlsContext(),
|
||||
result.filterMetadata());
|
||||
}
|
||||
instances.add(instance);
|
||||
}
|
||||
if (instances.isEmpty()) {
|
||||
return fail(noneFoundError);
|
||||
}
|
||||
|
||||
// The LB policy config is provided in service_config.proto/JSON format.
|
||||
NameResolver.ConfigOrError configOrError =
|
||||
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
|
||||
Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry);
|
||||
if (configOrError.getError() != null) {
|
||||
// Should be impossible, because XdsClusterResource validated this
|
||||
return fail(Status.INTERNAL.withDescription(
|
||||
errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
|
||||
}
|
||||
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.unmodifiableList(instances),
|
||||
configOrError.getConfig(),
|
||||
clusterConfig.getClusterResource().isHttp11ProxyAvailable());
|
||||
if (childLb == null) {
|
||||
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
|
||||
}
|
||||
return childLb.acceptResolvedAddresses(
|
||||
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleNameResolutionError(Status error) {
|
||||
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
|
||||
if (cdsLbState != null && cdsLbState.childLb != null) {
|
||||
cdsLbState.childLb.handleNameResolutionError(error);
|
||||
if (childLb != null) {
|
||||
childLb.handleNameResolutionError(error);
|
||||
} else {
|
||||
helper.updateBalancingState(
|
||||
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
|
||||
|
@ -111,314 +192,28 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
|||
@Override
|
||||
public void shutdown() {
|
||||
logger.log(XdsLogLevel.INFO, "Shutdown");
|
||||
if (cdsLbState != null) {
|
||||
cdsLbState.shutdown();
|
||||
if (childLb != null) {
|
||||
childLb.shutdown();
|
||||
childLb = null;
|
||||
}
|
||||
if (xdsClientPool != null) {
|
||||
xdsClientPool.returnObject(xdsClient);
|
||||
if (clusterSubscription != null) {
|
||||
clusterSubscription.close();
|
||||
clusterSubscription = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when
|
||||
* receiving the CDS LB policy config with the top-level cluster name.
|
||||
*/
|
||||
private final class CdsLbState {
|
||||
|
||||
private final ClusterState root;
|
||||
private final Map<String, ClusterState> clusterStates = new ConcurrentHashMap<>();
|
||||
private LoadBalancer childLb;
|
||||
|
||||
private CdsLbState(String rootCluster) {
|
||||
root = new ClusterState(rootCluster);
|
||||
@CheckReturnValue // don't forget to return up the stack after the fail call
|
||||
private Status fail(Status error) {
|
||||
if (childLb != null) {
|
||||
childLb.shutdown();
|
||||
childLb = null;
|
||||
}
|
||||
helper.updateBalancingState(
|
||||
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
|
||||
return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
|
||||
}
|
||||
|
||||
private void start() {
|
||||
root.start();
|
||||
}
|
||||
|
||||
private void shutdown() {
|
||||
root.shutdown();
|
||||
if (childLb != null) {
|
||||
childLb.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void handleClusterDiscovered() {
|
||||
List<DiscoveryMechanism> instances = new ArrayList<>();
|
||||
|
||||
// Used for loop detection to break the infinite recursion that loops would cause
|
||||
Map<ClusterState, List<ClusterState>> parentClusters = new HashMap<>();
|
||||
Status loopStatus = null;
|
||||
|
||||
// Level-order traversal.
|
||||
// Collect configurations for all non-aggregate (leaf) clusters.
|
||||
Queue<ClusterState> queue = new ArrayDeque<>();
|
||||
queue.add(root);
|
||||
while (!queue.isEmpty()) {
|
||||
int size = queue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
ClusterState clusterState = queue.remove();
|
||||
if (!clusterState.discovered) {
|
||||
return; // do not proceed until all clusters discovered
|
||||
}
|
||||
if (clusterState.result == null) { // resource revoked or not exists
|
||||
continue;
|
||||
}
|
||||
if (clusterState.isLeaf) {
|
||||
if (instances.stream().map(inst -> inst.cluster).noneMatch(clusterState.name::equals)) {
|
||||
DiscoveryMechanism instance;
|
||||
if (clusterState.result.clusterType() == ClusterType.EDS) {
|
||||
instance = DiscoveryMechanism.forEds(
|
||||
clusterState.name, clusterState.result.edsServiceName(),
|
||||
clusterState.result.lrsServerInfo(),
|
||||
clusterState.result.maxConcurrentRequests(),
|
||||
clusterState.result.upstreamTlsContext(),
|
||||
clusterState.result.filterMetadata(),
|
||||
clusterState.result.outlierDetection());
|
||||
} else { // logical DNS
|
||||
instance = DiscoveryMechanism.forLogicalDns(
|
||||
clusterState.name, clusterState.result.dnsHostName(),
|
||||
clusterState.result.lrsServerInfo(),
|
||||
clusterState.result.maxConcurrentRequests(),
|
||||
clusterState.result.upstreamTlsContext(),
|
||||
clusterState.result.filterMetadata());
|
||||
}
|
||||
instances.add(instance);
|
||||
}
|
||||
} else {
|
||||
if (clusterState.childClusterStates == null) {
|
||||
continue;
|
||||
}
|
||||
// Do loop detection and break recursion if detected
|
||||
List<String> namesCausingLoops = identifyLoops(clusterState, parentClusters);
|
||||
if (namesCausingLoops.isEmpty()) {
|
||||
queue.addAll(clusterState.childClusterStates.values());
|
||||
} else {
|
||||
// Do cleanup
|
||||
if (childLb != null) {
|
||||
childLb.shutdown();
|
||||
childLb = null;
|
||||
}
|
||||
if (loopStatus != null) {
|
||||
logger.log(XdsLogLevel.WARNING,
|
||||
"Multiple loops in CDS config. Old msg: " + loopStatus.getDescription());
|
||||
}
|
||||
loopStatus = Status.UNAVAILABLE.withDescription(String.format(
|
||||
"CDS error: circular aggregate clusters directly under %s for "
|
||||
+ "root cluster %s, named %s, xDS node ID: %s",
|
||||
clusterState.name, root.name, namesCausingLoops,
|
||||
xdsClient.getBootstrapInfo().node().getId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (loopStatus != null) {
|
||||
helper.updateBalancingState(
|
||||
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus)));
|
||||
return;
|
||||
}
|
||||
|
||||
if (instances.isEmpty()) { // none of non-aggregate clusters exists
|
||||
if (childLb != null) {
|
||||
childLb.shutdown();
|
||||
childLb = null;
|
||||
}
|
||||
Status unavailable = Status.UNAVAILABLE.withDescription(String.format(
|
||||
"CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster %s"
|
||||
+ " xDS node ID: %s", root.name, xdsClient.getBootstrapInfo().node().getId()));
|
||||
helper.updateBalancingState(
|
||||
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable)));
|
||||
return;
|
||||
}
|
||||
|
||||
// The LB policy config is provided in service_config.proto/JSON format.
|
||||
NameResolver.ConfigOrError configOrError =
|
||||
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
|
||||
Arrays.asList(root.result.lbPolicyConfig()), lbRegistry);
|
||||
if (configOrError.getError() != null) {
|
||||
throw configOrError.getError().augmentDescription("Unable to parse the LB config")
|
||||
.asRuntimeException();
|
||||
}
|
||||
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Collections.unmodifiableList(instances),
|
||||
configOrError.getConfig(),
|
||||
root.result.isHttp11ProxyAvailable());
|
||||
if (childLb == null) {
|
||||
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
|
||||
}
|
||||
childLb.handleResolvedAddresses(
|
||||
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns children that would cause loops and builds up the parentClusters map.
|
||||
**/
|
||||
|
||||
private List<String> identifyLoops(ClusterState clusterState,
|
||||
Map<ClusterState, List<ClusterState>> parentClusters) {
|
||||
Set<String> ancestors = new HashSet<>();
|
||||
ancestors.add(clusterState.name);
|
||||
addAncestors(ancestors, clusterState, parentClusters);
|
||||
|
||||
List<String> namesCausingLoops = new ArrayList<>();
|
||||
for (ClusterState state : clusterState.childClusterStates.values()) {
|
||||
if (ancestors.contains(state.name)) {
|
||||
namesCausingLoops.add(state.name);
|
||||
}
|
||||
}
|
||||
|
||||
// Update parent map with entries from remaining children to clusterState
|
||||
clusterState.childClusterStates.values().stream()
|
||||
.filter(child -> !namesCausingLoops.contains(child.name))
|
||||
.forEach(
|
||||
child -> parentClusters.computeIfAbsent(child, k -> new ArrayList<>())
|
||||
.add(clusterState));
|
||||
|
||||
return namesCausingLoops;
|
||||
}
|
||||
|
||||
/** Recursively add all parents to the ancestors list. **/
|
||||
private void addAncestors(Set<String> ancestors, ClusterState clusterState,
|
||||
Map<ClusterState, List<ClusterState>> parentClusters) {
|
||||
List<ClusterState> directParents = parentClusters.get(clusterState);
|
||||
if (directParents != null) {
|
||||
directParents.stream().map(c -> c.name).forEach(ancestors::add);
|
||||
directParents.forEach(p -> addAncestors(ancestors, p, parentClusters));
|
||||
}
|
||||
}
|
||||
|
||||
private void handleClusterDiscoveryError(Status error) {
|
||||
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
|
||||
Status errorWithNodeId = error.withDescription(
|
||||
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
|
||||
if (childLb != null) {
|
||||
childLb.handleNameResolutionError(errorWithNodeId);
|
||||
} else {
|
||||
helper.updateBalancingState(
|
||||
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(errorWithNodeId)));
|
||||
}
|
||||
}
|
||||
|
||||
private final class ClusterState implements ResourceWatcher<CdsUpdate> {
|
||||
private final String name;
|
||||
@Nullable
|
||||
private Map<String, ClusterState> childClusterStates;
|
||||
@Nullable
|
||||
private CdsUpdate result;
|
||||
// Following fields are effectively final.
|
||||
private boolean isLeaf;
|
||||
private boolean discovered;
|
||||
private boolean shutdown;
|
||||
|
||||
private ClusterState(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
private void start() {
|
||||
shutdown = false;
|
||||
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
shutdown = true;
|
||||
xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this);
|
||||
if (childClusterStates != null) {
|
||||
// recursively shut down all descendants
|
||||
childClusterStates.values().stream()
|
||||
.filter(state -> !state.shutdown)
|
||||
.forEach(ClusterState::shutdown);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Status error) {
|
||||
Status status = Status.UNAVAILABLE
|
||||
.withDescription(
|
||||
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
|
||||
name, error.getCode(), error.getDescription()))
|
||||
.withCause(error.getCause());
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
// All watchers should receive the same error, so we only propagate it once.
|
||||
if (ClusterState.this == root) {
|
||||
handleClusterDiscoveryError(status);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResourceDoesNotExist(String resourceName) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
discovered = true;
|
||||
result = null;
|
||||
if (childClusterStates != null) {
|
||||
for (ClusterState state : childClusterStates.values()) {
|
||||
state.shutdown();
|
||||
}
|
||||
childClusterStates = null;
|
||||
}
|
||||
handleClusterDiscovered();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onChanged(final CdsUpdate update) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
|
||||
discovered = true;
|
||||
result = update;
|
||||
if (update.clusterType() == ClusterType.AGGREGATE) {
|
||||
isLeaf = false;
|
||||
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
|
||||
update.clusterName(), update.prioritizedClusterNames());
|
||||
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
|
||||
for (String cluster : update.prioritizedClusterNames()) {
|
||||
if (newChildStates.containsKey(cluster)) {
|
||||
logger.log(XdsLogLevel.WARNING,
|
||||
String.format("duplicate cluster name %s in aggregate %s is being ignored",
|
||||
cluster, update.clusterName()));
|
||||
continue;
|
||||
}
|
||||
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
|
||||
ClusterState childState;
|
||||
if (clusterStates.containsKey(cluster)) {
|
||||
childState = clusterStates.get(cluster);
|
||||
if (childState.shutdown) {
|
||||
childState.start();
|
||||
}
|
||||
} else {
|
||||
childState = new ClusterState(cluster);
|
||||
clusterStates.put(cluster, childState);
|
||||
childState.start();
|
||||
}
|
||||
newChildStates.put(cluster, childState);
|
||||
} else {
|
||||
newChildStates.put(cluster, childClusterStates.remove(cluster));
|
||||
}
|
||||
}
|
||||
if (childClusterStates != null) { // stop subscribing to revoked child clusters
|
||||
for (ClusterState watcher : childClusterStates.values()) {
|
||||
watcher.shutdown();
|
||||
}
|
||||
}
|
||||
childClusterStates = newChildStates;
|
||||
} else if (update.clusterType() == ClusterType.EDS) {
|
||||
isLeaf = true;
|
||||
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
|
||||
update.clusterName(), update.edsServiceName());
|
||||
} else { // logical DNS
|
||||
isLeaf = true;
|
||||
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
|
||||
}
|
||||
handleClusterDiscovered();
|
||||
}
|
||||
|
||||
}
|
||||
private String errorPrefix() {
|
||||
return "CdsLb for " + clusterName + ": ";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,8 +36,6 @@ import java.util.Map;
|
|||
@Internal
|
||||
public class CdsLoadBalancerProvider extends LoadBalancerProvider {
|
||||
|
||||
private static final String CLUSTER_KEY = "cluster";
|
||||
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
return true;
|
||||
|
@ -70,9 +68,12 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider {
|
|||
*/
|
||||
static ConfigOrError parseLoadBalancingConfigPolicy(Map<String, ?> rawLoadBalancingPolicyConfig) {
|
||||
try {
|
||||
String cluster =
|
||||
JsonUtil.getString(rawLoadBalancingPolicyConfig, CLUSTER_KEY);
|
||||
return ConfigOrError.fromConfig(new CdsConfig(cluster));
|
||||
String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster");
|
||||
Boolean isDynamic = JsonUtil.getBoolean(rawLoadBalancingPolicyConfig, "is_dynamic");
|
||||
if (isDynamic == null) {
|
||||
isDynamic = Boolean.FALSE;
|
||||
}
|
||||
return ConfigOrError.fromConfig(new CdsConfig(cluster, isDynamic));
|
||||
} catch (RuntimeException e) {
|
||||
return ConfigOrError.fromError(
|
||||
Status.UNAVAILABLE.withCause(e).withDescription(
|
||||
|
@ -89,15 +90,28 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider {
|
|||
* Name of cluster to query CDS for.
|
||||
*/
|
||||
final String name;
|
||||
/**
|
||||
* Whether this cluster was dynamically chosen, so the XdsDependencyManager may be unaware of
|
||||
* it without an explicit cluster subscription.
|
||||
*/
|
||||
final boolean isDynamic;
|
||||
|
||||
CdsConfig(String name) {
|
||||
this(name, false);
|
||||
}
|
||||
|
||||
CdsConfig(String name, boolean isDynamic) {
|
||||
checkArgument(name != null && !name.isEmpty(), "name is null or empty");
|
||||
this.name = name;
|
||||
this.isDynamic = isDynamic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this).add("name", name).toString();
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("name", name)
|
||||
.add("isDynamic", isDynamic)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,7 +104,7 @@ public final class RingHashLoadBalancerProvider extends LoadBalancerProvider {
|
|||
}
|
||||
if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) {
|
||||
return ConfigOrError.fromError(Status.UNAVAILABLE.withDescription(
|
||||
"Invalid 'mingRingSize'/'maxRingSize'"));
|
||||
"Invalid 'minRingSize'/'maxRingSize'"));
|
||||
}
|
||||
return ConfigOrError.fromConfig(
|
||||
new RingHashConfig(minRingSize, maxRingSize, requestHashHeader));
|
||||
|
|
|
@ -172,7 +172,9 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
lbConfig.getPolicyName()).parseLoadBalancingPolicyConfig(
|
||||
lbConfig.getRawConfigValue());
|
||||
if (configOrError.getError() != null) {
|
||||
throw new ResourceInvalidException(structOrError.getErrorDetail());
|
||||
throw new ResourceInvalidException(
|
||||
"Failed to parse lb config for cluster '" + cluster.getName() + "': "
|
||||
+ configOrError.getError());
|
||||
}
|
||||
|
||||
updateBuilder.lbPolicyConfig(lbPolicyConfig);
|
||||
|
@ -209,6 +211,10 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
|||
} catch (InvalidProtocolBufferException e) {
|
||||
return StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e);
|
||||
}
|
||||
if (clusterConfig.getClustersList().isEmpty()) {
|
||||
return StructOrError.fromError("Cluster " + clusterName
|
||||
+ ": aggregate ClusterConfig.clusters must not be empty");
|
||||
}
|
||||
return StructOrError.fromStruct(CdsUpdate.forAggregate(
|
||||
clusterName, clusterConfig.getClustersList()));
|
||||
}
|
||||
|
|
|
@ -254,6 +254,12 @@ final class XdsConfig {
|
|||
}
|
||||
|
||||
public interface XdsClusterSubscriptionRegistry {
|
||||
Closeable subscribeToCluster(String clusterName);
|
||||
Subscription subscribeToCluster(String clusterName);
|
||||
}
|
||||
|
||||
public interface Subscription extends Closeable {
|
||||
/** Release resources without throwing exceptions. */
|
||||
@Override
|
||||
void close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package io.grpc.xds;
|
|||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -34,8 +35,6 @@ import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
|
|||
import io.grpc.xds.client.XdsClient;
|
||||
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
||||
import io.grpc.xds.client.XdsResourceType;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -56,39 +55,43 @@ import javax.annotation.Nullable;
|
|||
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
|
||||
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
|
||||
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
|
||||
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
|
||||
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
|
||||
private final String listenerName;
|
||||
private final XdsClient xdsClient;
|
||||
private final XdsConfigWatcher xdsConfigWatcher;
|
||||
private final SynchronizationContext syncContext;
|
||||
private final String dataPlaneAuthority;
|
||||
private XdsConfigWatcher xdsConfigWatcher;
|
||||
|
||||
private StatusOr<XdsConfig> lastUpdate = null;
|
||||
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
|
||||
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
|
||||
|
||||
XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
|
||||
XdsDependencyManager(XdsClient xdsClient,
|
||||
SynchronizationContext syncContext, String dataPlaneAuthority,
|
||||
String listenerName, NameResolver.Args nameResolverArgs,
|
||||
ScheduledExecutorService scheduler) {
|
||||
this.listenerName = checkNotNull(listenerName, "listenerName");
|
||||
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
|
||||
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
|
||||
this.syncContext = checkNotNull(syncContext, "syncContext");
|
||||
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
|
||||
checkNotNull(nameResolverArgs, "nameResolverArgs");
|
||||
checkNotNull(scheduler, "scheduler");
|
||||
|
||||
// start the ball rolling
|
||||
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
|
||||
}
|
||||
|
||||
public static String toContextStr(String typeName, String resourceName) {
|
||||
return typeName + " resource " + resourceName;
|
||||
}
|
||||
|
||||
public void start(XdsConfigWatcher xdsConfigWatcher) {
|
||||
checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
|
||||
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
|
||||
// start the ball rolling
|
||||
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Closeable subscribeToCluster(String clusterName) {
|
||||
public XdsConfig.Subscription subscribeToCluster(String clusterName) {
|
||||
checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
|
||||
checkNotNull(clusterName, "clusterName");
|
||||
ClusterSubscription subscription = new ClusterSubscription(clusterName);
|
||||
|
||||
|
@ -291,10 +294,17 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
addConfigForCluster(clusters, childCluster, ancestors, tracer);
|
||||
StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
|
||||
if (!config.hasValue()) {
|
||||
clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
|
||||
"Unable to get leaves for " + clusterName + ": "
|
||||
+ config.getStatus().getDescription())));
|
||||
return;
|
||||
// gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
|
||||
// exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
|
||||
// watchers reports a transient ADS stream error, the policy should report that it is in
|
||||
// TRANSIENT_FAILURE if it has never passed a config to its child.
|
||||
//
|
||||
// But there's currently disagreement about whether that is actually what we want, and
|
||||
// that was not originally implemented in gRPC Java. So we're keeping Java's old
|
||||
// behavior for now and only failing the "leaves" (which is a bit arbitrary for a
|
||||
// cycle).
|
||||
leafNames.add(childCluster);
|
||||
continue;
|
||||
}
|
||||
XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
|
||||
if (children instanceof AggregateConfig) {
|
||||
|
@ -325,6 +335,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
default:
|
||||
throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
|
||||
}
|
||||
if (clusters.containsKey(clusterName)) {
|
||||
// If a cycle is detected, we'll have detected it while recursing, so now there will be a key
|
||||
// present. We don't want to overwrite it with a non-error value.
|
||||
return;
|
||||
}
|
||||
clusters.put(clusterName, StatusOr.fromValue(
|
||||
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
|
||||
}
|
||||
|
@ -406,7 +421,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
void onUpdate(StatusOr<XdsConfig> config);
|
||||
}
|
||||
|
||||
private final class ClusterSubscription implements Closeable {
|
||||
private final class ClusterSubscription implements XdsConfig.Subscription {
|
||||
private final String clusterName;
|
||||
boolean closed; // Accessed from syncContext
|
||||
|
||||
|
@ -419,7 +434,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
public void close() {
|
||||
releaseSubscription(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -230,7 +230,8 @@ final class XdsNameResolver extends NameResolver {
|
|||
ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
|
||||
callCounterProvider = SharedCallCounterMap.getInstance();
|
||||
|
||||
resolveState = new ResolveState(ldsResourceName); // auto starts
|
||||
resolveState = new ResolveState(ldsResourceName);
|
||||
resolveState.start();
|
||||
}
|
||||
|
||||
private static String expandPercentS(String template, String replacement) {
|
||||
|
@ -653,10 +654,14 @@ final class XdsNameResolver extends NameResolver {
|
|||
private ResolveState(String ldsResourceName) {
|
||||
authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
|
||||
xdsDependencyManager =
|
||||
new XdsDependencyManager(xdsClient, this, syncContext, authority, ldsResourceName,
|
||||
new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName,
|
||||
nameResolverArgs, scheduler);
|
||||
}
|
||||
|
||||
void start() {
|
||||
xdsDependencyManager.start(this);
|
||||
}
|
||||
|
||||
private void shutdown() {
|
||||
if (stopped) {
|
||||
return;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -2211,6 +2211,23 @@ public abstract class GrpcXdsClientImplTestBase {
|
|||
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cdsResponseWithEmptyAggregateCluster() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE,
|
||||
cdsResourceWatcher);
|
||||
List<String> candidates = Arrays.asList();
|
||||
Any clusterAggregate =
|
||||
Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, "round_robin", null, null, candidates));
|
||||
call.sendResponse(CDS, clusterAggregate, VERSION_1, "0000");
|
||||
|
||||
// Client sent an ACK CDS request.
|
||||
String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: "
|
||||
+ "Cluster cluster.googleapis.com: aggregate ClusterConfig.clusters must not be empty";
|
||||
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
|
||||
verify(cdsResourceWatcher).onError(errorCaptor.capture());
|
||||
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cdsResponseWithCircuitBreakers() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE,
|
||||
|
|
|
@ -106,7 +106,7 @@ public class RingHashLoadBalancerProviderTest {
|
|||
assertThat(configOrError.getError()).isNotNull();
|
||||
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
assertThat(configOrError.getError().getDescription())
|
||||
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
|
||||
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -117,7 +117,7 @@ public class RingHashLoadBalancerProviderTest {
|
|||
assertThat(configOrError.getError()).isNotNull();
|
||||
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
assertThat(configOrError.getError().getDescription())
|
||||
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
|
||||
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -214,7 +214,7 @@ public class RingHashLoadBalancerProviderTest {
|
|||
assertThat(configOrError.getError()).isNotNull();
|
||||
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
assertThat(configOrError.getError().getDescription())
|
||||
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
|
||||
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -225,7 +225,7 @@ public class RingHashLoadBalancerProviderTest {
|
|||
assertThat(configOrError.getError()).isNotNull();
|
||||
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
assertThat(configOrError.getError().getDescription())
|
||||
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
|
||||
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -28,7 +28,6 @@ import static io.grpc.xds.XdsTestUtils.ENDPOINT_HOSTNAME;
|
|||
import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT;
|
||||
import static io.grpc.xds.XdsTestUtils.RDS_NAME;
|
||||
import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster;
|
||||
import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI;
|
||||
import static org.mockito.AdditionalAnswers.delegatesTo;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
|
@ -48,28 +47,22 @@ import io.envoyproxy.envoy.config.listener.v3.Listener;
|
|||
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
|
||||
import io.grpc.BindableService;
|
||||
import io.grpc.ChannelLogger;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.Server;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusOr;
|
||||
import io.grpc.StatusOrMatcher;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||
import io.grpc.inprocess.InProcessServerBuilder;
|
||||
import io.grpc.internal.ExponentialBackoffPolicy;
|
||||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.testing.GrpcCleanupRule;
|
||||
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
||||
import io.grpc.xds.XdsConfig.XdsClusterConfig;
|
||||
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
|
||||
import io.grpc.xds.client.CommonBootstrapperTestUtils;
|
||||
import io.grpc.xds.client.XdsClient;
|
||||
import io.grpc.xds.client.XdsClient.ResourceMetadata;
|
||||
import io.grpc.xds.client.XdsClientMetricReporter;
|
||||
import io.grpc.xds.client.XdsResourceType;
|
||||
import io.grpc.xds.client.XdsTransportFactory;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
|
@ -96,7 +89,6 @@ import org.mockito.ArgumentMatcher;
|
|||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnit;
|
||||
import org.mockito.junit.MockitoRule;
|
||||
|
@ -108,21 +100,20 @@ public class XdsDependencyManagerTest {
|
|||
public static final String CLUSTER_TYPE_NAME = XdsClusterResource.getInstance().typeName();
|
||||
public static final String ENDPOINT_TYPE_NAME = XdsEndpointResource.getInstance().typeName();
|
||||
|
||||
@Mock
|
||||
private XdsClientMetricReporter xdsClientMetricReporter;
|
||||
|
||||
private final SynchronizationContext syncContext =
|
||||
new SynchronizationContext((t, e) -> {
|
||||
throw new AssertionError(e);
|
||||
});
|
||||
|
||||
private ManagedChannel channel;
|
||||
private XdsClient xdsClient;
|
||||
private XdsDependencyManager xdsDependencyManager;
|
||||
private TestWatcher xdsConfigWatcher;
|
||||
private Server xdsServer;
|
||||
|
||||
private final FakeClock fakeClock = new FakeClock();
|
||||
|
||||
private XdsClient xdsClient = XdsTestUtils.createXdsClient(
|
||||
Collections.singletonList("control-plane"),
|
||||
serverInfo -> new GrpcXdsTransportFactory.GrpcXdsTransport(
|
||||
InProcessChannelBuilder.forName(serverInfo.target()).directExecutor().build()),
|
||||
fakeClock);
|
||||
|
||||
private TestWatcher xdsConfigWatcher;
|
||||
|
||||
private final String serverName = "the-service-name";
|
||||
private final Queue<XdsTestUtils.LrsRpcCall> loadReportCalls = new ArrayDeque<>();
|
||||
private final AtomicBoolean adsEnded = new AtomicBoolean(true);
|
||||
|
@ -150,10 +141,12 @@ public class XdsDependencyManagerTest {
|
|||
.build();
|
||||
|
||||
private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
|
||||
private XdsDependencyManager xdsDependencyManager = new XdsDependencyManager(
|
||||
xdsClient, syncContext, serverName, serverName, nameResolverArgs, scheduler);
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
xdsServer = cleanupRule.register(InProcessServerBuilder
|
||||
cleanupRule.register(InProcessServerBuilder
|
||||
.forName("control-plane")
|
||||
.addService(controlPlaneService)
|
||||
.addService(lrsService)
|
||||
|
@ -163,15 +156,6 @@ public class XdsDependencyManagerTest {
|
|||
|
||||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName);
|
||||
|
||||
channel = cleanupRule.register(
|
||||
InProcessChannelBuilder.forName("control-plane").directExecutor().build());
|
||||
XdsTransportFactory xdsTransportFactory =
|
||||
ignore -> new GrpcXdsTransportFactory.GrpcXdsTransport(channel);
|
||||
|
||||
xdsClient = CommonBootstrapperTestUtils.createXdsClient(
|
||||
Collections.singletonList(SERVER_URI), xdsTransportFactory, fakeClock,
|
||||
new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter);
|
||||
|
||||
testWatcher = new TestWatcher();
|
||||
xdsConfigWatcher = mock(TestWatcher.class, delegatesTo(testWatcher));
|
||||
defaultXdsConfig = XdsTestUtils.getDefaultXdsConfig(serverName);
|
||||
|
@ -183,9 +167,6 @@ public class XdsDependencyManagerTest {
|
|||
xdsDependencyManager.shutdown();
|
||||
}
|
||||
xdsClient.shutdown();
|
||||
channel.shutdown(); // channel not owned by XdsClient
|
||||
|
||||
xdsServer.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(adsEnded.get()).isTrue();
|
||||
assertThat(lrsEnded.get()).isTrue();
|
||||
|
@ -194,8 +175,7 @@ public class XdsDependencyManagerTest {
|
|||
|
||||
@Test
|
||||
public void verify_basic_config() {
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||
testWatcher.verifyStats(1, 0);
|
||||
|
@ -203,8 +183,7 @@ public class XdsDependencyManagerTest {
|
|||
|
||||
@Test
|
||||
public void verify_config_update() {
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||
|
@ -221,8 +200,7 @@ public class XdsDependencyManagerTest {
|
|||
@Test
|
||||
public void verify_simple_aggregate() {
|
||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||
|
||||
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
|
||||
|
@ -281,8 +259,7 @@ public class XdsDependencyManagerTest {
|
|||
List<String> childNames2 = Arrays.asList("clusterA", "clusterX");
|
||||
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2);
|
||||
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
||||
|
||||
Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
|
||||
|
@ -313,8 +290,7 @@ public class XdsDependencyManagerTest {
|
|||
@Test
|
||||
public void testDelayedSubscription() {
|
||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||
|
||||
String rootName1 = "root_c";
|
||||
|
@ -360,8 +336,7 @@ public class XdsDependencyManagerTest {
|
|||
edsMap.put("garbageEds", clusterLoadAssignment);
|
||||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
||||
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
||||
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||
|
@ -393,8 +368,9 @@ public class XdsDependencyManagerTest {
|
|||
@Test
|
||||
public void testMissingLds() {
|
||||
String ldsName = "badLdsName";
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
|
||||
serverName, ldsName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
||||
verify(xdsConfigWatcher).onUpdate(
|
||||
|
@ -409,8 +385,7 @@ public class XdsDependencyManagerTest {
|
|||
Listener serverListener =
|
||||
ControlPlaneRule.buildServerListener().toBuilder().setName(serverName).build();
|
||||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, serverListener));
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
||||
verify(xdsConfigWatcher).onUpdate(
|
||||
|
@ -427,8 +402,7 @@ public class XdsDependencyManagerTest {
|
|||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
|
||||
ImmutableMap.of(serverName, clientListener));
|
||||
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
||||
verify(xdsConfigWatcher).onUpdate(
|
||||
|
@ -444,8 +418,7 @@ public class XdsDependencyManagerTest {
|
|||
"wrong-virtual-host", XdsTestUtils.RDS_NAME, XdsTestUtils.CLUSTER_NAME);
|
||||
controlPlaneService.setXdsConfig(
|
||||
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
// Update with a config that has a virtual host that doesn't match the server name
|
||||
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||
|
@ -460,8 +433,9 @@ public class XdsDependencyManagerTest {
|
|||
String ldsResourceName =
|
||||
"xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1";
|
||||
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
|
||||
serverName, ldsResourceName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
verify(xdsConfigWatcher).onUpdate(
|
||||
argThat(StatusOrMatcher.hasStatus(
|
||||
|
@ -474,8 +448,7 @@ public class XdsDependencyManagerTest {
|
|||
@Test
|
||||
public void testChangeRdsName_fromLds() {
|
||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||
|
||||
String newRdsName = "newRdsName1";
|
||||
|
@ -530,8 +503,7 @@ public class XdsDependencyManagerTest {
|
|||
|
||||
// Start the actual test
|
||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
|
||||
|
||||
|
@ -576,8 +548,7 @@ public class XdsDependencyManagerTest {
|
|||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
||||
|
||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
||||
assertThat(config.getClusters().get("clusterA").hasValue()).isTrue();
|
||||
|
@ -611,12 +582,12 @@ public class XdsDependencyManagerTest {
|
|||
|
||||
// The cycle is loaded and detected
|
||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
||||
assertThat(config.getClusters().get("clusterA").hasValue()).isFalse();
|
||||
assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle");
|
||||
assertThat(config.getClusters().get("clusterB").hasValue()).isTrue();
|
||||
|
||||
// Orphan the cycle and it is discarded
|
||||
routeConfig =
|
||||
|
@ -657,8 +628,7 @@ public class XdsDependencyManagerTest {
|
|||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
||||
|
||||
// Start the actual test
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
|
||||
assertThat(initialConfig.getClusters().keySet())
|
||||
|
@ -675,8 +645,7 @@ public class XdsDependencyManagerTest {
|
|||
|
||||
@Test
|
||||
public void testChangeRdsName_FromLds_complexTree() {
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
// Create the same tree as in testMultipleParentsInCdsTree
|
||||
Cluster rootCluster =
|
||||
|
@ -721,8 +690,7 @@ public class XdsDependencyManagerTest {
|
|||
public void testChangeAggCluster() {
|
||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
||||
|
||||
// Setup initial config A -> A1 -> (A11, A12)
|
||||
|
@ -775,8 +743,7 @@ public class XdsDependencyManagerTest {
|
|||
controlPlaneService.setXdsConfig(
|
||||
ADS_TYPE_URL_CDS, ImmutableMap.of(XdsTestUtils.CLUSTER_NAME,
|
||||
Cluster.newBuilder().setName(XdsTestUtils.CLUSTER_NAME).build()));
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||
Status status = xdsUpdateCaptor.getValue().getValue()
|
||||
|
@ -789,8 +756,7 @@ public class XdsDependencyManagerTest {
|
|||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
||||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
verify(xdsConfigWatcher).onUpdate(any());
|
||||
|
||||
|
@ -822,8 +788,7 @@ public class XdsDependencyManagerTest {
|
|||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
||||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
verify(xdsConfigWatcher).onUpdate(any());
|
||||
|
||||
|
@ -855,8 +820,7 @@ public class XdsDependencyManagerTest {
|
|||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
||||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
verify(xdsConfigWatcher).onUpdate(any());
|
||||
|
||||
|
@ -888,8 +852,7 @@ public class XdsDependencyManagerTest {
|
|||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
||||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
|
||||
verify(xdsConfigWatcher).onUpdate(any());
|
||||
|
||||
|
@ -922,8 +885,7 @@ public class XdsDependencyManagerTest {
|
|||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||
|
||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||
serverName, serverName, nameResolverArgs, scheduler);
|
||||
xdsDependencyManager.start(xdsConfigWatcher);
|
||||
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
||||
xdsDependencyManager.shutdown();
|
||||
|
||||
|
|
|
@ -52,14 +52,20 @@ import io.grpc.BindableService;
|
|||
import io.grpc.Context;
|
||||
import io.grpc.Context.CancellationListener;
|
||||
import io.grpc.StatusOr;
|
||||
import io.grpc.internal.ExponentialBackoffPolicy;
|
||||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.JsonParser;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.xds.Endpoints.LbEndpoint;
|
||||
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
||||
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
|
||||
import io.grpc.xds.client.Bootstrapper;
|
||||
import io.grpc.xds.client.CommonBootstrapperTestUtils;
|
||||
import io.grpc.xds.client.Locality;
|
||||
import io.grpc.xds.client.XdsClient;
|
||||
import io.grpc.xds.client.XdsClientMetricReporter;
|
||||
import io.grpc.xds.client.XdsResourceType;
|
||||
import io.grpc.xds.client.XdsTransportFactory;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -364,6 +370,32 @@ public class XdsTestUtils {
|
|||
.setApiListener(clientListenerBuilder.build()).build();
|
||||
}
|
||||
|
||||
public static XdsClient createXdsClient(
|
||||
List<String> serverUris,
|
||||
XdsTransportFactory xdsTransportFactory,
|
||||
FakeClock fakeClock) {
|
||||
return createXdsClient(
|
||||
CommonBootstrapperTestUtils.buildBootStrap(serverUris),
|
||||
xdsTransportFactory,
|
||||
fakeClock,
|
||||
new XdsClientMetricReporter() {});
|
||||
}
|
||||
|
||||
/** Calls {@link CommonBootstrapperTestUtils#createXdsClient} with gRPC-specific values. */
|
||||
public static XdsClient createXdsClient(
|
||||
Bootstrapper.BootstrapInfo bootstrapInfo,
|
||||
XdsTransportFactory xdsTransportFactory,
|
||||
FakeClock fakeClock,
|
||||
XdsClientMetricReporter xdsClientMetricReporter) {
|
||||
return CommonBootstrapperTestUtils.createXdsClient(
|
||||
bootstrapInfo,
|
||||
xdsTransportFactory,
|
||||
fakeClock,
|
||||
new ExponentialBackoffPolicy.Provider(),
|
||||
MessagePrinter.INSTANCE,
|
||||
xdsClientMetricReporter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with
|
||||
* the same list of clusterName:clusterServiceName pair.
|
||||
|
|
Loading…
Reference in New Issue