otel: subchannel metrics

This commit is contained in:
AgraVator 2025-07-07 11:47:08 +05:30
parent 6ff8ecac09
commit a06568cdce
6 changed files with 233 additions and 4 deletions

View File

@ -48,6 +48,7 @@ import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MetricRecorder;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
@ -160,6 +161,11 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
private Status shutdownReason; private Status shutdownReason;
private volatile Attributes connectedAddressAttributes; private volatile Attributes connectedAddressAttributes;
private final SubchannelMetrics subchannelMetrics;
private final String target;
private final String backendService;
private final String locality;
private final String securityLevel;
InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent, InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider, BackoffPolicy.Provider backoffPolicyProvider,
@ -168,7 +174,9 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
Callback callback, InternalChannelz channelz, CallTracer callsTracer, Callback callback, InternalChannelz channelz, CallTracer callsTracer,
ChannelTracer channelTracer, InternalLogId logId, ChannelTracer channelTracer, InternalLogId logId,
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters) { ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters,
String target, String backendService, String locality, String securityLevel,
MetricRecorder metricRecorder) {
List<EquivalentAddressGroup> addressGroups = args.getAddresses(); List<EquivalentAddressGroup> addressGroups = args.getAddresses();
Preconditions.checkNotNull(addressGroups, "addressGroups"); Preconditions.checkNotNull(addressGroups, "addressGroups");
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty"); Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
@ -192,6 +200,11 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.transportFilters = transportFilters; this.transportFilters = transportFilters;
this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY); this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
this.target = target;
this.backendService = backendService;
this.locality = locality;
this.securityLevel = securityLevel;
this.subchannelMetrics = new SubchannelMetrics(metricRecorder);
} }
ChannelLogger getChannelLogger() { ChannelLogger getChannelLogger() {
@ -579,6 +592,8 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
@Override @Override
public void transportReady() { public void transportReady() {
channelLogger.log(ChannelLogLevel.INFO, "READY"); channelLogger.log(ChannelLogLevel.INFO, "READY");
subchannelMetrics.recordConnectionAttemptSucceeded(
buildLabelSet(null, extractSecurityLevel()));
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -608,6 +623,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
channelLogger.log( channelLogger.log(
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s)); ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
shutdownInitiated = true; shutdownInitiated = true;
subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet("Peer Pressure", null));
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -648,6 +664,8 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
for (ClientTransportFilter filter : transportFilters) { for (ClientTransportFilter filter : transportFilters) {
filter.transportTerminated(transport.getAttributes()); filter.transportTerminated(transport.getAttributes());
} }
subchannelMetrics.recordDisconnection(buildLabelSet("Peer Pressure",
null));
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -658,6 +676,10 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
} }
}); });
} }
/**
 * Returns the security-level label value that is handed to {@code buildLabelSet} when a
 * transport reports ready.
 *
 * <p>NOTE(review): this currently returns a fixed placeholder string and does not inspect the
 * transport at all. Presumably the value should be derived from the connected transport's
 * attributes -- TODO confirm and replace before the {@code grpc.security_level} label is
 * relied upon.
 */
private String extractSecurityLevel() {
return "Hold the door!";
}
} }
// All methods are called in syncContext // All methods are called in syncContext
@ -817,6 +839,17 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
return buffer.toString(); return buffer.toString();
} }
/**
 * Bundles this subchannel's metric label values into an {@link OtelMetricsAttributes}.
 *
 * @param disconnectError value for the disconnect-error label; passed through unchanged
 * @param secLevel security-level label to use; when {@code null} the subchannel's own
 *     {@code securityLevel} field is used instead
 * @return a new label set built from the target, backend service, locality and the two
 *     resolved arguments
 */
