xds: add "resource_timer_is_transient_failure" server feature (#12063)

This commit is contained in:
MV Shiva 2025-07-15 15:33:02 +05:30 committed by GitHub
parent a8de9f07ab
commit 5a8326f1c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 176 additions and 41 deletions

View File

@ -63,17 +63,20 @@ public abstract class Bootstrapper {
public abstract boolean isTrustedXdsServer();
public abstract boolean resourceTimerIsTransientError();
@VisibleForTesting
public static ServerInfo create(String target, @Nullable Object implSpecificConfig) {
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig, false, false);
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
false, false, false);
}
@VisibleForTesting
public static ServerInfo create(
String target, Object implSpecificConfig, boolean ignoreResourceDeletion,
boolean isTrustedXdsServer) {
boolean isTrustedXdsServer, boolean resourceTimerIsTransientError) {
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
ignoreResourceDeletion, isTrustedXdsServer);
ignoreResourceDeletion, isTrustedXdsServer, resourceTimerIsTransientError);
}
}

View File

@ -43,6 +43,8 @@ public abstract class BootstrapperImpl extends Bootstrapper {
public static final String GRPC_EXPERIMENTAL_XDS_FALLBACK =
"GRPC_EXPERIMENTAL_XDS_FALLBACK";
public static final String GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING =
"GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING";
// Client features.
@VisibleForTesting
@ -54,10 +56,16 @@ public abstract class BootstrapperImpl extends Bootstrapper {
// Server features.
private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion";
private static final String SERVER_FEATURE_TRUSTED_XDS_SERVER = "trusted_xds_server";
private static final String
SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR = "resource_timer_is_transient_error";
@VisibleForTesting
static boolean enableXdsFallback = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, true);
@VisibleForTesting
public static boolean xdsDataErrorHandlingEnabled
= GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING, false);
protected final XdsLogger logger;
protected FileReader reader = LocalFileReader.INSTANCE;
@ -247,6 +255,7 @@ public abstract class BootstrapperImpl extends Bootstrapper {
Object implSpecificConfig = getImplSpecificConfig(serverConfig, serverUri);
boolean resourceTimerIsTransientError = false;
boolean ignoreResourceDeletion = false;
// "For forward compatibility reasons, the client will ignore any entry in the list that it
// does not understand, regardless of type."
@ -254,11 +263,14 @@ public abstract class BootstrapperImpl extends Bootstrapper {
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
}
servers.add(
ServerInfo.create(serverUri, implSpecificConfig, ignoreResourceDeletion,
serverFeatures != null
&& serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER)));
&& serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER),
resourceTimerIsTransientError));
}
return servers.build();
}

View File

