diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 8d0d656859..d0a6456c43 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -37,6 +37,8 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.base.Optional; @@ -95,6 +97,7 @@ import io.netty.handler.codec.http2.StreamBufferingEncoder; import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContext; import io.netty.util.AsciiString; +import io.netty.util.ReferenceCountUtil; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -187,6 +190,7 @@ public class NettyClientTransportTest { startServer(); NettyClientTransport transport = newTransport(newNegotiator()); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); // Send a single RPC and wait for the response. new Rpc(transport).halfClose().waitForResponse(); @@ -244,6 +248,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); new Rpc(transport, new Metadata()).halfClose().waitForResponse(); @@ -261,6 +266,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), 1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); try { // Send a single RPC and wait for the response. @@ -287,6 +293,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(negotiator); callMeMaybe(transport.start(clientTransportListener)); } + verify(clientTransportListener, timeout(5000).times(2)).transportReady(); // Send a single RPC on each transport. final List rpcs = new ArrayList<>(transports.size()); @@ -316,6 +323,7 @@ public class NettyClientTransportTest { failureStatus.asRuntimeException()); } }); + verify(clientTransportListener, timeout(5000)).transportTerminated(); Rpc rpc = new Rpc(transport).halfClose(); try { @@ -349,6 +357,7 @@ public class NettyClientTransportTest { ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext); final NettyClientTransport transport = newTransport(negotiator); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportTerminated(); Rpc rpc = new Rpc(transport).halfClose(); try { @@ -378,6 +387,7 @@ public class NettyClientTransportTest { callMeMaybe(transport.start(clientTransportListener)); final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!"); transport.channel().pipeline().fireExceptionCaught(failureStatus.asRuntimeException()); + verify(clientTransportListener, timeout(5000)).transportTerminated(); Rpc rpc = new Rpc(transport).halfClose(); try { @@ -409,6 +419,7 @@ public class NettyClientTransportTest { } } }); + verify(clientTransportListener, timeout(5000)).transportTerminated(); Rpc rpc = new Rpc(transport).halfClose(); try { @@ -428,6 +439,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator()); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); // Send a dummy RPC in order to ensure that the updated SETTINGS_MAX_CONCURRENT_STREAMS // has been received by the remote endpoint. @@ -579,6 +591,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); try { // Send a single RPC and wait for the response. @@ -612,6 +625,7 @@ public class NettyClientTransportTest { longStringOfA); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); AtomicBoolean foundExpectedHeaderBytes = new AtomicBoolean(false); @@ -641,6 +655,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator()); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); try { // Send a single RPC and wait for the response. @@ -685,6 +700,7 @@ public class NettyClientTransportTest { startServer(); NettyClientTransport transport = newTransport(newNegotiator()); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); Rpc rpc = new Rpc(transport).halfClose(); rpc.waitForResponse(); @@ -703,6 +719,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); Rpc rpc = new Rpc(transport).halfClose(); rpc.waitForResponse(); @@ -715,6 +732,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); Rpc rpc = new Rpc(transport).halfClose(); rpc.waitForResponse(); @@ -808,6 +826,7 @@ public class NettyClientTransportTest { assertEquals(true, clientExecutorPool.isInUse()); final NettyClientTransport transport = newTransport(negotiator); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); Rpc rpc = new Rpc(transport).halfClose(); rpc.waitForResponse(); // closing the negotiators should return the executors back to pool, and release the resource @@ -1098,9 +1117,15 @@ public class NettyClientTransportTest { this.grpcHandler = grpcHandler; } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + // Prevent any data being passed to NettyClientHandler + ReferenceCountUtil.release(msg); + } + @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - ctx.pipeline().addBefore(ctx.name(), null, grpcHandler); + ctx.pipeline().addAfter(ctx.name(), null, grpcHandler); } public void fail(ChannelHandlerContext ctx, Throwable cause) {