logs the metrics inside sync context

This commit is contained in:
AgraVator 2025-07-22 23:25:11 +05:30
parent 623c0f91c5
commit c7135618ae
3 changed files with 87 additions and 45 deletions

View File

@ -201,8 +201,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
}
/**
* Prevents creating any new streams. Buffered streams are not failed and may still proceed
* when {@link #reprocess} is called. The delayed transport will be terminated when there is no
* Prevents creating any new streams. Buffered streams are not failed and may still proceed
* when {@link #reprocess} is called. The delayed transport will be terminated when there is no
* more buffered streams.
*/
@Override

View File

@ -588,15 +588,6 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
@Override
public void transportReady() {
channelLogger.log(ChannelLogLevel.INFO, "READY");
subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
null,
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
syncContext.execute(new Runnable() {
@Override
public void run() {
@ -611,6 +602,15 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
pendingTransport = null;
connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
gotoNonErrorState(READY);
subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
null,
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
}
}
});
@ -626,22 +626,6 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
channelLogger.log(
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
shutdownInitiated = true;
subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
null, null
));
subchannelMetrics.recordDisconnection(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
"Peer Pressure",
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
syncContext.execute(new Runnable() {
@Override
public void run() {
@ -652,11 +636,27 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
activeTransport = null;
addressIndex.reset();
gotoNonErrorState(IDLE);
subchannelMetrics.recordDisconnection(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
"Peer Pressure",
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
} else if (pendingTransport == transport) {
subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
null, null
));
Preconditions.checkState(state.getState() == CONNECTING,
"Expected state is CONNECTING, actual state is %s", state.getState());
addressIndex.increment();
// Continue reconnect if there are still addresses to try.
// Continue reconnecting with remaining addresses.
if (!addressIndex.isValid()) {
pendingTransport = null;
addressIndex.reset();

View File

@ -123,8 +123,6 @@ public class InternalSubchannelTest {
private MetricRecorder mockMetricRecorder = mock(MetricRecorder.class,
delegatesTo(new MetricRecorderImpl()));
private static final long RECONNECT_BACKOFF_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);
private final LinkedList<String> callbackInvokes = new LinkedList<>();
private final InternalSubchannel.Callback mockInternalSubchannelCallback =
new InternalSubchannel.Callback() {
@ -1471,10 +1469,60 @@ public class InternalSubchannelTest {
}
@Test
public void subchannelStateChanges_triggersMetrics_disconnectionOnly() {
// 1. Mock the backoff policy
public void subchannelStateChanges_triggersAttemptFailedMetric() {
// 1. Setup: Standard subchannel initialization
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
SocketAddress addr = mock(SocketAddress.class);
Attributes eagAttributes = Attributes.newBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
.set(LoadBalancer.ATTR_LOCALITY_NAME, LOCALITY)
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL)
.build();
List<EquivalentAddressGroup> addressGroups =
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes));
InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY);
ChannelTracer subchannelTracer = new ChannelTracer(logId, 10,
fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel");
LoadBalancer.CreateSubchannelArgs createSubchannelArgs =
LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build();
internalSubchannel = new InternalSubchannel(
createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider,
mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz,
CallTracer.getDefaultFactory().create(), subchannelTracer, logId,
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
Collections.emptyList(), AUTHORITY, mockMetricRecorder
);
// --- Action: Simulate the "connecting to failed" transition ---
// a. Initiate the connection attempt. The subchannel is now CONNECTING.
internalSubchannel.obtainActiveTransport();
MockClientTransportInfo transportInfo = transports.poll();
assertNotNull("A connection attempt should have been made", transportInfo);
// b. Fail the transport before it can signal `transportReady()`.
transportInfo.listener.transportShutdown(
Status.INTERNAL.withDescription("Simulated connect failure"));
fakeClock.runDueTasks(); // Process the failure event
// --- Verification ---
// a. Verify that the "connection_attempts_failed" metric was recorded exactly once.
verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.subchannel.connection_attempts_failed"),
eq(1L),
eq(Arrays.asList(AUTHORITY)),
eq(Arrays.asList(BACKEND_SERVICE, LOCALITY))
);
// b. Verify no other metrics were recorded. This confirms it wasn't incorrectly
// logged as a success, disconnection, or open connection.
verifyNoMoreInteractions(mockMetricRecorder);
}
@Test
public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() {
// 1. Mock the backoff policy (needed for subchannel creation)
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
when(mockBackoffPolicy.nextBackoffNanos()).thenReturn(RECONNECT_BACKOFF_DELAY_NANOS);
// 2. Setup Subchannel with attributes
SocketAddress addr = mock(SocketAddress.class);
@ -1500,15 +1548,16 @@ public class InternalSubchannelTest {
Collections.emptyList(), AUTHORITY, mockMetricRecorder
);
// --- Action ---
// --- Action: Successful connection ---
internalSubchannel.obtainActiveTransport();
MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
transportInfo.listener.transportReady();
fakeClock.runDueTasks();
fakeClock.runDueTasks(); // Process the successful connection
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
fakeClock.runDueTasks();
// --- Action: Transport is shut down by the "peer" ---
transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("Peer Pressure"));
fakeClock.runDueTasks(); // Process the shutdown
// --- Verification ---
InOrder inOrder = inOrder(mockMetricRecorder);
@ -1527,14 +1576,7 @@ public class InternalSubchannelTest {
eq(Arrays.asList("privacy_and_integrity", BACKEND_SERVICE, LOCALITY))
);
inOrder.verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.subchannel.connection_attempts_failed"),
eq(1L),
eq(Arrays.asList(AUTHORITY)),
eq(Arrays.asList(BACKEND_SERVICE, LOCALITY))
);
// Verify disconnection and automatic failure metrics
// Verify disconnection metrics
inOrder.verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.subchannel.disconnections"),
eq(1L),