all: use PerfMark.traceTask (#9950)

* all: use PerfMark.traceTask

* make linter happy
This commit is contained in:
Carl Mastrangelo 2023-04-18 10:46:54 -07:00 committed by GitHub
parent 2e5cc84c51
commit 4e3ee4471e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 295 additions and 451 deletions

View File

@ -25,6 +25,7 @@ import io.grpc.Compressor;
import io.grpc.Decompressor; import io.grpc.Decompressor;
import io.perfmark.Link; import io.perfmark.Link;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.TaskCloseable;
import java.io.InputStream; import java.io.InputStream;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
@ -219,25 +220,19 @@ public abstract class AbstractStream implements Stream {
*/ */
private void requestMessagesFromDeframer(final int numMessages) { private void requestMessagesFromDeframer(final int numMessages) {
if (deframer instanceof ThreadOptimizedDeframer) { if (deframer instanceof ThreadOptimizedDeframer) {
PerfMark.startTask("AbstractStream.request"); try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
try {
deframer.request(numMessages); deframer.request(numMessages);
} finally {
PerfMark.stopTask("AbstractStream.request");
} }
return; return;
} }
final Link link = PerfMark.linkOut(); final Link link = PerfMark.linkOut();
class RequestRunnable implements Runnable { class RequestRunnable implements Runnable {
@Override public void run() { @Override public void run() {
PerfMark.startTask("AbstractStream.request"); try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
PerfMark.linkIn(link); PerfMark.linkIn(link);
try {
deframer.request(numMessages); deframer.request(numMessages);
} catch (Throwable t) { } catch (Throwable t) {
deframeFailed(t); deframeFailed(t);
} finally {
PerfMark.stopTask("AbstractStream.request");
} }
} }
} }

View File

@ -52,6 +52,7 @@ import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
import io.perfmark.Link; import io.perfmark.Link;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag; import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Locale; import java.util.Locale;
@ -187,11 +188,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override @Override
public void start(Listener<RespT> observer, Metadata headers) { public void start(Listener<RespT> observer, Metadata headers) {
PerfMark.startTask("ClientCall.start", tag); try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
try { PerfMark.attachTag(tag);
startInternal(observer, headers); startInternal(observer, headers);
} finally {
PerfMark.stopTask("ClientCall.start", tag);
} }
} }
@ -446,23 +445,19 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override @Override
public void request(int numMessages) { public void request(int numMessages) {
PerfMark.startTask("ClientCall.request", tag); try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
try { PerfMark.attachTag(tag);
checkState(stream != null, "Not started"); checkState(stream != null, "Not started");
checkArgument(numMessages >= 0, "Number requested must be non-negative"); checkArgument(numMessages >= 0, "Number requested must be non-negative");
stream.request(numMessages); stream.request(numMessages);
} finally {
PerfMark.stopTask("ClientCall.request", tag);
} }
} }
@Override @Override
public void cancel(@Nullable String message, @Nullable Throwable cause) { public void cancel(@Nullable String message, @Nullable Throwable cause) {
PerfMark.startTask("ClientCall.cancel", tag); try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
try { PerfMark.attachTag(tag);
cancelInternal(message, cause); cancelInternal(message, cause);
} finally {
PerfMark.stopTask("ClientCall.cancel", tag);
} }
} }
@ -497,11 +492,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override @Override
public void halfClose() { public void halfClose() {
PerfMark.startTask("ClientCall.halfClose", tag); try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
try { PerfMark.attachTag(tag);
halfCloseInternal(); halfCloseInternal();
} finally {
PerfMark.stopTask("ClientCall.halfClose", tag);
} }
} }
@ -515,11 +508,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override @Override
public void sendMessage(ReqT message) { public void sendMessage(ReqT message) {
PerfMark.startTask("ClientCall.sendMessage", tag); try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
try { PerfMark.attachTag(tag);
sendMessageInternal(message); sendMessageInternal(message);
} finally {
PerfMark.stopTask("ClientCall.sendMessage", tag);
} }
} }
@ -603,104 +594,93 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override @Override
public void headersRead(final Metadata headers) { public void headersRead(final Metadata headers) {
PerfMark.startTask("ClientStreamListener.headersRead", tag); try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
final Link link = PerfMark.linkOut(); PerfMark.attachTag(tag);
final Link link = PerfMark.linkOut();
final class HeadersRead extends ContextRunnable {
HeadersRead() {
super(context);
}
final class HeadersRead extends ContextRunnable { @Override
HeadersRead() { public void runInContext() {
super(context); try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
} PerfMark.attachTag(tag);
PerfMark.linkIn(link);
runInternal();
}
}
@Override private void runInternal() {
public void runInContext() { if (exceptionStatus != null) {
PerfMark.startTask("ClientCall$Listener.headersRead", tag); return;
PerfMark.linkIn(link); }
try { try {
runInternal(); observer.onHeaders(headers);
} finally { } catch (Throwable t) {
PerfMark.stopTask("ClientCall$Listener.headersRead", tag); exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
}
} }
} }
private void runInternal() {
if (exceptionStatus != null) {
return;
}
try {
observer.onHeaders(headers);
} catch (Throwable t) {
exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
}
}
}
try {
callExecutor.execute(new HeadersRead()); callExecutor.execute(new HeadersRead());
} finally {
PerfMark.stopTask("ClientStreamListener.headersRead", tag);
} }
} }
@Override @Override
public void messagesAvailable(final MessageProducer producer) { public void messagesAvailable(final MessageProducer producer) {
PerfMark.startTask("ClientStreamListener.messagesAvailable", tag); try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
final Link link = PerfMark.linkOut(); PerfMark.attachTag(tag);
final Link link = PerfMark.linkOut();
final class MessagesAvailable extends ContextRunnable { final class MessagesAvailable extends ContextRunnable {
MessagesAvailable() { MessagesAvailable() {
super(context); super(context);
}
@Override
public void runInContext() {
PerfMark.startTask("ClientCall$Listener.messagesAvailable", tag);
PerfMark.linkIn(link);
try {
runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.messagesAvailable", tag);
} }
}
private void runInternal() { @Override
if (exceptionStatus != null) { public void runInContext() {
GrpcUtil.closeQuietly(producer); try (TaskCloseable ignore =
return; PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
} PerfMark.attachTag(tag);
try { PerfMark.linkIn(link);
InputStream message; runInternal();
while ((message = producer.next()) != null) { }
try { }
observer.onMessage(method.parseResponse(message));
} catch (Throwable t) { private void runInternal() {
GrpcUtil.closeQuietly(message); if (exceptionStatus != null) {
throw t; GrpcUtil.closeQuietly(producer);
} return;
message.close(); }
try {
InputStream message;
while ((message = producer.next()) != null) {
try {
observer.onMessage(method.parseResponse(message));
} catch (Throwable t) {
GrpcUtil.closeQuietly(message);
throw t;
}
message.close();
}
} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
} }
} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
} }
} }
}
try {
callExecutor.execute(new MessagesAvailable()); callExecutor.execute(new MessagesAvailable());
} finally {
PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag);
} }
} }
@Override @Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
PerfMark.startTask("ClientStreamListener.closed", tag); try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
try { PerfMark.attachTag(tag);
closedInternal(status, rpcProgress, trailers); closedInternal(status, rpcProgress, trailers);
} finally {
PerfMark.stopTask("ClientStreamListener.closed", tag);
} }
} }
@ -730,12 +710,10 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override @Override
public void runInContext() { public void runInContext() {
PerfMark.startTask("ClientCall$Listener.onClose", tag); try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
PerfMark.linkIn(link); PerfMark.attachTag(tag);
try { PerfMark.linkIn(link);
runInternal(); runInternal();
} finally {
PerfMark.stopTask("ClientCall$Listener.onClose", tag);
} }
} }
@ -770,43 +748,38 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
if (method.getType().clientSendsOneMessage()) { if (method.getType().clientSendsOneMessage()) {
return; return;
} }
try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
PerfMark.attachTag(tag);
final Link link = PerfMark.linkOut();
PerfMark.startTask("ClientStreamListener.onReady", tag); final class StreamOnReady extends ContextRunnable {
final Link link = PerfMark.linkOut(); StreamOnReady() {
super(context);
}
final class StreamOnReady extends ContextRunnable { @Override
StreamOnReady() { public void runInContext() {
super(context); try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
} PerfMark.attachTag(tag);
PerfMark.linkIn(link);
runInternal();
}
}
@Override private void runInternal() {
public void runInContext() { if (exceptionStatus != null) {
PerfMark.startTask("ClientCall$Listener.onReady", tag); return;
PerfMark.linkIn(link); }
try { try {
runInternal(); observer.onReady();
} finally { } catch (Throwable t) {
PerfMark.stopTask("ClientCall$Listener.onReady", tag); exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
}
} }
} }
private void runInternal() {
if (exceptionStatus != null) {
return;
}
try {
observer.onReady();
} catch (Throwable t) {
exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
}
}
}
try {
callExecutor.execute(new StreamOnReady()); callExecutor.execute(new StreamOnReady());
} finally {
PerfMark.stopTask("ClientStreamListener.onReady", tag);
} }
} }
} }

View File

@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.Decompressor; import io.grpc.Decompressor;
import io.perfmark.Link; import io.perfmark.Link;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.TaskCloseable;
import java.io.Closeable; import java.io.Closeable;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -107,11 +108,9 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
} else { } else {
if (!alreadyEnqueued) { if (!alreadyEnqueued) {
if (currentThreadIsTransportThread) { if (currentThreadIsTransportThread) {
PerfMark.startTask("MigratingThreadDeframer.messageAvailable"); try (TaskCloseable ignore =
try { PerfMark.traceTask("MigratingThreadDeframer.messageAvailable")) {
transportListener.messagesAvailable(messageProducer); transportListener.messagesAvailable(messageProducer);
} finally {
PerfMark.stopTask("MigratingThreadDeframer.messageAvailable");
} }
} else { } else {
final Link link = PerfMark.linkOut(); final Link link = PerfMark.linkOut();
@ -119,12 +118,10 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
// MigratingThreadDeframer // MigratingThreadDeframer
transportExecutor.runOnTransportThread(new Runnable() { transportExecutor.runOnTransportThread(new Runnable() {
@Override public void run() { @Override public void run() {
PerfMark.startTask("MigratingThreadDeframer.messageAvailable"); try (TaskCloseable ignore =
PerfMark.linkIn(link); PerfMark.traceTask("MigratingThreadDeframer.messageAvailable")) {
try { PerfMark.linkIn(link);
transportListener.messagesAvailable(messageProducer); transportListener.messagesAvailable(messageProducer);
} finally {
PerfMark.stopTask("MigratingThreadDeframer.messageAvailable");
} }
} }
}); });
@ -145,28 +142,22 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
// necessary processing // necessary processing
transportExecutor.runOnTransportThread(new Runnable() { transportExecutor.runOnTransportThread(new Runnable() {
@Override public void run() { @Override public void run() {
PerfMark.startTask("MigratingThreadDeframer.request"); try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.request")) {
PerfMark.linkIn(link); PerfMark.linkIn(link);
try {
// Since processing continues from transport thread while this runnable was // Since processing continues from transport thread while this runnable was
// enqueued, the state may have changed since we ran runOnTransportThread. So we // enqueued, the state may have changed since we ran runOnTransportThread. So we
// must make sure deframerOnTransportThread==true // must make sure deframerOnTransportThread==true
requestFromTransportThread(numMessages); requestFromTransportThread(numMessages);
} finally {
PerfMark.stopTask("MigratingThreadDeframer.request");
} }
} }
}); });
return; return;
} }
PerfMark.startTask("MigratingThreadDeframer.request"); try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.request")) {
try {
deframer.request(numMessages); deframer.request(numMessages);
} catch (Throwable t) { } catch (Throwable t) {
appListener.deframeFailed(t); appListener.deframeFailed(t);
deframer.close(); // unrecoverable state deframer.close(); // unrecoverable state
} finally {
PerfMark.stopTask("MigratingThreadDeframer.request");
} }
} }
} }
@ -205,8 +196,7 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
public void deframe(final ReadableBuffer data) { public void deframe(final ReadableBuffer data) {
class DeframeOp implements Op, Closeable { class DeframeOp implements Op, Closeable {
@Override public void run(boolean isDeframerOnTransportThread) { @Override public void run(boolean isDeframerOnTransportThread) {
PerfMark.startTask("MigratingThreadDeframer.deframe"); try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.deframe")) {
try {
if (isDeframerOnTransportThread) { if (isDeframerOnTransportThread) {
deframer.deframe(data); deframer.deframe(data);
return; return;
@ -218,8 +208,6 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
appListener.deframeFailed(t); appListener.deframeFailed(t);
deframer.close(); // unrecoverable state deframer.close(); // unrecoverable state
} }
} finally {
PerfMark.stopTask("MigratingThreadDeframer.deframe");
} }
} }

View File

@ -43,6 +43,7 @@ import io.grpc.ServerCall;
import io.grpc.Status; import io.grpc.Status;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag; import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import java.io.InputStream; import java.io.InputStream;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -89,21 +90,17 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override @Override
public void request(int numMessages) { public void request(int numMessages) {
PerfMark.startTask("ServerCall.request", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.request")) {
try { PerfMark.attachTag(tag);
stream.request(numMessages); stream.request(numMessages);
} finally {
PerfMark.stopTask("ServerCall.request", tag);
} }
} }
@Override @Override
public void sendHeaders(Metadata headers) { public void sendHeaders(Metadata headers) {
PerfMark.startTask("ServerCall.sendHeaders", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendHeaders")) {
try { PerfMark.attachTag(tag);
sendHeadersInternal(headers); sendHeadersInternal(headers);
} finally {
PerfMark.stopTask("ServerCall.sendHeaders", tag);
} }
} }
@ -149,11 +146,9 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override @Override
public void sendMessage(RespT message) { public void sendMessage(RespT message) {
PerfMark.startTask("ServerCall.sendMessage", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendMessage")) {
try { PerfMark.attachTag(tag);
sendMessageInternal(message); sendMessageInternal(message);
} finally {
PerfMark.stopTask("ServerCall.sendMessage", tag);
} }
} }
@ -207,11 +202,9 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override @Override
public void close(Status status, Metadata trailers) { public void close(Status status, Metadata trailers) {
PerfMark.startTask("ServerCall.close", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.close")) {
try { PerfMark.attachTag(tag);
closeInternal(status, trailers); closeInternal(status, trailers);
} finally {
PerfMark.stopTask("ServerCall.close", tag);
} }
} }
@ -311,11 +304,9 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override @Override
public void messagesAvailable(MessageProducer producer) { public void messagesAvailable(MessageProducer producer) {
PerfMark.startTask("ServerStreamListener.messagesAvailable", call.tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) {
try { PerfMark.attachTag(call.tag);
messagesAvailableInternal(producer); messagesAvailableInternal(producer);
} finally {
PerfMark.stopTask("ServerStreamListener.messagesAvailable", call.tag);
} }
} }
@ -346,25 +337,21 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override @Override
public void halfClosed() { public void halfClosed() {
PerfMark.startTask("ServerStreamListener.halfClosed", call.tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) {
try { PerfMark.attachTag(call.tag);
if (call.cancelled) { if (call.cancelled) {
return; return;
} }
listener.onHalfClose(); listener.onHalfClose();
} finally {
PerfMark.stopTask("ServerStreamListener.halfClosed", call.tag);
} }
} }
@Override @Override
public void closed(Status status) { public void closed(Status status) {
PerfMark.startTask("ServerStreamListener.closed", call.tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) {
try { PerfMark.attachTag(call.tag);
closedInternal(status); closedInternal(status);
} finally {
PerfMark.stopTask("ServerStreamListener.closed", call.tag);
} }
} }
@ -390,14 +377,12 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override @Override
public void onReady() { public void onReady() {
PerfMark.startTask("ServerStreamListener.onReady", call.tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) {
try { PerfMark.attachTag(call.tag);
if (call.cancelled) { if (call.cancelled) {
return; return;
} }
listener.onReady(); listener.onReady();
} finally {
PerfMark.stopTask("ServerCall.closed", call.tag);
} }
} }
} }

View File

@ -58,6 +58,7 @@ import io.grpc.Status;
import io.perfmark.Link; import io.perfmark.Link;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag; import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -461,11 +462,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override @Override
public void streamCreated(ServerStream stream, String methodName, Metadata headers) { public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
Tag tag = PerfMark.createTag(methodName, stream.streamId()); Tag tag = PerfMark.createTag(methodName, stream.streamId());
PerfMark.startTask("ServerTransportListener.streamCreated", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerTransportListener.streamCreated")) {
try { PerfMark.attachTag(tag);
streamCreatedInternal(stream, methodName, headers, tag); streamCreatedInternal(stream, methodName, headers, tag);
} finally {
PerfMark.stopTask("ServerTransportListener.streamCreated", tag);
} }
} }
@ -523,12 +522,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override @Override
public void runInContext() { public void runInContext() {
PerfMark.startTask("ServerTransportListener$MethodLookup.startCall", tag); try (TaskCloseable ignore =
PerfMark.linkIn(link); PerfMark.traceTask("ServerTransportListener$MethodLookup.startCall")) {
try { PerfMark.attachTag(tag);
PerfMark.linkIn(link);
runInternal(); runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$MethodLookup.startCall", tag);
} }
} }
@ -598,12 +596,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override @Override
public void runInContext() { public void runInContext() {
PerfMark.startTask("ServerTransportListener$HandleServerCall.startCall", tag); try (TaskCloseable ignore =
PerfMark.linkIn(link); PerfMark.traceTask("ServerTransportListener$HandleServerCall.startCall")) {
try { PerfMark.linkIn(link);
PerfMark.attachTag(tag);
runInternal(); runInternal();
} finally {
PerfMark.stopTask("ServerTransportListener$HandleServerCall.startCall", tag);
} }
} }
@ -818,76 +815,65 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override @Override
public void messagesAvailable(final MessageProducer producer) { public void messagesAvailable(final MessageProducer producer) {
PerfMark.startTask("ServerStreamListener.messagesAvailable", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) {
final Link link = PerfMark.linkOut(); PerfMark.attachTag(tag);
final Link link = PerfMark.linkOut();
final class MessagesAvailable extends ContextRunnable {
final class MessagesAvailable extends ContextRunnable { MessagesAvailable() {
super(context);
}
MessagesAvailable() { @Override
super(context); public void runInContext() {
} try (TaskCloseable ignore =
PerfMark.traceTask("ServerCallListener(app).messagesAvailable")) {
@Override PerfMark.attachTag(tag);
public void runInContext() { PerfMark.linkIn(link);
PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag); getListener().messagesAvailable(producer);
PerfMark.linkIn(link); } catch (Throwable t) {
try { internalClose(t);
getListener().messagesAvailable(producer); throw t;
} catch (Throwable t) { }
internalClose(t);
throw t;
} finally {
PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
} }
} }
}
try {
callExecutor.execute(new MessagesAvailable()); callExecutor.execute(new MessagesAvailable());
} finally {
PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag);
} }
} }
@Override @Override
public void halfClosed() { public void halfClosed() {
PerfMark.startTask("ServerStreamListener.halfClosed", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) {
final Link link = PerfMark.linkOut(); PerfMark.attachTag(tag);
final Link link = PerfMark.linkOut();
final class HalfClosed extends ContextRunnable {
HalfClosed() {
super(context);
}
final class HalfClosed extends ContextRunnable { @Override
HalfClosed() { public void runInContext() {
super(context); try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).halfClosed")) {
} PerfMark.attachTag(tag);
PerfMark.linkIn(link);
@Override getListener().halfClosed();
public void runInContext() { } catch (Throwable t) {
PerfMark.startTask("ServerCallListener(app).halfClosed", tag); internalClose(t);
PerfMark.linkIn(link); throw t;
try { }
getListener().halfClosed();
} catch (Throwable t) {
internalClose(t);
throw t;
} finally {
PerfMark.stopTask("ServerCallListener(app).halfClosed", tag);
} }
} }
}
try {
callExecutor.execute(new HalfClosed()); callExecutor.execute(new HalfClosed());
} finally {
PerfMark.stopTask("ServerStreamListener.halfClosed", tag);
} }
} }
@Override @Override
public void closed(final Status status) { public void closed(final Status status) {
PerfMark.startTask("ServerStreamListener.closed", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) {
try { PerfMark.attachTag(tag);
closedInternal(status); closedInternal(status);
} finally {
PerfMark.stopTask("ServerStreamListener.closed", tag);
} }
} }
@ -917,12 +903,10 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override @Override
public void runInContext() { public void runInContext() {
PerfMark.startTask("ServerCallListener(app).closed", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).closed")) {
PerfMark.linkIn(link); PerfMark.attachTag(tag);
try { PerfMark.linkIn(link);
getListener().closed(status); getListener().closed(status);
} finally {
PerfMark.stopTask("ServerCallListener(app).closed", tag);
} }
} }
} }
@ -932,32 +916,29 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override @Override
public void onReady() { public void onReady() {
PerfMark.startTask("ServerStreamListener.onReady", tag); try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) {
final Link link = PerfMark.linkOut(); PerfMark.attachTag(tag);
final class OnReady extends ContextRunnable { final Link link = PerfMark.linkOut();
OnReady() {
super(context);
}
@Override final class OnReady extends ContextRunnable {
public void runInContext() { OnReady() {
PerfMark.startTask("ServerCallListener(app).onReady", tag); super(context);
PerfMark.linkIn(link); }
try {
getListener().onReady(); @Override
} catch (Throwable t) { public void runInContext() {
internalClose(t); try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).onReady")) {
throw t; PerfMark.attachTag(tag);
} finally { PerfMark.linkIn(link);
PerfMark.stopTask("ServerCallListener(app).onReady", tag); getListener().onReady();
} catch (Throwable t) {
internalClose(t);
throw t;
}
} }
} }
}
try {
callExecutor.execute(new OnReady()); callExecutor.execute(new OnReady());
} finally {
PerfMark.stopTask("ServerStreamListener.onReady", tag);
} }
} }
} }

