mirror of https://github.com/grpc/grpc-java.git
xds: Improve shutdown handling of XdsDepManager
The most important change here is to handle subscribeToCluster() calls after shutdown(), and preventing the internal state from being heavily confused as the assumption is there are no watchers after shutdown(). ClusterSubscription.closed isn't strictly necessary, but I don't want the code to depend on double-deregistration being safe. maybePublishConfig() isn't being called after shutdown(), but adding the protection avoids a class of bugs that would cause channel panic.
This commit is contained in:
parent
22cf7cf2ac
commit
142e378cea
|
@ -93,13 +93,15 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Closeable subscribeToCluster(String clusterName) {
|
public Closeable subscribeToCluster(String clusterName) {
|
||||||
|
|
||||||
checkNotNull(clusterName, "clusterName");
|
checkNotNull(clusterName, "clusterName");
|
||||||
ClusterSubscription subscription = new ClusterSubscription(clusterName);
|
ClusterSubscription subscription = new ClusterSubscription(clusterName);
|
||||||
|
|
||||||
syncContext.execute(() -> {
|
syncContext.execute(() -> {
|
||||||
|
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
|
||||||
|
subscription.closed = true;
|
||||||
|
return; // shutdown() called
|
||||||
|
}
|
||||||
addClusterWatcher(clusterName, subscription, 1);
|
addClusterWatcher(clusterName, subscription, 1);
|
||||||
maybePublishConfig();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
return subscription;
|
return subscription;
|
||||||
|
@ -207,10 +209,14 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
checkNotNull(subscription, "subscription");
|
checkNotNull(subscription, "subscription");
|
||||||
String clusterName = subscription.getClusterName();
|
String clusterName = subscription.getClusterName();
|
||||||
syncContext.execute(() -> {
|
syncContext.execute(() -> {
|
||||||
|
if (subscription.closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
subscription.closed = true;
|
||||||
XdsWatcherBase<?> cdsWatcher =
|
XdsWatcherBase<?> cdsWatcher =
|
||||||
resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
|
resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
|
||||||
if (cdsWatcher == null) {
|
if (cdsWatcher == null) {
|
||||||
return; // already released while waiting for the syncContext
|
return; // shutdown() called
|
||||||
}
|
}
|
||||||
cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
|
cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
|
||||||
maybePublishConfig();
|
maybePublishConfig();
|
||||||
|
@ -257,6 +263,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
*/
|
*/
|
||||||
private void maybePublishConfig() {
|
private void maybePublishConfig() {
|
||||||
syncContext.throwIfNotInThisSynchronizationContext();
|
syncContext.throwIfNotInThisSynchronizationContext();
|
||||||
|
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
|
||||||
|
return; // shutdown() called
|
||||||
|
}
|
||||||
boolean waitingOnResource = resourceWatchers.values().stream()
|
boolean waitingOnResource = resourceWatchers.values().stream()
|
||||||
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
|
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
|
||||||
.anyMatch(XdsWatcherBase::missingResult);
|
.anyMatch(XdsWatcherBase::missingResult);
|
||||||
|
@ -293,6 +302,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
routeSource = ((LdsWatcher) ldsWatcher).getRouteSource();
|
routeSource = ((LdsWatcher) ldsWatcher).getRouteSource();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (routeSource == null) {
|
||||||
|
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
|
||||||
|
"Bug: No route source found for listener " + dataPlaneAuthority));
|
||||||
|
}
|
||||||
|
|
||||||
StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
|
StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
|
||||||
if (!statusOrRdsUpdate.hasValue()) {
|
if (!statusOrRdsUpdate.hasValue()) {
|
||||||
return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
|
return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
|
||||||
|
@ -557,14 +571,15 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
|
||||||
void onUpdate(StatusOr<XdsConfig> config);
|
void onUpdate(StatusOr<XdsConfig> config);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ClusterSubscription implements Closeable {
|
private final class ClusterSubscription implements Closeable {
|
||||||
String clusterName;
|
private final String clusterName;
|
||||||
|
boolean closed; // Accessed from syncContext
|
||||||
|
|
||||||
public ClusterSubscription(String clusterName) {
|
public ClusterSubscription(String clusterName) {
|
||||||
this.clusterName = clusterName;
|
this.clusterName = checkNotNull(clusterName, "clusterName");
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getClusterName() {
|
String getClusterName() {
|
||||||
return clusterName;
|
return clusterName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -848,6 +848,22 @@ public class XdsDependencyManagerTest {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void subscribeToClusterAfterShutdown() throws Exception {
|
||||||
|
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
|
||||||
|
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
|
||||||
|
|
||||||
|
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
|
||||||
|
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
|
||||||
|
serverName, serverName, nameResolverArgs, scheduler);
|
||||||
|
inOrder.verify(xdsConfigWatcher).onUpdate(any());
|
||||||
|
xdsDependencyManager.shutdown();
|
||||||
|
|
||||||
|
Closeable subscription = xdsDependencyManager.subscribeToCluster("CDS");
|
||||||
|
inOrder.verify(xdsConfigWatcher, never()).onUpdate(any());
|
||||||
|
subscription.close();
|
||||||
|
}
|
||||||
|
|
||||||
private Listener buildInlineClientListener(String rdsName, String clusterName) {
|
private Listener buildInlineClientListener(String rdsName, String clusterName) {
|
||||||
return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName);
|
return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue