mirror of https://github.com/grpc/grpc-java.git
xds: Support tracking non-xds resources in XdsDepManager
This will be used for logical dns clusters as part of gRFC A74. Swapping to EnumMap wasn't really necessary, but was easy given the new type system. I can't say I'm particularly happy with the name of the new TrackedWatcher type, but XdsConfigWatcher prevented using "Watcher" because it won't implement the new interface, and ResourceWatcher already exists in XdsClient. So we have TrackedWatcher, WatcherTracer, TypeWatchers, and TrackedWatcherType.
This commit is contained in:
parent
8974a306af
commit
d5b4fb51c2
|
@ -36,6 +36,7 @@ import io.grpc.xds.client.XdsClient;
|
|||
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
||||
import io.grpc.xds.client.XdsResourceType;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
|
@ -53,8 +54,19 @@ import javax.annotation.Nullable;
|
|||
* applies to a single data plane authority.
|
||||
*/
|
||||
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
|
||||
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
|
||||
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
|
||||
private enum TrackedWatcherTypeEnum {
|
||||
LDS, RDS, CDS, EDS
|
||||
}
|
||||
|
||||
private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
|
||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
|
||||
private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
|
||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
|
||||
private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
|
||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
|
||||
private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
|
||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
|
||||
|
||||
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
|
||||
private final String listenerName;
|
||||
private final XdsClient xdsClient;
|
||||
|
@ -63,7 +75,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
private XdsConfigWatcher xdsConfigWatcher;
|
||||
|
||||
private StatusOr<XdsConfig> lastUpdate = null;
|
||||
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
|
||||
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
|
||||
new EnumMap<>(TrackedWatcherTypeEnum.class);
|
||||
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
|
||||
|
||||
XdsDependencyManager(XdsClient xdsClient,
|
||||
|
@ -86,7 +99,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
|
||||
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
|
||||
// start the ball rolling
|
||||
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
|
||||
syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -96,7 +109,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
ClusterSubscription subscription = new ClusterSubscription(clusterName);
|
||||
|
||||
syncContext.execute(() -> {
|
||||
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
|
||||
if (getWatchers(LDS_TYPE).isEmpty()) {
|
||||
subscription.closed = true;
|
||||
return; // shutdown() called
|
||||
}
|
||||
|
@ -107,33 +120,28 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
return subscription;
|
||||
}
|
||||
|
||||
private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
|
||||
private <T extends ResourceUpdate> void addWatcher(
|
||||
TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
|
||||
syncContext.throwIfNotInThisSynchronizationContext();
|
||||
XdsResourceType<T> type = watcher.type;
|
||||
String resourceName = watcher.resourceName;
|
||||
|
||||
getWatchers(type).put(resourceName, watcher);
|
||||
getWatchers(watcherType).put(resourceName, watcher);
|
||||
xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
syncContext.execute(() -> {
|
||||
for (TypeWatchers<?> watchers : resourceWatchers.values()) {
|
||||
shutdownWatchersForType(watchers);
|
||||
for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
|
||||
watcher.close();
|
||||
}
|
||||
}
|
||||
resourceWatchers.clear();
|
||||
subscriptions.clear();
|
||||
});
|
||||
}
|
||||
|
||||
private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
|
||||
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
|
||||
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
|
||||
watcherEntry.getValue());
|
||||
watcherEntry.getValue().cancelled = true;
|
||||
}
|
||||
}
|
||||
|
||||
private void releaseSubscription(ClusterSubscription subscription) {
|
||||
checkNotNull(subscription, "subscription");
|
||||
syncContext.execute(() -> {
|
||||
|
@ -154,12 +162,12 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
*/
|
||||
private void maybePublishConfig() {
|
||||
syncContext.throwIfNotInThisSynchronizationContext();
|
||||
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
|
||||
if (getWatchers(LDS_TYPE).isEmpty()) {
|
||||
return; // shutdown() called
|
||||
}
|
||||
boolean waitingOnResource = resourceWatchers.values().stream()
|
||||
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
|
||||
.anyMatch(XdsWatcherBase::missingResult);
|
||||
.anyMatch(TrackedWatcher::missingResult);
|
||||
if (waitingOnResource) {
|
||||
return;
|
||||
}
|
||||
|
@ -194,8 +202,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
// Iterate watchers and build the XdsConfig
|
||||
|
||||
XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
|
||||
= tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
|
||||
TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
|
||||
= tracer.getWatcher(LDS_TYPE, listenerName);
|
||||
if (ldsWatcher == null) {
|
||||
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
|
||||
"Bug: No listener watcher found for " + listenerName));
|
||||
|
@ -241,14 +249,13 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
return StatusOr.fromValue(builder.build());
|
||||
}
|
||||
|
||||
private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
|
||||
XdsResourceType<T> resourceType) {
|
||||
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
|
||||
private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
|
||||
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
|
||||
if (typeWatchers == null) {
|
||||
typeWatchers = new TypeWatchers<T>(resourceType);
|
||||
resourceWatchers.put(resourceType, typeWatchers);
|
||||
typeWatchers = new TypeWatchers<T>(watcherType);
|
||||
resourceWatchers.put(watcherType.typeEnum, typeWatchers);
|
||||
}
|
||||
assert typeWatchers.resourceType == resourceType;
|
||||
assert typeWatchers.watcherType == watcherType;
|
||||
@SuppressWarnings("unchecked")
|
||||
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
|
||||
return tTypeWatchers.watchers;
|
||||
|
@ -275,7 +282,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
return;
|
||||
}
|
||||
|
||||
CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
|
||||
CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
|
||||
StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
|
||||
if (!cdsWatcherDataOr.hasValue()) {
|
||||
clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
|
||||
|
@ -318,8 +325,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
child = new AggregateConfig(ImmutableList.copyOf(leafNames));
|
||||
break;
|
||||
case EDS:
|
||||
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
|
||||
tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
|
||||
TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
|
||||
tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
|
||||
if (edsWatcher != null) {
|
||||
child = new EndpointConfig(edsWatcher.getData());
|
||||
} else {
|
||||
|
@ -346,27 +353,27 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
}
|
||||
|
||||
private void addRdsWatcher(String resourceName) {
|
||||
if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
|
||||
if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
addWatcher(new RdsWatcher(resourceName));
|
||||
addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
|
||||
}
|
||||
|
||||
private void addEdsWatcher(String edsServiceName) {
|
||||
if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
|
||||
if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
addWatcher(new EdsWatcher(edsServiceName));
|
||||
addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
|
||||
}
|
||||
|
||||
private void addClusterWatcher(String clusterName) {
|
||||
if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
|
||||
if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
addWatcher(new CdsWatcher(clusterName));
|
||||
addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
|
||||
}
|
||||
|
||||
private void updateRoutes(List<VirtualHost> virtualHosts) {
|
||||
|
@ -404,13 +411,13 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
return clusters;
|
||||
}
|
||||
|
||||
private static class TypeWatchers<T extends ResourceUpdate> {
|
||||
private static class TypeWatchers<T> {
|
||||
// Key is resource name
|
||||
final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
|
||||
final XdsResourceType<T> resourceType;
|
||||
final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
|
||||
final TrackedWatcherType<T> watcherType;
|
||||
|
||||
TypeWatchers(XdsResourceType<T> resourceType) {
|
||||
this.resourceType = resourceType;
|
||||
TypeWatchers(TrackedWatcherType<T> watcherType) {
|
||||
this.watcherType = checkNotNull(watcherType, "watcherType");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -442,38 +449,36 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
/** State for tracing garbage collector. */
|
||||
private static final class WatcherTracer {
|
||||
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
|
||||
private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
|
||||
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
|
||||
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;
|
||||
|
||||
public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
|
||||
public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
|
||||
this.resourceWatchers = resourceWatchers;
|
||||
|
||||
this.usedWatchers = new HashMap<>();
|
||||
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
|
||||
usedWatchers.put(type, newTypeWatchers(type));
|
||||
this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
|
||||
for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
|
||||
usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
|
||||
}
|
||||
}
|
||||
|
||||
private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
|
||||
XdsResourceType<T> type) {
|
||||
private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
|
||||
return new TypeWatchers<T>(type);
|
||||
}
|
||||
|
||||
public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
|
||||
XdsResourceType<T> resourceType, String name) {
|
||||
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
|
||||
public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
|
||||
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
|
||||
if (typeWatchers == null) {
|
||||
return null;
|
||||
}
|
||||
assert typeWatchers.resourceType == resourceType;
|
||||
assert typeWatchers.watcherType == watcherType;
|
||||
@SuppressWarnings("unchecked")
|
||||
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
|
||||
XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
|
||||
TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
|
||||
if (watcher == null) {
|
||||
return null;
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
|
||||
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
|
||||
usedTypeWatchers.watchers.put(name, watcher);
|
||||
return watcher;
|
||||
}
|
||||
|
@ -481,9 +486,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
/** Shut down unused watchers. */
|
||||
public void closeUnusedWatchers() {
|
||||
boolean changed = false; // Help out the GC by preferring old objects
|
||||
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
|
||||
TypeWatchers<?> orig = resourceWatchers.get(type);
|
||||
TypeWatchers<?> used = usedWatchers.get(type);
|
||||
for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
|
||||
TypeWatchers<?> orig = resourceWatchers.get(key);
|
||||
TypeWatchers<?> used = usedWatchers.get(key);
|
||||
for (String name : orig.watchers.keySet()) {
|
||||
if (used.watchers.containsKey(name)) {
|
||||
continue;
|
||||
|
@ -498,8 +503,33 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("UnusedTypeParameter")
|
||||
private static final class TrackedWatcherType<T> {
|
||||
public final TrackedWatcherTypeEnum typeEnum;
|
||||
|
||||
public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
|
||||
this.typeEnum = checkNotNull(typeEnum, "typeEnum");
|
||||
}
|
||||
}
|
||||
|
||||
private interface TrackedWatcher<T> {
|
||||
@Nullable
|
||||
StatusOr<T> getData();
|
||||
|
||||
default boolean missingResult() {
|
||||
return getData() == null;
|
||||
}
|
||||
|
||||
default boolean hasDataValue() {
|
||||
StatusOr<T> data = getData();
|
||||
return data != null && data.hasValue();
|
||||
}
|
||||
|
||||
void close();
|
||||
}
|
||||
|
||||
private abstract class XdsWatcherBase<T extends ResourceUpdate>
|
||||
implements ResourceWatcher<T> {
|
||||
implements ResourceWatcher<T>, TrackedWatcher<T> {
|
||||
private final XdsResourceType<T> type;
|
||||
private final String resourceName;
|
||||
boolean cancelled;
|
||||
|
@ -554,24 +584,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
protected abstract void subscribeToChildren(T update);
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cancelled = true;
|
||||
xdsClient.cancelXdsResourceWatch(type, resourceName, this);
|
||||
}
|
||||
|
||||
boolean missingResult() {
|
||||
return data == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
StatusOr<T> getData() {
|
||||
public StatusOr<T> getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
boolean hasDataValue() {
|
||||
return data != null && data.hasValue();
|
||||
}
|
||||
|
||||
public String toContextString() {
|
||||
return toContextStr(type.typeName(), resourceName);
|
||||
}
|
||||
|
@ -622,7 +646,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
if (rdsName == null) {
|
||||
return null;
|
||||
}
|
||||
return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
|
||||
return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
|
||||
}
|
||||
|
||||
public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
|
||||
|
@ -688,7 +712,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
|
||||
CdsWatcher(String resourceName) {
|
||||
super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
|
||||
super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -721,7 +745,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
|
||||
private EdsWatcher(String resourceName) {
|
||||
super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
|
||||
super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue