interop-testing: fix race in TestServiceImpl

This commit is contained in:
Eric Gribkoff 2018-01-10 14:27:15 -08:00 committed by GitHub
parent af0283477d
commit d0bbeced80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 4 deletions

View File

@ -179,7 +179,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
public void onNext(StreamingOutputCallRequest request) { public void onNext(StreamingOutputCallRequest request) {
if (request.hasResponseStatus()) { if (request.hasResponseStatus()) {
dispatcher.cancel(); dispatcher.cancel();
responseObserver.onError(Status.fromCodeValue(request.getResponseStatus().getCode()) dispatcher.onError(Status.fromCodeValue(request.getResponseStatus().getCode())
.withDescription(request.getResponseStatus().getMessage()) .withDescription(request.getResponseStatus().getMessage())
.asRuntimeException()); .asRuntimeException());
return; return;
@ -197,7 +197,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
@Override @Override
public void onError(Throwable cause) { public void onError(Throwable cause) {
responseObserver.onError(cause); dispatcher.onError(cause);
} }
}; };
} }
@ -209,6 +209,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
@Override @Override
public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall( public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall(
final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) { final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
final Queue<Chunk> chunks = new ArrayDeque<Chunk>(); final Queue<Chunk> chunks = new ArrayDeque<Chunk>();
return new StreamObserver<StreamingOutputCallRequest>() { return new StreamObserver<StreamingOutputCallRequest>() {
@Override @Override
@ -219,12 +220,12 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
@Override @Override
public void onCompleted() { public void onCompleted() {
// Dispatch all of the chunks in one shot. // Dispatch all of the chunks in one shot.
new ResponseDispatcher(responseObserver).enqueue(chunks).completeInput(); dispatcher.enqueue(chunks).completeInput();
} }
@Override @Override
public void onError(Throwable cause) { public void onError(Throwable cause) {
responseObserver.onError(cause); dispatcher.onError(cause);
} }
}; };
} }
@ -269,6 +270,11 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
} }
}; };
/**
* The {@link StreamObserver} will be used to send the queue of response chunks. Since calls to
* {@link StreamObserver} must be synchronized across threads, no further calls should be made
* directly on {@code responseStream} after it is provided to the {@link ResponseDispatcher}.
*/
public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) { public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
this.chunks = Queues.newLinkedBlockingQueue(); this.chunks = Queues.newLinkedBlockingQueue();
this.responseStream = responseStream; this.responseStream = responseStream;
@ -309,6 +315,10 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
return cancelled; return cancelled;
} }
private synchronized void onError(Throwable cause) {
responseStream.onError(cause);
}
/** /**
* Dispatches the current response chunk to the client. This is only called by the executor. At * Dispatches the current response chunk to the client. This is only called by the executor. At
* any time, a given dispatch task should only be registered with the executor once. * any time, a given dispatch task should only be registered with the executor once.