core: make ManagedChannel honor Service config LB

I think the idle state transitions are correct. I have looked at them and tried tracing a few paths through. This doesn't go full idle because the name resolver doesn't need to be restarted. Also, the LB transition happens inside of a NR callback, so it would be odd to have the NR terminate itself upon successful resolution. (Might this cause recursion? I think it may).
This commit is contained in:
Carl Mastrangelo 2018-01-12 16:56:54 -08:00 committed by GitHub
parent bd7080337b
commit 98aa69af72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 409 additions and 25 deletions

View File

@ -42,6 +42,7 @@ java_library(
"//context", "//context",
"@com_google_code_findbugs_jsr305//jar", "@com_google_code_findbugs_jsr305//jar",
"@com_google_errorprone_error_prone_annotations//jar", "@com_google_errorprone_error_prone_annotations//jar",
"@com_google_code_gson_gson//jar",
"@com_google_guava_guava//jar", "@com_google_guava_guava//jar",
"@com_google_instrumentation_instrumentation_api//jar", "@com_google_instrumentation_instrumentation_api//jar",
"@io_opencensus_opencensus_api//jar", "@io_opencensus_opencensus_api//jar",

View File

@ -31,7 +31,6 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.NameResolverProvider; import io.grpc.NameResolverProvider;
import io.grpc.PickFirstBalancerFactory;
import io.opencensus.trace.Tracing; import io.opencensus.trace.Tracing;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.URI; import java.net.URI;
@ -85,9 +84,6 @@ public abstract class AbstractManagedChannelImplBuilder
private static final NameResolver.Factory DEFAULT_NAME_RESOLVER_FACTORY = private static final NameResolver.Factory DEFAULT_NAME_RESOLVER_FACTORY =
NameResolverProvider.asFactory(); NameResolverProvider.asFactory();
private static final LoadBalancer.Factory DEFAULT_LOAD_BALANCER_FACTORY =
PickFirstBalancerFactory.getInstance();
private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY = private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY =
DecompressorRegistry.getDefaultInstance(); DecompressorRegistry.getDefaultInstance();
@ -113,8 +109,7 @@ public abstract class AbstractManagedChannelImplBuilder
@Nullable @Nullable
String authorityOverride; String authorityOverride;
@Nullable LoadBalancer.Factory loadBalancerFactory;
LoadBalancer.Factory loadBalancerFactory = DEFAULT_LOAD_BALANCER_FACTORY;
boolean fullStreamDecompression; boolean fullStreamDecompression;
@ -223,11 +218,7 @@ public abstract class AbstractManagedChannelImplBuilder
Preconditions.checkState(directServerAddress == null, Preconditions.checkState(directServerAddress == null,
"directServerAddress is set (%s), which forbids the use of LoadBalancer.Factory", "directServerAddress is set (%s), which forbids the use of LoadBalancer.Factory",
directServerAddress); directServerAddress);
if (loadBalancerFactory != null) { this.loadBalancerFactory = loadBalancerFactory;
this.loadBalancerFactory = loadBalancerFactory;
} else {
this.loadBalancerFactory = DEFAULT_LOAD_BALANCER_FACTORY;
}
return thisT(); return thisT();
} }

View File

@ -0,0 +1,197 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonObject;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.PickFirstBalancerFactory;
import io.grpc.Status;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Locale;
import java.util.logging.Logger;
import javax.annotation.Nullable;
final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory {
private static final Logger logger =
Logger.getLogger(AutoConfiguredLoadBalancerFactory.class.getName());
@VisibleForTesting
static final String ROUND_ROUND_LOAD_BALANCER_FACTORY_NAME =
"io.grpc.util.RoundRobinLoadBalancerFactory";
@VisibleForTesting
static final String GRPCLB_LOAD_BALANCER_FACTORY_NAME =
"io.grpc.grpclb.GrpclbLoadBalancerFactory";
AutoConfiguredLoadBalancerFactory() {}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new AutoConfiguredLoadBalancer(helper);
}
private static final class EmptySubchannelPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
}
@VisibleForTesting
static final class AutoConfiguredLoadBalancer extends LoadBalancer {
private final Helper helper;
private LoadBalancer delegate;
private LoadBalancer.Factory delegateFactory;
AutoConfiguredLoadBalancer(Helper helper) {
this.helper = helper;
setDelegateFactory(PickFirstBalancerFactory.getInstance());
setDelegate(getDelegateFactory().newLoadBalancer(helper));
}
// Must be run inside ChannelExecutor.
@Override
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
if (attributes.keys().contains(GrpcAttributes.NAME_RESOLVER_ATTR_SERVICE_CONFIG)) {
Factory newlbf = decideLoadBalancerFactory(
servers, attributes.get(GrpcAttributes.NAME_RESOLVER_ATTR_SERVICE_CONFIG));
if (newlbf != null && newlbf != delegateFactory) {
helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptySubchannelPicker());
getDelegate().shutdown();
setDelegateFactory(newlbf);
setDelegate(getDelegateFactory().newLoadBalancer(helper));
}
}
getDelegate().handleResolvedAddressGroups(servers, attributes);
}
@Override
public void handleNameResolutionError(Status error) {
getDelegate().handleNameResolutionError(error);
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
getDelegate().handleSubchannelState(subchannel, stateInfo);
}
@Override
public void shutdown() {
getDelegate().shutdown();
setDelegate(null);
}
@VisibleForTesting
LoadBalancer getDelegate() {
return delegate;
}
@VisibleForTesting
void setDelegate(LoadBalancer delegate) {
this.delegate = delegate;
}
@VisibleForTesting
LoadBalancer.Factory getDelegateFactory() {
return delegateFactory;
}
@VisibleForTesting
void setDelegateFactory(LoadBalancer.Factory delegateFactory) {
this.delegateFactory = delegateFactory;
}
/**
* Picks a load balancer based on given criteria. In order of preference:
*
* <ol>
* <li>User provided lb on the channel. This is a degenerate case and not handled here.</li>
* <li>gRPCLB if on the class path and any gRPC LB balancer addresses are present</li>
* <li>RoundRobin if on the class path and picked by the service config</li>
* <li>PickFirst if the service config choice does not specify</li>
* </ol>
*
* @param servers The list of servers reported
* @param choice the service config object
* @return the new load balancer factory, or null if the existing lb should be used.
*/
@Nullable
@VisibleForTesting
static LoadBalancer.Factory decideLoadBalancerFactory(
List<EquivalentAddressGroup> servers, JsonObject choice) {
boolean loadBalancingPolicyPresent = choice.has("loadBalancingPolicy");
// Check for balancer addresses
boolean haveBalancerAddress = false;
for (EquivalentAddressGroup s : servers) {
if (s.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) != null) {
haveBalancerAddress = true;
break;
}
}
if (haveBalancerAddress) {
try {
Class<?> lbFactoryClass;
lbFactoryClass = Class.forName(GRPCLB_LOAD_BALANCER_FACTORY_NAME);
Method getInstance = lbFactoryClass.getMethod("getInstance");
return (LoadBalancer.Factory) getInstance.invoke(null);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Can't get GRPCLB, but balancer addresses were present", e);
}
}
// Check for an explicitly present lb choice
if (loadBalancingPolicyPresent) {
String serviceConfigChoiceBalancingPolicy = null;
serviceConfigChoiceBalancingPolicy = choice.get("loadBalancingPolicy").getAsString();
String policy = checkNotNull(serviceConfigChoiceBalancingPolicy, "policy");
if (policy.toUpperCase(Locale.ROOT).equals("ROUND_ROBIN")) {
ClassNotFoundException caught = null;
try {
Class<?> lbFactoryClass;
lbFactoryClass = Class.forName(ROUND_ROUND_LOAD_BALANCER_FACTORY_NAME);
Method getInstance = lbFactoryClass.getMethod("getInstance");
return (LoadBalancer.Factory) getInstance.invoke(null);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Can't get Round Robin LB", e);
}
}
throw new IllegalArgumentException("Unknown service config policy: " + policy);
}
return PickFirstBalancerFactory.getInstance();
}
}
}

View File

@ -194,9 +194,13 @@ final class DnsNameResolver extends NameResolver {
Attributes.Builder attrs = Attributes.newBuilder(); Attributes.Builder attrs = Attributes.newBuilder();
if (!resolvedInetAddrs.txtRecords.isEmpty()) { if (!resolvedInetAddrs.txtRecords.isEmpty()) {
// TODO(carl-mastrangelo): re enable this
/*
attrs.set( attrs.set(
GrpcAttributes.NAME_RESOLVER_ATTR_DNS_TXT, GrpcAttributes.NAME_RESOLVER_ATTR_SERVICE_CONFIG,
Collections.unmodifiableList(new ArrayList<String>(resolvedInetAddrs.txtRecords))); Collections.unmodifiableList(new ArrayList<String>(resolvedInetAddrs.txtRecords)));
*/
} }
savedListener.onAddresses(servers, attrs.build()); savedListener.onAddresses(servers, attrs.build());
} finally { } finally {

View File

@ -16,8 +16,8 @@
package io.grpc.internal; package io.grpc.internal;
import com.google.gson.JsonObject;
import io.grpc.Attributes; import io.grpc.Attributes;
import java.util.List;
/** /**
* Special attributes that are only useful to gRPC. * Special attributes that are only useful to gRPC.
@ -26,8 +26,8 @@ public final class GrpcAttributes {
/** /**
* Attribute key TXT DNS records. * Attribute key TXT DNS records.
*/ */
public static final Attributes.Key<List<String>> NAME_RESOLVER_ATTR_DNS_TXT = public static final Attributes.Key<JsonObject> NAME_RESOLVER_ATTR_SERVICE_CONFIG =
Attributes.Key.of("dns-txt"); Attributes.Key.of("service-config");
/** /**
* The naming authority of a gRPC LB server address. It is an address-group-level attribute, * The naming authority of a gRPC LB server address. It is an address-group-level attribute,
@ -36,6 +36,5 @@ public final class GrpcAttributes {
public static final Attributes.Key<String> ATTR_LB_ADDR_AUTHORITY = public static final Attributes.Key<String> ATTR_LB_ADDR_AUTHORITY =
Attributes.Key.of("io.grpc.grpclb.lbAddrAuthority"); Attributes.Key.of("io.grpc.grpclb.lbAddrAuthority");
private GrpcAttributes() {} private GrpcAttributes() {}
} }

View File

@ -113,6 +113,7 @@ public final class ManagedChannelImpl
private final NameResolver.Factory nameResolverFactory; private final NameResolver.Factory nameResolverFactory;
private final Attributes nameResolverParams; private final Attributes nameResolverParams;
private final LoadBalancer.Factory loadBalancerFactory; private final LoadBalancer.Factory loadBalancerFactory;
private final ClientTransportFactory transportFactory; private final ClientTransportFactory transportFactory;
private final Executor executor; private final Executor executor;
private final ObjectPool<? extends Executor> executorPool; private final ObjectPool<? extends Executor> executorPool;
@ -459,8 +460,11 @@ public final class ManagedChannelImpl
this.nameResolverFactory = builder.getNameResolverFactory(); this.nameResolverFactory = builder.getNameResolverFactory();
this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams"); this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
this.loadBalancerFactory = if (builder.loadBalancerFactory == null) {
checkNotNull(builder.loadBalancerFactory, "loadBalancerFactory"); this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory();
} else {
this.loadBalancerFactory = builder.loadBalancerFactory;
}
this.executorPool = checkNotNull(builder.executorPool, "executorPool"); this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool"); this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor"); this.executor = checkNotNull(executorPool.getObject(), "executor");
@ -983,11 +987,9 @@ public final class ManagedChannelImpl
} }
private class NameResolverListenerImpl implements NameResolver.Listener { private class NameResolverListenerImpl implements NameResolver.Listener {
final LoadBalancer balancer; final LbHelperImpl helper;
final LoadBalancer.Helper helper;
NameResolverListenerImpl(LbHelperImpl helperImpl) { NameResolverListenerImpl(LbHelperImpl helperImpl) {
this.balancer = helperImpl.lb;
this.helper = helperImpl; this.helper = helperImpl;
} }
@ -1002,6 +1004,7 @@ public final class ManagedChannelImpl
new Object[]{getLogId(), servers, config}); new Object[]{getLogId(), servers, config});
} }
final class NamesResolved implements Runnable { final class NamesResolved implements Runnable {
@Override @Override
public void run() { public void run() {
@ -1010,13 +1013,13 @@ public final class ManagedChannelImpl
return; return;
} }
try { try {
balancer.handleResolvedAddressGroups(servers, config); helper.lb.handleResolvedAddressGroups(servers, config);
} catch (Throwable e) { } catch (Throwable e) {
logger.log( logger.log(
Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e); Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e);
// It must be a bug! Push the exception back to LoadBalancer in the hope that it may // It must be a bug! Push the exception back to LoadBalancer in the hope that it may
// be propagated to the application. // be propagated to the application.
balancer.handleNameResolutionError(Status.INTERNAL.withCause(e) helper.lb.handleNameResolutionError(Status.INTERNAL.withCause(e)
.withDescription("Thrown from handleResolvedAddresses(): " + e)); .withDescription("Thrown from handleResolvedAddresses(): " + e));
} }
} }
@ -1037,7 +1040,7 @@ public final class ManagedChannelImpl
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
return; return;
} }
balancer.handleNameResolutionError(error); lbHelper.lb.handleNameResolutionError(error);
} }
}).drain(); }).drain();
} }

View File

@ -117,7 +117,7 @@ public class AbstractManagedChannelImplBuilderTest {
@Test @Test
public void loadBalancerFactory_default() { public void loadBalancerFactory_default() {
assertNotNull(builder.loadBalancerFactory); assertNull(builder.loadBalancerFactory);
} }
@Test @Test

View File

@ -0,0 +1,189 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel;
import io.grpc.NameResolver.Factory;
import io.grpc.PickFirstBalancerFactory;
import io.grpc.Status;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Unit tests for {@link AutoConfiguredLoadBalancerFactory}.
*/
// TODO(carl-mastrangelo): Add tests for selection logic.
@RunWith(JUnit4.class)
public class AutoConfiguredLoadBalancerFactoryTest {
private final AutoConfiguredLoadBalancerFactory lbf = new AutoConfiguredLoadBalancerFactory();
@Test
public void newLoadBalancer_isAuto() {
LoadBalancer lb = lbf.newLoadBalancer(new TestHelper());
assertThat(lb).isInstanceOf(AutoConfiguredLoadBalancer.class);
}
@Test
public void defaultIsPickFirst() {
AutoConfiguredLoadBalancer lb =
(AutoConfiguredLoadBalancer) lbf.newLoadBalancer(new TestHelper());
assertThat(lb.getDelegateFactory()).isInstanceOf(PickFirstBalancerFactory.class);
assertThat(lb.getDelegate().getClass().getName()).contains("PickFirst");
}
@Test
public void forwardsCalls() {
AutoConfiguredLoadBalancer lb =
(AutoConfiguredLoadBalancer) lbf.newLoadBalancer(new TestHelper());
final AtomicInteger calls = new AtomicInteger();
TestLoadBalancer testlb = new TestLoadBalancer() {
@Override
public void handleNameResolutionError(Status error) {
calls.getAndSet(1);
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
calls.getAndSet(2);
}
@Override
public void shutdown() {
calls.getAndSet(3);
}
};
lb.setDelegate(testlb);
lb.handleNameResolutionError(Status.RESOURCE_EXHAUSTED);
assertThat(calls.getAndSet(0)).isEqualTo(1);
lb.handleSubchannelState(null, null);
assertThat(calls.getAndSet(0)).isEqualTo(2);
lb.shutdown();
assertThat(calls.getAndSet(0)).isEqualTo(3);
}
public static class ForwardingLoadBalancer extends LoadBalancer {
private final LoadBalancer delegate;
public ForwardingLoadBalancer(LoadBalancer delegate) {
this.delegate = delegate;
}
protected LoadBalancer delegate() {
return delegate;
}
@Override
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
delegate().handleResolvedAddressGroups(servers, attributes);
}
@Override
public void handleNameResolutionError(Status error) {
delegate().handleNameResolutionError(error);
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
delegate().handleSubchannelState(subchannel, stateInfo);
}
@Override
public void shutdown() {
delegate().shutdown();
}
}
public static class ForwardingLoadBalancerHelper extends Helper {
private final Helper delegate;
public ForwardingLoadBalancerHelper(Helper delegate) {
this.delegate = delegate;
}
protected Helper delegate() {
return delegate;
}
@Override
public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
return delegate().createSubchannel(addrs, attrs);
}
@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
return delegate().createOobChannel(eag, authority);
}
@Override
public void updateBalancingState(
@Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) {
delegate().updateBalancingState(newState, newPicker);
}
@Override
public void runSerialized(Runnable task) {
delegate().runSerialized(task);
}
@Override
public Factory getNameResolverFactory() {
return delegate().getNameResolverFactory();
}
@Override
public String getAuthority() {
return delegate().getAuthority();
}
}
private static class TestLoadBalancer extends ForwardingLoadBalancer {
TestLoadBalancer() {
super(null);
}
}
private static class TestHelper extends ForwardingLoadBalancerHelper {
TestHelper() {
super(null);
}
}
}