mirror of https://github.com/grpc/grpc-java.git
xds: propagate audience from cluster resource in gcp auth filter (#11972)
This commit is contained in:
parent
908f9f19cd
commit
84c7713b2f
|
@ -16,8 +16,13 @@
|
||||||
|
|
||||||
package io.grpc.xds;
|
package io.grpc.xds;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
import static io.grpc.xds.XdsNameResolver.CLUSTER_SELECTION_KEY;
|
||||||
|
import static io.grpc.xds.XdsNameResolver.XDS_CONFIG_CALL_OPTION_KEY;
|
||||||
|
|
||||||
import com.google.auth.oauth2.ComputeEngineCredentials;
|
import com.google.auth.oauth2.ComputeEngineCredentials;
|
||||||
import com.google.auth.oauth2.IdTokenCredentials;
|
import com.google.auth.oauth2.IdTokenCredentials;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.primitives.UnsignedLongs;
|
import com.google.common.primitives.UnsignedLongs;
|
||||||
import com.google.protobuf.Any;
|
import com.google.protobuf.Any;
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
@ -34,8 +39,11 @@ import io.grpc.CompositeCallCredentials;
|
||||||
import io.grpc.Metadata;
|
import io.grpc.Metadata;
|
||||||
import io.grpc.MethodDescriptor;
|
import io.grpc.MethodDescriptor;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
|
import io.grpc.StatusOr;
|
||||||
import io.grpc.auth.MoreCallCredentials;
|
import io.grpc.auth.MoreCallCredentials;
|
||||||
|
import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser.AudienceWrapper;
|
||||||
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
||||||
|
import io.grpc.xds.XdsConfig.XdsClusterConfig;
|
||||||
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
|
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -52,6 +60,13 @@ final class GcpAuthenticationFilter implements Filter {
|
||||||
static final String TYPE_URL =
|
static final String TYPE_URL =
|
||||||
"type.googleapis.com/envoy.extensions.filters.http.gcp_authn.v3.GcpAuthnFilterConfig";
|
"type.googleapis.com/envoy.extensions.filters.http.gcp_authn.v3.GcpAuthnFilterConfig";
|
||||||
|
|
||||||
|
final String filterInstanceName;
|
||||||
|
|
||||||
|
GcpAuthenticationFilter(String name) {
|
||||||
|
filterInstanceName = checkNotNull(name, "name");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static final class Provider implements Filter.Provider {
|
static final class Provider implements Filter.Provider {
|
||||||
@Override
|
@Override
|
||||||
public String[] typeUrls() {
|
public String[] typeUrls() {
|
||||||
|
@ -65,7 +80,7 @@ final class GcpAuthenticationFilter implements Filter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GcpAuthenticationFilter newInstance(String name) {
|
public GcpAuthenticationFilter newInstance(String name) {
|
||||||
return new GcpAuthenticationFilter();
|
return new GcpAuthenticationFilter(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -119,35 +134,58 @@ final class GcpAuthenticationFilter implements Filter {
|
||||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||||
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
|
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
|
||||||
|
|
||||||
/*String clusterName = callOptions.getOption(XdsAttributes.ATTR_CLUSTER_NAME);
|
String clusterName = callOptions.getOption(CLUSTER_SELECTION_KEY);
|
||||||
if (clusterName == null) {
|
if (clusterName == null) {
|
||||||
|
return new FailingClientCall<>(
|
||||||
|
Status.UNAVAILABLE.withDescription(
|
||||||
|
String.format(
|
||||||
|
"GCP Authn for %s does not contain cluster resource", filterInstanceName)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!clusterName.startsWith("cluster:")) {
|
||||||
return next.newCall(method, callOptions);
|
return next.newCall(method, callOptions);
|
||||||
}*/
|
}
|
||||||
|
XdsConfig xdsConfig = callOptions.getOption(XDS_CONFIG_CALL_OPTION_KEY);
|
||||||
// TODO: Fetch the CDS resource for the cluster.
|
if (xdsConfig == null) {
|
||||||
// If the CDS resource is not available, fail the RPC with Status.UNAVAILABLE.
|
return new FailingClientCall<>(
|
||||||
|
Status.UNAVAILABLE.withDescription(
|
||||||
// TODO: Extract the audience from the CDS resource metadata.
|
String.format(
|
||||||
// If the audience is not found or is in the wrong format, fail the RPC.
|
"GCP Authn for %s with %s does not contain xds configuration",
|
||||||
String audience = "TEST_AUDIENCE";
|
filterInstanceName, clusterName)));
|
||||||
|
}
|
||||||
try {
|
StatusOr<XdsClusterConfig> xdsCluster =
|
||||||
|
xdsConfig.getClusters().get(clusterName.substring("cluster:".length()));
|
||||||
|
if (xdsCluster == null) {
|
||||||
|
return new FailingClientCall<>(
|
||||||
|
Status.UNAVAILABLE.withDescription(
|
||||||
|
String.format(
|
||||||
|
"GCP Authn for %s with %s - xds cluster config does not contain xds cluster",
|
||||||
|
filterInstanceName, clusterName)));
|
||||||
|
}
|
||||||
|
if (!xdsCluster.hasValue()) {
|
||||||
|
return new FailingClientCall<>(xdsCluster.getStatus());
|
||||||
|
}
|
||||||
|
Object audienceObj =
|
||||||
|
xdsCluster.getValue().getClusterResource().parsedMetadata().get(filterInstanceName);
|
||||||
|
if (audienceObj == null) {
|
||||||
|
return next.newCall(method, callOptions);
|
||||||
|
}
|
||||||
|
if (!(audienceObj instanceof AudienceWrapper)) {
|
||||||
|
return new FailingClientCall<>(
|
||||||
|
Status.UNAVAILABLE.withDescription(
|
||||||
|
String.format("GCP Authn found wrong type in %s metadata: %s=%s",
|
||||||
|
clusterName, filterInstanceName, audienceObj.getClass())));
|
||||||
|
}
|
||||||
|
AudienceWrapper audience = (AudienceWrapper) audienceObj;
|
||||||
CallCredentials existingCallCredentials = callOptions.getCredentials();
|
CallCredentials existingCallCredentials = callOptions.getCredentials();
|
||||||
CallCredentials newCallCredentials =
|
CallCredentials newCallCredentials =
|
||||||
getCallCredentials(callCredentialsCache, audience, credentials);
|
getCallCredentials(callCredentialsCache, audience.audience, credentials);
|
||||||
if (existingCallCredentials != null) {
|
if (existingCallCredentials != null) {
|
||||||
callOptions = callOptions.withCallCredentials(
|
callOptions = callOptions.withCallCredentials(
|
||||||
new CompositeCallCredentials(existingCallCredentials, newCallCredentials));
|
new CompositeCallCredentials(existingCallCredentials, newCallCredentials));
|
||||||
} else {
|
} else {
|
||||||
callOptions = callOptions.withCallCredentials(newCallCredentials);
|
callOptions = callOptions.withCallCredentials(newCallCredentials);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
// If we fail to attach CallCredentials due to any reason, return a FailingClientCall
|
|
||||||
return new FailingClientCall<>(Status.UNAUTHENTICATED
|
|
||||||
.withDescription("Failed to attach CallCredentials.")
|
|
||||||
.withCause(e));
|
|
||||||
}
|
|
||||||
return next.newCall(method, callOptions);
|
return next.newCall(method, callOptions);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -186,9 +224,11 @@ final class GcpAuthenticationFilter implements Filter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** An implementation of {@link ClientCall} that fails when started. */
|
/** An implementation of {@link ClientCall} that fails when started. */
|
||||||
private static final class FailingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
@VisibleForTesting
|
||||||
|
static final class FailingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
|
|
||||||
private final Status error;
|
@VisibleForTesting
|
||||||
|
final Status error;
|
||||||
|
|
||||||
public FailingClientCall(Status error) {
|
public FailingClientCall(Status error) {
|
||||||
this.error = error;
|
this.error = error;
|
||||||
|
@ -235,13 +275,21 @@ final class GcpAuthenticationFilter implements Filter {
|
||||||
|
|
||||||
static class AudienceMetadataParser implements MetadataValueParser {
|
static class AudienceMetadataParser implements MetadataValueParser {
|
||||||
|
|
||||||
|
static final class AudienceWrapper {
|
||||||
|
final String audience;
|
||||||
|
|
||||||
|
AudienceWrapper(String audience) {
|
||||||
|
this.audience = checkNotNull(audience);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getTypeUrl() {
|
public String getTypeUrl() {
|
||||||
return "type.googleapis.com/envoy.extensions.filters.http.gcp_authn.v3.Audience";
|
return "type.googleapis.com/envoy.extensions.filters.http.gcp_authn.v3.Audience";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String parse(Any any) throws ResourceInvalidException {
|
public AudienceWrapper parse(Any any) throws ResourceInvalidException {
|
||||||
Audience audience;
|
Audience audience;
|
||||||
try {
|
try {
|
||||||
audience = any.unpack(Audience.class);
|
audience = any.unpack(Audience.class);
|
||||||
|
@ -253,7 +301,7 @@ final class GcpAuthenticationFilter implements Filter {
|
||||||
throw new ResourceInvalidException(
|
throw new ResourceInvalidException(
|
||||||
"Audience URL is empty. Metadata value must contain a valid URL.");
|
"Audience URL is empty. Metadata value must contain a valid URL.");
|
||||||
}
|
}
|
||||||
return url;
|
return new AudienceWrapper(url);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,25 +17,60 @@
|
||||||
package io.grpc.xds;
|
package io.grpc.xds;
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
import static io.grpc.xds.XdsNameResolver.CLUSTER_SELECTION_KEY;
|
||||||
|
import static io.grpc.xds.XdsNameResolver.XDS_CONFIG_CALL_OPTION_KEY;
|
||||||
|
import static io.grpc.xds.XdsTestUtils.CLUSTER_NAME;
|
||||||
|
import static io.grpc.xds.XdsTestUtils.EDS_NAME;
|
||||||
|
import static io.grpc.xds.XdsTestUtils.ENDPOINT_HOSTNAME;
|
||||||
|
import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT;
|
||||||
|
import static io.grpc.xds.XdsTestUtils.RDS_NAME;
|
||||||
|
import static io.grpc.xds.XdsTestUtils.buildRouteConfiguration;
|
||||||
|
import static io.grpc.xds.XdsTestUtils.getWrrLbConfigAsMap;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertSame;
|
import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.protobuf.Any;
|
import com.google.protobuf.Any;
|
||||||
import com.google.protobuf.Empty;
|
import com.google.protobuf.Empty;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import com.google.protobuf.UInt64Value;
|
import com.google.protobuf.UInt64Value;
|
||||||
|
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
|
||||||
import io.envoyproxy.envoy.extensions.filters.http.gcp_authn.v3.GcpAuthnFilterConfig;
|
import io.envoyproxy.envoy.extensions.filters.http.gcp_authn.v3.GcpAuthnFilterConfig;
|
||||||
import io.envoyproxy.envoy.extensions.filters.http.gcp_authn.v3.TokenCacheConfig;
|
import io.envoyproxy.envoy.extensions.filters.http.gcp_authn.v3.TokenCacheConfig;
|
||||||
import io.grpc.CallOptions;
|
import io.grpc.CallOptions;
|
||||||
import io.grpc.Channel;
|
import io.grpc.Channel;
|
||||||
|
import io.grpc.ClientCall;
|
||||||
import io.grpc.ClientInterceptor;
|
import io.grpc.ClientInterceptor;
|
||||||
import io.grpc.MethodDescriptor;
|
import io.grpc.MethodDescriptor;
|
||||||
|
import io.grpc.Status;
|
||||||
|
import io.grpc.StatusOr;
|
||||||
|
import io.grpc.inprocess.InProcessServerBuilder;
|
||||||
import io.grpc.testing.TestMethodDescriptors;
|
import io.grpc.testing.TestMethodDescriptors;
|
||||||
|
import io.grpc.xds.Endpoints.LbEndpoint;
|
||||||
|
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
||||||
|
import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser.AudienceWrapper;
|
||||||
|
import io.grpc.xds.GcpAuthenticationFilter.FailingClientCall;
|
||||||
import io.grpc.xds.GcpAuthenticationFilter.GcpAuthenticationConfig;
|
import io.grpc.xds.GcpAuthenticationFilter.GcpAuthenticationConfig;
|
||||||
|
import io.grpc.xds.XdsClusterResource.CdsUpdate;
|
||||||
|
import io.grpc.xds.XdsConfig.XdsClusterConfig;
|
||||||
|
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
|
||||||
|
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
|
||||||
|
import io.grpc.xds.XdsListenerResource.LdsUpdate;
|
||||||
|
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
|
||||||
|
import io.grpc.xds.client.Locality;
|
||||||
|
import io.grpc.xds.client.XdsResourceType;
|
||||||
|
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.JUnit4;
|
import org.junit.runners.JUnit4;
|
||||||
|
@ -46,6 +81,17 @@ import org.mockito.Mockito;
|
||||||
public class GcpAuthenticationFilterTest {
|
public class GcpAuthenticationFilterTest {
|
||||||
private static final GcpAuthenticationFilter.Provider FILTER_PROVIDER =
|
private static final GcpAuthenticationFilter.Provider FILTER_PROVIDER =
|
||||||
new GcpAuthenticationFilter.Provider();
|
new GcpAuthenticationFilter.Provider();
|
||||||
|
private static final String serverName = InProcessServerBuilder.generateName();
|
||||||
|
private static final LdsUpdate ldsUpdate = getLdsUpdate();
|
||||||
|
private static final EdsUpdate edsUpdate = getEdsUpdate();
|
||||||
|
private static final RdsUpdate rdsUpdate = getRdsUpdate();
|
||||||
|
private static final CdsUpdate cdsUpdate = getCdsUpdate();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewFilterInstancesPerFilterName() {
|
||||||
|
assertThat(new GcpAuthenticationFilter("FILTER_INSTANCE_NAME1"))
|
||||||
|
.isNotEqualTo(new GcpAuthenticationFilter("FILTER_INSTANCE_NAME1"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void filterType_clientOnly() {
|
public void filterType_clientOnly() {
|
||||||
|
@ -92,35 +138,258 @@ public class GcpAuthenticationFilterTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClientInterceptor_createsAndReusesCachedCredentials() {
|
public void testClientInterceptor_success() throws IOException, ResourceInvalidException {
|
||||||
|
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
|
||||||
|
CLUSTER_NAME,
|
||||||
|
cdsUpdate,
|
||||||
|
new EndpointConfig(StatusOr.fromValue(edsUpdate)));
|
||||||
|
XdsConfig defaultXdsConfig = new XdsConfig.XdsConfigBuilder()
|
||||||
|
.setListener(ldsUpdate)
|
||||||
|
.setRoute(rdsUpdate)
|
||||||
|
.setVirtualHost(rdsUpdate.virtualHosts.get(0))
|
||||||
|
.addCluster(CLUSTER_NAME, StatusOr.fromValue(clusterConfig)).build();
|
||||||
|
CallOptions callOptionsWithXds = CallOptions.DEFAULT
|
||||||
|
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0")
|
||||||
|
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig);
|
||||||
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10);
|
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10);
|
||||||
GcpAuthenticationFilter filter = new GcpAuthenticationFilter();
|
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME");
|
||||||
|
|
||||||
// Create interceptor
|
|
||||||
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null);
|
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null);
|
||||||
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod();
|
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod();
|
||||||
|
|
||||||
// Mock channel and capture CallOptions
|
|
||||||
Channel mockChannel = Mockito.mock(Channel.class);
|
Channel mockChannel = Mockito.mock(Channel.class);
|
||||||
ArgumentCaptor<CallOptions> callOptionsCaptor = ArgumentCaptor.forClass(CallOptions.class);
|
ArgumentCaptor<CallOptions> callOptionsCaptor = ArgumentCaptor.forClass(CallOptions.class);
|
||||||
|
|
||||||
// Execute interception twice to check caching
|
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel);
|
||||||
interceptor.interceptCall(methodDescriptor, CallOptions.DEFAULT, mockChannel);
|
|
||||||
interceptor.interceptCall(methodDescriptor, CallOptions.DEFAULT, mockChannel);
|
|
||||||
|
|
||||||
// Capture and verify CallOptions for CallCredentials presence
|
verify(mockChannel).newCall(eq(methodDescriptor), callOptionsCaptor.capture());
|
||||||
Mockito.verify(mockChannel, Mockito.times(2))
|
CallOptions capturedOptions = callOptionsCaptor.getAllValues().get(0);
|
||||||
|
assertNotNull(capturedOptions.getCredentials());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientInterceptor_createsAndReusesCachedCredentials()
|
||||||
|
throws IOException, ResourceInvalidException {
|
||||||
|
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
|
||||||
|
CLUSTER_NAME,
|
||||||
|
cdsUpdate,
|
||||||
|
new EndpointConfig(StatusOr.fromValue(edsUpdate)));
|
||||||
|
XdsConfig defaultXdsConfig = new XdsConfig.XdsConfigBuilder()
|
||||||
|
.setListener(ldsUpdate)
|
||||||
|
.setRoute(rdsUpdate)
|
||||||
|
.setVirtualHost(rdsUpdate.virtualHosts.get(0))
|
||||||
|
.addCluster(CLUSTER_NAME, StatusOr.fromValue(clusterConfig)).build();
|
||||||
|
CallOptions callOptionsWithXds = CallOptions.DEFAULT
|
||||||
|
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0")
|
||||||
|
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig);
|
||||||
|
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10);
|
||||||
|
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME");
|
||||||
|
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null);
|
||||||
|
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod();
|
||||||
|
Channel mockChannel = Mockito.mock(Channel.class);
|
||||||
|
ArgumentCaptor<CallOptions> callOptionsCaptor = ArgumentCaptor.forClass(CallOptions.class);
|
||||||
|
|
||||||
|
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel);
|
||||||
|
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel);
|
||||||
|
|
||||||
|
verify(mockChannel, Mockito.times(2))
|
||||||
.newCall(eq(methodDescriptor), callOptionsCaptor.capture());
|
.newCall(eq(methodDescriptor), callOptionsCaptor.capture());
|
||||||
|
|
||||||
// Retrieve the CallOptions captured from both calls
|
|
||||||
CallOptions firstCapturedOptions = callOptionsCaptor.getAllValues().get(0);
|
CallOptions firstCapturedOptions = callOptionsCaptor.getAllValues().get(0);
|
||||||
CallOptions secondCapturedOptions = callOptionsCaptor.getAllValues().get(1);
|
CallOptions secondCapturedOptions = callOptionsCaptor.getAllValues().get(1);
|
||||||
|
|
||||||
// Ensure that CallCredentials was added
|
|
||||||
assertNotNull(firstCapturedOptions.getCredentials());
|
assertNotNull(firstCapturedOptions.getCredentials());
|
||||||
assertNotNull(secondCapturedOptions.getCredentials());
|
assertNotNull(secondCapturedOptions.getCredentials());
|
||||||
|
|
||||||
// Ensure that the CallCredentials from both calls are the same, indicating caching
|
|
||||||
assertSame(firstCapturedOptions.getCredentials(), secondCapturedOptions.getCredentials());
|
assertSame(firstCapturedOptions.getCredentials(), secondCapturedOptions.getCredentials());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientInterceptor_withoutClusterSelectionKey() throws Exception {
|
||||||
|
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10);
|
||||||
|
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME");
|
||||||
|
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null);
|
||||||
|
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod();
|
||||||
|
Channel mockChannel = mock(Channel.class);
|
||||||
|
CallOptions callOptionsWithXds = CallOptions.DEFAULT;
|
||||||
|
|
||||||
|
ClientCall<Void, Void> call = interceptor.interceptCall(
|
||||||
|
methodDescriptor, callOptionsWithXds, mockChannel);
|
||||||
|
|
||||||
|
assertTrue(call instanceof FailingClientCall);
|
||||||
|
FailingClientCall<Void, Void> clientCall = (FailingClientCall<Void, Void>) call;
|
||||||
|
assertThat(clientCall.error.getDescription()).contains("does not contain cluster resource");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientInterceptor_clusterSelectionKeyWithoutPrefix() throws Exception {
|
||||||
|
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
|
||||||
|
CLUSTER_NAME,
|
||||||
|
cdsUpdate,
|
||||||
|
new EndpointConfig(StatusOr.fromValue(edsUpdate)));
|
||||||
|
XdsConfig defaultXdsConfig = new XdsConfig.XdsConfigBuilder()
|
||||||
|
.setListener(ldsUpdate)
|
||||||
|
.setRoute(rdsUpdate)
|
||||||
|
.setVirtualHost(rdsUpdate.virtualHosts.get(0))
|
||||||
|
.addCluster(CLUSTER_NAME, StatusOr.fromValue(clusterConfig)).build();
|
||||||
|
CallOptions callOptionsWithXds = CallOptions.DEFAULT
|
||||||
|
.withOption(CLUSTER_SELECTION_KEY, "cluster0")
|
||||||
|
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig);
|
||||||
|
Channel mockChannel = mock(Channel.class);
|
||||||
|
|
||||||
|
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10);
|
||||||
|
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME");
|
||||||
|
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null);
|
||||||
|
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod();
|
||||||
|
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel);
|
||||||
|
|
||||||
|
verify(mockChannel).newCall(methodDescriptor, callOptionsWithXds);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientInterceptor_xdsConfigDoesNotExist() throws Exception {
|
||||||
|
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10);
|
||||||
|
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME");
|
||||||
|
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null);
|
||||||
|
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod();
|
||||||
|
Channel mockChannel = mock(Channel.class);
|
||||||
|
CallOptions callOptionsWithXds = CallOptions.DEFAULT
|
||||||
|
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0");
|
||||||
|
|
||||||
|
ClientCall<Void, Void> call =
|
||||||
|
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel);
|
||||||
|
|
||||||
|
assertTrue(call instanceof FailingClientCall);
|
||||||
|
FailingClientCall<Void, Void> clientCall = (FailingClientCall<Void, Void>) call;
|
||||||
|
assertThat(clientCall.error.getDescription()).contains("does not contain xds configuration");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientInterceptor_incorrectClusterName() throws Exception {
|
||||||
|
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
|
||||||
|
CLUSTER_NAME,
|
||||||
|
cdsUpdate,
|
||||||
|
new EndpointConfig(StatusOr.fromValue(edsUpdate)));
|
||||||
|
XdsConfig defaultXdsConfig = new XdsConfig.XdsConfigBuilder()
|
||||||
|
.setListener(ldsUpdate)
|
||||||
|
.setRoute(rdsUpdate)
|
||||||
|
.setVirtualHost(rdsUpdate.virtualHosts.get(0))
|
||||||
|
.addCluster("custer0", StatusOr.fromValue(clusterConfig)).build();
|
||||||
|
CallOptions callOptionsWithXds = CallOptions.DEFAULT
|
||||||
|
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster")
|
||||||
|
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig);
|
||||||
|
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10);
|
||||||
|
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME");
|
||||||
|
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null);
|
||||||
|
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod();
|
||||||
|
Channel mockChannel = mock(Channel.class);
|
||||||
|
|
||||||
|
ClientCall<Void, Void> call =
|
||||||
|
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel);
|
||||||
|
|
||||||
|
assertTrue(call instanceof FailingClientCall);
|
||||||
|
FailingClientCall<Void, Void> clientCall = (FailingClientCall<Void, Void>) call;
|
||||||
|
assertThat(clientCall.error.getDescription()).contains("does not contain xds cluster");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientInterceptor_statusOrError() throws Exception {
|
||||||
|
StatusOr<XdsClusterConfig> errorCluster =
|
||||||
|
StatusOr.fromStatus(Status.NOT_FOUND.withDescription("Cluster resource not found"));
|
||||||
|
XdsConfig defaultXdsConfig = new XdsConfig.XdsConfigBuilder()
|
||||||
|
.setListener(ldsUpdate)
|
||||||
|
.setRoute(rdsUpdate)
|
||||||
|
.setVirtualHost(rdsUpdate.virtualHosts.get(0))
|
||||||
|
.addCluster(CLUSTER_NAME, errorCluster).build();
|
||||||
|
CallOptions callOptionsWithXds = CallOptions.DEFAULT
|
||||||
|
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0")
|
||||||
|
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig);
|
||||||
|
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10);
|
||||||
|
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME");
|
||||||
|
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null);
|
||||||
|
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod();
|
||||||
|
Channel mockChannel = mock(Channel.class);
|
||||||
|
|
||||||
|
ClientCall<Void, Void> call =
|
||||||
|
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel);
|
||||||
|
|
||||||
|
assertTrue(call instanceof FailingClientCall);
|
||||||
|
FailingClientCall<Void, Void> clientCall = (FailingClientCall<Void, Void>) call;
|
||||||
|
assertThat(clientCall.error.getDescription()).contains("Cluster resource not found");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientInterceptor_notAudienceWrapper()
|
||||||
|
throws IOException, ResourceInvalidException {
|
||||||
|
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
|
||||||
|
CLUSTER_NAME,
|
||||||
|
getCdsUpdateWithIncorrectAudienceWrapper(),
|
||||||
|
new EndpointConfig(StatusOr.fromValue(edsUpdate)));
|
||||||
|
XdsConfig defaultXdsConfig = new XdsConfig.XdsConfigBuilder()
|
||||||
|
.setListener(ldsUpdate)
|
||||||
|
.setRoute(rdsUpdate)
|
||||||
|
.setVirtualHost(rdsUpdate.virtualHosts.get(0))
|
||||||
|
.addCluster(CLUSTER_NAME, StatusOr.fromValue(clusterConfig)).build();
|
||||||
|
CallOptions callOptionsWithXds = CallOptions.DEFAULT
|
||||||
|
.withOption(CLUSTER_SELECTION_KEY, "cluster:cluster0")
|
||||||
|
.withOption(XDS_CONFIG_CALL_OPTION_KEY, defaultXdsConfig);
|
||||||
|
GcpAuthenticationConfig config = new GcpAuthenticationConfig(10);
|
||||||
|
GcpAuthenticationFilter filter = new GcpAuthenticationFilter("FILTER_INSTANCE_NAME");
|
||||||
|
ClientInterceptor interceptor = filter.buildClientInterceptor(config, null, null);
|
||||||
|
MethodDescriptor<Void, Void> methodDescriptor = TestMethodDescriptors.voidMethod();
|
||||||
|
Channel mockChannel = Mockito.mock(Channel.class);
|
||||||
|
|
||||||
|
ClientCall<Void, Void> call =
|
||||||
|
interceptor.interceptCall(methodDescriptor, callOptionsWithXds, mockChannel);
|
||||||
|
|
||||||
|
assertTrue(call instanceof FailingClientCall);
|
||||||
|
FailingClientCall<Void, Void> clientCall = (FailingClientCall<Void, Void>) call;
|
||||||
|
assertThat(clientCall.error.getDescription()).contains("GCP Authn found wrong type");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static LdsUpdate getLdsUpdate() {
|
||||||
|
Filter.NamedFilterConfig routerFilterConfig = new Filter.NamedFilterConfig(
|
||||||
|
serverName, RouterFilter.ROUTER_CONFIG);
|
||||||
|
HttpConnectionManager httpConnectionManager = HttpConnectionManager.forRdsName(
|
||||||
|
0L, RDS_NAME, Collections.singletonList(routerFilterConfig));
|
||||||
|
return XdsListenerResource.LdsUpdate.forApiListener(httpConnectionManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static RdsUpdate getRdsUpdate() {
|
||||||
|
RouteConfiguration routeConfiguration =
|
||||||
|
buildRouteConfiguration(serverName, RDS_NAME, CLUSTER_NAME);
|
||||||
|
XdsResourceType.Args args = new XdsResourceType.Args(null, "0", "0", null, null, null);
|
||||||
|
try {
|
||||||
|
return XdsRouteConfigureResource.getInstance().doParse(args, routeConfiguration);
|
||||||
|
} catch (ResourceInvalidException ex) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static EdsUpdate getEdsUpdate() {
|
||||||
|
Map<Locality, LocalityLbEndpoints> lbEndpointsMap = new HashMap<>();
|
||||||
|
LbEndpoint lbEndpoint = LbEndpoint.create(
|
||||||
|
serverName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME, ImmutableMap.of());
|
||||||
|
lbEndpointsMap.put(
|
||||||
|
Locality.create("", "", ""),
|
||||||
|
LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0, ImmutableMap.of()));
|
||||||
|
return new XdsEndpointResource.EdsUpdate(EDS_NAME, lbEndpointsMap, Collections.emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static CdsUpdate getCdsUpdate() {
|
||||||
|
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();
|
||||||
|
parsedMetadata.put("FILTER_INSTANCE_NAME", new AudienceWrapper("TEST_AUDIENCE"));
|
||||||
|
try {
|
||||||
|
CdsUpdate.Builder cdsUpdate = CdsUpdate.forEds(
|
||||||
|
CLUSTER_NAME, EDS_NAME, null, null, null, null, false)
|
||||||
|
.lbPolicyConfig(getWrrLbConfigAsMap());
|
||||||
|
return cdsUpdate.parsedMetadata(parsedMetadata.build()).build();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static CdsUpdate getCdsUpdateWithIncorrectAudienceWrapper() throws IOException {
|
||||||
|
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();
|
||||||
|
parsedMetadata.put("FILTER_INSTANCE_NAME", "TEST_AUDIENCE");
|
||||||
|
CdsUpdate.Builder cdsUpdate = CdsUpdate.forEds(
|
||||||
|
CLUSTER_NAME, EDS_NAME, null, null, null, null, false)
|
||||||
|
.lbPolicyConfig(getWrrLbConfigAsMap());
|
||||||
|
return cdsUpdate.parsedMetadata(parsedMetadata.build()).build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,6 +129,7 @@ import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
|
||||||
import io.grpc.xds.Endpoints.LbEndpoint;
|
import io.grpc.xds.Endpoints.LbEndpoint;
|
||||||
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
||||||
import io.grpc.xds.Filter.FilterConfig;
|
import io.grpc.xds.Filter.FilterConfig;
|
||||||
|
import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser.AudienceWrapper;
|
||||||
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
|
||||||
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
|
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
|
||||||
import io.grpc.xds.VirtualHost.Route;
|
import io.grpc.xds.VirtualHost.Route;
|
||||||
|
@ -2417,8 +2418,7 @@ public class GrpcXdsClientImplDataTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void processCluster_parsesAudienceMetadata()
|
public void processCluster_parsesAudienceMetadata() throws Exception {
|
||||||
throws ResourceInvalidException, InvalidProtocolBufferException {
|
|
||||||
MetadataRegistry.getInstance();
|
MetadataRegistry.getInstance();
|
||||||
|
|
||||||
Audience audience = Audience.newBuilder()
|
Audience audience = Audience.newBuilder()
|
||||||
|
@ -2462,7 +2462,10 @@ public class GrpcXdsClientImplDataTest {
|
||||||
"FILTER_METADATA", ImmutableMap.of(
|
"FILTER_METADATA", ImmutableMap.of(
|
||||||
"key1", "value1",
|
"key1", "value1",
|
||||||
"key2", 42.0));
|
"key2", 42.0));
|
||||||
assertThat(update.parsedMetadata()).isEqualTo(expectedParsedMetadata);
|
assertThat(update.parsedMetadata().get("FILTER_METADATA"))
|
||||||
|
.isEqualTo(expectedParsedMetadata.get("FILTER_METADATA"));
|
||||||
|
assertThat(update.parsedMetadata().get("AUDIENCE_METADATA"))
|
||||||
|
.isInstanceOf(AudienceWrapper.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -2519,8 +2522,7 @@ public class GrpcXdsClientImplDataTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void processCluster_metadataKeyCollision_resolvesToTypedMetadata()
|
public void processCluster_metadataKeyCollision_resolvesToTypedMetadata() throws Exception {
|
||||||
throws ResourceInvalidException, InvalidProtocolBufferException {
|
|
||||||
MetadataRegistry metadataRegistry = MetadataRegistry.getInstance();
|
MetadataRegistry metadataRegistry = MetadataRegistry.getInstance();
|
||||||
|
|
||||||
MetadataValueParser testParser =
|
MetadataValueParser testParser =
|
||||||
|
@ -2575,8 +2577,7 @@ public class GrpcXdsClientImplDataTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void parseNonAggregateCluster_withHttp11ProxyTransportSocket()
|
public void parseNonAggregateCluster_withHttp11ProxyTransportSocket() throws Exception {
|
||||||
throws ResourceInvalidException, InvalidProtocolBufferException {
|
|
||||||
XdsClusterResource.isEnabledXdsHttpConnect = true;
|
XdsClusterResource.isEnabledXdsHttpConnect = true;
|
||||||
|
|
||||||
Http11ProxyUpstreamTransport http11ProxyUpstreamTransport =
|
Http11ProxyUpstreamTransport http11ProxyUpstreamTransport =
|
||||||
|
|
|
@ -291,7 +291,7 @@ public class XdsTestUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private static ImmutableMap<String, ?> getWrrLbConfigAsMap() throws IOException {
|
static ImmutableMap<String, ?> getWrrLbConfigAsMap() throws IOException {
|
||||||
String lbConfigStr = "{\"wrr_locality_experimental\" : "
|
String lbConfigStr = "{\"wrr_locality_experimental\" : "
|
||||||
+ "{ \"childPolicy\" : [{\"round_robin\" : {}}]}}";
|
+ "{ \"childPolicy\" : [{\"round_robin\" : {}}]}}";
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue