mirror of https://github.com/grpc/grpc-java.git
xds: Allow FaultFilter's interceptor to be reused
This is the only usage of PickSubchannelArgs when creating a filter's ClientInterceptor, and a follow-up commit will remove the argument and actually reuse the interceptors. Other filter's interceptors can already be reused. There doesn't seem to be any significant loss of legibility by making FaultFilter a more ordinary interceptor, but the change does cause the ForwardingClientCall to be present when faultDelay is configured, independent of whether the fault delay ends up being triggered. Reusing interceptors will move more state management out of the RPC path which will be more relevant with RLQS.
This commit is contained in:
parent
9e8629914f
commit
b3db8c2489
|
@ -190,29 +190,30 @@ final class FaultFilter implements Filter, ClientInterceptorBuilder {
|
||||||
config = overrideConfig;
|
config = overrideConfig;
|
||||||
}
|
}
|
||||||
FaultConfig faultConfig = (FaultConfig) config;
|
FaultConfig faultConfig = (FaultConfig) config;
|
||||||
Long delayNanos = null;
|
|
||||||
Status abortStatus = null;
|
|
||||||
if (faultConfig.maxActiveFaults() == null
|
|
||||||
|| activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
|
|
||||||
Metadata headers = args.getHeaders();
|
|
||||||
if (faultConfig.faultDelay() != null) {
|
|
||||||
delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
|
|
||||||
}
|
|
||||||
if (faultConfig.faultAbort() != null) {
|
|
||||||
abortStatus = determineFaultAbortStatus(faultConfig.faultAbort(), headers);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (delayNanos == null && abortStatus == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
final Long finalDelayNanos = delayNanos;
|
|
||||||
final Status finalAbortStatus = getAbortStatusWithDescription(abortStatus);
|
|
||||||
|
|
||||||
final class FaultInjectionInterceptor implements ClientInterceptor {
|
final class FaultInjectionInterceptor implements ClientInterceptor {
|
||||||
@Override
|
@Override
|
||||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||||
final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,
|
final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,
|
||||||
final Channel next) {
|
final Channel next) {
|
||||||
|
boolean checkFault = false;
|
||||||
|
if (faultConfig.maxActiveFaults() == null
|
||||||
|
|| activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
|
||||||
|
checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null;
|
||||||
|
}
|
||||||
|
if (!checkFault) {
|
||||||
|
return next.newCall(method, callOptions);
|
||||||
|
}
|
||||||
|
final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
|
||||||
|
private ClientCall<ReqT, RespT> delegate;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ClientCall<ReqT, RespT> delegate() {
|
||||||
|
return delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start(Listener<RespT> listener, Metadata headers) {
|
||||||
Executor callExecutor = callOptions.getExecutor();
|
Executor callExecutor = callOptions.getExecutor();
|
||||||
if (callExecutor == null) { // This should never happen in practice because
|
if (callExecutor == null) { // This should never happen in practice because
|
||||||
// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
|
// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
|
||||||
|
@ -220,11 +221,23 @@ final class FaultFilter implements Filter, ClientInterceptorBuilder {
|
||||||
// TODO(https://github.com/grpc/grpc-java/issues/7868)
|
// TODO(https://github.com/grpc/grpc-java/issues/7868)
|
||||||
callExecutor = MoreExecutors.directExecutor();
|
callExecutor = MoreExecutors.directExecutor();
|
||||||
}
|
}
|
||||||
if (finalDelayNanos != null) {
|
|
||||||
|
Long delayNanos;
|
||||||
|
Status abortStatus = null;
|
||||||
|
if (faultConfig.faultDelay() != null) {
|
||||||
|
delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
|
||||||
|
} else {
|
||||||
|
delayNanos = null;
|
||||||
|
}
|
||||||
|
if (faultConfig.faultAbort() != null) {
|
||||||
|
abortStatus = getAbortStatusWithDescription(
|
||||||
|
determineFaultAbortStatus(faultConfig.faultAbort(), headers));
|
||||||
|
}
|
||||||
|
|
||||||
Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
|
Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
|
||||||
if (finalAbortStatus != null) {
|
if (abortStatus != null) {
|
||||||
callSupplier = Suppliers.ofInstance(
|
callSupplier = Suppliers.ofInstance(
|
||||||
new FailingClientCall<ReqT, RespT>(finalAbortStatus, callExecutor));
|
new FailingClientCall<ReqT, RespT>(abortStatus, callExecutor));
|
||||||
} else {
|
} else {
|
||||||
callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
|
callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -233,17 +246,15 @@ final class FaultFilter implements Filter, ClientInterceptorBuilder {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
final DelayInjectedCall<ReqT, RespT> delayInjectedCall = new DelayInjectedCall<>(
|
if (delayNanos == null) {
|
||||||
finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);
|
delegate = callSupplier.get();
|
||||||
|
delegate().start(listener, headers);
|
||||||
final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
|
return;
|
||||||
@Override
|
|
||||||
protected ClientCall<ReqT, RespT> delegate() {
|
|
||||||
return delayInjectedCall;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
delegate = new DelayInjectedCall<>(
|
||||||
public void start(Listener<RespT> listener, Metadata headers) {
|
delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);
|
||||||
|
|
||||||
Listener<RespT> finalListener =
|
Listener<RespT> finalListener =
|
||||||
new SimpleForwardingClientCallListener<RespT>(listener) {
|
new SimpleForwardingClientCallListener<RespT>(listener) {
|
||||||
@Override
|
@Override
|
||||||
|
@ -258,7 +269,7 @@ final class FaultFilter implements Filter, ClientInterceptorBuilder {
|
||||||
String description = String.format(
|
String description = String.format(
|
||||||
Locale.US,
|
Locale.US,
|
||||||
"Deadline exceeded after up to %d ns of fault-injected delay",
|
"Deadline exceeded after up to %d ns of fault-injected delay",
|
||||||
finalDelayNanos);
|
delayNanos);
|
||||||
if (status.getDescription() != null) {
|
if (status.getDescription() != null) {
|
||||||
description = description + ": " + status.getDescription();
|
description = description + ": " + status.getDescription();
|
||||||
}
|
}
|
||||||
|
@ -275,9 +286,6 @@ final class FaultFilter implements Filter, ClientInterceptorBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
return new DeadlineInsightForwardingCall();
|
return new DeadlineInsightForwardingCall();
|
||||||
} else {
|
|
||||||
return new FailingClientCall<>(finalAbortStatus, callExecutor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue