core:Have acceptResolvedAddresses() do a seek when in CONNECTING state and cleanup removed subchannels when a seek was successful (#11849)

* Have acceptResolvedAddresses() do a seek when in CONNECTING state and cleanup removed subchannels when a seek was successful.
Move cleanup of removed subchannels into a method so it can be called from 2 places in acceptResolvedAddresses.
Since the seek could mean we never looked at the first address, if we go off the end of the index and haven't looked at all of the addresses, then instead of calling scheduleBackoff() we reset the index and request a connection.
This commit is contained in:
Larry Safran 2025-01-24 16:42:56 -08:00 committed by GitHub
parent 67351c0c53
commit 87aa6deadf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 49 additions and 26 deletions

View File

@ -137,14 +137,15 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups = final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build(); ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
if (rawConnectivityState == READY) { if (rawConnectivityState == READY || rawConnectivityState == CONNECTING) {
// If the previous ready subchannel exists in new address list, // If the previous ready (or connecting) subchannel exists in new address list,
// keep this connection and don't create new subchannels // keep this connection and don't create new subchannels
SocketAddress previousAddress = addressIndex.getCurrentAddress(); SocketAddress previousAddress = addressIndex.getCurrentAddress();
addressIndex.updateGroups(newImmutableAddressGroups); addressIndex.updateGroups(newImmutableAddressGroups);
if (addressIndex.seekTo(previousAddress)) { if (addressIndex.seekTo(previousAddress)) {
SubchannelData subchannelData = subchannels.get(previousAddress); SubchannelData subchannelData = subchannels.get(previousAddress);
subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList()); subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
shutdownRemovedAddresses(newImmutableAddressGroups);
return Status.OK; return Status.OK;
} }
// Previous ready subchannel not in the new list of addresses // Previous ready subchannel not in the new list of addresses
@ -152,23 +153,11 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
addressIndex.updateGroups(newImmutableAddressGroups); addressIndex.updateGroups(newImmutableAddressGroups);
} }
// remove old subchannels that were not in new address list // No old addresses means first time through, so we will do an explicit move to CONNECTING
Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet()); // which is what we implicitly started with
boolean noOldAddrs = shutdownRemovedAddresses(newImmutableAddressGroups);
// Flatten the new EAGs addresses if (noOldAddrs) {
Set<SocketAddress> newAddrs = new HashSet<>();
for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
newAddrs.addAll(endpoint.getAddresses());
}
// Shut them down and remove them
for (SocketAddress oldAddr : oldAddrs) {
if (!newAddrs.contains(oldAddr)) {
subchannels.remove(oldAddr).getSubchannel().shutdown();
}
}
if (oldAddrs.size() == 0) {
// Make tests happy; they don't properly assume starting in CONNECTING // Make tests happy; they don't properly assume starting in CONNECTING
rawConnectivityState = CONNECTING; rawConnectivityState = CONNECTING;
updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
@ -188,6 +177,31 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
return Status.OK; return Status.OK;
} }
/**
 * Shuts down, and drops from {@code subchannels}, every subchannel whose address does not
 * appear anywhere in the supplied address groups.
 *
 * @param newImmutableAddressGroups the latest address groups; the addresses of all groups are
 *     flattened into one set before comparing against the existing subchannel keys
 * @return {@code true} if {@code subchannels} held no addresses when this method was called
 */
private boolean shutdownRemovedAddresses(
    ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups) {
  // Every address that is still wanted, regardless of which group it belongs to.
  Set<SocketAddress> retained = new HashSet<>();
  for (EquivalentAddressGroup group : newImmutableAddressGroups) {
    retained.addAll(group.getAddresses());
  }

  // Work from a snapshot of the keys so the map can be modified while looping.
  Set<SocketAddress> previous = new HashSet<>(subchannels.keySet());
  for (SocketAddress address : previous) {
    if (retained.contains(address)) {
      continue;
    }
    subchannels.remove(address).getSubchannel().shutdown();
  }

  return previous.isEmpty();
}
private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) { private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
Set<SocketAddress> seenAddresses = new HashSet<>(); Set<SocketAddress> seenAddresses = new HashSet<>();
List<EquivalentAddressGroup> newGroups = new ArrayList<>(); List<EquivalentAddressGroup> newGroups = new ArrayList<>();
@ -290,7 +304,14 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
cancelScheduleTask(); cancelScheduleTask();
requestConnection(); // is recursive so might hit the end of the addresses requestConnection(); // is recursive so might hit the end of the addresses
} else { } else {
scheduleBackoff(); if (subchannels.size() >= addressIndex.size()) {
scheduleBackoff();
} else {
// We must have done a seek to the middle of the list, so let's start over from the
// beginning
addressIndex.reset();
requestConnection();
}
} }
} }

View File

@ -2133,18 +2133,20 @@ public class PickFirstLeafLoadBalancerTest {
loadBalancer.acceptResolvedAddresses( loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
// Verify that no new subchannels were created or started // Subchannel 2 should be reused since it was trying to connect and is present.
inOrder.verify(mockSubchannel1).shutdown(); inOrder.verify(mockSubchannel1).shutdown();
inOrder.verify(mockSubchannel3, never()).start(stateListenerCaptor.capture());
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
// Second address connection attempt is unsuccessful, so since at end, but don't have all
// subchannels, schedule a backoff for the first address
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
fakeClock.forwardTime(1, TimeUnit.SECONDS);
inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
inOrder.verify(mockSubchannel3).requestConnection();
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
// Second address connection attempt is unsuccessful, but should not go into transient failure // Third address connection attempt is unsuccessful, now we enter TF, do name resolution
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
// Third address connection attempt is unsuccessful, now we enter transient failure
stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());