From 1470f3176a423651e6604c442e33624a1bcc3c08 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Mon, 27 Apr 2026 21:00:03 +0800 Subject: [PATCH 1/3] replace --- docs/content/spark/sql-ddl.md | 56 ++++++ .../paimon/catalog/AbstractCatalog.java | 90 +++++++++ .../apache/paimon/catalog/CachingCatalog.java | 8 + .../org/apache/paimon/catalog/Catalog.java | 19 ++ .../paimon/catalog/DelegateCatalog.java | 6 + .../paimon/catalog/FileSystemCatalog.java | 14 ++ .../org/apache/paimon/rest/RESTCatalog.java | 5 + .../paimon/catalog/CatalogTestBase.java | 115 ++++++++++++ .../apache/paimon/jdbc/JdbcCatalogTest.java | 6 + .../apache/paimon/rest/RESTCatalogTest.java | 6 + .../org/apache/paimon/hive/HiveCatalog.java | 35 ++++ .../PaimonCreateTableAsSelectStrategy.scala | 74 ++++---- .../PaimonReplaceTableAsSelectStrategy.scala | 33 ++++ .../PaimonCreateTableAsSelectStrategy.scala | 63 +++---- .../PaimonReplaceTableAsSelectStrategy.scala | 33 ++++ .../PaimonCreateTableAsSelectStrategy.scala | 63 +++---- .../PaimonReplaceTableAsSelectStrategy.scala | 168 +++++++++++++++++ .../PaimonCreateTableAsSelectStrategy.scala | 90 --------- .../spark/sql/paimon/shims/Spark4Shim.scala | 105 ++++++++++- .../paimon/spark/RollbackStagedTable.java | 173 ++++++++++++++++++ .../org/apache/paimon/spark/SparkCatalog.java | 78 ++++++++ .../paimon/spark/SparkGenericCatalog.java | 49 +++++ .../spark/catalog/SparkBaseCatalog.java | 7 +- .../spark/execution/PaimonStrategy.scala | 10 +- .../sql/execution/PaimonStrategyHelper.scala | 11 +- .../PaimonCreateTableAsSelectStrategy.scala | 61 +++--- .../PaimonReplaceTableAsSelectStrategy.scala | 151 +++++++++++++++ .../shim/PaimonTableOptionUtils.scala} | 27 ++- .../spark/sql/paimon/shims/SparkShim.scala | 50 ++++- .../apache/paimon/spark/sql/DDLTestBase.scala | 157 ++++++++++++++++ .../sql/DDLWithHiveCatalogTestBase.scala | 135 ++++++++++++++ .../spark/sql/DataFrameWriteTestBase.scala | 86 +++++++++ .../spark/sql/FormatTableTestBase.scala | 44 +++++ .../spark/sql/paimon/shims/Spark3Shim.scala | 107 ++++++++++- .../spark/sql/paimon/shims/Spark4Shim.scala | 105 ++++++++++- 35 files changed, 1953 insertions(+), 287 deletions(-) create mode 100644 paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala create mode 100644 paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala create mode 100644 paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala delete mode 100644 paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala rename paimon-spark/{paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala => paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonTableOptionUtils.scala} (53%) diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md index 6ca8831c1518..42b101896fc2 100644 --- a/docs/content/spark/sql-ddl.md +++ b/docs/content/spark/sql-ddl.md @@ -285,6 +285,62 @@ CREATE TABLE my_table_all ( CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM 
my_table_all;
 ```
 
+### Replace Table
+
+Paimon supports preserving snapshot history for Spark `REPLACE TABLE` from **Spark 3.4**.
+
+```sql
+CREATE TABLE my_table (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING
+) TBLPROPERTIES (
+    'primary-key' = 'user_id',
+    'bucket' = '2'
+);
+
+INSERT INTO my_table VALUES (1, 10, 'pv');
+
+REPLACE TABLE my_table (
+    user_id BIGINT,
+    item_id BIGINT,
+    category STRING
+) TBLPROPERTIES (
+    'primary-key' = 'user_id',
+    'bucket' = '4'
+);
+```
+
+In Paimon, this is not an atomic replacement. Instead of Spark's default drop-and-create replace
+path, Paimon truncates the current table and commits the new schema, preserving the table location
+and snapshot history. The table becomes empty and uses the new schema, but old snapshots can still
+be queried with time travel.
+
+```sql
+SELECT * FROM my_table;
+
+SELECT * FROM my_table VERSION AS OF 1;
+```
+
+`REPLACE TABLE` requires the table to exist. If the table does not exist, use
+`CREATE OR REPLACE TABLE` instead.
+
+`REPLACE TABLE` does not accept `AS SELECT`. To replace a table and populate it with query results,
+use `CREATE OR REPLACE TABLE ... AS SELECT`.
+
+```sql
+CREATE OR REPLACE TABLE my_table
+TBLPROPERTIES (
+    'primary-key' = 'user_id',
+    'bucket' = '4'
+)
+AS SELECT user_id, item_id, behavior FROM source_table;
+```
+
+When the existing table and the target table use different table types, Paimon falls back to its
+drop-and-create replace behavior instead of the snapshot-preserving replace.
+
 ### Create Table Like
 
 A new table can be created from an existing source table. Available from **Spark 3.4**.
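To make the catalog contract below concrete, here is an illustrative sketch (not part of the patch) of driving `replaceTable` and then reading a pre-replace snapshot back. It mirrors the new `CatalogTestBase` test; the database/table names, the bucket value, and the snapshot id are assumptions:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataTypes;

import java.util.Collections;

class ReplaceTableSketch {

    // Hypothetical helper: `catalog` is an initialized Paimon catalog and
    // `preReplaceSnapshotId` was recorded before the replace.
    static FileStoreTable replaceAndTimeTravel(Catalog catalog, long preReplaceSnapshotId)
            throws Exception {
        Identifier id = Identifier.create("db", "t");
        Schema newSchema =
                Schema.newBuilder()
                        .column("id", DataTypes.INT())
                        .column("name", DataTypes.STRING())
                        .option("bucket", "4")
                        .build();

        // Truncates visible data and appends the new schema; old snapshots survive.
        catalog.replaceTable(id, newSchema, false);

        FileStoreTable replaced = (FileStoreTable) catalog.getTable(id);
        // A copy pinned to a pre-replace snapshot still exposes the old schema and data.
        return replaced.copy(
                Collections.singletonMap(
                        "scan.snapshot-id", String.valueOf(preReplaceSnapshotId)));
    }
}
```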
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 4130c8d5759f..4c6144fc4ef0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.PagedList;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.TableType;
 import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
@@ -40,6 +41,7 @@
 import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.SnapshotNotExistException;
@@ -495,6 +497,94 @@ public void alterTable(
     protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
             throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;
 
+    @Override
+    public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        checkNotBranch(identifier, "replaceTable");
+        checkNotSystemTable(identifier, "replaceTable");
+        validateCreateTable(newSchema, false);
+        validateCustomTablePath(newSchema.options());
+        copyTableDefaultOptions(newSchema.options());
+
+        Table existing;
+        try {
+            existing = getTable(identifier);
+        } catch (TableNotExistException e) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw e;
+        }
+
+        TableType targetTableType = Options.fromMap(newSchema.options()).get(TYPE);
+        if (!(existing instanceof FileStoreTable) || !targetTableType.equals(TableType.TABLE)) {
+            dropAndCreateTable(identifier, newSchema);
+            return;
+        }
+
+        // TODO: support changing partition keys
+        List<String> oldPartitionKeys = ((FileStoreTable) existing).schema().partitionKeys();
+        List<String> newPartitionKeys = newSchema.partitionKeys();
+        if (!Objects.equals(oldPartitionKeys, newPartitionKeys)) {
+            throw new UnsupportedOperationException(
+                    "replaceTable does not support changing partition keys (old="
+                            + oldPartitionKeys
+                            + ", new="
+                            + newPartitionKeys
+                            + "). Drop and re-create the table instead.");
+        }
+
+        replaceTableImpl(identifier, (FileStoreTable) existing, newSchema);
+    }
+
+    private void dropAndCreateTable(Identifier identifier, Schema newSchema)
+            throws TableNotExistException {
+        dropTable(identifier, false);
+        try {
+            createTable(identifier, newSchema, false);
+        } catch (TableAlreadyExistException | DatabaseNotExistException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Truncate visible data first, then append the new schema. Non-atomic on failure. */
+    protected void replaceTableImpl(
+            Identifier identifier, FileStoreTable existingTable, Schema newSchema)
+            throws TableNotExistException {
+        truncateTable(existingTable);
+        try {
+            appendNewSchema(existingTable, newSchema);
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Append a new schema (id = latest + 1) via atomic CAS. Returns the new schema id. */
+    protected long appendNewSchema(FileStoreTable existingTable, Schema newSchema)
+            throws Exception {
+        SchemaManager sm = existingTable.schemaManager();
+        while (true) {
+            TableSchema latest = sm.latestOrThrow("Cannot replace: schema chain is empty.");
+            TableSchema staged = TableSchema.create(latest.id() + 1, newSchema);
+            if (sm.commit(staged)) {
+                return staged.id();
+            }
+        }
+    }
+
+    protected void truncateTable(FileStoreTable existingTable) {
+        try (TableCommitImpl commit =
+                existingTable.newCommit("replace-table-" + java.util.UUID.randomUUID())) {
+            commit.truncateTable();
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException {
         return CatalogUtils.loadTable(
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index aad5d16a11e0..b59cfa675f5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -224,6 +225,13 @@ public void alterTable(
         invalidateTable(identifier);
     }
 
+    @Override
+    public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        super.replaceTable(identifier, newSchema, ignoreIfNotExists);
+        invalidateTable(identifier);
+    }
+
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException {
         Table table = tableCache.getIfPresent(identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index ab66f937ea02..57fa040a2acd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -358,6 +358,25 @@ default void alterTable(Identifier identifier, SchemaChange change, boolean igno alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists); } + /** + * Replace an existing table with a new {@link Schema}. + * + *
<p>
For compatible FileStore tables, it truncates visible data and appends a new schema to the
+     * schema chain, preserving old schemas and snapshots for time travel. Other table kinds may
+     * fall back to drop-and-create behavior.
+     *
+     * @param identifier path of the table to be replaced
+     * @param newSchema the new {@link Schema}
+     * @param ignoreIfNotExists if true, do nothing when the table does not exist
+     * @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists} is
+     *     false
+     */
+    default void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        throw new UnsupportedOperationException(
+                "Catalog " + getClass().getName() + " does not support replaceTable.");
+    }
+
     // ======================= partition methods ===============================
 
     /**
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index ec5138a8cbfb..0f18f7d04540 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -171,6 +171,12 @@ public void alterTable(
         wrapped.alterTable(identifier, changes, ignoreIfNotExists);
     }
 
+    @Override
+    public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        wrapped.replaceTable(identifier, newSchema, ignoreIfNotExists);
+    }
+
     @Override
     public void registerTable(Identifier identifier, String path)
             throws TableAlreadyExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index cea266f7034b..b0b8b1ac7f30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -25,6 +25,7 @@
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -179,6 +180,19 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
         }
     }
 
+    @Override
+    protected void replaceTableImpl(
+            Identifier identifier, FileStoreTable existingTable, Schema newSchema) {
+        truncateTable(existingTable);
+        try {
+            runWithLock(identifier, () -> appendNewSchema(existingTable, newSchema));
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     protected static <T> T uncheck(Callable<T> callable) {
         try {
             return callable.call();
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 155d6a989324..b514ef4d618f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -585,6 +585,11 @@ public void alterTable(
         }
     }
 
+    @Override
+    public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists) {
+        throw new UnsupportedOperationException("RESTCatalog does not support replaceTable yet.");
+    }
+
     @Override
     public TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List<String> select)
             throws TableNotExistException {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 4f4aed63faad..66f363a43ec6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -1171,6 +1171,117 @@ public void testAlterMaterializedTable() throws Exception { baseAlterTable(initOptions); } + @Test + public void testReplaceTable() throws Exception { + if (!supportsReplaceTable()) { + return; + } + catalog.createDatabase("replace_db", true); + Identifier identifier = Identifier.create("replace_db", "t"); + + Schema initialSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("data", DataTypes.STRING()) + .column("pt", DataTypes.STRING()) + .partitionKeys("pt") + .primaryKey("id", "pt") + .option("bucket", "2") + .build(); + catalog.createTable(identifier, initialSchema, false); + + Table created = catalog.getTable(identifier); + String oldLocation = ((FileStoreTable) created).location().toString(); + BatchWriteBuilder writeBuilder = created.newBatchWriteBuilder(); + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write( + GenericRow.of(1, BinaryString.fromString("old"), BinaryString.fromString("a"))); + commit.commit(write.prepareCommit()); + } + + long oldSnapshotId = + ((FileStoreTable) catalog.getTable(identifier)) + .snapshotManager() + .latestSnapshotId(); + + // Replace with new PK + bucket (partition keys unchanged) + Schema newSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("pt", DataTypes.STRING()) + .partitionKeys("pt") + .primaryKey("id", "pt") + .option("bucket", "4") + .build(); + catalog.replaceTable(identifier, newSchema, false); + + FileStoreTable replaced = (FileStoreTable) catalog.getTable(identifier); + assertThat(replaced.partitionKeys()).containsExactly("pt"); + assertThat(replaced.primaryKeys()).containsExactly("id", "pt"); + assertThat(replaced.options().get("bucket")).isEqualTo("4"); + assertThat(replaced.location().toString()).isEqualTo(oldLocation); + assertThat(read(replaced, null, null, null, null)).isEmpty(); + + // Time-travel to old snapshot still returns old data with old schema + FileStoreTable oldView = + replaced.copy(Collections.singletonMap("scan.snapshot-id", "" + oldSnapshotId)); + assertThat(oldView.schema().fieldNames()).containsExactly("id", "data", "pt"); + assertThat(read(oldView, null, null, null, null)).hasSize(1); + + // Changing partition keys is rejected + Schema changePartitionKeys = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("pt", DataTypes.STRING()) + .primaryKey("id") + .option("bucket", "4") + .build(); + assertThatExceptionOfType(UnsupportedOperationException.class) + .isThrownBy(() -> catalog.replaceTable(identifier, changePartitionKeys, false)) + .withMessageContaining("partition keys"); + + if (supportsFormatTable()) { + Schema formatSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .option(TYPE.key(), TableType.FORMAT_TABLE.toString()) + .option(CoreOptions.FILE_FORMAT.key(), "csv") + .build(); + catalog.replaceTable(identifier, formatSchema, false); + assertThat(catalog.getTable(identifier)).isInstanceOf(FormatTable.class); + + Schema paimonSchema = + Schema.newBuilder() + .column("id", 
DataTypes.INT())
+                            .column("name", DataTypes.STRING())
+                            .option("bucket", "-1")
+                            .build();
+            catalog.replaceTable(identifier, paimonSchema, false);
+            assertThat(catalog.getTable(identifier)).isInstanceOf(FileStoreTable.class);
+        }
+
+        // ignoreIfNotExists = true: missing table is silently skipped
+        Identifier missing = Identifier.create("replace_db", "missing");
+        catalog.replaceTable(missing, newSchema, true);
+
+        // ignoreIfNotExists = false: missing table throws
+        assertThatExceptionOfType(Catalog.TableNotExistException.class)
+                .isThrownBy(() -> catalog.replaceTable(missing, newSchema, false));
+
+        // System table is rejected
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                catalog.replaceTable(
+                                        Identifier.create("replace_db", "$system_table"),
+                                        newSchema,
+                                        false));
+    }
+
     @Test
     public void testView() throws Exception {
         if (!supportsView()) {
@@ -1658,6 +1769,10 @@ protected boolean supportsViewDialects() {
         return true;
     }
 
+    protected boolean supportsReplaceTable() {
+        return true;
+    }
+
     protected void checkPartition(Partition expected, Partition actual) {
         assertThat(actual).isEqualTo(expected);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 2b121dd4cdf7..fd3c6fdc5950 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -140,6 +140,12 @@ protected boolean supportsAlterDatabase() {
         return true;
     }
 
+    @Override
+    protected boolean supportsReplaceTable() {
+        // the JDBC lock interferes with the test's data commit; the replace path itself
+        // works at runtime
+        return false;
+    }
+
     @Test
     public void testRepairTableNotExist() throws Exception {
         String databaseName = "repair_db";
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index a07c6eaed281..00da79046ac4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -3173,6 +3173,12 @@ protected boolean supportsAlterDatabase() {
         return true;
     }
 
+    // TODO: implement this
+    @Override
+    protected boolean supportsReplaceTable() {
+        return false;
+    }
+
     // TODO implement this
     @Override
     @Test
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index ac0706b40bb0..d32ec08648b5 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -1296,6 +1296,41 @@ private void alterTableToHms(
         clients().execute(client -> HiveAlterTableUtils.alterTable(client, identifier, table));
     }
 
+    @Override
+    protected void replaceTableImpl(
+            Identifier identifier, FileStoreTable existingTable, Schema newSchema)
+            throws TableNotExistException {
+        Table hmsTable = getHmsTable(identifier);
+        if (!isPaimonTable(hmsTable)) {
+            throw new UnsupportedOperationException("Only data tables support replaceTable.");
+        }
+
+        truncateTable(existingTable);
+
+        SchemaManager schemaManager = existingTable.schemaManager();
+        long newSchemaId;
+        try {
+            newSchemaId = runWithLock(identifier, () -> appendNewSchema(existingTable, newSchema));
+        } catch (RuntimeException e) {
+            
throw e; + } catch (Exception e) { + throw new RuntimeException("Failed to replaceTable " + identifier.getFullName(), e); + } + + // currently only changes to main branch affect metastore + if (!DEFAULT_MAIN_BRANCH.equals(identifier.getBranchNameOrDefault())) { + return; + } + + try { + TableSchema newTableSchema = schemaManager.schema(newSchemaId); + alterTableToHms(hmsTable, identifier, newTableSchema, Collections.emptySet()); + } catch (Exception te) { + schemaManager.deleteSchema(newSchemaId); + throw new RuntimeException(te); + } + } + @Override public boolean caseSensitive() { return options.getOptional(CASE_SENSITIVE).orElse(false); diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index cc6258e6eb69..87ff01c3c74e 100644 --- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -18,13 +18,12 @@ package org.apache.spark.sql.execution.shim -import org.apache.paimon.CoreOptions -import org.apache.paimon.iceberg.IcebergOptions +import org.apache.paimon.spark.SparkCatalog import org.apache.paimon.spark.catalog.FormatTableCatalog import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan} -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog} +import org.apache.spark.sql.connector.catalog.CatalogV2Util import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -34,47 +33,40 @@ import scala.collection.JavaConverters._ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => - catalog match { - case _: StagingTableCatalog => - throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.") - case _ => - val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq + case CreateTableAsSelect( + catalog: SparkCatalog, + ident, + parts, + query, + props, + options, + ifNotExists) => + val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions - // Include Iceberg compatibility options in table properties (fix for DataFrame writer options) - val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq - - val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys - - val (tableOptions, writeOptions) = options.partition { - case (key, _) => allTableOptionKeys.contains(key) - } - val newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions - - val isPartitionedFormatTable = { - catalog match { - case catalog: FormatTableCatalog => - catalog.isFormatTable(newProps.get("provider").orNull) && parts.nonEmpty - case _ => false - } - } - - if (isPartitionedFormatTable) { - throw new UnsupportedOperationException( - "Using CTAS with partitioned format table is not supported yet.") - } + val isPartitionedFormatTable = { + catalog 
match { + case formatCatalog: FormatTableCatalog => + formatCatalog.isFormatTable(newProps.get("provider").orNull) && parts.nonEmpty + case _ => false + } + } - CreateTableAsSelectExec( - catalog, - ident, - parts, - query, - planLater(query), - newProps, - new CaseInsensitiveStringMap(writeOptions.asJava), - ifNotExists - ) :: Nil + if (isPartitionedFormatTable) { + throw new UnsupportedOperationException( + "Using CTAS with partitioned format table is not supported yet.") } + + CreateTableAsSelectExec( + catalog, + ident, + parts, + query, + planLater(query), + newProps, + new CaseInsensitiveStringMap(writeOptions.asJava), + ifNotExists + ) :: Nil case _ => Nil } } diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala new file mode 100644 index 000000000000..d637c006da3c --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.shim + +import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan + +case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) extends Strategy { + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil +} + +case class PaimonReplaceTableStrategy(spark: SparkSession) extends Strategy { + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil +} diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index a09996f1534a..3bf90c7e3943 100644 --- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -18,15 +18,12 @@ package org.apache.spark.sql.execution.shim -import org.apache.paimon.CoreOptions -import org.apache.paimon.iceberg.IcebergOptions import org.apache.paimon.spark.SparkCatalog import org.apache.paimon.spark.catalog.FormatTableCatalog import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec} -import org.apache.spark.sql.connector.catalog.StagingTableCatalog import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -47,46 +44,32 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) tableSpec: TableSpec, options, ifNotExists) => - catalog match { - case _: StagingTableCatalog => - throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.") - case _ => - val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq + val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) - // Include Iceberg compatibility options in table properties (fix for DataFrame writer options) - val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq - - val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys - - val (tableOptions, writeOptions) = options.partition { - case (key, _) => allTableOptionKeys.contains(key) - } - val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions) - - val isPartitionedFormatTable = { - catalog match { - case catalog: FormatTableCatalog => - catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty - case _ => false - } - } - - if (isPartitionedFormatTable) { - throw new UnsupportedOperationException( - "Using CTAS with partitioned format table is not supported yet.") - } + val isPartitionedFormatTable = { + catalog match { + case formatCatalog: FormatTableCatalog => + formatCatalog.isFormatTable(qualifiedSpec.provider.orNull) && parts.nonEmpty + case _ => false + } + } - CreateTableAsSelectExec( - catalog.asTableCatalog, - ident.asIdentifier, - parts, - query, - planLater(query), - qualifyLocInTableSpec(newTableSpec), - new CaseInsensitiveStringMap(writeOptions.asJava), - ifNotExists - ) :: Nil + if 
(isPartitionedFormatTable) { + throw new UnsupportedOperationException( + "Using CTAS with partitioned format table is not supported yet.") } + + CreateTableAsSelectExec( + catalog.asTableCatalog, + ident.asIdentifier, + parts, + query, + planLater(query), + qualifiedSpec, + new CaseInsensitiveStringMap(writeOptions.asJava), + ifNotExists + ) :: Nil case _ => Nil } } diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala new file mode 100644 index 000000000000..d637c006da3c --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.shim + +import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan + +case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) extends Strategy { + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil +} + +case class PaimonReplaceTableStrategy(spark: SparkSession) extends Strategy { + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil +} diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index 4a82f35188dd..8fa9a2f2445c 100644 --- a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -18,15 +18,12 @@ package org.apache.spark.sql.execution.shim -import org.apache.paimon.CoreOptions -import org.apache.paimon.iceberg.IcebergOptions import org.apache.paimon.spark.SparkCatalog import org.apache.paimon.spark.catalog.FormatTableCatalog import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec} -import org.apache.spark.sql.connector.catalog.StagingTableCatalog import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -49,46 +46,32 @@ case class PaimonCreateTableAsSelectStrategy(spark: 
SparkSession) ifNotExists, analyzedQuery) => assert(analyzedQuery.isDefined) - catalog match { - case _: StagingTableCatalog => - throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.") - case _ => - val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq + val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) - // Include Iceberg compatibility options in table properties (fix for DataFrame writer options) - val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq - - val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys - - val (tableOptions, writeOptions) = options.partition { - case (key, _) => allTableOptionKeys.contains(key) - } - val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions) - - val isPartitionedFormatTable = { - catalog match { - case catalog: FormatTableCatalog => - catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty - case _ => false - } - } - - if (isPartitionedFormatTable) { - throw new UnsupportedOperationException( - "Using CTAS with partitioned format table is not supported yet.") - } + val isPartitionedFormatTable = { + catalog match { + case formatCatalog: FormatTableCatalog => + formatCatalog.isFormatTable(qualifiedSpec.provider.orNull) && parts.nonEmpty + case _ => false + } + } - CreateTableAsSelectExec( - catalog.asTableCatalog, - ident, - parts, - analyzedQuery.get, - planLater(query), - qualifyLocInTableSpec(newTableSpec), - new CaseInsensitiveStringMap(writeOptions.asJava), - ifNotExists - ) :: Nil + if (isPartitionedFormatTable) { + throw new UnsupportedOperationException( + "Using CTAS with partitioned format table is not supported yet.") } + + CreateTableAsSelectExec( + catalog.asTableCatalog, + ident, + parts, + analyzedQuery.get, + planLater(query), + qualifiedSpec, + new CaseInsensitiveStringMap(writeOptions.asJava), + ifNotExists + ) :: Nil case _ => Nil } } diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala new file mode 100644 index 000000000000..4bb4c0589951 --- /dev/null +++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.shim + +import org.apache.paimon.CoreOptions.TYPE +import org.apache.paimon.options.Options +import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkSource, SparkTable} +import org.apache.paimon.spark.catalog.SparkBaseCatalog + +import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, ResolvedIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReplaceTable, ReplaceTableAsSelect, TableSpec} +import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, Table, TableCatalog} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan} +import org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, ReplaceTableAsSelectExec} +import org.apache.spark.sql.paimon.shims.SparkShimLoader +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ + +case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) + extends Strategy + with PaimonStrategyHelper { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case ReplaceTableAsSelect( + ResolvedIdentifier(catalog: SparkBaseCatalog, ident), + parts, + query, + tableSpec: TableSpec, + options, + orCreate, + analyzedQuery) if PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) => + assert(analyzedQuery.isDefined) + val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) + val writeOpts = new CaseInsensitiveStringMap(writeOptions.asJava) + if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, qualifiedSpec, parts)) { + AtomicReplaceTableAsSelectExec( + catalog.asInstanceOf[StagingTableCatalog], + ident, + parts, + analyzedQuery.get, + planLater(query), + qualifiedSpec, + writeOpts, + orCreate = orCreate, + invalidateCache + ) :: Nil + } else { + ReplaceTableAsSelectExec( + catalog, + ident, + parts, + analyzedQuery.get, + planLater(query), + qualifiedSpec, + writeOpts, + orCreate = orCreate, + invalidateCache + ) :: Nil + } + case _ => Nil + } + + private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident: Identifier): Unit = { + tableCatalog.invalidateTable(ident) + } +} + +case class PaimonReplaceTableStrategy(spark: SparkSession) + extends Strategy + with PaimonStrategyHelper { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case replace @ ReplaceTable( + ResolvedIdentifier(catalog: SparkBaseCatalog, ident), + schemaOrColumns, + parts, + tableSpec: TableSpec, + orCreate) if PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) => + val columns = + SparkShimLoader.shim.toReplaceTableColumns( + replace.tableSchema, + schemaOrColumns, + catalog, + ident) + val qualifiedSpec = qualifyTableSpec(tableSpec, Map.empty) + if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, qualifiedSpec, parts)) { + SparkShimLoader.shim.createAtomicReplaceTableExec( + catalog.asInstanceOf[StagingTableCatalog], + ident, + columns, + parts, + qualifiedSpec, + orCreate = orCreate) :: Nil + } else { + SparkShimLoader.shim.createReplaceTableExec( + catalog, + ident, + columns, + parts, + 
qualifiedSpec, + orCreate = orCreate) :: Nil + } + case _ => Nil + } +} + +private[shim] object PaimonReplaceTableStrategyHelper { + + def supportsCatalog(catalog: SparkBaseCatalog, tableSpec: TableSpec): Boolean = catalog match { + case _: SparkCatalog => true + case _: SparkGenericCatalog => + tableSpec.provider.exists(_.equalsIgnoreCase(SparkSource.NAME)) + case _ => false + } + + /** + * Whether replace can use Spark's staged replace path. Paimon's replaceTable is not a + * rollbackable atomic replace; it swaps the current schema and truncates current data while + * preserving old snapshots. Return false for cases replaceTable would reject so Spark falls back + * to drop+create. + */ + def canAtomicReplace( + catalog: SparkBaseCatalog, + ident: Identifier, + tableSpec: TableSpec, + parts: Seq[Transform]): Boolean = { + try { + val existing = catalog.loadTable(ident) + if (!existing.isInstanceOf[SparkTable]) return false + val existingProvider = + Option(existing.properties().get(TableCatalog.PROP_PROVIDER)).getOrElse(SparkSource.NAME) + val targetProvider = tableSpec.provider.getOrElse(SparkSource.NAME) + if (!existingProvider.equalsIgnoreCase(targetProvider)) return false + val existingType = Options.fromMap(existing.properties()).get(TYPE) + val targetType = Options.fromMap(tableSpec.properties.asJava).get(TYPE) + if (existingType != targetType) return false + val existingParts = existing.partitioning().toSeq + existingParts.size == parts.size && + existingParts.zip(parts).forall { case (a, b) => a.toString == b.toString } + } catch { + case _: NoSuchTableException => true + } + } +} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala deleted file mode 100644 index 61e25b7c16a9..000000000000 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.shim - -import org.apache.paimon.CoreOptions -import org.apache.paimon.iceberg.IcebergOptions -import org.apache.paimon.spark.SparkCatalog -import org.apache.paimon.spark.catalog.FormatTableCatalog - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec} -import org.apache.spark.sql.connector.catalog.StagingTableCatalog -import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, SparkStrategy} -import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec - -import scala.collection.JavaConverters._ - -case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) - extends SparkStrategy - with PaimonStrategyHelper { - - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableAsSelect( - ResolvedIdentifier(catalog: SparkCatalog, ident), - parts, - query, - tableSpec: TableSpec, - options, - ifNotExists, - true) => - catalog match { - case _: StagingTableCatalog => - throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.") - case _ => - val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq - - // Include Iceberg compatibility options in table properties (fix for DataFrame writer options) - val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq - - val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys - - val (tableOptions, writeOptions) = options.partition { - case (key, _) => allTableOptionKeys.contains(key) - } - val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions) - - val isPartitionedFormatTable = { - catalog match { - case catalog: FormatTableCatalog => - catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty - case _ => false - } - } - - if (isPartitionedFormatTable) { - throw new UnsupportedOperationException( - "Using CTAS with partitioned format table is not supported yet.") - } - - CreateTableAsSelectExec( - catalog.asTableCatalog, - ident, - parts, - query, - qualifyLocInTableSpec(newTableSpec), - writeOptions, - ifNotExists) :: Nil - } - case _ => Nil - } -} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index 11fdfbd5790b..e08c87d4d33b 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -32,14 +32,15 @@ import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, ColumnDefinition, CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, TableSpec, UnresolvedWith, 
UpdateAction} import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.ArrayData -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog} +import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn, IdentityColumn, ResolveDefaultColumns} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagingTableCatalog, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.SparkFormatTable +import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan} import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec} +import org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} import org.apache.spark.sql.internal.SQLConf @@ -102,6 +103,102 @@ class Spark4Shim extends SparkShim { tableCatalog.createTable(ident, columns, partitions, properties) } + override def createReplaceTableAsSelectExec( + catalog: TableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: LogicalPlan, + tableSpec: TableSpec, + writeOptions: Map[String, String], + orCreate: Boolean): SparkPlan = { + ReplaceTableAsSelectExec( + catalog, + ident, + partitioning, + query, + tableSpec, + writeOptions, + orCreate = orCreate, + invalidateCache) + } + + override def createAtomicReplaceTableAsSelectExec( + catalog: StagingTableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: LogicalPlan, + tableSpec: TableSpec, + writeOptions: Map[String, String], + orCreate: Boolean): SparkPlan = { + AtomicReplaceTableAsSelectExec( + catalog, + ident, + partitioning, + query, + tableSpec, + writeOptions, + orCreate = orCreate, + invalidateCache) + } + + override def createReplaceTableExec( + catalog: TableCatalog, + ident: Identifier, + columns: Array[Column], + partitioning: Seq[Transform], + tableSpec: TableSpec, + orCreate: Boolean): SparkPlan = { + ReplaceTableExec( + catalog, + ident, + columns, + partitioning, + tableSpec, + orCreate = orCreate, + invalidateCache) + } + + override def createAtomicReplaceTableExec( + catalog: StagingTableCatalog, + ident: Identifier, + columns: Array[Column], + partitioning: Seq[Transform], + tableSpec: TableSpec, + orCreate: Boolean): SparkPlan = { + AtomicReplaceTableExec( + catalog, + ident, + columns, + partitioning, + tableSpec, + orCreate = orCreate, + invalidateCache) + } + + override def toReplaceTableColumns( + tableSchema: StructType, + schemaOrColumns: Any, + catalog: TableCatalog, + ident: Identifier): Array[Column] = { + val statementType = "REPLACE TABLE" + val columns = schemaOrColumns.asInstanceOf[Seq[ColumnDefinition]] + ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog, ident) + GeneratedColumn.validateGeneratedColumns(tableSchema, catalog, ident, statementType) + IdentityColumn.validateIdentityColumn(tableSchema, catalog, ident) + columns.map(_.toV2Column(statementType)).toArray + } + + override def copyTableSpec( + tableSpec: TableSpec, + additionalProperties: Map[String, String], + location: Option[String]): TableSpec = { + tableSpec.copy(properties = tableSpec.properties ++ additionalProperties, location = 
location) + } + + private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident: Identifier): Unit = { + tableCatalog.invalidateTable(ident) + } + override def createCTERelationRef( cteId: Long, resolved: Boolean, diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java new file mode 100644 index 000000000000..2f90545b6030 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark; + +import org.apache.spark.sql.connector.catalog.StagedTable; +import org.apache.spark.sql.connector.catalog.SupportsDelete; +import org.apache.spark.sql.connector.catalog.SupportsRead; +import org.apache.spark.sql.connector.catalog.SupportsWrite; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; + +/** A staged table that rolls back by invoking the provided abort action. 
*/
+class RollbackStagedTable implements StagedTable, SupportsRead, SupportsWrite, SupportsDelete {
+
+    @FunctionalInterface
+    interface Action {
+        void run() throws Exception;
+    }
+
+    @FunctionalInterface
+    interface StagedAction {
+        void run(RollbackStagedTable stagedTable) throws Exception;
+    }
+
+    private static final Action DO_NOTHING = () -> {};
+
+    private final Table table;
+    private final StagedAction commitAction;
+    private final Action abortAction;
+    private boolean committed;
+    private boolean aborted;
+    private boolean writeStarted;
+
+    RollbackStagedTable(Table table, Action abortAction) {
+        this(table, DO_NOTHING, abortAction);
+    }
+
+    RollbackStagedTable(Table table, Action commitAction, Action abortAction) {
+        this(table, ignored -> commitAction.run(), abortAction);
+    }
+
+    RollbackStagedTable(Table table, StagedAction commitAction, Action abortAction) {
+        this.table = table;
+        this.commitAction = commitAction;
+        this.abortAction = abortAction;
+    }
+
+    @Override
+    public String name() {
+        return table.name();
+    }
+
+    @Override
+    public StructType schema() {
+        return table.schema();
+    }
+
+    @Override
+    public Transform[] partitioning() {
+        return table.partitioning();
+    }
+
+    @Override
+    public Map<String, String> properties() {
+        return table.properties();
+    }
+
+    @Override
+    public Set<TableCapability> capabilities() {
+        return table.capabilities();
+    }
+
+    @Override
+    public void deleteWhere(Filter[] filters) {
+        call(SupportsDelete.class, t -> t.deleteWhere(filters));
+    }
+
+    @Override
+    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+        return callReturning(SupportsRead.class, t -> t.newScanBuilder(options));
+    }
+
+    @Override
+    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+        writeStarted = true;
+        return callReturning(SupportsWrite.class, t -> t.newWriteBuilder(info));
+    }
+
+    boolean hasWriteStarted() {
+        return writeStarted;
+    }
+
+    @Override
+    public void commitStagedChanges() {
+        if (committed || aborted) {
+            return;
+        }
+
+        try {
+            commitAction.run(this);
+            committed = true;
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void abortStagedChanges() {
+        if (committed || aborted) {
+            return;
+        }
+
+        try {
+            abortAction.run();
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            aborted = true;
+        }
+    }
+
+    private <T> void call(Class<T> requiredClass, Consumer<T> task) {
+        callReturning(
+                requiredClass,
+                instance -> {
+                    task.accept(instance);
+                    return null;
+                });
+    }
+
+    private <T, R> R callReturning(Class<T> requiredClass, Function<T, R> task) {
+        if (requiredClass.isInstance(table)) {
+            return task.apply(requiredClass.cast(table));
+        }
+
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Table does not implement %s: %s (%s)",
+                        requiredClass.getSimpleName(), table.name(), table.getClass().getName()));
+    }
+}
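Before the `SparkCatalog` changes below, a sketch of how this staged table gets driven may help. The driver here is illustrative only: Spark itself performs these steps inside its `AtomicReplaceTableExec` / `AtomicReplaceTableAsSelectExec` physical nodes, and the sketch assumes it lives in `org.apache.paimon.spark` (the class is package-private):

```java
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;

class StagedLifecycleSketch {

    // Hypothetical driver for the staged create path.
    static void runStaged(SparkCatalog catalog, Identifier ident, Table table) {
        RollbackStagedTable staged =
                new RollbackStagedTable(table, () -> catalog.dropTable(ident));
        try {
            // ... Spark writes through staged.newWriteBuilder(...) here ...
            staged.commitStagedChanges(); // commit action is a no-op for this constructor
        } catch (RuntimeException e) {
            staged.abortStagedChanges(); // runs the abort action, e.g. dropping the new table
            throw e;
        }
    }
}
```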
+import org.apache.spark.sql.connector.catalog.StagedTable;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
@@ -386,6 +387,65 @@ public boolean dropTable(Identifier ident) {
         }
     }
 
+    @Override
+    public StagedTable stageCreate(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        return stageCreateDirectly(ident, schema, partitions, properties);
+    }
+
+    @Override
+    public StagedTable stageReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException, NoSuchTableException {
+        return stageReplaceInternal(ident, schema, partitions, properties);
+    }
+
+    @Override
+    public StagedTable stageCreateOrReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException {
+        try {
+            return stageReplaceInternal(ident, schema, partitions, properties);
+        } catch (NoSuchTableException e) {
+            try {
+                return stageCreate(ident, schema, partitions, properties);
+            } catch (TableAlreadyExistsException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+
+    private StagedTable stageReplaceInternal(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException, NoSuchTableException {
+        org.apache.paimon.catalog.Identifier tableIdent = toIdentifier(ident, catalogName);
+        Schema targetSchema = toInitialSchema(schema, partitions, properties);
+
+        try {
+            catalog.replaceTable(tableIdent, targetSchema, false);
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        }
+
+        // For FileStore tables this is not an atomic replacement: it changes Spark's drop+create
+        // replace path to truncating the current table and committing a new schema, so snapshot
+        // history is preserved. Other table-type changes fall back to drop+create in the catalog.
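+        // The abort action is a no-op here: replaceTable has already truncated the table and
+        // committed the new schema, and old snapshots remain readable via time travel, so there
+        // is nothing for Spark to roll back on failure.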
+        return new RollbackStagedTable(loadTable(ident), () -> {});
+    }
+
     private SchemaChange toSchemaChange(TableChange change) {
         if (change instanceof TableChange.SetProperty) {
             TableChange.SetProperty set = (TableChange.SetProperty) change;
@@ -454,6 +514,24 @@ private static SchemaChange.Move getMove(
         return move;
     }
 
+    private StagedTable stageCreateDirectly(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        org.apache.spark.sql.connector.catalog.Table table =
+                createTable(ident, schema, partitions, properties);
+        if (table == null) {
+            try {
+                table = loadTable(ident);
+            } catch (NoSuchTableException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return new RollbackStagedTable(table, () -> dropTable(ident));
+    }
+
     private Schema toInitialSchema(
             StructType schema, Transform[] partitions, Map<String, String> properties) {
         Map<String, String> normalizedProperties = new HashMap<>(properties);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index e79af9a0b488..b89eaf092574 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -43,6 +43,8 @@
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.PaimonCatalogUtils;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -245,6 +247,49 @@ public void renameTable(Identifier from, Identifier to)
         }
     }
 
+    @Override
+    public StagedTable stageCreate(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        if (usePaimon(properties.get(TableCatalog.PROP_PROVIDER))) {
+            return sparkCatalog.stageCreate(ident, schema, partitions, properties);
+        } else {
+            return asStagingTableCatalog().stageCreate(ident, schema, partitions, properties);
+        }
+    }
+
+    @Override
+    public StagedTable stageReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException, NoSuchTableException {
+        if (usePaimon(properties.get(TableCatalog.PROP_PROVIDER))) {
+            return sparkCatalog.stageReplace(ident, schema, partitions, properties);
+        } else {
+            return asStagingTableCatalog().stageReplace(ident, schema, partitions, properties);
+        }
+    }
+
+    @Override
+    public StagedTable stageCreateOrReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException {
+        if (usePaimon(properties.get(TableCatalog.PROP_PROVIDER))) {
+            return sparkCatalog.stageCreateOrReplace(ident, schema, partitions, properties);
+        } else {
+            return asStagingTableCatalog()
+                    .stageCreateOrReplace(ident, schema, partitions, properties);
+        }
+    }
+
     @Override
     public final void initialize(String name, CaseInsensitiveStringMap options) {
         SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
@@ -362,6 +407,10 @@ private FunctionCatalog asFunctionCatalog() {
return (FunctionCatalog) getDelegateCatalog(); } + private StagingTableCatalog asStagingTableCatalog() { + return (StagingTableCatalog) getDelegateCatalog(); + } + // ======================= Function methods =============================== @Override diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java index ac6736e2e1c0..284b5d099fb6 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java @@ -25,6 +25,7 @@ import org.apache.paimon.spark.procedure.ProcedureBuilder; import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.StagingTableCatalog; import org.apache.spark.sql.connector.catalog.SupportsNamespaces; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.TableCatalogCapability; @@ -39,7 +40,11 @@ /** Spark base catalog. */ public abstract class SparkBaseCatalog - implements TableCatalog, SupportsNamespaces, ProcedureCatalog, WithPaimonCatalog { + implements TableCatalog, + SupportsNamespaces, + ProcedureCatalog, + WithPaimonCatalog, + StagingTableCatalog { protected String catalogName; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index 3be8b5a74e79..63c61a16e8b5 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -29,11 +29,11 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable} import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, PredicateHelper} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, DescribeRelation, LogicalPlan, ShowCreateTable} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, DescribeRelation, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowCreateTable} import org.apache.spark.sql.connector.catalog.{Identifier, PaimonLookupCatalog, TableCatalog} import org.apache.spark.sql.execution.{PaimonDescribeTableExec, SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation} -import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy +import org.apache.spark.sql.execution.shim.{PaimonCreateTableAsSelectStrategy, PaimonReplaceTableAsSelectStrategy, PaimonReplaceTableStrategy} import org.apache.spark.sql.paimon.shims.SparkShimLoader import scala.collection.JavaConverters._ @@ -51,6 +51,12 @@ case class PaimonStrategy(spark: SparkSession) case ctas: CreateTableAsSelect => PaimonCreateTableAsSelectStrategy(spark)(ctas) + case rtas: ReplaceTableAsSelect => + PaimonReplaceTableAsSelectStrategy(spark)(rtas) + + case rt: ReplaceTable => + PaimonReplaceTableStrategy(spark)(rt) + case c @ PaimonCallCommand(procedure, args) => val input = buildInternalRow(args) PaimonCallExec(c.output, procedure, input) :: Nil diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala index 9fb3a7b54a25..09cb26bd7bc8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.plans.logical.TableSpec import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH +import org.apache.spark.sql.paimon.shims.SparkShimLoader trait PaimonStrategyHelper { @@ -34,8 +35,12 @@ trait PaimonStrategyHelper { spark.sharedState.hadoopConf) } - protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = { - tableSpec.copy(location = tableSpec.location.map(makeQualifiedDBObjectPath(_))) + protected def qualifyTableSpec( + tableSpec: TableSpec, + tableOptions: Map[String, String]): TableSpec = { + SparkShimLoader.shim.copyTableSpec( + tableSpec, + tableOptions, + tableSpec.location.map(makeQualifiedDBObjectPath)) } - } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index 61e25b7c16a9..34caa3f7a119 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -18,20 +18,15 @@ package org.apache.spark.sql.execution.shim -import org.apache.paimon.CoreOptions -import org.apache.paimon.iceberg.IcebergOptions import org.apache.paimon.spark.SparkCatalog import org.apache.paimon.spark.catalog.FormatTableCatalog import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec} -import org.apache.spark.sql.connector.catalog.StagingTableCatalog import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec -import scala.collection.JavaConverters._ - case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends SparkStrategy with PaimonStrategyHelper { @@ -47,44 +42,30 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) options, ifNotExists, true) => - catalog match { - case _: StagingTableCatalog => - throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.") - case _ => - val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq - - // Include Iceberg compatibility options in table properties (fix for DataFrame writer options) - val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq - - val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys - - val (tableOptions, writeOptions) = options.partition { - case (key, _) => allTableOptionKeys.contains(key) - } - val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions) + val (tableOptions, writeOptions) = 
PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) - val isPartitionedFormatTable = { - catalog match { - case catalog: FormatTableCatalog => - catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty - case _ => false - } - } - - if (isPartitionedFormatTable) { - throw new UnsupportedOperationException( - "Using CTAS with partitioned format table is not supported yet.") - } + val isPartitionedFormatTable = { + catalog match { + case formatCatalog: FormatTableCatalog => + formatCatalog.isFormatTable(qualifiedSpec.provider.orNull) && parts.nonEmpty + case _ => false + } + } - CreateTableAsSelectExec( - catalog.asTableCatalog, - ident, - parts, - query, - qualifyLocInTableSpec(newTableSpec), - writeOptions, - ifNotExists) :: Nil + if (isPartitionedFormatTable) { + throw new UnsupportedOperationException( + "Using CTAS with partitioned format table is not supported yet.") } + + CreateTableAsSelectExec( + catalog.asTableCatalog, + ident, + parts, + query, + qualifiedSpec, + writeOptions, + ifNotExists) :: Nil case _ => Nil } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala new file mode 100644 index 000000000000..d107397b20a9 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.shim + +import org.apache.paimon.CoreOptions.TYPE +import org.apache.paimon.options.Options +import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkSource, SparkTable} +import org.apache.paimon.spark.catalog.SparkBaseCatalog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, ResolvedIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReplaceTable, ReplaceTableAsSelect, TableSpec} +import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, TableCatalog} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, SparkStrategy} +import org.apache.spark.sql.paimon.shims.SparkShimLoader + +import scala.collection.JavaConverters._ + +case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) + extends SparkStrategy + with PaimonStrategyHelper { + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case ReplaceTableAsSelect( + ResolvedIdentifier(catalog: SparkBaseCatalog, ident), + parts, + query, + tableSpec: TableSpec, + options, + orCreate, + true) if PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) => + val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) + if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, qualifiedSpec, parts)) { + SparkShimLoader.shim.createAtomicReplaceTableAsSelectExec( + catalog.asInstanceOf[StagingTableCatalog], + ident, + parts, + query, + qualifiedSpec, + writeOptions, + orCreate = orCreate) :: Nil + } else { + SparkShimLoader.shim.createReplaceTableAsSelectExec( + catalog, + ident, + parts, + query, + qualifiedSpec, + writeOptions, + orCreate = orCreate) :: Nil + } + case _ => Nil + } +} + +case class PaimonReplaceTableStrategy(spark: SparkSession) + extends SparkStrategy + with PaimonStrategyHelper { + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case replace @ ReplaceTable( + ResolvedIdentifier(catalog: SparkBaseCatalog, ident), + schemaOrColumns, + parts, + tableSpec: TableSpec, + orCreate) if PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) => + val columns = + SparkShimLoader.shim.toReplaceTableColumns( + replace.tableSchema, + schemaOrColumns, + catalog, + ident) + val qualifiedSpec = qualifyTableSpec(tableSpec, Map.empty) + if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, qualifiedSpec, parts)) { + SparkShimLoader.shim.createAtomicReplaceTableExec( + catalog.asInstanceOf[StagingTableCatalog], + ident, + columns, + parts, + qualifiedSpec, + orCreate = orCreate) :: Nil + } else { + SparkShimLoader.shim.createReplaceTableExec( + catalog, + ident, + columns, + parts, + qualifiedSpec, + orCreate = orCreate) :: Nil + } + case _ => Nil + } +} + +private[shim] object PaimonReplaceTableStrategyHelper { + + def supportsCatalog(catalog: SparkBaseCatalog, tableSpec: TableSpec): Boolean = catalog match { + case _: SparkCatalog => true + case _: SparkGenericCatalog => + tableSpec.provider.exists(_.equalsIgnoreCase(SparkSource.NAME)) + case _ => false + } + + /** + * Whether replace can use Spark's staged replace path. Paimon's replaceTable is not a + * rollbackable atomic replace; it swaps the current schema and truncates current data while + * preserving old snapshots. 
Return false for cases replaceTable would reject so Spark falls back + * to drop+create. + */ + def canAtomicReplace( + catalog: SparkBaseCatalog, + ident: Identifier, + tableSpec: TableSpec, + parts: Seq[Transform]): Boolean = { + try { + val existing = catalog.loadTable(ident) + if (!existing.isInstanceOf[SparkTable]) return false + val existingProvider = + Option(existing.properties().get(TableCatalog.PROP_PROVIDER)).getOrElse(SparkSource.NAME) + val targetProvider = tableSpec.provider.getOrElse(SparkSource.NAME) + if (!existingProvider.equalsIgnoreCase(targetProvider)) return false + val existingType = Options.fromMap(existing.properties()).get(TYPE) + val targetType = Options.fromMap(tableSpec.properties.asJava).get(TYPE) + if (existingType != targetType) return false + val existingParts = existing.partitioning().toSeq + existingParts.size == parts.size && + existingParts.zip(parts).forall { case (a, b) => a.toString == b.toString } + } catch { + case _: NoSuchTableException => true + } + } + +} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonTableOptionUtils.scala similarity index 53% rename from paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonTableOptionUtils.scala index 9fb3a7b54a25..c6507062ec1c 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonTableOptionUtils.scala @@ -16,26 +16,21 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.execution +package org.apache.spark.sql.execution.shim -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogUtils -import org.apache.spark.sql.catalyst.plans.logical.TableSpec -import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH +import org.apache.paimon.CoreOptions +import org.apache.paimon.iceberg.IcebergOptions -trait PaimonStrategyHelper { +import scala.collection.JavaConverters._ - def spark: SparkSession +private[shim] object PaimonTableOptionUtils { - protected def makeQualifiedDBObjectPath(location: String): String = { - CatalogUtils.makeQualifiedDBObjectPath( - spark.sharedState.conf.get(WAREHOUSE_PATH), - location, - spark.sharedState.hadoopConf) - } + private val tableOptionKeys = + (CoreOptions.getOptions.asScala.map(_.key()) ++ IcebergOptions.getOptions.asScala.map( + _.key())).toSet - protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = { - tableSpec.copy(location = tableSpec.location.map(makeQualifiedDBObjectPath(_))) + def splitTableAndWriteOptions( + options: Map[String, String]): (Map[String, String], Map[String, String]) = { + options.partition { case (key, _) => tableOptionKeys.contains(key) } } - } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala index 3ceb4943966d..38efd8c0066a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -27,11 +27,12 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.{Assignment, CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith, UpdateAction} +import org.apache.spark.sql.catalyst.plans.logical.{Assignment, CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, TableSpec, UnresolvedWith, UpdateAction} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData -import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{Column, Identifier, StagingTableCatalog, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.StructType @@ -66,6 +67,51 @@ trait SparkShim { partitions: Array[Transform], properties: JMap[String, String]): Table + def createReplaceTableAsSelectExec( + catalog: TableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: LogicalPlan, + tableSpec: TableSpec, + writeOptions: Map[String, String], + orCreate: Boolean): SparkPlan + + def createAtomicReplaceTableAsSelectExec( + catalog: StagingTableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: LogicalPlan, + tableSpec: TableSpec, + writeOptions: Map[String, String], + orCreate: Boolean): SparkPlan + + def createReplaceTableExec( + catalog: TableCatalog, + ident: Identifier, + columns: 
Array[Column], + partitioning: Seq[Transform], + tableSpec: TableSpec, + orCreate: Boolean): SparkPlan + + def createAtomicReplaceTableExec( + catalog: StagingTableCatalog, + ident: Identifier, + columns: Array[Column], + partitioning: Seq[Transform], + tableSpec: TableSpec, + orCreate: Boolean): SparkPlan + + def toReplaceTableColumns( + tableSchema: StructType, + schemaOrColumns: Any, + catalog: TableCatalog, + ident: Identifier): Array[Column] + + def copyTableSpec( + tableSpec: TableSpec, + additionalProperties: Map[String, String], + location: Option[String]): TableSpec + def createCTERelationRef( cteId: Long, resolved: Boolean, diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala index 3f2c0e899646..47a280868db3 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala @@ -276,6 +276,163 @@ abstract class DDLTestBase extends PaimonSparkTestBase { } } + test("Paimon DDL: REPLACE TABLE replaces in-place and preserves old snapshots") { + assume(gteqSpark3_4) + withTable("t") { + sql(""" + |CREATE TABLE t (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 'old')") + val oldLocation = loadTable("t").location().toString + val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId() + + sql(""" + |REPLACE TABLE t (id BIGINT, name STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '4') + |""".stripMargin) + + val replaced = loadTable("t") + Assertions.assertEquals(oldLocation, replaced.location().toString) + Assertions.assertEquals("4", replaced.options().get("bucket")) + Assertions.assertEquals(Seq("id", "name"), spark.table("t").schema.fieldNames.toSeq) + checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row]) + + checkAnswer( + sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"), + Seq((1L, "old")).toDF()) + } + } + + test("Paimon DDL: REPLACE TABLE without SELECT fails if table is missing") { + assume(gteqSpark3_4) + withTable("missing") { + val error = intercept[AnalysisException] { + sql(""" + |REPLACE TABLE missing (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + }.getMessage + + Assertions.assertTrue( + error.contains("TABLE_OR_VIEW_NOT_FOUND") || + error.contains("cannot be found") || + error.contains("not found")) + } + } + + test("Paimon DDL: CREATE TABLE fails when table exists") { + withTable("t") { + sql("CREATE TABLE t (id BIGINT, data STRING) USING paimon") + + val error = intercept[AnalysisException] { + sql("CREATE TABLE t (id BIGINT, name STRING) USING paimon") + }.getMessage + + Assertions.assertTrue( + error.contains("TABLE_OR_VIEW_ALREADY_EXISTS") || error.contains("already exists")) + } + } + + test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT on partitioned table") { + assume(gteqSpark3_4) + withTable("t") { + sql(""" + |CREATE TABLE t (id BIGINT, data STRING, pt STRING) + |USING paimon + |PARTITIONED BY (pt) + |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '2') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 'old', 'p0')") + val oldLocation = loadTable("t").location().toString + Seq((2L, "x2", "p1"), (3L, "x3", "p2")) + .toDF("id", "data", "pt") + 
.createOrReplaceTempView("source") + + sql(""" + |CREATE OR REPLACE TABLE t + |USING paimon + |PARTITIONED BY (pt) + |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '3') + |AS SELECT * FROM source + |""".stripMargin) + + val replaced = loadTable("t") + Assertions.assertEquals(oldLocation, replaced.location().toString) + Assertions.assertEquals("3", replaced.options().get("bucket")) + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq((2L, "x2", "p1"), (3L, "x3", "p2")).toDF()) + } + } + + test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT supports incompatible schema") { + assume(gteqSpark3_4) + withTable("t") { + sql(""" + |CREATE TABLE t (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 'old')") + val oldLocation = loadTable("t").location().toString + val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId() + Seq(("2", 20), ("3", 30)).toDF("id", "amount").createOrReplaceTempView("source") + + sql(""" + |CREATE OR REPLACE TABLE t + |USING paimon + |TBLPROPERTIES ('bucket' = '-1') + |AS SELECT * FROM source + |""".stripMargin) + + val replaced = loadTable("t") + Assertions.assertEquals(oldLocation, replaced.location().toString) + Assertions.assertEquals("-1", replaced.options().get("bucket")) + Assertions.assertEquals(Seq("id", "amount"), spark.table("t").schema.fieldNames.toSeq) + Assertions.assertEquals("string", spark.table("t").schema("id").dataType.typeName) + Assertions.assertEquals("integer", spark.table("t").schema("amount").dataType.typeName) + checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(("2", 20), ("3", 30)).toDF()) + checkAnswer( + sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"), + Seq((1L, "old")).toDF()) + } + } + + test("Paimon DDL: REPLACE TABLE supports incompatible schema and preserves old snapshots") { + assume(gteqSpark3_4) + withTable("t") { + sql(""" + |CREATE TABLE t (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 'old')") + val oldLocation = loadTable("t").location().toString + val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId() + + sql(""" + |REPLACE TABLE t (id STRING, amount INT) + |USING paimon + |TBLPROPERTIES ('bucket' = '-1') + |""".stripMargin) + + val replaced = loadTable("t") + Assertions.assertEquals(oldLocation, replaced.location().toString) + Assertions.assertEquals("-1", replaced.options().get("bucket")) + Assertions.assertEquals(Seq("id", "amount"), spark.table("t").schema.fieldNames.toSeq) + Assertions.assertEquals("string", spark.table("t").schema("id").dataType.typeName) + Assertions.assertEquals("integer", spark.table("t").schema("amount").dataType.typeName) + checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row]) + checkAnswer( + sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"), + Seq((1L, "old")).toDF()) + } + } + fileFormats.foreach { format => test(s"Paimon DDL: create table with char/varchar/string, file.format: $format") { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index d395a166922c..feae8be0dab5 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -726,6 +726,141 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { } } + test("Paimon DDL with hive catalog: SparkGenericCatalog explicit Paimon replace") { + assume(gteqSpark3_4) + spark.sql(s"USE $sparkCatalogName") + withDatabase("paimon_db") { + spark.sql("CREATE DATABASE paimon_db") + spark.sql("USE paimon_db") + + withTable("rt", "rtas", "missing") { + spark.sql(""" + |CREATE TABLE rt (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + spark.sql("INSERT INTO rt VALUES (1, 'old')") + val oldSnapshotId = loadTable("paimon_db", "rt").snapshotManager().latestSnapshotId() + + spark.sql(""" + |REPLACE TABLE rt (id BIGINT, name STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '4') + |""".stripMargin) + + val replaced = loadTable("paimon_db", "rt") + Assertions.assertEquals("4", replaced.options().get("bucket")) + Assertions.assertEquals(Seq("id", "name"), spark.table("rt").schema.fieldNames.toSeq) + checkAnswer(spark.sql("SELECT * FROM rt"), Seq.empty[Row]) + checkAnswer( + spark.sql(s"SELECT id, data FROM rt VERSION AS OF $oldSnapshotId"), + Row(1L, "old") :: Nil) + + val error = intercept[AnalysisException] { + spark.sql(""" + |REPLACE TABLE missing (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + }.getMessage + Assertions.assertTrue( + error.contains("TABLE_OR_VIEW_NOT_FOUND") || + error.contains("cannot be found") || + error.contains("not found")) + + Seq((2L, "new")).toDF("id", "data").createOrReplaceTempView("source") + spark.sql(""" + |CREATE TABLE rtas (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + spark.sql("INSERT INTO rtas VALUES (1, 'old')") + val oldLocation = loadTable("paimon_db", "rtas").location().toString + val oldRtasSnapshotId = + loadTable("paimon_db", "rtas").snapshotManager().latestSnapshotId() + spark.sql(""" + |CREATE OR REPLACE TABLE rtas + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '3') + |AS SELECT * FROM source + |""".stripMargin) + + val replacedAsSelect = loadTable("paimon_db", "rtas") + Assertions.assertEquals(oldLocation, replacedAsSelect.location().toString) + Assertions.assertEquals("3", replacedAsSelect.options().get("bucket")) + checkAnswer(spark.sql("SELECT * FROM rtas"), Row(2L, "new") :: Nil) + checkAnswer( + spark.sql(s"SELECT id, data FROM rtas VERSION AS OF $oldRtasSnapshotId"), + Row(1L, "old") :: Nil) + } + } + } + + test("Paimon DDL with hive catalog: SparkGenericCatalog explicit Paimon replace fallback") { + assume(gteqSpark3_4) + spark.sql(s"USE $sparkCatalogName") + withDatabase("paimon_db") { + spark.sql("CREATE DATABASE paimon_db") + spark.sql("USE paimon_db") + + withTable("csv_to_paimon", "rtas_csv_to_paimon") { + spark.sql("CREATE TABLE csv_to_paimon (id BIGINT, data STRING) USING csv") + spark.sql("INSERT INTO csv_to_paimon VALUES (1, 'csv')") + + spark.sql(""" + |REPLACE TABLE csv_to_paimon (id BIGINT, name STRING) + |USING paimon + |TBLPROPERTIES ('bucket' = '-1') + |""".stripMargin) + + val paimonTable = loadTable("paimon_db", "csv_to_paimon") + Assertions.assertEquals("-1", paimonTable.options().get("bucket")) + Assertions.assertEquals( + Seq("id", "name"), + spark.table("csv_to_paimon").schema.fieldNames.toSeq) + checkAnswer(spark.sql("SELECT * FROM 
csv_to_paimon"), Seq.empty[Row]) + + Seq((2L, "new")).toDF("id", "data").createOrReplaceTempView("provider_source") + spark.sql(""" + |CREATE TABLE rtas_csv_to_paimon (id BIGINT, data STRING) + |USING csv + |""".stripMargin) + + spark.sql(""" + |CREATE OR REPLACE TABLE rtas_csv_to_paimon + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '3') + |AS SELECT * FROM provider_source + |""".stripMargin) + + val rtasPaimonTable = loadTable("paimon_db", "rtas_csv_to_paimon") + Assertions.assertEquals("3", rtasPaimonTable.options().get("bucket")) + checkAnswer(spark.sql("SELECT * FROM rtas_csv_to_paimon"), Row(2L, "new") :: Nil) + } + } + } + + test("Paimon DDL with hive catalog: SparkGenericCatalog CTAS with non-Paimon provider") { + assume(gteqSpark3_4) + spark.sql(s"USE $sparkCatalogName") + withDatabase("paimon_db") { + spark.sql("CREATE DATABASE paimon_db") + spark.sql("USE paimon_db") + + withTable("csv_ctas") { + Seq((1L, "x1"), (2L, "x2")).toDF("id", "data").createOrReplaceTempView("source") + spark.sql("CREATE TABLE csv_ctas USING csv AS SELECT * FROM source") + + val csvTable = spark.sessionState.catalog.getTableMetadata( + TableIdentifier("csv_ctas", Some("paimon_db"))) + Assertions.assertTrue(csvTable.provider.contains("csv")) + checkAnswer( + spark.sql("SELECT * FROM csv_ctas ORDER BY id"), + Row(1L, "x1") :: Row(2L, "x2") :: Nil) + } + } + } + test("Paimon DDL with hive catalog: Create Table As Select") { Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach { catalogName => diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala index b25e41a3fb42..01def579efef 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala @@ -183,6 +183,92 @@ abstract class DataFrameWriteTestBase extends PaimonSparkTestBase { } } + test("Paimon dataframe: writer v2 replace") { + assume(gteqSpark3_4) + withTable("t") { + sql(""" + |CREATE TABLE t (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 'old')") + + val oldLocation = loadTable("t").location().toString + val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId() + + spark + .range(2, 4) + .selectExpr("id", "concat('v', cast(id as string)) AS data") + .writeTo("t") + .using("paimon") + .tableProperty("primary-key", "id") + .tableProperty("bucket", "3") + .replace() + + val table = loadTable("t") + Assertions.assertEquals("3", table.options().get("bucket")) + Assertions.assertEquals(oldLocation, table.location().toString) + checkAnswer(sql("SELECT * FROM t ORDER BY id"), Row(2L, "v2") :: Row(3L, "v3") :: Nil) + checkAnswer(sql(s"SELECT * FROM t VERSION AS OF $oldSnapshotId"), Row(1L, "old") :: Nil) + } + } + + test("Paimon dataframe: writer v2 create fails when table exists") { + assume(gteqSpark3_4) + withTable("t") { + sql("CREATE TABLE t (id BIGINT, data STRING) USING paimon") + + val error = intercept[Exception] { + spark + .range(2) + .selectExpr("id", "concat('v', cast(id as string)) AS data") + .writeTo("t") + .using("paimon") + .create() + }.getMessage + + Assertions.assertTrue( + error.contains("TABLE_OR_VIEW_ALREADY_EXISTS") || error.contains("already exists")) + } + } + + test("Paimon 
dataframe: writer v2 create or replace") { + assume(gteqSpark3_4) + withTable("t") { + spark + .range(2) + .selectExpr("id", "concat('v', cast(id as string)) AS data") + .writeTo("t") + .using("paimon") + .tableProperty("primary-key", "id") + .tableProperty("bucket", "2") + .createOrReplace() + + val createdLocation = loadTable("t").location().toString + checkAnswer(sql("SELECT * FROM t ORDER BY id"), Row(0L, "v0") :: Row(1L, "v1") :: Nil) + + spark + .range(3, 5) + .selectExpr( + "id", + "concat('v', cast(id as string)) AS data", + "concat('n', cast(id as string)) AS note") + .writeTo("t") + .using("paimon") + .tableProperty("primary-key", "id") + .tableProperty("bucket", "4") + .createOrReplace() + + val table = loadTable("t") + Assertions.assertEquals(Seq("id", "data", "note"), spark.table("t").schema.fieldNames.toSeq) + Assertions.assertEquals("4", table.options().get("bucket")) + Assertions.assertEquals(createdLocation, table.location().toString) + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Row(3L, "v3", "n3") :: Row(4L, "v4", "n4") :: Nil) + } + } + test("Paimon: DataFrameWrite partition table") { withTable("t") { spark.sql(s""" diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala index d031f7e60c94..503005f247e8 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala @@ -35,6 +35,8 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec abstract class FormatTableTestBase extends PaimonHiveTestBase with AdaptiveSparkPlanHelper { + import testImplicits._ + override protected def beforeEach(): Unit = { sql(s"USE $paimonHiveCatalogName") sql(s"USE $hiveDbName") @@ -158,6 +160,48 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase with AdaptiveSpark } } + test("Format table: create or replace as select supports table type change") { + assume(gteqSpark3_4) + withTable("t") { + sql(""" + |CREATE TABLE t (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 'old')") + Seq((2L, "new")).toDF("id", "data").createOrReplaceTempView("source") + + sql(""" + |CREATE OR REPLACE TABLE t + |USING csv + |AS SELECT * FROM source + |""".stripMargin) + + assert(paimonCatalog.getTable(Identifier.create(hiveDbName, "t")).isInstanceOf[FormatTable]) + checkAnswer(sql("SELECT * FROM t"), Seq(Row(2L, "new"))) + } + } + + test("Format table: replace table supports table type change") { + assume(gteqSpark3_4) + withTable("t") { + sql(""" + |CREATE TABLE t (id BIGINT, data STRING) + |USING paimon + |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 'old')") + + sql(""" + |REPLACE TABLE t (id BIGINT, data STRING) + |USING csv + |""".stripMargin) + + assert(paimonCatalog.getTable(Identifier.create(hiveDbName, "t")).isInstanceOf[FormatTable]) + checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row]) + } + } + test("Format table: read compressed files") { for (format <- Seq("csv", "json")) { withTable("compress_t") { diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala 
index c71b3df9237e..1798b12410c2 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala @@ -32,17 +32,19 @@ import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith, UpdateAction} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, TableSpec, UnresolvedWith, UpdateAction} // NOTE: `MergeRows` / `MergeRows.Keep` were introduced in Spark 3.4. We access them only via // reflection inside the `mergeRowsKeep*` method bodies so that loading `Spark3Shim` does not fail // on Spark 3.2 / 3.3 runtimes that still ship `paimon-spark3-common` (the module targets 3.5.8 at // compile time but must also run on 3.2 / 3.3). import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.ArrayData -import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} +import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn, ResolveDefaultColumns} +import org.apache.spark.sql.connector.catalog.{Column, Identifier, StagingTableCatalog, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.SparkFormatTable +import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan} import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec} +import org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} import org.apache.spark.sql.internal.SQLConf @@ -86,6 +88,103 @@ class Spark3Shim extends SparkShim { tableCatalog.createTable(ident, schema, partitions, properties) } + override def createReplaceTableAsSelectExec( + catalog: TableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: LogicalPlan, + tableSpec: TableSpec, + writeOptions: Map[String, String], + orCreate: Boolean): SparkPlan = { + ReplaceTableAsSelectExec( + catalog, + ident, + partitioning, + query, + tableSpec, + writeOptions, + orCreate = orCreate, + invalidateCache) + } + + override def createAtomicReplaceTableAsSelectExec( + catalog: StagingTableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: LogicalPlan, + tableSpec: TableSpec, + writeOptions: Map[String, String], + orCreate: Boolean): SparkPlan = { + AtomicReplaceTableAsSelectExec( + catalog, + ident, + partitioning, + query, + tableSpec, + writeOptions, + orCreate = orCreate, + invalidateCache) + } + + override def createReplaceTableExec( + catalog: TableCatalog, + ident: Identifier, + columns: Array[Column], + partitioning: Seq[Transform], + tableSpec: TableSpec, + orCreate: Boolean): 
SparkPlan = { + ReplaceTableExec( + catalog, + ident, + columns, + partitioning, + tableSpec, + orCreate = orCreate, + invalidateCache) + } + + override def createAtomicReplaceTableExec( + catalog: StagingTableCatalog, + ident: Identifier, + columns: Array[Column], + partitioning: Seq[Transform], + tableSpec: TableSpec, + orCreate: Boolean): SparkPlan = { + AtomicReplaceTableExec( + catalog, + ident, + columns, + partitioning, + tableSpec, + orCreate = orCreate, + invalidateCache) + } + + override def toReplaceTableColumns( + tableSchema: StructType, + schemaOrColumns: Any, + catalog: TableCatalog, + ident: Identifier): Array[Column] = { + val statementType = "CREATE TABLE" + val schema = schemaOrColumns.asInstanceOf[StructType] + ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog, ident) + val newSchema = + ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(schema, statementType) + GeneratedColumn.validateGeneratedColumns(newSchema, catalog, ident, statementType) + structTypeToV2Columns(newSchema) + } + + override def copyTableSpec( + tableSpec: TableSpec, + additionalProperties: Map[String, String], + location: Option[String]): TableSpec = { + tableSpec.copy(properties = tableSpec.properties ++ additionalProperties, location = location) + } + + private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident: Identifier): Unit = { + tableCatalog.invalidateTable(ident) + } + override def createCTERelationRef( cteId: Long, resolved: Boolean, diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index b66b896927a2..516c02a09fec 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -32,14 +32,15 @@ import org.apache.spark.sql.catalyst.analysis.CTESubstitution import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, ColumnDefinition, CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, TableSpec, UnresolvedWith, UpdateAction} import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Copy, Insert, Keep, Update} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.ArrayData -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog} +import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn, IdentityColumn, ResolveDefaultColumns} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagingTableCatalog, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.SparkFormatTable +import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan} import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec} +import 
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
 import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink
@@ -86,6 +87,102 @@ class Spark4Shim extends SparkShim {
     tableCatalog.createTable(ident, columns, partitions, properties)
   }
 
+  override def createReplaceTableAsSelectExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan = {
+    ReplaceTableAsSelectExec(
+      catalog,
+      ident,
+      partitioning,
+      query,
+      tableSpec,
+      writeOptions,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createAtomicReplaceTableAsSelectExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan = {
+    AtomicReplaceTableAsSelectExec(
+      catalog,
+      ident,
+      partitioning,
+      query,
+      tableSpec,
+      writeOptions,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createReplaceTableExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan = {
+    ReplaceTableExec(
+      catalog,
+      ident,
+      columns,
+      partitioning,
+      tableSpec,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createAtomicReplaceTableExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan = {
+    AtomicReplaceTableExec(
+      catalog,
+      ident,
+      columns,
+      partitioning,
+      tableSpec,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def toReplaceTableColumns(
+      tableSchema: StructType,
+      schemaOrColumns: Any,
+      catalog: TableCatalog,
+      ident: Identifier): Array[Column] = {
+    val statementType = "REPLACE TABLE"
+    val columns = schemaOrColumns.asInstanceOf[Seq[ColumnDefinition]]
+    ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog, ident)
+    GeneratedColumn.validateGeneratedColumns(tableSchema, catalog, ident, statementType)
+    IdentityColumn.validateIdentityColumn(tableSchema, catalog, ident)
+    columns.map(_.toV2Column(statementType)).toArray
+  }
+
+  override def copyTableSpec(
+      tableSpec: TableSpec,
+      additionalProperties: Map[String, String],
+      location: Option[String]): TableSpec = {
+    tableSpec.copy(properties = tableSpec.properties ++ additionalProperties, location = location)
+  }
+
+  // Takes (TableCatalog, Table, Identifier) to match the invalidateCache callback shape the
+  // replace execs above expect, consistent with Spark3Shim and the other Spark4Shim copy.
+  private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+    tableCatalog.invalidateTable(ident)
+  }
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,

From 984642535416ce0439fa3e121e6b868d341622e6 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Fri, 15 May 2026 13:24:23 +0800
Subject: [PATCH 2/3] update

---
 .../PaimonCreateTableAsSelectStrategy.scala   |  4 +--
 .../PaimonCreateTableAsSelectStrategy.scala   |  2 +-
 .../PaimonCreateTableAsSelectStrategy.scala   |  2 +-
 .../PaimonReplaceTableAsSelectStrategy.scala  |  2 +-
 .../sql/execution/PaimonStrategyHelper.scala  | 16 +++++++++
 .../PaimonCreateTableAsSelectStrategy.scala   |  2 +-
 .../PaimonReplaceTableAsSelectStrategy.scala  |  2 +-
 .../shim/PaimonTableOptionUtils.scala         | 36 
------------------- 8 files changed, 23 insertions(+), 43 deletions(-) delete mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonTableOptionUtils.scala diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index 87ff01c3c74e..96f194668f09 100644 --- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -24,7 +24,7 @@ import org.apache.paimon.spark.catalog.FormatTableCatalog import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan} import org.apache.spark.sql.connector.catalog.CatalogV2Util -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -41,7 +41,7 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strate props, options, ifNotExists) => - val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val (tableOptions, writeOptions) = PaimonStrategyHelper.splitTableAndWriteOptions(options) val newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions val isPartitionedFormatTable = { diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index 3bf90c7e3943..42d6fe28160f 100644 --- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -44,7 +44,7 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) tableSpec: TableSpec, options, ifNotExists) => - val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val (tableOptions, writeOptions) = PaimonStrategyHelper.splitTableAndWriteOptions(options) val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) val isPartitionedFormatTable = { diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index 8fa9a2f2445c..14f540f3e171 100644 --- a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -46,7 +46,7 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) ifNotExists, analyzedQuery) => assert(analyzedQuery.isDefined) - val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val (tableOptions, writeOptions) = 
PaimonStrategyHelper.splitTableAndWriteOptions(options) val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) val isPartitionedFormatTable = { diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala index 4bb4c0589951..b741627cdbbb 100644 --- a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala @@ -51,7 +51,7 @@ case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) orCreate, analyzedQuery) if PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) => assert(analyzedQuery.isDefined) - val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val (tableOptions, writeOptions) = PaimonStrategyHelper.splitTableAndWriteOptions(options) val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) val writeOpts = new CaseInsensitiveStringMap(writeOptions.asJava) if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, qualifiedSpec, parts)) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala index 09cb26bd7bc8..2ee33a182984 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala @@ -18,12 +18,17 @@ package org.apache.spark.sql.execution +import org.apache.paimon.CoreOptions +import org.apache.paimon.iceberg.IcebergOptions + import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.plans.logical.TableSpec import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.paimon.shims.SparkShimLoader +import scala.collection.JavaConverters._ + trait PaimonStrategyHelper { def spark: SparkSession @@ -44,3 +49,14 @@ trait PaimonStrategyHelper { tableSpec.location.map(makeQualifiedDBObjectPath)) } } + +object PaimonStrategyHelper { + private val tableOptionKeys: Set[String] = + (CoreOptions.getOptions.asScala.map(_.key()) ++ IcebergOptions.getOptions.asScala.map( + _.key())).toSet + + def splitTableAndWriteOptions( + options: Map[String, String]): (Map[String, String], Map[String, String]) = { + options.partition { case (key, _) => tableOptionKeys.contains(key) } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index 34caa3f7a119..cc0c12960cfb 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -42,7 +42,7 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) options, ifNotExists, true) => - val (tableOptions, writeOptions) = 
PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val (tableOptions, writeOptions) = PaimonStrategyHelper.splitTableAndWriteOptions(options) val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) val isPartitionedFormatTable = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala index d107397b20a9..60410b631eb5 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala @@ -46,7 +46,7 @@ case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) options, orCreate, true) if PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) => - val (tableOptions, writeOptions) = PaimonTableOptionUtils.splitTableAndWriteOptions(options) + val (tableOptions, writeOptions) = PaimonStrategyHelper.splitTableAndWriteOptions(options) val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, qualifiedSpec, parts)) { SparkShimLoader.shim.createAtomicReplaceTableAsSelectExec( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonTableOptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonTableOptionUtils.scala deleted file mode 100644 index c6507062ec1c..000000000000 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonTableOptionUtils.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.spark.sql.execution.shim
-
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
-
-import scala.collection.JavaConverters._
-
-private[shim] object PaimonTableOptionUtils {
-
-  private val tableOptionKeys =
-    (CoreOptions.getOptions.asScala.map(_.key()) ++ IcebergOptions.getOptions.asScala.map(
-      _.key())).toSet
-
-  def splitTableAndWriteOptions(
-      options: Map[String, String]): (Map[String, String], Map[String, String]) = {
-    options.partition { case (key, _) => tableOptionKeys.contains(key) }
-  }
-}

From a3922a18da95ef20ab5920436ff153148521e498 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Fri, 15 May 2026 16:09:56 +0800
Subject: [PATCH 3/3] Add REST test

---
 .../java/org/apache/paimon/rest/RESTApi.java  | 19 ++++++
 .../org/apache/paimon/rest/ResourcePaths.java | 11 +++++
 .../rest/requests/ReplaceTableRequest.java    | 47 +++++++++++++++++++
 .../org/apache/paimon/rest/RESTCatalog.java   | 21 ++++++++-
 .../paimon/catalog/CatalogTestBase.java       | 21 ---------
 .../apache/paimon/rest/RESTCatalogServer.java | 39 +++++++++++++++
 .../apache/paimon/rest/RESTCatalogTest.java   |  3 +-
 7 files changed, 136 insertions(+), 25 deletions(-)
 create mode 100644 paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java

diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 88761ca5f06a..7433a30388f8 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -51,6 +51,7 @@
 import org.apache.paimon.rest.requests.RegisterTableRequest;
 import org.apache.paimon.rest.requests.RenameBranchRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
+import org.apache.paimon.rest.requests.ReplaceTableRequest;
 import org.apache.paimon.rest.requests.ResetConsumerRequest;
 import org.apache.paimon.rest.requests.RollbackSchemaRequest;
 import org.apache.paimon.rest.requests.RollbackTableRequest;
@@ -778,6 +779,24 @@ public void alterTable(Identifier identifier, List changes) {
                 restAuthFunction);
     }
 
+    /**
+     * Replace table.
+     *
+     * @param identifier database name and table name.
+     * @param schema the new schema that replaces the table's current schema
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the table does not exist
+     * @throws ForbiddenException Exception thrown on HTTP 403 means the caller does not have
+     *     permission for this table
+     */
+    public void replaceTable(Identifier identifier, Schema schema) {
+        ReplaceTableRequest request = new ReplaceTableRequest(schema);
+        client.post(
+                resourcePaths.replaceTable(
+                        identifier.getDatabaseName(), identifier.getObjectName()),
+                request,
+                restAuthFunction);
+    }
+
     /**
      * Auth table query.
* diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java index 6d1d16c82f19..66a1653232b8 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -98,6 +98,17 @@ public String renameTable() { return SLASH.join(V1, prefix, TABLES, "rename"); } + public String replaceTable(String databaseName, String objectName) { + return SLASH.join( + V1, + prefix, + DATABASES, + encodeString(databaseName), + TABLES, + encodeString(objectName), + "replace"); + } + public String commitTable(String databaseName, String objectName) { return SLASH.join( V1, diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java b/paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java new file mode 100644 index 000000000000..526724a3137f --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.requests; + +import org.apache.paimon.rest.RESTRequest; +import org.apache.paimon.schema.Schema; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Request for replacing table. 
*/ +@JsonIgnoreProperties(ignoreUnknown = true) +public class ReplaceTableRequest implements RESTRequest { + + private static final String FIELD_SCHEMA = "schema"; + + @JsonProperty(FIELD_SCHEMA) + private final Schema schema; + + @JsonCreator + public ReplaceTableRequest(@JsonProperty(FIELD_SCHEMA) Schema schema) { + this.schema = schema; + } + + @JsonGetter(FIELD_SCHEMA) + public Schema getSchema() { + return schema; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index b514ef4d618f..0eaa95b81548 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -586,8 +586,25 @@ public void alterTable( } @Override - public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists) { - throw new UnsupportedOperationException("RESTCatalog does not support replaceTable yet."); + public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists) + throws TableNotExistException { + checkNotBranch(identifier, "replaceTable"); + checkNotSystemTable(identifier, "replaceTable"); + validateCreateTable(newSchema, dataTokenEnabled); + try { + tableDefaultOptions.forEach(newSchema.options()::putIfAbsent); + api.replaceTable(identifier, newSchema); + } catch (NoSuchResourceException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(identifier); + } + } catch (NotImplementedException e) { + throw new UnsupportedOperationException(e.getMessage()); + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier, e); + } catch (BadRequestException e) { + throw new IllegalArgumentException(e.getMessage()); + } } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 66f363a43ec6..07539d914f4b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -1243,27 +1243,6 @@ public void testReplaceTable() throws Exception { .isThrownBy(() -> catalog.replaceTable(identifier, changePartitionKeys, false)) .withMessageContaining("partition keys"); - if (supportsFormatTable()) { - Schema formatSchema = - Schema.newBuilder() - .column("id", DataTypes.INT()) - .column("name", DataTypes.STRING()) - .option(TYPE.key(), TableType.FORMAT_TABLE.toString()) - .option(CoreOptions.FILE_FORMAT.key(), "csv") - .build(); - catalog.replaceTable(identifier, formatSchema, false); - assertThat(catalog.getTable(identifier)).isInstanceOf(FormatTable.class); - - Schema paimonSchema = - Schema.newBuilder() - .column("id", DataTypes.INT()) - .column("name", DataTypes.STRING()) - .option("bucket", "-1") - .build(); - catalog.replaceTable(identifier, paimonSchema, false); - assertThat(catalog.getTable(identifier)).isInstanceOf(FileStoreTable.class); - } - // ignoreIfNotExists = true: missing table is silently skipped Identifier missing = Identifier.create("replace_db", "missing"); catalog.replaceTable(missing, newSchema, true); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 51d2f0cf137f..74e4f4e465a9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -63,6 +63,7 @@ import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; import org.apache.paimon.rest.requests.RenameBranchRequest; import org.apache.paimon.rest.requests.RenameTableRequest; +import org.apache.paimon.rest.requests.ReplaceTableRequest; import org.apache.paimon.rest.requests.ResetConsumerRequest; import org.apache.paimon.rest.requests.RollbackSchemaRequest; import org.apache.paimon.rest.requests.RollbackTableRequest; @@ -429,6 +430,10 @@ && isTableByIdRequest(request.getPath())) { resources.length == 4 && ResourcePaths.TABLES.equals(resources[1]) && "rollback-schema".equals(resources[3]); + boolean isReplaceTable = + resources.length == 4 + && ResourcePaths.TABLES.equals(resources[1]) + && "replace".equals(resources[3]); boolean isPartitions = resources.length == 4 && ResourcePaths.TABLES.equals(resources[1]) @@ -552,6 +557,8 @@ && isTableByIdRequest(request.getPath())) { } } else if (isRollbackSchema) { return rollbackSchemaHandle(identifier, restAuthParameter.data()); + } else if (isReplaceTable) { + return replaceTableHandle(identifier, restAuthParameter.data()); } else if (isTable) { return tableHandle( restAuthParameter.method(), @@ -1754,6 +1761,38 @@ private MockResponse tableHandle(String method, String data, Identifier identifi } } + private MockResponse replaceTableHandle(Identifier identifier, String data) throws Exception { + ReplaceTableRequest requestBody = RESTApi.fromJson(data, ReplaceTableRequest.class); + Schema newSchema = requestBody.getSchema(); + if (!tableMetadataStore.containsKey(identifier.getFullName())) { + throw new Catalog.TableNotExistException(identifier); + } + TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName()); + if (isFormatTable(tableMetadata.schema().toSchema()) || isFormatTable(newSchema)) { + throw new UnsupportedOperationException("replaceTable does not support format tables."); + } + catalog.replaceTable(identifier, newSchema, false); + TableSchema replacedSchema = catalog.loadTableSchema(identifier); + TableMetadata newTableMetadata = + createTableMetadata( + identifier, + replacedSchema.id(), + replacedSchema.toSchema(), + tableMetadata.uuid(), + tableMetadata.isExternal()); + tableMetadataStore.put(identifier.getFullName(), newTableMetadata); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + Snapshot truncateSnapshot = table.snapshotManager().latestSnapshot(); + if (truncateSnapshot != null) { + tableLatestSnapshotStore.put( + identifier.getFullName(), new TableSnapshot(truncateSnapshot, 0L, 0L, 0L, 0L)); + } else { + tableLatestSnapshotStore.remove(identifier.getFullName()); + } + tablePartitionsStore.remove(identifier.getFullName()); + return new MockResponse().setResponseCode(200); + } + private MockResponse renameTableHandle(String data) throws Exception { RenameTableRequest requestBody = RESTApi.fromJson(data, RenameTableRequest.class); Identifier fromTable = requestBody.getSource(); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 00da79046ac4..8cefa2ec7b42 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -3173,10 +3173,9 @@ protected boolean supportsAlterDatabase() { return true; } - // TODO: implement this @Override protected boolean supportsReplaceTable() { - return 
false; + return true; } // TODO implement this
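
The preceding patch in the series moves splitTableAndWriteOptions out of the shim-private PaimonTableOptionUtils and into a shared PaimonStrategyHelper object, so every Spark version shim (3.2 through 4.0) calls one definition instead of carrying its own copy. A minimal runnable sketch of the partition semantics follows; the key set here is a hypothetical stand-in for the real sets collected from CoreOptions.getOptions and IcebergOptions.getOptions.

```scala
object SplitOptionsSketch {

  // Hypothetical stand-in for the key sets built from CoreOptions and
  // IcebergOptions in the PaimonStrategyHelper hunk above.
  private val tableOptionKeys: Set[String] = Set("bucket", "primary-key", "file.format")

  // Same shape as splitTableAndWriteOptions: recognized table options on the
  // left, everything else (treated as write options) on the right.
  def splitTableAndWriteOptions(
      options: Map[String, String]): (Map[String, String], Map[String, String]) =
    options.partition { case (key, _) => tableOptionKeys.contains(key) }

  def main(args: Array[String]): Unit = {
    val (tableOptions, writeOptions) =
      splitTableAndWriteOptions(Map("bucket" -> "4", "mergeSchema" -> "true"))
    println(tableOptions) // Map(bucket -> 4)
    println(writeOptions) // Map(mergeSchema -> true)
  }
}
```

In the CTAS/RTAS strategies above, the left half is merged into the table properties (`newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions`), while the right half only feeds the write as a CaseInsensitiveStringMap.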
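
Patch 3/3 exposes the replace operation over REST. Reading the ResourcePaths.replaceTable hunk, the endpoint shape is POST v1/{prefix}/databases/{db}/tables/{table}/replace, with a ReplaceTableRequest body carrying a single "schema" field. A sketch under stated assumptions: V1 is assumed to resolve to "v1", and this encodeString is only an approximation of the real path-segment encoding.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object ReplaceTableRouteSketch {

  // Assumed approximation of ResourcePaths.encodeString; the real helper may
  // treat characters such as '/' differently.
  private def encodeString(s: String): String =
    URLEncoder.encode(s, StandardCharsets.UTF_8.name())

  // Route shape from the ResourcePaths.replaceTable hunk, assuming V1 == "v1".
  def replaceTablePath(prefix: String, databaseName: String, tableName: String): String =
    Seq("v1", prefix, "databases", encodeString(databaseName),
      "tables", encodeString(tableName), "replace").mkString("/")

  def main(args: Array[String]): Unit = {
    // Prints: v1/my-prefix/databases/replace_db/tables/my_table/replace
    println(replaceTablePath("my-prefix", "replace_db", "my_table"))
    // The ReplaceTableRequest body is a single-field JSON object, roughly:
    //   {"schema": {"fields": [...], "options": {"bucket": "4"}, ...}}
  }
}
```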
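
On the client side, RESTCatalog.replaceTable validates the identifier and schema, then maps transport failures back to catalog-level exceptions, honoring ignoreIfNotExists on a missing table. A condensed sketch of that contract is below; the exception types are generic JVM stand-ins for TableNotExistException, TableNoPermissionException, and friends, the 404/403 codes follow the RESTApi javadoc above, and the code for NotImplementedException is an assumption.

```scala
object ReplaceTableErrorSketch {

  // HTTP status -> catalog-level outcome, mirroring the catch clauses in the
  // RESTCatalog.replaceTable hunk. Exception types here are illustrative.
  def onReplaceError(status: Int, ignoreIfNotExists: Boolean): Unit =
    status match {
      case 404 if ignoreIfNotExists => // missing table is silently skipped
      case 404 => throw new RuntimeException("table does not exist")
      case 403 => throw new SecurityException("no permission on this table")
      case 400 => throw new IllegalArgumentException("invalid replace request")
      case 501 => throw new UnsupportedOperationException("server does not implement replace")
      case other => throw new IllegalStateException("unexpected status: " + other)
    }

  def main(args: Array[String]): Unit = {
    onReplaceError(404, ignoreIfNotExists = true) // returns normally
    try onReplaceError(403, ignoreIfNotExists = false)
    catch { case e: SecurityException => println(e.getMessage) }
  }
}
```

The mock RESTCatalogServer covers the success path by delegating to the backing catalog's replaceTable, refreshing the stored table metadata and latest-snapshot entry, and clearing cached partitions, which is what allows RESTCatalogTest to flip supportsReplaceTable to true.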