View File

@ -79,6 +79,7 @@ import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag; import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.logging.Level; import java.util.logging.Level;
@ -592,13 +593,11 @@ class NettyClientHandler extends AbstractNettyHandler {
Http2Headers headers = command.headers(); Http2Headers headers = command.headers();
stream.setId(streamId); stream.setId(streamId);
PerfMark.startTask("NettyClientHandler.createStream", stream.tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
PerfMark.linkIn(command.getLink()); PerfMark.linkIn(command.getLink());
try { PerfMark.attachTag(stream.tag());
createStreamTraced( createStreamTraced(
streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise); streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
} finally {
PerfMark.stopTask("NettyClientHandler.createStream", stream.tag());
} }
} }
@ -670,9 +669,9 @@ class NettyClientHandler extends AbstractNettyHandler {
private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
ChannelPromise promise) { ChannelPromise promise) {
NettyClientStream.TransportState stream = cmd.stream(); NettyClientStream.TransportState stream = cmd.stream();
PerfMark.startTask("NettyClientHandler.cancelStream", stream.tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
PerfMark.linkIn(cmd.getLink()); PerfMark.attachTag(stream.tag());
try { PerfMark.linkIn(cmd.getLink());
Status reason = cmd.reason(); Status reason = cmd.reason();
if (reason != null) { if (reason != null) {
stream.transportReportStatus(reason, true, new Metadata()); stream.transportReportStatus(reason, true, new Metadata());
@ -682,8 +681,6 @@ class NettyClientHandler extends AbstractNettyHandler {
} else { } else {
promise.setSuccess(); promise.setSuccess();
} }
} finally {
PerfMark.stopTask("NettyClientHandler.cancelStream", stream.tag());
} }
} }
@ -692,25 +689,20 @@ class NettyClientHandler extends AbstractNettyHandler {
*/ */
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
ChannelPromise promise) { ChannelPromise promise) {
PerfMark.startTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
PerfMark.linkIn(cmd.getLink()); PerfMark.attachTag(cmd.stream().tag());
try { PerfMark.linkIn(cmd.getLink());
// Call the base class to write the HTTP/2 DATA frame. // Call the base class to write the HTTP/2 DATA frame.
// Note: no need to flush since this is handled by the outbound flow controller. // Note: no need to flush since this is handled by the outbound flow controller.
encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise); encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
} finally {
PerfMark.stopTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag());
} }
} }
private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg, private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
ChannelPromise promise) { ChannelPromise promise) {
PerfMark.startTask("NettyClientHandler.sendPingFrame"); try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
PerfMark.linkIn(msg.getLink()); PerfMark.linkIn(msg.getLink());
try {
sendPingFrameTraced(ctx, msg, promise); sendPingFrameTraced(ctx, msg, promise);
} finally {
PerfMark.stopTask("NettyClientHandler.sendPingFrame");
} }
} }
@ -788,17 +780,15 @@ class NettyClientHandler extends AbstractNettyHandler {
public boolean visit(Http2Stream stream) throws Http2Exception { public boolean visit(Http2Stream stream) throws Http2Exception {
NettyClientStream.TransportState clientStream = clientStream(stream); NettyClientStream.TransportState clientStream = clientStream(stream);
Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag(); Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
PerfMark.startTask("NettyClientHandler.forcefulClose", tag); try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
PerfMark.linkIn(msg.getLink()); PerfMark.linkIn(msg.getLink());
try { PerfMark.attachTag(tag);
if (clientStream != null) { if (clientStream != null) {
clientStream.transportReportStatus(msg.getStatus(), true, new Metadata()); clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
} }
stream.close(); stream.close();
return true; return true;
} finally {
PerfMark.stopTask("NettyClientHandler.forcefulClose", tag);
} }
} }
}); });

View File

@ -46,6 +46,7 @@ import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag; import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
@ -117,11 +118,9 @@ class NettyClientStream extends AbstractClientStream {
@Override @Override
public void writeHeaders(Metadata headers, byte[] requestPayload) { public void writeHeaders(Metadata headers, byte[] requestPayload) {
PerfMark.startTask("NettyClientStream$Sink.writeHeaders"); try (TaskCloseable ignore =
try { PerfMark.traceTask("NettyClientStream$Sink.writeHeaders")) {
writeHeadersInternal(headers, requestPayload); writeHeadersInternal(headers, requestPayload);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.writeHeaders");
} }
} }
@ -207,21 +206,15 @@ class NettyClientStream extends AbstractClientStream {
@Override @Override
public void writeFrame( public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
PerfMark.startTask("NettyClientStream$Sink.writeFrame"); try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.writeFrame")) {
try {
writeFrameInternal(frame, endOfStream, flush, numMessages); writeFrameInternal(frame, endOfStream, flush, numMessages);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.writeFrame");
} }
} }
@Override @Override
public void cancel(Status status) { public void cancel(Status status) {
PerfMark.startTask("NettyClientStream$Sink.cancel"); try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.cancel")) {
try {
writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
} finally {
PerfMark.stopTask("NettyClientStream$Sink.cancel");
} }
} }
} }

View File

@ -93,6 +93,7 @@ import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag; import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.List; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -473,8 +474,8 @@ class NettyServerHandler extends AbstractNettyHandler {
transportTracer, transportTracer,
method); method);
PerfMark.startTask("NettyServerHandler.onHeadersRead", state.tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
try { PerfMark.attachTag(state.tag());
String authority = getOrUpdateAuthority((AsciiString) headers.authority()); String authority = getOrUpdateAuthority((AsciiString) headers.authority());
NettyServerStream stream = new NettyServerStream( NettyServerStream stream = new NettyServerStream(
ctx.channel(), ctx.channel(),
@ -486,8 +487,6 @@ class NettyServerHandler extends AbstractNettyHandler {
transportListener.streamCreated(stream, method, metadata); transportListener.streamCreated(stream, method, metadata);
state.onStreamAllocated(); state.onStreamAllocated();
http2Stream.setProperty(streamKey, state); http2Stream.setProperty(streamKey, state);
} finally {
PerfMark.stopTask("NettyServerHandler.onHeadersRead", state.tag());
} }
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "Exception in onHeadersRead()", e); logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
@ -513,11 +512,9 @@ class NettyServerHandler extends AbstractNettyHandler {
flowControlPing().onDataRead(data.readableBytes(), padding); flowControlPing().onDataRead(data.readableBytes(), padding);
try { try {
NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId)); NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
PerfMark.startTask("NettyServerHandler.onDataRead", stream.tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
try { PerfMark.attachTag(stream.tag());
stream.inboundDataReceived(data, endOfStream); stream.inboundDataReceived(data, endOfStream);
} finally {
PerfMark.stopTask("NettyServerHandler.onDataRead", stream.tag());
} }
} catch (Throwable e) { } catch (Throwable e) {
logger.log(Level.WARNING, "Exception in onDataRead()", e); logger.log(Level.WARNING, "Exception in onDataRead()", e);
@ -530,12 +527,10 @@ class NettyServerHandler extends AbstractNettyHandler {
try { try {
NettyServerStream.TransportState stream = serverStream(connection().stream(streamId)); NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
if (stream != null) { if (stream != null) {
PerfMark.startTask("NettyServerHandler.onRstStreamRead", stream.tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
try { PerfMark.attachTag(stream.tag());
stream.transportReportStatus( stream.transportReportStatus(
Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode)); Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
} finally {
PerfMark.stopTask("NettyServerHandler.onRstStreamRead", stream.tag());
} }
} }
} catch (Throwable e) { } catch (Throwable e) {
@ -564,16 +559,14 @@ class NettyServerHandler extends AbstractNettyHandler {
} }
logger.log(level, "Stream Error", cause); logger.log(level, "Stream Error", cause);
Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag(); Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
PerfMark.startTask("NettyServerHandler.onStreamError", tag); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
try { PerfMark.attachTag(tag);
if (serverStream != null) { if (serverStream != null) {
serverStream.transportReportStatus(Utils.statusFromThrowable(cause)); serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
} }
// TODO(ejona): Abort the stream by sending headers to help the client with debugging. // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
// Delegate to the base class to send a RST_STREAM. // Delegate to the base class to send a RST_STREAM.
super.onStreamError(ctx, outbound, cause, http2Ex); super.onStreamError(ctx, outbound, cause, http2Ex);
} finally {
PerfMark.stopTask("NettyServerHandler.onStreamError", tag);
} }
} }
@ -699,16 +692,14 @@ class NettyServerHandler extends AbstractNettyHandler {
*/ */
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
ChannelPromise promise) throws Http2Exception { ChannelPromise promise) throws Http2Exception {
PerfMark.startTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
PerfMark.linkIn(cmd.getLink()); PerfMark.attachTag(cmd.stream().tag());
try { PerfMark.linkIn(cmd.getLink());
if (cmd.endStream()) { if (cmd.endStream()) {
closeStreamWhenDone(promise, cmd.stream().id()); closeStreamWhenDone(promise, cmd.stream().id());
} }
// Call the base class to write the HTTP/2 DATA frame. // Call the base class to write the HTTP/2 DATA frame.
encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise); encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
} finally {
PerfMark.stopTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag());
} }
} }
@ -717,9 +708,9 @@ class NettyServerHandler extends AbstractNettyHandler {
*/ */
private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
ChannelPromise promise) throws Http2Exception { ChannelPromise promise) throws Http2Exception {
PerfMark.startTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
PerfMark.linkIn(cmd.getLink()); PerfMark.attachTag(cmd.stream().tag());
try { PerfMark.linkIn(cmd.getLink());
// TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296
// is fixed. // is fixed.
int streamId = cmd.stream().id(); int streamId = cmd.stream().id();
@ -732,22 +723,18 @@ class NettyServerHandler extends AbstractNettyHandler {
closeStreamWhenDone(promise, streamId); closeStreamWhenDone(promise, streamId);
} }
encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise); encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
} finally {
PerfMark.stopTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag());
} }
} }
private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
ChannelPromise promise) { ChannelPromise promise) {
PerfMark.startTask("NettyServerHandler.cancelStream", cmd.stream().tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
PerfMark.linkIn(cmd.getLink()); PerfMark.attachTag(cmd.stream().tag());
try { PerfMark.linkIn(cmd.getLink());
// Notify the listener if we haven't already. // Notify the listener if we haven't already.
cmd.stream().transportReportStatus(cmd.reason()); cmd.stream().transportReportStatus(cmd.reason());
// Terminate the stream. // Terminate the stream.
encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise); encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
} finally {
PerfMark.stopTask("NettyServerHandler.cancelStream", cmd.stream().tag());
} }
} }
@ -774,13 +761,11 @@ class NettyServerHandler extends AbstractNettyHandler {
public boolean visit(Http2Stream stream) throws Http2Exception { public boolean visit(Http2Stream stream) throws Http2Exception {
NettyServerStream.TransportState serverStream = serverStream(stream); NettyServerStream.TransportState serverStream = serverStream(stream);
if (serverStream != null) { if (serverStream != null) {
PerfMark.startTask("NettyServerHandler.forcefulClose", serverStream.tag()); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
PerfMark.linkIn(msg.getLink()); PerfMark.attachTag(serverStream.tag());
try { PerfMark.linkIn(msg.getLink());
serverStream.transportReportStatus(msg.getStatus()); serverStream.transportReportStatus(msg.getStatus());
resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
} finally {
PerfMark.stopTask("NettyServerHandler.forcefulClose", serverStream.tag());
} }
} }
stream.close(); stream.close();

View File

@ -36,6 +36,7 @@ import io.netty.handler.codec.http2.Http2Stream;
import io.perfmark.Link; import io.perfmark.Link;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag; import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -94,15 +95,12 @@ class NettyServerStream extends AbstractServerStream {
private class Sink implements AbstractServerStream.Sink { private class Sink implements AbstractServerStream.Sink {
@Override @Override
public void writeHeaders(Metadata headers) { public void writeHeaders(Metadata headers) {
PerfMark.startTask("NettyServerStream$Sink.writeHeaders"); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeHeaders")) {
try {
writeQueue.enqueue( writeQueue.enqueue(
SendResponseHeadersCommand.createHeaders( SendResponseHeadersCommand.createHeaders(
transportState(), transportState(),
Utils.convertServerHeaders(headers)), Utils.convertServerHeaders(headers)),
true); true);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeHeaders");
} }
} }
@ -128,34 +126,25 @@ class NettyServerStream extends AbstractServerStream {
@Override @Override
public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) { public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) {
PerfMark.startTask("NettyServerStream$Sink.writeFrame"); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeFrame")) {
try {
writeFrameInternal(frame, flush, numMessages); writeFrameInternal(frame, flush, numMessages);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeFrame");
} }
} }
@Override @Override
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
PerfMark.startTask("NettyServerStream$Sink.writeTrailers"); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeTrailers")) {
try {
Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
writeQueue.enqueue( writeQueue.enqueue(
SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
true); true);
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeTrailers");
} }
} }
@Override @Override
public void cancel(Status status) { public void cancel(Status status) {
PerfMark.startTask("NettyServerStream$Sink.cancel"); try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.cancel")) {
try {
writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true);
} finally {
PerfMark.startTask("NettyServerStream$Sink.cancel");
} }
} }
} }
@ -192,12 +181,11 @@ class NettyServerStream extends AbstractServerStream {
eventLoop.execute(new Runnable() { eventLoop.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
PerfMark.startTask("NettyServerStream$TransportState.runOnTransportThread", tag); try (TaskCloseable ignore =
PerfMark.linkIn(link); PerfMark.traceTask("NettyServerStream$TransportState.runOnTransportThread")) {
try { PerfMark.attachTag(tag);
PerfMark.linkIn(link);
r.run(); r.run();
} finally {
PerfMark.stopTask("NettyServerStream$TransportState.runOnTransportThread", tag);
} }
} }
}); });

