From d0bbeced806fa8c8bef7f693beab0ee29ca28d04 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 10 Jan 2018 14:27:15 -0800 Subject: [PATCH] interop-testing: fix race in TestServiceImpl --- .../testing/integration/TestServiceImpl.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 40dd78751a..6451b917d2 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -179,7 +179,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { public void onNext(StreamingOutputCallRequest request) { if (request.hasResponseStatus()) { dispatcher.cancel(); - responseObserver.onError(Status.fromCodeValue(request.getResponseStatus().getCode()) + dispatcher.onError(Status.fromCodeValue(request.getResponseStatus().getCode()) .withDescription(request.getResponseStatus().getMessage()) .asRuntimeException()); return; @@ -197,7 +197,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { @Override public void onError(Throwable cause) { - responseObserver.onError(cause); + dispatcher.onError(cause); } }; } @@ -209,6 +209,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { @Override public StreamObserver halfDuplexCall( final StreamObserver responseObserver) { + final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver); final Queue chunks = new ArrayDeque(); return new StreamObserver() { @Override @@ -219,12 +220,12 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { @Override public void onCompleted() { // Dispatch all of the chunks in one shot. - new ResponseDispatcher(responseObserver).enqueue(chunks).completeInput(); + dispatcher.enqueue(chunks).completeInput(); } @Override public void onError(Throwable cause) { - responseObserver.onError(cause); + dispatcher.onError(cause); } }; } @@ -269,6 +270,11 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { } }; + /** + * The {@link StreamObserver} will be used to send the queue of response chunks. Since calls to + * {@link StreamObserver} must be synchronized across threads, no further calls should be made + * directly on {@code responseStream} after it is provided to the {@link ResponseDispatcher}. + */ public ResponseDispatcher(StreamObserver responseStream) { this.chunks = Queues.newLinkedBlockingQueue(); this.responseStream = responseStream; @@ -309,6 +315,10 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { return cancelled; } + private synchronized void onError(Throwable cause) { + responseStream.onError(cause); + } + /** * Dispatches the current response chunk to the client. This is only called by the executor. At * any time, a given dispatch task should only be registered with the executor once.