diff --git a/docs/docs/concepts/catalog.md b/docs/docs/concepts/catalog.md index 3eb9f2c13570..b6f60dbdd280 100644 --- a/docs/docs/concepts/catalog.md +++ b/docs/docs/concepts/catalog.md @@ -92,3 +92,9 @@ CREATE CATALOG my_jdbc WITH ( 'warehouse' = 'hdfs:///path/to/warehouse' ); ``` + +The JDBC catalog also persists Paimon views. View metadata is stored in an automatically created +`paimon_views` table, and table/view names share a single namespace per database (a name cannot be +used by both a table and a view at the same time). See [Views](./views) for details on the JDBC +catalog upgrade requirements, single-process locking semantics, and behavior of `DROP DATABASE` +against view-only databases. diff --git a/docs/docs/concepts/views.md b/docs/docs/concepts/views.md index 8f0559fde9c4..016171b01ced 100644 --- a/docs/docs/concepts/views.md +++ b/docs/docs/concepts/views.md @@ -36,10 +36,11 @@ View metadata is persisted only when the catalog implementation supports it: - **Hive metastore catalog** – view metadata is stored together with table metadata inside the metastore warehouse. - **REST catalog** – view metadata is kept in the REST backend and exposed through the catalog API. +- **JDBC catalog** – view metadata is stored in the catalog database in the `paimon_views` + metadata table. The table is created automatically when the JDBC catalog is initialized. File-system catalogs do not currently support views because they lack persistent metadata storage. - ### Representation structure | Field | Type | Description | @@ -50,6 +51,33 @@ File-system catalogs do not currently support views because they lack persistent Multiple representations can be stored for the same version so that different engines can access the view using their native dialect. +### JDBC catalog notes + +The JDBC catalog stores views in a dedicated `paimon_views` table that is created on first +initialization. A few things are worth knowing when running on top of an existing JDBC catalog: + +- **Required permissions on upgrade.** Upgrading to a Paimon release with view support requires + `CREATE TABLE` permission on the catalog database the first time the catalog is opened, so that + the `paimon_views` table can be created. Operators who tightened privileges to CRUD-only after + the initial deployment should either restore `CREATE TABLE` permission temporarily or create the + `paimon_views` table manually beforehand. +- **Table and view share the same identifier namespace.** A name cannot be used by both a table + and a view in the same database. `createTable`, `renameTable`, `createView` and `renameView` all + validate this invariant under the catalog lock; concurrent operations targeting the same + identifier will see exactly one winner. +- **Single-process atomicity does not depend on `lock.enabled`.** The JDBC catalog also keeps a + per-JVM stripe lock keyed by `(catalog key, database, object name)`, so the table-vs-view name + uniqueness invariant holds within one JVM even when `lock.enabled = false`. Setting + `lock.enabled = true` (with `lock.type = jdbc`) is still recommended for multi-process + deployments because the stripe lock only serializes operations within the same JVM. +- **Database visibility.** A database that contains only views (and no tables or properties) is + reported by `listDatabases` and `SHOW DATABASES`. `DROP DATABASE ... CASCADE` removes both the + tables and the views in that database; `DROP DATABASE` without `CASCADE` will reject databases + that still hold any view. +- **Cross-database rename.** `renameView(from, to)` and `renameTable(from, to)` raise an + `IllegalArgumentException` (`Database X does not exist.`) when the target database is missing, + matching the BadRequest semantics of the REST catalog. + ## Operations ### Create or replace view @@ -57,6 +85,10 @@ view using their native dialect. Use `CREATE VIEW` or `CREATE OR REPLACE VIEW` to register a view. Paimon assigns a UUID, writes the first metadata file, and records version `1`. +The catalog stores the view definition. It does not resolve or validate referenced tables when the +view is created, so missing or cross-database table references are checked by the compute engine when +the view is queried. + ```sql CREATE VIEW sales_view AS SELECT region, SUM(amount) AS total_amount diff --git a/docs/docs/flink/sql-ddl.md b/docs/docs/flink/sql-ddl.md index 7708af840afc..4964a0cc18d2 100644 --- a/docs/docs/flink/sql-ddl.md +++ b/docs/docs/flink/sql-ddl.md @@ -146,6 +146,23 @@ You can also perform logical isolation for databases under multiple catalogs by Additionally, when creating a JdbcCatalog, you can specify the maximum length for the lock key by configuring "lock-key-max-length," which defaults to 255. Since this value is a combination of {catalog-key}.{database-name}.{table-name}, please adjust accordingly. +JDBC catalog supports persistent Paimon views. View metadata is stored in the automatically created +`paimon_views` table in the catalog database. The view SQL is stored as catalog metadata and is not +resolved when the view is created. + +Within a single database, a name cannot be used by both a table and a view; all writers +(`createTable`, `renameTable`, `createView`, `renameView`) reject conflicting identifiers. See +[Views](../concepts/views#jdbc-catalog-notes) for upgrade-time permissions, single-process locking +semantics, and database visibility details. + +```sql +CREATE VIEW sales_view AS SELECT name, amount FROM sales WHERE amount > 100; + +SHOW VIEWS; + +DROP VIEW sales_view; +``` + You can define any default table options with the prefix `table-default.` for tables created in the catalog. ## Create Table diff --git a/docs/docs/spark/sql-ddl.md b/docs/docs/spark/sql-ddl.md index 66ac0bc8b1cb..aa9310b8b478 100644 --- a/docs/docs/spark/sql-ddl.md +++ b/docs/docs/spark/sql-ddl.md @@ -103,6 +103,14 @@ Paimon JDBC Catalog in Spark needs to correctly add the corresponding jar packag | mysql | mysql-connector-java | [Download](https://mvnrepository.com/artifact/mysql/mysql-connector-java) | | postgres | postgresql | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) | +JDBC catalog supports persistent Paimon views. View metadata is stored in the automatically created +`paimon_views` table in the catalog database. + +Within a single database, a name cannot be used by both a table and a view; all writers +(`createTable`, `renameTable`, `createView`, `renameView`) reject conflicting identifiers. See +[Views](../concepts/views#jdbc-catalog-notes) for upgrade-time permissions, single-process locking +semantics, and database visibility details. + ```bash spark-sql ... \ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ @@ -375,7 +383,7 @@ CREATE TABLE target_tbl LIKE source_tbl; ## View Views are based on the result-set of an SQL query, when using `org.apache.paimon.spark.SparkCatalog`, views are managed by paimon itself. -And in this case, views are supported when the `metastore` type is `hive` or `rest`. +And in this case, views are supported when the `metastore` type is `hive`, `rest` or `jdbc`. ### Create Or Replace View diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f765e5f88db5..888aed5719f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -19,17 +19,22 @@ package org.apache.paimon.jdbc; import org.apache.paimon.CoreOptions; +import org.apache.paimon.PagedList; +import org.apache.paimon.TableType; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.CatalogLockContext; import org.apache.paimon.catalog.CatalogLockFactory; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.jdbc.JdbcUtils.JdbcViewConflictException; +import org.apache.paimon.jdbc.JdbcUtils.JdbcViewConflictKind; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; @@ -37,13 +42,19 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewChange; +import org.apache.paimon.view.ViewImpl; +import org.apache.paimon.view.ViewSchema; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; import org.apache.paimon.shade.guava30.com.google.common.collect.Sets; +import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.Striped; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,14 +69,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.PATH; +import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; +import static org.apache.paimon.catalog.CatalogUtils.validateCreateTable; import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep; import static org.apache.paimon.jdbc.JdbcUtils.deleteProperties; @@ -85,12 +100,21 @@ public class JdbcCatalog extends AbstractCatalog { public static final String PROPERTY_PREFIX = "jdbc."; private static final String DATABASE_EXISTS_PROPERTY = "exists"; + private static final int LOCAL_LOCK_STRIPES = 64; private final JdbcClientPool connections; private final String catalogKey; private final Options options; private final String warehouse; + /** + * Per-JVM stripe locks keyed by (catalogKey, database, objectName). They guarantee that the new + * table / view name validation and metadata write happen atomically within the same process + * even when the catalog-level distributed lock is disabled (e.g. {@code lock.enabled = false}). + */ + private static final Striped LOCAL_LOCKS = + Striped.lazyWeakLock(LOCAL_LOCK_STRIPES); + protected JdbcCatalog( FileIO fileIO, String catalogKey, CatalogContext context, String warehouse) { super(fileIO, context); @@ -129,39 +153,66 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = - dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null); - if (tableExists.next()) { - return true; + try (ResultSet tableExists = + dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE).execute(); }); // Check and create database properties table. connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = + try (ResultSet tableExists = dbMeta.getTables( - null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null); - if (tableExists.next()) { - return true; + null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE) - .execute(); }); // Check and create table properties table. connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = + try (ResultSet tableExists = dbMeta.getTables( - null, null, JdbcUtils.TABLE_PROPERTIES_TABLE_NAME, null); - if (tableExists.next()) { - return true; + null, null, JdbcUtils.TABLE_PROPERTIES_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_TABLE_PROPERTIES_TABLE)) { + return statement.execute(); + } + }); + + // Check and create view table. + connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + try (ResultSet tableExists = + dbMeta.getTables(null, null, JdbcUtils.VIEW_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_VIEW_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_TABLE_PROPERTIES_TABLE).execute(); }); // if lock enabled, Check and create distributed lock table. @@ -194,6 +245,11 @@ public List listDatabases() { row -> row.getString(JdbcUtils.DATABASE_NAME), JdbcUtils.LIST_ALL_PROPERTY_DATABASES_SQL, catalogKey)); + databases.addAll( + fetch( + row -> row.getString(JdbcUtils.VIEW_DATABASE), + JdbcUtils.LIST_ALL_VIEW_DATABASES_SQL, + catalogKey)); return databases.stream().distinct().collect(Collectors.toList()); } @@ -226,6 +282,26 @@ protected void createDatabaseImpl(String name, Map properties) { insertProperties(connections, catalogKey, name, createProps); } + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException { + checkNotSystemDatabase(name); + try { + getDatabase(name); + } catch (DatabaseNotExistException e) { + if (ignoreIfNotExists) { + return; + } + throw new DatabaseNotExistException(name); + } + + if (!cascade && (!listTables(name).isEmpty() || !listViews(name).isEmpty())) { + throw new DatabaseNotEmptyException(name); + } + + dropDatabaseImpl(name); + } + @Override protected void dropDatabaseImpl(String name) { // Delete table from paimon_tables @@ -240,6 +316,8 @@ protected void dropDatabaseImpl(String name) { catalogKey, name); } + // Delete views from paimon_views. + execute(connections, JdbcUtils.DELETE_VIEWS_SQL, catalogKey, name); } @Override @@ -290,6 +368,65 @@ protected List listTablesImpl(String databaseName) { databaseName); } + @Override + public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException { + checkNotBranch(identifier, "createTable"); + checkNotSystemTable(identifier, "createTable"); + validateCreateTable(schema, false); + validateCustomTablePath(schema.options()); + + getDatabase(identifier.getDatabaseName()); + + copyTableDefaultOptions(schema.options()); + + TableType tableType = Options.fromMap(schema.options()).get(TYPE); + switch (tableType) { + case TABLE: + case MATERIALIZED_TABLE: + try { + runWithLock( + identifier, + () -> { + if (!validateTableNotExists(identifier, ignoreIfExists)) { + return null; + } + createTableImplWithLock(identifier, schema); + return null; + }); + } catch (TableAlreadyExistException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException( + "Failed to create table " + identifier.getFullName(), e); + } + break; + case FORMAT_TABLE: + try { + runWithLock( + identifier, + () -> { + if (!validateTableNotExists(identifier, ignoreIfExists)) { + return null; + } + createFormatTable(identifier, schema); + return null; + }); + } catch (TableAlreadyExistException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException( + "Failed to create table " + identifier.getFullName(), e); + } + break; + case OBJECT_TABLE: + throw new UnsupportedOperationException( + String.format( + "Catalog %s cannot support object tables.", + this.getClass().getName())); + } + } + @Override protected void dropTableImpl(Identifier identifier, List externalPaths) { try { @@ -333,11 +470,17 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { @Override protected void createTableImpl(Identifier identifier, Schema schema) { + // Callers (currently only the overridden createTable above) are expected to invoke this + // method while already holding the catalog lock for `identifier`. Acquiring another + // distributed lock here would cause non-reentrant nested lock acquisition. + createTableImplWithLock(identifier, schema); + } + + private void createTableImplWithLock(Identifier identifier, Schema schema) { try { // create table file SchemaManager schemaManager = getSchemaManager(identifier); - TableSchema tableSchema = - runWithLock(identifier, () -> schemaManager.createTable(schema)); + TableSchema tableSchema = schemaManager.createTable(schema); // Update schema metadata Path path = getTableLocation(identifier); if (JdbcUtils.insertTable( @@ -370,6 +513,90 @@ protected void createTableImpl(Identifier identifier, Schema schema) { } } + private boolean validateTableNotExists(Identifier identifier, boolean ignoreIfExists) + throws TableAlreadyExistException { + if (JdbcUtils.tableExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()) + || JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return false; + } + throw new TableAlreadyExistException(identifier); + } + return true; + } + + @Override + public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException { + checkNotBranch(fromTable, "renameTable"); + checkNotBranch(toTable, "renameTable"); + checkNotSystemTable(fromTable, "renameTable"); + checkNotSystemTable(toTable, "renameTable"); + + try { + runWithLocks( + fromTable, + toTable, + () -> { + // Re-check existence inside the lock so that the validation is atomic + // with the metadata update below. This prevents races against + // concurrent createView / renameView / createTable targeting the same + // identifier (tables and views are stored in separate JDBC tables + // without a shared uniqueness constraint). + try { + getTable(fromTable); + } catch (TableNotExistException e) { + if (ignoreIfNotExists) { + return null; + } + throw new TableNotExistException(fromTable); + } + + requireDatabaseExistsForWrite(toTable); + + if (JdbcUtils.tableExists( + connections, + catalogKey, + toTable.getDatabaseName(), + toTable.getObjectName()) + || JdbcUtils.viewExists( + connections, + catalogKey, + toTable.getDatabaseName(), + toTable.getObjectName())) { + throw new TableAlreadyExistException(toTable); + } + + renameTableImpl(fromTable, toTable); + return null; + }); + } catch (TableNotExistException | TableAlreadyExistException e) { + throw e; + } catch (IllegalArgumentException e) { + // Propagate "target database does not exist" verbatim to mirror renameView's + // behaviour and to match RESTCatalog's BadRequest semantics. + throw e; + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException( + "Failed to rename table from " + + fromTable.getFullName() + + " to " + + toTable.getFullName(), + e); + } + } + @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { @@ -467,16 +694,74 @@ public Optional lockContext() { } public T runWithLock(Identifier identifier, Callable callable) throws Exception { - if (!lockEnabled()) { - return callable.call(); + // Always serialize same-identifier mutations within a single JVM via stripe locks. This + // keeps the table/view name uniqueness invariant intact even when the JDBC distributed + // lock is disabled, so that view operations never silently degrade to a non-atomic + // check-then-act. + java.util.concurrent.locks.Lock localLock = LOCAL_LOCKS.get(lockKey(identifier)); + localLock.lock(); + try { + if (!lockEnabled()) { + return callable.call(); + } + JdbcCatalogLock lock = + new JdbcCatalogLock( + connections, + catalogKey, + checkMaxSleep(options.toMap()), + acquireTimeout(options.toMap())); + return Lock.fromCatalog(lock, identifier).runWithLock(callable); + } finally { + localLock.unlock(); } - JdbcCatalogLock lock = - new JdbcCatalogLock( - connections, - catalogKey, - checkMaxSleep(options.toMap()), - acquireTimeout(options.toMap())); - return Lock.fromCatalog(lock, identifier).runWithLock(callable); + } + + private T runWithLocks( + Identifier firstIdentifier, Identifier secondIdentifier, Callable callable) + throws Exception { + if (firstIdentifier.equals(secondIdentifier)) { + return runWithLock(firstIdentifier, callable); + } + + Identifier firstLock = firstIdentifier; + Identifier secondLock = secondIdentifier; + if (lockKey(firstIdentifier).compareTo(lockKey(secondIdentifier)) > 0) { + firstLock = secondIdentifier; + secondLock = firstIdentifier; + } + + Identifier nestedLock = secondLock; + return runWithLock(firstLock, () -> runWithLock(nestedLock, callable)); + } + + private String lockKey(Identifier identifier) { + return catalogKey + "\0" + identifier.getDatabaseName() + "\0" + identifier.getObjectName(); + } + + /** + * Validate that the target database of a write operation exists. + * + *

The check uses {@link JdbcUtils#databaseExists}, which considers both the catalog table + * properties and the view metadata table, so that this single helper can be shared by {@code + * createTable}, {@code createView}, {@code renameTable} and {@code renameView}. + * + *

An {@link IllegalArgumentException} is thrown for missing databases, mirroring the + * behavior of {@code RESTCatalog.renameView} for {@code BadRequest} errors. + */ + private void requireDatabaseExistsForWrite(Identifier identifier) { + if (!JdbcUtils.databaseExists(connections, catalogKey, identifier.getDatabaseName())) { + throw new IllegalArgumentException( + String.format("Database %s does not exist.", identifier.getDatabaseName())); + } + } + + private RuntimeException viewOperationException( + String operation, Identifier identifier, Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + return new RuntimeException( + "Failed to " + operation + " view " + identifier.getFullName(), e); } @Override @@ -576,6 +861,19 @@ private boolean syncTableProperties() { return options.get(CatalogOptions.SYNC_ALL_PROPERTIES); } + private void copyTableDefaultOptions(Map tableOptions) { + tableDefaultOptions.forEach(tableOptions::putIfAbsent); + } + + private void validateCustomTablePath(Map tableOptions) { + if (!allowCustomTablePath() && tableOptions.containsKey(PATH.key())) { + throw new UnsupportedOperationException( + String.format( + "The current catalog %s does not support specifying the table path when creating a table.", + this.getClass().getSimpleName())); + } + } + private Map convertToPropertiesTableKey(TableSchema tableSchema) { Map properties = new HashMap<>(); if (!tableSchema.primaryKeys().isEmpty()) { @@ -646,4 +944,346 @@ private List fetch(RowProducer toRow, String sql, String... args) { throw new RuntimeException("Interrupted in SQL query", e); } } + + // ======================= view methods =============================== + + @Override + public View getView(Identifier identifier) throws ViewNotExistException { + try { + String viewSchemaJson = + JdbcUtils.getViewSchema( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()); + if (viewSchemaJson == null) { + throw new ViewNotExistException(identifier); + } + + ViewSchema viewSchema = JsonSerdeUtil.fromJson(viewSchemaJson, ViewSchema.class); + return new ViewImpl( + identifier, + viewSchema.fields(), + viewSchema.query(), + viewSchema.dialects(), + viewSchema.comment(), + viewSchema.options()); + } catch (SQLException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException("Failed to get view " + identifier.getFullName(), e); + } + } + + @Override + public void createView(Identifier identifier, View view, boolean ignoreIfExists) + throws ViewAlreadyExistException, DatabaseNotExistException { + // Check if database exists + getDatabase(identifier.getDatabaseName()); + + // Serialize view schema to JSON + ViewSchema viewSchema = + new ViewSchema( + view.rowType().getFields(), + view.query(), + view.dialects(), + view.comment().orElse(null), + view.options()); + String viewSchemaJson = JsonSerdeUtil.toJson(viewSchema); + + // Insert view + try { + runWithLock( + identifier, + () -> { + if (!validateViewNotExists(identifier, ignoreIfExists)) { + return null; + } + JdbcUtils.insertView( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName(), + viewSchemaJson); + return null; + }); + } catch (ViewAlreadyExistException e) { + throw e; + } catch (JdbcViewConflictException e) { + if (e.kind() == JdbcViewConflictKind.ALREADY_EXISTS) { + throw new ViewAlreadyExistException(identifier, e); + } + throw viewOperationException("create", identifier, e); + } catch (Exception e) { + throw viewOperationException("create", identifier, e); + } + } + + private boolean validateViewNotExists(Identifier identifier, boolean ignoreIfExists) + throws ViewAlreadyExistException { + if (JdbcUtils.tableExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()) + || JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return false; + } + throw new ViewAlreadyExistException(identifier); + } + return true; + } + + @Override + public void dropView(Identifier identifier, boolean ignoreIfNotExists) + throws ViewNotExistException { + try { + runWithLock( + identifier, + () -> { + if (!JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfNotExists) { + return null; + } + throw new ViewNotExistException(identifier); + } + int deletedRecords = + execute( + connections, + JdbcUtils.DROP_VIEW_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()); + if (deletedRecords != 1) { + throw new RuntimeException( + String.format( + "Failed to drop view %s: affected %d rows", + identifier.getFullName(), deletedRecords)); + } + return null; + }); + } catch (ViewNotExistException e) { + throw e; + } catch (Exception e) { + throw viewOperationException("drop", identifier, e); + } + } + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException { + if (CatalogUtils.isSystemDatabase(databaseName)) { + return Collections.emptyList(); + } + + // Check if database exists + if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) { + throw new DatabaseNotExistException(databaseName); + } + + return fetch( + row -> row.getString(JdbcUtils.VIEW_NAME), + JdbcUtils.LIST_VIEWS_SQL, + catalogKey, + databaseName); + } + + // TODO: Implement actual paging and pattern filtering + @Override + public PagedList listViewsPaged( + String databaseName, Integer maxResults, String pageToken, String viewNamePattern) + throws DatabaseNotExistException { + CatalogUtils.validateNamePattern(this, viewNamePattern); + return new PagedList<>(listViews(databaseName), null); + } + + @Override + public PagedList listViewDetailsPaged( + String databaseName, Integer maxResults, String pageToken, String viewNamePattern) + throws DatabaseNotExistException { + PagedList pagedViews = + listViewsPaged(databaseName, maxResults, pageToken, viewNamePattern); + return new PagedList<>( + pagedViews.getElements().stream() + .map( + viewName -> { + try { + return getView(Identifier.create(databaseName, viewName)); + } catch (ViewNotExistException ignored) { + LOG.warn( + "view {}.{} does not exist", + databaseName, + viewName); + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()), + pagedViews.getNextPageToken()); + } + + @Override + public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) + throws ViewNotExistException, ViewAlreadyExistException { + try { + runWithLocks( + fromView, + toView, + () -> { + if (!JdbcUtils.viewExists( + connections, + catalogKey, + fromView.getDatabaseName(), + fromView.getObjectName())) { + if (ignoreIfNotExists) { + return null; + } + throw new ViewNotExistException(fromView); + } + + requireDatabaseExistsForWrite(toView); + + if (JdbcUtils.viewExists( + connections, + catalogKey, + toView.getDatabaseName(), + toView.getObjectName()) + || JdbcUtils.tableExists( + connections, + catalogKey, + toView.getDatabaseName(), + toView.getObjectName())) { + throw new ViewAlreadyExistException(toView); + } + + JdbcUtils.renameView(connections, catalogKey, fromView, toView); + return null; + }); + } catch (ViewAlreadyExistException | ViewNotExistException e) { + throw e; + } catch (JdbcViewConflictException e) { + if (e.kind() == JdbcViewConflictKind.ALREADY_EXISTS) { + throw new ViewAlreadyExistException(toView, e); + } else if (e.kind() == JdbcViewConflictKind.NOT_EXISTS) { + throw new ViewNotExistException(fromView, e); + } + throw new RuntimeException( + "Failed to rename view from " + + fromView.getFullName() + + " to " + + toView.getFullName(), + e); + } catch (IllegalArgumentException e) { + throw e; + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException( + "Failed to rename view from " + + fromView.getFullName() + + " to " + + toView.getFullName(), + e); + } + } + + @Override + public void alterView( + Identifier identifier, List changes, boolean ignoreIfNotExists) + throws ViewNotExistException, DialectAlreadyExistException, DialectNotExistException { + try { + runWithLock( + identifier, + () -> { + View existingView; + try { + existingView = getView(identifier); + } catch (ViewNotExistException e) { + if (ignoreIfNotExists) { + return null; + } + throw e; + } + + Map newOptions = new HashMap<>(existingView.options()); + String newComment = existingView.comment().orElse(null); + Map newDialects = new HashMap<>(existingView.dialects()); + for (ViewChange change : changes) { + if (change instanceof ViewChange.SetViewOption) { + ViewChange.SetViewOption setOption = + (ViewChange.SetViewOption) change; + newOptions.put(setOption.key(), setOption.value()); + } else if (change instanceof ViewChange.RemoveViewOption) { + ViewChange.RemoveViewOption removeOption = + (ViewChange.RemoveViewOption) change; + newOptions.remove(removeOption.key()); + } else if (change instanceof ViewChange.UpdateViewComment) { + ViewChange.UpdateViewComment updateComment = + (ViewChange.UpdateViewComment) change; + newComment = updateComment.comment(); + } else if (change instanceof ViewChange.AddDialect) { + ViewChange.AddDialect addDialect = (ViewChange.AddDialect) change; + if (newDialects.containsKey(addDialect.dialect())) { + throw new DialectAlreadyExistException( + identifier, addDialect.dialect()); + } + newDialects.put(addDialect.dialect(), addDialect.query()); + } else if (change instanceof ViewChange.UpdateDialect) { + ViewChange.UpdateDialect updateDialect = + (ViewChange.UpdateDialect) change; + if (!newDialects.containsKey(updateDialect.dialect())) { + throw new DialectNotExistException( + identifier, updateDialect.dialect()); + } + newDialects.put(updateDialect.dialect(), updateDialect.query()); + } else if (change instanceof ViewChange.DropDialect) { + ViewChange.DropDialect dropDialect = + (ViewChange.DropDialect) change; + if (!newDialects.containsKey(dropDialect.dialect())) { + throw new DialectNotExistException( + identifier, dropDialect.dialect()); + } + newDialects.remove(dropDialect.dialect()); + } + } + + ViewSchema updatedSchema = + new ViewSchema( + existingView.rowType().getFields(), + existingView.query(), + newDialects, + newComment, + newOptions); + String viewSchemaJson = JsonSerdeUtil.toJson(updatedSchema); + JdbcUtils.updateView( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName(), + viewSchemaJson); + return null; + }); + } catch (ViewNotExistException + | DialectAlreadyExistException + | DialectNotExistException e) { + throw e; + } catch (JdbcViewConflictException e) { + if (e.kind() == JdbcViewConflictKind.NOT_EXISTS) { + throw new ViewNotExistException(identifier, e); + } + throw viewOperationException("alter", identifier, e); + } catch (Exception e) { + throw viewOperationException("alter", identifier, e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 52cf4224f2f7..b2e00eb4b3c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -18,6 +18,7 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.Options; @@ -40,6 +41,27 @@ /** Util for jdbc catalog. */ public class JdbcUtils { private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); + private static final String UNIQUE_CONSTRAINT_VIOLATION_STATE = "23505"; + + enum JdbcViewConflictKind { + ALREADY_EXISTS, + NOT_EXISTS + } + + /** Internal exception for JDBC view conflict (unique constraint violation or not-found). */ + static class JdbcViewConflictException extends RuntimeException { + private final JdbcViewConflictKind kind; + + JdbcViewConflictException(JdbcViewConflictKind kind, String message) { + super(message); + this.kind = kind; + } + + JdbcViewConflictKind kind() { + return kind; + } + } + public static final String CATALOG_TABLE_NAME = "paimon_tables"; public static final String CATALOG_KEY = "catalog_key"; public static final String TABLE_DATABASE = "database_name"; @@ -331,6 +353,137 @@ public class JdbcUtils { static final String ACQUIRED_AT = "acquired_at"; static final String EXPIRE_TIME = "expire_time_seconds"; + // View table + public static final String VIEW_TABLE_NAME = "paimon_views"; + public static final String VIEW_DATABASE = "database_name"; + public static final String VIEW_NAME = "view_name"; + public static final String VIEW_SCHEMA = "view_schema"; + + static final String CREATE_VIEW_TABLE = + "CREATE TABLE " + + VIEW_TABLE_NAME + + "(" + + CATALOG_KEY + + " VARCHAR(255) NOT NULL," + + VIEW_DATABASE + + " VARCHAR(255) NOT NULL," + + VIEW_NAME + + " VARCHAR(255) NOT NULL," + + VIEW_SCHEMA + + " TEXT NOT NULL," + + " PRIMARY KEY (" + + CATALOG_KEY + + ", " + + VIEW_DATABASE + + ", " + + VIEW_NAME + + ")" + + ")"; + + static final String GET_VIEW_SQL = + "SELECT * FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String GET_VIEW_DATABASE_SQL = + "SELECT " + + VIEW_DATABASE + + " FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? LIMIT 1"; + + static final String LIST_VIEWS_SQL = + "SELECT * FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ?"; + + static final String INSERT_VIEW_SQL = + "INSERT INTO " + + VIEW_TABLE_NAME + + " (" + + CATALOG_KEY + + ", " + + VIEW_DATABASE + + ", " + + VIEW_NAME + + ", " + + VIEW_SCHEMA + + ") " + + " VALUES (?,?,?,?)"; + + static final String DROP_VIEW_SQL = + "DELETE FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String DELETE_VIEWS_SQL = + "DELETE FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ?"; + + static final String RENAME_VIEW_SQL = + "UPDATE " + + VIEW_TABLE_NAME + + " SET " + + VIEW_DATABASE + + " = ? , " + + VIEW_NAME + + " = ? " + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String UPDATE_VIEW_SQL = + "UPDATE " + + VIEW_TABLE_NAME + + " SET " + + VIEW_SCHEMA + + " = ? " + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String LIST_ALL_VIEW_DATABASES_SQL = + "SELECT DISTINCT " + + VIEW_DATABASE + + " FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ?"; + public static Properties extractJdbcConfiguration( Map properties, String prefix) { Properties result = new Properties(); @@ -372,9 +525,7 @@ public static void updateTable( int updatedRecords = execute( err -> { - if (err instanceof SQLIntegrityConstraintViolationException - || (err.getMessage() != null - && err.getMessage().contains("constraint failed"))) { + if (isUniqueConstraintViolation(err)) { throw new RuntimeException( String.format("Table already exists: %s", toTable)); } @@ -408,6 +559,9 @@ public static boolean databaseExists( if (exists(connections, JdbcUtils.GET_DATABASE_PROPERTIES_SQL, storeKey, databaseName)) { return true; } + if (exists(connections, JdbcUtils.GET_VIEW_DATABASE_SQL, storeKey, databaseName)) { + return true; + } return false; } @@ -467,6 +621,7 @@ public static int execute( sqlErrorHandler.accept(e); throw new RuntimeException(String.format("Failed to execute: %s", sql), e); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted in SQL command", e); } } @@ -678,4 +833,131 @@ private static String deletePropertiesStatement(Set properties) { return sqlStatement.toString(); } + + /** Check if view exists. */ + public static boolean viewExists( + JdbcClientPool connections, String storeKey, String databaseName, String viewName) { + return exists(connections, JdbcUtils.GET_VIEW_SQL, storeKey, databaseName, viewName); + } + + /** Get view schema JSON. */ + public static String getViewSchema( + JdbcClientPool connections, String storeKey, String databaseName, String viewName) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(JdbcUtils.GET_VIEW_SQL)) { + sql.setString(1, storeKey); + sql.setString(2, databaseName); + sql.setString(3, viewName); + try (ResultSet rs = sql.executeQuery()) { + if (rs.next()) { + return rs.getString(VIEW_SCHEMA); + } + } + return null; + } + }); + } + + /** Insert view. */ + public static void insertView( + JdbcClientPool connections, + String storeKey, + String databaseName, + String viewName, + String viewSchemaJson) { + int insertedRecords = + execute( + err -> { + if (isUniqueConstraintViolation(err)) { + throw new JdbcViewConflictException( + JdbcViewConflictKind.ALREADY_EXISTS, + String.format( + "View already exists: %s.%s", + databaseName, viewName)); + } + }, + connections, + JdbcUtils.INSERT_VIEW_SQL, + storeKey, + databaseName, + viewName, + viewSchemaJson); + + if (insertedRecords != 1) { + throw new RuntimeException( + String.format( + "Failed to insert view %s.%s: affected %d rows", + databaseName, viewName, insertedRecords)); + } + } + + /** Update view. */ + public static void updateView( + JdbcClientPool connections, + String storeKey, + String databaseName, + String viewName, + String viewSchemaJson) { + int updatedRecords = + execute( + connections, + JdbcUtils.UPDATE_VIEW_SQL, + viewSchemaJson, + storeKey, + databaseName, + viewName); + + if (updatedRecords == 0) { + throw new JdbcViewConflictException( + JdbcViewConflictKind.NOT_EXISTS, + String.format("View does not exist: %s.%s", databaseName, viewName)); + } else if (updatedRecords != 1) { + LOG.warn( + "Update operation affected {} rows: the view table's primary key assumption has been violated", + updatedRecords); + } + } + + /** Rename view. */ + public static void renameView( + JdbcClientPool connections, String storeKey, Identifier fromView, Identifier toView) { + int updatedRecords = + execute( + err -> { + if (isUniqueConstraintViolation(err)) { + throw new JdbcViewConflictException( + JdbcViewConflictKind.ALREADY_EXISTS, + String.format("View already exists: %s", toView)); + } + }, + connections, + JdbcUtils.RENAME_VIEW_SQL, + toView.getDatabaseName(), + toView.getObjectName(), + storeKey, + fromView.getDatabaseName(), + fromView.getObjectName()); + + if (updatedRecords == 1) { + LOG.info("Renamed view from {}, to {}", fromView, toView); + } else if (updatedRecords == 0) { + throw new JdbcViewConflictException( + JdbcViewConflictKind.NOT_EXISTS, + String.format("View does not exist: %s", fromView)); + } else { + LOG.warn( + "Rename operation affected {} rows: the view table's primary key assumption has been violated", + updatedRecords); + } + } + + @VisibleForTesting + static boolean isUniqueConstraintViolation(SQLException err) { + String message = err.getMessage(); + return err instanceof SQLIntegrityConstraintViolationException + || UNIQUE_CONSTRAINT_VIOLATION_STATE.equals(err.getSQLState()) + || (message != null && message.contains("constraint failed")); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index fd3c6fdc5950..351454436dda 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -29,7 +29,11 @@ import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewChange; +import org.apache.paimon.view.ViewImpl; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; @@ -41,11 +45,17 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -61,10 +71,16 @@ public void setUp() throws Exception { } private JdbcCatalog initCatalog(Map props) { + return initCatalog( + props, + "jdbc:sqlite:file:" + + UUID.randomUUID().toString().replace("-", "") + + "?mode=memory&cache=shared"); + } + + private JdbcCatalog initCatalog(Map props, String uri) { Map properties = Maps.newHashMap(); - properties.put( - CatalogOptions.URI.key(), - "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "")); + properties.put(CatalogOptions.URI.key(), uri); properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); @@ -146,6 +162,27 @@ protected boolean supportsReplaceTable() { return false; } + @Override + protected boolean supportsView() { + return true; + } + + @Test + public void testUniqueConstraintViolationDetection() { + assertThat( + JdbcUtils.isUniqueConstraintViolation( + new SQLIntegrityConstraintViolationException("duplicate entry"))) + .isTrue(); + assertThat( + JdbcUtils.isUniqueConstraintViolation( + new SQLException("duplicate key", "23505"))) + .isTrue(); + assertThat(JdbcUtils.isUniqueConstraintViolation(new SQLException("constraint failed"))) + .isTrue(); + assertThat(JdbcUtils.isUniqueConstraintViolation(new SQLException("syntax error"))) + .isFalse(); + } + @Test public void testRepairTableNotExist() throws Exception { String databaseName = "repair_db"; @@ -316,6 +353,18 @@ private JdbcCatalog initCatalogWithSync(boolean syncAllProperties) { return initCatalog(props); } + private JdbcCatalog initCatalogWithoutLock() { + Map props = Maps.newHashMap(); + props.put(CatalogOptions.LOCK_ENABLED.key(), "false"); + return initCatalog(props); + } + + private JdbcCatalog initCatalogWithoutLock(String uri) { + Map props = Maps.newHashMap(); + props.put(CatalogOptions.LOCK_ENABLED.key(), "false"); + return initCatalog(props, uri); + } + private Map fetchTableProperties( JdbcCatalog jdbcCatalog, String databaseName, String tableName) { try { @@ -345,6 +394,34 @@ private Map fetchTableProperties( } } + private String fetchViewSchema(JdbcCatalog jdbcCatalog, String databaseName, String viewName) { + try { + return JdbcUtils.getViewSchema( + jdbcCatalog.getConnections(), + jdbcCatalog.getCatalogKey(), + databaseName, + viewName); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private boolean tableExists(JdbcCatalog jdbcCatalog, String tableName) { + try { + return jdbcCatalog + .getConnections() + .run( + conn -> { + try (ResultSet rs = + conn.getMetaData().getTables(null, null, tableName, null)) { + return rs.next(); + } + }); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException(e); + } + } + @Test public void testTablePropertiesSyncOnCreate() throws Exception { JdbcCatalog syncCatalog = initCatalogWithSync(true); @@ -625,4 +702,553 @@ public void testInsertTableUtility() throws Exception { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Failed to insert table"); } + + @Test + public void testViewTableCreatedOnInitialize() { + assertThat(tableExists((JdbcCatalog) catalog, JdbcUtils.VIEW_TABLE_NAME)).isTrue(); + } + + @Test + public void testCreateViewStoresViewSchemaWithoutResolvingReferencedTable() throws Exception { + String databaseName = "view_schema_db"; + Identifier identifier = Identifier.create(databaseName, "view_on_missing_table"); + View view = createView(identifier); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, view, false); + + String viewSchemaJson = + fetchViewSchema((JdbcCatalog) catalog, databaseName, "view_on_missing_table"); + assertThat(viewSchemaJson).contains("\"query\"").contains("\"SELECT * FROM OTHER_TABLE\""); + assertThat(catalog.getView(identifier).query()).isEqualTo("SELECT * FROM OTHER_TABLE"); + } + + @Test + public void testCreateViewStoresCrossDatabaseQueryWithoutResolvingReferencedTable() + throws Exception { + String databaseName = "cross_database_view_db"; + Identifier identifier = Identifier.create(databaseName, "view_on_other_database"); + String query = "SELECT * FROM other_database.missing_table"; + View baseView = createView(identifier); + View view = + new ViewImpl( + identifier, + baseView.rowType().getFields(), + query, + baseView.dialects(), + baseView.comment().orElse(null), + baseView.options()); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, view, false); + + String viewSchemaJson = + fetchViewSchema((JdbcCatalog) catalog, databaseName, "view_on_other_database"); + assertThat(viewSchemaJson).contains("\"query\"").contains(query); + assertThat(catalog.getView(identifier).query()).isEqualTo(query); + } + + @Test + public void testDropDatabaseCleansViewMetadata() throws Exception { + String databaseName = "drop_view_db"; + Identifier identifier = Identifier.create(databaseName, "view_name"); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, createView(identifier), false); + catalog.dropDatabase(databaseName, false, true); + + assertThat( + fetchViewSchema( + (JdbcCatalog) catalog, + identifier.getDatabaseName(), + identifier.getObjectName())) + .isNull(); + } + + @Test + public void testDropDatabaseWithoutCascadeRejectsViewOnlyDatabase() throws Exception { + String databaseName = "view_only_db"; + Identifier identifier = Identifier.create(databaseName, "view_name"); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, createView(identifier), false); + + assertThatThrownBy(() -> catalog.dropDatabase(databaseName, false, false)) + .isInstanceOf(Catalog.DatabaseNotEmptyException.class) + .hasMessage("Database " + databaseName + " is not empty."); + assertThat(catalog.getView(identifier).fullName()).isEqualTo(identifier.getFullName()); + + catalog.dropDatabase(databaseName, false, true); + assertThatThrownBy(() -> catalog.getDatabase(databaseName)) + .isInstanceOf(Catalog.DatabaseNotExistException.class); + } + + @Test + public void testRenameViewAcrossDatabases() throws Exception { + String sourceDatabase = "source_view_db"; + String targetDatabase = "target_view_db"; + Identifier source = Identifier.create(sourceDatabase, "view_name"); + Identifier target = Identifier.create(targetDatabase, "view_name"); + + catalog.createDatabase(sourceDatabase, false); + catalog.createDatabase(targetDatabase, false); + catalog.createView(source, createView(source), false); + + catalog.renameView(source, target, false); + + assertThatThrownBy(() -> catalog.getView(source)) + .isInstanceOf(Catalog.ViewNotExistException.class); + assertThat(catalog.getView(target).fullName()).isEqualTo(target.getFullName()); + assertThat(catalog.listViews(sourceDatabase)).isEmpty(); + assertThat(catalog.listViews(targetDatabase)).containsExactly("view_name"); + } + + @Test + public void testRenameViewRejectsMissingTargetDatabase() throws Exception { + String sourceDatabase = "source_view_db"; + Identifier source = Identifier.create(sourceDatabase, "view_name"); + Identifier target = Identifier.create("missing_view_db", "view_name"); + + catalog.createDatabase(sourceDatabase, false); + catalog.createView(source, createView(source), false); + + assertThatThrownBy(() -> catalog.renameView(source, target, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Database missing_view_db does not exist."); + assertThat(catalog.getView(source).fullName()).isEqualTo(source.getFullName()); + assertThatThrownBy(() -> catalog.getView(target)) + .isInstanceOf(Catalog.ViewNotExistException.class); + } + + @Test + public void testRenameTableRejectsMissingTargetDatabase() throws Exception { + String sourceDatabase = "source_rename_table_db"; + Identifier source = Identifier.create(sourceDatabase, "table_name"); + Identifier target = Identifier.create("missing_target_db", "table_name"); + + catalog.createDatabase(sourceDatabase, false); + catalog.createTable(source, DEFAULT_TABLE_SCHEMA, false); + + assertThatThrownBy(() -> catalog.renameTable(source, target, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Database missing_target_db does not exist."); + // Source table must remain intact. + assertDoesNotThrow(() -> catalog.getTable(source)); + assertThatThrownBy(() -> catalog.getTable(target)) + .isInstanceOf(Catalog.TableNotExistException.class); + } + + @Test + public void testTableAndViewCannotShareName() throws Exception { + String databaseName = "shared_name_db"; + Identifier tableFirst = Identifier.create(databaseName, "table_first"); + Identifier viewFirst = Identifier.create(databaseName, "view_first"); + + catalog.createDatabase(databaseName, false); + + catalog.createTable(tableFirst, DEFAULT_TABLE_SCHEMA, false); + assertThatThrownBy(() -> catalog.createView(tableFirst, createView(tableFirst), false)) + .isInstanceOf(Catalog.ViewAlreadyExistException.class); + catalog.createView(tableFirst, createView(tableFirst), true); + assertThat(catalog.listViews(databaseName)).doesNotContain(tableFirst.getObjectName()); + + catalog.createView(viewFirst, createView(viewFirst), false); + assertThatThrownBy(() -> catalog.createTable(viewFirst, DEFAULT_TABLE_SCHEMA, false)) + .isInstanceOf(Catalog.TableAlreadyExistException.class); + catalog.createTable(viewFirst, DEFAULT_TABLE_SCHEMA, true); + assertThat(catalog.listTables(databaseName)).doesNotContain(viewFirst.getObjectName()); + } + + @Test + public void testRenameRejectsTableViewNameCollision() throws Exception { + String databaseName = "rename_collision_db"; + Identifier table = Identifier.create(databaseName, "table_name"); + Identifier view = Identifier.create(databaseName, "view_name"); + + catalog.createDatabase(databaseName, false); + catalog.createTable(table, DEFAULT_TABLE_SCHEMA, false); + catalog.createView(view, createView(view), false); + + assertThatThrownBy(() -> catalog.renameTable(table, view, false)) + .isInstanceOf(Catalog.TableAlreadyExistException.class); + assertThatThrownBy(() -> catalog.renameView(view, table, false)) + .isInstanceOf(Catalog.ViewAlreadyExistException.class); + assertThat(catalog.listTables(databaseName)).containsExactly(table.getObjectName()); + assertThat(catalog.listViews(databaseName)).containsExactly(view.getObjectName()); + } + + @Test + public void testListViewsFromSystemDatabase() throws Exception { + assertThat(catalog.listViews(Catalog.SYSTEM_DATABASE_NAME)).isEmpty(); + assertThat( + catalog.listViewsPaged(Catalog.SYSTEM_DATABASE_NAME, 10, null, null) + .getElements()) + .isEmpty(); + assertThat( + catalog.listViewDetailsPaged(Catalog.SYSTEM_DATABASE_NAME, 10, null, null) + .getElements()) + .isEmpty(); + } + + @Test + public void testConcurrentCreateViewOnlyCreatesOneView() throws Exception { + String databaseName = "concurrent_view_db"; + Identifier identifier = Identifier.create(databaseName, "same_view"); + View view = createView(identifier); + ExecutorService executor = Executors.newFixedThreadPool(2); + + catalog.createDatabase(databaseName, false); + Callable create = + () -> { + try { + catalog.createView(identifier, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future first = executor.submit(create); + Future second = executor.submit(create); + + assertThat(ImmutableList.of(first.get(), second.get())) + .containsExactlyInAnyOrder(true, false); + assertThat(catalog.listViews(databaseName)).containsExactly("same_view"); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testConcurrentCreateTableAndViewCannotShareName() throws Exception { + String databaseName = "concurrent_table_view_db"; + Identifier identifier = Identifier.create(databaseName, "same_name"); + View view = createView(identifier); + ExecutorService executor = Executors.newFixedThreadPool(2); + + catalog.createDatabase(databaseName, false); + Callable createTable = + () -> { + try { + catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable createView = + () -> { + try { + catalog.createView(identifier, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableCreated = executor.submit(createTable); + Future viewCreated = executor.submit(createView); + + assertThat(ImmutableList.of(tableCreated.get(), viewCreated.get())) + .containsExactlyInAnyOrder(true, false); + assertThat(catalog.listTables(databaseName).contains(identifier.getObjectName())) + .isNotEqualTo( + catalog.listViews(databaseName).contains(identifier.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testConcurrentRenameTableAndCreateViewCannotShareName() throws Exception { + String databaseName = "concurrent_rename_table_view_db"; + Identifier source = Identifier.create(databaseName, "src_table"); + Identifier target = Identifier.create(databaseName, "shared_name"); + View view = createView(target); + ExecutorService executor = Executors.newFixedThreadPool(2); + + catalog.createDatabase(databaseName, false); + catalog.createTable(source, DEFAULT_TABLE_SCHEMA, false); + + Callable renameTable = + () -> { + try { + catalog.renameTable(source, target, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable createView = + () -> { + try { + catalog.createView(target, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableRenamed = executor.submit(renameTable); + Future viewCreated = executor.submit(createView); + + assertThat(ImmutableList.of(tableRenamed.get(), viewCreated.get())) + .containsExactlyInAnyOrder(true, false); + // The shared identifier must be either a table or a view, but never both. + assertThat(catalog.listTables(databaseName).contains(target.getObjectName())) + .isNotEqualTo(catalog.listViews(databaseName).contains(target.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testConcurrentRenameTableAndRenameViewCannotShareName() throws Exception { + String databaseName = "concurrent_rename_collision_db"; + Identifier sourceTable = Identifier.create(databaseName, "src_table"); + Identifier sourceView = Identifier.create(databaseName, "src_view"); + Identifier target = Identifier.create(databaseName, "shared_name"); + ExecutorService executor = Executors.newFixedThreadPool(2); + + catalog.createDatabase(databaseName, false); + catalog.createTable(sourceTable, DEFAULT_TABLE_SCHEMA, false); + catalog.createView(sourceView, createView(sourceView), false); + + Callable renameTable = + () -> { + try { + catalog.renameTable(sourceTable, target, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable renameView = + () -> { + try { + catalog.renameView(sourceView, target, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableRenamed = executor.submit(renameTable); + Future viewRenamed = executor.submit(renameView); + + assertThat(ImmutableList.of(tableRenamed.get(), viewRenamed.get())) + .containsExactlyInAnyOrder(true, false); + assertThat(catalog.listTables(databaseName).contains(target.getObjectName())) + .isNotEqualTo(catalog.listViews(databaseName).contains(target.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testConcurrentCreateTableAndViewCannotShareNameWithoutDistributedLock() + throws Exception { + // Even when the JDBC distributed lock is disabled, mutations targeting the same + // identifier within a single JVM must remain atomic. This is provided by the per-JVM + // local stripe lock. + JdbcCatalog noLockCatalog = initCatalogWithoutLock(); + try { + String databaseName = "no_lock_concurrent_db"; + Identifier identifier = Identifier.create(databaseName, "same_name"); + View view = createView(identifier); + ExecutorService executor = Executors.newFixedThreadPool(2); + + noLockCatalog.createDatabase(databaseName, false); + Callable createTable = + () -> { + try { + noLockCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable createView = + () -> { + try { + noLockCatalog.createView(identifier, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableCreated = executor.submit(createTable); + Future viewCreated = executor.submit(createView); + + assertThat(ImmutableList.of(tableCreated.get(), viewCreated.get())) + .containsExactlyInAnyOrder(true, false); + assertThat( + noLockCatalog + .listTables(databaseName) + .contains(identifier.getObjectName())) + .isNotEqualTo( + noLockCatalog + .listViews(databaseName) + .contains(identifier.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } finally { + noLockCatalog.close(); + } + } + + @Test + public void testConcurrentCreateTableAndViewCannotShareNameAcrossCatalogInstancesWithoutLock() + throws Exception { + String uri = + "jdbc:sqlite:file:" + + UUID.randomUUID().toString().replace("-", "") + + "?mode=memory&cache=shared"; + JdbcCatalog tableCatalog = initCatalogWithoutLock(uri); + JdbcCatalog viewCatalog = initCatalogWithoutLock(uri); + try { + String databaseName = "no_lock_two_catalogs_db"; + Identifier identifier = Identifier.create(databaseName, "same_name"); + View view = createView(identifier); + ExecutorService executor = Executors.newFixedThreadPool(2); + + tableCatalog.createDatabase(databaseName, false); + Callable createTable = + () -> { + try { + tableCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + return true; + } catch (Catalog.TableAlreadyExistException e) { + return false; + } + }; + Callable createView = + () -> { + try { + viewCatalog.createView(identifier, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future tableCreated = executor.submit(createTable); + Future viewCreated = executor.submit(createView); + + assertThat(ImmutableList.of(tableCreated.get(), viewCreated.get())) + .containsExactlyInAnyOrder(true, false); + assertThat( + tableCatalog + .listTables(databaseName) + .contains(identifier.getObjectName())) + .isNotEqualTo( + viewCatalog + .listViews(databaseName) + .contains(identifier.getObjectName())); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } finally { + tableCatalog.close(); + viewCatalog.close(); + } + } + + @Test + public void testAlterView() throws Exception { + Identifier identifier = Identifier.create("alter_view_db", "my_view"); + View view = createView(identifier); + catalog.createDatabase(identifier.getDatabaseName(), false); + + assertDoesNotThrow( + () -> + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.setOption("k", "v")), + true)); + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.setOption("k", "v")), + false)) + .isInstanceOf(Catalog.ViewNotExistException.class); + + catalog.createView(identifier, view, false); + catalog.alterView(identifier, ImmutableList.of(ViewChange.setOption("k", "v")), false); + assertThat(catalog.getView(identifier).options()).containsEntry("k", "v"); + + catalog.alterView(identifier, ImmutableList.of(ViewChange.removeOption("k")), false); + assertThat(catalog.getView(identifier).options()).doesNotContainKey("k"); + + catalog.alterView( + identifier, ImmutableList.of(ViewChange.updateComment("new comment")), false); + assertThat(catalog.getView(identifier).comment()).hasValue("new comment"); + + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.addDialect("flink_1", "SELECT * FROM FLINK_TABLE_1")), + false); + assertThat(catalog.getView(identifier).query("flink_1")) + .isEqualTo("SELECT * FROM FLINK_TABLE_1"); + + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of( + ViewChange.addDialect( + "flink_1", "SELECT * FROM FLINK_TABLE_1")), + false)) + .isInstanceOf(Catalog.DialectAlreadyExistException.class); + + catalog.alterView( + identifier, + ImmutableList.of( + ViewChange.updateDialect("flink_1", "SELECT * FROM FLINK_TABLE_2")), + false); + assertThat(catalog.getView(identifier).query("flink_1")) + .isEqualTo("SELECT * FROM FLINK_TABLE_2"); + + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of( + ViewChange.updateDialect( + "missing", "SELECT * FROM FLINK_TABLE_2")), + false)) + .isInstanceOf(Catalog.DialectNotExistException.class); + + catalog.alterView(identifier, ImmutableList.of(ViewChange.dropDialect("flink_1")), false); + assertThat(catalog.getView(identifier).query("flink_1")).isEqualTo(view.query()); + + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.dropDialect("missing")), + false)) + .isInstanceOf(Catalog.DialectNotExistException.class); + } } diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index c0774b1c587a..8818ecc8b8a5 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -238,6 +238,13 @@ under the License. ${project.version} test + + + org.xerial + sqlite-jdbc + 3.44.0.0 + test + diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java new file mode 100644 index 000000000000..a3b1af7aeb5b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.jdbc.JdbcCatalog; +import org.apache.paimon.options.CatalogOptions; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT Case for JDBC Catalog View support. */ +public class JdbcCatalogViewITCase extends CatalogITCaseBase { + + private static final String DATABASE_NAME = "test_db"; + private static final String TABLE_NAME = "test_table"; + + @TempDir java.nio.file.Path tempFile; + + @BeforeEach + @Override + public void before() throws IOException { + super.before(); + sql(String.format("CREATE DATABASE %s", DATABASE_NAME)); + sql(String.format("USE %s", DATABASE_NAME)); + sql( + String.format( + "CREATE TABLE %s.%s (id INT, name STRING, amount DOUBLE)", + DATABASE_NAME, TABLE_NAME)); + sql( + String.format( + "INSERT INTO %s.%s VALUES (1, 'Alice', 100.0), (2, 'Bob', 200.0), (3, 'Charlie', 150.0)", + DATABASE_NAME, TABLE_NAME)); + } + + @Override + protected Map catalogOptions() { + Map options = new HashMap<>(); + options.put("metastore", "jdbc"); + options.put( + CatalogOptions.URI.key(), + "jdbc:sqlite:file:" + + UUID.randomUUID().toString().replace("-", "") + + "?mode=memory&cache=shared"); + options.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + options.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + // Disable lock for simpler testing + options.put(CatalogOptions.LOCK_ENABLED.key(), "false"); + return options; + } + + @Override + protected String getTempDirPath() { + return tempFile.toUri().toString(); + } + + @Test + public void testCreateAndQueryView() { + // Create a view + String viewName = "sales_view"; + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT name, amount FROM %s.%s WHERE amount > 100", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Query the view + List result = sql(String.format("SELECT * FROM %s.%s", DATABASE_NAME, viewName)); + assertThat(result).hasSize(2); + assertThat(result.toString()).contains("Bob"); + assertThat(result.toString()).contains("Charlie"); + } + + @Test + public void testShowViews() { + // Create multiple views + sql( + String.format( + "CREATE VIEW %s.view1 AS SELECT * FROM %s.%s", + DATABASE_NAME, DATABASE_NAME, TABLE_NAME)); + sql( + String.format( + "CREATE VIEW %s.view2 AS SELECT id, name FROM %s.%s", + DATABASE_NAME, DATABASE_NAME, TABLE_NAME)); + + // List views + List result = sql("SHOW VIEWS"); + assertThat(result).hasSize(2); + assertThat(result.toString()).contains("view1"); + assertThat(result.toString()).contains("view2"); + } + + @Test + public void testDropView() { + // Create a view + String viewName = "temp_view"; + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT * FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Verify view exists + List views = sql("SHOW VIEWS"); + assertThat(views.toString()).contains(viewName); + + // Drop the view + sql(String.format("DROP VIEW %s.%s", DATABASE_NAME, viewName)); + + // Verify view is dropped + views = sql("SHOW VIEWS"); + assertThat(views.toString()).doesNotContain(viewName); + } + + @Test + public void testViewWithAggregation() { + // Create a view with aggregation + String viewName = "agg_view"; + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT COUNT(*) as cnt, SUM(amount) as total FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Query the view + List result = sql(String.format("SELECT * FROM %s.%s", DATABASE_NAME, viewName)); + assertThat(result).hasSize(1); + assertThat(result.get(0).getField(0)).isEqualTo(3L); // count + assertThat((Double) result.get(0).getField(1)).isEqualTo(450.0); // sum + } + + @Test + public void testCreateViewIfNotExists() { + String viewName = "idempotent_view"; + + // Create view first time + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT * FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Create view again with IF NOT EXISTS - should not throw error + sql( + String.format( + "CREATE VIEW IF NOT EXISTS %s.%s AS SELECT id FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Original view should remain unchanged + List result = + sql(String.format("SELECT * FROM %s.%s LIMIT 1", DATABASE_NAME, viewName)); + assertThat(result.get(0).getArity()).isEqualTo(3); // original view has 3 columns + } + + @Test + public void testDropViewIfExists() { + String viewName = "maybe_exists_view"; + + // Drop non-existent view with IF EXISTS - should not throw error + sql(String.format("DROP VIEW IF EXISTS %s.%s", DATABASE_NAME, viewName)); + + // Create and drop + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT * FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + sql(String.format("DROP VIEW IF EXISTS %s.%s", DATABASE_NAME, viewName)); + + // Verify it's gone + List views = sql("SHOW VIEWS"); + assertThat(views.toString()).doesNotContain(viewName); + } + + @Test + public void testShowCreateView() { + String viewName = "describable_view"; + String query = + String.format( + "SELECT `name`, `amount` FROM `%s`.`%s` WHERE `amount` > 100", + DATABASE_NAME, TABLE_NAME); + sql(String.format("CREATE VIEW %s.%s AS %s", DATABASE_NAME, viewName, query)); + + // Show create view + List result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME, viewName)); + assertThat(result).hasSize(1); + String createStatement = result.get(0).toString(); + assertThat(createStatement).contains(viewName); + assertThat(createStatement).contains("amount"); + } +}