From d1d7c96473d01d7c39cf69c7f794596dd0cb9eef Mon Sep 17 00:00:00 2001 From: huangxiaoping <1754789345@qq.com> Date: Tue, 16 Jun 2026 12:01:25 +0800 Subject: [PATCH 1/3] [spark] Reject ALTER TABLE REPLACE COLUMNS to avoid silent data corruption Spark translates REPLACE COLUMNS into a DeleteColumn + AddColumn batch. Re-adding columns assigns new field ids while existing data files keep the old ids, so same-named columns are read back as null. Detect this pattern and throw UnsupportedOperationException instead. --- .../org/apache/paimon/spark/SparkCatalog.java | 41 ++++++++++++++++++- .../spark/SparkSchemaEvolutionITCase.java | 17 ++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 165be98980d9..f2be68e10b46 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -350,9 +350,16 @@ public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTable @Override public org.apache.spark.sql.connector.catalog.Table alterTable( Identifier ident, TableChange... changes) throws NoSuchTableException { - List schemaChanges = - Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList()); try { + if (isReplaceColumns(changes)) { + throw new UnsupportedOperationException( + "ALTER TABLE ... REPLACE COLUMNS is not supported for Paimon tables. 
" + + "Please use RENAME COLUMN, ALTER COLUMN TYPE, DROP COLUMN, " + + "and ADD COLUMN instead."); + } + + List schemaChanges = + Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList()); catalog.alterTable(toIdentifier(ident, catalogName), schemaChanges, false); return loadTable(ident); } catch (Catalog.TableNotExistException e) { @@ -362,6 +369,36 @@ public org.apache.spark.sql.connector.catalog.Table alterTable( } } + /** + * Detects whether the given changes originate from an {@code ALTER TABLE ... REPLACE COLUMNS} + * statement. + * + *
<p>
Spark translates {@code REPLACE COLUMNS} into a batch that drops every existing column and + * re-adds the new set, i.e. a combination of {@link TableChange.DeleteColumn} and {@link + * TableChange.AddColumn} only. Other column changes such as rename or type update are never + * produced by {@code REPLACE COLUMNS}, so we match exclusively on these two types to avoid + * mistaking a legitimate mixed batch (e.g. a programmatic DROP + RENAME) for a replace. + * + *
<p>
This operation must be rejected because re-adding columns assigns brand-new field ids + * while existing data files keep the old ids; same-named columns would then be treated as new + * columns and read back as null, silently corrupting data. + */ + private boolean isReplaceColumns(TableChange[] changes) { + boolean hasDeleteColumn = false; + boolean hasAddColumn = false; + for (TableChange change : changes) { + if (change instanceof TableChange.DeleteColumn) { + hasDeleteColumn = true; + } else if (change instanceof TableChange.AddColumn) { + hasAddColumn = true; + } else { + return false; + } + } + + return hasDeleteColumn && hasAddColumn; + } + @Override public org.apache.spark.sql.connector.catalog.Table createTable( Identifier ident, diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index 7afe3c76cf78..4e15b0880bde 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -248,6 +248,23 @@ public void testDropColumns() { .contains(showCreateString("testDropColumns", "a INT NOT NULL")); } + @Test + public void testReplaceColumnsUnsupported() { + createTable("testReplaceColumnsUnsupported"); + + assertThatThrownBy( + () -> + spark.sql( + "ALTER TABLE testReplaceColumnsUnsupported REPLACE COLUMNS " + + "(a BIGINT, bb STRING, c STRING)")) + .satisfies( + anyCauseMatches( + UnsupportedOperationException.class, + "ALTER TABLE ... REPLACE COLUMNS is not supported for Paimon tables. 
" + + "Please use RENAME COLUMN, ALTER COLUMN TYPE, DROP COLUMN, " + + "and ADD COLUMN instead.")); + } + @Test public void testDropPartitionKey() { spark.sql( From aae387879ee35c17c536e3ae65fe99611f137cf7 Mon Sep 17 00:00:00 2001 From: huangxiaoping <1754789345@qq.com> Date: Wed, 17 Jun 2026 17:40:34 +0800 Subject: [PATCH 2/3] [spark] reject replace columns by resolve rule --- .../org/apache/paimon/spark/SparkCatalog.java | 41 +------------------ .../catalyst/analysis/PaimonAnalysis.scala | 10 +++++ .../spark/SparkSchemaEvolutionITCase.java | 19 +++++++++ 3 files changed, 31 insertions(+), 39 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index f2be68e10b46..165be98980d9 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -350,16 +350,9 @@ public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTable @Override public org.apache.spark.sql.connector.catalog.Table alterTable( Identifier ident, TableChange... changes) throws NoSuchTableException { + List schemaChanges = + Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList()); try { - if (isReplaceColumns(changes)) { - throw new UnsupportedOperationException( - "ALTER TABLE ... REPLACE COLUMNS is not supported for Paimon tables. 
" - + "Please use RENAME COLUMN, ALTER COLUMN TYPE, DROP COLUMN, " - + "and ADD COLUMN instead."); - } - - List schemaChanges = - Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList()); catalog.alterTable(toIdentifier(ident, catalogName), schemaChanges, false); return loadTable(ident); } catch (Catalog.TableNotExistException e) { @@ -369,36 +362,6 @@ public org.apache.spark.sql.connector.catalog.Table alterTable( } } - /** - * Detects whether the given changes originate from an {@code ALTER TABLE ... REPLACE COLUMNS} - * statement. - * - *
<p>
Spark translates {@code REPLACE COLUMNS} into a batch that drops every existing column and - * re-adds the new set, i.e. a combination of {@link TableChange.DeleteColumn} and {@link - * TableChange.AddColumn} only. Other column changes such as rename or type update are never - * produced by {@code REPLACE COLUMNS}, so we match exclusively on these two types to avoid - * mistaking a legitimate mixed batch (e.g. a programmatic DROP + RENAME) for a replace. - * - *
<p>
This operation must be rejected because re-adding columns assigns brand-new field ids - * while existing data files keep the old ids; same-named columns would then be treated as new - * columns and read back as null, silently corrupting data. - */ - private boolean isReplaceColumns(TableChange[] changes) { - boolean hasDeleteColumn = false; - boolean hasAddColumn = false; - for (TableChange change : changes) { - if (change instanceof TableChange.DeleteColumn) { - hasDeleteColumn = true; - } else if (change instanceof TableChange.AddColumn) { - hasAddColumn = true; - } else { - return false; - } - } - - return hasDeleteColumn && hasAddColumn; - } - @Override public org.apache.spark.sql.connector.catalog.Table createTable( Identifier ident, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala index c26eeedd3f62..87bdfeffba89 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala @@ -81,6 +81,16 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] { if d.resolved => PaimonDropPartitions.validate(table, parts.asResolvedPartitionSpecs) d + + case r: ReplaceColumns if r.resolved && isPaimonTable(r.table) => + // Spark rewrites REPLACE COLUMNS into a batch that drops every existing column and re-adds + // the new set. Re-adding columns assigns brand-new field ids while existing data files keep + // the old ids, so same-named columns are read back as null, silently corrupting data. Reject + // it here, before the change batch reaches the catalog where it is indistinguishable from an + // ordinary drop+add. + throw new UnsupportedOperationException( + "ALTER TABLE ... 
REPLACE COLUMNS is not supported for Paimon tables. " + + "Please use RENAME COLUMN, ALTER COLUMN TYPE, DROP COLUMN, and ADD COLUMN instead.") } private def writeOptions(v2WriteCommand: V2WriteCommand): Map[String, String] = { diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index 4e15b0880bde..685e24603a7a 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -265,6 +265,25 @@ public void testReplaceColumnsUnsupported() { + "and ADD COLUMN instead.")); } + @Test + public void testDropAndAddColumnsInOneBatch() { + createTable("testDropAndAddColumnsInOneBatch"); + + // A batch that drops some (but not all) existing columns and adds new ones is an ordinary + // combination of schema changes, not REPLACE COLUMNS, so it must still be allowed. 
+ spark.sql("ALTER TABLE testDropAndAddColumnsInOneBatch DROP COLUMN b, ADD COLUMN d STRING"); + + List afterAlter = + spark.sql("SHOW CREATE TABLE testDropAndAddColumnsInOneBatch").collectAsList(); + assertThat(afterAlter.toString()) + .contains( + showCreateString( + "testDropAndAddColumnsInOneBatch", + "a INT NOT NULL", + "c STRING", + "d STRING")); + } + @Test public void testDropPartitionKey() { spark.sql( From 49bab28a786a63104d506be114f0027049baf838 Mon Sep 17 00:00:00 2001 From: huangxiaoping <1754789345@qq.com> Date: Wed, 17 Jun 2026 18:40:23 +0800 Subject: [PATCH 3/3] [spark] Remove testDropAndAddColumnsInOneBatch test case --- .../spark/SparkSchemaEvolutionITCase.java | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index 685e24603a7a..4e15b0880bde 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -265,25 +265,6 @@ public void testReplaceColumnsUnsupported() { + "and ADD COLUMN instead.")); } - @Test - public void testDropAndAddColumnsInOneBatch() { - createTable("testDropAndAddColumnsInOneBatch"); - - // A batch that drops some (but not all) existing columns and adds new ones is an ordinary - // combination of schema changes, not REPLACE COLUMNS, so it must still be allowed. - spark.sql("ALTER TABLE testDropAndAddColumnsInOneBatch DROP COLUMN b, ADD COLUMN d STRING"); - - List afterAlter = - spark.sql("SHOW CREATE TABLE testDropAndAddColumnsInOneBatch").collectAsList(); - assertThat(afterAlter.toString()) - .contains( - showCreateString( - "testDropAndAddColumnsInOneBatch", - "a INT NOT NULL", - "c STRING", - "d STRING")); - } - @Test public void testDropPartitionKey() { spark.sql(