Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ class UpdateTableTest extends UpdateTableTestBase {
super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
}
}

/** Runs the shared UPDATE-table test suite with the DataSourceV2 write path enabled. */
class V2UpdateTableTest extends UpdateTableTestBase {
  override protected def sparkConf: SparkConf = {
    // Opt in to the v2 write path; the sibling suite pins it to "false".
    val conf = super.sparkConf
    conf.set("spark.paimon.write.use-v2-write", "true")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ class UpdateTableTest extends UpdateTableTestBase {
super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
}
}

/** Runs the shared UPDATE-table test suite with the DataSourceV2 write path enabled. */
class V2UpdateTableTest extends UpdateTableTestBase {
  override protected def sparkConf: SparkConf = {
    // Opt in to the v2 write path; the sibling suite pins it to "false".
    val conf = super.sparkConf
    conf.set("spark.paimon.write.use-v2-write", "true")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule

import scala.collection.JavaConverters._
Expand All @@ -42,8 +42,6 @@ object PaimonUpdateTable
table.getTable match {
case paimonTable: FileStoreTable =>
val relation = PaimonRelation.getPaimonRelation(u.table)
val alignedExpressions =
generateAlignedExpressions(relation.output, assignments).zip(relation.output)

val primaryKeys = paimonTable.primaryKeys().asScala.toSeq
if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) {
Expand All @@ -55,11 +53,24 @@ object PaimonUpdateTable
"Update operation is not supported when data evolution is enabled yet.")
}

UpdatePaimonTableCommand(
relation,
paimonTable,
condition.getOrElse(TrueLiteral),
alignedExpressions)
val alignedExpressions =
generateAlignedExpressions(relation.output, assignments).zip(relation.output)

val alignedAssignments = alignedExpressions.map {
case (expression, field) => Assignment(field, expression)
}

val alignedUpdateTable = u.copy(assignments = alignedAssignments)

if (!shouldFallbackToV1Update(table, alignedUpdateTable)) {
alignedUpdateTable
} else {
UpdatePaimonTableCommand(
relation,
paimonTable,
condition.getOrElse(TrueLiteral),
alignedExpressions)
}

case _ =>
throw new RuntimeException("Update Operation is only supported for FileStoreTable.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import org.apache.paimon.table.{FileStoreTable, Table}

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.Assignment
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}

trait RowLevelHelper extends SQLConfHelper {
trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {

val operation: RowLevelOp

Expand Down Expand Up @@ -95,4 +95,11 @@ trait RowLevelHelper extends SQLConfHelper {
table.getTable.asInstanceOf[FileStoreTable],
condition)
}

/**
 * Determines whether an UPDATE must fall back to the V1 command path instead of
 * being executed through DataSourceV2.
 *
 * @param table the Paimon Spark table being updated
 * @param updateTable the (assignment-aligned) logical UPDATE plan
 * @return true if the V1 `UpdatePaimonTableCommand` should handle this update
 */
protected def shouldFallbackToV1Update(table: SparkTable, updateTable: UpdateTable): Boolean = {
  // NOTE(review): `rewritable` is not always true — per the PR discussion it
  // returns false e.g. when the table contains a CHAR column.
  val v2Unsupported = !updateTable.rewritable || !updateTable.aligned
  shouldFallbackToV1(table) || v2Unsupported
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ case class PaimonBatchWrite(

private def buildDeletedCommitMessage(
deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
logInfo(s"[V2 Write] Building deleted commit message for ${deletedFiles.size} files")
deletedFiles
.groupBy(f => (f.partition, f.bucket))
.map {
Expand Down
Loading