mirror of https://github.com/grpc/grpc-java.git
okhttp: tsan socket data race bug fix (#10279)
replaced use of bareSocket with a synchronized socket, added additional lock to synchronize initialization with shutdown() to fix a Java bug
This commit is contained in:
parent
f1de820c19
commit
2effae6249
|
@ -92,10 +92,10 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
|
private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
|
||||||
|
|
||||||
private final Config config;
|
private final Config config;
|
||||||
private final Socket bareSocket;
|
|
||||||
private final Variant variant = new Http2();
|
private final Variant variant = new Http2();
|
||||||
private final TransportTracer tracer;
|
private final TransportTracer tracer;
|
||||||
private final InternalLogId logId;
|
private final InternalLogId logId;
|
||||||
|
private Socket socket;
|
||||||
private ServerTransportListener listener;
|
private ServerTransportListener listener;
|
||||||
private Executor transportExecutor;
|
private Executor transportExecutor;
|
||||||
private ScheduledExecutorService scheduledExecutorService;
|
private ScheduledExecutorService scheduledExecutorService;
|
||||||
|
@ -141,11 +141,11 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
|
|
||||||
public OkHttpServerTransport(Config config, Socket bareSocket) {
|
public OkHttpServerTransport(Config config, Socket bareSocket) {
|
||||||
this.config = Preconditions.checkNotNull(config, "config");
|
this.config = Preconditions.checkNotNull(config, "config");
|
||||||
this.bareSocket = Preconditions.checkNotNull(bareSocket, "bareSocket");
|
this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
|
||||||
|
|
||||||
tracer = config.transportTracerFactory.create();
|
tracer = config.transportTracerFactory.create();
|
||||||
tracer.setFlowControlWindowReader(this::readFlowControlWindow);
|
tracer.setFlowControlWindowReader(this::readFlowControlWindow);
|
||||||
logId = InternalLogId.allocate(getClass(), bareSocket.getRemoteSocketAddress().toString());
|
logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString());
|
||||||
transportExecutor = config.transportExecutorPool.getObject();
|
transportExecutor = config.transportExecutorPool.getObject();
|
||||||
scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
|
scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
|
||||||
keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
|
keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
|
||||||
|
@ -161,10 +161,17 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
|
|
||||||
private void startIo(SerializingExecutor serializingExecutor) {
|
private void startIo(SerializingExecutor serializingExecutor) {
|
||||||
try {
|
try {
|
||||||
bareSocket.setTcpNoDelay(true);
|
// The socket implementation is lazily initialized, but had broken thread-safety
|
||||||
|
// for that laziness https://bugs.openjdk.org/browse/JDK-8278326.
|
||||||
|
// As a workaround, we lock to synchronize initialization with shutdown().
|
||||||
|
synchronized (lock) {
|
||||||
|
socket.setTcpNoDelay(true);
|
||||||
|
}
|
||||||
HandshakerSocketFactory.HandshakeResult result =
|
HandshakerSocketFactory.HandshakeResult result =
|
||||||
config.handshakerSocketFactory.handshake(bareSocket, Attributes.EMPTY);
|
config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
|
||||||
Socket socket = result.socket;
|
synchronized (lock) {
|
||||||
|
this.socket = result.socket;
|
||||||
|
}
|
||||||
this.attributes = result.attributes;
|
this.attributes = result.attributes;
|
||||||
|
|
||||||
int maxQueuedControlFrames = 10000;
|
int maxQueuedControlFrames = 10000;
|
||||||
|
@ -249,7 +256,7 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
log.log(Level.INFO, "Socket failed to handshake", ex);
|
log.log(Level.INFO, "Socket failed to handshake", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
GrpcUtil.closeQuietly(bareSocket);
|
GrpcUtil.closeQuietly(socket);
|
||||||
terminated();
|
terminated();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -268,7 +275,7 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
this.gracefulShutdownPeriod = gracefulShutdownPeriod;
|
this.gracefulShutdownPeriod = gracefulShutdownPeriod;
|
||||||
if (frameWriter == null) {
|
if (frameWriter == null) {
|
||||||
handshakeShutdown = true;
|
handshakeShutdown = true;
|
||||||
GrpcUtil.closeQuietly(bareSocket);
|
GrpcUtil.closeQuietly(socket);
|
||||||
} else {
|
} else {
|
||||||
// RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
|
// RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
|
||||||
// we also set a timer to limit the upper bound in case the PING is excessively stalled or
|
// we also set a timer to limit the upper bound in case the PING is excessively stalled or
|
||||||
|
@ -309,7 +316,7 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (frameWriter == null) {
|
if (frameWriter == null) {
|
||||||
handshakeShutdown = true;
|
handshakeShutdown = true;
|
||||||
GrpcUtil.closeQuietly(bareSocket);
|
GrpcUtil.closeQuietly(socket);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -360,7 +367,7 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
|
|
||||||
private void triggerForcefulClose() {
|
private void triggerForcefulClose() {
|
||||||
// Safe to do unconditionally; no need to check if timer cancellation raced
|
// Safe to do unconditionally; no need to check if timer cancellation raced
|
||||||
GrpcUtil.closeQuietly(bareSocket);
|
GrpcUtil.closeQuietly(socket);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void terminated() {
|
private void terminated() {
|
||||||
|
@ -396,9 +403,9 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
return Futures.immediateFuture(new InternalChannelz.SocketStats(
|
return Futures.immediateFuture(new InternalChannelz.SocketStats(
|
||||||
tracer.getStats(),
|
tracer.getStats(),
|
||||||
bareSocket.getLocalSocketAddress(),
|
socket.getLocalSocketAddress(),
|
||||||
bareSocket.getRemoteSocketAddress(),
|
socket.getRemoteSocketAddress(),
|
||||||
Utils.getSocketOptions(bareSocket),
|
Utils.getSocketOptions(socket),
|
||||||
securityInfo));
|
securityInfo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -593,12 +600,12 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
} finally {
|
} finally {
|
||||||
// Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
|
// Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
|
||||||
try {
|
try {
|
||||||
GrpcUtil.exhaust(bareSocket.getInputStream());
|
GrpcUtil.exhaust(socket.getInputStream());
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
// Unable to wait, so just proceed to tear-down. The socket is probably already closed so
|
// Unable to wait, so just proceed to tear-down. The socket is probably already closed so
|
||||||
// the GOAWAY can't be sent anyway.
|
// the GOAWAY can't be sent anyway.
|
||||||
}
|
}
|
||||||
GrpcUtil.closeQuietly(bareSocket);
|
GrpcUtil.closeQuietly(socket);
|
||||||
terminated();
|
terminated();
|
||||||
Thread.currentThread().setName(threadName);
|
Thread.currentThread().setName(threadName);
|
||||||
}
|
}
|
||||||
|
@ -1108,7 +1115,7 @@ final class OkHttpServerTransport implements ServerTransport,
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
goAwayStatus = Status.UNAVAILABLE
|
goAwayStatus = Status.UNAVAILABLE
|
||||||
.withDescription("Keepalive failed. Considering connection dead");
|
.withDescription("Keepalive failed. Considering connection dead");
|
||||||
GrpcUtil.closeQuietly(bareSocket);
|
GrpcUtil.closeQuietly(socket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue