diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 53ba60fdc9..d2fd8409e0 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -1189,6 +1189,10 @@ public abstract class LoadBalancer { * Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context * as that the callback methods on the {@link LoadBalancer} interface are run in. * + *

Work added to the synchronization context might not run immediately, so LB implementations + * must be careful to ensure that any assumptions still hold when it is executed. In particular, + * the LB might have been shut down or subchannels might have changed state. + * *

Pro-tip: in order to call {@link SynchronizationContext#schedule}, you need to provide a * {@link ScheduledExecutorService}. {@link #getScheduledExecutorService} is provided for your * convenience. diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index e8f106c777..16b8adbd34 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1967,6 +1967,9 @@ final class ManagedChannelImpl extends ManagedChannel implements public void requestConnection() { syncContext.throwIfNotInThisSynchronizationContext(); checkState(started, "not started"); + if (shutdown) { + return; + } subchannel.obtainActiveTransport(); } diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index acef79d3d9..a23855e67e 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -134,7 +134,7 @@ final class PickFirstLoadBalancer extends LoadBalancer { SubchannelPicker picker; switch (newState) { case IDLE: - picker = new RequestConnectionPicker(subchannel); + picker = new RequestConnectionPicker(); break; case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave @@ -197,22 +197,12 @@ final class PickFirstLoadBalancer extends LoadBalancer { /** Picker that requests connection during the first pick, and returns noResult. */ private final class RequestConnectionPicker extends SubchannelPicker { - private final Subchannel subchannel; private final AtomicBoolean connectionRequested = new AtomicBoolean(false); - RequestConnectionPicker(Subchannel subchannel) { - this.subchannel = checkNotNull(subchannel, "subchannel"); - } - @Override public PickResult pickSubchannel(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { - helper.getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - subchannel.requestConnection(); - } - }); + helper.getSynchronizationContext().execute(PickFirstLoadBalancer.this::requestConnection); } return PickResult.withNoResult(); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 6dfba7404c..efc582703b 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -1767,6 +1767,19 @@ public class ManagedChannelImplTest { any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); } + @Test + public void subchannelsRequestConnectionNoopAfterShutdown() { + createChannel(); + Subchannel sub1 = + createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); + + shutdownSafely(helper, sub1); + requestConnectionSafely(helper, sub1); + verify(mockTransportFactory, never()) + .newClientTransport( + any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); + } + @Test public void subchannelsNoConnectionShutdownNow() { createChannel(); diff --git a/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java index 4cf09170c8..b49c856c4d 100644 --- a/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java +++ b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java @@ -122,7 +122,7 @@ class ShufflingPickFirstLoadBalancer extends LoadBalancer { SubchannelPicker picker; switch (currentState) { case IDLE: - picker = new RequestConnectionPicker(subchannel); + picker = new RequestConnectionPicker(); break; case CONNECTING: picker = new Picker(PickResult.withNoResult()); @@ -182,24 +182,15 @@ class ShufflingPickFirstLoadBalancer extends LoadBalancer { */ private final class RequestConnectionPicker extends SubchannelPicker { - private final Subchannel subchannel; private final AtomicBoolean connectionRequested = new AtomicBoolean(false); - RequestConnectionPicker(Subchannel subchannel) { - this.subchannel = checkNotNull(subchannel, "subchannel"); - } - @Override public PickResult pickSubchannel(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { - helper.getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - subchannel.requestConnection(); - } - }); + helper.getSynchronizationContext().execute( + ShufflingPickFirstLoadBalancer.this::requestConnection); } return PickResult.withNoResult(); } } -} \ No newline at end of file +} diff --git a/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java index 8ba1cb28c6..b5f09c4ea9 100644 --- a/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java @@ -99,11 +99,13 @@ final class LazyLoadBalancer extends ForwardingLoadBalancer { @Override public void shutdown() { + delegate = new NoopLoadBalancer(); } private final class LazyPicker extends SubchannelPicker { @Override public PickResult pickSubchannel(PickSubchannelArgs args) { + // activate() is a no-op after shutdown() helper.getSynchronizationContext().execute(LazyDelegate.this::activate); return PickResult.withNoResult(); } @@ -121,4 +123,17 @@ final class LazyLoadBalancer extends ForwardingLoadBalancer { return new LazyLoadBalancer(helper, delegate); } } + + private static final class NoopLoadBalancer extends LoadBalancer { + @Override + public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { + return Status.OK; + } + + @Override + public void handleNameResolutionError(Status error) {} + + @Override + public void shutdown() {} + } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 96ce5b9702..21ee914ff8 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -438,7 +438,9 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer { if (subchannelView.connectivityState == IDLE) { syncContext.execute(() -> { - childLbState.getLb().requestConnection(); + if (childLbState.getCurrentState() == IDLE) { + childLbState.getLb().requestConnection(); + } }); return PickResult.withNoResult(); // Indicates that this should be retried after backoff @@ -456,10 +458,11 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer { return childLbState.getCurrentPicker().pickSubchannel(args); } if (!requestedConnection && subchannelView.connectivityState == IDLE) { - syncContext.execute( - () -> { - childLbState.getLb().requestConnection(); - }); + syncContext.execute(() -> { + if (childLbState.getCurrentState() == IDLE) { + childLbState.getLb().requestConnection(); + } + }); requestedConnection = true; } } diff --git a/xds/src/test/java/io/grpc/xds/LazyLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/LazyLoadBalancerTest.java new file mode 100644 index 0000000000..c79d048c9d --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/LazyLoadBalancerTest.java @@ -0,0 +1,94 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.CallOptions; +import io.grpc.ConnectivityState; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.SynchronizationContext; +import io.grpc.internal.PickSubchannelArgsImpl; +import io.grpc.testing.TestMethodDescriptors; +import java.util.Arrays; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit test for {@link io.grpc.xds.LazyLoadBalancer}. */ +@RunWith(JUnit4.class) +public final class LazyLoadBalancerTest { + private SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { + throw new AssertionError(e); + }); + private LoadBalancer.PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), + new Metadata(), + CallOptions.DEFAULT, + new LoadBalancer.PickDetailsConsumer() {}); + private FakeHelper helper = new FakeHelper(); + + @Test + public void pickerIsNoopAfterEarlyShutdown() { + LazyLoadBalancer lb = new LazyLoadBalancer(helper, new LoadBalancer.Factory() { + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + throw new AssertionError("unexpected"); + } + }); + lb.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Arrays.asList()) + .build()); + SubchannelPicker picker = helper.picker; + assertThat(picker).isNotNull(); + lb.shutdown(); + + picker.pickSubchannel(args); + } + + class FakeHelper extends LoadBalancer.Helper { + ConnectivityState state; + SubchannelPicker picker; + + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + this.state = newState; + this.picker = newPicker; + } + + @Override + public SynchronizationContext getSynchronizationContext() { + return syncContext; + } + + @Override + public String getAuthority() { + return "localhost"; + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index fc5f21156d..d65cf96c00 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -261,7 +261,7 @@ public class RingHashLoadBalancerTest { private void verifyConnection(int times) { for (int i = 0; i < times; i++) { Subchannel connectOnce = connectionRequestedQueue.poll(); - assertWithMessage("Null connection is at (%s) of (%s)", i, times) + assertWithMessage("Expected %s new connections, but found %s", times, i) .that(connectOnce).isNotNull(); clearInvocations(connectOnce); } @@ -648,7 +648,7 @@ public class RingHashLoadBalancerTest { getSubchannel(servers, 2), ConnectivityStateInfo.forTransientFailure( Status.PERMISSION_DENIED.withDescription("permission denied"))); - verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); verifyConnection(0); PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel assertThat(result.getStatus().isOk()).isTrue();