mirror of https://github.com/grpc/grpc-java.git
Revert "xds: Support tracking non-xds resources in XdsDepManager"
This reverts commitd5b4fb51c2
as part of reverting297ab05efe
.
This commit is contained in:
parent
98e4252e34
commit
69b8cf5f26
|
@ -36,7 +36,6 @@ import io.grpc.xds.client.XdsClient;
|
|||
import io.grpc.xds.client.XdsClient.ResourceWatcher;
|
||||
import io.grpc.xds.client.XdsResourceType;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
|
@ -54,19 +53,8 @@ import javax.annotation.Nullable;
|
|||
* applies to a single data plane authority.
|
||||
*/
|
||||
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
|
||||
private enum TrackedWatcherTypeEnum {
|
||||
LDS, RDS, CDS, EDS
|
||||
}
|
||||
|
||||
private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
|
||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
|
||||
private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
|
||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
|
||||
private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
|
||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
|
||||
private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
|
||||
new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
|
||||
|
||||
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
|
||||
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
|
||||
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
|
||||
private final String listenerName;
|
||||
private final XdsClient xdsClient;
|
||||
|
@ -75,8 +63,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
private XdsConfigWatcher xdsConfigWatcher;
|
||||
|
||||
private StatusOr<XdsConfig> lastUpdate = null;
|
||||
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
|
||||
new EnumMap<>(TrackedWatcherTypeEnum.class);
|
||||
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
|
||||
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
|
||||
|
||||
XdsDependencyManager(XdsClient xdsClient,
|
||||
|
@ -99,7 +86,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
|
||||
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
|
||||
// start the ball rolling
|
||||
syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
|
||||
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -109,7 +96,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
ClusterSubscription subscription = new ClusterSubscription(clusterName);
|
||||
|
||||
syncContext.execute(() -> {
|
||||
if (getWatchers(LDS_TYPE).isEmpty()) {
|
||||
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
|
||||
subscription.closed = true;
|
||||
return; // shutdown() called
|
||||
}
|
||||
|
@ -120,28 +107,33 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
return subscription;
|
||||
}
|
||||
|
||||
private <T extends ResourceUpdate> void addWatcher(
|
||||
TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
|
||||
private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
|
||||
syncContext.throwIfNotInThisSynchronizationContext();
|
||||
XdsResourceType<T> type = watcher.type;
|
||||
String resourceName = watcher.resourceName;
|
||||
|
||||
getWatchers(watcherType).put(resourceName, watcher);
|
||||
getWatchers(type).put(resourceName, watcher);
|
||||
xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
syncContext.execute(() -> {
|
||||
for (TypeWatchers<?> watchers : resourceWatchers.values()) {
|
||||
for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
|
||||
watcher.close();
|
||||
}
|
||||
shutdownWatchersForType(watchers);
|
||||
}
|
||||
resourceWatchers.clear();
|
||||
subscriptions.clear();
|
||||
});
|
||||
}
|
||||
|
||||
private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
|
||||
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
|
||||
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
|
||||
watcherEntry.getValue());
|
||||
watcherEntry.getValue().cancelled = true;
|
||||
}
|
||||
}
|
||||
|
||||
private void releaseSubscription(ClusterSubscription subscription) {
|
||||
checkNotNull(subscription, "subscription");
|
||||
syncContext.execute(() -> {
|
||||
|
@ -162,12 +154,12 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
*/
|
||||
private void maybePublishConfig() {
|
||||
syncContext.throwIfNotInThisSynchronizationContext();
|
||||
if (getWatchers(LDS_TYPE).isEmpty()) {
|
||||
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
|
||||
return; // shutdown() called
|
||||
}
|
||||
boolean waitingOnResource = resourceWatchers.values().stream()
|
||||
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
|
||||
.anyMatch(TrackedWatcher::missingResult);
|
||||
.anyMatch(XdsWatcherBase::missingResult);
|
||||
if (waitingOnResource) {
|
||||
return;
|
||||
}
|
||||
|
@ -202,8 +194,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
// Iterate watchers and build the XdsConfig
|
||||
|
||||
TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
|
||||
= tracer.getWatcher(LDS_TYPE, listenerName);
|
||||
XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
|
||||
= tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
|
||||
if (ldsWatcher == null) {
|
||||
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
|
||||
"Bug: No listener watcher found for " + listenerName));
|
||||
|
@ -249,13 +241,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
return StatusOr.fromValue(builder.build());
|
||||
}
|
||||
|
||||
private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
|
||||
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
|
||||
private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
|
||||
XdsResourceType<T> resourceType) {
|
||||
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
|
||||
if (typeWatchers == null) {
|
||||
typeWatchers = new TypeWatchers<T>(watcherType);
|
||||
resourceWatchers.put(watcherType.typeEnum, typeWatchers);
|
||||
typeWatchers = new TypeWatchers<T>(resourceType);
|
||||
resourceWatchers.put(resourceType, typeWatchers);
|
||||
}
|
||||
assert typeWatchers.watcherType == watcherType;
|
||||
assert typeWatchers.resourceType == resourceType;
|
||||
@SuppressWarnings("unchecked")
|
||||
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
|
||||
return tTypeWatchers.watchers;
|
||||
|
@ -282,7 +275,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
return;
|
||||
}
|
||||
|
||||
CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
|
||||
CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
|
||||
StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
|
||||
if (!cdsWatcherDataOr.hasValue()) {
|
||||
clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
|
||||
|
@ -325,8 +318,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
child = new AggregateConfig(ImmutableList.copyOf(leafNames));
|
||||
break;
|
||||
case EDS:
|
||||
TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
|
||||
tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
|
||||
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
|
||||
tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
|
||||
if (edsWatcher != null) {
|
||||
child = new EndpointConfig(edsWatcher.getData());
|
||||
} else {
|
||||
|
@ -353,27 +346,27 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
}
|
||||
|
||||
private void addRdsWatcher(String resourceName) {
|
||||
if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
|
||||
if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
|
||||
addWatcher(new RdsWatcher(resourceName));
|
||||
}
|
||||
|
||||
private void addEdsWatcher(String edsServiceName) {
|
||||
if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
|
||||
if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
|
||||
addWatcher(new EdsWatcher(edsServiceName));
|
||||
}
|
||||
|
||||
private void addClusterWatcher(String clusterName) {
|
||||
if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
|
||||
if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
|
||||
addWatcher(new CdsWatcher(clusterName));
|
||||
}
|
||||
|
||||
private void updateRoutes(List<VirtualHost> virtualHosts) {
|
||||
|
@ -411,13 +404,13 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
return clusters;
|
||||
}
|
||||
|
||||
private static class TypeWatchers<T> {
|
||||
private static class TypeWatchers<T extends ResourceUpdate> {
|
||||
// Key is resource name
|
||||
final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
|
||||
final TrackedWatcherType<T> watcherType;
|
||||
final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
|
||||
final XdsResourceType<T> resourceType;
|
||||
|
||||
TypeWatchers(TrackedWatcherType<T> watcherType) {
|
||||
this.watcherType = checkNotNull(watcherType, "watcherType");
|
||||
TypeWatchers(XdsResourceType<T> resourceType) {
|
||||
this.resourceType = resourceType;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -449,36 +442,38 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
/** State for tracing garbage collector. */
|
||||
private static final class WatcherTracer {
|
||||
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
|
||||
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;
|
||||
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
|
||||
private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
|
||||
|
||||
public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
|
||||
public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
|
||||
this.resourceWatchers = resourceWatchers;
|
||||
|
||||
this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
|
||||
for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
|
||||
usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
|
||||
this.usedWatchers = new HashMap<>();
|
||||
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
|
||||
usedWatchers.put(type, newTypeWatchers(type));
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
|
||||
private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
|
||||
XdsResourceType<T> type) {
|
||||
return new TypeWatchers<T>(type);
|
||||
}
|
||||
|
||||
public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
|
||||
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
|
||||
public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
|
||||
XdsResourceType<T> resourceType, String name) {
|
||||
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
|
||||
if (typeWatchers == null) {
|
||||
return null;
|
||||
}
|
||||
assert typeWatchers.watcherType == watcherType;
|
||||
assert typeWatchers.resourceType == resourceType;
|
||||
@SuppressWarnings("unchecked")
|
||||
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
|
||||
TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
|
||||
XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
|
||||
if (watcher == null) {
|
||||
return null;
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
|
||||
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
|
||||
usedTypeWatchers.watchers.put(name, watcher);
|
||||
return watcher;
|
||||
}
|
||||
|
@ -486,9 +481,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
/** Shut down unused watchers. */
|
||||
public void closeUnusedWatchers() {
|
||||
boolean changed = false; // Help out the GC by preferring old objects
|
||||
for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
|
||||
TypeWatchers<?> orig = resourceWatchers.get(key);
|
||||
TypeWatchers<?> used = usedWatchers.get(key);
|
||||
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
|
||||
TypeWatchers<?> orig = resourceWatchers.get(type);
|
||||
TypeWatchers<?> used = usedWatchers.get(type);
|
||||
for (String name : orig.watchers.keySet()) {
|
||||
if (used.watchers.containsKey(name)) {
|
||||
continue;
|
||||
|
@ -503,33 +498,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("UnusedTypeParameter")
|
||||
private static final class TrackedWatcherType<T> {
|
||||
public final TrackedWatcherTypeEnum typeEnum;
|
||||
|
||||
public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
|
||||
this.typeEnum = checkNotNull(typeEnum, "typeEnum");
|
||||
}
|
||||
}
|
||||
|
||||
private interface TrackedWatcher<T> {
|
||||
@Nullable
|
||||
StatusOr<T> getData();
|
||||
|
||||
default boolean missingResult() {
|
||||
return getData() == null;
|
||||
}
|
||||
|
||||
default boolean hasDataValue() {
|
||||
StatusOr<T> data = getData();
|
||||
return data != null && data.hasValue();
|
||||
}
|
||||
|
||||
void close();
|
||||
}
|
||||
|
||||
private abstract class XdsWatcherBase<T extends ResourceUpdate>
|
||||
implements ResourceWatcher<T>, TrackedWatcher<T> {
|
||||
implements ResourceWatcher<T> {
|
||||
private final XdsResourceType<T> type;
|
||||
private final String resourceName;
|
||||
boolean cancelled;
|
||||
|
@ -584,18 +554,24 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
protected abstract void subscribeToChildren(T update);
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cancelled = true;
|
||||
xdsClient.cancelXdsResourceWatch(type, resourceName, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean missingResult() {
|
||||
return data == null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public StatusOr<T> getData() {
|
||||
StatusOr<T> getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
boolean hasDataValue() {
|
||||
return data != null && data.hasValue();
|
||||
}
|
||||
|
||||
public String toContextString() {
|
||||
return toContextStr(type.typeName(), resourceName);
|
||||
}
|
||||
|
@ -646,7 +622,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
if (rdsName == null) {
|
||||
return null;
|
||||
}
|
||||
return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
|
||||
return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
|
||||
}
|
||||
|
||||
public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
|
||||
|
@ -712,7 +688,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
|
||||
CdsWatcher(String resourceName) {
|
||||
super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
|
||||
super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -745,7 +721,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
|||
|
||||
private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
|
||||
private EdsWatcher(String resourceName) {
|
||||
super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
|
||||
super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue