From 1d2e4195ceebc44c107a1ac541d1ae4efc2261a4 Mon Sep 17 00:00:00 2001 From: tchivs Date: Tue, 25 Nov 2025 14:07:15 +0800 Subject: [PATCH 1/5] Add view support for JDBC Catalog - Add view methods to CachingCatalog for proper delegation - Add IT tests for JDBC Catalog view operations - Add SQLite JDBC dependency for tests --- .../apache/paimon/catalog/CachingCatalog.java | 36 +++ .../org/apache/paimon/jdbc/JdbcCatalog.java | 286 ++++++++++++++++++ .../org/apache/paimon/jdbc/JdbcUtils.java | 231 ++++++++++++++ .../apache/paimon/jdbc/JdbcCatalogTest.java | 238 ++++++++++++++- paimon-flink/paimon-flink-common/pom.xml | 7 + .../paimon/flink/JdbcCatalogViewITCase.java | 211 +++++++++++++ 6 files changed, 1008 insertions(+), 1 deletion(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index afe4ed8ae6de..3a0a6c909a88 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -32,6 +32,8 @@ import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.SegmentsCache; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewChange; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; @@ -368,6 +370,40 @@ public void invalidateTable(Identifier identifier) { } } + @Override + public void createView(Identifier identifier, View view, boolean ignoreIfExists) + throws ViewAlreadyExistException, DatabaseNotExistException { + wrapped.createView(identifier, view, ignoreIfExists); + } + + @Override + public View getView(Identifier identifier) throws ViewNotExistException { + return wrapped.getView(identifier); + } + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException { + return wrapped.listViews(databaseName); + } + + @Override + public void dropView(Identifier identifier, boolean ignoreIfNotExists) + throws ViewNotExistException { + wrapped.dropView(identifier, ignoreIfNotExists); + } + + @Override + public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) + throws ViewNotExistException, ViewAlreadyExistException { + wrapped.renameView(fromView, toView, ignoreIfNotExists); + } + + @Override + public void alterView(Identifier view, List viewChanges, boolean ignoreIfNotExists) + throws ViewNotExistException, DialectAlreadyExistException, DialectNotExistException { + wrapped.alterView(view, viewChanges, ignoreIfNotExists); + } + // ================================== Cache Public API // ================================================ diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f765e5f88db5..0106d610b23d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -19,12 +19,14 @@ package org.apache.paimon.jdbc; import org.apache.paimon.CoreOptions; +import org.apache.paimon.PagedList; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.CatalogLockContext; import org.apache.paimon.catalog.CatalogLockFactory; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; @@ -37,8 +39,13 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewChange; +import org.apache.paimon.view.ViewImpl; +import org.apache.paimon.view.ViewSchema; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -58,6 +65,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; @@ -164,6 +172,18 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc return conn.prepareStatement(JdbcUtils.CREATE_TABLE_PROPERTIES_TABLE).execute(); }); + // Check and create view table. + connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + ResultSet tableExists = + dbMeta.getTables(null, null, JdbcUtils.VIEW_TABLE_NAME, null); + if (tableExists.next()) { + return true; + } + return conn.prepareStatement(JdbcUtils.CREATE_VIEW_TABLE).execute(); + }); + // if lock enabled, Check and create distributed lock table. if (lockEnabled()) { JdbcUtils.createDistributedLockTable(connections, options); @@ -240,6 +260,8 @@ protected void dropDatabaseImpl(String name) { catalogKey, name); } + // Delete views from paimon_views. + execute(connections, JdbcUtils.DELETE_VIEWS_SQL, catalogKey, name); } @Override @@ -646,4 +668,268 @@ private List fetch(RowProducer toRow, String sql, String... args) { throw new RuntimeException("Interrupted in SQL query", e); } } + + // ======================= view methods =============================== + + @Override + public View getView(Identifier identifier) throws ViewNotExistException { + try { + String viewSchemaJson = + JdbcUtils.getViewSchema( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()); + if (viewSchemaJson == null) { + throw new ViewNotExistException(identifier); + } + + ViewSchema viewSchema = JsonSerdeUtil.fromJson(viewSchemaJson, ViewSchema.class); + return new ViewImpl( + identifier, + viewSchema.fields(), + viewSchema.query(), + viewSchema.dialects(), + viewSchema.comment(), + viewSchema.options()); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException("Failed to get view " + identifier.getFullName(), e); + } + } + + @Override + public void createView(Identifier identifier, View view, boolean ignoreIfExists) + throws ViewAlreadyExistException, DatabaseNotExistException { + // Check if database exists + try { + getDatabase(identifier.getDatabaseName()); + } catch (DatabaseNotExistException e) { + throw e; + } + + // Check if view already exists + if (JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return; + } + throw new ViewAlreadyExistException(identifier); + } + + // Serialize view schema to JSON + ViewSchema viewSchema = + new ViewSchema( + view.rowType().getFields(), + view.query(), + view.dialects(), + view.comment().orElse(null), + view.options()); + String viewSchemaJson = JsonSerdeUtil.toJson(viewSchema); + + // Insert view + try { + JdbcUtils.insertView( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName(), + viewSchemaJson); + } catch (RuntimeException e) { + if (e.getMessage() != null && e.getMessage().contains("View already exists")) { + throw new ViewAlreadyExistException(identifier, e); + } + throw new RuntimeException("Failed to create view " + identifier.getFullName(), e); + } + } + + @Override + public void dropView(Identifier identifier, boolean ignoreIfNotExists) + throws ViewNotExistException { + // Check if view exists + if (!JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfNotExists) { + return; + } + throw new ViewNotExistException(identifier); + } + + // Delete view + int deletedRecords = + execute( + connections, + JdbcUtils.DROP_VIEW_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()); + + if (deletedRecords != 1) { + throw new RuntimeException( + String.format( + "Failed to drop view %s: affected %d rows", + identifier.getFullName(), deletedRecords)); + } + } + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException { + // Check if database exists + if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) { + throw new DatabaseNotExistException(databaseName); + } + + return fetch( + row -> row.getString(JdbcUtils.VIEW_NAME), + JdbcUtils.LIST_VIEWS_SQL, + catalogKey, + databaseName); + } + + @Override + public PagedList listViewsPaged( + String databaseName, Integer maxResults, String pageToken, String viewNamePattern) + throws DatabaseNotExistException { + CatalogUtils.validateNamePattern(this, viewNamePattern); + return new PagedList<>(listViews(databaseName), null); + } + + @Override + public PagedList listViewDetailsPaged( + String databaseName, Integer maxResults, String pageToken, String viewNamePattern) + throws DatabaseNotExistException { + PagedList pagedViews = + listViewsPaged(databaseName, maxResults, pageToken, viewNamePattern); + return new PagedList<>( + pagedViews.getElements().stream() + .map( + viewName -> { + try { + return getView(Identifier.create(databaseName, viewName)); + } catch (ViewNotExistException ignored) { + LOG.warn( + "view {}.{} does not exist", + databaseName, + viewName); + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()), + pagedViews.getNextPageToken()); + } + + @Override + public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) + throws ViewNotExistException, ViewAlreadyExistException { + // Check if source view exists + if (!JdbcUtils.viewExists( + connections, catalogKey, fromView.getDatabaseName(), fromView.getObjectName())) { + if (ignoreIfNotExists) { + return; + } + throw new ViewNotExistException(fromView); + } + + // Check if target view already exists + if (JdbcUtils.viewExists( + connections, catalogKey, toView.getDatabaseName(), toView.getObjectName())) { + throw new ViewAlreadyExistException(toView); + } + + // Rename view + try { + JdbcUtils.renameView(connections, catalogKey, fromView, toView); + } catch (RuntimeException e) { + if (e.getMessage() != null && e.getMessage().contains("View already exists")) { + throw new ViewAlreadyExistException(toView, e); + } else if (e.getMessage() != null && e.getMessage().contains("View does not exist")) { + throw new ViewNotExistException(fromView, e); + } + throw new RuntimeException( + "Failed to rename view from " + + fromView.getFullName() + + " to " + + toView.getFullName(), + e); + } + } + + @Override + public void alterView( + Identifier identifier, List changes, boolean ignoreIfNotExists) + throws ViewNotExistException, DialectAlreadyExistException, DialectNotExistException { + // Get existing view + View existingView; + try { + existingView = getView(identifier); + } catch (ViewNotExistException e) { + if (ignoreIfNotExists) { + return; + } + throw e; + } + + // Apply changes + Map newOptions = new HashMap<>(existingView.options()); + String newComment = existingView.comment().orElse(null); + Map newDialects = new HashMap<>(existingView.dialects()); + for (ViewChange change : changes) { + if (change instanceof ViewChange.SetViewOption) { + ViewChange.SetViewOption setOption = (ViewChange.SetViewOption) change; + newOptions.put(setOption.key(), setOption.value()); + } else if (change instanceof ViewChange.RemoveViewOption) { + ViewChange.RemoveViewOption removeOption = (ViewChange.RemoveViewOption) change; + newOptions.remove(removeOption.key()); + } else if (change instanceof ViewChange.UpdateViewComment) { + ViewChange.UpdateViewComment updateComment = (ViewChange.UpdateViewComment) change; + newComment = updateComment.comment(); + } else if (change instanceof ViewChange.AddDialect) { + ViewChange.AddDialect addDialect = (ViewChange.AddDialect) change; + if (newDialects.containsKey(addDialect.dialect())) { + throw new DialectAlreadyExistException(identifier, addDialect.dialect()); + } + newDialects.put(addDialect.dialect(), addDialect.query()); + } else if (change instanceof ViewChange.UpdateDialect) { + ViewChange.UpdateDialect updateDialect = (ViewChange.UpdateDialect) change; + if (!newDialects.containsKey(updateDialect.dialect())) { + throw new DialectNotExistException(identifier, updateDialect.dialect()); + } + newDialects.put(updateDialect.dialect(), updateDialect.query()); + } else if (change instanceof ViewChange.DropDialect) { + ViewChange.DropDialect dropDialect = (ViewChange.DropDialect) change; + if (!newDialects.containsKey(dropDialect.dialect())) { + throw new DialectNotExistException(identifier, dropDialect.dialect()); + } + newDialects.remove(dropDialect.dialect()); + } + } + + // Create updated view schema + ViewSchema updatedSchema = + new ViewSchema( + existingView.rowType().getFields(), + existingView.query(), + newDialects, + newComment, + newOptions); + String viewSchemaJson = JsonSerdeUtil.toJson(updatedSchema); + + // Update view + try { + JdbcUtils.updateView( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName(), + viewSchemaJson); + } catch (RuntimeException e) { + throw new RuntimeException("Failed to alter view " + identifier.getFullName(), e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 52cf4224f2f7..a7571fd62ad0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -331,6 +331,117 @@ public class JdbcUtils { static final String ACQUIRED_AT = "acquired_at"; static final String EXPIRE_TIME = "expire_time_seconds"; + // View table + public static final String VIEW_TABLE_NAME = "paimon_views"; + public static final String VIEW_DATABASE = "database_name"; + public static final String VIEW_NAME = "view_name"; + public static final String VIEW_SCHEMA = "view_schema"; + + static final String CREATE_VIEW_TABLE = + "CREATE TABLE " + + VIEW_TABLE_NAME + + "(" + + CATALOG_KEY + + " VARCHAR(255) NOT NULL," + + VIEW_DATABASE + + " VARCHAR(255) NOT NULL," + + VIEW_NAME + + " VARCHAR(255) NOT NULL," + + VIEW_SCHEMA + + " TEXT NOT NULL," + + " PRIMARY KEY (" + + CATALOG_KEY + + ", " + + VIEW_DATABASE + + ", " + + VIEW_NAME + + ")" + + ")"; + + static final String GET_VIEW_SQL = + "SELECT * FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String LIST_VIEWS_SQL = + "SELECT * FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ?"; + + static final String INSERT_VIEW_SQL = + "INSERT INTO " + + VIEW_TABLE_NAME + + " (" + + CATALOG_KEY + + ", " + + VIEW_DATABASE + + ", " + + VIEW_NAME + + ", " + + VIEW_SCHEMA + + ") " + + " VALUES (?,?,?,?)"; + + static final String DROP_VIEW_SQL = + "DELETE FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String DELETE_VIEWS_SQL = + "DELETE FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ?"; + + static final String RENAME_VIEW_SQL = + "UPDATE " + + VIEW_TABLE_NAME + + " SET " + + VIEW_DATABASE + + " = ? , " + + VIEW_NAME + + " = ? " + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String UPDATE_VIEW_SQL = + "UPDATE " + + VIEW_TABLE_NAME + + " SET " + + VIEW_SCHEMA + + " = ? " + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + public static Properties extractJdbcConfiguration( Map properties, String prefix) { Properties result = new Properties(); @@ -678,4 +789,124 @@ private static String deletePropertiesStatement(Set properties) { return sqlStatement.toString(); } + + /** Check if view exists. */ + public static boolean viewExists( + JdbcClientPool connections, String storeKey, String databaseName, String viewName) { + return exists(connections, JdbcUtils.GET_VIEW_SQL, storeKey, databaseName, viewName); + } + + /** Get view schema JSON. */ + public static String getViewSchema( + JdbcClientPool connections, String storeKey, String databaseName, String viewName) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(JdbcUtils.GET_VIEW_SQL)) { + sql.setString(1, storeKey); + sql.setString(2, databaseName); + sql.setString(3, viewName); + ResultSet rs = sql.executeQuery(); + if (rs.next()) { + String schema = rs.getString(VIEW_SCHEMA); + rs.close(); + return schema; + } + rs.close(); + return null; + } + }); + } + + /** Insert view. */ + public static void insertView( + JdbcClientPool connections, + String storeKey, + String databaseName, + String viewName, + String viewSchemaJson) { + int insertedRecords = + execute( + err -> { + if (err instanceof SQLIntegrityConstraintViolationException + || (err.getMessage() != null + && err.getMessage().contains("constraint failed"))) { + throw new RuntimeException( + String.format( + "View already exists: %s.%s", + databaseName, viewName)); + } + }, + connections, + JdbcUtils.INSERT_VIEW_SQL, + storeKey, + databaseName, + viewName, + viewSchemaJson); + + if (insertedRecords != 1) { + throw new RuntimeException( + String.format( + "Failed to insert view %s.%s: affected %d rows", + databaseName, viewName, insertedRecords)); + } + } + + /** Update view. */ + public static void updateView( + JdbcClientPool connections, + String storeKey, + String databaseName, + String viewName, + String viewSchemaJson) { + int updatedRecords = + execute( + connections, + JdbcUtils.UPDATE_VIEW_SQL, + viewSchemaJson, + storeKey, + databaseName, + viewName); + + if (updatedRecords == 0) { + throw new RuntimeException( + String.format("View does not exist: %s.%s", databaseName, viewName)); + } else if (updatedRecords != 1) { + LOG.warn( + "Update operation affected {} rows: the view table's primary key assumption has been violated", + updatedRecords); + } + } + + /** Rename view. */ + public static void renameView( + JdbcClientPool connections, String storeKey, Identifier fromView, Identifier toView) { + int updatedRecords = + execute( + err -> { + if (err instanceof SQLIntegrityConstraintViolationException + || (err.getMessage() != null + && err.getMessage().contains("constraint failed"))) { + throw new RuntimeException( + String.format("View already exists: %s", toView)); + } + }, + connections, + JdbcUtils.RENAME_VIEW_SQL, + toView.getDatabaseName(), + toView.getObjectName(), + storeKey, + fromView.getDatabaseName(), + fromView.getObjectName()); + + if (updatedRecords == 1) { + LOG.info("Renamed view from {}, to {}", fromView, toView); + } else if (updatedRecords == 0) { + throw new RuntimeException(String.format("View does not exist: %s", fromView)); + } else { + LOG.warn( + "Rename operation affected {} rows: the view table's primary key assumption has been violated", + updatedRecords); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index fd3c6fdc5950..77b1b67533ee 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -29,7 +29,11 @@ import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewChange; +import org.apache.paimon.view.ViewImpl; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; @@ -46,6 +50,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -64,7 +73,9 @@ private JdbcCatalog initCatalog(Map props) { Map properties = Maps.newHashMap(); properties.put( CatalogOptions.URI.key(), - "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "")); + "jdbc:sqlite:file:" + + UUID.randomUUID().toString().replace("-", "") + + "?mode=memory&cache=shared"); properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); @@ -146,6 +157,11 @@ protected boolean supportsReplaceTable() { return false; } + @Override + protected boolean supportsView() { + return true; + } + @Test public void testRepairTableNotExist() throws Exception { String databaseName = "repair_db"; @@ -345,6 +361,34 @@ private Map fetchTableProperties( } } + private String fetchViewSchema(JdbcCatalog jdbcCatalog, String databaseName, String viewName) { + try { + return JdbcUtils.getViewSchema( + jdbcCatalog.getConnections(), + jdbcCatalog.getCatalogKey(), + databaseName, + viewName); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private boolean tableExists(JdbcCatalog jdbcCatalog, String tableName) { + try { + return jdbcCatalog + .getConnections() + .run( + conn -> { + try (ResultSet rs = + conn.getMetaData().getTables(null, null, tableName, null)) { + return rs.next(); + } + }); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException(e); + } + } + @Test public void testTablePropertiesSyncOnCreate() throws Exception { JdbcCatalog syncCatalog = initCatalogWithSync(true); @@ -625,4 +669,196 @@ public void testInsertTableUtility() throws Exception { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Failed to insert table"); } + + @Test + public void testViewTableCreatedOnInitialize() { + assertThat(tableExists((JdbcCatalog) catalog, JdbcUtils.VIEW_TABLE_NAME)).isTrue(); + } + + @Test + public void testCreateViewStoresViewSchemaWithoutResolvingReferencedTable() throws Exception { + String databaseName = "view_schema_db"; + Identifier identifier = Identifier.create(databaseName, "view_on_missing_table"); + View view = createView(identifier); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, view, false); + + String viewSchemaJson = + fetchViewSchema((JdbcCatalog) catalog, databaseName, "view_on_missing_table"); + assertThat(viewSchemaJson).contains("\"query\"").contains("\"SELECT * FROM OTHER_TABLE\""); + assertThat(catalog.getView(identifier).query()).isEqualTo("SELECT * FROM OTHER_TABLE"); + } + + @Test + public void testCreateViewStoresCrossDatabaseQueryWithoutResolvingReferencedTable() + throws Exception { + String databaseName = "cross_database_view_db"; + Identifier identifier = Identifier.create(databaseName, "view_on_other_database"); + String query = "SELECT * FROM other_database.missing_table"; + View baseView = createView(identifier); + View view = + new ViewImpl( + identifier, + baseView.rowType().getFields(), + query, + baseView.dialects(), + baseView.comment().orElse(null), + baseView.options()); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, view, false); + + String viewSchemaJson = + fetchViewSchema((JdbcCatalog) catalog, databaseName, "view_on_other_database"); + assertThat(viewSchemaJson).contains("\"query\"").contains(query); + assertThat(catalog.getView(identifier).query()).isEqualTo(query); + } + + @Test + public void testDropDatabaseCleansViewMetadata() throws Exception { + String databaseName = "drop_view_db"; + Identifier identifier = Identifier.create(databaseName, "view_name"); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, createView(identifier), false); + catalog.dropDatabase(databaseName, false, true); + + assertThat( + fetchViewSchema( + (JdbcCatalog) catalog, + identifier.getDatabaseName(), + identifier.getObjectName())) + .isNull(); + } + + @Test + public void testRenameViewAcrossDatabases() throws Exception { + String sourceDatabase = "source_view_db"; + String targetDatabase = "target_view_db"; + Identifier source = Identifier.create(sourceDatabase, "view_name"); + Identifier target = Identifier.create(targetDatabase, "view_name"); + + catalog.createDatabase(sourceDatabase, false); + catalog.createDatabase(targetDatabase, false); + catalog.createView(source, createView(source), false); + + catalog.renameView(source, target, false); + + assertThatThrownBy(() -> catalog.getView(source)) + .isInstanceOf(Catalog.ViewNotExistException.class); + assertThat(catalog.getView(target).fullName()).isEqualTo(target.getFullName()); + assertThat(catalog.listViews(sourceDatabase)).isEmpty(); + assertThat(catalog.listViews(targetDatabase)).containsExactly("view_name"); + } + + @Test + public void testConcurrentCreateViewOnlyCreatesOneView() throws Exception { + String databaseName = "concurrent_view_db"; + Identifier identifier = Identifier.create(databaseName, "same_view"); + View view = createView(identifier); + ExecutorService executor = Executors.newFixedThreadPool(2); + + catalog.createDatabase(databaseName, false); + Callable create = + () -> { + try { + catalog.createView(identifier, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future first = executor.submit(create); + Future second = executor.submit(create); + + assertThat(ImmutableList.of(first.get(), second.get())) + .containsExactlyInAnyOrder(true, false); + assertThat(catalog.listViews(databaseName)).containsExactly("same_view"); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testAlterView() throws Exception { + Identifier identifier = Identifier.create("alter_view_db", "my_view"); + View view = createView(identifier); + catalog.createDatabase(identifier.getDatabaseName(), false); + + assertDoesNotThrow( + () -> + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.setOption("k", "v")), + true)); + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.setOption("k", "v")), + false)) + .isInstanceOf(Catalog.ViewNotExistException.class); + + catalog.createView(identifier, view, false); + catalog.alterView(identifier, ImmutableList.of(ViewChange.setOption("k", "v")), false); + assertThat(catalog.getView(identifier).options()).containsEntry("k", "v"); + + catalog.alterView(identifier, ImmutableList.of(ViewChange.removeOption("k")), false); + assertThat(catalog.getView(identifier).options()).doesNotContainKey("k"); + + catalog.alterView( + identifier, ImmutableList.of(ViewChange.updateComment("new comment")), false); + assertThat(catalog.getView(identifier).comment()).hasValue("new comment"); + + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.addDialect("flink_1", "SELECT * FROM FLINK_TABLE_1")), + false); + assertThat(catalog.getView(identifier).query("flink_1")) + .isEqualTo("SELECT * FROM FLINK_TABLE_1"); + + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of( + ViewChange.addDialect( + "flink_1", "SELECT * FROM FLINK_TABLE_1")), + false)) + .isInstanceOf(Catalog.DialectAlreadyExistException.class); + + catalog.alterView( + identifier, + ImmutableList.of( + ViewChange.updateDialect("flink_1", "SELECT * FROM FLINK_TABLE_2")), + false); + assertThat(catalog.getView(identifier).query("flink_1")) + .isEqualTo("SELECT * FROM FLINK_TABLE_2"); + + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of( + ViewChange.updateDialect( + "missing", "SELECT * FROM FLINK_TABLE_2")), + false)) + .isInstanceOf(Catalog.DialectNotExistException.class); + + catalog.alterView(identifier, ImmutableList.of(ViewChange.dropDialect("flink_1")), false); + assertThat(catalog.getView(identifier).query("flink_1")).isEqualTo(view.query()); + + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.dropDialect("missing")), + false)) + .isInstanceOf(Catalog.DialectNotExistException.class); + } } diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index c0774b1c587a..8818ecc8b8a5 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -238,6 +238,13 @@ under the License. ${project.version} test + + + org.xerial + sqlite-jdbc + 3.44.0.0 + test + diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java new file mode 100644 index 000000000000..a3b1af7aeb5b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.jdbc.JdbcCatalog; +import org.apache.paimon.options.CatalogOptions; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT Case for JDBC Catalog View support. */ +public class JdbcCatalogViewITCase extends CatalogITCaseBase { + + private static final String DATABASE_NAME = "test_db"; + private static final String TABLE_NAME = "test_table"; + + @TempDir java.nio.file.Path tempFile; + + @BeforeEach + @Override + public void before() throws IOException { + super.before(); + sql(String.format("CREATE DATABASE %s", DATABASE_NAME)); + sql(String.format("USE %s", DATABASE_NAME)); + sql( + String.format( + "CREATE TABLE %s.%s (id INT, name STRING, amount DOUBLE)", + DATABASE_NAME, TABLE_NAME)); + sql( + String.format( + "INSERT INTO %s.%s VALUES (1, 'Alice', 100.0), (2, 'Bob', 200.0), (3, 'Charlie', 150.0)", + DATABASE_NAME, TABLE_NAME)); + } + + @Override + protected Map catalogOptions() { + Map options = new HashMap<>(); + options.put("metastore", "jdbc"); + options.put( + CatalogOptions.URI.key(), + "jdbc:sqlite:file:" + + UUID.randomUUID().toString().replace("-", "") + + "?mode=memory&cache=shared"); + options.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + options.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + // Disable lock for simpler testing + options.put(CatalogOptions.LOCK_ENABLED.key(), "false"); + return options; + } + + @Override + protected String getTempDirPath() { + return tempFile.toUri().toString(); + } + + @Test + public void testCreateAndQueryView() { + // Create a view + String viewName = "sales_view"; + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT name, amount FROM %s.%s WHERE amount > 100", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Query the view + List result = sql(String.format("SELECT * FROM %s.%s", DATABASE_NAME, viewName)); + assertThat(result).hasSize(2); + assertThat(result.toString()).contains("Bob"); + assertThat(result.toString()).contains("Charlie"); + } + + @Test + public void testShowViews() { + // Create multiple views + sql( + String.format( + "CREATE VIEW %s.view1 AS SELECT * FROM %s.%s", + DATABASE_NAME, DATABASE_NAME, TABLE_NAME)); + sql( + String.format( + "CREATE VIEW %s.view2 AS SELECT id, name FROM %s.%s", + DATABASE_NAME, DATABASE_NAME, TABLE_NAME)); + + // List views + List result = sql("SHOW VIEWS"); + assertThat(result).hasSize(2); + assertThat(result.toString()).contains("view1"); + assertThat(result.toString()).contains("view2"); + } + + @Test + public void testDropView() { + // Create a view + String viewName = "temp_view"; + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT * FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Verify view exists + List views = sql("SHOW VIEWS"); + assertThat(views.toString()).contains(viewName); + + // Drop the view + sql(String.format("DROP VIEW %s.%s", DATABASE_NAME, viewName)); + + // Verify view is dropped + views = sql("SHOW VIEWS"); + assertThat(views.toString()).doesNotContain(viewName); + } + + @Test + public void testViewWithAggregation() { + // Create a view with aggregation + String viewName = "agg_view"; + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT COUNT(*) as cnt, SUM(amount) as total FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Query the view + List result = sql(String.format("SELECT * FROM %s.%s", DATABASE_NAME, viewName)); + assertThat(result).hasSize(1); + assertThat(result.get(0).getField(0)).isEqualTo(3L); // count + assertThat((Double) result.get(0).getField(1)).isEqualTo(450.0); // sum + } + + @Test + public void testCreateViewIfNotExists() { + String viewName = "idempotent_view"; + + // Create view first time + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT * FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Create view again with IF NOT EXISTS - should not throw error + sql( + String.format( + "CREATE VIEW IF NOT EXISTS %s.%s AS SELECT id FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Original view should remain unchanged + List result = + sql(String.format("SELECT * FROM %s.%s LIMIT 1", DATABASE_NAME, viewName)); + assertThat(result.get(0).getArity()).isEqualTo(3); // original view has 3 columns + } + + @Test + public void testDropViewIfExists() { + String viewName = "maybe_exists_view"; + + // Drop non-existent view with IF EXISTS - should not throw error + sql(String.format("DROP VIEW IF EXISTS %s.%s", DATABASE_NAME, viewName)); + + // Create and drop + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT * FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + sql(String.format("DROP VIEW IF EXISTS %s.%s", DATABASE_NAME, viewName)); + + // Verify it's gone + List views = sql("SHOW VIEWS"); + assertThat(views.toString()).doesNotContain(viewName); + } + + @Test + public void testShowCreateView() { + String viewName = "describable_view"; + String query = + String.format( + "SELECT `name`, `amount` FROM `%s`.`%s` WHERE `amount` > 100", + DATABASE_NAME, TABLE_NAME); + sql(String.format("CREATE VIEW %s.%s AS %s", DATABASE_NAME, viewName, query)); + + // Show create view + List result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME, viewName)); + assertThat(result).hasSize(1); + String createStatement = result.get(0).toString(); + assertThat(createStatement).contains(viewName); + assertThat(createStatement).contains("amount"); + } +} From 7c1c9191d76d8bd6d355866596fffc6def2b8870 Mon Sep 17 00:00:00 2001 From: tchivs Date: Thu, 11 Jun 2026 15:10:30 +0800 Subject: [PATCH 2/5] Fix JDBC catalog view edge cases --- docs/docs/concepts/views.md | 6 + docs/docs/flink/sql-ddl.md | 12 ++ docs/docs/spark/sql-ddl.md | 5 +- .../org/apache/paimon/jdbc/JdbcCatalog.java | 193 ++++++++++++++++-- .../org/apache/paimon/jdbc/JdbcUtils.java | 32 +-- .../apache/paimon/jdbc/JdbcCatalogTest.java | 91 +++++++++ 6 files changed, 302 insertions(+), 37 deletions(-) diff --git a/docs/docs/concepts/views.md b/docs/docs/concepts/views.md index 8f0559fde9c4..b6c649ff5df9 100644 --- a/docs/docs/concepts/views.md +++ b/docs/docs/concepts/views.md @@ -36,6 +36,8 @@ View metadata is persisted only when the catalog implementation supports it: - **Hive metastore catalog** – view metadata is stored together with table metadata inside the metastore warehouse. - **REST catalog** – view metadata is kept in the REST backend and exposed through the catalog API. +- **JDBC catalog** – view metadata is stored in the catalog database in the `paimon_views` + metadata table. The table is created automatically when the JDBC catalog is initialized. File-system catalogs do not currently support views because they lack persistent metadata storage. @@ -57,6 +59,10 @@ view using their native dialect. Use `CREATE VIEW` or `CREATE OR REPLACE VIEW` to register a view. Paimon assigns a UUID, writes the first metadata file, and records version `1`. +The catalog stores the view definition. It does not resolve or validate referenced tables when the +view is created, so missing or cross-database table references are checked by the compute engine when +the view is queried. + ```sql CREATE VIEW sales_view AS SELECT region, SUM(amount) AS total_amount diff --git a/docs/docs/flink/sql-ddl.md b/docs/docs/flink/sql-ddl.md index 7708af840afc..835bc376e120 100644 --- a/docs/docs/flink/sql-ddl.md +++ b/docs/docs/flink/sql-ddl.md @@ -146,6 +146,18 @@ You can also perform logical isolation for databases under multiple catalogs by Additionally, when creating a JdbcCatalog, you can specify the maximum length for the lock key by configuring "lock-key-max-length," which defaults to 255. Since this value is a combination of {catalog-key}.{database-name}.{table-name}, please adjust accordingly. +JDBC catalog supports persistent Paimon views. View metadata is stored in the automatically created +`paimon_views` table in the catalog database. The view SQL is stored as catalog metadata and is not +resolved when the view is created. + +```sql +CREATE VIEW sales_view AS SELECT name, amount FROM sales WHERE amount > 100; + +SHOW VIEWS; + +DROP VIEW sales_view; +``` + You can define any default table options with the prefix `table-default.` for tables created in the catalog. ## Create Table diff --git a/docs/docs/spark/sql-ddl.md b/docs/docs/spark/sql-ddl.md index 66ac0bc8b1cb..4e6b9566b528 100644 --- a/docs/docs/spark/sql-ddl.md +++ b/docs/docs/spark/sql-ddl.md @@ -103,6 +103,9 @@ Paimon JDBC Catalog in Spark needs to correctly add the corresponding jar packag | mysql | mysql-connector-java | [Download](https://mvnrepository.com/artifact/mysql/mysql-connector-java) | | postgres | postgresql | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) | +JDBC catalog supports persistent Paimon views. View metadata is stored in the automatically created +`paimon_views` table in the catalog database. + ```bash spark-sql ... \ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ @@ -375,7 +378,7 @@ CREATE TABLE target_tbl LIKE source_tbl; ## View Views are based on the result-set of an SQL query, when using `org.apache.paimon.spark.SparkCatalog`, views are managed by paimon itself. -And in this case, views are supported when the `metastore` type is `hive` or `rest`. +And in this case, views are supported when the `metastore` type is `hive`, `rest` or `jdbc`. ### Create Or Replace View diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 0106d610b23d..c6f8828c5558 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.PagedList; +import org.apache.paimon.TableType; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.CatalogContext; @@ -71,9 +72,12 @@ import java.util.concurrent.Callable; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.PATH; +import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; +import static org.apache.paimon.catalog.CatalogUtils.validateCreateTable; import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep; import static org.apache.paimon.jdbc.JdbcUtils.deleteProperties; @@ -137,51 +141,66 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = - dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null); - if (tableExists.next()) { - return true; + try (ResultSet tableExists = + dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE).execute(); }); // Check and create database properties table. connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = + try (ResultSet tableExists = dbMeta.getTables( - null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null); - if (tableExists.next()) { - return true; + null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE) - .execute(); }); // Check and create table properties table. connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = + try (ResultSet tableExists = dbMeta.getTables( - null, null, JdbcUtils.TABLE_PROPERTIES_TABLE_NAME, null); - if (tableExists.next()) { - return true; + null, null, JdbcUtils.TABLE_PROPERTIES_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_TABLE_PROPERTIES_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_TABLE_PROPERTIES_TABLE).execute(); }); // Check and create view table. connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = - dbMeta.getTables(null, null, JdbcUtils.VIEW_TABLE_NAME, null); - if (tableExists.next()) { - return true; + try (ResultSet tableExists = + dbMeta.getTables(null, null, JdbcUtils.VIEW_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_VIEW_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_VIEW_TABLE).execute(); }); // if lock enabled, Check and create distributed lock table. @@ -246,6 +265,26 @@ protected void createDatabaseImpl(String name, Map properties) { insertProperties(connections, catalogKey, name, createProps); } + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException { + checkNotSystemDatabase(name); + try { + getDatabase(name); + } catch (DatabaseNotExistException e) { + if (ignoreIfNotExists) { + return; + } + throw new DatabaseNotExistException(name); + } + + if (!cascade && (!listTables(name).isEmpty() || !listViews(name).isEmpty())) { + throw new DatabaseNotEmptyException(name); + } + + dropDatabaseImpl(name); + } + @Override protected void dropDatabaseImpl(String name) { // Delete table from paimon_tables @@ -312,6 +351,55 @@ protected List listTablesImpl(String databaseName) { databaseName); } + @Override + public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException { + checkNotBranch(identifier, "createTable"); + checkNotSystemTable(identifier, "createTable"); + validateCreateTable(schema, false); + validateCustomTablePath(schema.options()); + + getDatabase(identifier.getDatabaseName()); + + try { + getTable(identifier); + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(identifier); + } catch (TableNotExistException ignored) { + } + + if (JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(identifier); + } + + copyTableDefaultOptions(schema.options()); + + TableType tableType = Options.fromMap(schema.options()).get(TYPE); + switch (tableType) { + case TABLE: + case MATERIALIZED_TABLE: + createTableImpl(identifier, schema); + break; + case FORMAT_TABLE: + createFormatTable(identifier, schema); + break; + case OBJECT_TABLE: + throw new UnsupportedOperationException( + String.format( + "Catalog %s cannot support object tables.", + this.getClass().getName())); + } + } + @Override protected void dropTableImpl(Identifier identifier, List externalPaths) { try { @@ -392,6 +480,37 @@ protected void createTableImpl(Identifier identifier, Schema schema) { } } + @Override + public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException { + checkNotBranch(fromTable, "renameTable"); + checkNotBranch(toTable, "renameTable"); + checkNotSystemTable(fromTable, "renameTable"); + checkNotSystemTable(toTable, "renameTable"); + + try { + getTable(fromTable); + } catch (TableNotExistException e) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException(fromTable); + } + + try { + getTable(toTable); + throw new TableAlreadyExistException(toTable); + } catch (TableNotExistException ignored) { + } + + if (JdbcUtils.viewExists( + connections, catalogKey, toTable.getDatabaseName(), toTable.getObjectName())) { + throw new TableAlreadyExistException(toTable); + } + + renameTableImpl(fromTable, toTable); + } + @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { @@ -598,6 +717,19 @@ private boolean syncTableProperties() { return options.get(CatalogOptions.SYNC_ALL_PROPERTIES); } + private void copyTableDefaultOptions(Map tableOptions) { + tableDefaultOptions.forEach(tableOptions::putIfAbsent); + } + + private void validateCustomTablePath(Map tableOptions) { + if (!allowCustomTablePath() && tableOptions.containsKey(PATH.key())) { + throw new UnsupportedOperationException( + String.format( + "The current catalog %s does not support specifying the table path when creating a table.", + this.getClass().getSimpleName())); + } + } + private Map convertToPropertiesTableKey(TableSchema tableSchema) { Map properties = new HashMap<>(); if (!tableSchema.primaryKeys().isEmpty()) { @@ -707,6 +839,17 @@ public void createView(Identifier identifier, View view, boolean ignoreIfExists) throw e; } + if (JdbcUtils.tableExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return; + } + throw new ViewAlreadyExistException(identifier); + } + // Check if view already exists if (JdbcUtils.viewExists( connections, @@ -841,6 +984,14 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN connections, catalogKey, toView.getDatabaseName(), toView.getObjectName())) { throw new ViewAlreadyExistException(toView); } + if (JdbcUtils.tableExists( + connections, catalogKey, toView.getDatabaseName(), toView.getObjectName())) { + throw new ViewAlreadyExistException(toView); + } + if (!JdbcUtils.databaseExists(connections, catalogKey, toView.getDatabaseName())) { + throw new IllegalArgumentException( + String.format("Database %s does not exist.", toView.getDatabaseName())); + } // Rename view try { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index a7571fd62ad0..42867b5feba6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -18,6 +18,7 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.Options; @@ -40,6 +41,7 @@ /** Util for jdbc catalog. */ public class JdbcUtils { private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); + private static final String UNIQUE_CONSTRAINT_VIOLATION_STATE = "23505"; public static final String CATALOG_TABLE_NAME = "paimon_tables"; public static final String CATALOG_KEY = "catalog_key"; public static final String TABLE_DATABASE = "database_name"; @@ -483,9 +485,7 @@ public static void updateTable( int updatedRecords = execute( err -> { - if (err instanceof SQLIntegrityConstraintViolationException - || (err.getMessage() != null - && err.getMessage().contains("constraint failed"))) { + if (isUniqueConstraintViolation(err)) { throw new RuntimeException( String.format("Table already exists: %s", toTable)); } @@ -806,13 +806,11 @@ public static String getViewSchema( sql.setString(1, storeKey); sql.setString(2, databaseName); sql.setString(3, viewName); - ResultSet rs = sql.executeQuery(); - if (rs.next()) { - String schema = rs.getString(VIEW_SCHEMA); - rs.close(); - return schema; + try (ResultSet rs = sql.executeQuery()) { + if (rs.next()) { + return rs.getString(VIEW_SCHEMA); + } } - rs.close(); return null; } }); @@ -828,9 +826,7 @@ public static void insertView( int insertedRecords = execute( err -> { - if (err instanceof SQLIntegrityConstraintViolationException - || (err.getMessage() != null - && err.getMessage().contains("constraint failed"))) { + if (isUniqueConstraintViolation(err)) { throw new RuntimeException( String.format( "View already exists: %s.%s", @@ -884,9 +880,7 @@ public static void renameView( int updatedRecords = execute( err -> { - if (err instanceof SQLIntegrityConstraintViolationException - || (err.getMessage() != null - && err.getMessage().contains("constraint failed"))) { + if (isUniqueConstraintViolation(err)) { throw new RuntimeException( String.format("View already exists: %s", toView)); } @@ -909,4 +903,12 @@ public static void renameView( updatedRecords); } } + + @VisibleForTesting + static boolean isUniqueConstraintViolation(SQLException err) { + String message = err.getMessage(); + return err instanceof SQLIntegrityConstraintViolationException + || UNIQUE_CONSTRAINT_VIOLATION_STATE.equals(err.getSQLState()) + || (message != null && message.contains("constraint failed")); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index 77b1b67533ee..3c1dac34b59f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -45,6 +45,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -162,6 +163,22 @@ protected boolean supportsView() { return true; } + @Test + public void testUniqueConstraintViolationDetection() { + assertThat( + JdbcUtils.isUniqueConstraintViolation( + new SQLIntegrityConstraintViolationException("duplicate entry"))) + .isTrue(); + assertThat( + JdbcUtils.isUniqueConstraintViolation( + new SQLException("duplicate key", "23505"))) + .isTrue(); + assertThat(JdbcUtils.isUniqueConstraintViolation(new SQLException("constraint failed"))) + .isTrue(); + assertThat(JdbcUtils.isUniqueConstraintViolation(new SQLException("syntax error"))) + .isFalse(); + } + @Test public void testRepairTableNotExist() throws Exception { String databaseName = "repair_db"; @@ -732,6 +749,24 @@ public void testDropDatabaseCleansViewMetadata() throws Exception { .isNull(); } + @Test + public void testDropDatabaseWithoutCascadeRejectsViewOnlyDatabase() throws Exception { + String databaseName = "view_only_db"; + Identifier identifier = Identifier.create(databaseName, "view_name"); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, createView(identifier), false); + + assertThatThrownBy(() -> catalog.dropDatabase(databaseName, false, false)) + .isInstanceOf(Catalog.DatabaseNotEmptyException.class) + .hasMessage("Database " + databaseName + " is not empty."); + assertThat(catalog.getView(identifier).fullName()).isEqualTo(identifier.getFullName()); + + catalog.dropDatabase(databaseName, false, true); + assertThatThrownBy(() -> catalog.getDatabase(databaseName)) + .isInstanceOf(Catalog.DatabaseNotExistException.class); + } + @Test public void testRenameViewAcrossDatabases() throws Exception { String sourceDatabase = "source_view_db"; @@ -752,6 +787,62 @@ public void testRenameViewAcrossDatabases() throws Exception { assertThat(catalog.listViews(targetDatabase)).containsExactly("view_name"); } + @Test + public void testRenameViewRejectsMissingTargetDatabase() throws Exception { + String sourceDatabase = "source_view_db"; + Identifier source = Identifier.create(sourceDatabase, "view_name"); + Identifier target = Identifier.create("missing_view_db", "view_name"); + + catalog.createDatabase(sourceDatabase, false); + catalog.createView(source, createView(source), false); + + assertThatThrownBy(() -> catalog.renameView(source, target, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Database missing_view_db does not exist."); + assertThat(catalog.getView(source).fullName()).isEqualTo(source.getFullName()); + assertThatThrownBy(() -> catalog.getView(target)) + .isInstanceOf(Catalog.ViewNotExistException.class); + } + + @Test + public void testTableAndViewCannotShareName() throws Exception { + String databaseName = "shared_name_db"; + Identifier tableFirst = Identifier.create(databaseName, "table_first"); + Identifier viewFirst = Identifier.create(databaseName, "view_first"); + + catalog.createDatabase(databaseName, false); + + catalog.createTable(tableFirst, DEFAULT_TABLE_SCHEMA, false); + assertThatThrownBy(() -> catalog.createView(tableFirst, createView(tableFirst), false)) + .isInstanceOf(Catalog.ViewAlreadyExistException.class); + catalog.createView(tableFirst, createView(tableFirst), true); + assertThat(catalog.listViews(databaseName)).doesNotContain(tableFirst.getObjectName()); + + catalog.createView(viewFirst, createView(viewFirst), false); + assertThatThrownBy(() -> catalog.createTable(viewFirst, DEFAULT_TABLE_SCHEMA, false)) + .isInstanceOf(Catalog.TableAlreadyExistException.class); + catalog.createTable(viewFirst, DEFAULT_TABLE_SCHEMA, true); + assertThat(catalog.listTables(databaseName)).doesNotContain(viewFirst.getObjectName()); + } + + @Test + public void testRenameRejectsTableViewNameCollision() throws Exception { + String databaseName = "rename_collision_db"; + Identifier table = Identifier.create(databaseName, "table_name"); + Identifier view = Identifier.create(databaseName, "view_name"); + + catalog.createDatabase(databaseName, false); + catalog.createTable(table, DEFAULT_TABLE_SCHEMA, false); + catalog.createView(view, createView(view), false); + + assertThatThrownBy(() -> catalog.renameTable(table, view, false)) + .isInstanceOf(Catalog.TableAlreadyExistException.class); + assertThatThrownBy(() -> catalog.renameView(view, table, false)) + .isInstanceOf(Catalog.ViewAlreadyExistException.class); + assertThat(catalog.listTables(databaseName)).containsExactly(table.getObjectName()); + assertThat(catalog.listViews(databaseName)).containsExactly(view.getObjectName()); + } + @Test public void testConcurrentCreateViewOnlyCreatesOneView() throws Exception { String databaseName = "concurrent_view_db"; From 9bc435bc5cdf98bbf513db5dff41aa9bb9feda49 Mon Sep 17 00:00:00 2001 From: tchivs Date: Tue, 23 Jun 2026 15:49:25 +0800 Subject: [PATCH 3/5] Fix JDBC catalog view review feedback --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 148 ++++++++++++------ .../apache/paimon/jdbc/JdbcCatalogTest.java | 56 +++++++ 2 files changed, 152 insertions(+), 52 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index c6f8828c5558..b266fa5fa1b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -361,35 +361,33 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx getDatabase(identifier.getDatabaseName()); - try { - getTable(identifier); - if (ignoreIfExists) { - return; - } - throw new TableAlreadyExistException(identifier); - } catch (TableNotExistException ignored) { - } - - if (JdbcUtils.viewExists( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getObjectName())) { - if (ignoreIfExists) { - return; - } - throw new TableAlreadyExistException(identifier); - } - copyTableDefaultOptions(schema.options()); TableType tableType = Options.fromMap(schema.options()).get(TYPE); switch (tableType) { case TABLE: case MATERIALIZED_TABLE: - createTableImpl(identifier, schema); + try { + runWithLock( + identifier, + () -> { + if (!validateTableNotExists(identifier, ignoreIfExists)) { + return null; + } + createTableImplWithLock(identifier, schema); + return null; + }); + } catch (TableAlreadyExistException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException( + "Failed to create table " + identifier.getFullName(), e); + } break; case FORMAT_TABLE: + if (!validateTableNotExists(identifier, ignoreIfExists)) { + return; + } createFormatTable(identifier, schema); break; case OBJECT_TABLE: @@ -443,11 +441,24 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { @Override protected void createTableImpl(Identifier identifier, Schema schema) { + try { + runWithLock( + identifier, + () -> { + validateTableNotExists(identifier, false); + createTableImplWithLock(identifier, schema); + return null; + }); + } catch (Exception e) { + throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); + } + } + + private void createTableImplWithLock(Identifier identifier, Schema schema) { try { // create table file SchemaManager schemaManager = getSchemaManager(identifier); - TableSchema tableSchema = - runWithLock(identifier, () -> schemaManager.createTable(schema)); + TableSchema tableSchema = schemaManager.createTable(schema); // Update schema metadata Path path = getTableLocation(identifier); if (JdbcUtils.insertTable( @@ -480,6 +491,26 @@ protected void createTableImpl(Identifier identifier, Schema schema) { } } + private boolean validateTableNotExists(Identifier identifier, boolean ignoreIfExists) + throws TableAlreadyExistException { + if (JdbcUtils.tableExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()) + || JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return false; + } + throw new TableAlreadyExistException(identifier); + } + return true; + } + @Override public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException { @@ -839,29 +870,6 @@ public void createView(Identifier identifier, View view, boolean ignoreIfExists) throw e; } - if (JdbcUtils.tableExists( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getObjectName())) { - if (ignoreIfExists) { - return; - } - throw new ViewAlreadyExistException(identifier); - } - - // Check if view already exists - if (JdbcUtils.viewExists( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getObjectName())) { - if (ignoreIfExists) { - return; - } - throw new ViewAlreadyExistException(identifier); - } - // Serialize view schema to JSON ViewSchema viewSchema = new ViewSchema( @@ -874,20 +882,52 @@ public void createView(Identifier identifier, View view, boolean ignoreIfExists) // Insert view try { - JdbcUtils.insertView( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getObjectName(), - viewSchemaJson); + runWithLock( + identifier, + () -> { + if (!validateViewNotExists(identifier, ignoreIfExists)) { + return null; + } + JdbcUtils.insertView( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName(), + viewSchemaJson); + return null; + }); } catch (RuntimeException e) { if (e.getMessage() != null && e.getMessage().contains("View already exists")) { throw new ViewAlreadyExistException(identifier, e); } throw new RuntimeException("Failed to create view " + identifier.getFullName(), e); + } catch (ViewAlreadyExistException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Failed to create view " + identifier.getFullName(), e); } } + private boolean validateViewNotExists(Identifier identifier, boolean ignoreIfExists) + throws ViewAlreadyExistException { + if (JdbcUtils.tableExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()) + || JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return false; + } + throw new ViewAlreadyExistException(identifier); + } + return true; + } + @Override public void dropView(Identifier identifier, boolean ignoreIfNotExists) throws ViewNotExistException { @@ -922,6 +962,10 @@ public void dropView(Identifier identifier, boolean ignoreIfNotExists) @Override public List listViews(String databaseName) throws DatabaseNotExistException { + if (CatalogUtils.isSystemDatabase(databaseName)) { + return Collections.emptyList(); + } + // Check if database exists if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) { throw new DatabaseNotExistException(databaseName); diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index 3c1dac34b59f..d8cd98413876 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -843,6 +843,19 @@ public void testRenameRejectsTableViewNameCollision() throws Exception { assertThat(catalog.listViews(databaseName)).containsExactly(view.getObjectName()); } + @Test + public void testListViewsFromSystemDatabase() throws Exception { + assertThat(catalog.listViews(Catalog.SYSTEM_DATABASE_NAME)).isEmpty(); + assertThat( + catalog.listViewsPaged(Catalog.SYSTEM_DATABASE_NAME, 10, null, null) + .getElements()) + .isEmpty(); + assertThat( + catalog.listViewDetailsPaged(Catalog.SYSTEM_DATABASE_NAME, 10, null, null) + .getElements()) + .isEmpty(); + } + @Test public void testConcurrentCreateViewOnlyCreatesOneView() throws Exception { String databaseName = "concurrent_view_db"; @@ -875,6 +888,49 @@ public void testConcurrentCreateViewOnlyCreatesOneView() throws Exception { } } + @Test + public void testConcurrentCreateTableAndViewCannotShareName() throws Exception { + String databaseName = "concurrent_table_view_db"; + Identifier identifier = Identifier.create(databaseName, "same_name"); + View view = createView(identifier); + ExecutorService executor = Executors.newFixedThreadPool(2); + + catalog.createDatabase(databaseName, false); + Callable createTable = + () -> { + try { + catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable createView = + () -> { + try { + catalog.createView(identifier, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableCreated = executor.submit(createTable); + Future viewCreated = executor.submit(createView); + + assertThat(ImmutableList.of(tableCreated.get(), viewCreated.get())) + .containsExactlyInAnyOrder(true, false); + assertThat(catalog.listTables(databaseName).contains(identifier.getObjectName())) + .isNotEqualTo( + catalog.listViews(databaseName).contains(identifier.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } + @Test public void testAlterView() throws Exception { Identifier identifier = Identifier.create("alter_view_db", "my_view"); From 0fe1ace2a20e4cd2861bfaf3f994e194d6833cf0 Mon Sep 17 00:00:00 2001 From: tchivs Date: Tue, 23 Jun 2026 16:48:17 +0800 Subject: [PATCH 4/5] Fix JDBC catalog view concurrency handling --- .../apache/paimon/catalog/CachingCatalog.java | 36 -- .../org/apache/paimon/jdbc/JdbcCatalog.java | 344 +++++++++++------- .../org/apache/paimon/jdbc/JdbcUtils.java | 57 ++- 3 files changed, 270 insertions(+), 167 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 3a0a6c909a88..afe4ed8ae6de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -32,8 +32,6 @@ import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.SegmentsCache; -import org.apache.paimon.view.View; -import org.apache.paimon.view.ViewChange; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; @@ -370,40 +368,6 @@ public void invalidateTable(Identifier identifier) { } } - @Override - public void createView(Identifier identifier, View view, boolean ignoreIfExists) - throws ViewAlreadyExistException, DatabaseNotExistException { - wrapped.createView(identifier, view, ignoreIfExists); - } - - @Override - public View getView(Identifier identifier) throws ViewNotExistException { - return wrapped.getView(identifier); - } - - @Override - public List listViews(String databaseName) throws DatabaseNotExistException { - return wrapped.listViews(databaseName); - } - - @Override - public void dropView(Identifier identifier, boolean ignoreIfNotExists) - throws ViewNotExistException { - wrapped.dropView(identifier, ignoreIfNotExists); - } - - @Override - public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) - throws ViewNotExistException, ViewAlreadyExistException { - wrapped.renameView(fromView, toView, ignoreIfNotExists); - } - - @Override - public void alterView(Identifier view, List viewChanges, boolean ignoreIfNotExists) - throws ViewNotExistException, DialectAlreadyExistException, DialectNotExistException { - wrapped.alterView(view, viewChanges, ignoreIfNotExists); - } - // ================================== Cache Public API // ================================================ diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index b266fa5fa1b5..605c33764946 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -33,6 +33,8 @@ import org.apache.paimon.catalog.PropertyChange; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.jdbc.JdbcUtils.JdbcViewConflictException; +import org.apache.paimon.jdbc.JdbcUtils.JdbcViewConflictKind; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; @@ -233,6 +235,11 @@ public List listDatabases() { row -> row.getString(JdbcUtils.DATABASE_NAME), JdbcUtils.LIST_ALL_PROPERTY_DATABASES_SQL, catalogKey)); + databases.addAll( + fetch( + row -> row.getString(JdbcUtils.VIEW_DATABASE), + JdbcUtils.LIST_ALL_VIEW_DATABASES_SQL, + catalogKey)); return databases.stream().distinct().collect(Collectors.toList()); } @@ -651,6 +658,37 @@ public T runWithLock(Identifier identifier, Callable callable) throws Exc return Lock.fromCatalog(lock, identifier).runWithLock(callable); } + private T runWithLocks( + Identifier firstIdentifier, Identifier secondIdentifier, Callable callable) + throws Exception { + if (firstIdentifier.equals(secondIdentifier)) { + return runWithLock(firstIdentifier, callable); + } + + Identifier firstLock = firstIdentifier; + Identifier secondLock = secondIdentifier; + if (lockKey(firstIdentifier).compareTo(lockKey(secondIdentifier)) > 0) { + firstLock = secondIdentifier; + secondLock = firstIdentifier; + } + + Identifier nestedLock = secondLock; + return runWithLock(firstLock, () -> runWithLock(nestedLock, callable)); + } + + private String lockKey(Identifier identifier) { + return identifier.getDatabaseName() + "\0" + identifier.getObjectName(); + } + + private RuntimeException viewOperationException( + String operation, Identifier identifier, Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + return new RuntimeException( + "Failed to " + operation + " view " + identifier.getFullName(), e); + } + @Override public void repairCatalog() { List databases; @@ -856,6 +894,9 @@ public View getView(Identifier identifier) throws ViewNotExistException { viewSchema.comment(), viewSchema.options()); } catch (SQLException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException("Failed to get view " + identifier.getFullName(), e); } } @@ -864,11 +905,7 @@ public View getView(Identifier identifier) throws ViewNotExistException { public void createView(Identifier identifier, View view, boolean ignoreIfExists) throws ViewAlreadyExistException, DatabaseNotExistException { // Check if database exists - try { - getDatabase(identifier.getDatabaseName()); - } catch (DatabaseNotExistException e) { - throw e; - } + getDatabase(identifier.getDatabaseName()); // Serialize view schema to JSON ViewSchema viewSchema = @@ -896,15 +933,15 @@ public void createView(Identifier identifier, View view, boolean ignoreIfExists) viewSchemaJson); return null; }); - } catch (RuntimeException e) { - if (e.getMessage() != null && e.getMessage().contains("View already exists")) { - throw new ViewAlreadyExistException(identifier, e); - } - throw new RuntimeException("Failed to create view " + identifier.getFullName(), e); } catch (ViewAlreadyExistException e) { throw e; + } catch (JdbcViewConflictException e) { + if (e.kind() == JdbcViewConflictKind.ALREADY_EXISTS) { + throw new ViewAlreadyExistException(identifier, e); + } + throw viewOperationException("create", identifier, e); } catch (Exception e) { - throw new RuntimeException("Failed to create view " + identifier.getFullName(), e); + throw viewOperationException("create", identifier, e); } } @@ -931,32 +968,39 @@ private boolean validateViewNotExists(Identifier identifier, boolean ignoreIfExi @Override public void dropView(Identifier identifier, boolean ignoreIfNotExists) throws ViewNotExistException { - // Check if view exists - if (!JdbcUtils.viewExists( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getObjectName())) { - if (ignoreIfNotExists) { - return; - } - throw new ViewNotExistException(identifier); - } - - // Delete view - int deletedRecords = - execute( - connections, - JdbcUtils.DROP_VIEW_SQL, - catalogKey, - identifier.getDatabaseName(), - identifier.getObjectName()); - - if (deletedRecords != 1) { - throw new RuntimeException( - String.format( - "Failed to drop view %s: affected %d rows", - identifier.getFullName(), deletedRecords)); + try { + runWithLock( + identifier, + () -> { + if (!JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfNotExists) { + return null; + } + throw new ViewNotExistException(identifier); + } + int deletedRecords = + execute( + connections, + JdbcUtils.DROP_VIEW_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()); + if (deletedRecords != 1) { + throw new RuntimeException( + String.format( + "Failed to drop view %s: affected %d rows", + identifier.getFullName(), deletedRecords)); + } + return null; + }); + } catch (ViewNotExistException e) { + throw e; + } catch (Exception e) { + throw viewOperationException("drop", identifier, e); } } @@ -978,6 +1022,7 @@ public List listViews(String databaseName) throws DatabaseNotExistExcept databaseName); } + // TODO: Implement actual paging and pattern filtering @Override public PagedList listViewsPaged( String databaseName, Integer maxResults, String pageToken, String viewNamePattern) @@ -1014,36 +1059,52 @@ public PagedList listViewDetailsPaged( @Override public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) throws ViewNotExistException, ViewAlreadyExistException { - // Check if source view exists - if (!JdbcUtils.viewExists( - connections, catalogKey, fromView.getDatabaseName(), fromView.getObjectName())) { - if (ignoreIfNotExists) { - return; - } - throw new ViewNotExistException(fromView); - } + try { + runWithLocks( + fromView, + toView, + () -> { + if (!JdbcUtils.viewExists( + connections, + catalogKey, + fromView.getDatabaseName(), + fromView.getObjectName())) { + if (ignoreIfNotExists) { + return null; + } + throw new ViewNotExistException(fromView); + } - // Check if target view already exists - if (JdbcUtils.viewExists( - connections, catalogKey, toView.getDatabaseName(), toView.getObjectName())) { - throw new ViewAlreadyExistException(toView); - } - if (JdbcUtils.tableExists( - connections, catalogKey, toView.getDatabaseName(), toView.getObjectName())) { - throw new ViewAlreadyExistException(toView); - } - if (!JdbcUtils.databaseExists(connections, catalogKey, toView.getDatabaseName())) { - throw new IllegalArgumentException( - String.format("Database %s does not exist.", toView.getDatabaseName())); - } + if (!JdbcUtils.databaseExists( + connections, catalogKey, toView.getDatabaseName())) { + throw new IllegalArgumentException( + String.format( + "Database %s does not exist.", + toView.getDatabaseName())); + } - // Rename view - try { - JdbcUtils.renameView(connections, catalogKey, fromView, toView); - } catch (RuntimeException e) { - if (e.getMessage() != null && e.getMessage().contains("View already exists")) { + if (JdbcUtils.viewExists( + connections, + catalogKey, + toView.getDatabaseName(), + toView.getObjectName()) + || JdbcUtils.tableExists( + connections, + catalogKey, + toView.getDatabaseName(), + toView.getObjectName())) { + throw new ViewAlreadyExistException(toView); + } + + JdbcUtils.renameView(connections, catalogKey, fromView, toView); + return null; + }); + } catch (ViewAlreadyExistException | ViewNotExistException e) { + throw e; + } catch (JdbcViewConflictException e) { + if (e.kind() == JdbcViewConflictKind.ALREADY_EXISTS) { throw new ViewAlreadyExistException(toView, e); - } else if (e.getMessage() != null && e.getMessage().contains("View does not exist")) { + } else if (e.kind() == JdbcViewConflictKind.NOT_EXISTS) { throw new ViewNotExistException(fromView, e); } throw new RuntimeException( @@ -1052,6 +1113,18 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN + " to " + toView.getFullName(), e); + } catch (IllegalArgumentException e) { + throw e; + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException( + "Failed to rename view from " + + fromView.getFullName() + + " to " + + toView.getFullName(), + e); } } @@ -1059,72 +1132,89 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN public void alterView( Identifier identifier, List changes, boolean ignoreIfNotExists) throws ViewNotExistException, DialectAlreadyExistException, DialectNotExistException { - // Get existing view - View existingView; try { - existingView = getView(identifier); - } catch (ViewNotExistException e) { - if (ignoreIfNotExists) { - return; - } - throw e; - } + runWithLock( + identifier, + () -> { + View existingView; + try { + existingView = getView(identifier); + } catch (ViewNotExistException e) { + if (ignoreIfNotExists) { + return null; + } + throw e; + } - // Apply changes - Map newOptions = new HashMap<>(existingView.options()); - String newComment = existingView.comment().orElse(null); - Map newDialects = new HashMap<>(existingView.dialects()); - for (ViewChange change : changes) { - if (change instanceof ViewChange.SetViewOption) { - ViewChange.SetViewOption setOption = (ViewChange.SetViewOption) change; - newOptions.put(setOption.key(), setOption.value()); - } else if (change instanceof ViewChange.RemoveViewOption) { - ViewChange.RemoveViewOption removeOption = (ViewChange.RemoveViewOption) change; - newOptions.remove(removeOption.key()); - } else if (change instanceof ViewChange.UpdateViewComment) { - ViewChange.UpdateViewComment updateComment = (ViewChange.UpdateViewComment) change; - newComment = updateComment.comment(); - } else if (change instanceof ViewChange.AddDialect) { - ViewChange.AddDialect addDialect = (ViewChange.AddDialect) change; - if (newDialects.containsKey(addDialect.dialect())) { - throw new DialectAlreadyExistException(identifier, addDialect.dialect()); - } - newDialects.put(addDialect.dialect(), addDialect.query()); - } else if (change instanceof ViewChange.UpdateDialect) { - ViewChange.UpdateDialect updateDialect = (ViewChange.UpdateDialect) change; - if (!newDialects.containsKey(updateDialect.dialect())) { - throw new DialectNotExistException(identifier, updateDialect.dialect()); - } - newDialects.put(updateDialect.dialect(), updateDialect.query()); - } else if (change instanceof ViewChange.DropDialect) { - ViewChange.DropDialect dropDialect = (ViewChange.DropDialect) change; - if (!newDialects.containsKey(dropDialect.dialect())) { - throw new DialectNotExistException(identifier, dropDialect.dialect()); - } - newDialects.remove(dropDialect.dialect()); - } - } + Map newOptions = new HashMap<>(existingView.options()); + String newComment = existingView.comment().orElse(null); + Map newDialects = new HashMap<>(existingView.dialects()); + for (ViewChange change : changes) { + if (change instanceof ViewChange.SetViewOption) { + ViewChange.SetViewOption setOption = + (ViewChange.SetViewOption) change; + newOptions.put(setOption.key(), setOption.value()); + } else if (change instanceof ViewChange.RemoveViewOption) { + ViewChange.RemoveViewOption removeOption = + (ViewChange.RemoveViewOption) change; + newOptions.remove(removeOption.key()); + } else if (change instanceof ViewChange.UpdateViewComment) { + ViewChange.UpdateViewComment updateComment = + (ViewChange.UpdateViewComment) change; + newComment = updateComment.comment(); + } else if (change instanceof ViewChange.AddDialect) { + ViewChange.AddDialect addDialect = (ViewChange.AddDialect) change; + if (newDialects.containsKey(addDialect.dialect())) { + throw new DialectAlreadyExistException( + identifier, addDialect.dialect()); + } + newDialects.put(addDialect.dialect(), addDialect.query()); + } else if (change instanceof ViewChange.UpdateDialect) { + ViewChange.UpdateDialect updateDialect = + (ViewChange.UpdateDialect) change; + if (!newDialects.containsKey(updateDialect.dialect())) { + throw new DialectNotExistException( + identifier, updateDialect.dialect()); + } + newDialects.put(updateDialect.dialect(), updateDialect.query()); + } else if (change instanceof ViewChange.DropDialect) { + ViewChange.DropDialect dropDialect = + (ViewChange.DropDialect) change; + if (!newDialects.containsKey(dropDialect.dialect())) { + throw new DialectNotExistException( + identifier, dropDialect.dialect()); + } + newDialects.remove(dropDialect.dialect()); + } + } - // Create updated view schema - ViewSchema updatedSchema = - new ViewSchema( - existingView.rowType().getFields(), - existingView.query(), - newDialects, - newComment, - newOptions); - String viewSchemaJson = JsonSerdeUtil.toJson(updatedSchema); - - // Update view - try { - JdbcUtils.updateView( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getObjectName(), - viewSchemaJson); - } catch (RuntimeException e) { - throw new RuntimeException("Failed to alter view " + identifier.getFullName(), e); + ViewSchema updatedSchema = + new ViewSchema( + existingView.rowType().getFields(), + existingView.query(), + newDialects, + newComment, + newOptions); + String viewSchemaJson = JsonSerdeUtil.toJson(updatedSchema); + JdbcUtils.updateView( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName(), + viewSchemaJson); + return null; + }); + } catch (ViewNotExistException + | DialectAlreadyExistException + | DialectNotExistException e) { + throw e; + } catch (JdbcViewConflictException e) { + if (e.kind() == JdbcViewConflictKind.NOT_EXISTS) { + throw new ViewNotExistException(identifier, e); + } + throw viewOperationException("alter", identifier, e); + } catch (Exception e) { + throw viewOperationException("alter", identifier, e); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 42867b5feba6..b2e00eb4b3c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -42,6 +42,26 @@ public class JdbcUtils { private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); private static final String UNIQUE_CONSTRAINT_VIOLATION_STATE = "23505"; + + enum JdbcViewConflictKind { + ALREADY_EXISTS, + NOT_EXISTS + } + + /** Internal exception for JDBC view conflict (unique constraint violation or not-found). */ + static class JdbcViewConflictException extends RuntimeException { + private final JdbcViewConflictKind kind; + + JdbcViewConflictException(JdbcViewConflictKind kind, String message) { + super(message); + this.kind = kind; + } + + JdbcViewConflictKind kind() { + return kind; + } + } + public static final String CATALOG_TABLE_NAME = "paimon_tables"; public static final String CATALOG_KEY = "catalog_key"; public static final String TABLE_DATABASE = "database_name"; @@ -371,6 +391,17 @@ public class JdbcUtils { + VIEW_NAME + " = ? "; + static final String GET_VIEW_DATABASE_SQL = + "SELECT " + + VIEW_DATABASE + + " FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? LIMIT 1"; + static final String LIST_VIEWS_SQL = "SELECT * FROM " + VIEW_TABLE_NAME @@ -444,6 +475,15 @@ public class JdbcUtils { + VIEW_NAME + " = ? "; + static final String LIST_ALL_VIEW_DATABASES_SQL = + "SELECT DISTINCT " + + VIEW_DATABASE + + " FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ?"; + public static Properties extractJdbcConfiguration( Map properties, String prefix) { Properties result = new Properties(); @@ -519,6 +559,9 @@ public static boolean databaseExists( if (exists(connections, JdbcUtils.GET_DATABASE_PROPERTIES_SQL, storeKey, databaseName)) { return true; } + if (exists(connections, JdbcUtils.GET_VIEW_DATABASE_SQL, storeKey, databaseName)) { + return true; + } return false; } @@ -578,6 +621,7 @@ public static int execute( sqlErrorHandler.accept(e); throw new RuntimeException(String.format("Failed to execute: %s", sql), e); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted in SQL command", e); } } @@ -827,7 +871,8 @@ public static void insertView( execute( err -> { if (isUniqueConstraintViolation(err)) { - throw new RuntimeException( + throw new JdbcViewConflictException( + JdbcViewConflictKind.ALREADY_EXISTS, String.format( "View already exists: %s.%s", databaseName, viewName)); @@ -865,7 +910,8 @@ public static void updateView( viewName); if (updatedRecords == 0) { - throw new RuntimeException( + throw new JdbcViewConflictException( + JdbcViewConflictKind.NOT_EXISTS, String.format("View does not exist: %s.%s", databaseName, viewName)); } else if (updatedRecords != 1) { LOG.warn( @@ -881,7 +927,8 @@ public static void renameView( execute( err -> { if (isUniqueConstraintViolation(err)) { - throw new RuntimeException( + throw new JdbcViewConflictException( + JdbcViewConflictKind.ALREADY_EXISTS, String.format("View already exists: %s", toView)); } }, @@ -896,7 +943,9 @@ public static void renameView( if (updatedRecords == 1) { LOG.info("Renamed view from {}, to {}", fromView, toView); } else if (updatedRecords == 0) { - throw new RuntimeException(String.format("View does not exist: %s", fromView)); + throw new JdbcViewConflictException( + JdbcViewConflictKind.NOT_EXISTS, + String.format("View does not exist: %s", fromView)); } else { LOG.warn( "Rename operation affected {} rows: the view table's primary key assumption has been violated", From 0d16ab06e3bf822087e71edbc60ba735d6d392e3 Mon Sep 17 00:00:00 2001 From: tchivs Date: Wed, 24 Jun 2026 10:54:04 +0800 Subject: [PATCH 5/5] Fix JDBC catalog view locking edge cases --- docs/docs/concepts/catalog.md | 6 + docs/docs/concepts/views.md | 28 +- docs/docs/flink/sql-ddl.md | 5 + docs/docs/spark/sql-ddl.md | 5 + .../org/apache/paimon/jdbc/JdbcCatalog.java | 165 ++++++++---- .../apache/paimon/jdbc/JdbcCatalogTest.java | 249 +++++++++++++++++- 6 files changed, 406 insertions(+), 52 deletions(-) diff --git a/docs/docs/concepts/catalog.md b/docs/docs/concepts/catalog.md index 3eb9f2c13570..b6f60dbdd280 100644 --- a/docs/docs/concepts/catalog.md +++ b/docs/docs/concepts/catalog.md @@ -92,3 +92,9 @@ CREATE CATALOG my_jdbc WITH ( 'warehouse' = 'hdfs:///path/to/warehouse' ); ``` + +The JDBC catalog also persists Paimon views. View metadata is stored in an automatically created +`paimon_views` table, and table/view names share a single namespace per database (a name cannot be +used by both a table and a view at the same time). See [Views](./views) for details on the JDBC +catalog upgrade requirements, single-process locking semantics, and behavior of `DROP DATABASE` +against view-only databases. diff --git a/docs/docs/concepts/views.md b/docs/docs/concepts/views.md index b6c649ff5df9..016171b01ced 100644 --- a/docs/docs/concepts/views.md +++ b/docs/docs/concepts/views.md @@ -41,7 +41,6 @@ View metadata is persisted only when the catalog implementation supports it: File-system catalogs do not currently support views because they lack persistent metadata storage. - ### Representation structure | Field | Type | Description | @@ -52,6 +51,33 @@ File-system catalogs do not currently support views because they lack persistent Multiple representations can be stored for the same version so that different engines can access the view using their native dialect. +### JDBC catalog notes + +The JDBC catalog stores views in a dedicated `paimon_views` table that is created on first +initialization. A few things are worth knowing when running on top of an existing JDBC catalog: + +- **Required permissions on upgrade.** Upgrading to a Paimon release with view support requires + `CREATE TABLE` permission on the catalog database the first time the catalog is opened, so that + the `paimon_views` table can be created. Operators who tightened privileges to CRUD-only after + the initial deployment should either restore `CREATE TABLE` permission temporarily or create the + `paimon_views` table manually beforehand. +- **Table and view share the same identifier namespace.** A name cannot be used by both a table + and a view in the same database. `createTable`, `renameTable`, `createView` and `renameView` all + validate this invariant under the catalog lock; concurrent operations targeting the same + identifier will see exactly one winner. +- **Single-process atomicity does not depend on `lock.enabled`.** The JDBC catalog also keeps a + per-JVM stripe lock keyed by `(catalog key, database, object name)`, so the table-vs-view name + uniqueness invariant holds within one JVM even when `lock.enabled = false`. Setting + `lock.enabled = true` (with `lock.type = jdbc`) is still recommended for multi-process + deployments because the stripe lock only serializes operations within the same JVM. +- **Database visibility.** A database that contains only views (and no tables or properties) is + reported by `listDatabases` and `SHOW DATABASES`. `DROP DATABASE ... CASCADE` removes both the + tables and the views in that database; `DROP DATABASE` without `CASCADE` will reject databases + that still hold any view. +- **Cross-database rename.** `renameView(from, to)` and `renameTable(from, to)` raise an + `IllegalArgumentException` (`Database X does not exist.`) when the target database is missing, + matching the BadRequest semantics of the REST catalog. + ## Operations ### Create or replace view diff --git a/docs/docs/flink/sql-ddl.md b/docs/docs/flink/sql-ddl.md index 835bc376e120..4964a0cc18d2 100644 --- a/docs/docs/flink/sql-ddl.md +++ b/docs/docs/flink/sql-ddl.md @@ -150,6 +150,11 @@ JDBC catalog supports persistent Paimon views. View metadata is stored in the au `paimon_views` table in the catalog database. The view SQL is stored as catalog metadata and is not resolved when the view is created. +Within a single database, a name cannot be used by both a table and a view; all writers +(`createTable`, `renameTable`, `createView`, `renameView`) reject conflicting identifiers. See +[Views](../concepts/views#jdbc-catalog-notes) for upgrade-time permissions, single-process locking +semantics, and database visibility details. + ```sql CREATE VIEW sales_view AS SELECT name, amount FROM sales WHERE amount > 100; diff --git a/docs/docs/spark/sql-ddl.md b/docs/docs/spark/sql-ddl.md index 4e6b9566b528..aa9310b8b478 100644 --- a/docs/docs/spark/sql-ddl.md +++ b/docs/docs/spark/sql-ddl.md @@ -106,6 +106,11 @@ Paimon JDBC Catalog in Spark needs to correctly add the corresponding jar packag JDBC catalog supports persistent Paimon views. View metadata is stored in the automatically created `paimon_views` table in the catalog database. +Within a single database, a name cannot be used by both a table and a view; all writers +(`createTable`, `renameTable`, `createView`, `renameView`) reject conflicting identifiers. See +[Views](../concepts/views#jdbc-catalog-notes) for upgrade-time permissions, single-process locking +semantics, and database visibility details. + ```bash spark-sql ... \ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 605c33764946..888aed5719f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -54,6 +54,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; import org.apache.paimon.shade.guava30.com.google.common.collect.Sets; +import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.Striped; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,12 +100,21 @@ public class JdbcCatalog extends AbstractCatalog { public static final String PROPERTY_PREFIX = "jdbc."; private static final String DATABASE_EXISTS_PROPERTY = "exists"; + private static final int LOCAL_LOCK_STRIPES = 64; private final JdbcClientPool connections; private final String catalogKey; private final Options options; private final String warehouse; + /** + * Per-JVM stripe locks keyed by (catalogKey, database, objectName). They guarantee that the new + * table / view name validation and metadata write happen atomically within the same process + * even when the catalog-level distributed lock is disabled (e.g. {@code lock.enabled = false}). + */ + private static final Striped LOCAL_LOCKS = + Striped.lazyWeakLock(LOCAL_LOCK_STRIPES); + protected JdbcCatalog( FileIO fileIO, String catalogKey, CatalogContext context, String warehouse) { super(fileIO, context); @@ -392,10 +402,22 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx } break; case FORMAT_TABLE: - if (!validateTableNotExists(identifier, ignoreIfExists)) { - return; + try { + runWithLock( + identifier, + () -> { + if (!validateTableNotExists(identifier, ignoreIfExists)) { + return null; + } + createFormatTable(identifier, schema); + return null; + }); + } catch (TableAlreadyExistException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException( + "Failed to create table " + identifier.getFullName(), e); } - createFormatTable(identifier, schema); break; case OBJECT_TABLE: throw new UnsupportedOperationException( @@ -448,17 +470,10 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { @Override protected void createTableImpl(Identifier identifier, Schema schema) { - try { - runWithLock( - identifier, - () -> { - validateTableNotExists(identifier, false); - createTableImplWithLock(identifier, schema); - return null; - }); - } catch (Exception e) { - throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); - } + // Callers (currently only the overridden createTable above) are expected to invoke this + // method while already holding the catalog lock for `identifier`. Acquiring another + // distributed lock here would cause non-reentrant nested lock acquisition. + createTableImplWithLock(identifier, schema); } private void createTableImplWithLock(Identifier identifier, Schema schema) { @@ -527,26 +542,59 @@ public void renameTable(Identifier fromTable, Identifier toTable, boolean ignore checkNotSystemTable(toTable, "renameTable"); try { - getTable(fromTable); - } catch (TableNotExistException e) { - if (ignoreIfNotExists) { - return; - } - throw new TableNotExistException(fromTable); - } + runWithLocks( + fromTable, + toTable, + () -> { + // Re-check existence inside the lock so that the validation is atomic + // with the metadata update below. This prevents races against + // concurrent createView / renameView / createTable targeting the same + // identifier (tables and views are stored in separate JDBC tables + // without a shared uniqueness constraint). + try { + getTable(fromTable); + } catch (TableNotExistException e) { + if (ignoreIfNotExists) { + return null; + } + throw new TableNotExistException(fromTable); + } - try { - getTable(toTable); - throw new TableAlreadyExistException(toTable); - } catch (TableNotExistException ignored) { - } + requireDatabaseExistsForWrite(toTable); - if (JdbcUtils.viewExists( - connections, catalogKey, toTable.getDatabaseName(), toTable.getObjectName())) { - throw new TableAlreadyExistException(toTable); - } + if (JdbcUtils.tableExists( + connections, + catalogKey, + toTable.getDatabaseName(), + toTable.getObjectName()) + || JdbcUtils.viewExists( + connections, + catalogKey, + toTable.getDatabaseName(), + toTable.getObjectName())) { + throw new TableAlreadyExistException(toTable); + } - renameTableImpl(fromTable, toTable); + renameTableImpl(fromTable, toTable); + return null; + }); + } catch (TableNotExistException | TableAlreadyExistException e) { + throw e; + } catch (IllegalArgumentException e) { + // Propagate "target database does not exist" verbatim to mirror renameView's + // behaviour and to match RESTCatalog's BadRequest semantics. + throw e; + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException( + "Failed to rename table from " + + fromTable.getFullName() + + " to " + + toTable.getFullName(), + e); + } } @Override @@ -646,16 +694,26 @@ public Optional lockContext() { } public T runWithLock(Identifier identifier, Callable callable) throws Exception { - if (!lockEnabled()) { - return callable.call(); + // Always serialize same-identifier mutations within a single JVM via stripe locks. This + // keeps the table/view name uniqueness invariant intact even when the JDBC distributed + // lock is disabled, so that view operations never silently degrade to a non-atomic + // check-then-act. + java.util.concurrent.locks.Lock localLock = LOCAL_LOCKS.get(lockKey(identifier)); + localLock.lock(); + try { + if (!lockEnabled()) { + return callable.call(); + } + JdbcCatalogLock lock = + new JdbcCatalogLock( + connections, + catalogKey, + checkMaxSleep(options.toMap()), + acquireTimeout(options.toMap())); + return Lock.fromCatalog(lock, identifier).runWithLock(callable); + } finally { + localLock.unlock(); } - JdbcCatalogLock lock = - new JdbcCatalogLock( - connections, - catalogKey, - checkMaxSleep(options.toMap()), - acquireTimeout(options.toMap())); - return Lock.fromCatalog(lock, identifier).runWithLock(callable); } private T runWithLocks( @@ -677,7 +735,24 @@ private T runWithLocks( } private String lockKey(Identifier identifier) { - return identifier.getDatabaseName() + "\0" + identifier.getObjectName(); + return catalogKey + "\0" + identifier.getDatabaseName() + "\0" + identifier.getObjectName(); + } + + /** + * Validate that the target database of a write operation exists. + * + *

