core: updates the backoff range as per the A6 redefinition (#11858)

* core: updates the backoff range being used from [0, 1] to [0.8, 1.2] as per the A6 redefinition

* adds a flag for experimental jitter

* xds: Allow FaultFilter's interceptor to be reused

This is the only usage of PickSubchannelArgs when creating a filter's
ClientInterceptor, and a follow-up commit will remove the argument and
actually reuse the interceptors. Other filters' interceptors can
already be reused.

There doesn't seem to be any significant loss of legibility by making
FaultFilter a more ordinary interceptor, but the change does cause the
ForwardingClientCall to be present when faultDelay is configured,
independent of whether the fault delay ends up being triggered.

Reusing interceptors will move more state management out of the RPC path
which will be more relevant with RLQS.

* netty: Removed 4096 min buffer size (#11856)

* netty: Removed 4096 min buffer size

* turns the flag into a var for better efficiency

---------

Co-authored-by: Eric Anderson <ejona@google.com>
This commit is contained in:
Abhishek Agrawal 2025-02-06 03:48:20 +05:30 committed by GitHub
parent 3142928fa3
commit 44e92e2c2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 55 additions and 49 deletions

View File

@ -846,6 +846,15 @@ abstract class RetriableStream<ReqT> implements ClientStream {
}
}
// Selects which jitter formula intervalWithJitter applies. Read once, at class
// initialization, via GrpcUtil.getFlag with `true` as the second argument
// (presumably the default when the flag is unset — confirm against GrpcUtil).
// NOTE(review): the flag name "GRPC_EXPERIMENTAL_XDS_RLS_LB" looks unrelated to retry
// jitter (possible copy-paste) — confirm the intended name.
private static final boolean isExperimentalRetryJitterEnabled = GrpcUtil
.getFlag("GRPC_EXPERIMENTAL_XDS_RLS_LB", true);
/**
 * Applies retry jitter to a backoff interval.
 *
 * <p>When the experimental retry-jitter flag is enabled, the interval is scaled by a factor
 * drawn from [0.8, 1.2), i.e. +/-20% around the nominal backoff as described by the gRFC A6
 * redefinition. Otherwise the legacy factor in [0, 1) is used.
 *
 * @param intervalNanos the un-jittered backoff interval
 * @return {@code intervalNanos} multiplied by the jitter factor, truncated to a {@code long}
 */
public static long intervalWithJitter(long intervalNanos) {
  // Assuming random.nextDouble() is uniform in [0, 1) (java.util.Random contract):
  // 0.4 * r + 0.8 maps it onto [0.8, 1.2). The previous 0.8 * r + 0.4 produced [0.4, 1.2),
  // which allowed backoffs far shorter than the intended lower bound.
  double jitterFactor = isExperimentalRetryJitterEnabled
      ? 0.4 * random.nextDouble() + 0.8 : random.nextDouble();
  return (long) (intervalNanos * jitterFactor);
}
private static final class SavedCloseMasterListenerReason {
private final Status status;
private final RpcProgress progress;
@ -1066,7 +1075,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
if (pushbackMillis == null) {
if (isRetryableStatusCode) {
shouldRetry = true;
backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
backoffNanos = intervalWithJitter(nextBackoffIntervalNanos);
nextBackoffIntervalNanos = Math.min(
(long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
retryPolicy.maxBackoffNanos);

View File

@ -3875,7 +3875,7 @@ public class ManagedChannelImplTest {
Status.UNAVAILABLE, PROCESSED, new Metadata());
// in backoff
timer.forwardTime(5, TimeUnit.SECONDS);
timer.forwardTime(6, TimeUnit.SECONDS);
assertThat(timer.getPendingTasks()).hasSize(1);
verify(mockStream2, never()).start(any(ClientStreamListener.class));
@ -3894,7 +3894,7 @@ public class ManagedChannelImplTest {
assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription());
// backoff ends
timer.forwardTime(5, TimeUnit.SECONDS);
timer.forwardTime(6, TimeUnit.SECONDS);
assertThat(timer.getPendingTasks()).isEmpty();
verify(mockStream2).start(streamListenerCaptor.capture());
verify(mockLoadBalancer, never()).shutdown();

View File

@ -147,6 +147,17 @@ public class RetriableStreamTest {
private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
private final FakeClock fakeClock = new FakeClock();
/**
 * Computes the jittered backoff the test expects after {@code retryCount} prior retries.
 *
 * <p>The nominal delay is {@code INITIAL_BACKOFF_IN_SECONDS} scaled by
 * {@code BACKOFF_MULTIPLIER^retryCount}; a non-positive {@code retryCount} leaves the initial
 * backoff unscaled. The result is passed through {@link RetriableStream#intervalWithJitter}
 * so the expectation follows the production jitter formula.
 *
 * @param retryCount number of retries that have already backed off
 * @return the jittered delay, in the same unit as {@code INITIAL_BACKOFF_IN_SECONDS}
 */
private static long calculateBackoffWithRetries(int retryCount) {
  double scale = 1;
  if (retryCount > 0) {
    scale = Math.pow(BACKOFF_MULTIPLIER, retryCount);
  }
  long nominalDelay = (long) (INITIAL_BACKOFF_IN_SECONDS * scale);
  return RetriableStream.intervalWithJitter(nominalDelay);
}
/**
 * Computes the jittered backoff the test expects once the backoff has reached its cap.
 *
 * @return {@code MAX_BACKOFF_IN_SECONDS} passed through
 *     {@link RetriableStream#intervalWithJitter}
 */
private static long calculateMaxBackoff() {
  long jitteredCap = RetriableStream.intervalWithJitter(MAX_BACKOFF_IN_SECONDS);
  return jitteredCap;
}
private final class RecordedRetriableStream extends RetriableStream<String> {
RecordedRetriableStream(MethodDescriptor<String, ?> method, Metadata headers,
ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
@ -307,7 +318,7 @@ public class RetriableStreamTest {
retriableStream.sendMessage("msg1 during backoff1");
retriableStream.sendMessage("msg2 during backoff1");
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0) - 1L, TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
@ -364,9 +375,7 @@ public class RetriableStreamTest {
retriableStream.sendMessage("msg2 during backoff2");
retriableStream.sendMessage("msg3 during backoff2");
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(1) - 1L, TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
@ -459,7 +468,7 @@ public class RetriableStreamTest {
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -518,7 +527,7 @@ public class RetriableStreamTest {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -584,7 +593,7 @@ public class RetriableStreamTest {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -687,7 +696,7 @@ public class RetriableStreamTest {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -821,7 +830,7 @@ public class RetriableStreamTest {
// send more requests during backoff
retriableStream.request(789);
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
inOrder.verify(mockStream2).start(sublistenerCaptor2.get());
inOrder.verify(mockStream2).request(3);
@ -875,7 +884,7 @@ public class RetriableStreamTest {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).request(3);
@ -920,7 +929,7 @@ public class RetriableStreamTest {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(retriableStreamRecorder).postCommit();
@ -1028,7 +1037,7 @@ public class RetriableStreamTest {
retriableStream.request(789);
readiness.add(retriableStream.isReady()); // expected false b/c in backoff
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
verify(mockStream2).start(any(ClientStreamListener.class));
readiness.add(retriableStream.isReady()); // expected true
@ -1110,7 +1119,7 @@ public class RetriableStreamTest {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -1160,13 +1169,12 @@ public class RetriableStreamTest {
listener1.closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
// send requests during backoff
retriableStream.request(3);
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(1), TimeUnit.SECONDS);
retriableStream.request(1);
verify(mockStream1, never()).request(anyInt());
@ -1207,7 +1215,7 @@ public class RetriableStreamTest {
// retry
listener1.closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
verify(mockStream2).start(any(ClientStreamListener.class));
verify(retriableStreamRecorder).postCommit();
@ -1260,7 +1268,7 @@ public class RetriableStreamTest {
bufferSizeTracer.outboundWireSize(2);
verify(retriableStreamRecorder, never()).postCommit();
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
verify(mockStream2).start(any(ClientStreamListener.class));
verify(mockStream2).isReady();
@ -1332,7 +1340,7 @@ public class RetriableStreamTest {
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
@ -1347,9 +1355,7 @@ public class RetriableStreamTest {
sublistenerCaptor2.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(1) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
@ -1364,10 +1370,7 @@ public class RetriableStreamTest {
sublistenerCaptor3.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM)
- 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(2) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
@ -1382,7 +1385,7 @@ public class RetriableStreamTest {
sublistenerCaptor4.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateMaxBackoff() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
@ -1397,7 +1400,7 @@ public class RetriableStreamTest {
sublistenerCaptor5.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateMaxBackoff() - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
@ -1480,7 +1483,7 @@ public class RetriableStreamTest {
sublistenerCaptor3.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
@ -1495,9 +1498,7 @@ public class RetriableStreamTest {
sublistenerCaptor4.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(1) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
@ -1512,10 +1513,7 @@ public class RetriableStreamTest {
sublistenerCaptor5.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM)
- 1L,
TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(2) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
@ -1804,7 +1802,7 @@ public class RetriableStreamTest {
.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
inOrder.verify(retriableStreamRecorder).newSubstream(1);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -1907,7 +1905,7 @@ public class RetriableStreamTest {
.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
inOrder.verify(retriableStreamRecorder).newSubstream(1);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -1923,8 +1921,7 @@ public class RetriableStreamTest {
.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(1), TimeUnit.SECONDS);
inOrder.verify(retriableStreamRecorder).newSubstream(2);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -1960,7 +1957,7 @@ public class RetriableStreamTest {
.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS);
inOrder.verify(retriableStreamRecorder).newSubstream(1);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);

View File

@ -303,7 +303,7 @@ public class RetryTest {
serverCall.close(
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
elapseBackoff(10, SECONDS);
elapseBackoff(12, SECONDS);
// 2nd attempt received
serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
@ -348,7 +348,7 @@ public class RetryTest {
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1);
elapseBackoff(10, SECONDS);
elapseBackoff(12, SECONDS);
assertRpcStartedRecorded();
assertOutboundMessageRecorded();
serverCall = serverCalls.poll(5, SECONDS);
@ -366,7 +366,7 @@ public class RetryTest {
call.request(1);
assertInboundMessageRecorded();
assertInboundWireSizeRecorded(1);
assertRpcStatusRecorded(Status.Code.OK, 12000, 2);
assertRpcStatusRecorded(Status.Code.OK, 14000, 2);
assertRetryStatsRecorded(1, 0, 0);
}
@ -418,7 +418,7 @@ public class RetryTest {
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1);
elapseBackoff(10, SECONDS);
elapseBackoff(12, SECONDS);
assertRpcStartedRecorded();
assertOutboundMessageRecorded();
serverCall = serverCalls.poll(5, SECONDS);
@ -431,7 +431,7 @@ public class RetryTest {
streamClosedLatch.countDown();
// The call listener is closed.
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1);
assertRpcStatusRecorded(Code.CANCELLED, 19_000, 1);
assertRetryStatsRecorded(1, 0, 0);
}