interop-testing: Move soak out of AbstractInteropTest

The soak code grew considerably in 6a92a2a22e. Since it isn't a JUnit
test and doesn't resemble the other tests, it doesn't belong in
AbstractInteropTest. AbstractInteropTest has many users and is also
re-compiled for use on Android, so moving the soak code out makes the
remaining code clearer for the more common cases.
This commit is contained in:
Eric Anderson 2025-01-14 07:15:29 -08:00
parent 87b27b1545
commit 87c7b7a375
5 changed files with 330 additions and 269 deletions

View File

@ -73,7 +73,6 @@ dependencies {
project(':grpc-protobuf-lite'),
project(':grpc-stub'),
project(':grpc-testing'),
libraries.hdrhistogram,
libraries.junit,
libraries.truth,
libraries.androidx.test.rules,

View File

@ -28,7 +28,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
@ -120,7 +119,6 @@ import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import org.HdrHistogram.Histogram;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@ -1681,235 +1679,6 @@ public abstract class AbstractInteropTest {
assertNotNull(obtainLocalClientAddr());
}
/** Outcome of a single soak RPC: the measured latency in milliseconds and the final status. */
private static class SoakIterationResult {
  // The initial values are placeholders; the only constructor overwrites both of them.
  private long latencyMs = -1;
  private Status status = Status.OK;

  public SoakIterationResult(long latencyMs, Status status) {
    this.latencyMs = latencyMs;
    this.status = status;
  }

  public Status getStatus() {
    return status;
  }

  public long getLatencyMs() {
    return latencyMs;
  }
}
/**
 * Tallies collected by one soak worker thread: how many iterations it finished, how many of those
 * counted as failures, and a histogram of the observed latencies.
 */
private static class ThreadResults {
  private Histogram latencies = new Histogram(4);
  private int iterationsDone = 0;
  private int threadFailures = 0;

  public Histogram getLatencies() {
    return latencies;
  }

  public int getIterationsDone() {
    return iterationsDone;
  }

  public int getThreadFailures() {
    return threadFailures;
  }
}
/**
 * Issues one unary RPC on {@code soakStub} and reports how long it took and how it ended.
 *
 * <p>The timed window covers building the messages, the RPC itself and the response check. A
 * {@link StatusRuntimeException} is converted into the returned status; any other throwable
 * (including an assertion failure from the response check) propagates to the caller.
 */
private SoakIterationResult performOneSoakIteration(
    TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, int soakResponseSize)
    throws InterruptedException {
  final long beginNs = System.nanoTime();
  Status outcome;
  try {
    // Both payloads are zero-filled byte arrays of the requested sizes.
    Payload.Builder outgoingPayload =
        Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakRequestSize]));
    SimpleRequest request =
        SimpleRequest.newBuilder()
            .setResponseSize(soakResponseSize)
            .setPayload(outgoingPayload)
            .build();
    Payload.Builder expectedPayload =
        Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakResponseSize]));
    SimpleResponse goldenResponse =
        SimpleResponse.newBuilder().setPayload(expectedPayload).build();
    SimpleResponse actualResponse = soakStub.unaryCall(request);
    assertResponse(goldenResponse, actualResponse);
    outcome = Status.OK;
  } catch (StatusRuntimeException e) {
    outcome = e.getStatus();
  }
  long latencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginNs);
  return new SoakIterationResult(latencyMs, outcome);
}
/**
 * Runs large unary RPCs in a loop with configurable failure thresholds
 * and channel creation behavior.
 *
 * <p>The iterations are split evenly across {@code numThreads} worker threads, which all start
 * from one channel obtained from {@code createChannel()}. Once every worker has been joined, a
 * latency summary is printed to stderr and the run is checked against the iteration count and
 * the failure threshold. The initial channel is shut down whether or not those checks pass.
 *
 * @param serverUri server address; only used to label log and failure messages
 * @param soakIterations total number of RPCs to attempt across all threads
 * @param maxFailures largest number of failed iterations that still counts as a pass
 * @param maxAcceptablePerIterationLatencyMs latency above which an OK RPC is counted as a failure
 * @param minTimeMsBetweenRpcs minimum spacing between the starts of consecutive RPCs on a thread
 * @param overallTimeoutSeconds time budget after which workers stop starting new iterations
 * @param soakRequestSize request payload size in bytes
 * @param soakResponseSize requested response payload size in bytes
 * @param numThreads number of worker threads; must be positive and divide {@code soakIterations}
 * @param createNewChannel applied before every RPC to pick the channel for that RPC, given the
 *     channel the worker used previously
 * @throws IllegalArgumentException if {@code numThreads} is not positive or does not evenly
 *     divide {@code soakIterations}
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *     workers or for the channel to terminate
 */
public void performSoakTest(
    String serverUri,
    int soakIterations,
    int maxFailures,
    int maxAcceptablePerIterationLatencyMs,
    int minTimeMsBetweenRpcs,
    int overallTimeoutSeconds,
    int soakRequestSize,
    int soakResponseSize,
    int numThreads,
    Function<ManagedChannel, ManagedChannel> createNewChannel)
    throws InterruptedException {
  // Reject a non-positive thread count explicitly instead of failing with an ArithmeticException
  // (zero) or a NegativeArraySizeException (negative) further down.
  if (numThreads <= 0) {
    throw new IllegalArgumentException("numThreads must be positive.");
  }
  if (soakIterations % numThreads != 0) {
    throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads.");
  }
  ManagedChannel sharedChannel = createChannel();
  try {
    long startNs = System.nanoTime();
    int soakIterationsPerThread = soakIterations / numThreads;
    Thread[] threads = new Thread[numThreads];
    List<ThreadResults> threadResultsList = new ArrayList<>(numThreads);
    for (int threadInd = 0; threadInd < numThreads; threadInd++) {
      // Each worker gets its own results object, so the workers never share mutable counters.
      ThreadResults threadResults = new ThreadResults();
      threadResultsList.add(threadResults);
      threads[threadInd] = new Thread(() -> {
        try {
          executeSoakTestInThread(
              soakIterationsPerThread,
              startNs,
              minTimeMsBetweenRpcs,
              soakRequestSize,
              soakResponseSize,
              maxAcceptablePerIterationLatencyMs,
              overallTimeoutSeconds,
              serverUri,
              threadResults,
              sharedChannel,
              createNewChannel);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException("Thread interrupted: " + e.getMessage(), e);
        }
      });
      threads[threadInd].start();
    }
    for (Thread thread : threads) {
      thread.join();
    }

    // Merge the per-thread tallies; the join() calls above make the workers' writes visible here.
    int totalFailures = 0;
    int iterationsDone = 0;
    Histogram latencies = new Histogram(4);
    for (ThreadResults threadResult : threadResultsList) {
      totalFailures += threadResult.getThreadFailures();
      iterationsDone += threadResult.getIterationsDone();
      latencies.add(threadResult.getLatencies());
    }
    System.err.println(
        String.format(
            Locale.US,
            "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. "
                + "p50: %d ms, p90: %d ms, p100: %d ms",
            serverUri,
            iterationsDone,
            soakIterations,
            totalFailures,
            latencies.getValueAtPercentile(50),
            latencies.getValueAtPercentile(90),
            latencies.getValueAtPercentile(100)));

    // check if we timed out
    String timeoutErrorMessage =
        String.format(
            Locale.US,
            "(server_uri: %s) soak test consumed all %d seconds of time and quit early, "
                + "only having ran %d out of desired %d iterations.",
            serverUri,
            overallTimeoutSeconds,
            iterationsDone,
            soakIterations);
    // JUnit expects (message, expected, actual): the desired count is the expected value.
    assertEquals(timeoutErrorMessage, soakIterations, iterationsDone);

    // check if we had too many failures
    String tooManyFailuresErrorMessage =
        String.format(
            Locale.US,
            "(server_uri: %s) soak test total failures: %d exceeds max failures "
                + "threshold: %d.",
            serverUri, totalFailures, maxFailures);
    assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures);
  } finally {
    // Release the channel even when a check above fails or this thread is interrupted. Only the
    // non-blocking shutdown runs here so that it cannot replace the original failure.
    if (sharedChannel != null) {
      sharedChannel.shutdownNow();
    }
  }
  if (sharedChannel != null) {
    sharedChannel.awaitTermination(10, TimeUnit.SECONDS);
  }
}
/**
 * Calls {@code shutdownNow()} on {@code channel} and then waits up to 10 seconds for it to
 * terminate. Does nothing when {@code channel} is null.
 */
private void shutdownChannel(ManagedChannel channel) throws InterruptedException {
  if (channel == null) {
    return;
  }
  channel.shutdownNow();
  channel.awaitTermination(10, TimeUnit.SECONDS);
}
/**
 * Shuts down {@code currentChannel} (if non-null) and returns a channel freshly obtained from
 * {@code createChannel()}.
 *
 * @throws RuntimeException if the calling thread is interrupted while waiting for the old
 *     channel to terminate; the thread's interrupt flag is set again before throwing
 */
protected ManagedChannel createNewChannel(ManagedChannel currentChannel) {
  try {
    shutdownChannel(currentChannel);
    return createChannel();
  } catch (InterruptedException e) {
    // Catching InterruptedException clears the interrupt flag; restore it so that code further up
    // the stack can still observe the interruption after the exception is wrapped.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while creating a new channel", e);
  }
}
/**
 * Body of one soak worker thread: runs up to {@code soakIterationsPerThread} RPCs, logging each
 * one to stderr and recording the outcome in {@code threadResults}.
 *
 * <p>Before every RPC {@code maybeCreateChannel} is applied to the channel used for the previous
 * RPC (initially {@code sharedChannel}) and the RPC is sent on whatever channel it returns. An
 * iteration counts as a failure when its status is not OK or when its latency exceeds
 * {@code maxAcceptablePerIterationLatencyMs}. The loop stops early once
 * {@code overallTimeoutSeconds} have elapsed since {@code startNs}.
 *
 * <p>If the worker ends on a channel other than {@code sharedChannel}, that channel is shut down
 * before returning; {@code sharedChannel} itself is left for the caller to shut down.
 *
 * @throws InterruptedException if the thread is interrupted while pacing between RPCs
 */