@ -199,6 +199,10 @@ public abstract class XdsClient {
return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null);
}
public static ResourceMetadata newResourceMetadataTimeout() {
return new ResourceMetadata(ResourceMetadataStatus.TIMEOUT, "", 0, false, null, null);
}
public static ResourceMetadata newResourceMetadataAcked(
Any rawResource, String version, long updateTimeNanos) {
checkNotNull(rawResource, "rawResource");
@ -256,7 +260,7 @@ public abstract class XdsClient {
* config_dump.proto</a>
*/
public enum ResourceMetadataStatus {
UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED
UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED, TIMEOUT
}
/**

View File

@ -18,6 +18,7 @@ package io.grpc.xds.client;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.BootstrapperImpl.xdsDataErrorHandlingEnabled;
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;
@ -67,6 +68,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
public static final int EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC = 30;
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@ -738,6 +740,9 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
// When client becomes ready, it triggers a restartTimer for all relevant subscribers.
return;
}
ServerInfo serverInfo = activeCpc.getServerInfo();
int timeoutSec = xdsDataErrorHandlingEnabled && serverInfo.resourceTimerIsTransientError()
? EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC : INITIAL_RESOURCE_FETCH_TIMEOUT_SEC;
class ResourceNotFound implements Runnable {
@Override
@ -761,8 +766,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
respTimer.cancel();
}
respTimer = syncContext.schedule(
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
timeService);
new ResourceNotFound(), timeoutSec, TimeUnit.SECONDS, timeService);
}
void stopTimer() {
@ -840,6 +844,8 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
// Ignore deletion of State of the World resources when this feature is on,
// and the resource is reusable.
boolean ignoreResourceDeletionEnabled = serverInfo.ignoreResourceDeletion();
boolean resourceTimerIsTransientError =
xdsDataErrorHandlingEnabled && serverInfo.resourceTimerIsTransientError();
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
@ -854,14 +860,20 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
if (!absent) {
data = null;
absent = true;
metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
metadata = resourceTimerIsTransientError ? ResourceMetadata.newResourceMetadataTimeout() :
ResourceMetadata.newResourceMetadataDoesNotExist();
for (ResourceWatcher<T> watcher : watchers.keySet()) {
if (processingTracker != null) {
processingTracker.startTask();
}
watchers.get(watcher).execute(() -> {
try {
watcher.onResourceDoesNotExist(resource);
if (resourceTimerIsTransientError) {
watcher.onError(Status.UNAVAILABLE.withDescription(
"Timed out waiting for resource " + resource + " from xDS server"));
} else {
watcher.onResourceDoesNotExist(resource);
}
} finally {
if (processingTracker != null) {
processingTracker.onComplete();

View File

@ -3549,7 +3549,7 @@ public class GrpcXdsClientImplDataTest {
private XdsResourceType.Args getXdsResourceTypeArgs(boolean isTrustedServer) {
return new XdsResourceType.Args(
ServerInfo.create("http://td", "", false, isTrustedServer), "1.0", null, null, null, null
ServerInfo.create("http://td", "", false, isTrustedServer, false), "1.0", null, null, null, null
);
}
}

View File

@ -85,6 +85,7 @@ import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
import io.grpc.xds.client.Bootstrapper.CertificateProviderInfo;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.BootstrapperImpl;
import io.grpc.xds.client.EnvoyProtoData.Node;
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.client.Locality;
@ -145,7 +146,7 @@ import org.mockito.verification.VerificationMode;
public abstract class GrpcXdsClientImplTestBase {
private static final String SERVER_URI = "trafficdirector.googleapis.com";
private static final String SERVER_URI_CUSTOME_AUTHORITY = "trafficdirector2.googleapis.com";
private static final String SERVER_URI_CUSTOM_AUTHORITY = "trafficdirector2.googleapis.com";
private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com";
private static final String LDS_RESOURCE = "listener.googleapis.com";
private static final String RDS_RESOURCE = "route-configuration.googleapis.com";
@ -304,6 +305,30 @@ public abstract class GrpcXdsClientImplTestBase {
private final BindableService adsService = createAdsService();
private final BindableService lrsService = createLrsService();
private XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
@Override
public XdsTransport create(ServerInfo serverInfo) {
if (serverInfo.target().equals(SERVER_URI)) {
return new GrpcXdsTransport(channel);
}
if (serverInfo.target().equals(SERVER_URI_CUSTOM_AUTHORITY)) {
if (channelForCustomAuthority == null) {
channelForCustomAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return new GrpcXdsTransport(channelForCustomAuthority);
}
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) {
if (channelForEmptyAuthority == null) {
channelForEmptyAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return new GrpcXdsTransport(channelForEmptyAuthority);
}
throw new IllegalArgumentException("Can not create channel for " + serverInfo);
}
};
@Before
public void setUp() throws IOException {
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
@ -322,32 +347,9 @@ public abstract class GrpcXdsClientImplTestBase {
.start());
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
@Override
public XdsTransport create(ServerInfo serverInfo) {
if (serverInfo.target().equals(SERVER_URI)) {
return new GrpcXdsTransport(channel);
}
if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) {
if (channelForCustomAuthority == null) {
channelForCustomAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return new GrpcXdsTransport(channelForCustomAuthority);
}
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) {
if (channelForEmptyAuthority == null) {
channelForEmptyAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return new GrpcXdsTransport(channelForEmptyAuthority);
}
throw new IllegalArgumentException("Can not create channel for " + serverInfo);
}
};
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(),
true);
true, false);
BootstrapInfo bootstrapInfo =
Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(xdsServerInfo))
@ -357,7 +359,7 @@ public abstract class GrpcXdsClientImplTestBase {
AuthorityInfo.create(
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))),
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",
@ -3155,6 +3157,108 @@ public abstract class GrpcXdsClientImplTestBase {
verify(anotherWatcher).onError(any());
}
@Test
@SuppressWarnings("unchecked")
public void resourceTimerIsTransientError_schedulesExtendedTimeout() {
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
ServerInfo serverInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS,
false, true, true);
BootstrapInfo bootstrapInfo =
Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(serverInfo))
.node(NODE)
.authorities(ImmutableMap.of(
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
.certProviders(ImmutableMap.of())
.build();
xdsClient = new XdsClientImpl(
xdsTransportFactory,
bootstrapInfo,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
fakeClock.getStopwatchSupplier(),
timeProvider,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo),
xdsClientMetricReporter);
ResourceWatcher<CdsUpdate> watcher = mock(ResourceWatcher.class);
String resourceName = "cluster.googleapis.com";
xdsClient.watchXdsResource(
XdsClusterResource.getInstance(),
resourceName,
watcher,
fakeClock.getScheduledExecutorService());
ScheduledTask task = Iterables.getOnlyElement(
fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
assertThat(task.getDelay(TimeUnit.SECONDS))
.isEqualTo(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC);
fakeClock.runDueTasks();
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
}
@Test
@SuppressWarnings("unchecked")
public void resourceTimerIsTransientError_callsOnErrorUnavailable() {
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(),
true, true);
BootstrapInfo bootstrapInfo =
Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(xdsServerInfo))
.node(NODE)
.authorities(ImmutableMap.of(
"authority.xds.com",
AuthorityInfo.create(
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))),
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
.certProviders(ImmutableMap.of("cert-instance-name",
CertificateProviderInfo.create("file-watcher", ImmutableMap.of())))
.build();
xdsClient = new XdsClientImpl(
xdsTransportFactory,
bootstrapInfo,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
fakeClock.getStopwatchSupplier(),
timeProvider,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo),
xdsClientMetricReporter);
String timeoutResource = CDS_RESOURCE + "_timeout";
ResourceWatcher<CdsUpdate> timeoutWatcher = mock(ResourceWatcher.class);
xdsClient.watchXdsResource(
XdsClusterResource.getInstance(),
timeoutResource,
timeoutWatcher,
fakeClock.getScheduledExecutorService());
assertThat(resourceDiscoveryCalls).hasSize(1);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.verifyRequest(CDS, ImmutableList.of(timeoutResource), "", "", NODE);
fakeClock.forwardTime(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
fakeClock.runDueTasks();
ArgumentCaptor<Status> errorCaptor = ArgumentCaptor.forClass(Status.class);
verify(timeoutWatcher).onError(errorCaptor.capture());
Status error = errorCaptor.getValue();
assertThat(error.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(error.getDescription()).isEqualTo(
"Timed out waiting for resource " + timeoutResource + " from xDS server");
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
}
private Answer<Void> blockUpdate(CyclicBarrier barrier) {
return new Answer<Void>() {
@Override
@ -4220,7 +4324,7 @@ public abstract class GrpcXdsClientImplTestBase {
private BootstrapInfo buildBootStrap(String serverUri) {
ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS,
ignoreResourceDeletion(), true);
ignoreResourceDeletion(), true, false);
return Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(xdsServerInfo))
@ -4230,7 +4334,7 @@ public abstract class GrpcXdsClientImplTestBase {
AuthorityInfo.create(
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))),
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",

View File

@ -368,13 +368,13 @@ public class XdsNameResolverTest {
String serviceAuthority = "[::FFFF:129.144.52.38]:80";
bootstrapInfo = BootstrapInfo.builder()
.servers(ImmutableList.of(ServerInfo.create(
"td.googleapis.com", InsecureChannelCredentials.create(), true, true)))
"td.googleapis.com", InsecureChannelCredentials.create(), true, true, false)))
.node(Node.newBuilder().build())
.authorities(
ImmutableMap.of(targetAuthority, AuthorityInfo.create(
"xdstp://" + targetAuthority + "/envoy.config.listener.v3.Listener/%s?foo=1&bar=2",
ImmutableList.of(ServerInfo.create(
"td.googleapis.com", InsecureChannelCredentials.create(), true, true)))))
"td.googleapis.com", InsecureChannelCredentials.create(), true, true, false)))))
.build();
expectedLdsResourceName = "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/"
+ "%5B::FFFF:129.144.52.38%5D:80?bar=2&foo=1"; // query param canonified

View File

@ -203,7 +203,7 @@ public class CommonBootstrapperTestUtils {
List<ServerInfo> serverInfos = new ArrayList<>();
for (String uri : serverUris) {
serverInfos.add(ServerInfo.create(uri, CHANNEL_CREDENTIALS, false, true));
serverInfos.add(ServerInfo.create(uri, CHANNEL_CREDENTIALS, false, true, false));
}
EnvoyProtoData.Node node = EnvoyProtoData.Node.newBuilder().setId("node-id").build();