This commit is contained in:
François JACQUES 2025-07-15 21:02:33 -07:00 committed by GitHub
commit 6ebfd7ae9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 55 additions and 57 deletions

View File

@ -1095,9 +1095,9 @@ public abstract class AbstractTransportTest {
Metadata clientStreamTrailers =
clientStreamListener.awaitTrailers(TIMEOUT_MS, TimeUnit.MILLISECONDS);
checkClientStatus(status, clientStreamStatus);
assertEquals(
Lists.newArrayList(trailers.getAll(asciiKey)),
Lists.newArrayList(clientStreamTrailers.getAll(asciiKey)));
assertAsciiMetadataValuesEqual(
trailers.getAll(asciiKey),
clientStreamTrailers.getAll(asciiKey));
assertEquals(
Lists.newArrayList(trailers.getAll(binaryKey)),
Lists.newArrayList(clientStreamTrailers.getAll(binaryKey)));

View File

@ -69,7 +69,7 @@ public class JettyTransportTest extends AbstractTransportTest {
listener.transportCreated(new ServletServerBuilder.ServerTransportImpl(scheduler));
ServletAdapter adapter =
new ServletAdapter(serverTransportListener, streamTracerFactories,
Integer.MAX_VALUE);
Integer.MAX_VALUE, false);
GrpcServlet grpcServlet = new GrpcServlet(adapter);
jettyServer = new Server(0);

View File

@ -77,15 +77,18 @@ public final class ServletAdapter {
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
private final int maxInboundMessageSize;
private final Attributes attributes;
private final boolean forceTrailers;
ServletAdapter(
ServerTransportListener transportListener,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
int maxInboundMessageSize) {
int maxInboundMessageSize,
boolean forceTrailers) {
this.transportListener = transportListener;
this.streamTracerFactories = streamTracerFactories;
this.maxInboundMessageSize = maxInboundMessageSize;
attributes = transportListener.transportReady(Attributes.EMPTY);
this.forceTrailers = forceTrailers;
}
/**
@ -148,7 +151,8 @@ public final class ServletAdapter {
new InetSocketAddress(req.getLocalAddr(), req.getLocalPort()))
.build(),
getAuthority(req),
logId);
logId,
forceTrailers);
transportListener.streamCreated(stream, method, headers);
stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated);

View File

@ -72,6 +72,7 @@ public final class ServletServerBuilder extends ForwardingServerBuilder<ServletS
private boolean internalCaller;
private boolean usingCustomScheduler;
private InternalServerImpl internalServer;
private boolean forceTrailers;
public ServletServerBuilder() {
serverImplBuilder = new ServerImplBuilder(this::buildTransportServers);
@ -98,7 +99,8 @@ public final class ServletServerBuilder extends ForwardingServerBuilder<ServletS
* Creates a {@link ServletAdapter}.
*/
public ServletAdapter buildServletAdapter() {
return new ServletAdapter(buildAndStart(), streamTracerFactories, maxInboundMessageSize);
return new ServletAdapter(buildAndStart(), streamTracerFactories, maxInboundMessageSize,
forceTrailers);
}
/**
@ -183,6 +185,19 @@ public final class ServletServerBuilder extends ForwardingServerBuilder<ServletS
return this;
}
/**
* Some servlet containers don't support sending trailers only (Tomcat).
* They send an empty data frame with an end_stream flag.
* This is not supported by gRPC as is expects end_stream flag in trailer or trailer-only frame
* To avoid this empty data frame, force the servlet container to either
* - send a header frame, an empty data frame and a trailer frame with end_stream (Tomcat)
* - send a header frame and a trailer frame with end_stream (Jetty, Undertow)
*/
public ServletServerBuilder forceTrailers(boolean forceTrailers) {
this.forceTrailers = forceTrailers;
return this;
}
/**
* Provides a custom scheduled executor service to the server builder.
*

View File

@ -64,6 +64,7 @@ final class ServletServerStream extends AbstractServerStream {
private final String authority;
private final InternalLogId logId;
private final AsyncServletOutputStreamWriter writer;
private final boolean forceTrailers;
ServletServerStream(
AsyncContext asyncCtx,
@ -71,7 +72,8 @@ final class ServletServerStream extends AbstractServerStream {
int maxInboundMessageSize,
Attributes attributes,
String authority,
InternalLogId logId) throws IOException {
InternalLogId logId,
boolean forceTrailers) throws IOException {
super(ByteArrayWritableBuffer::new, statsTraceCtx);
transportState =
new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
@ -82,6 +84,7 @@ final class ServletServerStream extends AbstractServerStream {
this.resp = (HttpServletResponse) asyncCtx.getResponse();
this.writer = new AsyncServletOutputStreamWriter(
asyncCtx, transportState, logId);
this.forceTrailers = forceTrailers;
resp.getOutputStream().setWriteListener(new GrpcWriteListener());
}
@ -276,20 +279,30 @@ final class ServletServerStream extends AbstractServerStream {
new Object[] {logId, trailers, headersSent, status});
}
if (!headersSent) {
writeHeadersToServletResponse(trailers);
} else {
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers);
for (int i = 0; i < serializedHeaders.length; i += 2) {
String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII);
String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII);
trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue);
trailerSupplier.get().putIfAbsent(key, newValue);
if (forceTrailers) {
writeHeadersToServletResponse(new Metadata());
resp.setTrailerFields(trailerSupplier);
serializeTrailers(trailers);
} else {
writeHeadersToServletResponse(trailers);
}
} else {
serializeTrailers(trailers);
}
writer.complete();
}
private void serializeTrailers(Metadata trailers) {
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers);
for (int i = 0; i < serializedHeaders.length; i += 2) {
String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII);
String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII);
trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue);
trailerSupplier.get().putIfAbsent(key, newValue);
}
}
@Override
public void cancel(Status status) {
if (resp.isCommitted() && Code.DEADLINE_EXCEEDED == status.getCode()) {

View File

@ -60,7 +60,9 @@ public class TomcatInteropTest extends AbstractInteropTest {
@Override
protected ServerBuilder<?> getServerBuilder() {
return new ServletServerBuilder().maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
return new ServletServerBuilder()
.forceTrailers(true)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
}
@Override
@ -113,27 +115,4 @@ public class TomcatInteropTest extends AbstractInteropTest {
@Test
public void gracefulShutdown() {}
// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void specialStatusMessage() {}
// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void unimplementedMethod() {}
// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void statusCodeAndMessage() {}
// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void emptyStream() {}
}

View File

@ -81,7 +81,8 @@ public class TomcatTransportTest extends AbstractTransportTest {
ServerTransportListener serverTransportListener =
listener.transportCreated(new ServerTransportImpl(scheduler));
ServletAdapter adapter =
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE);
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE,
true);
GrpcServlet grpcServlet = new GrpcServlet(adapter);
tomcatServer = new Tomcat();
@ -255,21 +256,6 @@ public class TomcatTransportTest extends AbstractTransportTest {
@Test
public void clientCancel() {}
@Override
@Ignore("Tomcat does not support trailers only")
@Test
public void earlyServerClose_noServerHeaders() {}
@Override
@Ignore("Tomcat does not support trailers only")
@Test
public void earlyServerClose_serverFailure() {}
@Override
@Ignore("Tomcat does not support trailers only")
@Test
public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() {}
@Override
@Ignore("regression since bumping grpc v1.46 to v1.53")
@Test

View File

@ -100,7 +100,8 @@ public class UndertowTransportTest extends AbstractTransportTest {
ServerTransportListener serverTransportListener =
listener.transportCreated(new ServerTransportImpl(scheduler));
ServletAdapter adapter =
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE);
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE,
false);
GrpcServlet grpcServlet = new GrpcServlet(adapter);
InstanceFactory<? extends Servlet> instanceFactory =
() -> new ImmediateInstanceHandle<>(grpcServlet);