private void executeSoakTestInThread(
    int soakIterationsPerThread,
    long startNs,
    int minTimeMsBetweenRpcs,
    int soakRequestSize,
    int soakResponseSize,
    int maxAcceptablePerIterationLatencyMs,
    int overallTimeoutSeconds,
    String serverUri,
    ThreadResults threadResults,
    ManagedChannel sharedChannel,
    Function<ManagedChannel, ManagedChannel> maybeCreateChannel) throws InterruptedException {
  final long overallTimeoutNs = TimeUnit.SECONDS.toNanos(overallTimeoutSeconds);
  final long minTimeNsBetweenRpcs = TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);
  ManagedChannel currentChannel = sharedChannel;
  try {
    for (int i = 0; i < soakIterationsPerThread; i++) {
      if (System.nanoTime() - startNs >= overallTimeoutNs) {
        break;
      }
      long earliestNextStartNs = System.nanoTime() + minTimeNsBetweenRpcs;
      // recordClientCallInterceptor takes an AtomicReference.
      AtomicReference<ClientCall<?, ?>> soakThreadClientCallCapture = new AtomicReference<>();
      currentChannel = maybeCreateChannel.apply(currentChannel);
      TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc
          .newBlockingStub(currentChannel)
          .withInterceptors(recordClientCallInterceptor(soakThreadClientCallCapture));
      SoakIterationResult result = performOneSoakIteration(currentStub,
          soakRequestSize, soakResponseSize);
      SocketAddress peer = soakThreadClientCallCapture
          .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
      StringBuilder logStr = new StringBuilder(
          String.format(
              Locale.US,
              "thread id: %d soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
              Thread.currentThread().getId(),
              i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
      if (!result.getStatus().equals(Status.OK)) {
        threadResults.threadFailures++;
        logStr.append(" failed: ").append(result.getStatus());
      } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
        // A slow but otherwise successful RPC still counts against the failure budget.
        threadResults.threadFailures++;
        logStr.append(" exceeds max acceptable latency: ")
            .append(maxAcceptablePerIterationLatencyMs);
      } else {
        logStr.append(" succeeded");
      }
      System.err.println(logStr.toString());
      threadResults.iterationsDone++;
      threadResults.getLatencies().recordValue(result.getLatencyMs());
      // Pace the loop so consecutive RPCs start at least minTimeMsBetweenRpcs apart.
      long remainingNs = earliestNextStartNs - System.nanoTime();
      if (remainingNs > 0) {
        TimeUnit.NANOSECONDS.sleep(remainingNs);
      }
    }
  } finally {
    // A channel handed back by maybeCreateChannel is only shut down when it is passed in again on
    // the next iteration, so the final one would otherwise never be released.
    if (currentChannel != null && currentChannel != sharedChannel) {
      currentChannel.shutdownNow();
    }
  }
}
private static void assertSuccess(StreamRecorder<?> recorder) {
if (recorder.getError() != null) {
throw new AssertionError(recorder.getError());

View File

@ -0,0 +1,295 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.integration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.common.base.Function;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.testing.integration.Messages.Payload;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.HdrHistogram.Histogram;
/**
 * Shared implementation for rpc_soak and channel_soak. Unlike the tests in AbstractInteropTest,
 * these "test cases" are only intended to be run from the command line. They don't fit the regular
 * test patterns of AbstractInteropTest.
 * https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#rpc_soak
 */
final class SoakClient {
  /** Outcome of a single soak RPC: the measured latency in milliseconds and the final status. */
  private static class SoakIterationResult {
    private final long latencyMs;
    private final Status status;

    public SoakIterationResult(long latencyMs, Status status) {
      this.latencyMs = latencyMs;
      this.status = status;
    }

    public long getLatencyMs() {
      return latencyMs;
    }

    public Status getStatus() {
      return status;
    }
  }

  /**
   * Tallies collected by one worker thread. Each instance is handed to exactly one worker, and the
   * coordinating thread reads it only after joining that worker.
   */
  private static class ThreadResults {
    private int threadFailures = 0;
    private int iterationsDone = 0;
    private final Histogram latencies = new Histogram(4);

    public int getThreadFailures() {
      return threadFailures;
    }

    public int getIterationsDone() {
      return iterationsDone;
    }

    public Histogram getLatencies() {
      return latencies;
    }
  }

  /**
   * Issues one unary RPC on {@code soakStub} and reports how long it took and how it ended.
   *
   * <p>The timed window covers building the messages, the RPC itself and the response check. A
   * {@link StatusRuntimeException} is converted into the returned status; any other throwable
   * (including an assertion failure from the response check) propagates to the caller.
   */
  private static SoakIterationResult performOneSoakIteration(
      TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, int soakResponseSize) {
    long startNs = System.nanoTime();
    Status status = Status.OK;
    try {
      // Both payloads are zero-filled byte arrays of the requested sizes.
      final SimpleRequest request =
          SimpleRequest.newBuilder()
              .setResponseSize(soakResponseSize)
              .setPayload(
                  Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakRequestSize])))
              .build();
      final SimpleResponse goldenResponse =
          SimpleResponse.newBuilder()
              .setPayload(
                  Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakResponseSize])))
              .build();
      assertResponse(goldenResponse, soakStub.unaryCall(request));
    } catch (StatusRuntimeException e) {
      status = e.getStatus();
    }
    long elapsedNs = System.nanoTime() - startNs;
    return new SoakIterationResult(TimeUnit.NANOSECONDS.toMillis(elapsedNs), status);
  }

  /**
   * Runs large unary RPCs in a loop with configurable failure thresholds
   * and channel creation behavior.
   *
   * <p>The iterations are split evenly across {@code numThreads} worker threads, which all start
   * from {@code sharedChannel}. Once every worker has been joined, a latency summary is printed to
   * stderr and the run is checked against the iteration count and the failure threshold.
   *
   * <p>This method shuts {@code sharedChannel} down before returning or throwing, so callers must
   * not reuse it afterwards.
   *
   * @param serverUri server address; only used to label log and failure messages
   * @param soakIterations total number of RPCs to attempt across all threads
   * @param maxFailures largest number of failed iterations that still counts as a pass
   * @param maxAcceptablePerIterationLatencyMs latency above which an OK RPC counts as a failure
   * @param minTimeMsBetweenRpcs minimum spacing between the starts of consecutive RPCs on a thread
   * @param overallTimeoutSeconds time budget after which workers stop starting new iterations
   * @param soakRequestSize request payload size in bytes
   * @param soakResponseSize requested response payload size in bytes
   * @param numThreads number of worker threads; must be positive and divide {@code soakIterations}
   * @param sharedChannel channel every worker starts from
   * @param maybeCreateChannel applied before every RPC to pick the channel for that RPC, given
   *     the channel the worker used previously
   * @throws IllegalArgumentException if {@code numThreads} is not positive or does not evenly
   *     divide {@code soakIterations}
   * @throws InterruptedException if the calling thread is interrupted while waiting for the
   *     workers or for the channel to terminate
   */
  public static void performSoakTest(
      String serverUri,
      int soakIterations,
      int maxFailures,
      int maxAcceptablePerIterationLatencyMs,
      int minTimeMsBetweenRpcs,
      int overallTimeoutSeconds,
      int soakRequestSize,
      int soakResponseSize,
      int numThreads,
      ManagedChannel sharedChannel,
      Function<ManagedChannel, ManagedChannel> maybeCreateChannel)
      throws InterruptedException {
    try {
      // Reject a non-positive thread count explicitly instead of failing with an
      // ArithmeticException (zero) or a NegativeArraySizeException (negative) further down.
      if (numThreads <= 0) {
        throw new IllegalArgumentException("numThreads must be positive.");
      }
      if (soakIterations % numThreads != 0) {
        throw new IllegalArgumentException(
            "soakIterations must be evenly divisible by numThreads.");
      }
      long startNs = System.nanoTime();
      int soakIterationsPerThread = soakIterations / numThreads;
      Thread[] threads = new Thread[numThreads];
      List<ThreadResults> threadResultsList = new ArrayList<>(numThreads);
      for (int threadInd = 0; threadInd < numThreads; threadInd++) {
        // Each worker gets its own results object, so the workers never share mutable counters.
        ThreadResults threadResults = new ThreadResults();
        threadResultsList.add(threadResults);
        threads[threadInd] = new Thread(() -> {
          try {
            executeSoakTestInThread(
                soakIterationsPerThread,
                startNs,
                minTimeMsBetweenRpcs,
                soakRequestSize,
                soakResponseSize,
                maxAcceptablePerIterationLatencyMs,
                overallTimeoutSeconds,
                serverUri,
                threadResults,
                sharedChannel,
                maybeCreateChannel);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted: " + e.getMessage(), e);
          }
        });
        threads[threadInd].start();
      }
      for (Thread thread : threads) {
        thread.join();
      }

      // Merge the per-thread tallies; the join() calls above make the workers' writes visible.
      int totalFailures = 0;
      int iterationsDone = 0;
      Histogram latencies = new Histogram(4);
      for (ThreadResults threadResult : threadResultsList) {
        totalFailures += threadResult.getThreadFailures();
        iterationsDone += threadResult.getIterationsDone();
        latencies.add(threadResult.getLatencies());
      }
      System.err.println(
          String.format(
              Locale.US,
              "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. "
                  + "p50: %d ms, p90: %d ms, p100: %d ms",
              serverUri,
              iterationsDone,
              soakIterations,
              totalFailures,
              latencies.getValueAtPercentile(50),
              latencies.getValueAtPercentile(90),
              latencies.getValueAtPercentile(100)));

      // check if we timed out
      String timeoutErrorMessage =
          String.format(
              Locale.US,
              "(server_uri: %s) soak test consumed all %d seconds of time and quit early, "
                  + "only having ran %d out of desired %d iterations.",
              serverUri,
              overallTimeoutSeconds,
              iterationsDone,
              soakIterations);
      // JUnit expects (message, expected, actual): the desired count is the expected value.
      assertEquals(timeoutErrorMessage, soakIterations, iterationsDone);

      // check if we had too many failures
      String tooManyFailuresErrorMessage =
          String.format(
              Locale.US,
              "(server_uri: %s) soak test total failures: %d exceeds max failures "
                  + "threshold: %d.",
              serverUri, totalFailures, maxFailures);
      assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures);
    } finally {
      // Release the channel even when an argument is rejected, a check fails or this thread is
      // interrupted. Only the non-blocking shutdown runs here so that it cannot replace the
      // original failure.
      sharedChannel.shutdownNow();
    }
    sharedChannel.awaitTermination(10, TimeUnit.SECONDS);
  }

  /**
   * Body of one soak worker thread: runs up to {@code soakIterationsPerThread} RPCs, logging each
   * one to stderr and recording the outcome in {@code threadResults}.
   *
   * <p>Before every RPC {@code maybeCreateChannel} is applied to the channel used for the previous
   * RPC (initially {@code sharedChannel}) and the RPC is sent on whatever channel it returns. An
   * iteration counts as a failure when its status is not OK or when its latency exceeds
   * {@code maxAcceptablePerIterationLatencyMs}. The loop stops early once
   * {@code overallTimeoutSeconds} have elapsed since {@code startNs}.
   *
   * <p>If the worker ends on a channel other than {@code sharedChannel}, that channel is shut down
   * before returning; {@code sharedChannel} itself is left for the caller to shut down.
   *
   * @throws InterruptedException if the thread is interrupted while pacing between RPCs
   */
  private static void executeSoakTestInThread(
      int soakIterationsPerThread,
      long startNs,
      int minTimeMsBetweenRpcs,
      int soakRequestSize,
      int soakResponseSize,
      int maxAcceptablePerIterationLatencyMs,
      int overallTimeoutSeconds,
      String serverUri,
      ThreadResults threadResults,
      ManagedChannel sharedChannel,
      Function<ManagedChannel, ManagedChannel> maybeCreateChannel) throws InterruptedException {
    final long overallTimeoutNs = TimeUnit.SECONDS.toNanos(overallTimeoutSeconds);
    final long minTimeNsBetweenRpcs = TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);
    ManagedChannel currentChannel = sharedChannel;
    try {
      for (int i = 0; i < soakIterationsPerThread; i++) {
        if (System.nanoTime() - startNs >= overallTimeoutNs) {
          break;
        }
        long earliestNextStartNs = System.nanoTime() + minTimeNsBetweenRpcs;
        // recordClientCallInterceptor takes an AtomicReference.
        AtomicReference<ClientCall<?, ?>> soakThreadClientCallCapture = new AtomicReference<>();
        currentChannel = maybeCreateChannel.apply(currentChannel);
        TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc
            .newBlockingStub(currentChannel)
            .withInterceptors(recordClientCallInterceptor(soakThreadClientCallCapture));
        SoakIterationResult result = performOneSoakIteration(currentStub,
            soakRequestSize, soakResponseSize);
        SocketAddress peer = soakThreadClientCallCapture
            .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
        StringBuilder logStr = new StringBuilder(
            String.format(
                Locale.US,
                "thread id: %d soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
                Thread.currentThread().getId(),
                i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
        if (!result.getStatus().equals(Status.OK)) {
          threadResults.threadFailures++;
          logStr.append(" failed: ").append(result.getStatus());
        } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
          // A slow but otherwise successful RPC still counts against the failure budget.
          threadResults.threadFailures++;
          logStr.append(" exceeds max acceptable latency: ")
              .append(maxAcceptablePerIterationLatencyMs);
        } else {
          logStr.append(" succeeded");
        }
        System.err.println(logStr.toString());
        threadResults.iterationsDone++;
        threadResults.getLatencies().recordValue(result.getLatencyMs());
        // Pace the loop so consecutive RPCs start at least minTimeMsBetweenRpcs apart.
        long remainingNs = earliestNextStartNs - System.nanoTime();
        if (remainingNs > 0) {
          TimeUnit.NANOSECONDS.sleep(remainingNs);
        }
      }
    } finally {
      // A channel handed back by maybeCreateChannel is only shut down when it is passed in again
      // on the next iteration, so the final one would otherwise never be released.
      if (currentChannel != null && currentChannel != sharedChannel) {
        currentChannel.shutdownNow();
      }
    }
  }

  /** Asserts that {@code actual} matches {@code expected} in payload, username and OAuth scope. */
  private static void assertResponse(SimpleResponse expected, SimpleResponse actual) {
    assertPayload(expected.getPayload(), actual.getPayload());
    assertEquals(expected.getUsername(), actual.getUsername());
    assertEquals(expected.getOauthScope(), actual.getOauthScope());
  }

  /** Asserts that both payloads are null or that their bodies are equal. */
  private static void assertPayload(Payload expected, Payload actual) {
    // Compare non deprecated fields in Payload, to make this test forward compatible.
    if (expected == null || actual == null) {
      assertEquals(expected, actual);
    } else {
      assertEquals(expected.getBody(), actual.getBody());
    }
  }

  /**
   * Captures the ClientCall. Useful for testing {@link ClientCall#getAttributes()}.
   *
   * @param clientCallCapture receives each call created through the returned interceptor; a later
   *     call overwrites an earlier one
   */
  private static ClientInterceptor recordClientCallInterceptor(
      final AtomicReference<ClientCall<?, ?>> clientCallCapture) {
    return new ClientInterceptor() {
      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        ClientCall<ReqT, RespT> clientCall = next.newCall(method, callOptions);
        clientCallCapture.set(clientCall);
        return clientCall;
      }
    };
  }
}

View File

@ -523,7 +523,7 @@ public class TestServiceClient {
}
case RPC_SOAK: {
tester.performSoakTest(
SoakClient.performSoakTest(
serverHost,
soakIterations,
soakMaxFailures,
@ -533,12 +533,13 @@ public class TestServiceClient {
soakRequestSize,
soakResponseSize,
numThreads,
tester.createChannelBuilder().build(),
(currentChannel) -> currentChannel);
break;
}
case CHANNEL_SOAK: {
tester.performSoakTest(
SoakClient.performSoakTest(
serverHost,
soakIterations,
soakMaxFailures,
@ -548,6 +549,7 @@ public class TestServiceClient {
soakRequestSize,
soakResponseSize,
numThreads,
tester.createChannelBuilder().build(),
(currentChannel) -> tester.createNewChannel(currentChannel));
break;
}
@ -711,6 +713,16 @@ public class TestServiceClient {
return okBuilder.intercept(createCensusStatsClientInterceptor());
}
/**
 * Shuts down {@code currentChannel}, waits up to 10 seconds for it to terminate, and returns a
 * channel freshly obtained from {@code createChannel()}.
 *
 * @throws RuntimeException if the calling thread is interrupted while waiting for the old
 *     channel to terminate; the thread's interrupt flag is set again before throwing
 */
ManagedChannel createNewChannel(ManagedChannel currentChannel) {
  currentChannel.shutdownNow();
  try {
    currentChannel.awaitTermination(10, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    // Catching InterruptedException clears the interrupt flag; restore it so that code further up
    // the stack can still observe the interruption after the exception is wrapped.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while creating a new channel", e);
  }
  return createChannel();
}
/**
* Assuming "pick_first" policy is used, tests that all requests are sent to the same server.
*/

View File

@ -22,9 +22,10 @@ import static org.junit.Assert.assertTrue;
import io.grpc.ChannelCredentials;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ManagedChannel;
import io.grpc.alts.ComputeEngineChannelCredentials;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
@ -44,26 +45,8 @@ public final class XdsFederationTestClient {
public static void main(String[] args) throws Exception {
final XdsFederationTestClient client = new XdsFederationTestClient();
client.parseArgs(args);
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@Override
@SuppressWarnings("CatchAndPrintStackTrace")
public void run() {
System.out.println("Shutting down");
try {
client.tearDown();
} catch (RuntimeException e) {
e.printStackTrace();
}
}
});
client.setUp();
try {
client.run();
} finally {
client.tearDown();
}
client.run();
System.exit(0);
}
@ -209,22 +192,13 @@ public final class XdsFederationTestClient {
for (int i = 0; i < uris.length; i++) {
clients.add(new InnerClient(creds[i], uris[i]));
}
for (InnerClient c : clients) {
c.setUp();
}
}
private synchronized void tearDown() {
for (InnerClient c : clients) {
c.tearDown();
}
}
/**
* Wraps a single client stub configuration and executes a
* soak test case with that configuration.
*/
class InnerClient extends AbstractInteropTest {
class InnerClient {
private final String credentialsType;
private final String serverUri;
private boolean runSucceeded = false;
@ -249,7 +223,7 @@ public final class XdsFederationTestClient {
try {
switch (testCase) {
case "rpc_soak": {
performSoakTest(
SoakClient.performSoakTest(
serverUri,
soakIterations,
soakMaxFailures,
@ -259,11 +233,12 @@ public final class XdsFederationTestClient {
soakRequestSize,
soakResponseSize,
1,
createChannel(),
(currentChannel) -> currentChannel);
}
break;
case "channel_soak": {
performSoakTest(
SoakClient.performSoakTest(
serverUri,
soakIterations,
soakMaxFailures,
@ -273,6 +248,7 @@ public final class XdsFederationTestClient {
soakRequestSize,
soakResponseSize,
1,
createChannel(),
(currentChannel) -> createNewChannel(currentChannel));
}
break;
@ -288,8 +264,7 @@ public final class XdsFederationTestClient {
}
}
@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
ManagedChannel createChannel() {
ChannelCredentials channelCredentials;
switch (credentialsType) {
case "compute_engine_channel_creds":
@ -303,7 +278,18 @@ public final class XdsFederationTestClient {
}
return Grpc.newChannelBuilder(serverUri, channelCredentials)
.keepAliveTime(3600, SECONDS)
.keepAliveTimeout(20, SECONDS);
.keepAliveTimeout(20, SECONDS)
.build();
}
/**
 * Shuts down {@code currentChannel}, waits up to 10 seconds for it to terminate, and returns a
 * channel freshly obtained from {@code createChannel()}.
 *
 * @throws RuntimeException if the calling thread is interrupted while waiting for the old
 *     channel to terminate; the thread's interrupt flag is set again before throwing
 */
ManagedChannel createNewChannel(ManagedChannel currentChannel) {
  currentChannel.shutdownNow();
  try {
    currentChannel.awaitTermination(10, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    // Catching InterruptedException clears the interrupt flag; restore it so that code further up
    // the stack can still observe the interruption after the exception is wrapped.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while creating a new channel", e);
  }
  return createChannel();
}
}