add test case

This commit is contained in:
AgraVator 2025-07-15 15:29:24 +05:30
parent 2293e7fada
commit 623c0f91c5
3 changed files with 117 additions and 11 deletions

View File

@ -633,6 +633,15 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
null, null null, null
)); ));
subchannelMetrics.recordDisconnection(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
"Peer Pressure",
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -673,15 +682,6 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
for (ClientTransportFilter filter : transportFilters) { for (ClientTransportFilter filter : transportFilters) {
filter.transportTerminated(transport.getAttributes()); filter.transportTerminated(transport.getAttributes());
} }
subchannelMetrics.recordDisconnection(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
"Peer Pressure",
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {

View File

@ -39,7 +39,7 @@ public final class SubchannelMetrics {
MetricInstrumentRegistry metricInstrumentRegistry MetricInstrumentRegistry metricInstrumentRegistry
= MetricInstrumentRegistry.getDefaultRegistry(); = MetricInstrumentRegistry.getDefaultRegistry();
disconnections = metricInstrumentRegistry.registerLongCounter( disconnections = metricInstrumentRegistry.registerLongCounter(
"grpc.subchannel.disconnections1", "grpc.subchannel.disconnections",
"EXPERIMENTAL. Number of times the selected subchannel becomes disconnected", "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected",
"{disconnection}", "{disconnection}",
Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.target"),

View File

@ -29,10 +29,13 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.same; import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -48,7 +51,10 @@ import io.grpc.InternalChannelz;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId; import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.MetricInstrument;
import io.grpc.MetricRecorder; import io.grpc.MetricRecorder;
import io.grpc.NameResolver;
import io.grpc.SecurityLevel;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.internal.InternalSubchannel.CallTracingTransport; import io.grpc.internal.InternalSubchannel.CallTracingTransport;
@ -69,6 +75,7 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import org.mockito.InOrder;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule; import org.mockito.junit.MockitoRule;
@ -82,6 +89,9 @@ public class InternalSubchannelTest {
public final MockitoRule mocks = MockitoJUnit.rule(); public final MockitoRule mocks = MockitoJUnit.rule();
private static final String AUTHORITY = "fakeauthority"; private static final String AUTHORITY = "fakeauthority";
private static final String BACKEND_SERVICE = "ice-cream-factory-service";
private static final String LOCALITY = "mars-olympus-mons-datacenter";
private static final SecurityLevel SECURITY_LEVEL = SecurityLevel.PRIVACY_AND_INTEGRITY;
private static final String USER_AGENT = "mosaic"; private static final String USER_AGENT = "mosaic";
private static final ConnectivityStateInfo UNAVAILABLE_STATE = private static final ConnectivityStateInfo UNAVAILABLE_STATE =
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE);
@ -109,6 +119,12 @@ public class InternalSubchannelTest {
@Mock private BackoffPolicy.Provider mockBackoffPolicyProvider; @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
@Mock private ClientTransportFactory mockTransportFactory; @Mock private ClientTransportFactory mockTransportFactory;
@Mock private BackoffPolicy mockBackoffPolicy;
private MetricRecorder mockMetricRecorder = mock(MetricRecorder.class,
delegatesTo(new MetricRecorderImpl()));
private static final long RECONNECT_BACKOFF_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);
private final LinkedList<String> callbackInvokes = new LinkedList<>(); private final LinkedList<String> callbackInvokes = new LinkedList<>();
private final InternalSubchannel.Callback mockInternalSubchannelCallback = private final InternalSubchannel.Callback mockInternalSubchannelCallback =
new InternalSubchannel.Callback() { new InternalSubchannel.Callback() {
@ -1449,10 +1465,92 @@ public class InternalSubchannelTest {
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
Collections.emptyList(), Collections.emptyList(),
"", "",
new MetricRecorder() {} new MetricRecorder() {
}
); );
} }
@Test
public void subchannelStateChanges_triggersMetrics_disconnectionOnly() {
// 1. Mock the backoff policy
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
when(mockBackoffPolicy.nextBackoffNanos()).thenReturn(RECONNECT_BACKOFF_DELAY_NANOS);
// 2. Setup Subchannel with attributes
SocketAddress addr = mock(SocketAddress.class);
Attributes eagAttributes = Attributes.newBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
.set(LoadBalancer.ATTR_LOCALITY_NAME, LOCALITY)
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL)
.build();
List<EquivalentAddressGroup> addressGroups =
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes));
createInternalSubchannel(new EquivalentAddressGroup(addr));
InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY);
ChannelTracer subchannelTracer = new ChannelTracer(logId, 10,
fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel");
LoadBalancer.CreateSubchannelArgs createSubchannelArgs =
LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build();
internalSubchannel = new InternalSubchannel(
createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider,
mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz,
CallTracer.getDefaultFactory().create(), subchannelTracer, logId,
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
Collections.emptyList(), AUTHORITY, mockMetricRecorder
);
// --- Action ---
internalSubchannel.obtainActiveTransport();
MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
transportInfo.listener.transportReady();
fakeClock.runDueTasks();
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
fakeClock.runDueTasks();
// --- Verification ---
InOrder inOrder = inOrder(mockMetricRecorder);
// Verify successful connection metrics
inOrder.verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.subchannel.connection_attempts_succeeded"),
eq(1L),
eq(Arrays.asList(AUTHORITY)),
eq(Arrays.asList(BACKEND_SERVICE, LOCALITY))
);
inOrder.verify(mockMetricRecorder).addLongUpDownCounter(
eqMetricInstrumentName("grpc.subchannel.open_connections"),
eq(1L),
eq(Arrays.asList(AUTHORITY)),
eq(Arrays.asList("privacy_and_integrity", BACKEND_SERVICE, LOCALITY))
);
inOrder.verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.subchannel.connection_attempts_failed"),
eq(1L),
eq(Arrays.asList(AUTHORITY)),
eq(Arrays.asList(BACKEND_SERVICE, LOCALITY))
);
// Verify disconnection and automatic failure metrics
inOrder.verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.subchannel.disconnections"),
eq(1L),
eq(Arrays.asList(AUTHORITY)),
eq(Arrays.asList(BACKEND_SERVICE, LOCALITY, "Peer Pressure"))
);
inOrder.verify(mockMetricRecorder).addLongUpDownCounter(
eqMetricInstrumentName("grpc.subchannel.open_connections"),
eq(-1L),
eq(Arrays.asList(AUTHORITY)),
eq(Arrays.asList("privacy_and_integrity", BACKEND_SERVICE, LOCALITY))
);
inOrder.verifyNoMoreInteractions();
}
private void assertNoCallbackInvoke() { private void assertNoCallbackInvoke() {
while (fakeExecutor.runDueTasks() > 0) {} while (fakeExecutor.runDueTasks() > 0) {}
assertEquals(0, callbackInvokes.size()); assertEquals(0, callbackInvokes.size());
@ -1463,5 +1561,13 @@ public class InternalSubchannelTest {
callbackInvokes.clear(); callbackInvokes.clear();
} }
static class MetricRecorderImpl implements MetricRecorder {
}
@SuppressWarnings("TypeParameterUnusedInFormals")
private <T extends MetricInstrument> T eqMetricInstrumentName(String name) {
return argThat(instrument -> instrument.getName().equals(name));
}
private static class FakeSocketAddress extends SocketAddress {} private static class FakeSocketAddress extends SocketAddress {}
} }