The check uses {@link JdbcUtils#databaseExists}, which considers both the catalog table + * properties and the view metadata table, so that this single helper can be shared by {@code + * createTable}, {@code createView}, {@code renameTable} and {@code renameView}. + * + *

An {@link IllegalArgumentException} is thrown for missing databases, mirroring the + * behavior of {@code RESTCatalog.renameView} for {@code BadRequest} errors. + */ + private void requireDatabaseExistsForWrite(Identifier identifier) { + if (!JdbcUtils.databaseExists(connections, catalogKey, identifier.getDatabaseName())) { + throw new IllegalArgumentException( + String.format("Database %s does not exist.", identifier.getDatabaseName())); + } } private RuntimeException viewOperationException( @@ -1075,13 +1150,7 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN throw new ViewNotExistException(fromView); } - if (!JdbcUtils.databaseExists( - connections, catalogKey, toView.getDatabaseName())) { - throw new IllegalArgumentException( - String.format( - "Database %s does not exist.", - toView.getDatabaseName())); - } + requireDatabaseExistsForWrite(toView); if (JdbcUtils.viewExists( connections, diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index d8cd98413876..351454436dda 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -71,12 +71,16 @@ public void setUp() throws Exception { } private JdbcCatalog initCatalog(Map props) { - Map properties = Maps.newHashMap(); - properties.put( - CatalogOptions.URI.key(), + return initCatalog( + props, "jdbc:sqlite:file:" + UUID.randomUUID().toString().replace("-", "") + "?mode=memory&cache=shared"); + } + + private JdbcCatalog initCatalog(Map props, String uri) { + Map properties = Maps.newHashMap(); + properties.put(CatalogOptions.URI.key(), uri); properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); @@ -349,6 +353,18 @@ private JdbcCatalog initCatalogWithSync(boolean syncAllProperties) { return initCatalog(props); } + private JdbcCatalog initCatalogWithoutLock() { + Map props = Maps.newHashMap(); + props.put(CatalogOptions.LOCK_ENABLED.key(), "false"); + return initCatalog(props); + } + + private JdbcCatalog initCatalogWithoutLock(String uri) { + Map props = Maps.newHashMap(); + props.put(CatalogOptions.LOCK_ENABLED.key(), "false"); + return initCatalog(props, uri); + } + private Map fetchTableProperties( JdbcCatalog jdbcCatalog, String databaseName, String tableName) { try { @@ -804,6 +820,24 @@ public void testRenameViewRejectsMissingTargetDatabase() throws Exception { .isInstanceOf(Catalog.ViewNotExistException.class); } + @Test + public void testRenameTableRejectsMissingTargetDatabase() throws Exception { + String sourceDatabase = "source_rename_table_db"; + Identifier source = Identifier.create(sourceDatabase, "table_name"); + Identifier target = Identifier.create("missing_target_db", "table_name"); + + catalog.createDatabase(sourceDatabase, false); + catalog.createTable(source, DEFAULT_TABLE_SCHEMA, false); + + assertThatThrownBy(() -> catalog.renameTable(source, target, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Database missing_target_db does not exist."); + // Source table must remain intact. + assertDoesNotThrow(() -> catalog.getTable(source)); + assertThatThrownBy(() -> catalog.getTable(target)) + .isInstanceOf(Catalog.TableNotExistException.class); + } + @Test public void testTableAndViewCannotShareName() throws Exception { String databaseName = "shared_name_db"; @@ -931,6 +965,215 @@ public void testConcurrentCreateTableAndViewCannotShareName() throws Exception { } } + @Test + public void testConcurrentRenameTableAndCreateViewCannotShareName() throws Exception { + String databaseName = "concurrent_rename_table_view_db"; + Identifier source = Identifier.create(databaseName, "src_table"); + Identifier target = Identifier.create(databaseName, "shared_name"); + View view = createView(target); + ExecutorService executor = Executors.newFixedThreadPool(2); + + catalog.createDatabase(databaseName, false); + catalog.createTable(source, DEFAULT_TABLE_SCHEMA, false); + + Callable renameTable = + () -> { + try { + catalog.renameTable(source, target, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable createView = + () -> { + try { + catalog.createView(target, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableRenamed = executor.submit(renameTable); + Future viewCreated = executor.submit(createView); + + assertThat(ImmutableList.of(tableRenamed.get(), viewCreated.get())) + .containsExactlyInAnyOrder(true, false); + // The shared identifier must be either a table or a view, but never both. + assertThat(catalog.listTables(databaseName).contains(target.getObjectName())) + .isNotEqualTo(catalog.listViews(databaseName).contains(target.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testConcurrentRenameTableAndRenameViewCannotShareName() throws Exception { + String databaseName = "concurrent_rename_collision_db"; + Identifier sourceTable = Identifier.create(databaseName, "src_table"); + Identifier sourceView = Identifier.create(databaseName, "src_view"); + Identifier target = Identifier.create(databaseName, "shared_name"); + ExecutorService executor = Executors.newFixedThreadPool(2); + + catalog.createDatabase(databaseName, false); + catalog.createTable(sourceTable, DEFAULT_TABLE_SCHEMA, false); + catalog.createView(sourceView, createView(sourceView), false); + + Callable renameTable = + () -> { + try { + catalog.renameTable(sourceTable, target, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable renameView = + () -> { + try { + catalog.renameView(sourceView, target, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableRenamed = executor.submit(renameTable); + Future viewRenamed = executor.submit(renameView); + + assertThat(ImmutableList.of(tableRenamed.get(), viewRenamed.get())) + .containsExactlyInAnyOrder(true, false); + assertThat(catalog.listTables(databaseName).contains(target.getObjectName())) + .isNotEqualTo(catalog.listViews(databaseName).contains(target.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testConcurrentCreateTableAndViewCannotShareNameWithoutDistributedLock() + throws Exception { + // Even when the JDBC distributed lock is disabled, mutations targeting the same + // identifier within a single JVM must remain atomic. This is provided by the per-JVM + // local stripe lock. + JdbcCatalog noLockCatalog = initCatalogWithoutLock(); + try { + String databaseName = "no_lock_concurrent_db"; + Identifier identifier = Identifier.create(databaseName, "same_name"); + View view = createView(identifier); + ExecutorService executor = Executors.newFixedThreadPool(2); + + noLockCatalog.createDatabase(databaseName, false); + Callable createTable = + () -> { + try { + noLockCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable createView = + () -> { + try { + noLockCatalog.createView(identifier, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableCreated = executor.submit(createTable); + Future viewCreated = executor.submit(createView); + + assertThat(ImmutableList.of(tableCreated.get(), viewCreated.get())) + .containsExactlyInAnyOrder(true, false); + assertThat( + noLockCatalog + .listTables(databaseName) + .contains(identifier.getObjectName())) + .isNotEqualTo( + noLockCatalog + .listViews(databaseName) + .contains(identifier.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } finally { + noLockCatalog.close(); + } + } + + @Test + public void testConcurrentCreateTableAndViewCannotShareNameAcrossCatalogInstancesWithoutLock() + throws Exception { + String uri = + "jdbc:sqlite:file:" + + UUID.randomUUID().toString().replace("-", "") + + "?mode=memory&cache=shared"; + JdbcCatalog tableCatalog = initCatalogWithoutLock(uri); + JdbcCatalog viewCatalog = initCatalogWithoutLock(uri); + try { + String databaseName = "no_lock_two_catalogs_db"; + Identifier identifier = Identifier.create(databaseName, "same_name"); + View view = createView(identifier); + ExecutorService executor = Executors.newFixedThreadPool(2); + + tableCatalog.createDatabase(databaseName, false); + Callable createTable = + () -> { + try { + tableCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable createView = + () -> { + try { + viewCatalog.createView(identifier, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableCreated = executor.submit(createTable); + Future viewCreated = executor.submit(createView); + + assertThat(ImmutableList.of(tableCreated.get(), viewCreated.get())) + .containsExactlyInAnyOrder(true, false); + assertThat( + tableCatalog + .listTables(databaseName) + .contains(identifier.getObjectName())) + .isNotEqualTo( + viewCatalog + .listViews(databaseName) + .contains(identifier.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } finally { + tableCatalog.close(); + viewCatalog.close(); + } + } + @Test public void testAlterView() throws Exception { Identifier identifier = Identifier.create("alter_view_db", "my_view");