View File

@ -24,6 +24,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.perfmark.Link; import io.perfmark.Link;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.TaskCloseable;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -119,8 +120,7 @@ class WriteQueue {
* called in the event loop * called in the event loop
*/ */
private void flush() { private void flush() {
PerfMark.startTask("WriteQueue.periodicFlush"); try (TaskCloseable ignore = PerfMark.traceTask("WriteQueue.periodicFlush")) {
try {
QueuedCommand cmd; QueuedCommand cmd;
int i = 0; int i = 0;
boolean flushedOnce = false; boolean flushedOnce = false;
@ -131,26 +131,19 @@ class WriteQueue {
// Flush each chunk so we are releasing buffers periodically. In theory this loop // Flush each chunk so we are releasing buffers periodically. In theory this loop
// might never end as new events are continuously added to the queue, if we never // might never end as new events are continuously added to the queue, if we never
// flushed in that case we would be guaranteed to OOM. // flushed in that case we would be guaranteed to OOM.
PerfMark.startTask("WriteQueue.flush0"); try (TaskCloseable ignore2 = PerfMark.traceTask("WriteQueue.flush0")) {
try {
channel.flush(); channel.flush();
} finally {
PerfMark.stopTask("WriteQueue.flush0");
} }
flushedOnce = true; flushedOnce = true;
} }
} }
// Must flush at least once, even if there were no writes. // Must flush at least once, even if there were no writes.
if (i != 0 || !flushedOnce) { if (i != 0 || !flushedOnce) {
PerfMark.startTask("WriteQueue.flush1"); try (TaskCloseable ignore2 = PerfMark.traceTask("WriteQueue.flush1")) {
try {
channel.flush(); channel.flush();
} finally {
PerfMark.stopTask("WriteQueue.flush1");
} }
} }
} finally { } finally {
PerfMark.stopTask("WriteQueue.periodicFlush");
// Mark the write as done, if the queue is non-empty after marking trigger a new write. // Mark the write as done, if the queue is non-empty after marking trigger a new write.
scheduled.set(false); scheduled.set(false);
if (!queue.isEmpty()) { if (!queue.isEmpty()) {

View File

@ -26,6 +26,7 @@ import io.grpc.okhttp.internal.framed.FrameWriter;
import io.grpc.okhttp.internal.framed.Settings; import io.grpc.okhttp.internal.framed.Settings;
import io.perfmark.Link; import io.perfmark.Link;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.TaskCloseable;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -100,8 +101,7 @@ final class AsyncSink implements Sink {
if (closed) { if (closed) {
throw new IOException("closed"); throw new IOException("closed");
} }
PerfMark.startTask("AsyncSink.write"); try (TaskCloseable ignore = PerfMark.traceTask("AsyncSink.write")) {
try {
boolean closeSocket = false; boolean closeSocket = false;
synchronized (lock) { synchronized (lock) {
buffer.write(source, byteCount); buffer.write(source, byteCount);
@ -130,10 +130,9 @@ final class AsyncSink implements Sink {
final Link link = PerfMark.linkOut(); final Link link = PerfMark.linkOut();
@Override @Override
public void doRun() throws IOException { public void doRun() throws IOException {
PerfMark.startTask("WriteRunnable.runWrite");
PerfMark.linkIn(link);
Buffer buf = new Buffer(); Buffer buf = new Buffer();
try { try (TaskCloseable ignore = PerfMark.traceTask("WriteRunnable.runWrite")) {
PerfMark.linkIn(link);
int writingControlFrames; int writingControlFrames;
synchronized (lock) { synchronized (lock) {
buf.write(buffer, buffer.completeSegmentByteCount()); buf.write(buffer, buffer.completeSegmentByteCount());
@ -146,13 +145,9 @@ final class AsyncSink implements Sink {
synchronized (lock) { synchronized (lock) {
queuedControlFrames -= writingControlFrames; queuedControlFrames -= writingControlFrames;
} }
} finally {
PerfMark.stopTask("WriteRunnable.runWrite");
} }
} }
}); });
} finally {
PerfMark.stopTask("AsyncSink.write");
} }
} }
@ -161,8 +156,7 @@ final class AsyncSink implements Sink {
if (closed) { if (closed) {
throw new IOException("closed"); throw new IOException("closed");
} }
PerfMark.startTask("AsyncSink.flush"); try (TaskCloseable ignore = PerfMark.traceTask("AsyncSink.flush")) {
try {
synchronized (lock) { synchronized (lock) {
if (flushEnqueued) { if (flushEnqueued) {
return; return;
@ -173,23 +167,18 @@ final class AsyncSink implements Sink {
final Link link = PerfMark.linkOut(); final Link link = PerfMark.linkOut();
@Override @Override
public void doRun() throws IOException { public void doRun() throws IOException {
PerfMark.startTask("WriteRunnable.runFlush");
PerfMark.linkIn(link);
Buffer buf = new Buffer(); Buffer buf = new Buffer();
try { try (TaskCloseable ignore = PerfMark.traceTask("WriteRunnable.runFlush")) {
PerfMark.linkIn(link);
synchronized (lock) { synchronized (lock) {
buf.write(buffer, buffer.size()); buf.write(buffer, buffer.size());
flushEnqueued = false; flushEnqueued = false;
} }
sink.write(buf, buf.size()); sink.write(buf, buf.size());
sink.flush(); sink.flush();
} finally {
PerfMark.stopTask("WriteRunnable.runFlush");
} }
} }
}); });
} finally {
PerfMark.stopTask("AsyncSink.flush");
} }
} }

View File

@ -35,6 +35,7 @@ import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.Header; import io.grpc.okhttp.internal.framed.Header;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag; import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import java.util.List; import java.util.List;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import okio.Buffer; import okio.Buffer;
@ -139,55 +140,46 @@ class OkHttpClientStream extends AbstractClientStream {
class Sink implements AbstractClientStream.Sink { class Sink implements AbstractClientStream.Sink {
@Override @Override
public void writeHeaders(Metadata metadata, byte[] payload) { public void writeHeaders(Metadata metadata, byte[] payload) {
PerfMark.startTask("OkHttpClientStream$Sink.writeHeaders"); try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeHeaders")) {
String defaultPath = "/" + method.getFullMethodName(); String defaultPath = "/" + method.getFullMethodName();
if (payload != null) { if (payload != null) {
useGet = true; useGet = true;
defaultPath += "?" + BaseEncoding.base64().encode(payload); defaultPath += "?" + BaseEncoding.base64().encode(payload);
} }
try {
synchronized (state.lock) { synchronized (state.lock) {
state.streamReady(metadata, defaultPath); state.streamReady(metadata, defaultPath);
} }
} finally {
PerfMark.stopTask("OkHttpClientStream$Sink.writeHeaders");
} }
} }
@Override @Override
public void writeFrame( public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
PerfMark.startTask("OkHttpClientStream$Sink.writeFrame"); try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeFrame")) {
Buffer buffer; Buffer buffer;
if (frame == null) { if (frame == null) {
buffer = EMPTY_BUFFER; buffer = EMPTY_BUFFER;
} else { } else {
buffer = ((OkHttpWritableBuffer) frame).buffer(); buffer = ((OkHttpWritableBuffer) frame).buffer();
int size = (int) buffer.size(); int size = (int) buffer.size();
if (size > 0) { if (size > 0) {
onSendingBytes(size); onSendingBytes(size);
}
} }
}
try {
synchronized (state.lock) { synchronized (state.lock) {
state.sendBuffer(buffer, endOfStream, flush); state.sendBuffer(buffer, endOfStream, flush);
getTransportTracer().reportMessageSent(numMessages); getTransportTracer().reportMessageSent(numMessages);
} }
} finally {
PerfMark.stopTask("OkHttpClientStream$Sink.writeFrame");
} }
} }
@Override @Override
public void cancel(Status reason) { public void cancel(Status reason) {
PerfMark.startTask("OkHttpClientStream$Sink.cancel"); try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.cancel")) {
try {
synchronized (state.lock) { synchronized (state.lock) {
state.cancel(reason, true, null); state.cancel(reason, true, null);
} }
} finally {
PerfMark.stopTask("OkHttpClientStream$Sink.cancel");
} }
} }
} }

View File

@ -28,6 +28,7 @@ import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.Header; import io.grpc.okhttp.internal.framed.Header;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag; import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import java.util.List; import java.util.List;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import okio.Buffer; import okio.Buffer;
@ -83,58 +84,49 @@ class OkHttpServerStream extends AbstractServerStream {
class Sink implements AbstractServerStream.Sink { class Sink implements AbstractServerStream.Sink {
@Override @Override
public void writeHeaders(Metadata metadata) { public void writeHeaders(Metadata metadata) {
PerfMark.startTask("OkHttpServerStream$Sink.writeHeaders"); try (TaskCloseable ignore =
try { PerfMark.traceTask("OkHttpServerStream$Sink.writeHeaders")) {
List<Header> responseHeaders = Headers.createResponseHeaders(metadata); List<Header> responseHeaders = Headers.createResponseHeaders(metadata);
synchronized (state.lock) { synchronized (state.lock) {
state.sendHeaders(responseHeaders); state.sendHeaders(responseHeaders);
} }
} finally {
PerfMark.stopTask("OkHttpServerStream$Sink.writeHeaders");
} }
} }
@Override @Override
public void writeFrame(WritableBuffer frame, boolean flush, int numMessages) { public void writeFrame(WritableBuffer frame, boolean flush, int numMessages) {
PerfMark.startTask("OkHttpServerStream$Sink.writeFrame"); try (TaskCloseable ignore =
Buffer buffer = ((OkHttpWritableBuffer) frame).buffer(); PerfMark.traceTask("OkHttpServerStream$Sink.writeFrame")) {
int size = (int) buffer.size(); Buffer buffer = ((OkHttpWritableBuffer) frame).buffer();
if (size > 0) { int size = (int) buffer.size();
onSendingBytes(size); if (size > 0) {
} onSendingBytes(size);
}
try {
synchronized (state.lock) { synchronized (state.lock) {
state.sendBuffer(buffer, flush); state.sendBuffer(buffer, flush);
transportTracer.reportMessageSent(numMessages); transportTracer.reportMessageSent(numMessages);
} }
} finally {
PerfMark.stopTask("OkHttpServerStream$Sink.writeFrame");
} }
} }
@Override @Override
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
PerfMark.startTask("OkHttpServerStream$Sink.writeTrailers"); try (TaskCloseable ignore =
try { PerfMark.traceTask("OkHttpServerStream$Sink.writeTrailers")) {
List<Header> responseTrailers = Headers.createResponseTrailers(trailers, headersSent); List<Header> responseTrailers = Headers.createResponseTrailers(trailers, headersSent);
synchronized (state.lock) { synchronized (state.lock) {
state.sendTrailers(responseTrailers); state.sendTrailers(responseTrailers);
} }
} finally {
PerfMark.stopTask("OkHttpServerStream$Sink.writeTrailers");
} }
} }
@Override @Override
public void cancel(Status reason) { public void cancel(Status reason) {
PerfMark.startTask("OkHttpServerStream$Sink.cancel"); try (TaskCloseable ignore =
try { PerfMark.traceTask("OkHttpServerStream$Sink.cancel")) {
synchronized (state.lock) { synchronized (state.lock) {
state.cancel(ErrorCode.CANCEL, reason); state.cancel(ErrorCode.CANCEL, reason);
} }
} finally {
PerfMark.stopTask("OkHttpServerStream$Sink.cancel");
} }
} }
} }