From dbea63220c6bcf22e7d8461bc30c9617607d4b21 Mon Sep 17 00:00:00 2001
From: "xiyu.zk"
Date: Wed, 21 Jan 2026 10:57:13 +0800
Subject: [PATCH] [spark] Support UPDATE on Paimon append-only table in Spark
 V2 write

---
 .../paimon/spark/sql/UpdateTableTest.scala    |  6 +++++
 .../paimon/spark/sql/UpdateTableTest.scala    |  6 +++++
 .../catalyst/analysis/PaimonDeleteTable.scala |  3 ---
 .../catalyst/analysis/PaimonUpdateTable.scala | 27 +++++++++++++------
 .../catalyst/analysis/RowLevelHelper.scala    | 11 ++++++--
 .../paimon/spark/write/PaimonBatchWrite.scala |  1 +
 6 files changed, 41 insertions(+), 13 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index bcc07bcb9372..3a0f56cd4820 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -25,3 +25,9 @@ class UpdateTableTest extends UpdateTableTestBase {
     super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
   }
 }
+
+class V2UpdateTableTest extends UpdateTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index bcc07bcb9372..3a0f56cd4820 100644
--- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -25,3 +25,9 @@ class UpdateTableTest extends UpdateTableTestBase {
     super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
   }
 }
+
+class V2UpdateTableTest extends UpdateTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 46297ec7a696..7c9aaddc243a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -18,12 +18,9 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 import org.apache.paimon.table.FileStoreTable
 
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
index a60647eb5f0e..27206fb32ffa 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 import scala.collection.JavaConverters._
@@ -42,8 +42,6 @@ object PaimonUpdateTable
         table.getTable match {
           case paimonTable: FileStoreTable =>
             val relation = PaimonRelation.getPaimonRelation(u.table)
-            val alignedExpressions =
-              generateAlignedExpressions(relation.output, assignments).zip(relation.output)
             val primaryKeys = paimonTable.primaryKeys().asScala.toSeq
 
             if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) {
@@ -55,11 +53,24 @@ object PaimonUpdateTable
                 "Update operation is not supported when data evolution is enabled yet.")
             }
 
-            UpdatePaimonTableCommand(
-              relation,
-              paimonTable,
-              condition.getOrElse(TrueLiteral),
-              alignedExpressions)
+            val alignedExpressions =
+              generateAlignedExpressions(relation.output, assignments).zip(relation.output)
+
+            val alignedAssignments = alignedExpressions.map {
+              case (expression, field) => Assignment(field, expression)
+            }
+
+            val alignedUpdateTable = u.copy(assignments = alignedAssignments)
+
+            if (!shouldFallbackToV1Update(table, alignedUpdateTable)) {
+              alignedUpdateTable
+            } else {
+              UpdatePaimonTableCommand(
+                relation,
+                paimonTable,
+                condition.getOrElse(TrueLiteral),
+                alignedExpressions)
+            }
 
           case _ =>
             throw new RuntimeException("Update Operation is only supported for FileStoreTable.")
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index b41ceed6274e..8ec139b60715 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -24,9 +24,9 @@
 import org.apache.paimon.table.{FileStoreTable, Table}
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
 
-trait RowLevelHelper extends SQLConfHelper {
+trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
 
   val operation: RowLevelOp
@@ -95,4 +95,11 @@
       table.getTable.asInstanceOf[FileStoreTable],
       condition)
   }
+
+  /** Returns true if this UPDATE must fall back to the V1 UpdatePaimonTableCommand instead of the DataSourceV2 write path. */
+  protected def shouldFallbackToV1Update(table: SparkTable, updateTable: UpdateTable): Boolean = {
+    shouldFallbackToV1(table) ||
+    !updateTable.rewritable ||
+    !updateTable.aligned
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index 1b58483e69ef..d546eebf4c1b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -112,6 +112,7 @@ case class PaimonBatchWrite(
 
   private def buildDeletedCommitMessage(
       deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
+    logInfo(s"[V2 Write] Building deleted commit message for ${deletedFiles.size} files")
     deletedFiles
       .groupBy(f => (f.partition, f.bucket))
       .map {
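-- 

A minimal way to exercise the new path (a sketch, not taken from this patch;
the table and column names below are made up for illustration). With
use-v2-write enabled, an UPDATE on an append-only (no primary key) table is
kept as a rewritable, aligned UpdateTable plan and executed through the
DataSourceV2 write, while shouldFallbackToV1Update routes everything else to
the V1 UpdatePaimonTableCommand:

    // Scala, e.g. in spark-shell with a Paimon catalog configured
    spark.conf.set("spark.paimon.write.use-v2-write", "true")
    spark.sql("CREATE TABLE t (id INT, name STRING) USING paimon")
    spark.sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
    spark.sql("UPDATE t SET name = 'c' WHERE id = 1")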