Allow configuration of the queued byte threshold at which a Stream is considered not ready (#10977)

* Allow the queued byte threshold for a Stream to be ready to be configurable

- on clients this is exposed by setting a CallOption
- on servers this is configured by calling a method on ServerCall or ServerStreamListener
This commit is contained in:
James Duong 2024-03-21 22:37:26 +00:00 committed by GitHub
parent 68eb639b1c
commit 2c83ef0632
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 330 additions and 21 deletions

View File

@ -79,6 +79,8 @@ public final class CallOptions {
private final Integer maxInboundMessageSize;
@Nullable
private final Integer maxOutboundMessageSize;
@Nullable
private final Integer onReadyThreshold;
private CallOptions(Builder builder) {
this.deadline = builder.deadline;
@ -91,6 +93,7 @@ public final class CallOptions {
this.waitForReady = builder.waitForReady;
this.maxInboundMessageSize = builder.maxInboundMessageSize;
this.maxOutboundMessageSize = builder.maxOutboundMessageSize;
this.onReadyThreshold = builder.onReadyThreshold;
}
static class Builder {
@ -105,6 +108,7 @@ public final class CallOptions {
Boolean waitForReady;
Integer maxInboundMessageSize;
Integer maxOutboundMessageSize;
Integer onReadyThreshold;
private CallOptions build() {
return new CallOptions(this);
@ -203,6 +207,46 @@ public final class CallOptions {
return builder.build();
}
/**
* Specifies how many bytes must be queued before the call is
* considered not ready to send more messages.
*
* @param numBytes The number of bytes that must be queued. Must be a
* positive integer.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
public CallOptions withOnReadyThreshold(int numBytes) {
checkArgument(numBytes > 0, "numBytes must be positive: %s", numBytes);
Builder builder = toBuilder(this);
builder.onReadyThreshold = numBytes;
return builder.build();
}
/**
* Resets to the default number of bytes that must be queued before the
* call will leave the <a href="https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md">
* 'wait for ready'</a> state.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
public CallOptions clearOnReadyThreshold() {
Builder builder = toBuilder(this);
builder.onReadyThreshold = null;
return builder.build();
}
/**
* Returns to the default number of bytes that must be queued before the
* call will leave the <a href="https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md">
* 'wait for ready'</a> state.
*
* @return null if the default threshold is used.
*/
@Nullable
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
public Integer getOnReadyThreshold() {
return onReadyThreshold;
}
/**
* Returns the compressor's name.
*/

View File

@ -58,6 +58,12 @@ abstract class PartialForwardingServerCall<ReqT, RespT> extends ServerCall<ReqT,
delegate().setMessageCompression(enabled);
}
@Override
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
public void setOnReadyThreshold(int numBytes) {
delegate().setOnReadyThreshold(numBytes);
}
@Override
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704")
public void setCompression(String compressor) {

View File

@ -16,6 +16,8 @@
package io.grpc;
import static com.google.common.base.Preconditions.checkArgument;
import javax.annotation.Nullable;
/**
@ -209,6 +211,19 @@ public abstract class ServerCall<ReqT, RespT> {
// noop
}
/**
* A hint to the call that specifies how many bytes must be queued before
* {@link #isReady()} will return false. A call may ignore this property if
* unsupported. This may only be set before any messages are sent.
*
* @param numBytes The number of bytes that must be queued. Must be a
* positive integer.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
public void setOnReadyThreshold(int numBytes) {
checkArgument(numBytes > 0, "numBytes must be positive: %s", numBytes);
}
/**
* Returns the level of security guarantee in communications
*

View File

@ -64,6 +64,11 @@ final class MultiMessageServerStream implements ServerStream {
}
}
@Override
public void setOnReadyThreshold(int numBytes) {
// No-op
}
@Override
public boolean isReady() {
return outbound.isReady();

View File

@ -67,6 +67,11 @@ final class SingleMessageServerStream implements ServerStream {
}
}
@Override
public void setOnReadyThreshold(int numBytes) {
// No-op
}
@Override
public boolean isReady() {
return outbound.isReady();

View File

@ -243,9 +243,13 @@ public abstract class AbstractClientStream extends AbstractStream
protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
TransportTracer transportTracer,
CallOptions options) {
super(maxMessageSize, statsTraceCtx, transportTracer);
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
if (options.getOnReadyThreshold() != null) {
this.setOnReadyThreshold(options.getOnReadyThreshold());
}
}
private void setFullStreamDecompression(boolean fullStreamDecompression) {

View File

@ -177,6 +177,19 @@ public abstract class AbstractServerStream extends AbstractStream
return statsTraceCtx;
}
/**
* A hint to the stream that specifies how many bytes must be queued before
* {@link #isReady()} will return false. A stream may ignore this property
* if unsupported. This may only be set before any messages are sent.
*
* @param numBytes The number of bytes that must be queued. Must be a
* positive integer.
*/
@Override
public void setOnReadyThreshold(int numBytes) {
super.setOnReadyThreshold(numBytes);
}
/**
* This should only be called from the transport thread (except for private interactions with
* {@code AbstractServerStream}).
@ -243,6 +256,8 @@ public abstract class AbstractServerStream extends AbstractStream
}
}
@Override
protected ServerStreamListener listener() {
return listener;

View File

@ -77,6 +77,19 @@ public abstract class AbstractStream implements Stream {
}
}
/**
* A hint to the stream that specifies how many bytes must be queued before
* {@link #isReady()} will return false. A stream may ignore this property if
* unsupported. This may only be set during stream initialization before
* any messages are set.
*
* @param numBytes The number of bytes that must be queued. Must be a
* positive integer.
*/
protected void setOnReadyThreshold(int numBytes) {
transportState().setOnReadyThreshold(numBytes);
}
/**
* Closes the underlying framer. Should be called when the outgoing stream is gracefully closed
* (half closure on client; closure on server).
@ -143,6 +156,9 @@ public abstract class AbstractStream implements Stream {
@GuardedBy("onReadyLock")
private boolean deallocated;
@GuardedBy("onReadyLock")
private int onReadyThreshold;
protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
@ -157,6 +173,7 @@ public abstract class AbstractStream implements Stream {
transportTracer);
// TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
deframer = rawDeframer;
onReadyThreshold = DEFAULT_ONREADY_THRESHOLD;
}
final void optimizeForDirectExecutor() {
@ -178,6 +195,20 @@ public abstract class AbstractStream implements Stream {
*/
protected abstract StreamListener listener();
/**
* A hint to the stream that specifies how many bytes must be queued before
* {@link #isReady()} will return false. A stream may ignore this property if
* unsupported. This may only be set before any messages are sent.
*
* @param numBytes The number of bytes that must be queued. Must be a
* positive integer.
*/
void setOnReadyThreshold(int numBytes) {
synchronized (onReadyLock) {
this.onReadyThreshold = numBytes;
}
}
@Override
public void messagesAvailable(StreamListener.MessageProducer producer) {
listener().messagesAvailable(producer);
@ -259,7 +290,7 @@ public abstract class AbstractStream implements Stream {
private boolean isReady() {
synchronized (onReadyLock) {
return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated;
return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;
}
}
@ -316,9 +347,9 @@ public abstract class AbstractStream implements Stream {
synchronized (onReadyLock) {
checkState(allocated,
"onStreamAllocated was not called, but it seems the stream is active");
boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
numSentBytesQueued -= numBytes;
boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
doNotify = !belowThresholdBefore && belowThresholdAfter;
}
if (doNotify) {

View File

@ -18,6 +18,7 @@ package io.grpc.internal;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import io.grpc.CallOptions;
import io.grpc.InternalMetadata;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
@ -67,8 +68,9 @@ public abstract class Http2ClientStreamTransportState extends AbstractClientStre
protected Http2ClientStreamTransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
super(maxMessageSize, statsTraceCtx, transportTracer);
TransportTracer transportTracer,
CallOptions options) {
super(maxMessageSize, statsTraceCtx, transportTracer, options);
}
/**

View File

@ -184,6 +184,11 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
stream.setMessageCompression(enable);
}
@Override
public void setOnReadyThreshold(int numBytes) {
stream.setOnReadyThreshold(numBytes);
}
@Override
public void setCompression(String compressorName) {
// Added here to give a better error message.

View File

@ -96,4 +96,15 @@ public interface ServerStream extends Stream {
* The HTTP/2 stream id, or {@code -1} if not supported.
*/
int streamId();
/**
* A hint to the stream that specifies how many bytes must be queued before
* {@link #isReady()} will return false. A stream may ignore this property if
* unsupported. This may only be set during stream initialization before
* any messages are set.
*
* @param numBytes The number of bytes that must be queued. Must be a
* positive integer.
*/
void setOnReadyThreshold(int numBytes);
}

View File

@ -20,6 +20,7 @@ import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@ -482,6 +483,41 @@ public class AbstractClientStreamTest {
assertThat(insight.toString()).isEqualTo("[remote_addr=fake_server_addr]");
}
@Test
public void overrideOnReadyThreshold() {
AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class);
BaseTransportState state = new BaseTransportState(statsTraceCtx, transportTracer);
AbstractClientStream stream = new BaseAbstractClientStream(
allocator,
state,
sink,
statsTraceCtx,
transportTracer,
CallOptions.DEFAULT.withOnReadyThreshold(10),
true);
ClientStreamListener listener = new NoopClientStreamListener();
stream.start(listener);
state.onStreamAllocated();
// Stream should be ready. 0 bytes are queued.
assertTrue(stream.isReady());
// Queue some bytes above the custom threshold and check that the stream is not ready.
stream.onSendingBytes(100);
assertFalse(stream.isReady());
// Simulate a flush and verify ready now.
stream.transportState().onSentBytes(91);
assertTrue(stream.isReady());
}
@Test
public void resetOnReadyThreshold() {
CallOptions options = CallOptions.DEFAULT.withOnReadyThreshold(10);
assertEquals(Integer.valueOf(10), options.getOnReadyThreshold());
assertNull(options.clearOnReadyThreshold().getOnReadyThreshold());
}
/**
* No-op base class for testing.
*/
@ -517,9 +553,23 @@ public class AbstractClientStreamTest {
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
boolean useGet) {
super(allocator, statsTraceCtx, transportTracer, new Metadata(), CallOptions.DEFAULT, useGet);
this(allocator, state, sink, statsTraceCtx, transportTracer, CallOptions.DEFAULT, useGet);
}
public BaseAbstractClientStream(
WritableBufferAllocator allocator,
TransportState state,
Sink sink,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
CallOptions callOptions,
boolean useGet) {
super(allocator, statsTraceCtx, transportTracer, new Metadata(), callOptions, useGet);
this.state = state;
this.sink = sink;
if (callOptions.getOnReadyThreshold() != null) {
this.transportState().setOnReadyThreshold(callOptions.getOnReadyThreshold());
}
}
@Override
@ -567,7 +617,7 @@ public class AbstractClientStreamTest {
}
public BaseTransportState(StatsTraceContext statsTraceCtx, TransportTracer transportTracer) {
super(DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer);
super(DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer, CallOptions.DEFAULT);
}
@Override

View File

@ -18,6 +18,7 @@ package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@ -371,6 +372,15 @@ public class AbstractServerStreamTest {
assertEquals("bad", metadataCaptor.getValue().get(InternalStatus.MESSAGE_KEY));
}
@Test
public void changeOnReadyThreshold() {
stream.setListener(new ServerStreamListenerBase());
stream.transportState().onStreamAllocated();
stream.setOnReadyThreshold(Integer.MAX_VALUE);
stream.onSendingBytes(Integer.MAX_VALUE - 1);
assertTrue(stream.isReady());
}
private static class ServerStreamListenerBase implements ServerStreamListener {
@Override
public void messagesAvailable(MessageProducer producer) {

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import io.grpc.CallOptions;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.Status;
@ -347,9 +348,22 @@ public class Http2ClientStreamTransportStateTest {
assertEquals(Code.UNKNOWN, statusCaptor.getValue().getCode());
}
@Test
public void transportStateWithOnReadyThreshold() {
BaseTransportState state = new BaseTransportState(transportTracer,
CallOptions.DEFAULT.withOnReadyThreshold(Integer.MAX_VALUE));
assertEquals(Integer.MAX_VALUE, state.onReadyThreshold);
}
private static class BaseTransportState extends Http2ClientStreamTransportState {
private int onReadyThreshold;
public BaseTransportState(TransportTracer transportTracer, CallOptions options) {
super(DEFAULT_MAX_MESSAGE_SIZE, StatsTraceContext.NOOP, transportTracer, options);
}
public BaseTransportState(TransportTracer transportTracer) {
super(DEFAULT_MAX_MESSAGE_SIZE, StatsTraceContext.NOOP, transportTracer);
this(transportTracer, CallOptions.DEFAULT);
}
@Override
@ -367,5 +381,11 @@ public class Http2ClientStreamTransportStateTest {
public void runOnTransportThread(Runnable r) {
r.run();
}
@Override
void setOnReadyThreshold(int numBytes) {
onReadyThreshold = numBytes;
super.setOnReadyThreshold(numBytes);
}
}
}

View File

@ -147,6 +147,12 @@ public class ServerCallImplTest {
verify(stream).request(10);
}
@Test
public void setOnReadyThreshold() {
call.setOnReadyThreshold(10);
verify(stream).setOnReadyThreshold(10);
}
@Test
public void sendHeader_firstCall() {
Metadata headers = new Metadata();

View File

@ -124,7 +124,8 @@ class CronetClientStream extends AbstractClientStream {
this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY);
this.annotation = callOptions.getOption(CRONET_ANNOTATION_KEY);
this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer);
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer,
callOptions);
// Tests expect the "plain" deframer behavior, not MigratingDeframer
// https://github.com/grpc/grpc-java/issues/7140
@ -270,8 +271,8 @@ class CronetClientStream extends AbstractClientStream {
public TransportState(
int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock,
TransportTracer transportTracer) {
super(maxMessageSize, statsTraceCtx, transportTracer);
TransportTracer transportTracer, CallOptions options) {
super(maxMessageSize, statsTraceCtx, transportTracer, options);
this.lock = Preconditions.checkNotNull(lock, "lock");
}

View File

@ -697,6 +697,11 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
public int streamId() {
return -1;
}
@Override
public void setOnReadyThreshold(int numBytes) {
// noop
}
}
private class InProcessClientStream implements ClientStream {

View File

@ -237,8 +237,9 @@ class NettyClientStream extends AbstractClientStream {
int maxMessageSize,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
String methodName) {
super(maxMessageSize, statsTraceCtx, transportTracer);
String methodName,
CallOptions options) {
super(maxMessageSize, statsTraceCtx, transportTracer, options);
this.methodName = checkNotNull(methodName, "methodName");
this.handler = checkNotNull(handler, "handler");
this.eventLoop = checkNotNull(eventLoop, "eventLoop");

View File

@ -188,7 +188,8 @@ class NettyClientTransport implements ConnectionClientTransport {
maxMessageSize,
statsTraceCtx,
transportTracer,
method.getFullMethodName()) {
method.getFullMethodName(),
callOptions) {
@Override
protected Status statusFromFailedFuture(ChannelFuture f) {
return NettyClientTransport.this.statusFromFailedFuture(f);

View File

@ -1000,7 +1000,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
maxMessageSize,
StatsTraceContext.NOOP,
transportTracer,
"methodName");
"methodName",
CallOptions.DEFAULT);
}
@Override

View File

@ -563,7 +563,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
maxMessageSize,
StatsTraceContext.NOOP,
transportTracer,
"methodName");
"methodName",
CallOptions.DEFAULT);
}
@Override

View File

@ -99,7 +99,8 @@ class OkHttpClientStream extends AbstractClientStream {
outboundFlow,
transport,
initialWindowSize,
method.getFullMethodName());
method.getFullMethodName(),
callOptions);
}
@Override
@ -222,8 +223,9 @@ class OkHttpClientStream extends AbstractClientStream {
OutboundFlowController outboundFlow,
OkHttpClientTransport transport,
int initialWindowSize,
String methodName) {
super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer());
String methodName,
CallOptions options) {
super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer(), options);
this.lock = checkNotNull(lock, "lock");
this.frameWriter = frameWriter;
this.outboundFlow = outboundFlow;

View File

@ -64,6 +64,19 @@ public abstract class ServerCallStreamObserver<RespT> extends CallStreamObserver
*/
public abstract void setOnCancelHandler(Runnable onCancelHandler);
/**
* A hint to the call that specifies how many bytes must be queued before
* {@link #isReady()} will return false. A call may ignore this property if
* unsupported. This may only be set during stream initialization before
* any messages are set.
*
* @param numBytes The number of bytes that must be queued. Must be a
* positive integer.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
public abstract void setOnReadyThreshold(int numBytes);
/**
* Sets the compression algorithm to use for the call. May only be called before sending any
* messages. Default gRPC servers support the "gzip" compressor.

View File

@ -422,6 +422,14 @@ public final class ServerCalls {
this.onCancelHandler = onCancelHandler;
}
@Override
public void setOnReadyThreshold(int numBytes) {
checkState(!frozen, "Cannot alter setOnReadyThreshold after initialization. May only be "
+ "called during the initial call to the application, before the service returns its "
+ "StreamObserver");
call.setOnReadyThreshold(numBytes);
}
@Override
public void disableAutoInboundFlowControl() {
disableAutoRequest();

View File

@ -451,6 +451,31 @@ public class ServerCallsTest {
assertEquals(2, onReadyCalled.get());
}
@Test
public void setOnReadyThreshold() throws Exception {
final int testThreshold = Integer.MAX_VALUE;
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncServerStreamingCall(
new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
@Override
public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.setOnReadyThreshold(req);
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(serverCall, new Metadata());
serverCall.isReady = true;
serverCall.isCancelled = false;
callListener.onReady();
callListener.onMessage(testThreshold);
// half-closing triggers the unary request delivery and onReady
callListener.onHalfClose();
assertEquals(testThreshold, serverCall.getOnReadyThreshold());
}
@Test
public void clientSendsOne_errorMissingRequest_unary() {
ServerCallRecorder serverCall = new ServerCallRecorder(UNARY_METHOD);
@ -626,6 +651,7 @@ public class ServerCallsTest {
private Status status;
private boolean isCancelled;
private boolean isReady;
private int onReadyThreshold;
public ServerCallRecorder(MethodDescriptor<Integer, Integer> methodDescriptor) {
this.methodDescriptor = methodDescriptor;
@ -660,9 +686,19 @@ public class ServerCallsTest {
return isReady;
}
@Override
public void setOnReadyThreshold(int numBytes) {
super.setOnReadyThreshold(numBytes);
onReadyThreshold = numBytes;
}
@Override
public MethodDescriptor<Integer, Integer> getMethodDescriptor() {
return methodDescriptor;
}
public int getOnReadyThreshold() {
return onReadyThreshold;
}
}
}

View File

@ -219,6 +219,17 @@ public final class TransmitStatusRuntimeExceptionInterceptor implements ServerIn
});
}
@Override
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
public void setOnReadyThreshold(final int numBytes) {
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
SerializingServerCall.super.setOnReadyThreshold(numBytes);
}
});
}
@Override
public void setCompression(final String compressor) {
serializingExecutor.execute(new Runnable() {