mirror of https://github.com/grpc/grpc-java.git
otel: plumb optional labels other than disconnect_error
This commit is contained in:
parent
735665bcdf
commit
a664b2fd92
|
@ -135,6 +135,12 @@ public abstract class LoadBalancer {
|
||||||
public static final Attributes.Key<Boolean> IS_PETIOLE_POLICY =
|
public static final Attributes.Key<Boolean> IS_PETIOLE_POLICY =
|
||||||
Attributes.Key.create("io.grpc.IS_PETIOLE_POLICY");
|
Attributes.Key.create("io.grpc.IS_PETIOLE_POLICY");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The name of the locality that this EquivalentAddressGroup is in.
|
||||||
|
*/
|
||||||
|
public static final Attributes.Key<String> ATTR_LOCALITY_NAME =
|
||||||
|
Attributes.Key.create("io.grpc.lb.locality");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A picker that always returns an erring pick.
|
* A picker that always returns an erring pick.
|
||||||
*
|
*
|
||||||
|
|
|
@ -49,7 +49,9 @@ import io.grpc.LoadBalancer;
|
||||||
import io.grpc.Metadata;
|
import io.grpc.Metadata;
|
||||||
import io.grpc.MethodDescriptor;
|
import io.grpc.MethodDescriptor;
|
||||||
import io.grpc.MetricRecorder;
|
import io.grpc.MetricRecorder;
|
||||||
|
import io.grpc.NameResolver;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
|
import io.grpc.SecurityLevel;
|
||||||
import io.grpc.SynchronizationContext;
|
import io.grpc.SynchronizationContext;
|
||||||
import io.grpc.SynchronizationContext.ScheduledHandle;
|
import io.grpc.SynchronizationContext.ScheduledHandle;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
@ -163,9 +165,6 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
private volatile Attributes connectedAddressAttributes;
|
private volatile Attributes connectedAddressAttributes;
|
||||||
private final SubchannelMetrics subchannelMetrics;
|
private final SubchannelMetrics subchannelMetrics;
|
||||||
private final String target;
|
private final String target;
|
||||||
private final String backendService;
|
|
||||||
private final String locality;
|
|
||||||
private final String securityLevel;
|
|
||||||
|
|
||||||
InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
|
InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
|
||||||
BackoffPolicy.Provider backoffPolicyProvider,
|
BackoffPolicy.Provider backoffPolicyProvider,
|
||||||
|
@ -175,7 +174,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
Callback callback, InternalChannelz channelz, CallTracer callsTracer,
|
Callback callback, InternalChannelz channelz, CallTracer callsTracer,
|
||||||
ChannelTracer channelTracer, InternalLogId logId,
|
ChannelTracer channelTracer, InternalLogId logId,
|
||||||
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters,
|
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters,
|
||||||
String target, String backendService, String locality, String securityLevel,
|
String target,
|
||||||
MetricRecorder metricRecorder) {
|
MetricRecorder metricRecorder) {
|
||||||
List<EquivalentAddressGroup> addressGroups = args.getAddresses();
|
List<EquivalentAddressGroup> addressGroups = args.getAddresses();
|
||||||
Preconditions.checkNotNull(addressGroups, "addressGroups");
|
Preconditions.checkNotNull(addressGroups, "addressGroups");
|
||||||
|
@ -201,9 +200,6 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
this.transportFilters = transportFilters;
|
this.transportFilters = transportFilters;
|
||||||
this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
|
this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
|
||||||
this.target = target;
|
this.target = target;
|
||||||
this.backendService = backendService;
|
|
||||||
this.locality = locality;
|
|
||||||
this.securityLevel = securityLevel;
|
|
||||||
this.subchannelMetrics = new SubchannelMetrics(metricRecorder);
|
this.subchannelMetrics = new SubchannelMetrics(metricRecorder);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -592,8 +588,13 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
@Override
|
@Override
|
||||||
public void transportReady() {
|
public void transportReady() {
|
||||||
channelLogger.log(ChannelLogLevel.INFO, "READY");
|
channelLogger.log(ChannelLogLevel.INFO, "READY");
|
||||||
subchannelMetrics.recordConnectionAttemptSucceeded(
|
subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet(
|
||||||
buildLabelSet(null, extractSecurityLevel()));
|
addressIndex.getCurrentEagAttributes().get(NameResolver.ATTR_BACKEND_SERVICE),
|
||||||
|
addressIndex.getCurrentEagAttributes().get(LoadBalancer.ATTR_LOCALITY_NAME),
|
||||||
|
null,
|
||||||
|
extractSecurityLevel(
|
||||||
|
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
|
||||||
|
));
|
||||||
syncContext.execute(new Runnable() {
|
syncContext.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -623,7 +624,11 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
channelLogger.log(
|
channelLogger.log(
|
||||||
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
|
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
|
||||||
shutdownInitiated = true;
|
shutdownInitiated = true;
|
||||||
subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet("Peer Pressure", null));
|
subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet(
|
||||||
|
addressIndex.getCurrentEagAttributes().get(NameResolver.ATTR_BACKEND_SERVICE),
|
||||||
|
addressIndex.getCurrentEagAttributes().get(LoadBalancer.ATTR_LOCALITY_NAME),
|
||||||
|
null, null
|
||||||
|
));
|
||||||
syncContext.execute(new Runnable() {
|
syncContext.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -664,8 +669,13 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
for (ClientTransportFilter filter : transportFilters) {
|
for (ClientTransportFilter filter : transportFilters) {
|
||||||
filter.transportTerminated(transport.getAttributes());
|
filter.transportTerminated(transport.getAttributes());
|
||||||
}
|
}
|
||||||
subchannelMetrics.recordDisconnection(buildLabelSet("Peer Pressure",
|
subchannelMetrics.recordDisconnection(buildLabelSet(
|
||||||
null));
|
addressIndex.getCurrentEagAttributes().get(NameResolver.ATTR_BACKEND_SERVICE),
|
||||||
|
addressIndex.getCurrentEagAttributes().get(LoadBalancer.ATTR_LOCALITY_NAME),
|
||||||
|
"Peer Pressure",
|
||||||
|
extractSecurityLevel(
|
||||||
|
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
|
||||||
|
));
|
||||||
syncContext.execute(new Runnable() {
|
syncContext.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -677,8 +687,20 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private String extractSecurityLevel() {
|
private String extractSecurityLevel(SecurityLevel securityLevel) {
|
||||||
return "Hold the door!";
|
if (securityLevel == null) {
|
||||||
|
return "none";
|
||||||
|
}
|
||||||
|
switch (securityLevel) {
|
||||||
|
case NONE:
|
||||||
|
return "none";
|
||||||
|
case INTEGRITY:
|
||||||
|
return "integrity_only";
|
||||||
|
case PRIVACY_AND_INTEGRITY:
|
||||||
|
return "privacy_and_integrity";
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -839,13 +861,14 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
|
||||||
return buffer.toString();
|
return buffer.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private OtelMetricsAttributes buildLabelSet(String disconnectError, String secLevel) {
|
private OtelMetricsAttributes buildLabelSet(String backendService, String locality,
|
||||||
|
String disconnectError, String securityLevel) {
|
||||||
return new OtelMetricsAttributes(
|
return new OtelMetricsAttributes(
|
||||||
target,
|
target,
|
||||||
backendService,
|
backendService,
|
||||||
locality,
|
locality,
|
||||||
disconnectError,
|
disconnectError,
|
||||||
secLevel != null ? secLevel : securityLevel
|
securityLevel
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -415,7 +415,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
||||||
LbHelperImpl lbHelper = new LbHelperImpl();
|
LbHelperImpl lbHelper = new LbHelperImpl();
|
||||||
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
|
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
|
||||||
// Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
|
// Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
|
||||||
// may throw. We don't want to confuse our state, even if we will enter panic mode.
|
// may throw. We don't want to confuse our state, even if we enter panic mode.
|
||||||
this.lbHelper = lbHelper;
|
this.lbHelper = lbHelper;
|
||||||
|
|
||||||
channelStateManager.gotoState(CONNECTING);
|
channelStateManager.gotoState(CONNECTING);
|
||||||
|
@ -1466,9 +1466,6 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
||||||
subchannelLogger,
|
subchannelLogger,
|
||||||
transportFilters,
|
transportFilters,
|
||||||
target,
|
target,
|
||||||
"",
|
|
||||||
"",
|
|
||||||
"",
|
|
||||||
lbHelper.getMetricRecorder());
|
lbHelper.getMetricRecorder());
|
||||||
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
|
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
|
||||||
.setDescription("Child Subchannel created")
|
.setDescription("Child Subchannel created")
|
||||||
|
@ -1901,9 +1898,6 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
||||||
subchannelLogId,
|
subchannelLogId,
|
||||||
subchannelLogger,
|
subchannelLogger,
|
||||||
transportFilters, target,
|
transportFilters, target,
|
||||||
"",
|
|
||||||
"",
|
|
||||||
"",
|
|
||||||
lbHelper.getMetricRecorder());
|
lbHelper.getMetricRecorder());
|
||||||
|
|
||||||
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
|
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
|
||||||
|
|
|
@ -92,7 +92,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
|
||||||
return Status.FAILED_PRECONDITION.withDescription("Already shut down");
|
return Status.FAILED_PRECONDITION.withDescription("Already shut down");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache whether or not this is a petiole policy, which is based off of an address attribute
|
// Check weather or not this is a petiole policy, which is based off of an address attribute
|
||||||
Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
|
Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
|
||||||
this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;
|
this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;
|
||||||
|
|
||||||
|
|
|
@ -1449,9 +1449,6 @@ public class InternalSubchannelTest {
|
||||||
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
|
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
|
||||||
Collections.emptyList(),
|
Collections.emptyList(),
|
||||||
"",
|
"",
|
||||||
"",
|
|
||||||
"",
|
|
||||||
"",
|
|
||||||
new MetricRecorder() {}
|
new MetricRecorder() {}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -305,7 +305,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
|
private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
|
||||||
Locality locality = addressAttributes.get(XdsAttributes.ATTR_LOCALITY);
|
Locality locality = addressAttributes.get(XdsAttributes.ATTR_LOCALITY);
|
||||||
String localityName = addressAttributes.get(XdsAttributes.ATTR_LOCALITY_NAME);
|
String localityName = addressAttributes.get(LoadBalancer.ATTR_LOCALITY_NAME);
|
||||||
|
|
||||||
// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
|
// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
|
||||||
// attributes with its locality, including endpoints in LOGICAL_DNS clusters.
|
// attributes with its locality, including endpoints in LOGICAL_DNS clusters.
|
||||||
|
|
|
@ -428,7 +428,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
||||||
Attributes attr =
|
Attributes attr =
|
||||||
endpoint.eag().getAttributes().toBuilder()
|
endpoint.eag().getAttributes().toBuilder()
|
||||||
.set(XdsAttributes.ATTR_LOCALITY, locality)
|
.set(XdsAttributes.ATTR_LOCALITY, locality)
|
||||||
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
|
.set(LoadBalancer.ATTR_LOCALITY_NAME, localityName)
|
||||||
.set(XdsAttributes.ATTR_LOCALITY_WEIGHT,
|
.set(XdsAttributes.ATTR_LOCALITY_WEIGHT,
|
||||||
localityLbInfo.localityWeight())
|
localityLbInfo.localityWeight())
|
||||||
.set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
|
.set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
|
||||||
|
@ -679,7 +679,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
||||||
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
|
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
|
||||||
Attributes attr = eag.getAttributes().toBuilder()
|
Attributes attr = eag.getAttributes().toBuilder()
|
||||||
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
|
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
|
||||||
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
|
.set(LoadBalancer.ATTR_LOCALITY_NAME, localityName)
|
||||||
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
|
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
|
||||||
.build();
|
.build();
|
||||||
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
|
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
|
||||||
|
|
|
@ -74,7 +74,7 @@ final class WrrLocalityLoadBalancer extends LoadBalancer {
|
||||||
Map<String, Integer> localityWeights = new HashMap<>();
|
Map<String, Integer> localityWeights = new HashMap<>();
|
||||||
for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {
|
for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {
|
||||||
Attributes eagAttrs = eag.getAttributes();
|
Attributes eagAttrs = eag.getAttributes();
|
||||||
String locality = eagAttrs.get(XdsAttributes.ATTR_LOCALITY_NAME);
|
String locality = eagAttrs.get(LoadBalancer.ATTR_LOCALITY_NAME);
|
||||||
Integer localityWeight = eagAttrs.get(XdsAttributes.ATTR_LOCALITY_WEIGHT);
|
Integer localityWeight = eagAttrs.get(XdsAttributes.ATTR_LOCALITY_WEIGHT);
|
||||||
|
|
||||||
if (locality == null) {
|
if (locality == null) {
|
||||||
|
|
|
@ -81,13 +81,6 @@ final class XdsAttributes {
|
||||||
static final Attributes.Key<Locality> ATTR_LOCALITY =
|
static final Attributes.Key<Locality> ATTR_LOCALITY =
|
||||||
Attributes.Key.create("io.grpc.xds.XdsAttributes.locality");
|
Attributes.Key.create("io.grpc.xds.XdsAttributes.locality");
|
||||||
|
|
||||||
/**
|
|
||||||
* The name of the locality that this EquivalentAddressGroup is in.
|
|
||||||
*/
|
|
||||||
@EquivalentAddressGroup.Attr
|
|
||||||
static final Attributes.Key<String> ATTR_LOCALITY_NAME =
|
|
||||||
Attributes.Key.create("io.grpc.xds.XdsAttributes.localityName");
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Endpoint weight for load balancing purposes.
|
* Endpoint weight for load balancing purposes.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -1017,7 +1017,7 @@ public class ClusterImplLoadBalancerTest {
|
||||||
Attributes.Builder attributes = Attributes.newBuilder()
|
Attributes.Builder attributes = Attributes.newBuilder()
|
||||||
.set(XdsAttributes.ATTR_LOCALITY, locality)
|
.set(XdsAttributes.ATTR_LOCALITY, locality)
|
||||||
// Unique but arbitrary string
|
// Unique but arbitrary string
|
||||||
.set(XdsAttributes.ATTR_LOCALITY_NAME, locality.toString());
|
.set(LoadBalancer.ATTR_LOCALITY_NAME, locality.toString());
|
||||||
if (authorityHostname != null) {
|
if (authorityHostname != null) {
|
||||||
attributes.set(XdsAttributes.ATTR_ADDRESS_NAME, authorityHostname);
|
attributes.set(XdsAttributes.ATTR_ADDRESS_NAME, authorityHostname);
|
||||||
}
|
}
|
||||||
|
|
|
@ -254,7 +254,7 @@ public class WrrLocalityLoadBalancerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
Attributes.Builder attrBuilder = Attributes.newBuilder()
|
Attributes.Builder attrBuilder = Attributes.newBuilder()
|
||||||
.set(XdsAttributes.ATTR_LOCALITY_NAME, locality);
|
.set(LoadBalancer.ATTR_LOCALITY_NAME, locality);
|
||||||
if (localityWeight != null) {
|
if (localityWeight != null) {
|
||||||
attrBuilder.set(XdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight);
|
attrBuilder.set(XdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue