core: add Grpc.TRANSPORT_ATTR_LOCAL_ADDR (#4906)

Resolves #4135
This commit is contained in:
Kun Zhang 2018-10-03 16:43:37 -07:00 committed by GitHub
parent 31328652d4
commit fbfc3a40d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 50 additions and 4 deletions

View File

@ -145,6 +145,7 @@ public abstract class AltsProtocolNegotiator implements ProtocolNegotiator {
.set(TSI_PEER_KEY, altsEvt.peer()) .set(TSI_PEER_KEY, altsEvt.peer())
.set(ALTS_CONTEXT_KEY, altsContext) .set(ALTS_CONTEXT_KEY, altsContext)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.build(), .build(),
new Security(new OtherSecurity("alts", Any.pack(altsContext.context)))); new Security(new OtherSecurity("alts", Any.pack(altsContext.context))));

View File

@ -347,6 +347,8 @@ public class AltsProtocolNegotiatorTest {
.isEqualTo(mockedAltsContext); .isEqualTo(mockedAltsContext);
assertThat(grpcHandler.attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString()) assertThat(grpcHandler.attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString())
.isEqualTo("embedded"); .isEqualTo("embedded");
assertThat(grpcHandler.attrs.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR).toString())
.isEqualTo("embedded");
assertThat(grpcHandler.attrs.get(CallCredentials.ATTR_SECURITY_LEVEL)) assertThat(grpcHandler.attrs.get(CallCredentials.ATTR_SECURITY_LEVEL))
.isEqualTo(SecurityLevel.PRIVACY_AND_INTEGRITY); .isEqualTo(SecurityLevel.PRIVACY_AND_INTEGRITY);
} }

View File

@ -36,7 +36,15 @@ public final class Grpc {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
@TransportAttr @TransportAttr
public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_REMOTE_ADDR = public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_REMOTE_ADDR =
Attributes.Key.create("remote-addr"); Attributes.Key.create("remote-addr");
/**
* Attribute key for the local address of a transport.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
@TransportAttr
public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_LOCAL_ADDR =
Attributes.Key.create("local-addr");
/** /**
* Attribute key for SSL session of a transport. * Attribute key for SSL session of a transport.
@ -44,7 +52,7 @@ public final class Grpc {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
@TransportAttr @TransportAttr
public static final Attributes.Key<SSLSession> TRANSPORT_ATTR_SSL_SESSION = public static final Attributes.Key<SSLSession> TRANSPORT_ATTR_SSL_SESSION =
Attributes.Key.create("ssl-session"); Attributes.Key.create("ssl-session");
/** /**
* Annotation for transport attributes. It follows the annotation semantics defined * Annotation for transport attributes. It follows the annotation semantics defined

View File

@ -132,6 +132,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
synchronized (InProcessTransport.this) { synchronized (InProcessTransport.this) {
Attributes serverTransportAttrs = Attributes.newBuilder() Attributes serverTransportAttrs = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name)) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
.build(); .build();
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs); serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
clientTransportListener.transportReady(); clientTransportListener.transportReady();

View File

@ -1759,7 +1759,7 @@ public abstract class AbstractInteropTest {
} }
} }
/** Helper for getting remote address {@link io.grpc.ServerCall#getAttributes()} */ /** Helper for getting remote address from {@link io.grpc.ServerCall#getAttributes()} */
protected SocketAddress obtainRemoteClientAddr() { protected SocketAddress obtainRemoteClientAddr() {
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.TestServiceBlockingStub stub =
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS); blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);
@ -1769,6 +1769,16 @@ public abstract class AbstractInteropTest {
return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
} }
/** Helper for getting local address from {@link io.grpc.ServerCall#getAttributes()} */
protected SocketAddress obtainLocalClientAddr() {
TestServiceGrpc.TestServiceBlockingStub stub =
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);
stub.unaryCall(SimpleRequest.getDefaultInstance());
return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
}
/** Helper for asserting TLS info in SSLSession {@link io.grpc.ServerCall#getAttributes()} */ /** Helper for asserting TLS info in SSLSession {@link io.grpc.ServerCall#getAttributes()} */
protected void assertX500SubjectDn(String tlsInfo) { protected void assertX500SubjectDn(String tlsInfo) {
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.TestServiceBlockingStub stub =

View File

@ -87,6 +87,13 @@ public class Http2NettyTest extends AbstractInteropTest {
assertNotEquals(getPort(), isa.getPort()); assertNotEquals(getPort(), isa.getPort());
} }
@Test
public void localAddr() throws Exception {
InetSocketAddress isa = (InetSocketAddress) obtainLocalClientAddr();
assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress());
assertEquals(getPort(), isa.getPort());
}
@Test @Test
public void tlsInfo() { public void tlsInfo() {
assertX500SubjectDn("CN=testclient, O=Internet Widgits Pty Ltd, ST=Some-State, C=AU"); assertX500SubjectDn("CN=testclient, O=Internet Widgits Pty Ltd, ST=Some-State, C=AU");

View File

@ -90,6 +90,7 @@ public final class ProtocolNegotiators {
// Set sttributes before replace to be sure we pass it before accepting any requests. // Set sttributes before replace to be sure we pass it before accepting any requests.
handler.handleProtocolNegotiationCompleted(Attributes.newBuilder() handler.handleProtocolNegotiationCompleted(Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.build(), .build(),
/*securityInfo=*/ null); /*securityInfo=*/ null);
// Just replace this handler with the gRPC handler. // Just replace this handler with the gRPC handler.
@ -163,6 +164,7 @@ public final class ProtocolNegotiators {
Attributes.newBuilder() Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.build(), .build(),
new InternalChannelz.Security(new InternalChannelz.Tls(session))); new InternalChannelz.Security(new InternalChannelz.Tls(session)));
// Replace this handler with the GRPC handler. // Replace this handler with the GRPC handler.
@ -673,6 +675,7 @@ public final class ProtocolNegotiators {
Attributes.newBuilder() Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.build(), .build(),
new InternalChannelz.Security(new InternalChannelz.Tls(session))); new InternalChannelz.Security(new InternalChannelz.Tls(session)));
@ -721,6 +724,7 @@ public final class ProtocolNegotiators {
Attributes Attributes
.newBuilder() .newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
.build(), .build(),
/*securityInfo=*/ null); /*securityInfo=*/ null);
@ -764,6 +768,7 @@ public final class ProtocolNegotiators {
Attributes Attributes
.newBuilder() .newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
.build(), .build(),
/*securityInfo=*/ null); /*securityInfo=*/ null);

