mirror of https://github.com/grpc/grpc-java.git
xds: Add logical dns cluster support to XdsDepManager
ClusterResolverLb gets the NameResolverRegistry from LoadBalancer.Helper, so a new API was added in NameResover.Args to propagate the same object to the name resolver tree. RetryingNameResolver was exposed to xds. This is expected to be temporary, as the retrying is being removed from ManagedChannelImpl and moved into the resolvers. At that point, DnsNameResolverProvider would wrap DnsNameResolver with a similar API to RetryingNameResolver and xds would no longer be responsible.
This commit is contained in:
parent
f07eb47cac
commit
d2d8ed8efa
|
@ -303,6 +303,7 @@ public abstract class NameResolver {
|
||||||
@Nullable private final Executor executor;
|
@Nullable private final Executor executor;
|
||||||
@Nullable private final String overrideAuthority;
|
@Nullable private final String overrideAuthority;
|
||||||
@Nullable private final MetricRecorder metricRecorder;
|
@Nullable private final MetricRecorder metricRecorder;
|
||||||
|
@Nullable private final NameResolverRegistry nameResolverRegistry;
|
||||||
@Nullable private final IdentityHashMap<Key<?>, Object> customArgs;
|
@Nullable private final IdentityHashMap<Key<?>, Object> customArgs;
|
||||||
|
|
||||||
private Args(Builder builder) {
|
private Args(Builder builder) {
|
||||||
|
@ -316,6 +317,7 @@ public abstract class NameResolver {
|
||||||
this.executor = builder.executor;
|
this.executor = builder.executor;
|
||||||
this.overrideAuthority = builder.overrideAuthority;
|
this.overrideAuthority = builder.overrideAuthority;
|
||||||
this.metricRecorder = builder.metricRecorder;
|
this.metricRecorder = builder.metricRecorder;
|
||||||
|
this.nameResolverRegistry = builder.nameResolverRegistry;
|
||||||
this.customArgs = cloneCustomArgs(builder.customArgs);
|
this.customArgs = cloneCustomArgs(builder.customArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -447,6 +449,18 @@ public abstract class NameResolver {
|
||||||
return metricRecorder;
|
return metricRecorder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the {@link NameResolverRegistry} that the Channel uses to look for {@link
|
||||||
|
* NameResolver}s.
|
||||||
|
*
|
||||||
|
* @since 1.74.0
|
||||||
|
*/
|
||||||
|
public NameResolverRegistry getNameResolverRegistry() {
|
||||||
|
if (nameResolverRegistry == null) {
|
||||||
|
throw new IllegalStateException("NameResolverRegistry is not set in Builder");
|
||||||
|
}
|
||||||
|
return nameResolverRegistry;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
|
@ -461,6 +475,7 @@ public abstract class NameResolver {
|
||||||
.add("executor", executor)
|
.add("executor", executor)
|
||||||
.add("overrideAuthority", overrideAuthority)
|
.add("overrideAuthority", overrideAuthority)
|
||||||
.add("metricRecorder", metricRecorder)
|
.add("metricRecorder", metricRecorder)
|
||||||
|
.add("nameResolverRegistry", nameResolverRegistry)
|
||||||
.toString();
|
.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,6 +495,7 @@ public abstract class NameResolver {
|
||||||
builder.setOffloadExecutor(executor);
|
builder.setOffloadExecutor(executor);
|
||||||
builder.setOverrideAuthority(overrideAuthority);
|
builder.setOverrideAuthority(overrideAuthority);
|
||||||
builder.setMetricRecorder(metricRecorder);
|
builder.setMetricRecorder(metricRecorder);
|
||||||
|
builder.setNameResolverRegistry(nameResolverRegistry);
|
||||||
builder.customArgs = cloneCustomArgs(customArgs);
|
builder.customArgs = cloneCustomArgs(customArgs);
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
@ -508,6 +524,7 @@ public abstract class NameResolver {
|
||||||
private Executor executor;
|
private Executor executor;
|
||||||
private String overrideAuthority;
|
private String overrideAuthority;
|
||||||
private MetricRecorder metricRecorder;
|
private MetricRecorder metricRecorder;
|
||||||
|
private NameResolverRegistry nameResolverRegistry;
|
||||||
private IdentityHashMap<Key<?>, Object> customArgs;
|
private IdentityHashMap<Key<?>, Object> customArgs;
|
||||||
|
|
||||||
Builder() {
|
Builder() {
|
||||||
|
@ -614,6 +631,16 @@ public abstract class NameResolver {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* See {@link Args#getNameResolverRegistry}. This is an optional field.
|
||||||
|
*
|
||||||
|
* @since 1.74.0
|
||||||
|
*/
|
||||||
|
public Builder setNameResolverRegistry(NameResolverRegistry registry) {
|
||||||
|
this.nameResolverRegistry = registry;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Builds an {@link Args}.
|
* Builds an {@link Args}.
|
||||||
*
|
*
|
||||||
|
|
|
@ -597,7 +597,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
||||||
.setChannelLogger(channelLogger)
|
.setChannelLogger(channelLogger)
|
||||||
.setOffloadExecutor(this.offloadExecutorHolder)
|
.setOffloadExecutor(this.offloadExecutorHolder)
|
||||||
.setOverrideAuthority(this.authorityOverride)
|
.setOverrideAuthority(this.authorityOverride)
|
||||||
.setMetricRecorder(this.metricRecorder);
|
.setMetricRecorder(this.metricRecorder)
|
||||||
|
.setNameResolverRegistry(builder.nameResolverRegistry);
|
||||||
builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
|
builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
|
||||||
this.nameResolverArgs = nameResolverArgsBuilder.build();
|
this.nameResolverArgs = nameResolverArgsBuilder.build();
|
||||||
this.nameResolver = getNameResolver(
|
this.nameResolver = getNameResolver(
|
||||||
|
@ -685,11 +686,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
||||||
// We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
|
// We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
|
||||||
// TODO: After a transition period, all NameResolver implementations that need retry should use
|
// TODO: After a transition period, all NameResolver implementations that need retry should use
|
||||||
// RetryingNameResolver directly and this step can be removed.
|
// RetryingNameResolver directly and this step can be removed.
|
||||||
NameResolver usedNameResolver = new RetryingNameResolver(resolver,
|
NameResolver usedNameResolver = RetryingNameResolver.wrap(resolver, nameResolverArgs);
|
||||||
new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
|
|
||||||
nameResolverArgs.getScheduledExecutorService(),
|
|
||||||
nameResolverArgs.getSynchronizationContext()),
|
|
||||||
nameResolverArgs.getSynchronizationContext());
|
|
||||||
|
|
||||||
if (overrideAuthority == null) {
|
if (overrideAuthority == null) {
|
||||||
return usedNameResolver;
|
return usedNameResolver;
|
||||||
|
|
|
@ -27,13 +27,22 @@ import io.grpc.SynchronizationContext;
|
||||||
*
|
*
|
||||||
* <p>The {@link NameResolver} used with this
|
* <p>The {@link NameResolver} used with this
|
||||||
*/
|
*/
|
||||||
final class RetryingNameResolver extends ForwardingNameResolver {
|
public final class RetryingNameResolver extends ForwardingNameResolver {
|
||||||
|
public static NameResolver wrap(NameResolver retriedNameResolver, Args args) {
|
||||||
|
// For migration, this might become conditional
|
||||||
|
return new RetryingNameResolver(
|
||||||
|
retriedNameResolver,
|
||||||
|
new BackoffPolicyRetryScheduler(
|
||||||
|
new ExponentialBackoffPolicy.Provider(),
|
||||||
|
args.getScheduledExecutorService(),
|
||||||
|
args.getSynchronizationContext()),
|
||||||
|
args.getSynchronizationContext());
|
||||||
|
}
|
||||||
|
|
||||||
private final NameResolver retriedNameResolver;
|
private final NameResolver retriedNameResolver;
|
||||||
private final RetryScheduler retryScheduler;
|
private final RetryScheduler retryScheduler;
|
||||||
private final SynchronizationContext syncContext;
|
private final SynchronizationContext syncContext;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new {@link RetryingNameResolver}.
|
* Creates a new {@link RetryingNameResolver}.
|
||||||
*
|
*
|
||||||
|
|
|
@ -207,14 +207,7 @@ public class DnsNameResolverTest {
|
||||||
|
|
||||||
// In practice the DNS name resolver provider always wraps the resolver in a
|
// In practice the DNS name resolver provider always wraps the resolver in a
|
||||||
// RetryingNameResolver which adds retry capabilities to it. We use the same setup here.
|
// RetryingNameResolver which adds retry capabilities to it. We use the same setup here.
|
||||||
return new RetryingNameResolver(
|
return (RetryingNameResolver) RetryingNameResolver.wrap(dnsResolver, args);
|
||||||
dnsResolver,
|
|
||||||
new BackoffPolicyRetryScheduler(
|
|
||||||
new ExponentialBackoffPolicy.Provider(),
|
|
||||||
fakeExecutor.getScheduledExecutorService(),
|
|
||||||
syncContext
|
|
||||||
),
|
|
||||||
syncContext);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
|
|
@ -23,18 +23,27 @@ import static io.grpc.xds.client.XdsClient.ResourceUpdate;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import io.grpc.EquivalentAddressGroup;
|
||||||
import io.grpc.NameResolver;
|
import io.grpc.NameResolver;
|
||||||
|
import io.grpc.NameResolverProvider;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusOr;
|
import io.grpc.StatusOr;
|
||||||
import io.grpc.SynchronizationContext;
|
import io.grpc.SynchronizationContext;
|
||||||
|
import io.grpc.internal.RetryingNameResolver;
|
||||||
|
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
||||||
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
|
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
|
||||||
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
|
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
|
||||||
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
|
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
|
||||||
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
|
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
|
||||||
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
|
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
|
||||||
|
import io.grpc.xds.client.Locality;
|
||||||
import io.grpc.xds.client.XdsClient;
|
import io.grpc.xds.client.XdsClient;
|
||||||
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
||||||
import io.grpc.xds.client.XdsResourceType;
|
import io.grpc.xds.client.XdsResourceType;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumMap;
|
import java.util.EnumMap;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -44,7 +53,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -55,7 +63,7 @@ import javax.annotation.Nullable;
|
||||||
*/
|
*/
|
||||||
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
|
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
|
||||||
private enum TrackedWatcherTypeEnum {
|
private enum TrackedWatcherTypeEnum {
|
||||||
LDS, RDS, CDS, EDS
|
LDS, RDS, CDS, EDS, DNS
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
|
private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
|
||||||
|
@ -66,12 +74,19 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
|
new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
|
||||||
private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
|
private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
|
||||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
|
new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
|
||||||
|
private static final TrackedWatcherType<List<EquivalentAddressGroup>> DNS_TYPE =
|
||||||
|
new TrackedWatcherType<>(TrackedWatcherTypeEnum.DNS);
|
||||||
|
|
||||||
|
// DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
|
||||||
|
// to an empty locality.
|
||||||
|
private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
|
||||||
|
|
||||||
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
|
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
|
||||||
private final String listenerName;
|
private final String listenerName;
|
||||||
private final XdsClient xdsClient;
|
private final XdsClient xdsClient;
|
||||||
private final SynchronizationContext syncContext;
|
private final SynchronizationContext syncContext;
|
||||||
private final String dataPlaneAuthority;
|
private final String dataPlaneAuthority;
|
||||||
|
private final NameResolver.Args nameResolverArgs;
|
||||||
private XdsConfigWatcher xdsConfigWatcher;
|
private XdsConfigWatcher xdsConfigWatcher;
|
||||||
|
|
||||||
private StatusOr<XdsConfig> lastUpdate = null;
|
private StatusOr<XdsConfig> lastUpdate = null;
|
||||||
|
@ -79,16 +94,17 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
new EnumMap<>(TrackedWatcherTypeEnum.class);
|
new EnumMap<>(TrackedWatcherTypeEnum.class);
|
||||||
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
|
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
|
||||||
|
|
||||||
XdsDependencyManager(XdsClient xdsClient,
|
XdsDependencyManager(
|
||||||
SynchronizationContext syncContext, String dataPlaneAuthority,
|
XdsClient xdsClient,
|
||||||
String listenerName, NameResolver.Args nameResolverArgs,
|
SynchronizationContext syncContext,
|
||||||
ScheduledExecutorService scheduler) {
|
String dataPlaneAuthority,
|
||||||
|
String listenerName,
|
||||||
|
NameResolver.Args nameResolverArgs) {
|
||||||
this.listenerName = checkNotNull(listenerName, "listenerName");
|
this.listenerName = checkNotNull(listenerName, "listenerName");
|
||||||
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
|
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
|
||||||
this.syncContext = checkNotNull(syncContext, "syncContext");
|
this.syncContext = checkNotNull(syncContext, "syncContext");
|
||||||
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
|
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
|
||||||
checkNotNull(nameResolverArgs, "nameResolverArgs");
|
this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs");
|
||||||
checkNotNull(scheduler, "scheduler");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String toContextStr(String typeName, String resourceName) {
|
public static String toContextStr(String typeName, String resourceName) {
|
||||||
|
@ -120,6 +136,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
return subscription;
|
return subscription;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For all logical dns clusters refresh their results.
|
||||||
|
*/
|
||||||
|
public void requestReresolution() {
|
||||||
|
syncContext.execute(() -> {
|
||||||
|
for (TrackedWatcher<List<EquivalentAddressGroup>> watcher : getWatchers(DNS_TYPE).values()) {
|
||||||
|
DnsWatcher dnsWatcher = (DnsWatcher) watcher;
|
||||||
|
dnsWatcher.refresh();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private <T extends ResourceUpdate> void addWatcher(
|
private <T extends ResourceUpdate> void addWatcher(
|
||||||
TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
|
TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
|
||||||
syncContext.throwIfNotInThisSynchronizationContext();
|
syncContext.throwIfNotInThisSynchronizationContext();
|
||||||
|
@ -335,9 +363,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case LOGICAL_DNS:
|
case LOGICAL_DNS:
|
||||||
// TODO get the resolved endpoint configuration
|
TrackedWatcher<List<EquivalentAddressGroup>> dnsWatcher =
|
||||||
child = new EndpointConfig(StatusOr.fromStatus(
|
tracer.getWatcher(DNS_TYPE, cdsUpdate.dnsHostName());
|
||||||
Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
|
child = new EndpointConfig(dnsToEdsUpdate(dnsWatcher.getData(), cdsUpdate.dnsHostName()));
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
|
child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
|
||||||
|
@ -352,6 +380,24 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
|
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static StatusOr<XdsEndpointResource.EdsUpdate> dnsToEdsUpdate(
|
||||||
|
StatusOr<List<EquivalentAddressGroup>> dnsData, String dnsHostName) {
|
||||||
|
if (!dnsData.hasValue()) {
|
||||||
|
return StatusOr.fromStatus(dnsData.getStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Endpoints.LbEndpoint> endpoints = new ArrayList<>();
|
||||||
|
for (EquivalentAddressGroup eag : dnsData.getValue()) {
|
||||||
|
endpoints.add(Endpoints.LbEndpoint.create(eag, 1, true, dnsHostName, ImmutableMap.of()));
|
||||||
|
}
|
||||||
|
LocalityLbEndpoints lbEndpoints =
|
||||||
|
LocalityLbEndpoints.create(endpoints, 1, 0, ImmutableMap.of());
|
||||||
|
return StatusOr.fromValue(new XdsEndpointResource.EdsUpdate(
|
||||||
|
"fakeEds_logicalDns",
|
||||||
|
Collections.singletonMap(LOGICAL_DNS_CLUSTER_LOCALITY, lbEndpoints),
|
||||||
|
new ArrayList<>()));
|
||||||
|
}
|
||||||
|
|
||||||
private void addRdsWatcher(String resourceName) {
|
private void addRdsWatcher(String resourceName) {
|
||||||
if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
|
if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
|
||||||
return;
|
return;
|
||||||
|
@ -376,6 +422,17 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
|
addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void addDnsWatcher(String dnsHostName) {
|
||||||
|
syncContext.throwIfNotInThisSynchronizationContext();
|
||||||
|
if (getWatchers(DNS_TYPE).containsKey(dnsHostName)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
DnsWatcher watcher = new DnsWatcher(dnsHostName, nameResolverArgs);
|
||||||
|
getWatchers(DNS_TYPE).put(dnsHostName, watcher);
|
||||||
|
watcher.start();
|
||||||
|
}
|
||||||
|
|
||||||
private void updateRoutes(List<VirtualHost> virtualHosts) {
|
private void updateRoutes(List<VirtualHost> virtualHosts) {
|
||||||
VirtualHost virtualHost =
|
VirtualHost virtualHost =
|
||||||
RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
|
RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
|
||||||
|
@ -411,6 +468,33 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
return clusters;
|
return clusters;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static NameResolver createNameResolver(
|
||||||
|
String dnsHostName,
|
||||||
|
NameResolver.Args nameResolverArgs) {
|
||||||
|
URI uri;
|
||||||
|
try {
|
||||||
|
uri = new URI("dns", "", "/" + dnsHostName, null);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
return new FailingNameResolver(
|
||||||
|
Status.INTERNAL.withDescription("Bug, invalid URI creation: " + dnsHostName)
|
||||||
|
.withCause(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
NameResolverProvider provider =
|
||||||
|
nameResolverArgs.getNameResolverRegistry().getProviderForScheme("dns");
|
||||||
|
if (provider == null) {
|
||||||
|
return new FailingNameResolver(
|
||||||
|
Status.INTERNAL.withDescription("Could not find dns name resolver"));
|
||||||
|
}
|
||||||
|
|
||||||
|
NameResolver bareResolver = provider.newNameResolver(uri, nameResolverArgs);
|
||||||
|
if (bareResolver == null) {
|
||||||
|
return new FailingNameResolver(
|
||||||
|
Status.INTERNAL.withDescription("DNS name resolver provider returned null: " + uri));
|
||||||
|
}
|
||||||
|
return RetryingNameResolver.wrap(bareResolver, nameResolverArgs);
|
||||||
|
}
|
||||||
|
|
||||||
private static class TypeWatchers<T> {
|
private static class TypeWatchers<T> {
|
||||||
// Key is resource name
|
// Key is resource name
|
||||||
final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
|
final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
|
||||||
|
@ -722,7 +806,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
addEdsWatcher(getEdsServiceName());
|
addEdsWatcher(getEdsServiceName());
|
||||||
break;
|
break;
|
||||||
case LOGICAL_DNS:
|
case LOGICAL_DNS:
|
||||||
// no eds needed
|
addDnsWatcher(update.dnsHostName());
|
||||||
break;
|
break;
|
||||||
case AGGREGATE:
|
case AGGREGATE:
|
||||||
update.prioritizedClusterNames()
|
update.prioritizedClusterNames()
|
||||||
|
@ -751,4 +835,101 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
@Override
|
@Override
|
||||||
public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
|
public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class DnsWatcher implements TrackedWatcher<List<EquivalentAddressGroup>> {
|
||||||
|
private final NameResolver resolver;
|
||||||
|
@Nullable
|
||||||
|
private StatusOr<List<EquivalentAddressGroup>> data;
|
||||||
|
private boolean cancelled;
|
||||||
|
|
||||||
|
public DnsWatcher(String dnsHostName, NameResolver.Args nameResolverArgs) {
|
||||||
|
this.resolver = createNameResolver(dnsHostName, nameResolverArgs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
resolver.start(new NameResolverListener());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void refresh() {
|
||||||
|
if (cancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
resolver.refresh();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public StatusOr<List<EquivalentAddressGroup>> getData() {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (cancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
cancelled = true;
|
||||||
|
resolver.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class NameResolverListener extends NameResolver.Listener2 {
|
||||||
|
@Override
|
||||||
|
public void onResult(final NameResolver.ResolutionResult resolutionResult) {
|
||||||
|
syncContext.execute(() -> onResult2(resolutionResult));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Status onResult2(final NameResolver.ResolutionResult resolutionResult) {
|
||||||
|
if (cancelled) {
|
||||||
|
return Status.OK;
|
||||||
|
}
|
||||||
|
data = resolutionResult.getAddressesOrError();
|
||||||
|
maybePublishConfig();
|
||||||
|
return resolutionResult.getAddressesOrError().getStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(final Status error) {
|
||||||
|
syncContext.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (cancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// DnsNameResolver cannot distinguish between address-not-found and transient errors.
|
||||||
|
// Assume it is a transient error.
|
||||||
|
// TODO: Once the resolution note API is available, don't throw away the error if
|
||||||
|
// hasDataValue(); pass it as the note instead
|
||||||
|
if (!hasDataValue()) {
|
||||||
|
data = StatusOr.fromStatus(error);
|
||||||
|
maybePublishConfig();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class FailingNameResolver extends NameResolver {
|
||||||
|
private final Status status;
|
||||||
|
|
||||||
|
public FailingNameResolver(Status status) {
|
||||||
|
checkNotNull(status, "status");
|
||||||
|
checkArgument(!status.isOk(), "Status must not be OK");
|
||||||
|
this.status = status;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start(Listener2 listener) {
|
||||||
|
listener.onError(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getServiceAuthority() {
|
||||||
|
return "bug-if-you-see-this-authority";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -655,7 +655,7 @@ final class XdsNameResolver extends NameResolver {
|
||||||
authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
|
authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
|
||||||
xdsDependencyManager =
|
xdsDependencyManager =
|
||||||
new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName,
|
new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName,
|
||||||
nameResolverArgs, scheduler);
|
nameResolverArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
void start() {
|
void start() {
|
||||||
|
|
|
@ -66,6 +66,7 @@ import io.grpc.LoadBalancer.SubchannelPicker;
|
||||||
import io.grpc.LoadBalancerProvider;
|
import io.grpc.LoadBalancerProvider;
|
||||||
import io.grpc.LoadBalancerRegistry;
|
import io.grpc.LoadBalancerRegistry;
|
||||||
import io.grpc.NameResolver;
|
import io.grpc.NameResolver;
|
||||||
|
import io.grpc.NameResolverRegistry;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.Status.Code;
|
import io.grpc.Status.Code;
|
||||||
import io.grpc.SynchronizationContext;
|
import io.grpc.SynchronizationContext;
|
||||||
|
@ -176,6 +177,7 @@ public class CdsLoadBalancer2Test {
|
||||||
.setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
|
.setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
|
||||||
.setChannelLogger(mock(ChannelLogger.class))
|
.setChannelLogger(mock(ChannelLogger.class))
|
||||||
.setScheduledExecutorService(fakeClock.getScheduledExecutorService())
|
.setScheduledExecutorService(fakeClock.getScheduledExecutorService())
|
||||||
|
.setNameResolverRegistry(new NameResolverRegistry())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
xdsDepManager = new XdsDependencyManager(
|
xdsDepManager = new XdsDependencyManager(
|
||||||
|
@ -183,8 +185,7 @@ public class CdsLoadBalancer2Test {
|
||||||
syncContext,
|
syncContext,
|
||||||
SERVER_NAME,
|
SERVER_NAME,
|
||||||
SERVER_NAME,
|
SERVER_NAME,
|
||||||
nameResolverArgs,
|
nameResolverArgs);
|
||||||
fakeClock.getScheduledExecutorService());
|
|
||||||
|
|
||||||
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(
|
||||||
SERVER_NAME, ControlPlaneRule.buildClientListener(SERVER_NAME, "my-route")));
|
SERVER_NAME, ControlPlaneRule.buildClientListener(SERVER_NAME, "my-route")));
|
||||||
|
|
|
@ -42,12 +42,19 @@ import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
|
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
|
||||||
|
import io.envoyproxy.envoy.config.core.v3.Address;
|
||||||
|
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
|
||||||
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
|
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
|
||||||
|
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
|
||||||
|
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
|
||||||
|
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints;
|
||||||
import io.envoyproxy.envoy.config.listener.v3.Listener;
|
import io.envoyproxy.envoy.config.listener.v3.Listener;
|
||||||
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
|
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
|
||||||
import io.grpc.BindableService;
|
import io.grpc.BindableService;
|
||||||
import io.grpc.ChannelLogger;
|
import io.grpc.ChannelLogger;
|
||||||
|
import io.grpc.EquivalentAddressGroup;
|
||||||
import io.grpc.NameResolver;
|
import io.grpc.NameResolver;
|
||||||
|
import io.grpc.NameResolverRegistry;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusOr;
|
import io.grpc.StatusOr;
|
||||||
import io.grpc.StatusOrMatcher;
|
import io.grpc.StatusOrMatcher;
|
||||||
|
@ -56,10 +63,12 @@ import io.grpc.inprocess.InProcessChannelBuilder;
|
||||||
import io.grpc.inprocess.InProcessServerBuilder;
|
import io.grpc.inprocess.InProcessServerBuilder;
|
||||||
import io.grpc.internal.FakeClock;
|
import io.grpc.internal.FakeClock;
|
||||||
import io.grpc.internal.GrpcUtil;
|
import io.grpc.internal.GrpcUtil;
|
||||||
|
import io.grpc.internal.testing.FakeNameResolverProvider;
|
||||||
import io.grpc.testing.GrpcCleanupRule;
|
import io.grpc.testing.GrpcCleanupRule;
|
||||||
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
||||||
import io.grpc.xds.XdsConfig.XdsClusterConfig;
|
import io.grpc.xds.XdsConfig.XdsClusterConfig;
|
||||||
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
|
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
|
||||||
|
import io.grpc.xds.client.Locality;
|
||||||
import io.grpc.xds.client.XdsClient;
|
import io.grpc.xds.client.XdsClient;
|
||||||
import io.grpc.xds.client.XdsClient.ResourceMetadata;
|
import io.grpc.xds.client.XdsClient.ResourceMetadata;
|
||||||
import io.grpc.xds.client.XdsResourceType;
|
import io.grpc.xds.client.XdsResourceType;
|
||||||
|
@ -74,7 +83,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
@ -121,6 +129,7 @@ public class XdsDependencyManagerTest {
|
||||||
private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService();
|
private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService();
|
||||||
private final BindableService lrsService =
|
private final BindableService lrsService =
|
||||||
XdsTestUtils.createLrsService(lrsEnded, loadReportCalls);
|
XdsTestUtils.createLrsService(lrsEnded, loadReportCalls);
|
||||||
|
private final NameResolverRegistry nameResolverRegistry = new NameResolverRegistry();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
|
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
|
||||||
|
@ -138,11 +147,11 @@ public class XdsDependencyManagerTest {
|
||||||
.setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
|
.setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
|
||||||
.setChannelLogger(mock(ChannelLogger.class))
|
.setChannelLogger(mock(ChannelLogger.class))
|
||||||
.setScheduledExecutorService(fakeClock.getScheduledExecutorService())
|
.setScheduledExecutorService(fakeClock.getScheduledExecutorService())
|
||||||
|
.setNameResolverRegistry(nameResolverRegistry)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
|
|
||||||
private XdsDependencyManager xdsDependencyManager = new XdsDependencyManager(
|
private XdsDependencyManager xdsDependencyManager = new XdsDependencyManager(
|
||||||
xdsClient, syncContext, serverName, serverName, nameResolverArgs, scheduler);
|
xdsClient, syncContext, serverName, serverName, nameResolverArgs);
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
@ -369,7 +378,7 @@ public class XdsDependencyManagerTest {
|
||||||
public void testMissingLds() {
|
public void testMissingLds() {
|
||||||
String ldsName = "badLdsName";
|
String ldsName = "badLdsName";
|
||||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
|
||||||
serverName, ldsName, nameResolverArgs, scheduler);
|
serverName, ldsName, nameResolverArgs);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager.start(xdsConfigWatcher);
|
||||||
|
|
||||||
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
fakeClock.forwardTime(16, TimeUnit.SECONDS);
|
||||||
|
@ -434,7 +443,7 @@ public class XdsDependencyManagerTest {
|
||||||
"xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1";
|
"xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1";
|
||||||
|
|
||||||
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
|
||||||
serverName, ldsResourceName, nameResolverArgs, scheduler);
|
serverName, ldsResourceName, nameResolverArgs);
|
||||||
xdsDependencyManager.start(xdsConfigWatcher);
|
xdsDependencyManager.start(xdsConfigWatcher);
|
||||||
|
|
||||||
verify(xdsConfigWatcher).onUpdate(
|
verify(xdsConfigWatcher).onUpdate(
|
||||||
|
@ -738,6 +747,75 @@ public class XdsDependencyManagerTest {
|
||||||
inOrder.verify(xdsConfigWatcher).onUpdate(argThat(nameMatcher));
|
inOrder.verify(xdsConfigWatcher).onUpdate(argThat(nameMatcher));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLogicalDns_success() {
|
||||||
|
FakeSocketAddress fakeAddress = new FakeSocketAddress();
|
||||||
|
nameResolverRegistry.register(new FakeNameResolverProvider(
|
||||||
|
"dns:///dns.example.com:1111", fakeAddress));
|
||||||
|
Cluster cluster = Cluster.newBuilder()
|
||||||
|
.setName(CLUSTER_NAME)
|
||||||
|
.setType(Cluster.DiscoveryType.LOGICAL_DNS)
|
||||||
|
.setLoadAssignment(ClusterLoadAssignment.newBuilder()
|
||||||
|
.addEndpoints(LocalityLbEndpoints.newBuilder()
|
||||||
|
.addLbEndpoints(LbEndpoint.newBuilder()
|
||||||
|
.setEndpoint(Endpoint.newBuilder()
|
||||||
|
.setAddress(Address.newBuilder()
|
||||||
|
.setSocketAddress(SocketAddress.newBuilder()
|
||||||
|
.setAddress("dns.example.com")
|
||||||
|
.setPortValue(1111)))))))
|
||||||
|
.build();
|
||||||
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS,
|
||||||
|
ImmutableMap.of(CLUSTER_NAME, cluster));
|
||||||
|
xdsDependencyManager.start(xdsConfigWatcher);
|
||||||
|
|
||||||
|
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
|
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
||||||
|
XdsClusterConfig.ClusterChild clusterChild =
|
||||||
|
config.getClusters().get(CLUSTER_NAME).getValue().getChildren();
|
||||||
|
assertThat(clusterChild).isInstanceOf(XdsClusterConfig.EndpointConfig.class);
|
||||||
|
StatusOr<EdsUpdate> endpointOr = ((XdsClusterConfig.EndpointConfig) clusterChild).getEndpoint();
|
||||||
|
assertThat(endpointOr.getStatus()).isEqualTo(Status.OK);
|
||||||
|
assertThat(endpointOr.getValue()).isEqualTo(new EdsUpdate(
|
||||||
|
"fakeEds_logicalDns",
|
||||||
|
ImmutableMap.of(
|
||||||
|
Locality.create("", "", ""),
|
||||||
|
Endpoints.LocalityLbEndpoints.create(
|
||||||
|
Arrays.asList(Endpoints.LbEndpoint.create(
|
||||||
|
new EquivalentAddressGroup(fakeAddress),
|
||||||
|
1, true, "dns.example.com:1111", ImmutableMap.of())),
|
||||||
|
1, 0, ImmutableMap.of())),
|
||||||
|
Arrays.asList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLogicalDns_noDnsNr() {
|
||||||
|
Cluster cluster = Cluster.newBuilder()
|
||||||
|
.setName(CLUSTER_NAME)
|
||||||
|
.setType(Cluster.DiscoveryType.LOGICAL_DNS)
|
||||||
|
.setLoadAssignment(ClusterLoadAssignment.newBuilder()
|
||||||
|
.addEndpoints(LocalityLbEndpoints.newBuilder()
|
||||||
|
.addLbEndpoints(LbEndpoint.newBuilder()
|
||||||
|
.setEndpoint(Endpoint.newBuilder()
|
||||||
|
.setAddress(Address.newBuilder()
|
||||||
|
.setSocketAddress(SocketAddress.newBuilder()
|
||||||
|
.setAddress("dns.example.com")
|
||||||
|
.setPortValue(1111)))))))
|
||||||
|
.build();
|
||||||
|
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS,
|
||||||
|
ImmutableMap.of(CLUSTER_NAME, cluster));
|
||||||
|
xdsDependencyManager.start(xdsConfigWatcher);
|
||||||
|
|
||||||
|
verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
|
||||||
|
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
|
||||||
|
XdsClusterConfig.ClusterChild clusterChild =
|
||||||
|
config.getClusters().get(CLUSTER_NAME).getValue().getChildren();
|
||||||
|
assertThat(clusterChild).isInstanceOf(XdsClusterConfig.EndpointConfig.class);
|
||||||
|
StatusOr<EdsUpdate> endpointOr = ((XdsClusterConfig.EndpointConfig) clusterChild).getEndpoint();
|
||||||
|
assertThat(endpointOr.getStatus().getCode()).isEqualTo(Status.Code.INTERNAL);
|
||||||
|
assertThat(endpointOr.getStatus().getDescription())
|
||||||
|
.isEqualTo("Could not find dns name resolver");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCdsError() throws IOException {
|
public void testCdsError() throws IOException {
|
||||||
controlPlaneService.setXdsConfig(
|
controlPlaneService.setXdsConfig(
|
||||||
|
@ -943,4 +1021,6 @@ public class XdsDependencyManagerTest {
|
||||||
&& xdsConfig.getClusters().keySet().containsAll(expectedNames);
|
&& xdsConfig.getClusters().keySet().containsAll(expectedNames);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class FakeSocketAddress extends java.net.SocketAddress {}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue