Implement grpc.lb.backend_service optional label

This completes gRFC A89. 7162d2d66 and fc86084df had already implemented
the LB plumbing for the optional label on RPC metrics. This observes the
value in OpenTelemetry and adds it to WRR metrics as well.

https://github.com/grpc/proposal/blob/master/A89-backend-service-metric-label.md
This commit is contained in:
Eric Anderson 2025-04-21 06:17:43 -07:00 committed by GitHub
parent 53de8a72ca
commit 9619453799
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 212 additions and 21 deletions

View File

@ -275,6 +275,11 @@ public abstract class NameResolver {
@Documented
public @interface ResolutionResultAttr {}
/**
 * Resolution-result attribute key holding the name of the backend service that the resolved
 * addresses belong to. Load balancers may read it to populate the
 * {@code grpc.lb.backend_service} optional metric label described in gRFC A89.
 */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11989")
@ResolutionResultAttr
public static final Attributes.Key<String> ATTR_BACKEND_SERVICE =
Attributes.Key.create("io.grpc.NameResolver.ATTR_BACKEND_SERVICE");
/**
* Information that a {@link Factory} uses to create a {@link NameResolver}.
*

View File

@ -17,6 +17,7 @@
package io.grpc.opentelemetry;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
@ -70,7 +71,6 @@ import javax.annotation.Nullable;
*/
final class OpenTelemetryMetricsModule {
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality";
public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
ImmutableSet.of(
"grpc.client.attempt.started",
@ -90,6 +90,7 @@ final class OpenTelemetryMetricsModule {
private final OpenTelemetryMetricsResource resource;
private final Supplier<Stopwatch> stopwatchSupplier;
private final boolean localityEnabled;
private final boolean backendServiceEnabled;
private final ImmutableList<OpenTelemetryPlugin> plugins;
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
@ -97,7 +98,8 @@ final class OpenTelemetryMetricsModule {
List<OpenTelemetryPlugin> plugins) {
this.resource = checkNotNull(resource, "resource");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME);
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
this.plugins = ImmutableList.copyOf(plugins);
}
@ -162,6 +164,7 @@ final class OpenTelemetryMetricsModule {
volatile long outboundWireSize;
volatile long inboundWireSize;
volatile String locality;
volatile String backendService;
long attemptNanos;
Code statusCode;
@ -206,9 +209,12 @@ final class OpenTelemetryMetricsModule {
@Override
public void addOptionalLabel(String key, String value) {
if (LOCALITY_LABEL_NAME.equals(key)) {
if ("grpc.lb.locality".equals(key)) {
locality = value;
}
if ("grpc.lb.backend_service".equals(key)) {
backendService = value;
}
}
@Override
@ -248,6 +254,13 @@ final class OpenTelemetryMetricsModule {
}
builder.put(LOCALITY_KEY, savedLocality);
}
if (module.backendServiceEnabled) {
String savedBackendService = backendService;
if (savedBackendService == null) {
savedBackendService = "";
}
builder.put(BACKEND_SERVICE_KEY, savedBackendService);
}
for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
plugin.addLabels(builder);
}

View File

@ -33,6 +33,9 @@ public final class OpenTelemetryConstants {
public static final AttributeKey<String> LOCALITY_KEY =
AttributeKey.stringKey("grpc.lb.locality");
/** OpenTelemetry attribute key for the optional {@code grpc.lb.backend_service} label. */
public static final AttributeKey<String> BACKEND_SERVICE_KEY =
AttributeKey.stringKey("grpc.lb.backend_service");
public static final List<Double> LATENCY_BUCKETS =
ImmutableList.of(
0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d,

View File

@ -51,6 +51,7 @@ import io.grpc.internal.FakeClock;
import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory;
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
import io.grpc.testing.GrpcServerRule;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
@ -1070,6 +1071,140 @@ public class OpenTelemetryMetricsModuleTest {
point -> point.hasAttributes(clientAttributes))));
}
/**
 * With {@code grpc.lb.backend_service} enabled as an optional label, the most recent value
 * reported through {@code addOptionalLabel} must be attached to the per-attempt duration and
 * message-size histograms. The attempt-started counter and the per-call duration histogram are
 * expected without it, and the unrelated {@code grpc.lb.foo} label must not show up anywhere.
 */
@Test
public void clientBackendServiceMetrics_present() {
  final String channelTarget = "target:///";
  final String fullMethodName = method.getFullMethodName();

  OpenTelemetryMetricsResource instruments =
      GrpcOpenTelemetry.createMetricInstruments(
          testMeter, enabledMetricsMap, disableDefaultMetrics);
  OpenTelemetryMetricsModule metricsModule =
      new OpenTelemetryMetricsModule(
          fakeClock.getStopwatchSupplier(),
          instruments,
          Arrays.asList("grpc.lb.backend_service"),
          emptyList());
  OpenTelemetryMetricsModule.CallAttemptsTracerFactory tracerFactory =
      new CallAttemptsTracerFactory(metricsModule, channelTarget, fullMethodName, emptyList());

  // Drive a single successful attempt, reporting labels in between. Order matters here: the
  // second backend_service value is the one that must win.
  ClientStreamTracer attemptTracer =
      tracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  attemptTracer.addOptionalLabel("grpc.lb.foo", "unimportant");
  attemptTracer.addOptionalLabel("grpc.lb.backend_service", "should-be-overwritten");
  attemptTracer.addOptionalLabel("grpc.lb.backend_service", "the-moon");
  attemptTracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
  attemptTracer.streamClosed(Status.OK);
  tracerFactory.callEnded(Status.OK);

  // Expected label sets, from least to most specific.
  io.opentelemetry.api.common.Attributes startedAttrs =
      io.opentelemetry.api.common.Attributes.of(
          TARGET_KEY, channelTarget,
          METHOD_KEY, fullMethodName);
  io.opentelemetry.api.common.Attributes finishedAttrs =
      startedAttrs.toBuilder()
          .put(STATUS_KEY, Status.Code.OK.toString())
          .build();
  io.opentelemetry.api.common.Attributes finishedAttrsWithBackendService =
      finishedAttrs.toBuilder()
          .put(AttributeKey.stringKey("grpc.lb.backend_service"), "the-moon")
          .build();

  assertThat(openTelemetryTesting.getMetrics())
      .satisfiesExactlyInAnyOrder(
          m ->
              assertThat(m)
                  .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
                  .hasLongSumSatisfying(
                      sum -> sum.hasPointsSatisfying(p -> p.hasAttributes(startedAttrs))),
          m ->
              assertThat(m)
                  .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
                  .hasHistogramSatisfying(
                      hist ->
                          hist.hasPointsSatisfying(
                              p -> p.hasAttributes(finishedAttrsWithBackendService))),
          m ->
              assertThat(m)
                  .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
                  .hasHistogramSatisfying(
                      hist ->
                          hist.hasPointsSatisfying(
                              p -> p.hasAttributes(finishedAttrsWithBackendService))),
          m ->
              assertThat(m)
                  .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
                  .hasHistogramSatisfying(
                      hist ->
                          hist.hasPointsSatisfying(
                              p -> p.hasAttributes(finishedAttrsWithBackendService))),
          m ->
              assertThat(m)
                  .hasName(CLIENT_CALL_DURATION)
                  .hasHistogramSatisfying(
                      hist -> hist.hasPointsSatisfying(p -> p.hasAttributes(finishedAttrs))));
}
/**
 * With {@code grpc.lb.backend_service} enabled as an optional label but never reported for the
 * attempt, the per-attempt duration and message-size histograms must still carry the label,
 * with an empty-string value. The attempt-started counter and the per-call duration histogram
 * are expected without it.
 */
@Test
public void clientBackendServiceMetrics_missing() {
  final String channelTarget = "target:///";
  final String fullMethodName = method.getFullMethodName();

  OpenTelemetryMetricsResource instruments =
      GrpcOpenTelemetry.createMetricInstruments(
          testMeter, enabledMetricsMap, disableDefaultMetrics);
  OpenTelemetryMetricsModule metricsModule =
      new OpenTelemetryMetricsModule(
          fakeClock.getStopwatchSupplier(),
          instruments,
          Arrays.asList("grpc.lb.backend_service"),
          emptyList());
  OpenTelemetryMetricsModule.CallAttemptsTracerFactory tracerFactory =
      new CallAttemptsTracerFactory(metricsModule, channelTarget, fullMethodName, emptyList());

  // Drive a single successful attempt without ever reporting an optional label.
  ClientStreamTracer attemptTracer =
      tracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  attemptTracer.streamClosed(Status.OK);
  tracerFactory.callEnded(Status.OK);

  // Expected label sets, from least to most specific.
  io.opentelemetry.api.common.Attributes startedAttrs =
      io.opentelemetry.api.common.Attributes.of(
          TARGET_KEY, channelTarget,
          METHOD_KEY, fullMethodName);
  io.opentelemetry.api.common.Attributes finishedAttrs =
      startedAttrs.toBuilder()
          .put(STATUS_KEY, Status.Code.OK.toString())
          .build();
  io.opentelemetry.api.common.Attributes finishedAttrsWithBackendService =
      finishedAttrs.toBuilder()
          .put(AttributeKey.stringKey("grpc.lb.backend_service"), "")
          .build();

  assertThat(openTelemetryTesting.getMetrics())
      .satisfiesExactlyInAnyOrder(
          m ->
              assertThat(m)
                  .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
                  .hasLongSumSatisfying(
                      sum -> sum.hasPointsSatisfying(p -> p.hasAttributes(startedAttrs))),
          m ->
              assertThat(m)
                  .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
                  .hasHistogramSatisfying(
                      hist ->
                          hist.hasPointsSatisfying(
                              p -> p.hasAttributes(finishedAttrsWithBackendService))),
          m ->
              assertThat(m)
                  .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
                  .hasHistogramSatisfying(
                      hist ->
                          hist.hasPointsSatisfying(
                              p -> p.hasAttributes(finishedAttrsWithBackendService))),
          m ->
              assertThat(m)
                  .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
                  .hasHistogramSatisfying(
                      hist ->
                          hist.hasPointsSatisfying(
                              p -> p.hasAttributes(finishedAttrsWithBackendService))),
          m ->
              assertThat(m)
                  .hasName(CLIENT_CALL_DURATION)
                  .hasHistogramSatisfying(
                      hist -> hist.hasPointsSatisfying(p -> p.hasAttributes(finishedAttrs))));
}
@Test
public void serverBasicMetrics() {
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,

View File

@ -32,6 +32,7 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.ForwardingClientStreamTracer;
import io.grpc.internal.GrpcUtil;
@ -150,7 +151,9 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
childSwitchLb.handleResolvedAddresses(
resolvedAddresses.toBuilder()
.setAttributes(attributes)
.setAttributes(attributes.toBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
.build())
.setLoadBalancingPolicyConfig(config.childConfig)
.build());
return Status.OK;

View File

@ -102,32 +102,44 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
private final long infTime;
private final Ticker ticker;
private String locality = "";
private String backendService = "";
private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
// The metric instruments are only registered once and shared by all instances of this LB.
static {
MetricInstrumentRegistry metricInstrumentRegistry
= MetricInstrumentRegistry.getDefaultRegistry();
RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback",
RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.lb.wrr.rr_fallback",
"EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
+ "with valid weight, which caused the WRR policy to fall back to RR behavior",
"{update}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
"{update}",
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.lb.wrr.endpoint_weight_not_yet_usable", "EXPERIMENTAL. Number of endpoints "
+ "from each scheduler update that don't yet have usable weight information",
"{endpoint}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
"grpc.lb.wrr.endpoint_weight_not_yet_usable",
"EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable "
+ "weight information",
"{endpoint}",
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.lb.wrr.endpoint_weight_stale",
"EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
+ "older than the expiration period", "{endpoint}", Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality"), false);
+ "older than the expiration period",
"{endpoint}",
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
"grpc.lb.wrr.endpoint_weights",
"EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
"{weight}", Lists.newArrayList(), Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality"),
"{weight}",
Lists.newArrayList(),
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
}
@ -168,6 +180,13 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
} else {
this.locality = "";
}
String backendService
= resolvedAddresses.getAttributes().get(NameResolver.ATTR_BACKEND_SERVICE);
if (backendService != null) {
this.backendService = backendService;
} else {
this.backendService = "";
}
config =
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
@ -232,7 +251,7 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
helper.getMetricRecorder()
.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality));
ImmutableList.of(locality, backendService));
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
}
@ -240,18 +259,19 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
helper.getMetricRecorder()
.addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality));
ImmutableList.of(locality, backendService));
}
if (notYetUsableEndpoints.get() > 0) {
helper.getMetricRecorder()
.addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality, backendService));
}
boolean weightsEffective = picker.updateWeight(newWeights);
if (!weightsEffective) {
helper.getMetricRecorder()
.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality));
ImmutableList.of(locality, backendService));
}
}

View File

@ -50,6 +50,7 @@ import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
@ -198,6 +199,7 @@ public class ClusterImplLoadBalancerTest {
assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig);
assertThat(childBalancer.attributes.get(XdsAttributes.XDS_CLIENT_POOL))
.isSameInstanceAs(xdsClientPool);
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
}
/**

View File

@ -58,6 +58,7 @@ import io.grpc.LongCounterMetricInstrument;
import io.grpc.Metadata;
import io.grpc.MetricRecorder;
import io.grpc.MetricSink;
import io.grpc.NameResolver;
import io.grpc.NoopMetricSink;
import io.grpc.ServerCall;
import io.grpc.ServerServiceDefinition;
@ -161,6 +162,7 @@ public class WeightedRoundRobinLoadBalancerTest {
private String channelTarget = "channel-target";
private String locality = "locality";
private String backendService = "the-backend-service";
public WeightedRoundRobinLoadBalancerTest() {
testHelperInstance = new TestHelper();
@ -1119,7 +1121,9 @@ public class WeightedRoundRobinLoadBalancerTest {
public void metrics() {
// Give WRR some valid addresses to work with.
Attributes attributesWithLocality = Attributes.newBuilder()
.set(WeightedTargetLoadBalancer.CHILD_NAME, locality).build();
.set(WeightedTargetLoadBalancer.CHILD_NAME, locality)
.set(NameResolver.ATTR_BACKEND_SERVICE, backendService)
.build();
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig)
.setAttributes(attributesWithLocality).build()));
@ -1269,7 +1273,7 @@ public class WeightedRoundRobinLoadBalancerTest {
argThat((instr) -> instr.getName().equals("grpc.lb.wrr.rr_fallback")),
eq(1L),
eq(Arrays.asList("directaddress:///wrr-metrics")),
eq(Arrays.asList("")));
eq(Arrays.asList("", "")));
}
// Verifies that the MetricRecorder has been called to record a long counter value of 1 for the
@ -1281,7 +1285,10 @@ public class WeightedRoundRobinLoadBalancerTest {
public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
return longCounterInstrument.getName().equals(name);
}
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality)));
}),
eq(value),
eq(Lists.newArrayList(channelTarget)),
eq(Lists.newArrayList(locality, backendService)));
}
// Verifies that the MetricRecorder has been called to record a given double histogram value the
@ -1293,7 +1300,10 @@ public class WeightedRoundRobinLoadBalancerTest {
public boolean matches(DoubleHistogramMetricInstrument doubleHistogramInstrument) {
return doubleHistogramInstrument.getName().equals(name);
}
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality)));
}),
eq(value),
eq(Lists.newArrayList(channelTarget)),
eq(Lists.newArrayList(locality, backendService)));
}
private int getNumFilteredPendingTasks() {