diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java index b635a0882cb2..40a2689bcee4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java @@ -68,7 +68,12 @@ public R run(Action action) throws E, InterruptedException { client = ensureActiveClient(client); return action.run(client); } finally { + // Return client to the deque, then check if close() raced us. + // The deque's lock ensures either drainTo or remove sees the client. clients.addFirst(client); + if (this.clients == null && clients.remove(client)) { + close(client); + } } } } @@ -93,6 +98,10 @@ public void execute(ExecuteAction action) throws E, InterruptedException { protected abstract void close(C client); + public boolean isClosed() { + return this.clients == null; + } + @Override public void close() { LinkedBlockingDeque clients = this.clients; diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java index 307f92f0a570..437016508582 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java @@ -83,7 +83,7 @@ private void lock(String lockUniqueName) throws SQLException, InterruptedExcepti @Override public void close() throws IOException { - // Do nothing + connections.close(); } public static long checkMaxSleep(Map conf) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java index b109f271d1a8..bdda478e0a03 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java @@ -40,7 +40,7 @@ public Options options() { } public JdbcClientPool connections() { - if (connections == null) { + if (connections == null || connections.isClosed()) { connections = new JdbcClientPool( options.get(CatalogOptions.CLIENT_POOL_SIZE), diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java index a96a2170bc9d..5f5a7fa34c25 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java @@ -18,15 +18,23 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; + import org.junit.jupiter.api.Test; import java.sql.Connection; import java.sql.SQLException; import java.util.Collections; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link JdbcClientPool} connection validation. */ public class JdbcClientPoolTest { @@ -149,4 +157,89 @@ public void testActionIsExecutedOnValidConnection() throws SQLException, Interru pool.close(); } } + + @Test + public void testIsClosedReportsCorrectly() { + JdbcClientPool pool = createPool(1); + assertThat(pool.isClosed()).isFalse(); + pool.close(); + assertThat(pool.isClosed()).isTrue(); + } + + @Test + public void testConnectionClosedWhenPoolClosedDuringAction() throws Exception { + JdbcClientPool pool = createPool(1); + AtomicReference connRef = new AtomicReference<>(); + CountDownLatch actionStarted = new CountDownLatch(1); + CountDownLatch poolClosed = new CountDownLatch(1); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future future = + executor.submit( + () -> { + try { + pool.run( + connection -> { + connRef.set(connection); + actionStarted.countDown(); + try { + poolClosed.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + }); + } catch (Exception e) { + // expected + } + }); + + actionStarted.await(); + pool.close(); + poolClosed.countDown(); + future.get(); + + assertThat(connRef.get().isClosed()).isTrue(); + } finally { + executor.shutdown(); + } + } + + @Test + public void testPoolThrowsAfterClose() { + JdbcClientPool pool = createPool(1); + pool.close(); + + assertThatThrownBy( + () -> + pool.run( + connection -> { + return null; + })) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("closed pool"); + } + + @Test + public void testLockContextRecreatesPoolAfterClose() { + String dbUrl = + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + Options options = new Options(); + options.set(CatalogOptions.CLIENT_POOL_SIZE, 1); + options.set(CatalogOptions.URI.key(), dbUrl); + + JdbcCatalogLockContext context = new JdbcCatalogLockContext("test-catalog", options); + + JdbcClientPool firstPool = context.connections(); + assertThat(firstPool.isClosed()).isFalse(); + + firstPool.close(); + assertThat(firstPool.isClosed()).isTrue(); + + JdbcClientPool secondPool = context.connections(); + assertThat(secondPool).isNotSameAs(firstPool); + assertThat(secondPool.isClosed()).isFalse(); + secondPool.close(); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java index 4b5bec097df6..3ddf458f0825 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java @@ -66,6 +66,19 @@ public void open(Configuration conf) throws Exception { this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig)); } + @Override + public void close() throws Exception { + super.close(); + if (sourceCatalog != null) { + sourceCatalog.close(); + sourceCatalog = null; + } + if (targetCatalog != null) { + targetCatalog.close(); + targetCatalog = null; + } + } + @Override public void processElement( CloneSplitInfo cloneSplitInfo, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java index f364c4380004..31d2fb202893 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java @@ -77,6 +77,19 @@ public void open(Configuration conf) throws Exception { this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig)); } + @Override + public void close() throws Exception { + super.close(); + if (sourceCatalog != null) { + sourceCatalog.close(); + sourceCatalog = null; + } + if (targetCatalog != null) { + targetCatalog.close(); + targetCatalog = null; + } + } + @Override public void processElement( CloneSchemaInfo cloneSchemaInfo,