diff --git a/binder/src/main/java/io/grpc/binder/internal/Outbound.java b/binder/src/main/java/io/grpc/binder/internal/Outbound.java index f395fe1701..7db5bf0fbe 100644 --- a/binder/src/main/java/io/grpc/binder/internal/Outbound.java +++ b/binder/src/main/java/io/grpc/binder/internal/Outbound.java @@ -19,7 +19,6 @@ package io.grpc.binder.internal; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; -import static java.lang.Math.max; import android.os.Parcel; import com.google.errorprone.annotations.concurrent.GuardedBy; @@ -397,8 +396,7 @@ abstract class Outbound { @GuardedBy("this") void setDeadline(Deadline deadline) { headers.discardAll(TIMEOUT_KEY); - long effectiveTimeoutNanos = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS)); - headers.put(TIMEOUT_KEY, effectiveTimeoutNanos); + headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS)); } } diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index bb346657d5..9718f8c517 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; -import static java.lang.Math.max; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -124,8 +123,7 @@ public abstract class AbstractClientStream extends AbstractStream @Override public void setDeadline(Deadline deadline) { headers.discardAll(TIMEOUT_KEY); - long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS)); - headers.put(TIMEOUT_KEY, effectiveTimeout); + headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS)); } @Override diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 65c083f10e..658a678547 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -651,12 +651,14 @@ public final class GrpcUtil { static class TimeoutMarshaller implements Metadata.AsciiMarshaller { @Override - public String toAsciiString(Long timeoutNanos) { + public String toAsciiString(Long timeoutNanosObject) { long cutoff = 100000000; + // Timeout checking is inherently racy. RPCs with timeouts in the past ideally don't even get + // here, but if the timeout is expired assume that happened recently and adjust it to the + // smallest allowed timeout + long timeoutNanos = Math.max(1, timeoutNanosObject); TimeUnit unit = TimeUnit.NANOSECONDS; - if (timeoutNanos < 0) { - throw new IllegalArgumentException("Timeout too small"); - } else if (timeoutNanos < cutoff) { + if (timeoutNanos < cutoff) { return timeoutNanos + "n"; } else if (timeoutNanos < cutoff * 1000L) { return unit.toMicros(timeoutNanos) + "u"; diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 18fafe6557..8f14b74035 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -465,6 +465,24 @@ public class AbstractClientStreamTest { .isGreaterThan(TimeUnit.MILLISECONDS.toNanos(600)); } + @Test + public void setDeadline_thePastBecomesPositive() { + AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class); + ClientStream stream = new BaseAbstractClientStream( + allocator, new BaseTransportState(statsTraceCtx, transportTracer), sink, statsTraceCtx, + transportTracer); + + stream.setDeadline(Deadline.after(-1, TimeUnit.NANOSECONDS)); + stream.start(mockListener); + + ArgumentCaptor headersCaptor = ArgumentCaptor.forClass(Metadata.class); + verify(sink).writeHeaders(headersCaptor.capture(), ArgumentMatchers.any()); + + Metadata headers = headersCaptor.getValue(); + assertThat(headers.get(Metadata.Key.of("grpc-timeout", Metadata.ASCII_STRING_MARSHALLER))) + .isEqualTo("1n"); + } + @Test public void appendTimeoutInsight() { InsightBuilder insight = new InsightBuilder(); diff --git a/core/src/test/java/io/grpc/internal/GrpcUtilTest.java b/core/src/test/java/io/grpc/internal/GrpcUtilTest.java index 229c593ef8..c243790028 100644 --- a/core/src/test/java/io/grpc/internal/GrpcUtilTest.java +++ b/core/src/test/java/io/grpc/internal/GrpcUtilTest.java @@ -98,8 +98,8 @@ public class GrpcUtilTest { GrpcUtil.TimeoutMarshaller marshaller = new GrpcUtil.TimeoutMarshaller(); // nanos - assertEquals("0n", marshaller.toAsciiString(0L)); - assertEquals(0L, (long) marshaller.parseAsciiString("0n")); + assertEquals("1n", marshaller.toAsciiString(1L)); + assertEquals(1L, (long) marshaller.parseAsciiString("1n")); assertEquals("99999999n", marshaller.toAsciiString(99999999L)); assertEquals(99999999L, (long) marshaller.parseAsciiString("99999999n")); diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 2ddaba751e..0f18efe078 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -53,6 +53,7 @@ import io.grpc.BinaryLog; import io.grpc.Channel; import io.grpc.Compressor; import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.Grpc; import io.grpc.HandlerRegistry; import io.grpc.IntegerMarshaller; @@ -1146,11 +1147,21 @@ public class ServerImplTest { @Test public void testContextExpiredBeforeStreamCreate_StreamCancelNotCalledBeforeSetListener() throws Exception { + builder.ticker = new Deadline.Ticker() { + private long time; + + @Override + public long nanoTime() { + time += 1000; + return time; + } + }; + AtomicBoolean contextCancelled = new AtomicBoolean(false); AtomicReference context = new AtomicReference<>(); AtomicReference> callReference = new AtomicReference<>(); - testStreamClose_setup(callReference, context, contextCancelled, 0L); + testStreamClose_setup(callReference, context, contextCancelled, 1L); // This assert that stream.setListener(jumpListener) is called before stream.cancel(), which // prevents extremely short deadlines causing NPEs. diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index e294b4eb63..e31696eb63 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -18,7 +18,6 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; -import static java.lang.Math.max; import com.google.common.base.MoreObjects; import com.google.common.io.ByteStreams; @@ -939,8 +938,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans @Override public void setDeadline(Deadline deadline) { headers.discardAll(TIMEOUT_KEY); - long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS)); - headers.put(TIMEOUT_KEY, effectiveTimeout); + headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS)); } @Override