From c4a36e4a67a73dab17be4b99f40fcdd5796670b7 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Fri, 19 Jun 2026 17:51:56 +0200 Subject: [PATCH] Fix wrong exception propagation on connection errors and channel binding failure See #1672, #1674, #1675 Inflight commands should not fail with ClosedConnectionException but with the actual error, e.g. a PgException reporting SQLSTATE 57P01 when pg_terminate_backend kills a connection mid-flight. When channel binding is required but SSL is not available, the PostgreSQL protocol recommends immediately closing the connection. The ChannelBindingException is now correctly reported to the caller rather than being masked by a subsequent ClosedConnectionException. Some portions of this content were created with the assistance of Claude Code. Signed-off-by: Thomas Segismont Fixup Signed-off-by: Thomas Segismont --- .../tests/mssqlclient/MSSQLQueriesTest.java | 21 ++++++++++++++++-- .../impl/codec/InitPgCommandMessage.java | 20 ++++++++++++----- .../tests/pgclient/PgConnectionTest.java | 9 ++++++-- .../java/io/vertx/tests/pgclient/TLSTest.java | 14 ++++-------- .../src/test/java/module-info.java | 1 + .../sqlclient/codec/SocketConnectionBase.java | 22 +++++++++---------- 6 files changed, 55 insertions(+), 32 deletions(-) diff --git a/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java b/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java index 07b645cab..9e0b4869e 100644 --- a/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java +++ b/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java @@ -40,8 +40,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.*; import static org.hamcrest.MatcherAssert.assertThat; @RunWith(VertxUnitRunner.class) @@ -206,4 +205,22 @@ public void testQuerySequences(TestContext ctx) { })); })); } + + @Test + public void testUnsupportedXmlTypeErrorPropagation(TestContext ctx) { + connection + .query("CREATE TABLE #TempXmlTest (id INT, xmlData XML)") + .execute() + .onComplete(ctx.asyncAssertSuccess(create -> { + connection + .query("SELECT * FROM #TempXmlTest") + .execute() + .onComplete(ctx.asyncAssertFailure(t -> { + ctx.verify(v -> { + assertThat(t, instanceOf(UnsupportedOperationException.class)); + assertThat(t.getMessage(), containsString("XML")); + }); + })); + })); + } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java index 0126c2554..25d1ff925 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java @@ -16,19 +16,19 @@ */ package io.vertx.pgclient.impl.codec; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - import io.netty.buffer.ByteBuf; import io.vertx.core.VertxException; import io.vertx.pgclient.impl.PgDatabaseMetadata; import io.vertx.pgclient.impl.PgSocketConnection; import io.vertx.pgclient.impl.auth.scram.ScramAuthentication; import io.vertx.pgclient.impl.auth.scram.ScramSession; -import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.codec.CommandResponse; +import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.spi.protocol.InitCommand; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + class InitPgCommandMessage extends PgCommandMessage { private PgEncoder encoder; @@ -66,9 +66,17 @@ void handleAuthenticationSasl(ByteBuf in) { } PgSocketConnection pgSocketConn = (PgSocketConnection) cmd.connection().unwrap(); scramSession = scramAuth.session(cmd.username(), cmd.password().toCharArray(), pgSocketConn.channelBinding()); - encoder.writeScramClientInitialMessage( + try { + encoder.writeScramClientInitialMessage( scramSession.createInitialSaslMessage(in, encoder.channelHandlerContext())); - encoder.flush(); + encoder.flush(); + } catch (RuntimeException e) { + decoder.fireCommandResponse(CommandResponse.failure(e)); + // If the frontend does not support the authentication method requested by the server, + // then it should immediately close the connection. + // See https://www.postgresql.org/docs/current/protocol-flow.html + encoder.close(); + } } @Override diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java index b7b0043bd..f57a63bd1 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java @@ -20,7 +20,7 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.pgclient.PgConnection; -import io.vertx.sqlclient.ClosedConnectionException; +import io.vertx.pgclient.PgException; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.Tuple; @@ -148,7 +148,12 @@ public void testInflightCommandsFailWhenConnectionClosed(TestContext ctx) { .query("SELECT pg_sleep(20)") .execute() .onComplete(ctx.asyncAssertFailure(t -> { - ctx.assertTrue(t instanceof ClosedConnectionException); + if (t instanceof PgException) { + PgException pge = (PgException) t; + ctx.assertEquals("57P01", pge.getSqlState()); // ADMIN SHUTDOWN + } else { + ctx.fail(t); + } })); connector.accept(ctx.asyncAssertSuccess(conn2 -> { conn2 diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/TLSTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/TLSTest.java index 63726a9ee..9c1178979 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/TLSTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/TLSTest.java @@ -11,6 +11,7 @@ package io.vertx.tests.pgclient; +import com.ongres.scram.client.ChannelBindingException; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; @@ -20,12 +21,7 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.pgclient.ChannelBinding; -import io.vertx.pgclient.PgConnectOptions; -import io.vertx.pgclient.PgConnection; -import io.vertx.pgclient.SslMode; -import io.vertx.pgclient.SslNegotiation; -import io.vertx.sqlclient.ClosedConnectionException; +import io.vertx.pgclient.*; import io.vertx.sqlclient.Tuple; import io.vertx.tests.pgclient.junit.ContainerPgRule; import org.junit.*; @@ -281,14 +277,12 @@ public void testChannelBindingRequireWithSsl(TestContext ctx) { @Test public void testChannelBindingRequireWithoutSsl(TestContext ctx) { Async async = ctx.async(); - PgConnectOptions options = new PgConnectOptions(ruleOptionalSll.options()) - .setSslMode(SslMode.DISABLE) + PgConnectOptions options = new PgConnectOptions(ruleSllOff.options()) .setChannelBinding(ChannelBinding.REQUIRE) .setSslOptions(new ClientSSLOptions().setTrustAll(true)); PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertFailure(err -> { - // ctx.assertEquals("Channel bindins is required", err.getMessage()); - ctx.assertTrue(err instanceof ClosedConnectionException); // TODO: handle ChannelBindingException + ctx.assertTrue(err instanceof ChannelBindingException); async.complete(); })); } diff --git a/vertx-pg-client/src/test/java/module-info.java b/vertx-pg-client/src/test/java/module-info.java index 230ce6b4a..9cb5ec5b2 100644 --- a/vertx-pg-client/src/test/java/module-info.java +++ b/vertx-pg-client/src/test/java/module-info.java @@ -2,6 +2,7 @@ requires io.netty.buffer; requires io.netty.transport; + requires com.ongres.scram.client; requires io.vertx.core; requires io.vertx.sql.client; requires io.vertx.sql.client.pg; diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java index 85b912b40..e3ee9414a 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java @@ -21,28 +21,25 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; -import io.vertx.core.*; +import io.vertx.core.Completable; +import io.vertx.core.Vertx; +import io.vertx.core.VertxException; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; +import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.core.tracing.TracingPolicy; -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.sqlclient.ClosedConnectionException; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.codec.impl.PreparedStatementCache; -import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.internal.PreparedStatement; import io.vertx.sqlclient.spi.DatabaseMetadata; +import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.spi.connection.ConnectionContext; -import io.vertx.sqlclient.spi.protocol.CloseConnectionCommand; -import io.vertx.sqlclient.spi.protocol.CloseStatementCommand; -import io.vertx.sqlclient.spi.protocol.CommandBase; -import io.vertx.sqlclient.spi.protocol.CompositeCommand; -import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand; -import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand; +import io.vertx.sqlclient.spi.protocol.*; import java.util.ArrayDeque; import java.util.List; @@ -437,11 +434,12 @@ private void handleClose(Throwable t) { if (t != null) { reportException(t); } + Throwable inflightCause = t != null ? t : ClosedConnectionException.INSTANCE; CommandMessage msg; while ((msg = inflights.poll()) != null) { - fail(msg, ClosedConnectionException.INSTANCE); + fail(msg, inflightCause); } - Throwable cause = t == null ? VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG) : new VertxException(PENDING_CMD_CONNECTION_CORRUPT_MSG, t); + Throwable cause = t == null ? VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG) : VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG, t); CommandBase cmd; while ((cmd = pending.poll()) != null) { CommandBase c = cmd;