diff --git a/api/src/main/java/io/grpc/CallOptions.java b/api/src/main/java/io/grpc/CallOptions.java index 9c54bf5026..25c4df386a 100644 --- a/api/src/main/java/io/grpc/CallOptions.java +++ b/api/src/main/java/io/grpc/CallOptions.java @@ -512,6 +512,7 @@ public final class CallOptions { builder.waitForReady = other.waitForReady; builder.maxInboundMessageSize = other.maxInboundMessageSize; builder.maxOutboundMessageSize = other.maxOutboundMessageSize; + builder.onReadyThreshold = other.onReadyThreshold; return builder; } @@ -527,6 +528,7 @@ public final class CallOptions { .add("waitForReady", isWaitForReady()) .add("maxInboundMessageSize", maxInboundMessageSize) .add("maxOutboundMessageSize", maxOutboundMessageSize) + .add("onReadyThreshold", onReadyThreshold) .add("streamTracerFactories", streamTracerFactories) .toString(); } diff --git a/api/src/test/java/io/grpc/CallOptionsTest.java b/api/src/test/java/io/grpc/CallOptionsTest.java index f4c98c1369..cc90a9799d 100644 --- a/api/src/test/java/io/grpc/CallOptionsTest.java +++ b/api/src/test/java/io/grpc/CallOptionsTest.java @@ -81,6 +81,16 @@ public class CallOptionsTest { .isFalse(); } + @Test + public void withOnReadyThreshold() { + int onReadyThreshold = 1024; + CallOptions callOptions = CallOptions.DEFAULT.withOnReadyThreshold(onReadyThreshold); + callOptions = callOptions.withWaitForReady(); + assertThat(callOptions.getOnReadyThreshold()).isEqualTo(onReadyThreshold); + callOptions = callOptions.clearOnReadyThreshold(); + assertThat(callOptions.getOnReadyThreshold()).isNull(); + } + @Test public void allWiths() { assertThat(allSet.getAuthority()).isSameInstanceAs(sampleAuthority); @@ -148,6 +158,7 @@ public class CallOptionsTest { .withCallCredentials(null) .withMaxInboundMessageSize(44) .withMaxOutboundMessageSize(55) + .withOnReadyThreshold(1024) .toString(); assertThat(actual).contains("deadline=null"); @@ -159,6 +170,7 @@ public class CallOptionsTest { assertThat(actual).contains("waitForReady=true"); assertThat(actual).contains("maxInboundMessageSize=44"); assertThat(actual).contains("maxOutboundMessageSize=55"); + assertThat(actual).contains("onReadyThreshold=1024"); assertThat(actual).contains("streamTracerFactories=[tracerFactory1, tracerFactory2]"); } diff --git a/stub/src/main/java/io/grpc/stub/AbstractStub.java b/stub/src/main/java/io/grpc/stub/AbstractStub.java index efda8799d7..0b6f86f2ac 100644 --- a/stub/src/main/java/io/grpc/stub/AbstractStub.java +++ b/stub/src/main/java/io/grpc/stub/AbstractStub.java @@ -252,6 +252,16 @@ public abstract class AbstractStub> { return build(channel, callOptions.withMaxOutboundMessageSize(maxSize)); } + /** + * Returns a new stub that limits the maximum number of bytes per stream in the queue. + * + * @since 1.1.0 + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021") + public final S withOnReadyThreshold(int numBytes) { + return build(channel, callOptions.withOnReadyThreshold(numBytes)); + } + /** * A factory class for stub. * diff --git a/stub/src/test/java/io/grpc/stub/BaseAbstractStubTest.java b/stub/src/test/java/io/grpc/stub/BaseAbstractStubTest.java index cc5d578544..9f7f10d829 100644 --- a/stub/src/test/java/io/grpc/stub/BaseAbstractStubTest.java +++ b/stub/src/test/java/io/grpc/stub/BaseAbstractStubTest.java @@ -16,6 +16,7 @@ package io.grpc.stub; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -90,4 +91,16 @@ abstract class BaseAbstractStubTest> { assertEquals(callOptions.getExecutor(), executor); } + + @Test + public void withOnReadyThreshold() { + T stub = create(channel); + CallOptions callOptions = stub.getCallOptions(); + assertNull(callOptions.getOnReadyThreshold()); + + int onReadyThreshold = 1024; + stub = stub.withOnReadyThreshold(onReadyThreshold); + callOptions = stub.getCallOptions(); + assertThat(callOptions.getOnReadyThreshold()).isEqualTo(onReadyThreshold); + } }