refactor: Stops exception allocation on channel shutdown

This fixes #11955.

Stops exception allocation and its propagation on channel shutdown.
This commit is contained in:
Abhishek Agrawal 2025-03-19 03:57:34 +00:00 committed by GitHub
parent e80c197455
commit a57c14a51e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 56 additions and 66 deletions

View File

@ -99,7 +99,7 @@ final class PingTracker {
private synchronized void fail(Status status) {
if (!done) {
done = true;
executor.execute(() -> callback.onFailure(status.asException()));
executor.execute(() -> callback.onFailure(status));
}
}

View File

@ -96,7 +96,7 @@ public final class PingTrackerTest {
private int numCallbacks;
private boolean success;
private boolean failure;
private Throwable failureException;
private Status failureStatus;
private long roundtripTimeNanos;
@Override
@ -107,10 +107,10 @@ public final class PingTrackerTest {
}
@Override
public synchronized void onFailure(Throwable failureException) {
public synchronized void onFailure(Status failureStatus) {
numCallbacks += 1;
failure = true;
this.failureException = failureException;
this.failureStatus = failureStatus;
}
public void assertNotCalled() {
@ -130,13 +130,13 @@ public final class PingTrackerTest {
public void assertFailure(Status status) {
assertThat(numCallbacks).isEqualTo(1);
assertThat(failure).isTrue();
assertThat(((StatusException) failureException).getStatus()).isSameInstanceAs(status);
assertThat(failureStatus).isSameInstanceAs(status);
}
public void assertFailure(Status.Code statusCode) {
assertThat(numCallbacks).isEqualTo(1);
assertThat(failure).isTrue();
assertThat(((StatusException) failureException).getStatus().getCode()).isEqualTo(statusCode);
assertThat(failureStatus.getCode()).isEqualTo(statusCode);
}
}
}

View File

@ -22,6 +22,7 @@ import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.ThreadSafe;
@ -90,6 +91,6 @@ public interface ClientTransport extends InternalInstrumented<SocketStats> {
*
* @param cause the cause of the ping failure
*/
void onFailure(Throwable cause);
void onFailure(Status cause);
}
}

View File

@ -55,7 +55,7 @@ class FailingClientTransport implements ClientTransport {
public void ping(final PingCallback callback, Executor executor) {
executor.execute(new Runnable() {
@Override public void run() {
callback.onFailure(error.asException());
callback.onFailure(error);
}
});
}

View File

@ -18,6 +18,7 @@ package io.grpc.internal;
import com.google.common.base.Stopwatch;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.internal.ClientTransport.PingCallback;
import java.util.LinkedHashMap;
import java.util.Map;
@ -62,7 +63,7 @@ public class Http2Ping {
/**
* If non-null, indicates the ping failed.
*/
@GuardedBy("this") private Throwable failureCause;
@GuardedBy("this") private Status failureCause;
/**
* The round-trip time for the ping, in nanoseconds. This value is only meaningful when
@ -144,7 +145,7 @@ public class Http2Ping {
*
* @param failureCause the cause of failure
*/
public void failed(Throwable failureCause) {
public void failed(Status failureCause) {
Map<ClientTransport.PingCallback, Executor> callbacks;
synchronized (this) {
if (completed) {
@ -167,7 +168,7 @@ public class Http2Ping {
* @param executor the executor used to invoke the callback
* @param cause the cause of failure
*/
public static void notifyFailed(PingCallback callback, Executor executor, Throwable cause) {
public static void notifyFailed(PingCallback callback, Executor executor, Status cause) {
doExecute(executor, asRunnable(callback, cause));
}
@ -203,7 +204,7 @@ public class Http2Ping {
* failure.
*/
private static Runnable asRunnable(final ClientTransport.PingCallback callback,
final Throwable failureCause) {
final Status failureCause) {
return new Runnable() {
@Override
public void run() {

View File

@ -275,7 +275,7 @@ public class KeepAliveManager {
public void onSuccess(long roundTripTimeNanos) {}
@Override
public void onFailure(Throwable cause) {
public void onFailure(Status cause) {
transport.shutdownNow(Status.UNAVAILABLE.withDescription(
"Keepalive failed. The connection is likely gone"));
}

View File

@ -127,7 +127,7 @@ public final class KeepAliveManagerTest {
verify(transport).ping(pingCallbackCaptor.capture(), isA(Executor.class));
ClientTransport.PingCallback pingCallback = pingCallbackCaptor.getValue();
pingCallback.onFailure(new Throwable());
pingCallback.onFailure(Status.UNAVAILABLE.withDescription("I must write descriptions"));
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(transport).shutdownNow(statusCaptor.capture());

View File

@ -181,7 +181,7 @@ public abstract class AbstractTransportTest {
protected ManagedClientTransport.Listener mockClientTransportListener
= mock(ManagedClientTransport.Listener.class);
protected MockServerListener serverListener = new MockServerListener();
private ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
private ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
protected final TestClientStreamTracer clientStreamTracer1 = new TestHeaderClientStreamTracer();
private final TestClientStreamTracer clientStreamTracer2 = new TestHeaderClientStreamTracer();
protected final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
@ -626,8 +626,8 @@ public abstract class AbstractTransportTest {
// Transport doesn't support ping, so this neither passes nor fails.
assumeTrue(false);
}
verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(throwableCaptor.capture());
Status status = Status.fromThrowable(throwableCaptor.getValue());
verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(statusCaptor.capture());
Status status = statusCaptor.getValue();
assertSame(shutdownReason, status);
}

View File

@ -246,7 +246,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
executor.execute(new Runnable() {
@Override
public void run() {
callback.onFailure(shutdownStatus.asRuntimeException());
callback.onFailure(shutdownStatus);
}
});
} else {

View File

@ -30,7 +30,6 @@ final class ClientTransportLifecycleManager {
/** null iff !transportShutdown. */
private Status shutdownStatus;
/** null iff !transportShutdown. */
private Throwable shutdownThrowable;
private boolean transportTerminated;
public ClientTransportLifecycleManager(ManagedClientTransport.Listener listener) {
@ -72,7 +71,6 @@ final class ClientTransportLifecycleManager {
return false;
}
shutdownStatus = s;
shutdownThrowable = s.asException();
return true;
}
@ -97,7 +95,4 @@ final class ClientTransportLifecycleManager {
return shutdownStatus;
}
public Throwable getShutdownThrowable() {
return shutdownThrowable;
}
}

View File

@ -499,7 +499,7 @@ class NettyClientHandler extends AbstractNettyHandler {
streamStatus = lifecycleManager.getShutdownStatus();
}
try {
cancelPing(lifecycleManager.getShutdownThrowable());
cancelPing(lifecycleManager.getShutdownStatus());
// Report status to the application layer for any open streams
connection().forEachActiveStream(new Http2StreamVisitor() {
@Override
@ -593,13 +593,14 @@ class NettyClientHandler extends AbstractNettyHandler {
*/
private void createStream(CreateStreamCommand command, ChannelPromise promise)
throws Exception {
if (lifecycleManager.getShutdownThrowable() != null) {
if (lifecycleManager.getShutdownStatus() != null) {
command.stream().setNonExistent();
// The connection is going away (it is really the GOAWAY case),
// just terminate the stream now.
command.stream().transportReportStatus(
lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
promise.setFailure(lifecycleManager.getShutdownThrowable());
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
lifecycleManager.getShutdownStatus(), null));
return;
}
@ -852,19 +853,21 @@ class NettyClientHandler extends AbstractNettyHandler {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
transportTracer.reportKeepAliveSent();
return;
}
Throwable cause = future.cause();
Status status = lifecycleManager.getShutdownStatus();
if (cause instanceof ClosedChannelException) {
if (status == null) {
status = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
.withCause(future.cause());
}
} else {
Throwable cause = future.cause();
if (cause instanceof ClosedChannelException) {
cause = lifecycleManager.getShutdownThrowable();
if (cause == null) {
cause = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
.withCause(future.cause()).asException();
}
}
finalPing.failed(cause);
if (ping == finalPing) {
ping = null;
}
status = Utils.statusFromThrowable(cause);
}
finalPing.failed(status);
if (ping == finalPing) {
ping = null;
}
}
});
@ -963,9 +966,9 @@ class NettyClientHandler extends AbstractNettyHandler {
}
}
private void cancelPing(Throwable t) {
private void cancelPing(Status s) {
if (ping != null) {
ping.failed(t);
ping.failed(s);
ping = null;
}
}

View File

@ -165,7 +165,7 @@ class NettyClientTransport implements ConnectionClientTransport {
executor.execute(new Runnable() {
@Override
public void run() {
callback.onFailure(statusExplainingWhyTheChannelIsNull.asException());
callback.onFailure(statusExplainingWhyTheChannelIsNull);
}
});
return;
@ -177,7 +177,7 @@ class NettyClientTransport implements ConnectionClientTransport {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Status s = statusFromFailedFuture(future);
Http2Ping.notifyFailed(callback, executor, s.asException());
Http2Ping.notifyFailed(callback, executor, s);
}
}
};

View File

@ -59,7 +59,6 @@ import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.internal.AbstractStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientStreamListener.RpcProgress;
@ -812,9 +811,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
handler().channelInactive(ctx());
// ping failed on channel going inactive
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
assertEquals(Status.Code.UNAVAILABLE, callback.failureCause.getCode());
// A failed ping is still counted
assertEquals(1, transportTracer.getStats().keepAlivesSent);
}
@ -1169,7 +1166,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
private static class PingCallbackImpl implements ClientTransport.PingCallback {
int invocationCount;
long roundTripTime;
Throwable failureCause;
Status failureCause;
@Override
public void onSuccess(long roundTripTimeNanos) {
@ -1178,7 +1175,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
}
@Override
public void onFailure(Throwable cause) {
public void onFailure(Status cause) {
invocationCount++;
this.failureCause = cause;
}

View File

@ -548,8 +548,8 @@ public class NettyClientTransportTest {
}
@Override
public void onFailure(Throwable cause) {
pingResult.setException(cause);
public void onFailure(Status cause) {
pingResult.setException(cause.asException());
}
};
transport.ping(pingCallback, clock.getScheduledExecutorService());

View File

@ -1062,12 +1062,12 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
}
}
private Throwable getPingFailure() {
private Status getPingFailure() {
synchronized (lock) {
if (goAwayStatus != null) {
return goAwayStatus.asException();
return goAwayStatus;
} else {
return Status.UNAVAILABLE.withDescription("Connection closed").asException();
return Status.UNAVAILABLE.withDescription("Connection closed");
}
}
}

View File

@ -67,7 +67,6 @@ import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.internal.AbstractStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
@ -1664,16 +1663,14 @@ public class OkHttpClientTransportTest {
clientTransport.shutdown(SHUTDOWN_REASON);
// ping failed on channel shutdown
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
assertSame(SHUTDOWN_REASON, callback.failureCause);
// now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
assertSame(SHUTDOWN_REASON, callback.failureCause);
shutdownAndVerify();
}
@ -1688,18 +1685,14 @@ public class OkHttpClientTransportTest {
clientTransport.onException(new IOException());
// ping failed on error
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
assertEquals(Status.Code.UNAVAILABLE, callback.failureCause.getCode());
// now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
assertEquals(Status.Code.UNAVAILABLE, callback.failureCause.getCode());
shutdownAndVerify();
}
@ -2385,7 +2378,7 @@ public class OkHttpClientTransportTest {
static class PingCallbackImpl implements ClientTransport.PingCallback {
int invocationCount;
long roundTripTime;
Throwable failureCause;
Status failureCause;
@Override
public void onSuccess(long roundTripTimeNanos) {
@ -2394,7 +2387,7 @@ public class OkHttpClientTransportTest {
}
@Override
public void onFailure(Throwable cause) {
public void onFailure(Status cause) {
invocationCount++;
this.failureCause = cause;
}