diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index 6f4794fdd4..042f9e6363 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -34,6 +34,8 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.Status; import io.grpc.SynchronizationContext.ScheduledHandle; +import java.net.Inet4Address; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -58,17 +60,17 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName()); @VisibleForTesting static final int CONNECTION_DELAY_INTERVAL_MS = 250; + private final boolean enableHappyEyeballs = !isSerializingRetries() + && PickFirstLoadBalancerProvider.isEnabledHappyEyeballs(); private final Helper helper; private final Map subchannels = new HashMap<>(); - private final Index addressIndex = new Index(ImmutableList.of()); + private final Index addressIndex = new Index(ImmutableList.of(), this.enableHappyEyeballs); private int numTf = 0; private boolean firstPass = true; @Nullable private ScheduledHandle scheduleConnectionTask = null; private ConnectivityState rawConnectivityState = IDLE; private ConnectivityState concludedState = IDLE; - private final boolean enableHappyEyeballs = !isSerializingRetries() - && PickFirstLoadBalancerProvider.isEnabledHappyEyeballs(); private boolean notAPetiolePolicy = true; // means not under a petiole policy private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider(); private BackoffPolicy reconnectPolicy; @@ -610,27 +612,26 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } /** - * Index as in 'i', the pointer to an entry. Not a "search index." + * This contains both an ordered list of addresses and a pointer(i.e. index) to the current entry. * All updates should be done in a synchronization context. */ @VisibleForTesting static final class Index { - private List addressGroups; - private int size; - private int groupIndex; - private int addressIndex; + private List orderedAddresses; + private int activeElement = 0; + private boolean enableHappyEyeballs; - public Index(List groups) { + Index(List groups, boolean enableHappyEyeballs) { + this.enableHappyEyeballs = enableHappyEyeballs; updateGroups(groups); } public boolean isValid() { - // Is invalid if empty or has incremented off the end - return groupIndex < addressGroups.size(); + return activeElement < orderedAddresses.size(); } public boolean isAtBeginning() { - return groupIndex == 0 && addressIndex == 0; + return activeElement == 0; } /** @@ -642,79 +643,150 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { return false; } - EquivalentAddressGroup group = addressGroups.get(groupIndex); - addressIndex++; - if (addressIndex >= group.getAddresses().size()) { - groupIndex++; - addressIndex = 0; - return groupIndex < addressGroups.size(); - } + activeElement++; - return true; + return isValid(); } public void reset() { - groupIndex = 0; - addressIndex = 0; + activeElement = 0; } public SocketAddress getCurrentAddress() { if (!isValid()) { throw new IllegalStateException("Index is past the end of the address group list"); } - return addressGroups.get(groupIndex).getAddresses().get(addressIndex); + return orderedAddresses.get(activeElement).address; } public Attributes getCurrentEagAttributes() { if (!isValid()) { throw new IllegalStateException("Index is off the end of the address group list"); } - return addressGroups.get(groupIndex).getAttributes(); + return orderedAddresses.get(activeElement).attributes; } public List getCurrentEagAsList() { - return Collections.singletonList( - new EquivalentAddressGroup(getCurrentAddress(), getCurrentEagAttributes())); + return Collections.singletonList(getCurrentEag()); + } + + private EquivalentAddressGroup getCurrentEag() { + if (!isValid()) { + throw new IllegalStateException("Index is past the end of the address group list"); + } + return orderedAddresses.get(activeElement).asEag(); } /** * Update to new groups, resetting the current index. */ public void updateGroups(List newGroups) { - addressGroups = checkNotNull(newGroups, "newGroups"); + checkNotNull(newGroups, "newGroups"); + orderedAddresses = enableHappyEyeballs + ? updateGroupsHE(newGroups) + : updateGroupsNonHE(newGroups); reset(); - int size = 0; - for (EquivalentAddressGroup eag : newGroups) { - size += eag.getAddresses().size(); - } - this.size = size; } /** * Returns false if the needle was not found and the current index was left unchanged. */ public boolean seekTo(SocketAddress needle) { - for (int i = 0; i < addressGroups.size(); i++) { - EquivalentAddressGroup group = addressGroups.get(i); - int j = group.getAddresses().indexOf(needle); - if (j == -1) { - continue; + checkNotNull(needle, "needle"); + for (int i = 0; i < orderedAddresses.size(); i++) { + if (orderedAddresses.get(i).address.equals(needle)) { + this.activeElement = i; + return true; } - this.groupIndex = i; - this.addressIndex = j; - return true; } return false; } public int size() { - return size; + return orderedAddresses.size(); + } + + private List updateGroupsNonHE(List newGroups) { + List entries = new ArrayList<>(); + for (int g = 0; g < newGroups.size(); g++) { + EquivalentAddressGroup eag = newGroups.get(g); + for (int a = 0; a < eag.getAddresses().size(); a++) { + SocketAddress addr = eag.getAddresses().get(a); + entries.add(new UnwrappedEag(eag.getAttributes(), addr)); + } + } + + return entries; + } + + private List updateGroupsHE(List newGroups) { + Boolean firstIsV6 = null; + List v4Entries = new ArrayList<>(); + List v6Entries = new ArrayList<>(); + for (int g = 0; g < newGroups.size(); g++) { + EquivalentAddressGroup eag = newGroups.get(g); + for (int a = 0; a < eag.getAddresses().size(); a++) { + SocketAddress addr = eag.getAddresses().get(a); + boolean isIpV4 = addr instanceof InetSocketAddress + && ((InetSocketAddress) addr).getAddress() instanceof Inet4Address; + if (isIpV4) { + if (firstIsV6 == null) { + firstIsV6 = false; + } + v4Entries.add(new UnwrappedEag(eag.getAttributes(), addr)); + } else { + if (firstIsV6 == null) { + firstIsV6 = true; + } + v6Entries.add(new UnwrappedEag(eag.getAttributes(), addr)); + } + } + } + + return firstIsV6 != null && firstIsV6 + ? interleave(v6Entries, v4Entries) + : interleave(v4Entries, v6Entries); + } + + private List interleave(List firstFamily, + List secondFamily) { + if (firstFamily.isEmpty()) { + return secondFamily; + } + if (secondFamily.isEmpty()) { + return firstFamily; + } + + List result = new ArrayList<>(firstFamily.size() + secondFamily.size()); + for (int i = 0; i < Math.max(firstFamily.size(), secondFamily.size()); i++) { + if (i < firstFamily.size()) { + result.add(firstFamily.get(i)); + } + if (i < secondFamily.size()) { + result.add(secondFamily.get(i)); + } + } + return result; + } + + private static final class UnwrappedEag { + private final Attributes attributes; + private final SocketAddress address; + + public UnwrappedEag(Attributes attributes, SocketAddress address) { + this.attributes = attributes; + this.address = address; + } + + private EquivalentAddressGroup asEag() { + return new EquivalentAddressGroup(address, attributes); + } } } @VisibleForTesting - int getGroupIndex() { - return addressIndex.groupIndex; + int getIndexLocation() { + return addressIndex.activeElement; } @VisibleForTesting @@ -778,4 +850,5 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { this.randomSeed = randomSeed; } } + } diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java index f0031a6ae6..61bcb5c05a 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assume.assumeTrue; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; @@ -67,6 +68,7 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Arrays; @@ -2618,7 +2620,7 @@ public class PickFirstLeafLoadBalancerTest { forwardTimeByBackoffDelay(); // should trigger retry again for (int i = 0; i < subchannels.length; i++) { inOrder.verify(subchannels[i]).requestConnection(); - assertEquals(i, loadBalancer.getGroupIndex()); + assertEquals(i, loadBalancer.getIndexLocation()); listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); // cascade } } @@ -2637,7 +2639,7 @@ public class PickFirstLeafLoadBalancerTest { PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList( new EquivalentAddressGroup(Arrays.asList(addr1, addr2), attr1), new EquivalentAddressGroup(Arrays.asList(addr3), attr2), - new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3))); + new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3)), enableHappyEyeballs); assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1); assertThat(index.isAtBeginning()).isTrue(); @@ -2696,7 +2698,7 @@ public class PickFirstLeafLoadBalancerTest { SocketAddress addr3 = new FakeSocketAddress("addr3"); PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList( new EquivalentAddressGroup(Arrays.asList(addr1)), - new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); + new EquivalentAddressGroup(Arrays.asList(addr2, addr3))), enableHappyEyeballs); index.increment(); index.increment(); // We want to make sure both groupIndex and addressIndex are reset @@ -2713,7 +2715,7 @@ public class PickFirstLeafLoadBalancerTest { SocketAddress addr3 = new FakeSocketAddress("addr3"); PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList( new EquivalentAddressGroup(Arrays.asList(addr1, addr2)), - new EquivalentAddressGroup(Arrays.asList(addr3)))); + new EquivalentAddressGroup(Arrays.asList(addr3))), enableHappyEyeballs); assertThat(index.seekTo(addr3)).isTrue(); assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3); assertThat(index.seekTo(addr1)).isTrue(); @@ -2725,6 +2727,83 @@ public class PickFirstLeafLoadBalancerTest { assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2); } + @Test + public void index_interleaving() { + InetSocketAddress addr1_6 = new InetSocketAddress("f38:1:1", 1234); + InetSocketAddress addr1_4 = new InetSocketAddress("10.1.1.1", 1234); + InetSocketAddress addr2_4 = new InetSocketAddress("10.1.1.2", 1234); + InetSocketAddress addr3_4 = new InetSocketAddress("10.1.1.3", 1234); + InetSocketAddress addr4_4 = new InetSocketAddress("10.1.1.4", 1234); + InetSocketAddress addr4_6 = new InetSocketAddress("f38:1:4", 1234); + + Attributes attrs1 = Attributes.newBuilder().build(); + Attributes attrs2 = Attributes.newBuilder().build(); + Attributes attrs3 = Attributes.newBuilder().build(); + Attributes attrs4 = Attributes.newBuilder().build(); + + PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1_4, addr1_6), attrs1), + new EquivalentAddressGroup(Arrays.asList(addr2_4), attrs2), + new EquivalentAddressGroup(Arrays.asList(addr3_4), attrs3), + new EquivalentAddressGroup(Arrays.asList(addr4_4, addr4_6), attrs4)), enableHappyEyeballs); + + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_4); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs1); + assertThat(index.isAtBeginning()).isTrue(); + + index.increment(); + assertThat(index.isValid()).isTrue(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_6); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs1); + assertThat(index.isAtBeginning()).isFalse(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2_4); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs2); + + index.increment(); + if (enableHappyEyeballs) { + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_6); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs4); + } else { + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3_4); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs3); + } + + index.increment(); + if (enableHappyEyeballs) { + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3_4); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs3); + } else { + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs4); + } + + // Move to last entry + assertThat(index.increment()).isTrue(); + assertThat(index.isValid()).isTrue(); + if (enableHappyEyeballs) { + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4); + } else { + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_6); + } + + // Move off of the end + assertThat(index.increment()).isFalse(); + assertThat(index.isValid()).isFalse(); + assertThrows(IllegalStateException.class, index::getCurrentAddress); + + // Reset + index.reset(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_4); + assertThat(index.isAtBeginning()).isTrue(); + assertThat(index.isValid()).isTrue(); + + // Seek to an address + assertThat(index.seekTo(addr4_4)).isTrue(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4); + } + private static class FakeSocketAddress extends SocketAddress { final String name;