diff --git a/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java b/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java index 07b645cab..9e0b4869e 100644 --- a/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java +++ b/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java @@ -40,8 +40,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.*; import static org.hamcrest.MatcherAssert.assertThat; @RunWith(VertxUnitRunner.class) @@ -206,4 +205,22 @@ public void testQuerySequences(TestContext ctx) { })); })); } + + @Test + public void testUnsupportedXmlTypeErrorPropagation(TestContext ctx) { + connection + .query("CREATE TABLE #TempXmlTest (id INT, xmlData XML)") + .execute() + .onComplete(ctx.asyncAssertSuccess(create -> { + connection + .query("SELECT * FROM #TempXmlTest") + .execute() + .onComplete(ctx.asyncAssertFailure(t -> { + ctx.verify(v -> { + assertThat(t, instanceOf(UnsupportedOperationException.class)); + assertThat(t.getMessage(), containsString("XML")); + }); + })); + })); + } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java index 0126c2554..25d1ff925 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java @@ -16,19 +16,19 @@ */ package io.vertx.pgclient.impl.codec; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - import io.netty.buffer.ByteBuf; import io.vertx.core.VertxException; import io.vertx.pgclient.impl.PgDatabaseMetadata; import io.vertx.pgclient.impl.PgSocketConnection; import io.vertx.pgclient.impl.auth.scram.ScramAuthentication; import io.vertx.pgclient.impl.auth.scram.ScramSession; -import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.codec.CommandResponse; +import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.spi.protocol.InitCommand; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + class InitPgCommandMessage extends PgCommandMessage { private PgEncoder encoder; @@ -66,9 +66,17 @@ void handleAuthenticationSasl(ByteBuf in) { } PgSocketConnection pgSocketConn = (PgSocketConnection) cmd.connection().unwrap(); scramSession = scramAuth.session(cmd.username(), cmd.password().toCharArray(), pgSocketConn.channelBinding()); - encoder.writeScramClientInitialMessage( + try { + encoder.writeScramClientInitialMessage( scramSession.createInitialSaslMessage(in, encoder.channelHandlerContext())); - encoder.flush(); + encoder.flush(); + } catch (RuntimeException e) { + decoder.fireCommandResponse(CommandResponse.failure(e)); + // If the frontend does not support the authentication method requested by the server, + // then it should immediately close the connection. + // See https://www.postgresql.org/docs/current/protocol-flow.html + encoder.close(); + } } @Override diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java index b7b0043bd..f57a63bd1 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java @@ -20,7 +20,7 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.pgclient.PgConnection; -import io.vertx.sqlclient.ClosedConnectionException; +import io.vertx.pgclient.PgException; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.Tuple; @@ -148,7 +148,12 @@ public void testInflightCommandsFailWhenConnectionClosed(TestContext ctx) { .query("SELECT pg_sleep(20)") .execute() .onComplete(ctx.asyncAssertFailure(t -> { - ctx.assertTrue(t instanceof ClosedConnectionException); + if (t instanceof PgException) { + PgException pge = (PgException) t; + ctx.assertEquals("57P01", pge.getSqlState()); // ADMIN SHUTDOWN + } else { + ctx.fail(t); + } })); connector.accept(ctx.asyncAssertSuccess(conn2 -> { conn2 diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/TLSTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/TLSTest.java index 63726a9ee..9c1178979 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/TLSTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/TLSTest.java @@ -11,6 +11,7 @@ package io.vertx.tests.pgclient; +import com.ongres.scram.client.ChannelBindingException; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; @@ -20,12 +21,7 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.pgclient.ChannelBinding; -import io.vertx.pgclient.PgConnectOptions; -import io.vertx.pgclient.PgConnection; -import io.vertx.pgclient.SslMode; -import io.vertx.pgclient.SslNegotiation; -import io.vertx.sqlclient.ClosedConnectionException; +import io.vertx.pgclient.*; import io.vertx.sqlclient.Tuple; import io.vertx.tests.pgclient.junit.ContainerPgRule; import org.junit.*; @@ -281,14 +277,12 @@ public void testChannelBindingRequireWithSsl(TestContext ctx) { @Test public void testChannelBindingRequireWithoutSsl(TestContext ctx) { Async async = ctx.async(); - PgConnectOptions options = new PgConnectOptions(ruleOptionalSll.options()) - .setSslMode(SslMode.DISABLE) + PgConnectOptions options = new PgConnectOptions(ruleSllOff.options()) .setChannelBinding(ChannelBinding.REQUIRE) .setSslOptions(new ClientSSLOptions().setTrustAll(true)); PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertFailure(err -> { - // ctx.assertEquals("Channel bindins is required", err.getMessage()); - ctx.assertTrue(err instanceof ClosedConnectionException); // TODO: handle ChannelBindingException + ctx.assertTrue(err instanceof ChannelBindingException); async.complete(); })); } diff --git a/vertx-pg-client/src/test/java/module-info.java b/vertx-pg-client/src/test/java/module-info.java index 230ce6b4a..9cb5ec5b2 100644 --- a/vertx-pg-client/src/test/java/module-info.java +++ b/vertx-pg-client/src/test/java/module-info.java @@ -2,6 +2,7 @@ requires io.netty.buffer; requires io.netty.transport; + requires com.ongres.scram.client; requires io.vertx.core; requires io.vertx.sql.client; requires io.vertx.sql.client.pg; diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java index 85b912b40..e3ee9414a 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java @@ -21,28 +21,25 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; -import io.vertx.core.*; +import io.vertx.core.Completable; +import io.vertx.core.Vertx; +import io.vertx.core.VertxException; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; +import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.core.tracing.TracingPolicy; -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.sqlclient.ClosedConnectionException; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.codec.impl.PreparedStatementCache; -import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.internal.PreparedStatement; import io.vertx.sqlclient.spi.DatabaseMetadata; +import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.spi.connection.ConnectionContext; -import io.vertx.sqlclient.spi.protocol.CloseConnectionCommand; -import io.vertx.sqlclient.spi.protocol.CloseStatementCommand; -import io.vertx.sqlclient.spi.protocol.CommandBase; -import io.vertx.sqlclient.spi.protocol.CompositeCommand; -import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand; -import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand; +import io.vertx.sqlclient.spi.protocol.*; import java.util.ArrayDeque; import java.util.List; @@ -437,11 +434,12 @@ private void handleClose(Throwable t) { if (t != null) { reportException(t); } + Throwable inflightCause = t != null ? t : ClosedConnectionException.INSTANCE; CommandMessage msg; while ((msg = inflights.poll()) != null) { - fail(msg, ClosedConnectionException.INSTANCE); + fail(msg, inflightCause); } - Throwable cause = t == null ? VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG) : new VertxException(PENDING_CMD_CONNECTION_CORRUPT_MSG, t); + Throwable cause = t == null ? VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG) : VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG, t); CommandBase cmd; while ((cmd = pending.poll()) != null) { CommandBase c = cmd;