From 05dee597b22ee9588b943c61037a7529d455e8fe Mon Sep 17 00:00:00 2001 From: Muhammad Junaid Muzammil <4795269+junmuz@users.noreply.github.com> Date: Wed, 17 Jun 2026 02:29:13 -0700 Subject: [PATCH 1/2] [core] Fix JDBC connection leak in ClientPool and JdbcCatalog --- .../org/apache/paimon/client/ClientPool.java | 10 +- .../apache/paimon/jdbc/JdbcCatalogLock.java | 2 +- .../paimon/jdbc/JdbcCatalogLockContext.java | 2 +- .../paimon/jdbc/JdbcClientPoolTest.java | 93 +++++++++++++++++++ .../clone/spits/CloneSplitsFunction.java | 13 +++ .../clone/spits/ListCloneSplitsFunction.java | 13 +++ 6 files changed, 130 insertions(+), 3 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java index b635a0882cb2..063921d93aaf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java @@ -68,7 +68,11 @@ public R run(Action action) throws E, InterruptedException { client = ensureActiveClient(client); return action.run(client); } finally { - clients.addFirst(client); + if (this.clients != null) { + clients.addFirst(client); + } else { + close(client); + } } } } @@ -93,6 +97,10 @@ public void execute(ExecuteAction action) throws E, InterruptedException { protected abstract void close(C client); + public boolean isClosed() { + return this.clients == null; + } + @Override public void close() { LinkedBlockingDeque clients = this.clients; diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java index 307f92f0a570..437016508582 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java @@ -83,7 +83,7 @@ private void lock(String lockUniqueName) throws SQLException, InterruptedExcepti @Override public void close() throws IOException { - // Do nothing + connections.close(); } public static long checkMaxSleep(Map conf) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java index b109f271d1a8..bdda478e0a03 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java @@ -40,7 +40,7 @@ public Options options() { } public JdbcClientPool connections() { - if (connections == null) { + if (connections == null || connections.isClosed()) { connections = new JdbcClientPool( options.get(CatalogOptions.CLIENT_POOL_SIZE), diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java index a96a2170bc9d..5f5a7fa34c25 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java @@ -18,15 +18,23 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; + import org.junit.jupiter.api.Test; import java.sql.Connection; import java.sql.SQLException; import java.util.Collections; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link JdbcClientPool} connection validation. */ public class JdbcClientPoolTest { @@ -149,4 +157,89 @@ public void testActionIsExecutedOnValidConnection() throws SQLException, Interru pool.close(); } } + + @Test + public void testIsClosedReportsCorrectly() { + JdbcClientPool pool = createPool(1); + assertThat(pool.isClosed()).isFalse(); + pool.close(); + assertThat(pool.isClosed()).isTrue(); + } + + @Test + public void testConnectionClosedWhenPoolClosedDuringAction() throws Exception { + JdbcClientPool pool = createPool(1); + AtomicReference connRef = new AtomicReference<>(); + CountDownLatch actionStarted = new CountDownLatch(1); + CountDownLatch poolClosed = new CountDownLatch(1); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future future = + executor.submit( + () -> { + try { + pool.run( + connection -> { + connRef.set(connection); + actionStarted.countDown(); + try { + poolClosed.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + }); + } catch (Exception e) { + // expected + } + }); + + actionStarted.await(); + pool.close(); + poolClosed.countDown(); + future.get(); + + assertThat(connRef.get().isClosed()).isTrue(); + } finally { + executor.shutdown(); + } + } + + @Test + public void testPoolThrowsAfterClose() { + JdbcClientPool pool = createPool(1); + pool.close(); + + assertThatThrownBy( + () -> + pool.run( + connection -> { + return null; + })) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("closed pool"); + } + + @Test + public void testLockContextRecreatesPoolAfterClose() { + String dbUrl = + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + Options options = new Options(); + options.set(CatalogOptions.CLIENT_POOL_SIZE, 1); + options.set(CatalogOptions.URI.key(), dbUrl); + + JdbcCatalogLockContext context = new JdbcCatalogLockContext("test-catalog", options); + + JdbcClientPool firstPool = context.connections(); + assertThat(firstPool.isClosed()).isFalse(); + + firstPool.close(); + assertThat(firstPool.isClosed()).isTrue(); + + JdbcClientPool secondPool = context.connections(); + assertThat(secondPool).isNotSameAs(firstPool); + assertThat(secondPool.isClosed()).isFalse(); + secondPool.close(); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java index 4b5bec097df6..3ddf458f0825 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java @@ -66,6 +66,19 @@ public void open(Configuration conf) throws Exception { this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig)); } + @Override + public void close() throws Exception { + super.close(); + if (sourceCatalog != null) { + sourceCatalog.close(); + sourceCatalog = null; + } + if (targetCatalog != null) { + targetCatalog.close(); + targetCatalog = null; + } + } + @Override public void processElement( CloneSplitInfo cloneSplitInfo, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java index f364c4380004..31d2fb202893 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java @@ -77,6 +77,19 @@ public void open(Configuration conf) throws Exception { this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig)); } + @Override + public void close() throws Exception { + super.close(); + if (sourceCatalog != null) { + sourceCatalog.close(); + sourceCatalog = null; + } + if (targetCatalog != null) { + targetCatalog.close(); + targetCatalog = null; + } + } + @Override public void processElement( CloneSchemaInfo cloneSchemaInfo, From 7e6c0b5d92d97360096212d01520423dc31b4760 Mon Sep 17 00:00:00 2001 From: Muhammad Junaid Muzammil <4795269+junmuz@users.noreply.github.com> Date: Mon, 22 Jun 2026 02:48:16 -0700 Subject: [PATCH 2/2] [core] Fix race condition in ClientPool return-to-pool path Simplify the finally block to always add the client back to the deque first, then check if close() raced. The deque's internal lock ensures mutual exclusion between drainTo and remove, preventing both leaks and double-closes. --- .../src/main/java/org/apache/paimon/client/ClientPool.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java index 063921d93aaf..40a2689bcee4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java @@ -68,9 +68,10 @@ public R run(Action action) throws E, InterruptedException { client = ensureActiveClient(client); return action.run(client); } finally { - if (this.clients != null) { - clients.addFirst(client); - } else { + // Return client to the deque, then check if close() raced us. + // The deque's lock ensures either drainTo or remove sees the client. + clients.addFirst(client); + if (this.clients == null && clients.remove(client)) { close(client); } }