diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 036fde55e2c..416fdd9be5c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -124,6 +124,12 @@ class NettyServerHandler extends AbstractNettyHandler { private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean( System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false")); + /** + * A message that simply passes through the channel without any real processing. It is useful to + * check if buffers have been drained and test the health of the channel in a single operation. + */ + static final Object NOOP_MESSAGE = new Object(); + private final Http2Connection.PropertyKey streamKey; private final ServerTransportListener transportListener; private final int maxMessageSize; @@ -709,6 +715,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise); } else if (msg instanceof ForcefulCloseCommand) { forcefulClose(ctx, (ForcefulCloseCommand) msg, promise); + } else if (msg == NOOP_MESSAGE) { + ctx.write(Unpooled.EMPTY_BUFFER, promise); } else { AssertionError e = new AssertionError("Write called for unexpected type: " + msg.getClass().getName()); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 758ffeee5b1..6651b42ee66 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -160,6 +160,18 @@ public void operationComplete(ChannelFuture future) throws Exception { channel.closeFuture().addListener(terminationNotifier); channel.pipeline().addLast(bufferingHandler); + + channel.writeAndFlush(NettyServerHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + // grpcHandler (NettyServerHandler) may not be in the pipeline yet on early negotiation + // failure, so connectionError() can remain null. Notify termination here with the write + // failure cause to preserve/log the original transport termination reason. + notifyTerminated(future.cause()); + } + } + }); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 0d5a9bab176..7f958a2845c 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -1347,6 +1347,25 @@ public void maxRstCountSent_exceedsLimit_fails() throws Exception { assertFalse(channel().isOpen()); } + @Test + public void write_noopMessage_writesEmptyBuffer() throws Exception { + manualSetUp(); + + ChannelPromise promise = newPromise(); + handler().write(ctx(), NettyServerHandler.NOOP_MESSAGE, promise); + channel().flush(); + + assertTrue(promise.isSuccess()); + + Object outbound = channel().readOutbound(); + assertTrue(outbound instanceof ByteBuf); + ByteBuf buf = (ByteBuf) outbound; + assertEquals(0, buf.readableBytes()); + buf.release(); + + assertNull(channel().readOutbound()); + } + private void madeYouReset(int burstSize) throws Exception { when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) .thenAnswer((args) -> new TestServerStreamTracer()); diff --git a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java index 80438532172..90ca8f94c6c 100644 --- a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java +++ b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java @@ -133,6 +133,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Filter; +import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; @@ -472,6 +473,42 @@ public void from_tls_clientAuthRequire_noClientCert() throws Exception { } } + @Test + public void from_plaintextClient_toTlsServer_logsTransportFailureCause() throws Exception { + final AtomicReference transportFailureLog = new AtomicReference<>(); + Handler handler = new Handler() { + @Override + public void publish(LogRecord record) { + if ("Transport failed".equals(record.getMessage()) && record.getThrown() != null) { + transportFailureLog.compareAndSet(null, record); + } + } + + @Override + public void flush() {} + + @Override + public void close() throws SecurityException {} + }; + Logger logger = + Logger.getLogger(String.format("%s.connections", NettyServerTransport.class.getName())); + Level oldLevel = logger.getLevel(); + try { + logger.addHandler(handler); + logger.setLevel(Level.ALL); + + ServerCredentials serverCreds = TlsServerCredentials.create(server1Cert, server1Key); + ChannelCredentials channelCreds = InsecureChannelCredentials.create(); + Status status = expectFailedHandshake(channelCreds, serverCreds); + + assertEquals(Status.Code.UNAVAILABLE, status.getCode()); + assertThat(transportFailureLog.get()).isNotNull(); + } finally { + logger.removeHandler(handler); + logger.setLevel(oldLevel); + } + } + @Test public void from_tls_clientAuthRequire_clientCert() throws Exception { ServerCredentials serverCreds = TlsServerCredentials.newBuilder()