From 37b3f520b78e20ba1e32ac144b0071e52d274215 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 2 Apr 2026 21:08:19 +0800 Subject: [PATCH 1/5] reproduce row id bug --- .../spark/sql/RowTrackingTestBase.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 0ea9d21bcf2a..f28ef4fb4ef1 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -952,4 +952,30 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { assert(!indexEntries.exists(entry => entry.partition().getString(0).toString.equals("p1"))) } } + + test("Data Evolution: self merge via row_tracking system table") { + withTable("target") { + sql( + "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO target VALUES (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3')") + + // Add a new column and backfill via $row_tracking system table + sql("ALTER TABLE target ADD COLUMNS (status STRING)") + + sql(s""" + |MERGE INTO target AS t + |USING `target$$row_tracking` AS s + |ON t._ROW_ID = s._ROW_ID + |WHEN MATCHED THEN UPDATE SET t.status = 'active' + |""".stripMargin) + + checkAnswer( + sql("SELECT a, b, c, status FROM target ORDER BY a"), + Seq( + Row(1, 10, "c1", "active"), + Row(2, 20, "c2", "active"), + Row(3, 30, "c3", "active")) + ) + } + } } From 0c07f04e130112ff9e15788697e92f6a7cea2409 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 5 Apr 2026 15:08:52 +0800 Subject: [PATCH 2/5] [spark] Add _ROW_ID existing check before adding _ROW_ID --- .../src/main/scala/org/apache/paimon/spark/read/BaseScan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala index 42f724d3e692..b1a60d05ecad 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala @@ -68,7 +68,7 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging { val coreOptions: CoreOptions = CoreOptions.fromMap(table.options()) lazy val tableRowType: RowType = { - if (coreOptions.rowTrackingEnabled()) { + if (coreOptions.rowTrackingEnabled() && !table.rowType().containsField(SpecialFields.ROW_ID.name())) { SpecialFields.rowTypeWithRowTracking(table.rowType()) } else { table.rowType() From dffb3a17269298b7233b5320e8eefd96cd82c41b Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 5 Apr 2026 15:14:52 +0800 Subject: [PATCH 3/5] fix code format --- .../main/scala/org/apache/paimon/spark/read/BaseScan.scala | 5 ++++- .../org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 5 +---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala index b1a60d05ecad..3237475d69e0 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala @@ -68,7 +68,10 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging { val coreOptions: CoreOptions = CoreOptions.fromMap(table.options()) lazy val tableRowType: RowType = { - if (coreOptions.rowTrackingEnabled() && !table.rowType().containsField(SpecialFields.ROW_ID.name())) { + if ( + coreOptions + .rowTrackingEnabled() && !table.rowType().containsField(SpecialFields.ROW_ID.name()) + ) { SpecialFields.rowTypeWithRowTracking(table.rowType()) } else { table.rowType() diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index f28ef4fb4ef1..6d00ed4d0959 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -971,10 +971,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { checkAnswer( sql("SELECT a, b, c, status FROM target ORDER BY a"), - Seq( - Row(1, 10, "c1", "active"), - Row(2, 20, "c2", "active"), - Row(3, 30, "c3", "active")) + Seq(Row(1, 10, "c1", "active"), Row(2, 20, "c2", "active"), Row(3, 30, "c3", "active")) ) } } From 22b61ad45f8c76ae776230bf02941f60d04649ff Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 14 May 2026 23:57:48 +0800 Subject: [PATCH 4/5] Remove self-merge test case, to be moved to a separate PR --- .../spark/sql/RowTrackingTestBase.scala | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 6d00ed4d0959..94b11b8c8158 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -953,26 +953,4 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } - test("Data Evolution: self merge via row_tracking system table") { - withTable("target") { - sql( - "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO target VALUES (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3')") - - // Add a new column and backfill via $row_tracking system table - sql("ALTER TABLE target ADD COLUMNS (status STRING)") - - sql(s""" - |MERGE INTO target AS t - |USING `target$$row_tracking` AS s - |ON t._ROW_ID = s._ROW_ID - |WHEN MATCHED THEN UPDATE SET t.status = 'active' - |""".stripMargin) - - checkAnswer( - sql("SELECT a, b, c, status FROM target ORDER BY a"), - Seq(Row(1, 10, "c1", "active"), Row(2, 20, "c2", "active"), Row(3, 30, "c3", "active")) - ) - } - } } From 07053f11e13d6565a7d20d4f7efb8af33ec4af03 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 15 May 2026 00:30:15 +0800 Subject: [PATCH 5/5] add test case for querying row_tracking system table --- .../apache/paimon/spark/sql/RowTrackingTestBase.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 94b11b8c8158..122caedc04a7 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -953,4 +953,15 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } + test("query row_tracking system table should not throw duplicate _ROW_ID error") { + withTable("T") { + sql( + "CREATE TABLE T (a INT, b STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')") + sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')") + checkAnswer( + sql("SELECT a, b FROM `T$row_tracking` ORDER BY a"), + Seq(Row(1, "a"), Row(2, "b")) + ) + } + } }