mirror of https://github.com/grpc/grpc-java.git
Revert "xds: Convert CdsLb to XdsDepManager"
This reverts commit297ab05efe
. b/430347751 shows multiple concerning behaviors in the xDS stack with the new A74 config update model. XdsDepManager and CdsLB2 still seem to be working correctly, but the change is exacerbated issues in other parts of the stack, like RingHashConfig not having equals fixed ina8de9f07ab
. Revert only for the v1.74.x release, leaving it on master.
This commit is contained in:
parent
02023ef9ac
commit
61da156b6c
|
@ -21,30 +21,36 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||||
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
|
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.errorprone.annotations.CheckReturnValue;
|
|
||||||
import io.grpc.InternalLogId;
|
import io.grpc.InternalLogId;
|
||||||
import io.grpc.LoadBalancer;
|
import io.grpc.LoadBalancer;
|
||||||
import io.grpc.LoadBalancerRegistry;
|
import io.grpc.LoadBalancerRegistry;
|
||||||
import io.grpc.NameResolver;
|
import io.grpc.NameResolver;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusOr;
|
import io.grpc.SynchronizationContext;
|
||||||
|
import io.grpc.internal.ObjectPool;
|
||||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||||
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
|
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
|
||||||
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
|
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
|
||||||
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
|
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
|
||||||
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
||||||
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
|
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
|
||||||
import io.grpc.xds.XdsConfig.Subscription;
|
import io.grpc.xds.client.XdsClient;
|
||||||
import io.grpc.xds.XdsConfig.XdsClusterConfig;
|
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
||||||
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
|
|
||||||
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
|
|
||||||
import io.grpc.xds.client.XdsLogger;
|
import io.grpc.xds.client.XdsLogger;
|
||||||
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
|
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
|
||||||
|
import java.util.ArrayDeque;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load balancer for cds_experimental LB policy. One instance per top-level cluster.
|
* Load balancer for cds_experimental LB policy. One instance per top-level cluster.
|
||||||
|
@ -54,11 +60,13 @@ import java.util.List;
|
||||||
final class CdsLoadBalancer2 extends LoadBalancer {
|
final class CdsLoadBalancer2 extends LoadBalancer {
|
||||||
private final XdsLogger logger;
|
private final XdsLogger logger;
|
||||||
private final Helper helper;
|
private final Helper helper;
|
||||||
|
private final SynchronizationContext syncContext;
|
||||||
private final LoadBalancerRegistry lbRegistry;
|
private final LoadBalancerRegistry lbRegistry;
|
||||||
// Following fields are effectively final.
|
// Following fields are effectively final.
|
||||||
private String clusterName;
|
private ObjectPool<XdsClient> xdsClientPool;
|
||||||
private Subscription clusterSubscription;
|
private XdsClient xdsClient;
|
||||||
private LoadBalancer childLb;
|
private CdsLbState cdsLbState;
|
||||||
|
private ResolvedAddresses resolvedAddresses;
|
||||||
|
|
||||||
CdsLoadBalancer2(Helper helper) {
|
CdsLoadBalancer2(Helper helper) {
|
||||||
this(helper, LoadBalancerRegistry.getDefaultRegistry());
|
this(helper, LoadBalancerRegistry.getDefaultRegistry());
|
||||||
|
@ -67,6 +75,7 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
|
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
|
||||||
this.helper = checkNotNull(helper, "helper");
|
this.helper = checkNotNull(helper, "helper");
|
||||||
|
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
|
||||||
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
|
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
|
||||||
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
|
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
|
||||||
logger.log(XdsLogLevel.INFO, "Created");
|
logger.log(XdsLogLevel.INFO, "Created");
|
||||||
|
@ -74,115 +83,25 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||||
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
if (this.resolvedAddresses != null) {
|
||||||
if (this.clusterName == null) {
|
|
||||||
CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
|
||||||
logger.log(XdsLogLevel.INFO, "Config: {0}", config);
|
|
||||||
if (config.isDynamic) {
|
|
||||||
clusterSubscription = resolvedAddresses.getAttributes()
|
|
||||||
.get(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY)
|
|
||||||
.subscribeToCluster(config.name);
|
|
||||||
}
|
|
||||||
this.clusterName = config.name;
|
|
||||||
}
|
|
||||||
XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG);
|
|
||||||
StatusOr<XdsClusterConfig> clusterConfigOr = xdsConfig.getClusters().get(clusterName);
|
|
||||||
if (clusterConfigOr == null) {
|
|
||||||
if (clusterSubscription == null) {
|
|
||||||
// Should be impossible, because XdsDependencyManager wouldn't have generated this
|
|
||||||
return fail(Status.INTERNAL.withDescription(
|
|
||||||
errorPrefix() + "Unable to find non-dynamic root cluster"));
|
|
||||||
}
|
|
||||||
// The dynamic cluster must not have loaded yet
|
|
||||||
return Status.OK;
|
return Status.OK;
|
||||||
}
|
}
|
||||||
if (!clusterConfigOr.hasValue()) {
|
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
||||||
return fail(clusterConfigOr.getStatus());
|
this.resolvedAddresses = resolvedAddresses;
|
||||||
}
|
xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL);
|
||||||
XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
|
xdsClient = xdsClientPool.getObject();
|
||||||
List<String> leafNames;
|
CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||||
if (clusterConfig.getChildren() instanceof AggregateConfig) {
|
logger.log(XdsLogLevel.INFO, "Config: {0}", config);
|
||||||
leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
|
cdsLbState = new CdsLbState(config.name);
|
||||||
} else if (clusterConfig.getChildren() instanceof EndpointConfig) {
|
cdsLbState.start();
|
||||||
leafNames = ImmutableList.of(clusterName);
|
return Status.OK;
|
||||||
} else {
|
|
||||||
return fail(Status.INTERNAL.withDescription(
|
|
||||||
errorPrefix() + "Unexpected cluster children type: "
|
|
||||||
+ clusterConfig.getChildren().getClass()));
|
|
||||||
}
|
|
||||||
if (leafNames.isEmpty()) {
|
|
||||||
// Should be impossible, because XdsClusterResource validated this
|
|
||||||
return fail(Status.UNAVAILABLE.withDescription(
|
|
||||||
errorPrefix() + "Zero leaf clusters for root cluster " + clusterName));
|
|
||||||
}
|
|
||||||
|
|
||||||
Status noneFoundError = Status.INTERNAL
|
|
||||||
.withDescription(errorPrefix() + "No leaves and no error; this is a bug");
|
|
||||||
List<DiscoveryMechanism> instances = new ArrayList<>();
|
|
||||||
for (String leafName : leafNames) {
|
|
||||||
StatusOr<XdsClusterConfig> leafConfigOr = xdsConfig.getClusters().get(leafName);
|
|
||||||
if (!leafConfigOr.hasValue()) {
|
|
||||||
noneFoundError = leafConfigOr.getStatus();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) {
|
|
||||||
noneFoundError = Status.INTERNAL.withDescription(
|
|
||||||
errorPrefix() + "Unexpected child " + leafName + " cluster children type: "
|
|
||||||
+ leafConfigOr.getValue().getChildren().getClass());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
CdsUpdate result = leafConfigOr.getValue().getClusterResource();
|
|
||||||
DiscoveryMechanism instance;
|
|
||||||
if (result.clusterType() == ClusterType.EDS) {
|
|
||||||
instance = DiscoveryMechanism.forEds(
|
|
||||||
leafName,
|
|
||||||
result.edsServiceName(),
|
|
||||||
result.lrsServerInfo(),
|
|
||||||
result.maxConcurrentRequests(),
|
|
||||||
result.upstreamTlsContext(),
|
|
||||||
result.filterMetadata(),
|
|
||||||
result.outlierDetection());
|
|
||||||
} else {
|
|
||||||
instance = DiscoveryMechanism.forLogicalDns(
|
|
||||||
leafName,
|
|
||||||
result.dnsHostName(),
|
|
||||||
result.lrsServerInfo(),
|
|
||||||
result.maxConcurrentRequests(),
|
|
||||||
result.upstreamTlsContext(),
|
|
||||||
result.filterMetadata());
|
|
||||||
}
|
|
||||||
instances.add(instance);
|
|
||||||
}
|
|
||||||
if (instances.isEmpty()) {
|
|
||||||
return fail(noneFoundError);
|
|
||||||
}
|
|
||||||
|
|
||||||
// The LB policy config is provided in service_config.proto/JSON format.
|
|
||||||
NameResolver.ConfigOrError configOrError =
|
|
||||||
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
|
|
||||||
Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry);
|
|
||||||
if (configOrError.getError() != null) {
|
|
||||||
// Should be impossible, because XdsClusterResource validated this
|
|
||||||
return fail(Status.INTERNAL.withDescription(
|
|
||||||
errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
|
|
||||||
}
|
|
||||||
|
|
||||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
|
||||||
Collections.unmodifiableList(instances),
|
|
||||||
configOrError.getConfig(),
|
|
||||||
clusterConfig.getClusterResource().isHttp11ProxyAvailable());
|
|
||||||
if (childLb == null) {
|
|
||||||
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
|
|
||||||
}
|
|
||||||
return childLb.acceptResolvedAddresses(
|
|
||||||
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleNameResolutionError(Status error) {
|
public void handleNameResolutionError(Status error) {
|
||||||
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
|
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
|
||||||
if (childLb != null) {
|
if (cdsLbState != null && cdsLbState.childLb != null) {
|
||||||
childLb.handleNameResolutionError(error);
|
cdsLbState.childLb.handleNameResolutionError(error);
|
||||||
} else {
|
} else {
|
||||||
helper.updateBalancingState(
|
helper.updateBalancingState(
|
||||||
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
|
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
|
||||||
|
@ -192,28 +111,314 @@ final class CdsLoadBalancer2 extends LoadBalancer {
|
||||||
@Override
|
@Override
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
logger.log(XdsLogLevel.INFO, "Shutdown");
|
logger.log(XdsLogLevel.INFO, "Shutdown");
|
||||||
if (childLb != null) {
|
if (cdsLbState != null) {
|
||||||
childLb.shutdown();
|
cdsLbState.shutdown();
|
||||||
childLb = null;
|
|
||||||
}
|
}
|
||||||
if (clusterSubscription != null) {
|
if (xdsClientPool != null) {
|
||||||
clusterSubscription.close();
|
xdsClientPool.returnObject(xdsClient);
|
||||||
clusterSubscription = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@CheckReturnValue // don't forget to return up the stack after the fail call
|
/**
|
||||||
private Status fail(Status error) {
|
* The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when
|
||||||
|
* receiving the CDS LB policy config with the top-level cluster name.
|
||||||
|
*/
|
||||||
|
private final class CdsLbState {
|
||||||
|
|
||||||
|
private final ClusterState root;
|
||||||
|
private final Map<String, ClusterState> clusterStates = new ConcurrentHashMap<>();
|
||||||
|
private LoadBalancer childLb;
|
||||||
|
|
||||||
|
private CdsLbState(String rootCluster) {
|
||||||
|
root = new ClusterState(rootCluster);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void start() {
|
||||||
|
root.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shutdown() {
|
||||||
|
root.shutdown();
|
||||||
|
if (childLb != null) {
|
||||||
|
childLb.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleClusterDiscovered() {
|
||||||
|
List<DiscoveryMechanism> instances = new ArrayList<>();
|
||||||
|
|
||||||
|
// Used for loop detection to break the infinite recursion that loops would cause
|
||||||
|
Map<ClusterState, List<ClusterState>> parentClusters = new HashMap<>();
|
||||||
|
Status loopStatus = null;
|
||||||
|
|
||||||
|
// Level-order traversal.
|
||||||
|
// Collect configurations for all non-aggregate (leaf) clusters.
|
||||||
|
Queue<ClusterState> queue = new ArrayDeque<>();
|
||||||
|
queue.add(root);
|
||||||
|
while (!queue.isEmpty()) {
|
||||||
|
int size = queue.size();
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
ClusterState clusterState = queue.remove();
|
||||||
|
if (!clusterState.discovered) {
|
||||||
|
return; // do not proceed until all clusters discovered
|
||||||
|
}
|
||||||
|
if (clusterState.result == null) { // resource revoked or not exists
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (clusterState.isLeaf) {
|
||||||
|
if (instances.stream().map(inst -> inst.cluster).noneMatch(clusterState.name::equals)) {
|
||||||
|
DiscoveryMechanism instance;
|
||||||
|
if (clusterState.result.clusterType() == ClusterType.EDS) {
|
||||||
|
instance = DiscoveryMechanism.forEds(
|
||||||
|
clusterState.name, clusterState.result.edsServiceName(),
|
||||||
|
clusterState.result.lrsServerInfo(),
|
||||||
|
clusterState.result.maxConcurrentRequests(),
|
||||||
|
clusterState.result.upstreamTlsContext(),
|
||||||
|
clusterState.result.filterMetadata(),
|
||||||
|
clusterState.result.outlierDetection());
|
||||||
|
} else { // logical DNS
|
||||||
|
instance = DiscoveryMechanism.forLogicalDns(
|
||||||
|
clusterState.name, clusterState.result.dnsHostName(),
|
||||||
|
clusterState.result.lrsServerInfo(),
|
||||||
|
clusterState.result.maxConcurrentRequests(),
|
||||||
|
clusterState.result.upstreamTlsContext(),
|
||||||
|
clusterState.result.filterMetadata());
|
||||||
|
}
|
||||||
|
instances.add(instance);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (clusterState.childClusterStates == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Do loop detection and break recursion if detected
|
||||||
|
List<String> namesCausingLoops = identifyLoops(clusterState, parentClusters);
|
||||||
|
if (namesCausingLoops.isEmpty()) {
|
||||||
|
queue.addAll(clusterState.childClusterStates.values());
|
||||||
|
} else {
|
||||||
|
// Do cleanup
|
||||||
if (childLb != null) {
|
if (childLb != null) {
|
||||||
childLb.shutdown();
|
childLb.shutdown();
|
||||||
childLb = null;
|
childLb = null;
|
||||||
}
|
}
|
||||||
|
if (loopStatus != null) {
|
||||||
|
logger.log(XdsLogLevel.WARNING,
|
||||||
|
"Multiple loops in CDS config. Old msg: " + loopStatus.getDescription());
|
||||||
|
}
|
||||||
|
loopStatus = Status.UNAVAILABLE.withDescription(String.format(
|
||||||
|
"CDS error: circular aggregate clusters directly under %s for "
|
||||||
|
+ "root cluster %s, named %s, xDS node ID: %s",
|
||||||
|
clusterState.name, root.name, namesCausingLoops,
|
||||||
|
xdsClient.getBootstrapInfo().node().getId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (loopStatus != null) {
|
||||||
helper.updateBalancingState(
|
helper.updateBalancingState(
|
||||||
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
|
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus)));
|
||||||
return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String errorPrefix() {
|
if (instances.isEmpty()) { // none of non-aggregate clusters exists
|
||||||
return "CdsLb for " + clusterName + ": ";
|
if (childLb != null) {
|
||||||
|
childLb.shutdown();
|
||||||
|
childLb = null;
|
||||||
|
}
|
||||||
|
Status unavailable = Status.UNAVAILABLE.withDescription(String.format(
|
||||||
|
"CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster %s"
|
||||||
|
+ " xDS node ID: %s", root.name, xdsClient.getBootstrapInfo().node().getId()));
|
||||||
|
helper.updateBalancingState(
|
||||||
|
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The LB policy config is provided in service_config.proto/JSON format.
|
||||||
|
NameResolver.ConfigOrError configOrError =
|
||||||
|
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
|
||||||
|
Arrays.asList(root.result.lbPolicyConfig()), lbRegistry);
|
||||||
|
if (configOrError.getError() != null) {
|
||||||
|
throw configOrError.getError().augmentDescription("Unable to parse the LB config")
|
||||||
|
.asRuntimeException();
|
||||||
|
}
|
||||||
|
|
||||||
|
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||||
|
Collections.unmodifiableList(instances),
|
||||||
|
configOrError.getConfig(),
|
||||||
|
root.result.isHttp11ProxyAvailable());
|
||||||
|
if (childLb == null) {
|
||||||
|
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
|
||||||
|
}
|
||||||
|
childLb.handleResolvedAddresses(
|
||||||
|
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns children that would cause loops and builds up the parentClusters map.
|
||||||
|
**/
|
||||||
|
|
||||||
|
private List<String> identifyLoops(ClusterState clusterState,
|
||||||
|
Map<ClusterState, List<ClusterState>> parentClusters) {
|
||||||
|
Set<String> ancestors = new HashSet<>();
|
||||||
|
ancestors.add(clusterState.name);
|
||||||
|
addAncestors(ancestors, clusterState, parentClusters);
|
||||||
|
|
||||||
|
List<String> namesCausingLoops = new ArrayList<>();
|
||||||
|
for (ClusterState state : clusterState.childClusterStates.values()) {
|
||||||
|
if (ancestors.contains(state.name)) {
|
||||||
|
namesCausingLoops.add(state.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update parent map with entries from remaining children to clusterState
|
||||||
|
clusterState.childClusterStates.values().stream()
|
||||||
|
.filter(child -> !namesCausingLoops.contains(child.name))
|
||||||
|
.forEach(
|
||||||
|
child -> parentClusters.computeIfAbsent(child, k -> new ArrayList<>())
|
||||||
|
.add(clusterState));
|
||||||
|
|
||||||
|
return namesCausingLoops;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Recursively add all parents to the ancestors list. **/
|
||||||
|
private void addAncestors(Set<String> ancestors, ClusterState clusterState,
|
||||||
|
Map<ClusterState, List<ClusterState>> parentClusters) {
|
||||||
|
List<ClusterState> directParents = parentClusters.get(clusterState);
|
||||||
|
if (directParents != null) {
|
||||||
|
directParents.stream().map(c -> c.name).forEach(ancestors::add);
|
||||||
|
directParents.forEach(p -> addAncestors(ancestors, p, parentClusters));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleClusterDiscoveryError(Status error) {
|
||||||
|
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
|
||||||
|
Status errorWithNodeId = error.withDescription(
|
||||||
|
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
|
||||||
|
if (childLb != null) {
|
||||||
|
childLb.handleNameResolutionError(errorWithNodeId);
|
||||||
|
} else {
|
||||||
|
helper.updateBalancingState(
|
||||||
|
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(errorWithNodeId)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class ClusterState implements ResourceWatcher<CdsUpdate> {
|
||||||
|
private final String name;
|
||||||
|
@Nullable
|
||||||
|
private Map<String, ClusterState> childClusterStates;
|
||||||
|
@Nullable
|
||||||
|
private CdsUpdate result;
|
||||||
|
// Following fields are effectively final.
|
||||||
|
private boolean isLeaf;
|
||||||
|
private boolean discovered;
|
||||||
|
private boolean shutdown;
|
||||||
|
|
||||||
|
private ClusterState(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void start() {
|
||||||
|
shutdown = false;
|
||||||
|
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
void shutdown() {
|
||||||
|
shutdown = true;
|
||||||
|
xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this);
|
||||||
|
if (childClusterStates != null) {
|
||||||
|
// recursively shut down all descendants
|
||||||
|
childClusterStates.values().stream()
|
||||||
|
.filter(state -> !state.shutdown)
|
||||||
|
.forEach(ClusterState::shutdown);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Status error) {
|
||||||
|
Status status = Status.UNAVAILABLE
|
||||||
|
.withDescription(
|
||||||
|
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
|
||||||
|
name, error.getCode(), error.getDescription()))
|
||||||
|
.withCause(error.getCause());
|
||||||
|
if (shutdown) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// All watchers should receive the same error, so we only propagate it once.
|
||||||
|
if (ClusterState.this == root) {
|
||||||
|
handleClusterDiscoveryError(status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResourceDoesNotExist(String resourceName) {
|
||||||
|
if (shutdown) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
discovered = true;
|
||||||
|
result = null;
|
||||||
|
if (childClusterStates != null) {
|
||||||
|
for (ClusterState state : childClusterStates.values()) {
|
||||||
|
state.shutdown();
|
||||||
|
}
|
||||||
|
childClusterStates = null;
|
||||||
|
}
|
||||||
|
handleClusterDiscovered();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onChanged(final CdsUpdate update) {
|
||||||
|
if (shutdown) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
|
||||||
|
discovered = true;
|
||||||
|
result = update;
|
||||||
|
if (update.clusterType() == ClusterType.AGGREGATE) {
|
||||||
|
isLeaf = false;
|
||||||
|
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
|
||||||
|
update.clusterName(), update.prioritizedClusterNames());
|
||||||
|
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
|
||||||
|
for (String cluster : update.prioritizedClusterNames()) {
|
||||||
|
if (newChildStates.containsKey(cluster)) {
|
||||||
|
logger.log(XdsLogLevel.WARNING,
|
||||||
|
String.format("duplicate cluster name %s in aggregate %s is being ignored",
|
||||||
|
cluster, update.clusterName()));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
|
||||||
|
ClusterState childState;
|
||||||
|
if (clusterStates.containsKey(cluster)) {
|
||||||
|
childState = clusterStates.get(cluster);
|
||||||
|
if (childState.shutdown) {
|
||||||
|
childState.start();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
childState = new ClusterState(cluster);
|
||||||
|
clusterStates.put(cluster, childState);
|
||||||
|
childState.start();
|
||||||
|
}
|
||||||
|
newChildStates.put(cluster, childState);
|
||||||
|
} else {
|
||||||
|
newChildStates.put(cluster, childClusterStates.remove(cluster));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (childClusterStates != null) { // stop subscribing to revoked child clusters
|
||||||
|
for (ClusterState watcher : childClusterStates.values()) {
|
||||||
|
watcher.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
childClusterStates = newChildStates;
|
||||||
|
} else if (update.clusterType() == ClusterType.EDS) {
|
||||||
|
isLeaf = true;
|
||||||
|
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
|
||||||
|
update.clusterName(), update.edsServiceName());
|
||||||
|
} else { // logical DNS
|
||||||
|
isLeaf = true;
|
||||||
|
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
|
||||||
|
}
|
||||||
|
handleClusterDiscovered();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,8 @@ import java.util.Map;
|
||||||
@Internal
|
@Internal
|
||||||
public class CdsLoadBalancerProvider extends LoadBalancerProvider {
|
public class CdsLoadBalancerProvider extends LoadBalancerProvider {
|
||||||
|
|
||||||
|
private static final String CLUSTER_KEY = "cluster";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isAvailable() {
|
public boolean isAvailable() {
|
||||||
return true;
|
return true;
|
||||||
|
@ -68,12 +70,9 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider {
|
||||||
*/
|
*/
|
||||||
static ConfigOrError parseLoadBalancingConfigPolicy(Map<String, ?> rawLoadBalancingPolicyConfig) {
|
static ConfigOrError parseLoadBalancingConfigPolicy(Map<String, ?> rawLoadBalancingPolicyConfig) {
|
||||||
try {
|
try {
|
||||||
String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster");
|
String cluster =
|
||||||
Boolean isDynamic = JsonUtil.getBoolean(rawLoadBalancingPolicyConfig, "is_dynamic");
|
JsonUtil.getString(rawLoadBalancingPolicyConfig, CLUSTER_KEY);
|
||||||
if (isDynamic == null) {
|
return ConfigOrError.fromConfig(new CdsConfig(cluster));
|
||||||
isDynamic = Boolean.FALSE;
|
|
||||||
}
|
|
||||||
return ConfigOrError.fromConfig(new CdsConfig(cluster, isDynamic));
|
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
return ConfigOrError.fromError(
|
return ConfigOrError.fromError(
|
||||||
Status.UNAVAILABLE.withCause(e).withDescription(
|
Status.UNAVAILABLE.withCause(e).withDescription(
|
||||||
|
@ -90,28 +89,15 @@ public class CdsLoadBalancerProvider extends LoadBalancerProvider {
|
||||||
* Name of cluster to query CDS for.
|
* Name of cluster to query CDS for.
|
||||||
*/
|
*/
|
||||||
final String name;
|
final String name;
|
||||||
/**
|
|
||||||
* Whether this cluster was dynamically chosen, so the XdsDependencyManager may be unaware of
|
|
||||||
* it without an explicit cluster subscription.
|
|
||||||
*/
|
|
||||||
final boolean isDynamic;
|
|
||||||
|
|
||||||
CdsConfig(String name) {
|
CdsConfig(String name) {
|
||||||
this(name, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
CdsConfig(String name, boolean isDynamic) {
|
|
||||||
checkArgument(name != null && !name.isEmpty(), "name is null or empty");
|
checkArgument(name != null && !name.isEmpty(), "name is null or empty");
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.isDynamic = isDynamic;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return MoreObjects.toStringHelper(this)
|
return MoreObjects.toStringHelper(this).add("name", name).toString();
|
||||||
.add("name", name)
|
|
||||||
.add("isDynamic", isDynamic)
|
|
||||||
.toString();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,7 @@ public final class RingHashLoadBalancerProvider extends LoadBalancerProvider {
|
||||||
}
|
}
|
||||||
if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) {
|
if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) {
|
||||||
return ConfigOrError.fromError(Status.UNAVAILABLE.withDescription(
|
return ConfigOrError.fromError(Status.UNAVAILABLE.withDescription(
|
||||||
"Invalid 'minRingSize'/'maxRingSize'"));
|
"Invalid 'mingRingSize'/'maxRingSize'"));
|
||||||
}
|
}
|
||||||
return ConfigOrError.fromConfig(
|
return ConfigOrError.fromConfig(
|
||||||
new RingHashConfig(minRingSize, maxRingSize, requestHashHeader));
|
new RingHashConfig(minRingSize, maxRingSize, requestHashHeader));
|
||||||
|
|
|
@ -172,9 +172,7 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
lbConfig.getPolicyName()).parseLoadBalancingPolicyConfig(
|
lbConfig.getPolicyName()).parseLoadBalancingPolicyConfig(
|
||||||
lbConfig.getRawConfigValue());
|
lbConfig.getRawConfigValue());
|
||||||
if (configOrError.getError() != null) {
|
if (configOrError.getError() != null) {
|
||||||
throw new ResourceInvalidException(
|
throw new ResourceInvalidException(structOrError.getErrorDetail());
|
||||||
"Failed to parse lb config for cluster '" + cluster.getName() + "': "
|
|
||||||
+ configOrError.getError());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
updateBuilder.lbPolicyConfig(lbPolicyConfig);
|
updateBuilder.lbPolicyConfig(lbPolicyConfig);
|
||||||
|
@ -211,10 +209,6 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> {
|
||||||
} catch (InvalidProtocolBufferException e) {
|
} catch (InvalidProtocolBufferException e) {
|
||||||
return StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e);
|
return StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e);
|
||||||
}
|
}
|
||||||
if (clusterConfig.getClustersList().isEmpty()) {
|
|
||||||
return StructOrError.fromError("Cluster " + clusterName
|
|
||||||
+ ": aggregate ClusterConfig.clusters must not be empty");
|
|
||||||
}
|
|
||||||
return StructOrError.fromStruct(CdsUpdate.forAggregate(
|
return StructOrError.fromStruct(CdsUpdate.forAggregate(
|
||||||
clusterName, clusterConfig.getClustersList()));
|
clusterName, clusterConfig.getClustersList()));
|
||||||
}
|
}
|
||||||
|
|
|
@ -254,12 +254,6 @@ final class XdsConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface XdsClusterSubscriptionRegistry {
|
public interface XdsClusterSubscriptionRegistry {
|
||||||
Subscription subscribeToCluster(String clusterName);
|
Closeable subscribeToCluster(String clusterName);
|
||||||
}
|
|
||||||
|
|
||||||
public interface Subscription extends Closeable {
|
|
||||||
/** Release resources without throwing exceptions. */
|
|
||||||
@Override
|
|
||||||
void close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@ package io.grpc.xds;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import static com.google.common.base.Preconditions.checkState;
|
|
||||||
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
|
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -35,6 +34,8 @@ import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
|
||||||
import io.grpc.xds.client.XdsClient;
|
import io.grpc.xds.client.XdsClient;
|
||||||
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
||||||
import io.grpc.xds.client.XdsResourceType;
|
import io.grpc.xds.client.XdsResourceType;
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -55,43 +56,39 @@ import javax.annotation.Nullable;
|
||||||
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
|
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
|
||||||
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
|
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
|
||||||
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
|
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
|
||||||
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
|
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
|
||||||
private final String listenerName;
|
private final String listenerName;
|
||||||
private final XdsClient xdsClient;
|
private final XdsClient xdsClient;
|
||||||
|
private final XdsConfigWatcher xdsConfigWatcher;
|
||||||
private final SynchronizationContext syncContext;
|
private final SynchronizationContext syncContext;
|
||||||
private final String dataPlaneAuthority;
|
private final String dataPlaneAuthority;
|
||||||
private XdsConfigWatcher xdsConfigWatcher;
|
|
||||||
|
|
||||||
private StatusOr<XdsConfig> lastUpdate = null;
|
private StatusOr<XdsConfig> lastUpdate = null;
|
||||||
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
|
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
|
||||||
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
|
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
|
||||||
|
|
||||||
XdsDependencyManager(XdsClient xdsClient,
|
XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
|
||||||
SynchronizationContext syncContext, String dataPlaneAuthority,
|
SynchronizationContext syncContext, String dataPlaneAuthority,
|
||||||
String listenerName, NameResolver.Args nameResolverArgs,
|
String listenerName, NameResolver.Args nameResolverArgs,
|
||||||
ScheduledExecutorService scheduler) {
|
ScheduledExecutorService scheduler) {
|
||||||
this.listenerName = checkNotNull(listenerName, "listenerName");
|
this.listenerName = checkNotNull(listenerName, "listenerName");
|
||||||
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
|
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
|
||||||
|
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
|
||||||
this.syncContext = checkNotNull(syncContext, "syncContext");
|
this.syncContext = checkNotNull(syncContext, "syncContext");
|
||||||
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
|
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
|
||||||
checkNotNull(nameResolverArgs, "nameResolverArgs");
|
checkNotNull(nameResolverArgs, "nameResolverArgs");
|
||||||
checkNotNull(scheduler, "scheduler");
|
checkNotNull(scheduler, "scheduler");
|
||||||
|
|
||||||
|
// start the ball rolling
|
||||||
|
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String toContextStr(String typeName, String resourceName) {
|
public static String toContextStr(String typeName, String resourceName) {
|
||||||
return typeName + " resource " + resourceName;
|
return typeName + " resource " + resourceName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(XdsConfigWatcher xdsConfigWatcher) {
|
|
||||||
checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
|
|
||||||
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
|
|
||||||
// start the ball rolling
|
|
||||||
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XdsConfig.Subscription subscribeToCluster(String clusterName) {
|
public Closeable subscribeToCluster(String clusterName) {
|
||||||
checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
|
|
||||||
checkNotNull(clusterName, "clusterName");
|
checkNotNull(clusterName, "clusterName");
|
||||||
ClusterSubscription subscription = new ClusterSubscription(clusterName);
|
ClusterSubscription subscription = new ClusterSubscription(clusterName);
|
||||||
|
|
||||||
|
@ -294,17 +291,10 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
addConfigForCluster(clusters, childCluster, ancestors, tracer);
|
addConfigForCluster(clusters, childCluster, ancestors, tracer);
|
||||||
StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
|
StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
|
||||||
if (!config.hasValue()) {
|
if (!config.hasValue()) {
|
||||||
// gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
|
clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
|
||||||
// exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
|
"Unable to get leaves for " + clusterName + ": "
|
||||||
// watchers reports a transient ADS stream error, the policy should report that it is in
|
+ config.getStatus().getDescription())));
|
||||||
// TRANSIENT_FAILURE if it has never passed a config to its child.
|
return;
|
||||||
//
|
|
||||||
// But there's currently disagreement about whether that is actually what we want, and
|
|
||||||
// that was not originally implemented in gRPC Java. So we're keeping Java's old
|
|
||||||
// behavior for now and only failing the "leaves" (which is a bit arbitrary for a
|
|
||||||
// cycle).
|
|
||||||
leafNames.add(childCluster);
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
|
XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
|
||||||
if (children instanceof AggregateConfig) {
|
if (children instanceof AggregateConfig) {
|
||||||
|
@ -336,11 +326,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
|
child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
|
||||||
"Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
|
"Unknown type in cluster " + clusterName + " " + cdsUpdate.clusterType())));
|
||||||
}
|
}
|
||||||
if (clusters.containsKey(clusterName)) {
|
|
||||||
// If a cycle is detected, we'll have detected it while recursing, so now there will be a key
|
|
||||||
// present. We don't want to overwrite it with a non-error value.
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
clusters.put(clusterName, StatusOr.fromValue(
|
clusters.put(clusterName, StatusOr.fromValue(
|
||||||
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
|
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
|
||||||
}
|
}
|
||||||
|
@ -422,7 +407,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
void onUpdate(StatusOr<XdsConfig> config);
|
void onUpdate(StatusOr<XdsConfig> config);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class ClusterSubscription implements XdsConfig.Subscription {
|
private final class ClusterSubscription implements Closeable {
|
||||||
private final String clusterName;
|
private final String clusterName;
|
||||||
boolean closed; // Accessed from syncContext
|
boolean closed; // Accessed from syncContext
|
||||||
|
|
||||||
|
@ -435,7 +420,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() throws IOException {
|
||||||
releaseSubscription(this);
|
releaseSubscription(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,8 +230,7 @@ final class XdsNameResolver extends NameResolver {
|
||||||
ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
|
ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
|
||||||
callCounterProvider = SharedCallCounterMap.getInstance();
|
callCounterProvider = SharedCallCounterMap.getInstance();
|
||||||
|
|
||||||
resolveState = new ResolveState(ldsResourceName);
|
resolveState = new ResolveState(ldsResourceName); // auto starts
|
||||||
resolveState.start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String expandPercentS(String template, String replacement) {
|
private static String expandPercentS(String template, String replacement) {
|
||||||
|
@ -654,14 +653,10 @@ final class XdsNameResolver extends NameResolver {
|
||||||
private ResolveState(String ldsResourceName) {
|
private ResolveState(String ldsResourceName) {
|
||||||
authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
|
authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
|
||||||
xdsDependencyManager =
|
xdsDependencyManager =
|
||||||
new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName,
|
new XdsDependencyManager(xdsClient, this, syncContext, authority, ldsResourceName,
|
||||||
nameResolverArgs, scheduler);
|
nameResolverArgs, scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
void start() {
|
|
||||||
xdsDependencyManager.start(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void shutdown() {
|
private void shutdown() {
|
||||||
if (stopped) {
|
if (stopped) {
|
||||||
return;
|
return;
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -2211,23 +2211,6 @@ public abstract class GrpcXdsClientImplTestBase {
|
||||||
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
|
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void cdsResponseWithEmptyAggregateCluster() {
|
|
||||||
DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE,
|
|
||||||
cdsResourceWatcher);
|
|
||||||
List<String> candidates = Arrays.asList();
|
|
||||||
Any clusterAggregate =
|
|
||||||
Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, "round_robin", null, null, candidates));
|
|
||||||
call.sendResponse(CDS, clusterAggregate, VERSION_1, "0000");
|
|
||||||
|
|
||||||
// Client sent an ACK CDS request.
|
|
||||||
String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: "
|
|
||||||
+ "Cluster cluster.googleapis.com: aggregate ClusterConfig.clusters must not be empty";
|
|
||||||
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
|
|
||||||
verify(cdsResourceWatcher).onError(errorCaptor.capture());
|
|
||||||
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void cdsResponseWithCircuitBreakers() {
|
public void cdsResponseWithCircuitBreakers() {
|
||||||
DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE,
|
DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE,
|
||||||
|
|
|
@ -106,7 +106,7 @@ public class RingHashLoadBalancerProviderTest {
|
||||||
assertThat(configOrError.getError()).isNotNull();
|
assertThat(configOrError.getError()).isNotNull();
|
||||||
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||||
assertThat(configOrError.getError().getDescription())
|
assertThat(configOrError.getError().getDescription())
|
||||||
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
|
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -117,7 +117,7 @@ public class RingHashLoadBalancerProviderTest {
|
||||||
assertThat(configOrError.getError()).isNotNull();
|
assertThat(configOrError.getError()).isNotNull();
|
||||||
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||||
assertThat(configOrError.getError().getDescription())
|
assertThat(configOrError.getError().getDescription())
|
||||||
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
|
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -214,7 +214,7 @@ public class RingHashLoadBalancerProviderTest {
|
||||||
assertThat(configOrError.getError()).isNotNull();
|
assertThat(configOrError.getError()).isNotNull();
|
||||||
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||||
assertThat(configOrError.getError().getDescription())
|
assertThat(configOrError.getError().getDescription())
|
||||||
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
|
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -225,7 +225,7 @@ public class RingHashLoadBalancerProviderTest {
|
||||||
assertThat(configOrError.getError()).isNotNull();
|
assertThat(configOrError.getError()).isNotNull();
|
||||||
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
assertThat(configOrError.getError().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||||
assertThat(configOrError.getError().getDescription())
|
assertThat(configOrError.getError().getDescription())
|
||||||
.isEqualTo("Invalid 'minRingSize'/'maxRingSize'");
|
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -28,6 +28,7 @@ import static io.grpc.xds.XdsTestUtils.ENDPOINT_HOSTNAME;
|
||||||
import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT;
|
import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT;
|
||||||
import static io.grpc.xds.XdsTestUtils.RDS_NAME;
|
import static io.grpc.xds.XdsTestUtils.RDS_NAME;
|
||||||
import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster;
|
import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster;
|
||||||
|
import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI;
|
||||||
import static org.mockito.AdditionalAnswers.delegatesTo;
|
import static org.mockito.AdditionalAnswers.delegatesTo;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.argThat;
|
import static org.mockito.ArgumentMatchers.argThat;
|
||||||
|
@ -47,22 +48,28 @@ import io.envoyproxy.envoy.config.listener.v3.Listener;
|
||||||
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
|
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
|
||||||
import io.grpc.BindableService;
|
import io.grpc.BindableService;
|
||||||
import io.grpc.ChannelLogger;
|
import io.grpc.ChannelLogger;
|
||||||
|
import io.grpc.ManagedChannel;
|
||||||
import io.grpc.NameResolver;
|
import io.grpc.NameResolver;
|
||||||
|
import io.grpc.Server;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusOr;
|
import io.grpc.StatusOr;
|
||||||
import io.grpc.StatusOrMatcher;
|
import io.grpc.StatusOrMatcher;
|
||||||
import io.grpc.SynchronizationContext;
|
import io.grpc.SynchronizationContext;
|
||||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||||
import io.grpc.inprocess.InProcessServerBuilder;
|
import io.grpc.inprocess.InProcessServerBuilder;
|
||||||
|
import io.grpc.internal.ExponentialBackoffPolicy;
|
||||||
import io.grpc.internal.FakeClock;
|
import io.grpc.internal.FakeClock;
|
||||||
import io.grpc.internal.GrpcUtil;
|
import io.grpc.internal.GrpcUtil;
|
||||||
import io.grpc.testing.GrpcCleanupRule;
|
import io.grpc.testing.GrpcCleanupRule;
|
||||||
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
||||||
import io.grpc.xds.XdsConfig.XdsClusterConfig;
|
import io.grpc.xds.XdsConfig.XdsClusterConfig;
|
||||||
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
|
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
|
||||||
|
import io.grpc.xds.client.CommonBootstrapperTestUtils;
|
||||||
import io.grpc.xds.client.XdsClient;
|
import io.grpc.xds.client.XdsClient;
|
||||||
import io.grpc.xds.client.XdsClient.ResourceMetadata;
|
import io.grpc.xds.client.XdsClient.ResourceMetadata;
|
||||||
|
import io.grpc.xds.client.XdsClientMetricReporter;
|
||||||
import io.grpc.xds.client.XdsResourceType;
|
import io.grpc.xds.client.XdsResourceType;
|
||||||
|
import io.grpc.xds.client.XdsTransportFactory;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
|
@ -89,6 +96,7 @@ import org.mockito.ArgumentMatcher;
|
||||||
import org.mockito.ArgumentMatchers;
|
import org.mockito.ArgumentMatchers;
|
||||||
import org.mockito.Captor;
|
import org.mockito.Captor;
|
||||||
import org.mockito.InOrder;
|
import org.mockito.InOrder;
|
||||||
|
import org.mockito.Mock;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.junit.MockitoJUnit;
|
import org.mockito.junit.MockitoJUnit;
|
||||||
import org.mockito.junit.MockitoRule;
|
import org.mockito.junit.MockitoRule;
|
||||||
|
@ -100,20 +108,21 @@ public class XdsDependencyManagerTest {
|
||||||
public static final String CLUSTER_TYPE_NAME = XdsClusterResource.getInstance().typeName();
|
public static final String CLUSTER_TYPE_NAME = XdsClusterResource.getInstance().typeName();
|
||||||
public static final String ENDPOINT_TYPE_NAME = XdsEndpointResource.getInstance().typeName();
|
public static final String ENDPOINT_TYPE_NAME = XdsEndpointResource.getInstance().typeName();
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private XdsClientMetricReporter xdsClientMetricReporter;
|
||||||
|
|
||||||
private final SynchronizationContext syncContext =
|
private final SynchronizationContext syncContext =
|
||||||
new SynchronizationContext((t, e) -> {
|
new SynchronizationContext((t, e) -> {
|
||||||
throw new AssertionError(e);
|
throw new AssertionError(e);
|
||||||
});
|
});
|
||||||
private final FakeClock fakeClock = new FakeClock();
|
|
||||||
|
|
||||||
private XdsClient xdsClient = XdsTestUtils.createXdsClient(
|
|
||||||
Collections.singletonList("control-plane"),
|
|
||||||
serverInfo -> new GrpcXdsTransportFactory.GrpcXdsTransport(
|
|
||||||
InProcessChannelBuilder.forName(serverInfo.target()).directExecutor().build()),
|
|
||||||
fakeClock);
|
|
||||||
|
|
||||||
|
private ManagedChannel channel;
|
||||||
|
private XdsClient xdsClient;
|
||||||
|
private XdsDependencyManager xdsDependencyManager;
|
||||||
private TestWatcher xdsConfigWatcher;
|
private TestWatcher xdsConfigWatcher;
|
||||||
|
private Server xdsServer;
|
||||||
|
|
||||||
|
private final FakeClock fakeClock = new FakeClock();
|
||||||
private final String serverName = "the-service-name";
|
private final String serverName = "the-service-name";
|
||||||
private final Queue<XdsTestUtils.LrsRpcCall> loadReportCalls = new ArrayDeque<>();
|
private final Queue<XdsTestUtils.LrsRpcCall> loadReportCalls = new ArrayDeque<>();
|
||||||
private final AtomicBoolean adsEnded = new AtomicBoolean(true);
|
private final AtomicBoolean adsEnded = new AtomicBoolean(true);
|
||||||
|
@ -141,12 +150,10 @@ public class XdsDependencyManagerTest {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
|
private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
|
||||||
private XdsDependencyManager xdsDependencyManager = new XdsDependencyManager(
|
|
||||||
xdsClient, syncContext, serverName, serverName, nameResolverArgs, scheduler);
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
cleanupRule.register(InProcessServerBuilder
|
xdsServer = cleanupRule.register(InProcessServerBuilder
|
||||||
.forName("control-plane")
|
.forName("control-plane")
|
||||||
.addService(controlPlaneService)
|
.addService(controlPlaneService)
|
||||||
.addService(lrsService)
|
.addService(lrsService)
|
||||||
|
@ -156,6 +163,15 @@ public class XdsDependencyManagerTest {
|
||||||
|
|
||||||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName);
|
XdsTestUtils.setAdsConfig(controlPlaneService, serverName);
|
||||||
|
|
||||||
|
channel = cleanupRule.register(
|
||||||
|
InProcessChannelBuilder.forName("control-plane").directExecutor().build());
|
||||||
|
XdsTransportFactory xdsTransportFactory =
|
||||||
|
ignore -> new GrpcXdsTransportFactory.GrpcXdsTransport(channel);
|
||||||
|
|
||||||
|
xdsClient = CommonBootstrapperTestUtils.createXdsClient(
|
||||||
|
Collections.singletonList(SERVER_URI), xdsTransportFactory, fakeClock,
|
||||||
|
new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter);
|
||||||
|
|
||||||
testWatcher = new TestWatcher();
|
testWatcher = new TestWatcher();
|
||||||
xdsConfigWatcher = mock(TestWatcher.class, delegatesTo(testWatcher));
|
xdsConfigWatcher = mock(TestWatcher.class, delegatesTo(testWatcher));
|
||||||
defaultXdsConfig = XdsTestUtils.getDefaultXdsConfig(serverName);
|
defaultXdsConfig = XdsTestUtils.getDefaultXdsConfig(serverName);
|
||||||
|
@ -167,6 +183,9 @@ public class XdsDependencyManagerTest {
|
||||||
xdsDependencyManager.shutdown();
|
xdsDependencyManager.shutdown();
|
||||||
}
|
}
|
||||||
xdsClient.shutdown();
|
xdsClient.shutdown();
|
||||||
|
channel.shutdown(); // channel not owned by XdsClient
|
||||||
|
|
||||||
|
xdsServer.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
assertThat(adsEnded.get()).isTrue();
|
assertThat(adsEnded.get()).isTrue();
|
||||||
assertThat(lrsEnded.get()).isTrue();
|
assertThat(lrsEnded.get()).isTrue();
|
||||||
|
@ -175,7 +194,8 @@ public class XdsDependencyManagerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void verify_basic_config() {
|
public void verify_basic_config() {
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||||
testWatcher.verifyStats(1, 0);
|
testWatcher.verifyStats(1, 0);
|
||||||
|
@ -183,7 +203,8 @@ public class XdsDependencyManagerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void verify_config_update() {
|
public void verify_config_update() {
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||||
|
@ -200,7 +221,8 @@ public class XdsDependencyManagerTest {
|
||||||
@Test
|
@Test
|
||||||
public void verify_simple_aggregate() {
|
public void verify_simple_aggregate() {
|
||||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||||
|
|
||||||
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
|
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
|
||||||
|
@ -259,7 +281,8 @@ public class XdsDependencyManagerTest {
|
||||||
List<String> childNames2 = Arrays.asList("clusterA", "clusterX");
|
List<String> childNames2 = Arrays.asList("clusterA", "clusterX");
|
||||||
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2);
|
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2);
|
||||||
|
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
||||||
|
|
||||||
Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
|
Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
|
||||||
|
@ -290,7 +313,8 @@ public class XdsDependencyManagerTest {
|
||||||
@Test
|
@Test
|
||||||
public void testDelayedSubscription() {
|
public void testDelayedSubscription() {
|
||||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||||
|
|
||||||
String rootName1 = "root_c";
|
String rootName1 = "root_c";
|
||||||
|
@ -336,7 +360,8 @@ public class XdsDependencyManagerTest {
|
||||||
edsMap.put("garbageEds", clusterLoadAssignment);
|
edsMap.put("garbageEds", clusterLoadAssignment);
|
||||||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
||||||
|
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
||||||
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
|
@ -368,9 +393,8 @@ public class XdsDependencyManagerTest {
|
||||||
@Test
|
@Test
|
||||||
public void testMissingLds() {
|
public void testMissingLds() {
|
||||||
String ldsName = "badLdsName";
|
String ldsName = "badLdsName";
|
||||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
serverName, ldsName, nameResolverArgs, scheduler);
|
serverName, ldsName, nameResolverArgs, scheduler);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
|
||||||
|
|
||||||
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
||||||
verify(xdsConfigWatcher).onUpdate(
|
verify(xdsConfigWatcher).onUpdate(
|
||||||
|
@ -385,7 +409,8 @@ public class XdsDependencyManagerTest {
|
||||||
Listener serverListener =
|
Listener serverListener =
|
||||||
ControlPlaneRule.buildServerListener().toBuilder().setName(serverName).build();
|
ControlPlaneRule.buildServerListener().toBuilder().setName(serverName).build();
|
||||||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, serverListener));
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, serverListener));
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
||||||
verify(xdsConfigWatcher).onUpdate(
|
verify(xdsConfigWatcher).onUpdate(
|
||||||
|
@ -402,7 +427,8 @@ public class XdsDependencyManagerTest {
|
||||||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
|
||||||
ImmutableMap.of(serverName, clientListener));
|
ImmutableMap.of(serverName, clientListener));
|
||||||
|
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
||||||
verify(xdsConfigWatcher).onUpdate(
|
verify(xdsConfigWatcher).onUpdate(
|
||||||
|
@ -418,7 +444,8 @@ public class XdsDependencyManagerTest {
|
||||||
"wrong-virtual-host", XdsTestUtils.RDS_NAME, XdsTestUtils.CLUSTER_NAME);
|
"wrong-virtual-host", XdsTestUtils.RDS_NAME, XdsTestUtils.CLUSTER_NAME);
|
||||||
controlPlaneService.setXdsConfig(
|
controlPlaneService.setXdsConfig(
|
||||||
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
|
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
// Update with a config that has a virtual host that doesn't match the server name
|
// Update with a config that has a virtual host that doesn't match the server name
|
||||||
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
|
@ -433,9 +460,8 @@ public class XdsDependencyManagerTest {
|
||||||
String ldsResourceName =
|
String ldsResourceName =
|
||||||
"xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1";
|
"xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1";
|
||||||
|
|
||||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
serverName, ldsResourceName, nameResolverArgs, scheduler);
|
serverName, ldsResourceName, nameResolverArgs, scheduler);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
|
||||||
|
|
||||||
verify(xdsConfigWatcher).onUpdate(
|
verify(xdsConfigWatcher).onUpdate(
|
||||||
argThat(StatusOrMatcher.hasStatus(
|
argThat(StatusOrMatcher.hasStatus(
|
||||||
|
@ -448,7 +474,8 @@ public class XdsDependencyManagerTest {
|
||||||
@Test
|
@Test
|
||||||
public void testChangeRdsName_fromLds() {
|
public void testChangeRdsName_fromLds() {
|
||||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
inOrder.verify(xdsConfigWatcher).onUpdate(StatusOr.fromValue(defaultXdsConfig));
|
||||||
|
|
||||||
String newRdsName = "newRdsName1";
|
String newRdsName = "newRdsName1";
|
||||||
|
@ -503,7 +530,8 @@ public class XdsDependencyManagerTest {
|
||||||
|
|
||||||
// Start the actual test
|
// Start the actual test
|
||||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
|
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
|
||||||
|
|
||||||
|
@ -548,7 +576,8 @@ public class XdsDependencyManagerTest {
|
||||||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
||||||
|
|
||||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
||||||
assertThat(config.getClusters().get("clusterA").hasValue()).isTrue();
|
assertThat(config.getClusters().get("clusterA").hasValue()).isTrue();
|
||||||
|
@ -582,12 +611,12 @@ public class XdsDependencyManagerTest {
|
||||||
|
|
||||||
// The cycle is loaded and detected
|
// The cycle is loaded and detected
|
||||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
||||||
assertThat(config.getClusters().get("clusterA").hasValue()).isFalse();
|
assertThat(config.getClusters().get("clusterA").hasValue()).isFalse();
|
||||||
assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle");
|
assertThat(config.getClusters().get("clusterA").getStatus().getDescription()).contains("cycle");
|
||||||
assertThat(config.getClusters().get("clusterB").hasValue()).isTrue();
|
|
||||||
|
|
||||||
// Orphan the cycle and it is discarded
|
// Orphan the cycle and it is discarded
|
||||||
routeConfig =
|
routeConfig =
|
||||||
|
@ -628,7 +657,8 @@ public class XdsDependencyManagerTest {
|
||||||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
|
||||||
|
|
||||||
// Start the actual test
|
// Start the actual test
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
|
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
|
||||||
assertThat(initialConfig.getClusters().keySet())
|
assertThat(initialConfig.getClusters().keySet())
|
||||||
|
@ -645,7 +675,8 @@ public class XdsDependencyManagerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testChangeRdsName_FromLds_complexTree() {
|
public void testChangeRdsName_FromLds_complexTree() {
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
// Create the same tree as in testMultipleParentsInCdsTree
|
// Create the same tree as in testMultipleParentsInCdsTree
|
||||||
Cluster rootCluster =
|
Cluster rootCluster =
|
||||||
|
@ -690,7 +721,8 @@ public class XdsDependencyManagerTest {
|
||||||
public void testChangeAggCluster() {
|
public void testChangeAggCluster() {
|
||||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
|
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
||||||
|
|
||||||
// Setup initial config A -> A1 -> (A11, A12)
|
// Setup initial config A -> A1 -> (A11, A12)
|
||||||
|
@ -743,7 +775,8 @@ public class XdsDependencyManagerTest {
|
||||||
controlPlaneService.setXdsConfig(
|
controlPlaneService.setXdsConfig(
|
||||||
ADS_TYPE_URL_CDS, ImmutableMap.of(XdsTestUtils.CLUSTER_NAME,
|
ADS_TYPE_URL_CDS, ImmutableMap.of(XdsTestUtils.CLUSTER_NAME,
|
||||||
Cluster.newBuilder().setName(XdsTestUtils.CLUSTER_NAME).build()));
|
Cluster.newBuilder().setName(XdsTestUtils.CLUSTER_NAME).build()));
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
Status status = xdsUpdateCaptor.getValue().getValue()
|
Status status = xdsUpdateCaptor.getValue().getValue()
|
||||||
|
@ -756,7 +789,8 @@ public class XdsDependencyManagerTest {
|
||||||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
||||||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||||
|
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
verify(xdsConfigWatcher).onUpdate(any());
|
verify(xdsConfigWatcher).onUpdate(any());
|
||||||
|
|
||||||
|
@ -788,7 +822,8 @@ public class XdsDependencyManagerTest {
|
||||||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
||||||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||||
|
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
verify(xdsConfigWatcher).onUpdate(any());
|
verify(xdsConfigWatcher).onUpdate(any());
|
||||||
|
|
||||||
|
@ -820,7 +855,8 @@ public class XdsDependencyManagerTest {
|
||||||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
||||||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||||
|
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
verify(xdsConfigWatcher).onUpdate(any());
|
verify(xdsConfigWatcher).onUpdate(any());
|
||||||
|
|
||||||
|
@ -852,7 +888,8 @@ public class XdsDependencyManagerTest {
|
||||||
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
||||||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||||
|
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
|
||||||
verify(xdsConfigWatcher).onUpdate(any());
|
verify(xdsConfigWatcher).onUpdate(any());
|
||||||
|
|
||||||
|
@ -885,7 +922,8 @@ public class XdsDependencyManagerTest {
|
||||||
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||||
|
|
||||||
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
||||||
xdsDependencyManager.shutdown();
|
xdsDependencyManager.shutdown();
|
||||||
|
|
||||||
|
|
|
@ -52,20 +52,14 @@ import io.grpc.BindableService;
|
||||||
import io.grpc.Context;
|
import io.grpc.Context;
|
||||||
import io.grpc.Context.CancellationListener;
|
import io.grpc.Context.CancellationListener;
|
||||||
import io.grpc.StatusOr;
|
import io.grpc.StatusOr;
|
||||||
import io.grpc.internal.ExponentialBackoffPolicy;
|
|
||||||
import io.grpc.internal.FakeClock;
|
|
||||||
import io.grpc.internal.JsonParser;
|
import io.grpc.internal.JsonParser;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import io.grpc.xds.Endpoints.LbEndpoint;
|
import io.grpc.xds.Endpoints.LbEndpoint;
|
||||||
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
||||||
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
|
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
|
||||||
import io.grpc.xds.client.Bootstrapper;
|
import io.grpc.xds.client.Bootstrapper;
|
||||||
import io.grpc.xds.client.CommonBootstrapperTestUtils;
|
|
||||||
import io.grpc.xds.client.Locality;
|
import io.grpc.xds.client.Locality;
|
||||||
import io.grpc.xds.client.XdsClient;
|
|
||||||
import io.grpc.xds.client.XdsClientMetricReporter;
|
|
||||||
import io.grpc.xds.client.XdsResourceType;
|
import io.grpc.xds.client.XdsResourceType;
|
||||||
import io.grpc.xds.client.XdsTransportFactory;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -370,32 +364,6 @@ public class XdsTestUtils {
|
||||||
.setApiListener(clientListenerBuilder.build()).build();
|
.setApiListener(clientListenerBuilder.build()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static XdsClient createXdsClient(
|
|
||||||
List<String> serverUris,
|
|
||||||
XdsTransportFactory xdsTransportFactory,
|
|
||||||
FakeClock fakeClock) {
|
|
||||||
return createXdsClient(
|
|
||||||
CommonBootstrapperTestUtils.buildBootStrap(serverUris),
|
|
||||||
xdsTransportFactory,
|
|
||||||
fakeClock,
|
|
||||||
new XdsClientMetricReporter() {});
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link CommonBootstrapperTestUtils#createXdsClient} with gRPC-specific values. */
|
|
||||||
public static XdsClient createXdsClient(
|
|
||||||
Bootstrapper.BootstrapInfo bootstrapInfo,
|
|
||||||
XdsTransportFactory xdsTransportFactory,
|
|
||||||
FakeClock fakeClock,
|
|
||||||
XdsClientMetricReporter xdsClientMetricReporter) {
|
|
||||||
return CommonBootstrapperTestUtils.createXdsClient(
|
|
||||||
bootstrapInfo,
|
|
||||||
xdsTransportFactory,
|
|
||||||
fakeClock,
|
|
||||||
new ExponentialBackoffPolicy.Provider(),
|
|
||||||
MessagePrinter.INSTANCE,
|
|
||||||
xdsClientMetricReporter);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with
|
* Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with
|
||||||
* the same list of clusterName:clusterServiceName pair.
|
* the same list of clusterName:clusterServiceName pair.
|
||||||
|
|
Loading…
Reference in New Issue