diff --git a/binder/src/main/java/io/grpc/binder/internal/MultiMessageServerStream.java b/binder/src/main/java/io/grpc/binder/internal/MultiMessageServerStream.java index f86cea2fe3..cba18ba5b2 100644 --- a/binder/src/main/java/io/grpc/binder/internal/MultiMessageServerStream.java +++ b/binder/src/main/java/io/grpc/binder/internal/MultiMessageServerStream.java @@ -77,7 +77,7 @@ final class MultiMessageServerStream implements ServerStream { } @Override - public void writeHeaders(Metadata headers) { + public void writeHeaders(Metadata headers, boolean flush) { try { synchronized (outbound) { outbound.sendHeaders(headers); diff --git a/binder/src/main/java/io/grpc/binder/internal/SingleMessageServerStream.java b/binder/src/main/java/io/grpc/binder/internal/SingleMessageServerStream.java index ac7a76054e..92e9ff4477 100644 --- a/binder/src/main/java/io/grpc/binder/internal/SingleMessageServerStream.java +++ b/binder/src/main/java/io/grpc/binder/internal/SingleMessageServerStream.java @@ -80,7 +80,7 @@ final class SingleMessageServerStream implements ServerStream { } @Override - public void writeHeaders(Metadata headers) { + public void writeHeaders(Metadata headers, boolean flush) { pendingHeaders = headers; } diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index 94cdfa4a57..d781cfa9b8 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -41,7 +41,7 @@ public abstract class AbstractServerStream extends AbstractStream * * @param headers the headers to be sent to client. */ - void writeHeaders(Metadata headers); + void writeHeaders(Metadata headers, boolean flush); /** * Sends an outbound frame to the remote end point. @@ -96,11 +96,11 @@ public abstract class AbstractServerStream extends AbstractStream } @Override - public final void writeHeaders(Metadata headers) { + public final void writeHeaders(Metadata headers, boolean flush) { Preconditions.checkNotNull(headers, "headers"); headersSent = true; - abstractServerStreamSink().writeHeaders(headers); + abstractServerStreamSink().writeHeaders(headers, flush); } @Override diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 08aff5f038..1bfee21e05 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -142,7 +142,7 @@ final class ServerCallImpl extends ServerCall { // Don't check if sendMessage has been called, since it requires that sendHeaders was already // called. sendHeadersCalled = true; - stream.writeHeaders(headers); + stream.writeHeaders(headers, !getMethodDescriptor().getType().serverSendsOneMessage()); } @Override diff --git a/core/src/main/java/io/grpc/internal/ServerStream.java b/core/src/main/java/io/grpc/internal/ServerStream.java index 658511939e..861d5f36cc 100644 --- a/core/src/main/java/io/grpc/internal/ServerStream.java +++ b/core/src/main/java/io/grpc/internal/ServerStream.java @@ -36,7 +36,7 @@ public interface ServerStream extends Stream { * * @param headers to send to client. */ - void writeHeaders(Metadata headers); + void writeHeaders(Metadata headers, boolean flush); /** * Closes the stream for both reading and writing. A status code of diff --git a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java index 9f6c4922aa..618af766c0 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java @@ -285,28 +285,28 @@ public class AbstractServerStreamTest { public void writeHeaders_failsOnNullHeaders() { thrown.expect(NullPointerException.class); - stream.writeHeaders(null); + stream.writeHeaders(null, true); } @Test public void writeHeaders() { Metadata headers = new Metadata(); - stream.writeHeaders(headers); - verify(sink).writeHeaders(same(headers)); + stream.writeHeaders(headers, true); + verify(sink).writeHeaders(same(headers), eq(true)); } @Test public void writeMessage_dontWriteDuplicateHeaders() { - stream.writeHeaders(new Metadata()); + stream.writeHeaders(new Metadata(), true); stream.writeMessage(new ByteArrayInputStream(new byte[]{})); // Make sure it wasn't called twice - verify(sink).writeHeaders(any(Metadata.class)); + verify(sink).writeHeaders(any(Metadata.class), eq(true)); } @Test public void writeMessage_ignoreIfFramerClosed() { - stream.writeHeaders(new Metadata()); + stream.writeHeaders(new Metadata(), true); stream.endOfMessages(); reset(sink); @@ -317,7 +317,7 @@ public class AbstractServerStreamTest { @Test public void writeMessage() { - stream.writeHeaders(new Metadata()); + stream.writeHeaders(new Metadata(), true); stream.writeMessage(new ByteArrayInputStream(new byte[]{})); stream.flush(); @@ -327,7 +327,7 @@ public class AbstractServerStreamTest { @Test public void writeMessage_closesStream() throws Exception { - stream.writeHeaders(new Metadata()); + stream.writeHeaders(new Metadata(), true); InputStream input = mock(InputStream.class, delegatesTo(new ByteArrayInputStream(new byte[1]))); stream.writeMessage(input); verify(input).close(); diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index cef21f6b7f..833f5109e3 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -153,7 +153,7 @@ public class ServerCallImplTest { call.sendHeaders(headers); - verify(stream).writeHeaders(headers); + verify(stream).writeHeaders(headers, false); } @Test @@ -162,7 +162,7 @@ public class ServerCallImplTest { headers.put(CONTENT_LENGTH_KEY, "123"); call.sendHeaders(headers); - verify(stream).writeHeaders(headers); + verify(stream).writeHeaders(headers, false); assertNull(headers.get(CONTENT_LENGTH_KEY)); } diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 458d18d16b..dd93e29620 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -681,7 +681,7 @@ public class ServerImplTest { Metadata responseHeaders = new Metadata(); responseHeaders.put(metadataKey, "response value"); call.sendHeaders(responseHeaders); - verify(stream).writeHeaders(responseHeaders); + verify(stream).writeHeaders(responseHeaders, true); verify(stream).setCompressor(isA(Compressor.class)); call.sendMessage(firstResponse); diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index 6129bb706c..4dd0500cc3 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -290,7 +290,7 @@ public abstract class AbstractTransportTest { stream.flush(); stream.cancel(Status.CANCELLED); stream.flush(); - serverStreamCreation.stream.writeHeaders(new Metadata()); + serverStreamCreation.stream.writeHeaders(new Metadata(), true); serverStreamCreation.stream.flush(); serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("bar")); serverStreamCreation.stream.flush(); @@ -308,7 +308,7 @@ public abstract class AbstractTransportTest { stream.start(mockClientStreamListener2); serverStreamCreation = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); - serverStreamCreation.stream.writeHeaders(new Metadata()); + serverStreamCreation.stream.writeHeaders(new Metadata(), true); serverStreamCreation.stream.flush(); verify(mockClientStreamListener2, timeout(TIMEOUT_MS)).headersRead(any(Metadata.class)); @@ -468,7 +468,7 @@ public abstract class AbstractTransportTest { // Try to "flush" out any listener notifications on client and server. This also ensures that // the stream still functions. - serverStream.writeHeaders(new Metadata()); + serverStream.writeHeaders(new Metadata(), true); clientStream.halfClose(); assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS)); @@ -886,7 +886,7 @@ public abstract class AbstractTransportTest { serverHeaders.put(binaryKey, "dup,value"); Metadata serverHeadersCopy = new Metadata(); serverHeadersCopy.merge(serverHeaders); - serverStream.writeHeaders(serverHeaders); + serverStream.writeHeaders(serverHeaders, true); Metadata headers = clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); assertNotNull(headers); assertAsciiMetadataValuesEqual(serverHeadersCopy.getAll(asciiKey), headers.getAll(asciiKey)); @@ -1010,7 +1010,7 @@ public abstract class AbstractTransportTest { clientStream.halfClose(); assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - serverStream.writeHeaders(new Metadata()); + serverStream.writeHeaders(new Metadata(), true); assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); Status status = Status.OK.withDescription("Nice talking to you"); @@ -1047,7 +1047,7 @@ public abstract class AbstractTransportTest { ServerStream serverStream = serverStreamCreation.stream; ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; - serverStream.writeHeaders(new Metadata()); + serverStream.writeHeaders(new Metadata(), true); assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); Status strippedStatus = Status.OK.withDescription("Hello. Goodbye."); @@ -1273,7 +1273,7 @@ public abstract class AbstractTransportTest { assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); assertTrue(serverStream.isReady()); - serverStream.writeHeaders(new Metadata()); + serverStream.writeHeaders(new Metadata(), true); serverStream.writeMessage(methodDescriptor.streamRequest("foo")); serverStream.flush(); @@ -1362,7 +1362,7 @@ public abstract class AbstractTransportTest { ServerStream serverStream = serverStreamCreation.stream; ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; - serverStream.writeHeaders(new Metadata()); + serverStream.writeHeaders(new Metadata(), true); String largeMessage; { @@ -1559,7 +1559,7 @@ public abstract class AbstractTransportTest { assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); // Ensure that for a closed ServerStream, interactions are noops - server.stream.writeHeaders(new Metadata()); + server.stream.writeHeaders(new Metadata(), true); server.stream.writeMessage(methodDescriptor.streamResponse("response")); server.stream.close(Status.INTERNAL, new Metadata()); @@ -1868,7 +1868,7 @@ public abstract class AbstractTransportTest { assertEquals(0, clientBefore.lastMessageReceivedTimeNanos); clientStream.request(1); - serverStream.writeHeaders(new Metadata()); + serverStream.writeHeaders(new Metadata(), true); serverStream.writeMessage(methodDescriptor.streamResponse("response")); serverStream.flush(); verifyMessageCountAndClose(clientStreamListener.messageQueue, 1); @@ -1984,7 +1984,7 @@ public abstract class AbstractTransportTest { = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverStreamCreation.stream.request(1); - serverStreamCreation.stream.writeHeaders(tooLargeMetadata); + serverStreamCreation.stream.writeHeaders(tooLargeMetadata, true); serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("response")); serverStreamCreation.stream.close(Status.OK, new Metadata()); @@ -2029,7 +2029,7 @@ public abstract class AbstractTransportTest { = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverStreamCreation.stream.request(1); - serverStreamCreation.stream.writeHeaders(new Metadata()); + serverStreamCreation.stream.writeHeaders(new Metadata(), true); serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("response")); serverStreamCreation.stream.close(Status.OK, tooLargeMetadata); diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index dd1028fde7..88265ec8bb 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -546,7 +546,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans } @Override - public void writeHeaders(Metadata headers) { + public void writeHeaders(Metadata headers, boolean flush) { if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { int metadataSize = metadataSize(headers); if (metadataSize > clientMaxInboundMetadataSize) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 80c8d9fb3a..a44d8b4a64 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -94,13 +94,13 @@ class NettyServerStream extends AbstractServerStream { private class Sink implements AbstractServerStream.Sink { @Override - public void writeHeaders(Metadata headers) { + public void writeHeaders(Metadata headers, boolean flush) { try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeHeaders")) { writeQueue.enqueue( SendResponseHeadersCommand.createHeaders( transportState(), Utils.convertServerHeaders(headers)), - true); + flush); } } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 61dfcb15ec..502c0f68a6 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -923,7 +923,7 @@ public class NettyClientTransportTest { public void streamCreated(ServerStream stream, String method, Metadata headers) { EchoServerStreamListener listener = new EchoServerStreamListener(stream, headers); stream.setListener(listener); - stream.writeHeaders(new Metadata()); + stream.writeHeaders(new Metadata(), true); stream.request(1); streamListeners.add(listener); } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java index 35f525e68e..e95a2a52bc 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java @@ -104,7 +104,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase sendHeadersCap = ArgumentCaptor.forClass(SendResponseHeadersCommand.class); @@ -130,7 +130,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase expectedHeaders = ImmutableListMultimap.copyOf(Utils.convertServerHeaders(headers)); - stream().writeHeaders(headers); + stream().writeHeaders(headers, true); ArgumentCaptor sendHeadersCap = ArgumentCaptor.forClass(SendResponseHeadersCommand.class); @@ -300,7 +300,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase responseHeaders = Headers.createResponseHeaders(metadata); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java index 5ed8514b85..3f88c35e01 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java @@ -371,7 +371,7 @@ public class OkHttpServerTransportTest { assertThat(streamListener.messages.pop()).isEqualTo("Hello server"); assertThat(streamListener.halfClosedCalled).isTrue(); - streamListener.stream.writeHeaders(metadata("User-Data", "best data")); + streamListener.stream.writeHeaders(metadata("User-Data", "best data"), false); streamListener.stream.writeMessage(new ByteArrayInputStream("Howdy client".getBytes(UTF_8))); streamListener.stream.close(Status.OK, metadata("End-Metadata", "bye")); @@ -434,7 +434,7 @@ public class OkHttpServerTransportTest { assertThat(streamListener.messages.pop()).isEqualTo("Hello server"); assertThat(streamListener.halfClosedCalled).isTrue(); - streamListener.stream.writeHeaders(new Metadata()); + streamListener.stream.writeHeaders(new Metadata(), false); streamListener.stream.writeMessage(new ByteArrayInputStream("Howdy client".getBytes(UTF_8))); streamListener.stream.flush(); @@ -1034,7 +1034,7 @@ public class OkHttpServerTransportTest { streamListener.stream.request(1); pingPong(); assertThat(streamListener.messages.pop()).isEqualTo("Hello Server Pad Me!"); - streamListener.stream.writeHeaders(metadata("User-Data", "best data")); + streamListener.stream.writeHeaders(metadata("User-Data", "best data"), false); streamListener.stream.writeMessage(new ByteArrayInputStream("Howdy client".getBytes(UTF_8))); List
responseHeaders = Arrays.asList( new Header(":status", "200"), @@ -1211,7 +1211,7 @@ public class OkHttpServerTransportTest { pingPong(); assertThat(streamListener.messages.pop()).isEqualTo("Hello server"); - streamListener.stream.writeHeaders(metadata("User-Data", "best data")); + streamListener.stream.writeHeaders(metadata("User-Data", "best data"), false); streamListener.stream.writeMessage(new ByteArrayInputStream("Howdy client".getBytes(UTF_8))); streamListener.stream.flush(); assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue(); diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java index 0415eea942..b7ad6e0dec 100644 --- a/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java +++ b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java @@ -225,7 +225,7 @@ final class ServletServerStream extends AbstractServerStream { final TrailerSupplier trailerSupplier = new TrailerSupplier(); @Override - public void writeHeaders(Metadata headers) { + public void writeHeaders(Metadata headers, boolean flush) { writeHeadersToServletResponse(headers); resp.setTrailerFields(trailerSupplier); try {