diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java index 999ee0d4b0..7af03caf4b 100644 --- a/xds/src/main/java/io/grpc/xds/XdsConfig.java +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -26,6 +26,7 @@ import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import java.io.Closeable; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -103,19 +104,17 @@ final class XdsConfig { static final class XdsClusterConfig { private final String clusterName; private final CdsUpdate clusterResource; - private final StatusOr endpoint; //Will be null for non-EDS clusters + private final ClusterChild children; // holds details - XdsClusterConfig(String clusterName, CdsUpdate clusterResource, - StatusOr endpoint) { + XdsClusterConfig(String clusterName, CdsUpdate clusterResource, ClusterChild details) { this.clusterName = checkNotNull(clusterName, "clusterName"); this.clusterResource = checkNotNull(clusterResource, "clusterResource"); - this.endpoint = endpoint; + this.children = checkNotNull(details, "details"); } @Override public int hashCode() { - int endpointHash = (endpoint != null) ? endpoint.hashCode() : 0; - return clusterName.hashCode() + clusterResource.hashCode() + endpointHash; + return clusterName.hashCode() + clusterResource.hashCode() + children.hashCode(); } @Override @@ -126,7 +125,7 @@ final class XdsConfig { XdsClusterConfig o = (XdsClusterConfig) obj; return Objects.equals(clusterName, o.clusterName) && Objects.equals(clusterResource, o.clusterResource) - && Objects.equals(endpoint, o.endpoint); + && Objects.equals(children, o.children); } @Override @@ -134,7 +133,8 @@ final class XdsConfig { StringBuilder builder = new StringBuilder(); builder.append("XdsClusterConfig{clusterName=").append(clusterName) .append(", clusterResource=").append(clusterResource) - .append(", endpoint=").append(endpoint).append("}"); + .append(", children={").append(children) + .append("}"); return builder.toString(); } @@ -146,8 +146,60 @@ final class XdsConfig { return clusterResource; } - public StatusOr getEndpoint() { - return endpoint; + public ClusterChild getChildren() { + return children; + } + + interface ClusterChild {} + + /** Endpoint info for EDS and LOGICAL_DNS clusters. If there was an + * error, endpoints will be null and resolution_note will be set. + */ + static final class EndpointConfig implements ClusterChild { + private final StatusOr endpoint; + + public EndpointConfig(StatusOr endpoint) { + this.endpoint = checkNotNull(endpoint, "endpoint"); + } + + @Override + public int hashCode() { + return endpoint.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof EndpointConfig)) { + return false; + } + return Objects.equals(endpoint, ((EndpointConfig)obj).endpoint); + } + + public StatusOr getEndpoint() { + return endpoint; + } + } + + // The list of leaf clusters for an aggregate cluster. + static final class AggregateConfig implements ClusterChild { + private final List leafNames; + + public AggregateConfig(List leafNames) { + this.leafNames = checkNotNull(leafNames, "leafNames"); + } + + @Override + public int hashCode() { + return leafNames.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof AggregateConfig)) { + return false; + } + return Objects.equals(leafNames, ((AggregateConfig) obj).leafNames); + } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index d2af47bc9d..a740095037 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -29,6 +29,9 @@ import io.grpc.Status; import io.grpc.StatusOr; import io.grpc.SynchronizationContext; import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; +import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType; +import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig; +import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceWatcher; @@ -36,6 +39,7 @@ import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsResourceType; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -43,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -299,27 +304,123 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi Map> cdsWatchers = resourceWatchers.get(CLUSTER_RESOURCE).watchers; - // Iterate CDS watchers - for (XdsWatcherBase watcher : cdsWatchers.values()) { - CdsWatcher cdsWatcher = (CdsWatcher) watcher; - String clusterName = cdsWatcher.resourceName(); - StatusOr cdsUpdate = cdsWatcher.getData(); - if (cdsUpdate.hasValue()) { - XdsConfig.XdsClusterConfig clusterConfig; - String edsName = cdsUpdate.getValue().edsServiceName(); - EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(edsName); + // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters + List topLevelClusters = + cdsWatchers.values().stream() + .filter(XdsDependencyManager::isTopLevelCluster) + .map(w -> w.resourceName()) + .collect(Collectors.toList()); - // Only EDS type clusters have endpoint data - StatusOr data = - edsWatcher != null ? edsWatcher.getData() : null; - clusterConfig = new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate.getValue(), data); - builder.addCluster(clusterName, StatusOr.fromValue(clusterConfig)); + // Flatten multi-level aggregates into lists of leaf clusters + Set leafNames = + addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters); + + addLeavesToBuilder(builder, edsWatchers, leafNames); + + return builder.build(); + } + + private void addLeavesToBuilder(XdsConfig.XdsConfigBuilder builder, + Map> edsWatchers, + Set leafNames) { + for (String clusterName : leafNames) { + CdsWatcher cdsWatcher = getCluster(clusterName); + StatusOr cdsUpdateOr = cdsWatcher.getData(); + + if (cdsUpdateOr.hasValue()) { + XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue(); + if (cdsUpdate.clusterType() == ClusterType.EDS) { + EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName()); + if (edsWatcher != null) { + EndpointConfig child = new EndpointConfig(edsWatcher.getData()); + builder.addCluster(clusterName, StatusOr.fromValue( + new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); + } else { + builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( + "EDS resource not found for cluster " + clusterName))); + } + } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) { + // TODO get the resolved endpoint configuration + builder.addCluster(clusterName, StatusOr.fromValue( + new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, new EndpointConfig(null)))); + } } else { - builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdate.getStatus())); + builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus())); + } + } + } + + // Adds the top-level clusters to the builder and returns the leaf cluster names + private Set addTopLevelClustersToBuilder( + XdsConfig.XdsConfigBuilder builder, Map> edsWatchers, + Map> cdsWatchers, List topLevelClusters) { + + Set leafClusterNames = new HashSet<>(); + for (String clusterName : topLevelClusters) { + CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName); + StatusOr cdsWatcherDataOr = cdsWatcher.getData(); + if (!cdsWatcher.hasDataValue()) { + builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus())); + continue; + } + + XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue(); + XdsConfig.XdsClusterConfig.ClusterChild child; + switch (cdsUpdate.clusterType()) { + case AGGREGATE: + List leafNames = getLeafNames(cdsUpdate); + child = new AggregateConfig(leafNames); + leafClusterNames.addAll(leafNames); + break; + case EDS: + EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName()); + if (edsWatcher != null) { + child = new EndpointConfig(edsWatcher.getData()); + } else { + builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( + "EDS resource not found for cluster " + clusterName))); + continue; + } + break; + case LOGICAL_DNS: + // TODO get the resolved endpoint configuration + child = new EndpointConfig(null); + break; + default: + throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType()); + } + builder.addCluster(clusterName, StatusOr.fromValue( + new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); + } + + return leafClusterNames; + } + + private List getLeafNames(XdsClusterResource.CdsUpdate cdsUpdate) { + List childNames = new ArrayList<>(); + + for (String cluster : cdsUpdate.prioritizedClusterNames()) { + StatusOr data = getCluster(cluster).getData(); + if (data == null || !data.hasValue() || data.getValue() == null) { + childNames.add(cluster); + continue; + } + if (data.getValue().clusterType() == ClusterType.AGGREGATE) { + childNames.addAll(getLeafNames(data.getValue())); + } else { + childNames.add(cluster); } } - return builder.build(); + return childNames; + } + + private static boolean isTopLevelCluster(XdsWatcherBase cdsWatcher) { + if (! (cdsWatcher instanceof CdsWatcher)) { + return false; + } + return ((CdsWatcher)cdsWatcher).parentContexts.values().stream() + .anyMatch(depth -> depth == 1); } @Override diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index 96aeb0f41f..c4b7c6c8ac 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -57,6 +57,8 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.FakeClock; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.XdsConfig.XdsClusterConfig; +import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.XdsClient; @@ -219,28 +221,37 @@ public class XdsDependencyManagerTest { XdsTestUtils.setAggregateCdsConfig(controlPlaneService, serverName, rootName, childNames); inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); - Map> lastConfigClusters = + Map> lastConfigClusters = testWatcher.lastConfig.getClusters(); assertThat(lastConfigClusters).hasSize(childNames.size() + 1); - StatusOr rootC = lastConfigClusters.get(rootName); + StatusOr rootC = lastConfigClusters.get(rootName); XdsClusterResource.CdsUpdate rootUpdate = rootC.getValue().getClusterResource(); assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE); assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames); for (String childName : childNames) { assertThat(lastConfigClusters).containsKey(childName); + StatusOr childConfigOr = lastConfigClusters.get(childName); XdsClusterResource.CdsUpdate childResource = - lastConfigClusters.get(childName).getValue().getClusterResource(); + childConfigOr.getValue().getClusterResource(); assertThat(childResource.clusterType()).isEqualTo(EDS); assertThat(childResource.edsServiceName()).isEqualTo(getEdsNameForCluster(childName)); - StatusOr endpoint = - lastConfigClusters.get(childName).getValue().getEndpoint(); + StatusOr endpoint = getEndpoint(childConfigOr); assertThat(endpoint.hasValue()).isTrue(); assertThat(endpoint.getValue().clusterName).isEqualTo(getEdsNameForCluster(childName)); } } + private static StatusOr getEndpoint(StatusOr childConfigOr) { + XdsClusterConfig.ClusterChild clusterChild = childConfigOr.getValue() + .getChildren(); + assertThat(clusterChild).isInstanceOf(XdsClusterConfig.EndpointConfig.class); + StatusOr endpoint = ((XdsClusterConfig.EndpointConfig) clusterChild).getEndpoint(); + assertThat(endpoint).isNotNull(); + return endpoint; + } + @Test public void testComplexRegisteredAggregate() throws IOException { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); @@ -289,7 +300,6 @@ public class XdsDependencyManagerTest { inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); String rootName1 = "root_c"; - List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1); assertThat(subscription1).isNotNull(); @@ -299,6 +309,7 @@ public class XdsDependencyManagerTest { StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( "No " + toContextStr(CLUSTER_TYPE_NAME, rootName1))).toString()); + List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames); inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture()); assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).hasValue()).isTrue(); @@ -336,7 +347,7 @@ public class XdsDependencyManagerTest { fakeClock.forwardTime(16, TimeUnit.SECONDS); verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); - List> returnedClusters = new ArrayList<>(); + List> returnedClusters = new ArrayList<>(); for (String childName : childNames) { returnedClusters.add(xdsConfigCaptor.getValue().getClusters().get(childName)); } @@ -344,7 +355,7 @@ public class XdsDependencyManagerTest { // Check that missing cluster reported Status and the other 2 are present Status expectedClusterStatus = Status.UNAVAILABLE.withDescription( "No " + toContextStr(CLUSTER_TYPE_NAME, childNames.get(2))); - StatusOr missingCluster = returnedClusters.get(2); + StatusOr missingCluster = returnedClusters.get(2); assertThat(missingCluster.getStatus().toString()).isEqualTo(expectedClusterStatus.toString()); assertThat(returnedClusters.get(0).hasValue()).isTrue(); assertThat(returnedClusters.get(1).hasValue()).isTrue(); @@ -352,9 +363,9 @@ public class XdsDependencyManagerTest { // Check that missing EDS reported Status, the other one is present and the garbage EDS is not Status expectedEdsStatus = Status.UNAVAILABLE.withDescription( "No " + toContextStr(ENDPOINT_TYPE_NAME, XdsTestUtils.EDS_NAME + 1)); - assertThat(returnedClusters.get(0).getValue().getEndpoint().hasValue()).isTrue(); - assertThat(returnedClusters.get(1).getValue().getEndpoint().hasValue()).isFalse(); - assertThat(returnedClusters.get(1).getValue().getEndpoint().getStatus().toString()) + assertThat(getEndpoint(returnedClusters.get(0)).hasValue()).isTrue(); + assertThat(getEndpoint(returnedClusters.get(1)).hasValue()).isFalse(); + assertThat(getEndpoint(returnedClusters.get(1)).getStatus().toString()) .isEqualTo(expectedEdsStatus.toString()); verify(xdsConfigWatcher, never()).onResourceDoesNotExist(any()); @@ -539,7 +550,7 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig( ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig)); inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); - assertThat(xdsConfigCaptor.getValue().getClusters().keySet().size()).isEqualTo(8); + assertThat(xdsConfigCaptor.getValue().getClusters().keySet().size()).isEqualTo(4); // Now that it is released, we should only have A11 rootSub.close(); @@ -582,11 +593,9 @@ public class XdsDependencyManagerTest { assertThat(initialConfig.getClusters().keySet()) .containsExactly("root", "clusterA", "clusterB"); - XdsEndpointResource.EdsUpdate edsForA = - initialConfig.getClusters().get("clusterA").getValue().getEndpoint().getValue(); + EdsUpdate edsForA = getEndpoint(initialConfig.getClusters().get("clusterA")).getValue(); assertThat(edsForA.clusterName).isEqualTo(edsName); - XdsEndpointResource.EdsUpdate edsForB = - initialConfig.getClusters().get("clusterB").getValue().getEndpoint().getValue(); + EdsUpdate edsForB = getEndpoint(initialConfig.getClusters().get("clusterB")).getValue(); assertThat(edsForB.clusterName).isEqualTo(edsName); assertThat(edsForA).isEqualTo(edsForB); edsForA.localityLbEndpointsMap.values().forEach( @@ -635,7 +644,7 @@ public class XdsDependencyManagerTest { inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); XdsConfig config = xdsConfigCaptor.getValue(); assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName); - assertThat(config.getClusters().size()).isEqualTo(8); + assertThat(config.getClusters().size()).isEqualTo(4); } @Test @@ -689,8 +698,7 @@ public class XdsDependencyManagerTest { // Verify that the config is updated as expected inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); XdsConfig config = xdsConfigCaptor.getValue(); - assertThat(config.getClusters().keySet()).containsExactly("root", "clusterA", "clusterA2", - "clusterA21", "clusterA22"); + assertThat(config.getClusters().keySet()).containsExactly("root", "clusterA21", "clusterA22"); } private Listener buildInlineClientListener(String rdsName, String clusterName) { diff --git a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java index 7f5ec0b27c..ea28734ec6 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java @@ -57,6 +57,7 @@ import io.grpc.internal.JsonParser; import io.grpc.stub.StreamObserver; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; +import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig; import io.grpc.xds.client.Bootstrapper; import io.grpc.xds.client.Locality; import io.grpc.xds.client.XdsResourceType; @@ -269,7 +270,7 @@ public class XdsTestUtils { CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null) .lbPolicyConfig(getWrrLbConfigAsMap()).build(); XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig( - CLUSTER_NAME, cdsUpdate, StatusOr.fromValue(edsUpdate)); + CLUSTER_NAME, cdsUpdate, new EndpointConfig(StatusOr.fromValue(edsUpdate))); builder .setListener(ldsUpdate)