private OtelMetricsAttributes buildLabelSet(String disconnectError, String secLevel) {
  String effectiveSecurityLevel = secLevel;
  if (effectiveSecurityLevel == null) {
    // No per-call override supplied: fall back to the value captured at construction.
    effectiveSecurityLevel = securityLevel;
  }
  return new OtelMetricsAttributes(
      target, backendService, locality, disconnectError, effectiveSecurityLevel);
}
@VisibleForTesting @VisibleForTesting
static final class TransportLogger extends ChannelLogger { static final class TransportLogger extends ChannelLogger {
// Changed just after construction to break a cyclic dependency. // Changed just after construction to break a cyclic dependency.

View File

@ -1464,7 +1464,12 @@ final class ManagedChannelImpl extends ManagedChannel implements
subchannelTracer, subchannelTracer,
subchannelLogId, subchannelLogId,
subchannelLogger, subchannelLogger,
transportFilters); transportFilters,
target,
"",
"",
"",
lbHelper.getMetricRecorder());
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder() oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child Subchannel created") .setDescription("Child Subchannel created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO) .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
@ -1895,7 +1900,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
subchannelTracer, subchannelTracer,
subchannelLogId, subchannelLogId,
subchannelLogger, subchannelLogger,
transportFilters); transportFilters, target,
"",
"",
"",
lbHelper.getMetricRecorder());
channelTracer.reportEvent(new ChannelTrace.Event.Builder() channelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child Subchannel started") .setDescription("Child Subchannel started")

View File

@ -0,0 +1,69 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import io.grpc.Attributes;
/**
 * Immutable holder for the label values attached to subchannel metrics.
 *
 * <p>Every field may be {@code null}; a {@code null} field is simply left out of the
 * {@link Attributes} produced by {@link #toOtelMetricsAttributes()}.
 */
class OtelMetricsAttributes {
  // Attributes.Key instances are compared by identity, so the keys must be shared constants:
  // a key created on the fly could never be used to read the stored value back.
  static final Attributes.Key<String> TARGET_KEY =
      Attributes.Key.create("grpc.target");
  static final Attributes.Key<String> BACKEND_SERVICE_KEY =
      Attributes.Key.create("grpc.lb.backend_service");
  static final Attributes.Key<String> LOCALITY_KEY =
      Attributes.Key.create("grpc.lb.locality");
  static final Attributes.Key<String> DISCONNECT_ERROR_KEY =
      Attributes.Key.create("grpc.disconnect_error");
  static final Attributes.Key<String> SECURITY_LEVEL_KEY =
      Attributes.Key.create("grpc.security_level");

  final String target;
  final String backendService;
  final String locality;
  final String disconnectError;
  final String securityLevel;

  /**
   * Creates a label set. Any argument may be {@code null} to indicate that the corresponding
   * label is absent.
   */
  public OtelMetricsAttributes(String target, String backendService, String locality,
      String disconnectError, String securityLevel) {
    this.target = target;
    this.backendService = backendService;
    this.locality = locality;
    this.disconnectError = disconnectError;
    this.securityLevel = securityLevel;
  }

  /**
   * Converts the non-null label values into an {@link Attributes} instance.
   *
   * <p>{@link Attributes} is immutable, so all values are accumulated in a single builder and
   * the built result is returned. (Previously each {@code toBuilder()...build()} result was
   * discarded and the method always returned {@link Attributes#EMPTY}.)
   *
   * @return attributes containing one entry per non-null field; {@link Attributes#EMPTY} when
   *     every field is {@code null}
   */
  public Attributes toOtelMetricsAttributes() {
    Attributes.Builder builder = Attributes.EMPTY.toBuilder();
    putIfPresent(builder, TARGET_KEY, target);
    putIfPresent(builder, BACKEND_SERVICE_KEY, backendService);
    putIfPresent(builder, LOCALITY_KEY, locality);
    putIfPresent(builder, DISCONNECT_ERROR_KEY, disconnectError);
    putIfPresent(builder, SECURITY_LEVEL_KEY, securityLevel);
    return builder.build();
  }

  /** Adds {@code value} under {@code key} unless the value is {@code null}. */
  private static void putIfPresent(
      Attributes.Builder builder, Attributes.Key<String> key, String value) {
    if (value != null) {
      builder.set(key, value);
    }
  }
}

View File

@ -0,0 +1,105 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder;
/**
 * Records subchannel connection metrics through a {@link MetricRecorder}.
 *
 * <p>The metric instruments are registered once, in the default
 * {@link MetricInstrumentRegistry}, when this class is initialized; each instance only carries
 * the recorder to which measurements are reported.
 */
public final class SubchannelMetrics {

  private static final LongCounterMetricInstrument disconnections;
  private static final LongCounterMetricInstrument connectionAttemptsSucceeded;
  private static final LongCounterMetricInstrument connectionAttemptsFailed;
  // NOTE(review): registered as a plain long counter, yet recordDisconnection() subtracts from
  // it. Confirm the recorder/exporter accepts negative deltas or switch to an up/down
  // instrument if one is available.
  private static final LongCounterMetricInstrument openConnections;

  static {
    MetricInstrumentRegistry metricInstrumentRegistry
        = MetricInstrumentRegistry.getDefaultRegistry();
    disconnections = metricInstrumentRegistry.registerLongCounter(
        // Fixed: the name previously carried a stray trailing "1".
        "grpc.subchannel.disconnections",
        "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected",
        "{disconnection}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"),
        false
    );

    connectionAttemptsSucceeded = metricInstrumentRegistry.registerLongCounter(
        "grpc.subchannel.connection_attempts_succeeded",
        "EXPERIMENTAL. Number of successful connection attempts",
        "{attempt}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.backend_service", "grpc.lb.locality"),
        false
    );

    connectionAttemptsFailed = metricInstrumentRegistry.registerLongCounter(
        "grpc.subchannel.connection_attempts_failed",
        "EXPERIMENTAL. Number of failed connection attempts",
        "{attempt}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.lb.backend_service", "grpc.lb.locality"),
        false
    );

    openConnections = metricInstrumentRegistry.registerLongCounter(
        "grpc.subchannel.open_connections",
        "EXPERIMENTAL. Number of open connections.",
        "{connection}",
        Lists.newArrayList("grpc.target"),
        Lists.newArrayList("grpc.security_level", "grpc.lb.backend_service", "grpc.lb.locality"),
        false
    );
  }

  private final MetricRecorder metricRecorder;

  /**
   * Creates a metrics helper that reports to the given recorder.
   *
   * @param metricRecorder recorder that receives every measurement
   */
  public SubchannelMetrics(MetricRecorder metricRecorder) {
    this.metricRecorder = metricRecorder;
  }

  /**
   * Records one successful connection attempt and counts the new connection as open.
   *
   * @param labelSet label values; the target, backend service, locality and security level
   *     are read from it
   */
  public void recordConnectionAttemptSucceeded(OtelMetricsAttributes labelSet) {
    metricRecorder
        .addLongCounter(connectionAttemptsSucceeded, 1,
            ImmutableList.of(labelSet.target),
            ImmutableList.of(labelSet.backendService, labelSet.locality));
    metricRecorder
        .addLongCounter(openConnections, 1,
            ImmutableList.of(labelSet.target),
            ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality));
  }

  /**
   * Records one failed connection attempt.
   *
   * @param labelSet label values; the target, backend service and locality are read from it
   */
  public void recordConnectionAttemptFailed(OtelMetricsAttributes labelSet) {
    metricRecorder
        .addLongCounter(connectionAttemptsFailed, 1,
            ImmutableList.of(labelSet.target),
            ImmutableList.of(labelSet.backendService, labelSet.locality));
  }

  /**
   * Records one disconnection and removes the connection from the open-connection count.
   *
   * @param labelSet label values; the target, backend service, locality, disconnect error and
   *     security level are read from it
   */
  public void recordDisconnection(OtelMetricsAttributes labelSet) {
    metricRecorder
        .addLongCounter(disconnections, 1,
            ImmutableList.of(labelSet.target),
            ImmutableList.of(labelSet.backendService, labelSet.locality, labelSet.disconnectError));
    // Fixed: was -11. One disconnection must undo exactly the +1 added when the connection
    // attempt succeeded, otherwise the open-connection count drifts.
    metricRecorder
        .addLongCounter(openConnections, -1,
            ImmutableList.of(labelSet.target),
            ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality));
  }
}

