Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
client = ensureActiveClient(client);
return action.run(client);
} finally {
// Return client to the deque, then check if close() raced us.
// The deque's lock ensures either drainTo or remove sees the client.
clients.addFirst(client);
if (this.clients == null && clients.remove(client)) {
close(client);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.options.Options;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.paimon.options.CatalogOptions.CLIENT_POOL_SIZE;
import static org.apache.paimon.options.CatalogOptions.URI;

/**
* A cache that shares {@link JdbcClientPool} instances across multiple catalog instances in the
* same JVM. This prevents each Flink operator from creating its own connection pool when using the
* JDBC catalog.
*
* <p>The cache is keyed by JDBC URI and catalog key. Pools live for the lifetime of the JVM and are
* closed via a shutdown hook.
*/
public class CachedJdbcClientPool {

private static final ConcurrentMap<Key, JdbcClientPool> CLIENT_POOLS =
new ConcurrentHashMap<>();

static {
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
for (JdbcClientPool pool : CLIENT_POOLS.values()) {
pool.close();
}
CLIENT_POOLS.clear();
},
"jdbc-client-pool-shutdown"));
}

private final Key key;
private final int poolSize;
private final String dbUrl;
private final Map<String, String> props;

public CachedJdbcClientPool(Options options, Map<String, String> props) {
this.dbUrl = options.get(URI);
this.poolSize = options.get(CLIENT_POOL_SIZE);
this.props = props;
this.key = Key.of(dbUrl, options.get(JdbcCatalogOptions.CATALOG_KEY));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This key does not include the JDBC connection properties used by JdbcClientPool (props is later passed to JdbcUtils.extractJdbcConfiguration, including jdbc.user / jdbc.password). If two catalogs in the same JVM use the same URI and catalog-key but different credentials, the second catalog will reuse the pool created by the first one and run with the wrong database user. Please include the effective JDBC connection properties in the cache key (and add a regression test with the same URI/catalog-key but different jdbc.user or jdbc.password).

}

/** Returns the shared {@link JdbcClientPool} for this cache key, creating one if needed. */
public JdbcClientPool get() {
return CLIENT_POOLS.computeIfAbsent(key, k -> new JdbcClientPool(poolSize, dbUrl, props));
}

@VisibleForTesting
static ConcurrentMap<Key, JdbcClientPool> clientPools() {
return CLIENT_POOLS;
}

@VisibleForTesting
static void resetCache() {
for (JdbcClientPool pool : CLIENT_POOLS.values()) {
pool.close();
}
CLIENT_POOLS.clear();
}

static class Key {
private final String uri;
private final String catalogKey;

private Key(String uri, String catalogKey) {
this.uri = uri;
this.catalogKey = catalogKey;
}

static Key of(String uri, String catalogKey) {
return new Key(uri, catalogKey);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Key that = (Key) o;
return Objects.equals(uri, that.uri) && Objects.equals(catalogKey, that.catalogKey);
}

@Override
public int hashCode() {
return Objects.hash(uri, catalogKey);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ protected JdbcCatalog(
this.options = context.options();
this.warehouse = warehouse;
Preconditions.checkNotNull(options, "Invalid catalog properties: null");
this.connections =
new JdbcClientPool(
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options.toMap());
this.connections = new CachedJdbcClientPool(options, options.toMap()).get();
try {
initializeCatalogTablesIfNeed();
} catch (SQLException e) {
Expand Down Expand Up @@ -569,7 +565,8 @@ public void repairTable(Identifier identifier) throws TableNotExistException {

@Override
public void close() throws Exception {
connections.close();
// Do not close the connection pool here — it is shared across catalog instances
// via CachedJdbcClientPool and will be evicted/closed by the cache when idle.
}

private boolean syncTableProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

/** Jdbc lock context. */
Expand All @@ -41,11 +40,7 @@ public Options options() {

public JdbcClientPool connections() {
if (connections == null) {
connections =
new JdbcClientPool(
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options.toMap());
connections = new CachedJdbcClientPool(options, options.toMap()).get();
}
return connections;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.sql.SQLException;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link CachedJdbcClientPool}. */
public class CachedJdbcClientPoolTest {

@AfterEach
void tearDown() {
CachedJdbcClientPool.resetCache();
}

@Test
void testSameKeyReturnsSamePool() {
String uri = sqliteUri();
Options options = createOptions(uri, "my-catalog");

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options, options.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options, options.toMap());

assertThat(cache1.get()).isSameAs(cache2.get());
}

@Test
void testDifferentUriReturnsDifferentPool() {
Options options1 = createOptions(sqliteUri(), "my-catalog");
Options options2 = createOptions(sqliteUri(), "my-catalog");

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap());

assertThat(cache1.get()).isNotSameAs(cache2.get());
}

@Test
void testDifferentCatalogKeyReturnsDifferentPool() {
String uri = sqliteUri();
Options options1 = createOptions(uri, "catalog-a");
Options options2 = createOptions(uri, "catalog-b");

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap());

assertThat(cache1.get()).isNotSameAs(cache2.get());
}

@Test
void testPoolIsUsable() throws SQLException, InterruptedException {
Options options = createOptions(sqliteUri(), "test-catalog");
CachedJdbcClientPool cache = new CachedJdbcClientPool(options, options.toMap());

JdbcClientPool pool = cache.get();
Boolean result = pool.run(conn -> !conn.isClosed());

assertThat(result).isTrue();
}

@Test
void testMultipleCatalogInstancesSharePool() {
String uri = sqliteUri();
Options options = createOptions(uri, "shared-catalog");

JdbcCatalog catalog1 =
new JdbcCatalog(
new org.apache.paimon.fs.local.LocalFileIO(),
"shared-catalog",
org.apache.paimon.catalog.CatalogContext.create(options),
"/tmp/warehouse1");
JdbcCatalog catalog2 =
new JdbcCatalog(
new org.apache.paimon.fs.local.LocalFileIO(),
"shared-catalog",
org.apache.paimon.catalog.CatalogContext.create(options),
"/tmp/warehouse2");

assertThat(catalog1.getConnections()).isSameAs(catalog2.getConnections());
}

@Test
void testResetCacheClearsAllPools() {
Options options = createOptions(sqliteUri(), "test-catalog");
CachedJdbcClientPool cache = new CachedJdbcClientPool(options, options.toMap());
JdbcClientPool pool = cache.get();

assertThat(pool).isNotNull();
assertThat(CachedJdbcClientPool.clientPools()).isNotEmpty();

CachedJdbcClientPool.resetCache();

assertThat(CachedJdbcClientPool.clientPools()).isEmpty();
}

private static Options createOptions(String uri, String catalogKey) {
Options options = new Options();
options.set(CatalogOptions.URI, uri);
options.set(JdbcCatalogOptions.CATALOG_KEY, catalogKey);
options.set(CatalogOptions.CLIENT_POOL_SIZE, 2);
return options;
}

private static String sqliteUri() {
return "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "");
}
}