mirror of https://github.com/grpc/grpc-java.git
Merge branch 'grpc:master' into Issue_fixed_12142
This commit is contained in:
commit
f9e6703580
|
@ -1189,6 +1189,10 @@ public abstract class LoadBalancer {
|
||||||
* Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context
|
* Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context
|
||||||
* as that the callback methods on the {@link LoadBalancer} interface are run in.
|
* as that the callback methods on the {@link LoadBalancer} interface are run in.
|
||||||
*
|
*
|
||||||
|
* <p>Work added to the synchronization context might not run immediately, so LB implementations
|
||||||
|
* must be careful to ensure that any assumptions still hold when it is executed. In particular,
|
||||||
|
* the LB might have been shut down or subchannels might have changed state.
|
||||||
|
*
|
||||||
* <p>Pro-tip: in order to call {@link SynchronizationContext#schedule}, you need to provide a
|
* <p>Pro-tip: in order to call {@link SynchronizationContext#schedule}, you need to provide a
|
||||||
* {@link ScheduledExecutorService}. {@link #getScheduledExecutorService} is provided for your
|
* {@link ScheduledExecutorService}. {@link #getScheduledExecutorService} is provided for your
|
||||||
* convenience.
|
* convenience.
|
||||||
|
|
|
@ -239,6 +239,9 @@ public abstract class NameResolver {
|
||||||
* {@link ResolutionResult#getAddressesOrError()} is empty, {@link #onError(Status)} will be
|
* {@link ResolutionResult#getAddressesOrError()} is empty, {@link #onError(Status)} will be
|
||||||
* called.
|
* called.
|
||||||
*
|
*
|
||||||
|
* <p>Newer NameResolver implementations should prefer calling onResult2. This method exists to
|
||||||
|
* facilitate older {@link Listener} implementations to migrate to {@link Listener2}.
|
||||||
|
*
|
||||||
* @param resolutionResult the resolved server addresses, attributes, and Service Config.
|
* @param resolutionResult the resolved server addresses, attributes, and Service Config.
|
||||||
* @since 1.21.0
|
* @since 1.21.0
|
||||||
*/
|
*/
|
||||||
|
@ -248,6 +251,10 @@ public abstract class NameResolver {
|
||||||
* Handles a name resolving error from the resolver. The listener is responsible for eventually
|
* Handles a name resolving error from the resolver. The listener is responsible for eventually
|
||||||
* invoking {@link NameResolver#refresh()} to re-attempt resolution.
|
* invoking {@link NameResolver#refresh()} to re-attempt resolution.
|
||||||
*
|
*
|
||||||
|
* <p>New NameResolver implementations should prefer calling onResult2 which will have the
|
||||||
|
* address resolution error in {@link ResolutionResult}'s addressesOrError. This method exists
|
||||||
|
* to facilitate older implementations using {@link Listener} to migrate to {@link Listener2}.
|
||||||
|
*
|
||||||
* @param error a non-OK status
|
* @param error a non-OK status
|
||||||
* @since 1.21.0
|
* @since 1.21.0
|
||||||
*/
|
*/
|
||||||
|
@ -255,9 +262,14 @@ public abstract class NameResolver {
|
||||||
public abstract void onError(Status error);
|
public abstract void onError(Status error);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles updates on resolved addresses and attributes.
|
* Handles updates on resolved addresses and attributes. Must be called from the same
|
||||||
|
* {@link SynchronizationContext} available in {@link NameResolver.Args} that is passed
|
||||||
|
* from the channel.
|
||||||
*
|
*
|
||||||
* @param resolutionResult the resolved server addresses, attributes, and Service Config.
|
* @param resolutionResult the resolved server addresses or error in address resolution,
|
||||||
|
* attributes, and Service Config or error
|
||||||
|
* @return status indicating whether the resolutionResult was accepted by the listener,
|
||||||
|
* typically the result from a load balancer.
|
||||||
* @since 1.66
|
* @since 1.66
|
||||||
*/
|
*/
|
||||||
public Status onResult2(ResolutionResult resolutionResult) {
|
public Status onResult2(ResolutionResult resolutionResult) {
|
||||||
|
|
|
@ -1967,6 +1967,9 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
||||||
public void requestConnection() {
|
public void requestConnection() {
|
||||||
syncContext.throwIfNotInThisSynchronizationContext();
|
syncContext.throwIfNotInThisSynchronizationContext();
|
||||||
checkState(started, "not started");
|
checkState(started, "not started");
|
||||||
|
if (shutdown) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
subchannel.obtainActiveTransport();
|
subchannel.obtainActiveTransport();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -134,7 +134,7 @@ final class PickFirstLoadBalancer extends LoadBalancer {
|
||||||
SubchannelPicker picker;
|
SubchannelPicker picker;
|
||||||
switch (newState) {
|
switch (newState) {
|
||||||
case IDLE:
|
case IDLE:
|
||||||
picker = new RequestConnectionPicker(subchannel);
|
picker = new RequestConnectionPicker();
|
||||||
break;
|
break;
|
||||||
case CONNECTING:
|
case CONNECTING:
|
||||||
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
|
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
|
||||||
|
@ -197,22 +197,12 @@ final class PickFirstLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
/** Picker that requests connection during the first pick, and returns noResult. */
|
/** Picker that requests connection during the first pick, and returns noResult. */
|
||||||
private final class RequestConnectionPicker extends SubchannelPicker {
|
private final class RequestConnectionPicker extends SubchannelPicker {
|
||||||
private final Subchannel subchannel;
|
|
||||||
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
|
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
|
||||||
|
|
||||||
RequestConnectionPicker(Subchannel subchannel) {
|
|
||||||
this.subchannel = checkNotNull(subchannel, "subchannel");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||||
if (connectionRequested.compareAndSet(false, true)) {
|
if (connectionRequested.compareAndSet(false, true)) {
|
||||||
helper.getSynchronizationContext().execute(new Runnable() {
|
helper.getSynchronizationContext().execute(PickFirstLoadBalancer.this::requestConnection);
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
subchannel.requestConnection();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
return PickResult.withNoResult();
|
return PickResult.withNoResult();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1767,6 +1767,19 @@ public class ManagedChannelImplTest {
|
||||||
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
|
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void subchannelsRequestConnectionNoopAfterShutdown() {
|
||||||
|
createChannel();
|
||||||
|
Subchannel sub1 =
|
||||||
|
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
|
||||||
|
|
||||||
|
shutdownSafely(helper, sub1);
|
||||||
|
requestConnectionSafely(helper, sub1);
|
||||||
|
verify(mockTransportFactory, never())
|
||||||
|
.newClientTransport(
|
||||||
|
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void subchannelsNoConnectionShutdownNow() {
|
public void subchannelsNoConnectionShutdownNow() {
|
||||||
createChannel();
|
createChannel();
|
||||||
|
|
|
@ -122,7 +122,7 @@ class ShufflingPickFirstLoadBalancer extends LoadBalancer {
|
||||||
SubchannelPicker picker;
|
SubchannelPicker picker;
|
||||||
switch (currentState) {
|
switch (currentState) {
|
||||||
case IDLE:
|
case IDLE:
|
||||||
picker = new RequestConnectionPicker(subchannel);
|
picker = new RequestConnectionPicker();
|
||||||
break;
|
break;
|
||||||
case CONNECTING:
|
case CONNECTING:
|
||||||
picker = new Picker(PickResult.withNoResult());
|
picker = new Picker(PickResult.withNoResult());
|
||||||
|
@ -182,24 +182,15 @@ class ShufflingPickFirstLoadBalancer extends LoadBalancer {
|
||||||
*/
|
*/
|
||||||
private final class RequestConnectionPicker extends SubchannelPicker {
|
private final class RequestConnectionPicker extends SubchannelPicker {
|
||||||
|
|
||||||
private final Subchannel subchannel;
|
|
||||||
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
|
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
|
||||||
|
|
||||||
RequestConnectionPicker(Subchannel subchannel) {
|
|
||||||
this.subchannel = checkNotNull(subchannel, "subchannel");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||||
if (connectionRequested.compareAndSet(false, true)) {
|
if (connectionRequested.compareAndSet(false, true)) {
|
||||||
helper.getSynchronizationContext().execute(new Runnable() {
|
helper.getSynchronizationContext().execute(
|
||||||
@Override
|
ShufflingPickFirstLoadBalancer.this::requestConnection);
|
||||||
public void run() {
|
|
||||||
subchannel.requestConnection();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
return PickResult.withNoResult();
|
return PickResult.withNoResult();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -738,14 +738,19 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
|
|
||||||
// Attach the client stream to the HTTP/2 stream object as user data.
|
// Attach the client stream to the HTTP/2 stream object as user data.
|
||||||
stream.setHttp2Stream(http2Stream);
|
stream.setHttp2Stream(http2Stream);
|
||||||
|
promise.setSuccess();
|
||||||
|
} else {
|
||||||
|
// Otherwise, the stream has been cancelled and Netty is sending a
|
||||||
|
// RST_STREAM frame which causes it to purge pending writes from the
|
||||||
|
// flow-controller and delete the http2Stream. The stream listener has already
|
||||||
|
// been notified of cancellation so there is nothing to do.
|
||||||
|
//
|
||||||
|
// This process has been observed to fail in some circumstances, leaving listeners
|
||||||
|
// unanswered. Ensure that some exception has been delivered consistent with the
|
||||||
|
// implied RST_STREAM result above.
|
||||||
|
Status status = Status.INTERNAL.withDescription("unknown stream for connection");
|
||||||
|
promise.setFailure(status.asRuntimeException());
|
||||||
}
|
}
|
||||||
// Otherwise, the stream has been cancelled and Netty is sending a
|
|
||||||
// RST_STREAM frame which causes it to purge pending writes from the
|
|
||||||
// flow-controller and delete the http2Stream. The stream listener has already
|
|
||||||
// been notified of cancellation so there is nothing to do.
|
|
||||||
|
|
||||||
// Just forward on the success status to the original promise.
|
|
||||||
promise.setSuccess();
|
|
||||||
} else {
|
} else {
|
||||||
Throwable cause = future.cause();
|
Throwable cause = future.cause();
|
||||||
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
|
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
|
||||||
|
@ -768,6 +773,19 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
// When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in
|
||||||
|
// StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS
|
||||||
|
// are delayed because the OS may have too much buffered and isn't accepting the write. The
|
||||||
|
// write promise is also delayed until flush(). However, we need to associate the netty stream
|
||||||
|
// with the transport state so that goingAway() and forcefulClose() and able to notify the
|
||||||
|
// stream of failures.
|
||||||
|
//
|
||||||
|
// This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but
|
||||||
|
// it is better than nothing.
|
||||||
|
Http2Stream http2Stream = connection().stream(streamId);
|
||||||
|
if (http2Stream != null) {
|
||||||
|
http2Stream.setProperty(streamKey, stream);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -268,7 +268,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
|
||||||
// Cancel the stream.
|
// Cancel the stream.
|
||||||
cancelStream(Status.CANCELLED);
|
cancelStream(Status.CANCELLED);
|
||||||
|
|
||||||
assertTrue(createFuture.isSuccess());
|
assertFalse(createFuture.isSuccess());
|
||||||
verify(streamListener).closed(eq(Status.CANCELLED), same(PROCESSED), any(Metadata.class));
|
verify(streamListener).closed(eq(Status.CANCELLED), same(PROCESSED), any(Metadata.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -311,7 +311,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
|
||||||
ChannelFuture cancelFuture = cancelStream(Status.CANCELLED);
|
ChannelFuture cancelFuture = cancelStream(Status.CANCELLED);
|
||||||
assertTrue(cancelFuture.isSuccess());
|
assertTrue(cancelFuture.isSuccess());
|
||||||
assertTrue(createFuture.isDone());
|
assertTrue(createFuture.isDone());
|
||||||
assertTrue(createFuture.isSuccess());
|
assertFalse(createFuture.isSuccess());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -453,6 +453,26 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
|
||||||
assertTrue(future.isDone());
|
assertTrue(future.isDone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void receivedAbruptGoAwayShouldFailRacingQueuedIoStreamid() throws Exception {
|
||||||
|
// Purposefully avoid flush(), since we want the write to not actually complete.
|
||||||
|
// EmbeddedChannel doesn't support flow control, so this is the next closest approximation.
|
||||||
|
ChannelFuture future = channel().write(
|
||||||
|
newCreateStreamCommand(grpcHeaders, streamTransportState));
|
||||||
|
// Read a GOAWAY that indicates our stream can't be sent
|
||||||
|
channelRead(goAwayFrame(0, 0 /* NO_ERROR */, Unpooled.copiedBuffer("this is a test", UTF_8)));
|
||||||
|
|
||||||
|
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
||||||
|
verify(streamListener).closed(captor.capture(), same(REFUSED),
|
||||||
|
ArgumentMatchers.<Metadata>notNull());
|
||||||
|
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
|
||||||
|
assertEquals(
|
||||||
|
"Abrupt GOAWAY closed sent stream. HTTP/2 error code: NO_ERROR, "
|
||||||
|
+ "debug data: this is a test",
|
||||||
|
captor.getValue().getDescription());
|
||||||
|
assertTrue(future.isDone());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams()
|
public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
|
@ -101,10 +101,10 @@ def grpc_java_repositories(bzlmod = False):
|
||||||
if not native.existing_rule("com_github_cncf_xds"):
|
if not native.existing_rule("com_github_cncf_xds"):
|
||||||
http_archive(
|
http_archive(
|
||||||
name = "com_github_cncf_xds",
|
name = "com_github_cncf_xds",
|
||||||
strip_prefix = "xds-024c85f92f20cab567a83acc50934c7f9711d124",
|
strip_prefix = "xds-2ac532fd44436293585084f8d94c6bdb17835af0",
|
||||||
sha256 = "5f403aa681711500ca8e62387be3e37d971977db6e88616fc21862a406430649",
|
sha256 = "790c4c83b6950bb602fec221f6a529d9f368cdc8852aae7d2592d0d04b015f37",
|
||||||
urls = [
|
urls = [
|
||||||
"https://github.com/cncf/xds/archive/024c85f92f20cab567a83acc50934c7f9711d124.tar.gz",
|
"https://github.com/cncf/xds/archive/2ac532fd44436293585084f8d94c6bdb17835af0.tar.gz",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
if not bzlmod and not native.existing_rule("com_github_grpc_grpc"):
|
if not bzlmod and not native.existing_rule("com_github_grpc_grpc"):
|
||||||
|
|
|
@ -89,7 +89,7 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(discoveryMechanisms, lbConfig);
|
return Objects.hash(discoveryMechanisms, lbConfig, isHttp11ProxyAvailable);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -102,7 +102,8 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
|
||||||
}
|
}
|
||||||
ClusterResolverConfig that = (ClusterResolverConfig) o;
|
ClusterResolverConfig that = (ClusterResolverConfig) o;
|
||||||
return discoveryMechanisms.equals(that.discoveryMechanisms)
|
return discoveryMechanisms.equals(that.discoveryMechanisms)
|
||||||
&& lbConfig.equals(that.lbConfig);
|
&& lbConfig.equals(that.lbConfig)
|
||||||
|
&& isHttp11ProxyAvailable == that.isHttp11ProxyAvailable;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -110,6 +111,7 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
|
||||||
return MoreObjects.toStringHelper(this)
|
return MoreObjects.toStringHelper(this)
|
||||||
.add("discoveryMechanisms", discoveryMechanisms)
|
.add("discoveryMechanisms", discoveryMechanisms)
|
||||||
.add("lbConfig", lbConfig)
|
.add("lbConfig", lbConfig)
|
||||||
|
.add("isHttp11ProxyAvailable", isHttp11ProxyAvailable)
|
||||||
.toString();
|
.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -99,11 +99,13 @@ final class LazyLoadBalancer extends ForwardingLoadBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
|
delegate = new NoopLoadBalancer();
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class LazyPicker extends SubchannelPicker {
|
private final class LazyPicker extends SubchannelPicker {
|
||||||
@Override
|
@Override
|
||||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||||
|
// activate() is a no-op after shutdown()
|
||||||
helper.getSynchronizationContext().execute(LazyDelegate.this::activate);
|
helper.getSynchronizationContext().execute(LazyDelegate.this::activate);
|
||||||
return PickResult.withNoResult();
|
return PickResult.withNoResult();
|
||||||
}
|
}
|
||||||
|
@ -121,4 +123,17 @@ final class LazyLoadBalancer extends ForwardingLoadBalancer {
|
||||||
return new LazyLoadBalancer(helper, delegate);
|
return new LazyLoadBalancer(helper, delegate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class NoopLoadBalancer extends LoadBalancer {
|
||||||
|
@Override
|
||||||
|
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||||
|
return Status.OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleNameResolutionError(Status error) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,7 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -437,7 +438,9 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
||||||
|
|
||||||
if (subchannelView.connectivityState == IDLE) {
|
if (subchannelView.connectivityState == IDLE) {
|
||||||
syncContext.execute(() -> {
|
syncContext.execute(() -> {
|
||||||
childLbState.getLb().requestConnection();
|
if (childLbState.getCurrentState() == IDLE) {
|
||||||
|
childLbState.getLb().requestConnection();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
return PickResult.withNoResult(); // Indicates that this should be retried after backoff
|
return PickResult.withNoResult(); // Indicates that this should be retried after backoff
|
||||||
|
@ -455,10 +458,11 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
||||||
return childLbState.getCurrentPicker().pickSubchannel(args);
|
return childLbState.getCurrentPicker().pickSubchannel(args);
|
||||||
}
|
}
|
||||||
if (!requestedConnection && subchannelView.connectivityState == IDLE) {
|
if (!requestedConnection && subchannelView.connectivityState == IDLE) {
|
||||||
syncContext.execute(
|
syncContext.execute(() -> {
|
||||||
() -> {
|
if (childLbState.getCurrentState() == IDLE) {
|
||||||
childLbState.getLb().requestConnection();
|
childLbState.getLb().requestConnection();
|
||||||
});
|
}
|
||||||
|
});
|
||||||
requestedConnection = true;
|
requestedConnection = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -523,6 +527,22 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
||||||
this.requestHashHeader = requestHashHeader;
|
this.requestHashHeader = requestHashHeader;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (!(o instanceof RingHashConfig)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
RingHashConfig that = (RingHashConfig) o;
|
||||||
|
return this.minRingSize == that.minRingSize
|
||||||
|
&& this.maxRingSize == that.maxRingSize
|
||||||
|
&& Objects.equals(this.requestHashHeader, that.requestHashHeader);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(minRingSize, maxRingSize, requestHashHeader);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return MoreObjects.toStringHelper(this)
|
return MoreObjects.toStringHelper(this)
|
||||||
|
|
|
@ -32,6 +32,7 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.testing.EqualsTester;
|
||||||
import io.grpc.Attributes;
|
import io.grpc.Attributes;
|
||||||
import io.grpc.ChannelLogger;
|
import io.grpc.ChannelLogger;
|
||||||
import io.grpc.ConnectivityState;
|
import io.grpc.ConnectivityState;
|
||||||
|
@ -1199,6 +1200,27 @@ public class ClusterResolverLoadBalancerTest {
|
||||||
any(ConnectivityState.class), any(SubchannelPicker.class));
|
any(ConnectivityState.class), any(SubchannelPicker.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void config_equalsTester() {
|
||||||
|
new EqualsTester()
|
||||||
|
.addEqualityGroup(
|
||||||
|
new ClusterResolverConfig(
|
||||||
|
Collections.singletonList(edsDiscoveryMechanism1), leastRequest, false),
|
||||||
|
new ClusterResolverConfig(
|
||||||
|
Collections.singletonList(edsDiscoveryMechanism1), leastRequest, false))
|
||||||
|
.addEqualityGroup(new ClusterResolverConfig(
|
||||||
|
Collections.singletonList(edsDiscoveryMechanism1), roundRobin, false))
|
||||||
|
.addEqualityGroup(new ClusterResolverConfig(
|
||||||
|
Collections.singletonList(edsDiscoveryMechanism1), leastRequest, true))
|
||||||
|
.addEqualityGroup(new ClusterResolverConfig(
|
||||||
|
Collections.singletonList(edsDiscoveryMechanismWithOutlierDetection),
|
||||||
|
leastRequest,
|
||||||
|
false))
|
||||||
|
.addEqualityGroup(new ClusterResolverConfig(
|
||||||
|
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), leastRequest, false))
|
||||||
|
.testEquals();
|
||||||
|
}
|
||||||
|
|
||||||
private void deliverLbConfig(ClusterResolverConfig config) {
|
private void deliverLbConfig(ClusterResolverConfig config) {
|
||||||
loadBalancer.acceptResolvedAddresses(
|
loadBalancer.acceptResolvedAddresses(
|
||||||
ResolvedAddresses.newBuilder()
|
ResolvedAddresses.newBuilder()
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2025 The gRPC Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.grpc.xds;
|
||||||
|
|
||||||
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
|
||||||
|
import io.grpc.CallOptions;
|
||||||
|
import io.grpc.ConnectivityState;
|
||||||
|
import io.grpc.EquivalentAddressGroup;
|
||||||
|
import io.grpc.LoadBalancer;
|
||||||
|
import io.grpc.LoadBalancer.ResolvedAddresses;
|
||||||
|
import io.grpc.LoadBalancer.SubchannelPicker;
|
||||||
|
import io.grpc.ManagedChannel;
|
||||||
|
import io.grpc.Metadata;
|
||||||
|
import io.grpc.SynchronizationContext;
|
||||||
|
import io.grpc.internal.PickSubchannelArgsImpl;
|
||||||
|
import io.grpc.testing.TestMethodDescriptors;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
|
/** Unit test for {@link io.grpc.xds.LazyLoadBalancer}. */
|
||||||
|
@RunWith(JUnit4.class)
|
||||||
|
public final class LazyLoadBalancerTest {
|
||||||
|
private SynchronizationContext syncContext =
|
||||||
|
new SynchronizationContext((t, e) -> {
|
||||||
|
throw new AssertionError(e);
|
||||||
|
});
|
||||||
|
private LoadBalancer.PickSubchannelArgs args = new PickSubchannelArgsImpl(
|
||||||
|
TestMethodDescriptors.voidMethod(),
|
||||||
|
new Metadata(),
|
||||||
|
CallOptions.DEFAULT,
|
||||||
|
new LoadBalancer.PickDetailsConsumer() {});
|
||||||
|
private FakeHelper helper = new FakeHelper();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void pickerIsNoopAfterEarlyShutdown() {
|
||||||
|
LazyLoadBalancer lb = new LazyLoadBalancer(helper, new LoadBalancer.Factory() {
|
||||||
|
@Override
|
||||||
|
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
|
||||||
|
throw new AssertionError("unexpected");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
lb.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
|
||||||
|
.setAddresses(Arrays.asList())
|
||||||
|
.build());
|
||||||
|
SubchannelPicker picker = helper.picker;
|
||||||
|
assertThat(picker).isNotNull();
|
||||||
|
lb.shutdown();
|
||||||
|
|
||||||
|
picker.pickSubchannel(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
class FakeHelper extends LoadBalancer.Helper {
|
||||||
|
ConnectivityState state;
|
||||||
|
SubchannelPicker picker;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
|
||||||
|
this.state = newState;
|
||||||
|
this.picker = newPicker;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SynchronizationContext getSynchronizationContext() {
|
||||||
|
return syncContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getAuthority() {
|
||||||
|
return "localhost";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -42,6 +42,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.primitives.UnsignedInteger;
|
import com.google.common.primitives.UnsignedInteger;
|
||||||
|
import com.google.common.testing.EqualsTester;
|
||||||
import io.grpc.Attributes;
|
import io.grpc.Attributes;
|
||||||
import io.grpc.CallOptions;
|
import io.grpc.CallOptions;
|
||||||
import io.grpc.ConnectivityState;
|
import io.grpc.ConnectivityState;
|
||||||
|
@ -260,7 +261,7 @@ public class RingHashLoadBalancerTest {
|
||||||
private void verifyConnection(int times) {
|
private void verifyConnection(int times) {
|
||||||
for (int i = 0; i < times; i++) {
|
for (int i = 0; i < times; i++) {
|
||||||
Subchannel connectOnce = connectionRequestedQueue.poll();
|
Subchannel connectOnce = connectionRequestedQueue.poll();
|
||||||
assertWithMessage("Null connection is at (%s) of (%s)", i, times)
|
assertWithMessage("Expected %s new connections, but found %s", times, i)
|
||||||
.that(connectOnce).isNotNull();
|
.that(connectOnce).isNotNull();
|
||||||
clearInvocations(connectOnce);
|
clearInvocations(connectOnce);
|
||||||
}
|
}
|
||||||
|
@ -647,7 +648,7 @@ public class RingHashLoadBalancerTest {
|
||||||
getSubchannel(servers, 2),
|
getSubchannel(servers, 2),
|
||||||
ConnectivityStateInfo.forTransientFailure(
|
ConnectivityStateInfo.forTransientFailure(
|
||||||
Status.PERMISSION_DENIED.withDescription("permission denied")));
|
Status.PERMISSION_DENIED.withDescription("permission denied")));
|
||||||
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||||
verifyConnection(0);
|
verifyConnection(0);
|
||||||
PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel
|
PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel
|
||||||
assertThat(result.getStatus().isOk()).isTrue();
|
assertThat(result.getStatus().isOk()).isTrue();
|
||||||
|
@ -1113,6 +1114,19 @@ public class RingHashLoadBalancerTest {
|
||||||
assertThat(picks).containsExactly(subchannel1);
|
assertThat(picks).containsExactly(subchannel1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void config_equalsTester() {
|
||||||
|
new EqualsTester()
|
||||||
|
.addEqualityGroup(
|
||||||
|
new RingHashConfig(1, 2, "headerA"),
|
||||||
|
new RingHashConfig(1, 2, "headerA"))
|
||||||
|
.addEqualityGroup(new RingHashConfig(1, 1, "headerA"))
|
||||||
|
.addEqualityGroup(new RingHashConfig(2, 2, "headerA"))
|
||||||
|
.addEqualityGroup(new RingHashConfig(1, 2, "headerB"))
|
||||||
|
.addEqualityGroup(new RingHashConfig(1, 2, ""))
|
||||||
|
.testEquals();
|
||||||
|
}
|
||||||
|
|
||||||
private List<Subchannel> initializeLbSubchannels(RingHashConfig config,
|
private List<Subchannel> initializeLbSubchannels(RingHashConfig config,
|
||||||
List<EquivalentAddressGroup> servers, InitializationFlags... initFlags) {
|
List<EquivalentAddressGroup> servers, InitializationFlags... initFlags) {
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
set -e
|
set -e
|
||||||
# import VERSION from one of the google internal CLs
|
# import VERSION from one of the google internal CLs
|
||||||
VERSION=024c85f92f20cab567a83acc50934c7f9711d124
|
VERSION=2ac532fd44436293585084f8d94c6bdb17835af0
|
||||||
DOWNLOAD_URL="https://github.com/cncf/xds/archive/${VERSION}.tar.gz"
|
DOWNLOAD_URL="https://github.com/cncf/xds/archive/${VERSION}.tar.gz"
|
||||||
DOWNLOAD_BASE_DIR="xds-${VERSION}"
|
DOWNLOAD_BASE_DIR="xds-${VERSION}"
|
||||||
SOURCE_PROTO_BASE_DIR="${DOWNLOAD_BASE_DIR}"
|
SOURCE_PROTO_BASE_DIR="${DOWNLOAD_BASE_DIR}"
|
||||||
|
@ -40,6 +40,7 @@ xds/annotations/v3/versioning.proto
|
||||||
xds/core/v3/authority.proto
|
xds/core/v3/authority.proto
|
||||||
xds/core/v3/collection_entry.proto
|
xds/core/v3/collection_entry.proto
|
||||||
xds/core/v3/context_params.proto
|
xds/core/v3/context_params.proto
|
||||||
|
xds/core/v3/cidr.proto
|
||||||
xds/core/v3/extension.proto
|
xds/core/v3/extension.proto
|
||||||
xds/core/v3/resource_locator.proto
|
xds/core/v3/resource_locator.proto
|
||||||
xds/core/v3/resource_name.proto
|
xds/core/v3/resource_name.proto
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package xds.core.v3;
|
||||||
|
|
||||||
|
import "xds/annotations/v3/status.proto";
|
||||||
|
import "google/protobuf/wrappers.proto";
|
||||||
|
|
||||||
|
import "validate/validate.proto";
|
||||||
|
|
||||||
|
option java_outer_classname = "CidrRangeProto";
|
||||||
|
option java_multiple_files = true;
|
||||||
|
option java_package = "com.github.xds.core.v3";
|
||||||
|
option go_package = "github.com/cncf/xds/go/xds/core/v3";
|
||||||
|
|
||||||
|
option (xds.annotations.v3.file_status).work_in_progress = true;
|
||||||
|
|
||||||
|
// CidrRange specifies an IP Address and a prefix length to construct
|
||||||
|
// the subnet mask for a `CIDR <https://tools.ietf.org/html/rfc4632>`_ range.
|
||||||
|
message CidrRange {
|
||||||
|
// IPv4 or IPv6 address, e.g. ``192.0.0.0`` or ``2001:db8::``.
|
||||||
|
string address_prefix = 1 [(validate.rules).string = {min_len: 1}];
|
||||||
|
|
||||||
|
// Length of prefix, e.g. 0, 32. Defaults to 0 when unset.
|
||||||
|
google.protobuf.UInt32Value prefix_len = 2 [(validate.rules).uint32 = {lte: 128}];
|
||||||
|
}
|
|
@ -10,7 +10,7 @@ option go_package = "github.com/cncf/xds/go/xds/data/orca/v3";
|
||||||
import "validate/validate.proto";
|
import "validate/validate.proto";
|
||||||
|
|
||||||
// See section `ORCA load report format` of the design document in
|
// See section `ORCA load report format` of the design document in
|
||||||
// :ref:`https://github.com/envoyproxy/envoy/issues/6614`.
|
// https://github.com/envoyproxy/envoy/issues/6614.
|
||||||
|
|
||||||
message OrcaLoadReport {
|
message OrcaLoadReport {
|
||||||
// CPU utilization expressed as a fraction of available CPU resources. This
|
// CPU utilization expressed as a fraction of available CPU resources. This
|
||||||
|
|
|
@ -2,9 +2,7 @@ syntax = "proto3";
|
||||||
|
|
||||||
package xds.type.matcher.v3;
|
package xds.type.matcher.v3;
|
||||||
|
|
||||||
import "xds/annotations/v3/status.proto";
|
|
||||||
import "xds/type/v3/cel.proto";
|
import "xds/type/v3/cel.proto";
|
||||||
|
|
||||||
import "validate/validate.proto";
|
import "validate/validate.proto";
|
||||||
|
|
||||||
option java_package = "com.github.xds.type.matcher.v3";
|
option java_package = "com.github.xds.type.matcher.v3";
|
||||||
|
@ -12,8 +10,6 @@ option java_outer_classname = "CelProto";
|
||||||
option java_multiple_files = true;
|
option java_multiple_files = true;
|
||||||
option go_package = "github.com/cncf/xds/go/xds/type/matcher/v3";
|
option go_package = "github.com/cncf/xds/go/xds/type/matcher/v3";
|
||||||
|
|
||||||
option (xds.annotations.v3.file_status).work_in_progress = true;
|
|
||||||
|
|
||||||
// [#protodoc-title: Common Expression Language (CEL) matchers]
|
// [#protodoc-title: Common Expression Language (CEL) matchers]
|
||||||
|
|
||||||
// Performs a match by evaluating a `Common Expression Language
|
// Performs a match by evaluating a `Common Expression Language
|
||||||
|
@ -24,14 +20,13 @@ option (xds.annotations.v3.file_status).work_in_progress = true;
|
||||||
//
|
//
|
||||||
// The match is ``true``, iff the result of the evaluation is a bool AND true.
|
// The match is ``true``, iff the result of the evaluation is a bool AND true.
|
||||||
// In all other cases, the match is ``false``, including but not limited to: non-bool types,
|
// In all other cases, the match is ``false``, including but not limited to: non-bool types,
|
||||||
// ``false``, ``null``,`` int(1)``, etc.
|
// ``false``, ``null``, ``int(1)``, etc.
|
||||||
// In case CEL expression raises an error, the result of the evaluation is interpreted "no match".
|
// In case CEL expression raises an error, the result of the evaluation is interpreted "no match".
|
||||||
//
|
//
|
||||||
// Refer to :ref:`Unified Matcher API <envoy_v3_api_msg_.xds.type.matcher.v3.Matcher>` documentation
|
// Refer to :ref:`Unified Matcher API <envoy_v3_api_msg_.xds.type.matcher.v3.Matcher>` documentation
|
||||||
// for usage details.
|
// for usage details.
|
||||||
//
|
//
|
||||||
// [#comment:TODO(sergiitk): Link HttpAttributesMatchInput + usage example.]
|
// [#comment: envoy.matching.matchers.cel_matcher]
|
||||||
// [#comment:TODO(sergiitk): When implemented, add the extension tag.]
|
|
||||||
message CelMatcher {
|
message CelMatcher {
|
||||||
// Either parsed or checked representation of the CEL program.
|
// Either parsed or checked representation of the CEL program.
|
||||||
type.v3.CelExpression expr_match = 1 [(validate.rules).message = {required: true}];
|
type.v3.CelExpression expr_match = 1 [(validate.rules).message = {required: true}];
|
||||||
|
|
|
@ -2,15 +2,11 @@ syntax = "proto3";
|
||||||
|
|
||||||
package xds.type.matcher.v3;
|
package xds.type.matcher.v3;
|
||||||
|
|
||||||
import "xds/annotations/v3/status.proto";
|
|
||||||
|
|
||||||
option java_package = "com.github.xds.type.matcher.v3";
|
option java_package = "com.github.xds.type.matcher.v3";
|
||||||
option java_outer_classname = "HttpInputsProto";
|
option java_outer_classname = "HttpInputsProto";
|
||||||
option java_multiple_files = true;
|
option java_multiple_files = true;
|
||||||
option go_package = "github.com/cncf/xds/go/xds/type/matcher/v3";
|
option go_package = "github.com/cncf/xds/go/xds/type/matcher/v3";
|
||||||
|
|
||||||
option (xds.annotations.v3.file_status).work_in_progress = true;
|
|
||||||
|
|
||||||
// [#protodoc-title: Common HTTP Inputs]
|
// [#protodoc-title: Common HTTP Inputs]
|
||||||
|
|
||||||
// Specifies that matching should be performed on the set of :ref:`HTTP attributes
|
// Specifies that matching should be performed on the set of :ref:`HTTP attributes
|
||||||
|
@ -22,6 +18,6 @@ option (xds.annotations.v3.file_status).work_in_progress = true;
|
||||||
// Refer to :ref:`Unified Matcher API <envoy_v3_api_msg_.xds.type.matcher.v3.Matcher>` documentation
|
// Refer to :ref:`Unified Matcher API <envoy_v3_api_msg_.xds.type.matcher.v3.Matcher>` documentation
|
||||||
// for usage details.
|
// for usage details.
|
||||||
//
|
//
|
||||||
// [#comment:TODO(sergiitk): When implemented, add the extension tag.]
|
// [#comment: envoy.matching.inputs.cel_data_input]
|
||||||
message HttpAttributesCelMatchInput {
|
message HttpAttributesCelMatchInput {
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ syntax = "proto3";
|
||||||
|
|
||||||
package xds.type.matcher.v3;
|
package xds.type.matcher.v3;
|
||||||
|
|
||||||
import "xds/annotations/v3/status.proto";
|
|
||||||
import "xds/core/v3/extension.proto";
|
import "xds/core/v3/extension.proto";
|
||||||
import "xds/type/matcher/v3/string.proto";
|
import "xds/type/matcher/v3/string.proto";
|
||||||
|
|
||||||
|
@ -21,8 +20,6 @@ option go_package = "github.com/cncf/xds/go/xds/type/matcher/v3";
|
||||||
// As an on_no_match might result in another matching tree being evaluated, this process
|
// As an on_no_match might result in another matching tree being evaluated, this process
|
||||||
// might repeat several times until the final OnMatch (or no match) is decided.
|
// might repeat several times until the final OnMatch (or no match) is decided.
|
||||||
message Matcher {
|
message Matcher {
|
||||||
option (xds.annotations.v3.message_status).work_in_progress = true;
|
|
||||||
|
|
||||||
// What to do if a match is successful.
|
// What to do if a match is successful.
|
||||||
message OnMatch {
|
message OnMatch {
|
||||||
oneof on_match {
|
oneof on_match {
|
||||||
|
@ -38,6 +35,14 @@ message Matcher {
|
||||||
// Protocol-specific action to take.
|
// Protocol-specific action to take.
|
||||||
core.v3.TypedExtensionConfig action = 2;
|
core.v3.TypedExtensionConfig action = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If true and the Matcher matches, the action will be taken but the caller
|
||||||
|
// will behave as if the Matcher did not match. A subsequent matcher or
|
||||||
|
// on_no_match action will be used instead.
|
||||||
|
// This field is not supported in all contexts in which the matcher API is
|
||||||
|
// used. If this field is set in a context in which it's not supported,
|
||||||
|
// the resource will be rejected.
|
||||||
|
bool keep_matching = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
// A linear list of field matchers.
|
// A linear list of field matchers.
|
||||||
|
|
|
@ -47,6 +47,13 @@ message CelExpression {
|
||||||
//
|
//
|
||||||
// If set, takes precedence over ``cel_expr_parsed``.
|
// If set, takes precedence over ``cel_expr_parsed``.
|
||||||
cel.expr.CheckedExpr cel_expr_checked = 4;
|
cel.expr.CheckedExpr cel_expr_checked = 4;
|
||||||
|
|
||||||
|
// Unparsed expression in string form. For example, ``request.headers['x-env'] == 'prod'`` will
|
||||||
|
// get ``x-env`` header value and compare it with ``prod``.
|
||||||
|
// Check the `Common Expression Language <https://github.com/google/cel-spec>`_ for more details.
|
||||||
|
//
|
||||||
|
// If set, takes precedence over ``cel_expr_parsed`` and ``cel_expr_checked``.
|
||||||
|
string cel_expr_string = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extracts a string by evaluating a `Common Expression Language
|
// Extracts a string by evaluating a `Common Expression Language
|
||||||
|
|
Loading…
Reference in New Issue