View File

@ -48,6 +48,7 @@ import io.grpc.InternalChannelz;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId; import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.MetricRecorder;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.internal.InternalSubchannel.CallTracingTransport; import io.grpc.internal.InternalSubchannel.CallTracingTransport;
@ -1446,7 +1447,13 @@ public class InternalSubchannelTest {
subchannelTracer, subchannelTracer,
logId, logId,
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
Collections.emptyList()); Collections.emptyList(),
"",
"",
"",
"",
new MetricRecorder() {}
);
} }
private void assertNoCallbackInvoke() { private void assertNoCallbackInvoke() {

View File

@ -36,6 +36,12 @@ public final class OpenTelemetryConstants {
public static final AttributeKey<String> BACKEND_SERVICE_KEY = public static final AttributeKey<String> BACKEND_SERVICE_KEY =
AttributeKey.stringKey("grpc.lb.backend_service"); AttributeKey.stringKey("grpc.lb.backend_service");
/** String attribute key named {@code grpc.disconnect_error}. */
public static final AttributeKey<String> DISCONNECT_ERROR_KEY =
AttributeKey.stringKey("grpc.disconnect_error");
/** String attribute key named {@code grpc.security_level}. */
public static final AttributeKey<String> SECURITY_LEVEL_KEY =
AttributeKey.stringKey("grpc.security_level");
public static final List<Double> LATENCY_BUCKETS = public static final List<Double> LATENCY_BUCKETS =
ImmutableList.of( ImmutableList.of(
0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d, 0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d,