diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java index b635a0882cb2..4cdc94ed1dea 100644 --- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java @@ -68,7 +68,12 @@ public R run(Action action) throws E, InterruptedException { client = ensureActiveClient(client); return action.run(client); } finally { + // Return client to the deque, then check if close() raced us. + // The deque's lock ensures either drainTo or remove sees the client. clients.addFirst(client); + if (this.clients == null && clients.remove(client)) { + close(client); + } } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/CachedJdbcClientPool.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/CachedJdbcClientPool.java new file mode 100644 index 000000000000..e7ae921cb220 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/CachedJdbcClientPool.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.options.Options; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.apache.paimon.options.CatalogOptions.CLIENT_POOL_SIZE; +import static org.apache.paimon.options.CatalogOptions.URI; + +/** + * A cache that shares {@link JdbcClientPool} instances across multiple catalog instances in the + * same JVM. This prevents each Flink operator from creating its own connection pool when using the + * JDBC catalog. + * + *

The cache is keyed by JDBC URI and catalog key. Pools live for the lifetime of the JVM and are + * closed via a shutdown hook. + */ +public class CachedJdbcClientPool { + + private static final ConcurrentMap CLIENT_POOLS = + new ConcurrentHashMap<>(); + + static { + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + for (JdbcClientPool pool : CLIENT_POOLS.values()) { + pool.close(); + } + CLIENT_POOLS.clear(); + }, + "jdbc-client-pool-shutdown")); + } + + private final Key key; + private final int poolSize; + private final String dbUrl; + private final Map props; + + public CachedJdbcClientPool(Options options, Map props) { + this.dbUrl = options.get(URI); + this.poolSize = options.get(CLIENT_POOL_SIZE); + this.props = props; + this.key = Key.of(dbUrl, options.get(JdbcCatalogOptions.CATALOG_KEY)); + } + + /** Returns the shared {@link JdbcClientPool} for this cache key, creating one if needed. */ + public JdbcClientPool get() { + return CLIENT_POOLS.computeIfAbsent(key, k -> new JdbcClientPool(poolSize, dbUrl, props)); + } + + @VisibleForTesting + static ConcurrentMap clientPools() { + return CLIENT_POOLS; + } + + @VisibleForTesting + static void resetCache() { + for (JdbcClientPool pool : CLIENT_POOLS.values()) { + pool.close(); + } + CLIENT_POOLS.clear(); + } + + static class Key { + private final String uri; + private final String catalogKey; + + private Key(String uri, String catalogKey) { + this.uri = uri; + this.catalogKey = catalogKey; + } + + static Key of(String uri, String catalogKey) { + return new Key(uri, catalogKey); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Key that = (Key) o; + return Objects.equals(uri, that.uri) && Objects.equals(catalogKey, that.catalogKey); + } + + @Override + public int hashCode() { + return Objects.hash(uri, catalogKey); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f765e5f88db5..b07d3395fa52 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -98,11 +98,7 @@ protected JdbcCatalog( this.options = context.options(); this.warehouse = warehouse; Preconditions.checkNotNull(options, "Invalid catalog properties: null"); - this.connections = - new JdbcClientPool( - options.get(CatalogOptions.CLIENT_POOL_SIZE), - options.get(CatalogOptions.URI.key()), - options.toMap()); + this.connections = new CachedJdbcClientPool(options, options.toMap()).get(); try { initializeCatalogTablesIfNeed(); } catch (SQLException e) { @@ -569,7 +565,8 @@ public void repairTable(Identifier identifier) throws TableNotExistException { @Override public void close() throws Exception { - connections.close(); + // Do not close the connection pool here — it is shared across catalog instances + // via CachedJdbcClientPool and will be evicted/closed by the cache when idle. } private boolean syncTableProperties() { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java index b109f271d1a8..faac4da63e76 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java @@ -19,7 +19,6 @@ package org.apache.paimon.jdbc; import org.apache.paimon.catalog.CatalogLockContext; -import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; /** Jdbc lock context. */ @@ -41,11 +40,7 @@ public Options options() { public JdbcClientPool connections() { if (connections == null) { - connections = - new JdbcClientPool( - options.get(CatalogOptions.CLIENT_POOL_SIZE), - options.get(CatalogOptions.URI.key()), - options.toMap()); + connections = new CachedJdbcClientPool(options, options.toMap()).get(); } return connections; } diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/CachedJdbcClientPoolTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/CachedJdbcClientPoolTest.java new file mode 100644 index 000000000000..09a11b187f09 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/CachedJdbcClientPoolTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.sql.SQLException; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CachedJdbcClientPool}. */ +public class CachedJdbcClientPoolTest { + + @AfterEach + void tearDown() { + CachedJdbcClientPool.resetCache(); + } + + @Test + void testSameKeyReturnsSamePool() { + String uri = sqliteUri(); + Options options = createOptions(uri, "my-catalog"); + + CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options, options.toMap()); + CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options, options.toMap()); + + assertThat(cache1.get()).isSameAs(cache2.get()); + } + + @Test + void testDifferentUriReturnsDifferentPool() { + Options options1 = createOptions(sqliteUri(), "my-catalog"); + Options options2 = createOptions(sqliteUri(), "my-catalog"); + + CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap()); + CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap()); + + assertThat(cache1.get()).isNotSameAs(cache2.get()); + } + + @Test + void testDifferentCatalogKeyReturnsDifferentPool() { + String uri = sqliteUri(); + Options options1 = createOptions(uri, "catalog-a"); + Options options2 = createOptions(uri, "catalog-b"); + + CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap()); + CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap()); + + assertThat(cache1.get()).isNotSameAs(cache2.get()); + } + + @Test + void testPoolIsUsable() throws SQLException, InterruptedException { + Options options = createOptions(sqliteUri(), "test-catalog"); + CachedJdbcClientPool cache = new CachedJdbcClientPool(options, options.toMap()); + + JdbcClientPool pool = cache.get(); + Boolean result = pool.run(conn -> !conn.isClosed()); + + assertThat(result).isTrue(); + } + + @Test + void testMultipleCatalogInstancesSharePool() { + String uri = sqliteUri(); + Options options = createOptions(uri, "shared-catalog"); + + JdbcCatalog catalog1 = + new JdbcCatalog( + new org.apache.paimon.fs.local.LocalFileIO(), + "shared-catalog", + org.apache.paimon.catalog.CatalogContext.create(options), + "/tmp/warehouse1"); + JdbcCatalog catalog2 = + new JdbcCatalog( + new org.apache.paimon.fs.local.LocalFileIO(), + "shared-catalog", + org.apache.paimon.catalog.CatalogContext.create(options), + "/tmp/warehouse2"); + + assertThat(catalog1.getConnections()).isSameAs(catalog2.getConnections()); + } + + @Test + void testResetCacheClearsAllPools() { + Options options = createOptions(sqliteUri(), "test-catalog"); + CachedJdbcClientPool cache = new CachedJdbcClientPool(options, options.toMap()); + JdbcClientPool pool = cache.get(); + + assertThat(pool).isNotNull(); + assertThat(CachedJdbcClientPool.clientPools()).isNotEmpty(); + + CachedJdbcClientPool.resetCache(); + + assertThat(CachedJdbcClientPool.clientPools()).isEmpty(); + } + + private static Options createOptions(String uri, String catalogKey) { + Options options = new Options(); + options.set(CatalogOptions.URI, uri); + options.set(JdbcCatalogOptions.CATALOG_KEY, catalogKey); + options.set(CatalogOptions.CLIENT_POOL_SIZE, 2); + return options; + } + + private static String sqliteUri() { + return "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + } +}