diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala index c26eeedd3f62..87bdfeffba89 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala @@ -81,6 +81,16 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] { if d.resolved => PaimonDropPartitions.validate(table, parts.asResolvedPartitionSpecs) d + + case r: ReplaceColumns if r.resolved && isPaimonTable(r.table) => + // Spark rewrites REPLACE COLUMNS into a batch that drops every existing column and re-adds + // the new set. Re-adding columns assigns brand-new field ids while existing data files keep + // the old ids, so same-named columns are read back as null, silently corrupting data. Reject + // it here, before the change batch reaches the catalog where it is indistinguishable from an + // ordinary drop+add. + throw new UnsupportedOperationException( + "ALTER TABLE ... REPLACE COLUMNS is not supported for Paimon tables. " + + "Please use RENAME COLUMN, ALTER COLUMN TYPE, DROP COLUMN, and ADD COLUMN instead.") } private def writeOptions(v2WriteCommand: V2WriteCommand): Map[String, String] = { diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index 7afe3c76cf78..4e15b0880bde 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -248,6 +248,23 @@ public void testDropColumns() { .contains(showCreateString("testDropColumns", "a INT NOT NULL")); } + @Test + public void testReplaceColumnsUnsupported() { + createTable("testReplaceColumnsUnsupported"); + + assertThatThrownBy( + () -> + spark.sql( + "ALTER TABLE testReplaceColumnsUnsupported REPLACE COLUMNS " + + "(a BIGINT, bb STRING, c STRING)")) + .satisfies( + anyCauseMatches( + UnsupportedOperationException.class, + "ALTER TABLE ... REPLACE COLUMNS is not supported for Paimon tables. " + + "Please use RENAME COLUMN, ALTER COLUMN TYPE, DROP COLUMN, " + + "and ADD COLUMN instead.")); + } + @Test public void testDropPartitionKey() { spark.sql(