mirror of https://github.com/grpc/grpc-java.git
netty: Fix getAttributes() data races in NettyClientTransportTest
Since approximately the LBv2 API (the current API) was introduced, gRPC won't use a transport until it is ready. Long ago, transports could be used before they were ready and these old tests were not waiting for the negotiator to complete before starting. We need them to wait for the handshake to complete to avoid a test-only data race in getAttributes() noticed by TSAN. Throwing away data frames in the Noop handshaker is necessary to act like a normal handshaker; they don't allow data frames to pass until the handshake is complete. Without the handling, it goes through invalid code paths in NettyClientHandler where a terminated transport becomes ready, and a similar data race. ``` Write of size 4 at 0x00008db31e2c by thread T37: #0 io.grpc.netty.NettyClientHandler.handleProtocolNegotiationCompleted(Lio/grpc/Attributes;Lio/grpc/InternalChannelz$Security;)V NettyClientHandler.java:517 #1 io.grpc.netty.ProtocolNegotiators$GrpcNegotiationHandler.userEventTriggered(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V ProtocolNegotiators.java:937 #2 io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(Ljava/lang/Object;)V AbstractChannelHandlerContext.java:398 #3 io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V AbstractChannelHandlerContext.java:376 #4 io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(Ljava/lang/Object;)Lio/netty/channel/ChannelHandlerContext; AbstractChannelHandlerContext.java:368 #5 io.grpc.netty.ProtocolNegotiators$ProtocolNegotiationHandler.fireProtocolNegotiationEvent(Lio/netty/channel/ChannelHandlerContext;)V ProtocolNegotiators.java:1107 #6 io.grpc.netty.ProtocolNegotiators$WaitUntilActiveHandler.channelActive(Lio/netty/channel/ChannelHandlerContext;)V ProtocolNegotiators.java:1011 ... Previous read of size 4 at 0x00008db31e2c by thread T4 (mutexes: write M0, write M1, write M2, write M3): #0 io.grpc.netty.NettyClientHandler.getAttributes()Lio/grpc/Attributes; NettyClientHandler.java:345 #1 io.grpc.netty.NettyClientTransport.getAttributes()Lio/grpc/Attributes; NettyClientTransport.java:387 #2 io.grpc.netty.NettyClientTransport.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/Metadata;Lio/grpc/CallOptions;[Lio/grpc/ClientStreamTracer;)Lio/grpc/internal/ClientStream; NettyClientTransport.java:198 #3 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;Lio/grpc/Metadata;)V NettyClientTransportTest.java:953 #4 io.grpc.netty.NettyClientTransportTest.huffmanCodingShouldNotBePerformed()V NettyClientTransportTest.java:631 ... ``` ``` Read of size 4 at 0x00008f983a3c by thread T4 (mutexes: write M0, write M1): #0 io.grpc.netty.NettyClientHandler.getAttributes()Lio/grpc/Attributes; NettyClientHandler.java:345 #1 io.grpc.netty.NettyClientTransport.getAttributes()Lio/grpc/Attributes; NettyClientTransport.java:387 #2 io.grpc.netty.NettyClientTransport.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/Metadata;Lio/grpc/CallOptions;[Lio/grpc/ClientStreamTracer;)Lio/grpc/internal/ClientStream; NettyClientTransport.java:198 #3 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;Lio/grpc/Metadata;)V NettyClientTransportTest.java:973 #4 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;)V NettyClientTransportTest.java:969 #5 io.grpc.netty.NettyClientTransportTest.handlerExceptionDuringNegotiatonPropagatesToStatus()V NettyClientTransportTest.java:425 ... Previous write of size 4 at 0x00008f983a3c by thread T56: #0 io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(Lio/netty/channel/ChannelHandlerContext;Lio/netty/handler/codec/http2/Http2Settings;)V NettyClientHandler.java:960 ... ```
This commit is contained in:
parent
ae109727d3
commit
e61b03cb9f
|
@ -37,6 +37,8 @@ import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.timeout;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
|
@ -95,6 +97,7 @@ import io.netty.handler.codec.http2.StreamBufferingEncoder;
|
||||||
import io.netty.handler.ssl.ClientAuth;
|
import io.netty.handler.ssl.ClientAuth;
|
||||||
import io.netty.handler.ssl.SslContext;
|
import io.netty.handler.ssl.SslContext;
|
||||||
import io.netty.util.AsciiString;
|
import io.netty.util.AsciiString;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -187,6 +190,7 @@ public class NettyClientTransportTest {
|
||||||
startServer();
|
startServer();
|
||||||
NettyClientTransport transport = newTransport(newNegotiator());
|
NettyClientTransport transport = newTransport(newNegotiator());
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
|
|
||||||
// Send a single RPC and wait for the response.
|
// Send a single RPC and wait for the response.
|
||||||
new Rpc(transport).halfClose().waitForResponse();
|
new Rpc(transport).halfClose().waitForResponse();
|
||||||
|
@ -244,6 +248,7 @@ public class NettyClientTransportTest {
|
||||||
NettyClientTransport transport = newTransport(newNegotiator(),
|
NettyClientTransport transport = newTransport(newNegotiator(),
|
||||||
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
|
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
|
|
||||||
new Rpc(transport, new Metadata()).halfClose().waitForResponse();
|
new Rpc(transport, new Metadata()).halfClose().waitForResponse();
|
||||||
|
|
||||||
|
@ -261,6 +266,7 @@ public class NettyClientTransportTest {
|
||||||
NettyClientTransport transport = newTransport(newNegotiator(),
|
NettyClientTransport transport = newTransport(newNegotiator(),
|
||||||
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
|
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Send a single RPC and wait for the response.
|
// Send a single RPC and wait for the response.
|
||||||
|
@ -287,6 +293,7 @@ public class NettyClientTransportTest {
|
||||||
NettyClientTransport transport = newTransport(negotiator);
|
NettyClientTransport transport = newTransport(negotiator);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
}
|
}
|
||||||
|
verify(clientTransportListener, timeout(5000).times(2)).transportReady();
|
||||||
|
|
||||||
// Send a single RPC on each transport.
|
// Send a single RPC on each transport.
|
||||||
final List<Rpc> rpcs = new ArrayList<>(transports.size());
|
final List<Rpc> rpcs = new ArrayList<>(transports.size());
|
||||||
|
@ -316,6 +323,7 @@ public class NettyClientTransportTest {
|
||||||
failureStatus.asRuntimeException());
|
failureStatus.asRuntimeException());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportTerminated();
|
||||||
|
|
||||||
Rpc rpc = new Rpc(transport).halfClose();
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
try {
|
try {
|
||||||
|
@ -349,6 +357,7 @@ public class NettyClientTransportTest {
|
||||||
ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext);
|
ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext);
|
||||||
final NettyClientTransport transport = newTransport(negotiator);
|
final NettyClientTransport transport = newTransport(negotiator);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportTerminated();
|
||||||
|
|
||||||
Rpc rpc = new Rpc(transport).halfClose();
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
try {
|
try {
|
||||||
|
@ -378,6 +387,7 @@ public class NettyClientTransportTest {
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!");
|
final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!");
|
||||||
transport.channel().pipeline().fireExceptionCaught(failureStatus.asRuntimeException());
|
transport.channel().pipeline().fireExceptionCaught(failureStatus.asRuntimeException());
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportTerminated();
|
||||||
|
|
||||||
Rpc rpc = new Rpc(transport).halfClose();
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
try {
|
try {
|
||||||
|
@ -409,6 +419,7 @@ public class NettyClientTransportTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportTerminated();
|
||||||
|
|
||||||
Rpc rpc = new Rpc(transport).halfClose();
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
try {
|
try {
|
||||||
|
@ -428,6 +439,7 @@ public class NettyClientTransportTest {
|
||||||
|
|
||||||
NettyClientTransport transport = newTransport(newNegotiator());
|
NettyClientTransport transport = newTransport(newNegotiator());
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
|
|
||||||
// Send a dummy RPC in order to ensure that the updated SETTINGS_MAX_CONCURRENT_STREAMS
|
// Send a dummy RPC in order to ensure that the updated SETTINGS_MAX_CONCURRENT_STREAMS
|
||||||
// has been received by the remote endpoint.
|
// has been received by the remote endpoint.
|
||||||
|
@ -579,6 +591,7 @@ public class NettyClientTransportTest {
|
||||||
NettyClientTransport transport =
|
NettyClientTransport transport =
|
||||||
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
|
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Send a single RPC and wait for the response.
|
// Send a single RPC and wait for the response.
|
||||||
|
@ -612,6 +625,7 @@ public class NettyClientTransportTest {
|
||||||
longStringOfA);
|
longStringOfA);
|
||||||
|
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
|
|
||||||
AtomicBoolean foundExpectedHeaderBytes = new AtomicBoolean(false);
|
AtomicBoolean foundExpectedHeaderBytes = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@ -641,6 +655,7 @@ public class NettyClientTransportTest {
|
||||||
|
|
||||||
NettyClientTransport transport = newTransport(newNegotiator());
|
NettyClientTransport transport = newTransport(newNegotiator());
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Send a single RPC and wait for the response.
|
// Send a single RPC and wait for the response.
|
||||||
|
@ -685,6 +700,7 @@ public class NettyClientTransportTest {
|
||||||
startServer();
|
startServer();
|
||||||
NettyClientTransport transport = newTransport(newNegotiator());
|
NettyClientTransport transport = newTransport(newNegotiator());
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
Rpc rpc = new Rpc(transport).halfClose();
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
rpc.waitForResponse();
|
rpc.waitForResponse();
|
||||||
|
|
||||||
|
@ -703,6 +719,7 @@ public class NettyClientTransportTest {
|
||||||
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
|
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
|
||||||
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
|
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
Rpc rpc = new Rpc(transport).halfClose();
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
rpc.waitForResponse();
|
rpc.waitForResponse();
|
||||||
|
|
||||||
|
@ -715,6 +732,7 @@ public class NettyClientTransportTest {
|
||||||
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
|
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
|
||||||
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
|
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
Rpc rpc = new Rpc(transport).halfClose();
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
rpc.waitForResponse();
|
rpc.waitForResponse();
|
||||||
|
|
||||||
|
@ -808,6 +826,7 @@ public class NettyClientTransportTest {
|
||||||
assertEquals(true, clientExecutorPool.isInUse());
|
assertEquals(true, clientExecutorPool.isInUse());
|
||||||
final NettyClientTransport transport = newTransport(negotiator);
|
final NettyClientTransport transport = newTransport(negotiator);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
verify(clientTransportListener, timeout(5000)).transportReady();
|
||||||
Rpc rpc = new Rpc(transport).halfClose();
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
rpc.waitForResponse();
|
rpc.waitForResponse();
|
||||||
// closing the negotiators should return the executors back to pool, and release the resource
|
// closing the negotiators should return the executors back to pool, and release the resource
|
||||||
|
@ -1098,9 +1117,15 @@ public class NettyClientTransportTest {
|
||||||
this.grpcHandler = grpcHandler;
|
this.grpcHandler = grpcHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
// Prevent any data being passed to NettyClientHandler
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||||
ctx.pipeline().addBefore(ctx.name(), null, grpcHandler);
|
ctx.pipeline().addAfter(ctx.name(), null, grpcHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void fail(ChannelHandlerContext ctx, Throwable cause) {
|
public void fail(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
|
Loading…
Reference in New Issue