View File

@ -77,12 +77,14 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLHandshakeException;
@ -105,6 +107,8 @@ public class NettyClientTransportTest {
private ManagedClientTransport.Listener clientTransportListener; private ManagedClientTransport.Listener clientTransportListener;
private final List<NettyClientTransport> transports = new ArrayList<>(); private final List<NettyClientTransport> transports = new ArrayList<>();
private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
new LinkedBlockingQueue<>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1); private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final EchoServerListener serverListener = new EchoServerListener(); private final EchoServerListener serverListener = new EchoServerListener();
private final InternalChannelz channelz = new InternalChannelz(); private final InternalChannelz channelz = new InternalChannelz();
@ -536,6 +540,11 @@ public class NettyClientTransportTest {
assertNotNull(rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION)); assertNotNull(rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION));
assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
Attributes serverTransportAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS);
assertNotNull(serverTransportAttrs);
SocketAddress clientAddr = serverTransportAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
assertNotNull(clientAddr);
assertEquals(clientAddr, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
} }
@Test @Test
@ -749,7 +758,7 @@ public class NettyClientTransportTest {
} }
} }
private static final class EchoServerListener implements ServerListener { private final class EchoServerListener implements ServerListener {
final List<NettyServerTransport> transports = new ArrayList<>(); final List<NettyServerTransport> transports = new ArrayList<>();
final List<EchoServerStreamListener> streamListeners = final List<EchoServerStreamListener> streamListeners =
Collections.synchronizedList(new ArrayList<EchoServerStreamListener>()); Collections.synchronizedList(new ArrayList<EchoServerStreamListener>());
@ -769,6 +778,7 @@ public class NettyClientTransportTest {
@Override @Override
public Attributes transportReady(Attributes transportAttrs) { public Attributes transportReady(Attributes transportAttrs) {
serverTransportAttributesList.add(transportAttrs);
return transportAttrs; return transportAttrs;
} }

View File

@ -484,6 +484,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
attributes = Attributes attributes = Attributes
.newBuilder() .newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress()) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession) .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
.set(CallCredentials.ATTR_SECURITY_LEVEL, .set(CallCredentials.ATTR_SECURITY_LEVEL,
sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY) sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)

View File

@ -707,6 +707,7 @@ public abstract class AbstractTransportTest {
assertEquals("additional attribute value", assertEquals("additional attribute value",
serverStream.getAttributes().get(ADDITIONAL_TRANSPORT_ATTR_KEY)); serverStream.getAttributes().get(ADDITIONAL_TRANSPORT_ATTR_KEY));
assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
serverStream.request(1); serverStream.request(1);
assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));