mirror of https://github.com/grpc/grpc-java.git
core: Propagate authority override from LB exactly once
Setting the authority is only useful when creating a real stream, as there will be a following pick otherwise. In addition, DelayedStream will buffer each call to setAuthority() in a list and we don't want that memory usage. Note that no LBs are using this feature yet, so users would not have been exposed to the memory use. We also needed to setAuthority() when the LB selected a subchannel on the first pick attempt.
This commit is contained in:
parent
7153ff8522
commit
90aefb26e7
|
@ -140,9 +140,15 @@ final class DelayedClientTransport implements ManagedClientTransport {
|
|||
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
|
||||
callOptions.isWaitForReady());
|
||||
if (transport != null) {
|
||||
return transport.newStream(
|
||||
ClientStream stream = transport.newStream(
|
||||
args.getMethodDescriptor(), args.getHeaders(), callOptions,
|
||||
tracers);
|
||||
// User code provided authority takes precedence over the LB provided one; this will be
|
||||
// overwritten by ClientCallImpl if the application sets an authority override
|
||||
if (pickResult.getAuthorityOverride() != null) {
|
||||
stream.setAuthority(pickResult.getAuthorityOverride());
|
||||
}
|
||||
return stream;
|
||||
}
|
||||
}
|
||||
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
|
||||
|
@ -287,10 +293,6 @@ final class DelayedClientTransport implements ManagedClientTransport {
|
|||
for (final PendingStream stream : toProcess) {
|
||||
PickResult pickResult = picker.pickSubchannel(stream.args);
|
||||
CallOptions callOptions = stream.args.getCallOptions();
|
||||
// User code provided authority takes precedence over the LB provided one.
|
||||
if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) {
|
||||
stream.setAuthority(pickResult.getAuthorityOverride());
|
||||
}
|
||||
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
|
||||
callOptions.isWaitForReady());
|
||||
if (transport != null) {
|
||||
|
@ -301,7 +303,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
|
|||
if (callOptions.getExecutor() != null) {
|
||||
executor = callOptions.getExecutor();
|
||||
}
|
||||
Runnable runnable = stream.createRealStream(transport);
|
||||
Runnable runnable = stream.createRealStream(transport, pickResult.getAuthorityOverride());
|
||||
if (runnable != null) {
|
||||
executor.execute(runnable);
|
||||
}
|
||||
|
@ -354,7 +356,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
|
|||
}
|
||||
|
||||
/** Runnable may be null. */
|
||||
private Runnable createRealStream(ClientTransport transport) {
|
||||
private Runnable createRealStream(ClientTransport transport, String authorityOverride) {
|
||||
ClientStream realStream;
|
||||
Context origContext = context.attach();
|
||||
try {
|
||||
|
@ -364,6 +366,13 @@ final class DelayedClientTransport implements ManagedClientTransport {
|
|||
} finally {
|
||||
context.detach(origContext);
|
||||
}
|
||||
if (authorityOverride != null) {
|
||||
// User code provided authority takes precedence over the LB provided one; this will be
|
||||
// overwritten by an enqueud call from ClientCallImpl if the application sets an authority
|
||||
// override. We must call the real stream directly because stream.start() has likely already
|
||||
// been called on the delayed stream.
|
||||
realStream.setAuthority(authorityOverride);
|
||||
}
|
||||
return setStream(realStream);
|
||||
}
|
||||
|
||||
|
|
|
@ -503,26 +503,11 @@ public class DelayedClientTransportTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void reprocess_authorityOverridePresentInCallOptions_authorityOverrideFromLbIsIgnored() {
|
||||
DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream(
|
||||
method, headers, callOptions, tracers);
|
||||
delayedStream.start(mock(ClientStreamListener.class));
|
||||
SubchannelPicker picker = mock(SubchannelPicker.class);
|
||||
PickResult pickResult = PickResult.withSubchannel(
|
||||
mockSubchannel, null, "authority-override-hostname-from-lb");
|
||||
when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult);
|
||||
|
||||
delayedTransport.reprocess(picker);
|
||||
fakeExecutor.runDueTasks();
|
||||
|
||||
verify(mockRealStream, never()).setAuthority("authority-override-hostname-from-lb");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void
|
||||
reprocess_authorityOverrideNotInCallOptions_authorityOverrideFromLbIsSetIntoStream() {
|
||||
public void reprocess_authorityOverrideFromLb() {
|
||||
InOrder inOrder = inOrder(mockRealStream);
|
||||
DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream(
|
||||
method, headers, callOptions.withAuthority(null), tracers);
|
||||
delayedStream.setAuthority("authority-override-from-calloptions");
|
||||
delayedStream.start(mock(ClientStreamListener.class));
|
||||
SubchannelPicker picker = mock(SubchannelPicker.class);
|
||||
PickResult pickResult = PickResult.withSubchannel(
|
||||
|
@ -536,7 +521,10 @@ public class DelayedClientTransportTest {
|
|||
delayedTransport.reprocess(picker);
|
||||
fakeExecutor.runDueTasks();
|
||||
|
||||
verify(mockRealStream).setAuthority("authority-override-hostname-from-lb");
|
||||
// Must be set before start(), and may be overwritten
|
||||
inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb");
|
||||
inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions");
|
||||
inOrder.verify(mockRealStream).start(any(ClientStreamListener.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -563,28 +551,26 @@ public class DelayedClientTransportTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void newStream_assignsTransport_authorityFromCallOptionsSupersedesAuthorityFromLB() {
|
||||
public void newStream_authorityOverrideFromLb() {
|
||||
InOrder inOrder = inOrder(mockRealStream);
|
||||
SubchannelPicker picker = mock(SubchannelPicker.class);
|
||||
AbstractSubchannel subchannel = mock(AbstractSubchannel.class);
|
||||
when(subchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel);
|
||||
PickResult pickResult = PickResult.withSubchannel(
|
||||
subchannel, null, "authority-override-hostname-from-lb");
|
||||
mockSubchannel, null, "authority-override-hostname-from-lb");
|
||||
when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult);
|
||||
ArgumentCaptor<CallOptions> callOptionsArgumentCaptor =
|
||||
ArgumentCaptor.forClass(CallOptions.class);
|
||||
when(mockRealTransport.newStream(
|
||||
any(MethodDescriptor.class), any(Metadata.class), callOptionsArgumentCaptor.capture(),
|
||||
ArgumentMatchers.<ClientStreamTracer[]>any()))
|
||||
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), any()))
|
||||
.thenReturn(mockRealStream);
|
||||
delayedTransport.reprocess(picker);
|
||||
verifyNoMoreInteractions(picker);
|
||||
verifyNoMoreInteractions(transportListener);
|
||||
|
||||
CallOptions callOptions =
|
||||
CallOptions.DEFAULT.withAuthority("authority-override-hosstname-from-calloptions");
|
||||
delayedTransport.newStream(method, headers, callOptions, tracers);
|
||||
assertThat(callOptionsArgumentCaptor.getValue().getAuthority()).isEqualTo(
|
||||
"authority-override-hosstname-from-calloptions");
|
||||
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, tracers);
|
||||
assertThat(stream).isSameInstanceAs(mockRealStream);
|
||||
stream.setAuthority("authority-override-from-calloptions");
|
||||
stream.start(mock(ClientStreamListener.class));
|
||||
|
||||
// Must be set before start(), and may be overwritten
|
||||
inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb");
|
||||
inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions");
|
||||
inOrder.verify(mockRealStream).start(any(ClientStreamListener.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue