From e61b03cb9f3575dff82b5adf19181b61b3d62f5e Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Tue, 7 Jan 2025 08:31:15 -0800 Subject: [PATCH] netty: Fix getAttributes() data races in NettyClientTransportTest Since approximately when the LBv2 API (the current API) was introduced, gRPC won't use a transport until it is ready. Long ago, transports could be used before they were ready and these old tests were not waiting for the negotiator to complete before starting. We need them to wait for the handshake to complete to avoid a test-only data race in getAttributes() noticed by TSAN. Throwing away data frames in the Noop handshaker is necessary to act like a normal handshaker, which doesn't allow data frames to pass until the handshake is complete. Without that handling, the test goes through invalid code paths in NettyClientHandler where a terminated transport becomes ready, and hits a similar data race. ``` Write of size 4 at 0x00008db31e2c by thread T37: #0 io.grpc.netty.NettyClientHandler.handleProtocolNegotiationCompleted(Lio/grpc/Attributes;Lio/grpc/InternalChannelz$Security;)V NettyClientHandler.java:517 #1 io.grpc.netty.ProtocolNegotiators$GrpcNegotiationHandler.userEventTriggered(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V ProtocolNegotiators.java:937 #2 io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(Ljava/lang/Object;)V AbstractChannelHandlerContext.java:398 #3 io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V AbstractChannelHandlerContext.java:376 #4 io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(Ljava/lang/Object;)Lio/netty/channel/ChannelHandlerContext; AbstractChannelHandlerContext.java:368 #5 io.grpc.netty.ProtocolNegotiators$ProtocolNegotiationHandler.fireProtocolNegotiationEvent(Lio/netty/channel/ChannelHandlerContext;)V ProtocolNegotiators.java:1107 #6 
io.grpc.netty.ProtocolNegotiators$WaitUntilActiveHandler.channelActive(Lio/netty/channel/ChannelHandlerContext;)V ProtocolNegotiators.java:1011 ... Previous read of size 4 at 0x00008db31e2c by thread T4 (mutexes: write M0, write M1, write M2, write M3): #0 io.grpc.netty.NettyClientHandler.getAttributes()Lio/grpc/Attributes; NettyClientHandler.java:345 #1 io.grpc.netty.NettyClientTransport.getAttributes()Lio/grpc/Attributes; NettyClientTransport.java:387 #2 io.grpc.netty.NettyClientTransport.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/Metadata;Lio/grpc/CallOptions;[Lio/grpc/ClientStreamTracer;)Lio/grpc/internal/ClientStream; NettyClientTransport.java:198 #3 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;Lio/grpc/Metadata;)V NettyClientTransportTest.java:953 #4 io.grpc.netty.NettyClientTransportTest.huffmanCodingShouldNotBePerformed()V NettyClientTransportTest.java:631 ... ``` ``` Read of size 4 at 0x00008f983a3c by thread T4 (mutexes: write M0, write M1): #0 io.grpc.netty.NettyClientHandler.getAttributes()Lio/grpc/Attributes; NettyClientHandler.java:345 #1 io.grpc.netty.NettyClientTransport.getAttributes()Lio/grpc/Attributes; NettyClientTransport.java:387 #2 io.grpc.netty.NettyClientTransport.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/Metadata;Lio/grpc/CallOptions;[Lio/grpc/ClientStreamTracer;)Lio/grpc/internal/ClientStream; NettyClientTransport.java:198 #3 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;Lio/grpc/Metadata;)V NettyClientTransportTest.java:973 #4 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;)V NettyClientTransportTest.java:969 #5 io.grpc.netty.NettyClientTransportTest.handlerExceptionDuringNegotiatonPropagatesToStatus()V NettyClientTransportTest.java:425 ... 
Previous write of size 4 at 0x00008f983a3c by thread T56: #0 io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(Lio/netty/channel/ChannelHandlerContext;Lio/netty/handler/codec/http2/Http2Settings;)V NettyClientHandler.java:960 ... ``` --- .../grpc/netty/NettyClientTransportTest.java | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 8d0d656859..d0a6456c43 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -37,6 +37,8 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.base.Optional; @@ -95,6 +97,7 @@ import io.netty.handler.codec.http2.StreamBufferingEncoder; import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContext; import io.netty.util.AsciiString; +import io.netty.util.ReferenceCountUtil; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -187,6 +190,7 @@ public class NettyClientTransportTest { startServer(); NettyClientTransport transport = newTransport(newNegotiator()); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); // Send a single RPC and wait for the response. 
new Rpc(transport).halfClose().waitForResponse(); @@ -244,6 +248,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); new Rpc(transport, new Metadata()).halfClose().waitForResponse(); @@ -261,6 +266,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), 1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); try { // Send a single RPC and wait for the response. @@ -287,6 +293,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(negotiator); callMeMaybe(transport.start(clientTransportListener)); } + verify(clientTransportListener, timeout(5000).times(2)).transportReady(); // Send a single RPC on each transport. 
final List rpcs = new ArrayList<>(transports.size()); @@ -316,6 +323,7 @@ public class NettyClientTransportTest { failureStatus.asRuntimeException()); } }); + verify(clientTransportListener, timeout(5000)).transportTerminated(); Rpc rpc = new Rpc(transport).halfClose(); try { @@ -349,6 +357,7 @@ public class NettyClientTransportTest { ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext); final NettyClientTransport transport = newTransport(negotiator); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportTerminated(); Rpc rpc = new Rpc(transport).halfClose(); try { @@ -378,6 +387,7 @@ public class NettyClientTransportTest { callMeMaybe(transport.start(clientTransportListener)); final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!"); transport.channel().pipeline().fireExceptionCaught(failureStatus.asRuntimeException()); + verify(clientTransportListener, timeout(5000)).transportTerminated(); Rpc rpc = new Rpc(transport).halfClose(); try { @@ -409,6 +419,7 @@ public class NettyClientTransportTest { } } }); + verify(clientTransportListener, timeout(5000)).transportTerminated(); Rpc rpc = new Rpc(transport).halfClose(); try { @@ -428,6 +439,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator()); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); // Send a dummy RPC in order to ensure that the updated SETTINGS_MAX_CONCURRENT_STREAMS // has been received by the remote endpoint. @@ -579,6 +591,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); try { // Send a single RPC and wait for the response. 
@@ -612,6 +625,7 @@ public class NettyClientTransportTest { longStringOfA); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); AtomicBoolean foundExpectedHeaderBytes = new AtomicBoolean(false); @@ -641,6 +655,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator()); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); try { // Send a single RPC and wait for the response. @@ -685,6 +700,7 @@ public class NettyClientTransportTest { startServer(); NettyClientTransport transport = newTransport(newNegotiator()); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); Rpc rpc = new Rpc(transport).halfClose(); rpc.waitForResponse(); @@ -703,6 +719,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); Rpc rpc = new Rpc(transport).halfClose(); rpc.waitForResponse(); @@ -715,6 +732,7 @@ public class NettyClientTransportTest { NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, timeout(5000)).transportReady(); Rpc rpc = new Rpc(transport).halfClose(); rpc.waitForResponse(); @@ -808,6 +826,7 @@ public class NettyClientTransportTest { assertEquals(true, clientExecutorPool.isInUse()); final NettyClientTransport transport = newTransport(negotiator); callMeMaybe(transport.start(clientTransportListener)); + verify(clientTransportListener, 
timeout(5000)).transportReady(); Rpc rpc = new Rpc(transport).halfClose(); rpc.waitForResponse(); // closing the negotiators should return the executors back to pool, and release the resource @@ -1098,9 +1117,15 @@ public class NettyClientTransportTest { this.grpcHandler = grpcHandler; } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + // Prevent any data being passed to NettyClientHandler + ReferenceCountUtil.release(msg); + } + @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - ctx.pipeline().addBefore(ctx.name(), null, grpcHandler); + ctx.pipeline().addAfter(ctx.name(), null, grpcHandler); } public void fail(ChannelHandlerContext ctx, Throwable cause) {