From 9e67ddd10bebffa946fc3cb30d8781cbc253627c Mon Sep 17 00:00:00 2001 From: becomeStar Date: Sun, 25 Jan 2026 21:43:50 +0900 Subject: [PATCH 1/2] netty: Propagate first handshake failure from buffering handler When a handshake failure occurs before any writes are buffered on the server side, WriteBufferingAndExceptionHandler can record the failure internally but never surface it to downstream inbound handlers. This makes the original handshake error unobservable and complicates debugging and instrumentation. Propagate only the first failure via exceptionCaught, gated on the absence of a previous failure, so that the canonical error becomes observable while avoiding duplicate propagation and preserving existing close semantics. --- .../WriteBufferingAndExceptionHandler.java | 2 + ...WriteBufferingAndExceptionHandlerTest.java | 61 +++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java index 2799dfccb61..56ba9fd3214 100644 --- a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java +++ b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java @@ -99,6 +99,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write. // 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect. if (ctx.channel().isActive() && previousFailure == null) { + ctx.fireExceptionCaught(cause); + final class LogOnFailure implements ChannelFutureListener { @Override public void operationComplete(ChannelFuture future) { diff --git a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java index b99a9386fcf..9c2022cfc14 100644 --- a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java @@ -31,8 +31,11 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultEventLoop; @@ -41,6 +44,7 @@ import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; import java.net.ConnectException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -381,4 +385,61 @@ public void uncaughtReadFails() throws Exception { assertThat(status.getDescription()).contains("channelRead() missed"); } } + + @Test + public void handshakeFailure_isPropagatedOnce() throws Exception { + AtomicInteger exceptionCount = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(1); + + ChannelHandler observer = + new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + exceptionCount.incrementAndGet(); + latch.countDown(); + } + }; + + WriteBufferingAndExceptionHandler handler = + new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {}); + + LocalAddress addr = new LocalAddress("local"); + + ChannelFuture cf = + new Bootstrap() + .channel(LocalChannel.class) + .group(group) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(handler); + ch.pipeline().addLast(observer); + } + }) + .register(); + + chan = cf.channel(); + cf.sync(); + + ChannelFuture sf = + new ServerBootstrap() + .group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInboundHandlerAdapter() {}) + .bind(addr); + server = sf.channel(); + sf.sync(); + + chan.connect(addr).sync(); + + RuntimeException handshakeFailure = + Status.UNAVAILABLE.withDescription("handshake failed").asRuntimeException(); + + chan.pipeline().fireExceptionCaught(handshakeFailure); + chan.pipeline().fireExceptionCaught(new RuntimeException("Second")); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertThat(exceptionCount.get()).isEqualTo(1); + } } From e32bfcac66cdee8e82e11d4c9417b02f0d1fe290 Mon Sep 17 00:00:00 2001 From: becomeStar Date: Fri, 20 Feb 2026 00:36:37 +0900 Subject: [PATCH 2/2] netty: Log early handshake failure cause on server Add a server-side NOOP write path to surface early negotiation failures through notifyTerminated(). When negotiation fails before NettyServerHandler becomes active in the pipeline, connectionError() can remain null and the transport may log an unhelpful termination reason. This preserves the original failure cause in transport logs and adds coverage for the server NOOP write path. --- .../io/grpc/netty/NettyServerHandler.java | 8 +++ .../io/grpc/netty/NettyServerTransport.java | 12 ++++ .../WriteBufferingAndExceptionHandler.java | 2 - .../io/grpc/netty/NettyServerHandlerTest.java | 19 ++++++ .../grpc/netty/ProtocolNegotiatorsTest.java | 37 +++++++++++ ...WriteBufferingAndExceptionHandlerTest.java | 61 ------------------- 6 files changed, 76 insertions(+), 63 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 036fde55e2c..416fdd9be5c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -124,6 +124,12 @@ class NettyServerHandler extends AbstractNettyHandler { private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean( System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false")); + /** + * A message that simply passes through the channel without any real processing. It is useful to + * check if buffers have been drained and test the health of the channel in a single operation. + */ + static final Object NOOP_MESSAGE = new Object(); + private final Http2Connection.PropertyKey streamKey; private final ServerTransportListener transportListener; private final int maxMessageSize; @@ -709,6 +715,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise); } else if (msg instanceof ForcefulCloseCommand) { forcefulClose(ctx, (ForcefulCloseCommand) msg, promise); + } else if (msg == NOOP_MESSAGE) { + ctx.write(Unpooled.EMPTY_BUFFER, promise); } else { AssertionError e = new AssertionError("Write called for unexpected type: " + msg.getClass().getName()); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 758ffeee5b1..6651b42ee66 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -160,6 +160,18 @@ public void operationComplete(ChannelFuture future) throws Exception { channel.closeFuture().addListener(terminationNotifier); channel.pipeline().addLast(bufferingHandler); + + channel.writeAndFlush(NettyServerHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + // grpcHandler (NettyServerHandler) may not be in the pipeline yet on early negotiation + // failure, so connectionError() can remain null. Notify termination here with the write + // failure cause to preserve/log the original transport termination reason. + notifyTerminated(future.cause()); + } + } + }); } @Override diff --git a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java index 56ba9fd3214..2799dfccb61 100644 --- a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java +++ b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java @@ -99,8 +99,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write. // 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect. if (ctx.channel().isActive() && previousFailure == null) { - ctx.fireExceptionCaught(cause); - final class LogOnFailure implements ChannelFutureListener { @Override public void operationComplete(ChannelFuture future) { diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 0d5a9bab176..7f958a2845c 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -1347,6 +1347,25 @@ public void maxRstCountSent_exceedsLimit_fails() throws Exception { assertFalse(channel().isOpen()); } + @Test + public void write_noopMessage_writesEmptyBuffer() throws Exception { + manualSetUp(); + + ChannelPromise promise = newPromise(); + handler().write(ctx(), NettyServerHandler.NOOP_MESSAGE, promise); + channel().flush(); + + assertTrue(promise.isSuccess()); + + Object outbound = channel().readOutbound(); + assertTrue(outbound instanceof ByteBuf); + ByteBuf buf = (ByteBuf) outbound; + assertEquals(0, buf.readableBytes()); + buf.release(); + + assertNull(channel().readOutbound()); + } + private void madeYouReset(int burstSize) throws Exception { when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) .thenAnswer((args) -> new TestServerStreamTracer()); diff --git a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java index 80438532172..90ca8f94c6c 100644 --- a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java +++ b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java @@ -133,6 +133,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Filter; +import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; @@ -472,6 +473,42 @@ public void from_tls_clientAuthRequire_noClientCert() throws Exception { } } + @Test + public void from_plaintextClient_toTlsServer_logsTransportFailureCause() throws Exception { + final AtomicReference transportFailureLog = new AtomicReference<>(); + Handler handler = new Handler() { + @Override + public void publish(LogRecord record) { + if ("Transport failed".equals(record.getMessage()) && record.getThrown() != null) { + transportFailureLog.compareAndSet(null, record); + } + } + + @Override + public void flush() {} + + @Override + public void close() throws SecurityException {} + }; + Logger logger = + Logger.getLogger(String.format("%s.connections", NettyServerTransport.class.getName())); + Level oldLevel = logger.getLevel(); + try { + logger.addHandler(handler); + logger.setLevel(Level.ALL); + + ServerCredentials serverCreds = TlsServerCredentials.create(server1Cert, server1Key); + ChannelCredentials channelCreds = InsecureChannelCredentials.create(); + Status status = expectFailedHandshake(channelCreds, serverCreds); + + assertEquals(Status.Code.UNAVAILABLE, status.getCode()); + assertThat(transportFailureLog.get()).isNotNull(); + } finally { + logger.removeHandler(handler); + logger.setLevel(oldLevel); + } + } + @Test public void from_tls_clientAuthRequire_clientCert() throws Exception { ServerCredentials serverCreds = TlsServerCredentials.newBuilder() diff --git a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java index 9c2022cfc14..b99a9386fcf 100644 --- a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java @@ -31,11 +31,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultEventLoop; @@ -44,7 +41,6 @@ import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; import java.net.ConnectException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -385,61 +381,4 @@ public void uncaughtReadFails() throws Exception { assertThat(status.getDescription()).contains("channelRead() missed"); } } - - @Test - public void handshakeFailure_isPropagatedOnce() throws Exception { - AtomicInteger exceptionCount = new AtomicInteger(); - CountDownLatch latch = new CountDownLatch(1); - - ChannelHandler observer = - new ChannelInboundHandlerAdapter() { - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - exceptionCount.incrementAndGet(); - latch.countDown(); - } - }; - - WriteBufferingAndExceptionHandler handler = - new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {}); - - LocalAddress addr = new LocalAddress("local"); - - ChannelFuture cf = - new Bootstrap() - .channel(LocalChannel.class) - .group(group) - .handler( - new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) { - ch.pipeline().addLast(handler); - ch.pipeline().addLast(observer); - } - }) - .register(); - - chan = cf.channel(); - cf.sync(); - - ChannelFuture sf = - new ServerBootstrap() - .group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInboundHandlerAdapter() {}) - .bind(addr); - server = sf.channel(); - sf.sync(); - - chan.connect(addr).sync(); - - RuntimeException handshakeFailure = - Status.UNAVAILABLE.withDescription("handshake failed").asRuntimeException(); - - chan.pipeline().fireExceptionCaught(handshakeFailure); - chan.pipeline().fireExceptionCaught(new RuntimeException("Second")); - - assertTrue(latch.await(5, TimeUnit.SECONDS)); - assertThat(exceptionCount.get()).isEqualTo(1); - } }