diff --git a/docs/docs/concepts/spec/snapshot.md b/docs/docs/concepts/spec/snapshot.md index f9762b0599fa..dd2bf28ed23c 100644 --- a/docs/docs/concepts/spec/snapshot.md +++ b/docs/docs/concepts/spec/snapshot.md @@ -62,3 +62,6 @@ Snapshot File is JSON, it includes: 15. changelogRecordCount: record count of all changelog produced in this snapshot. 16. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark. 17. statistics: stats file name for statistics of this table. +18. properties: additional key-value properties of this snapshot. +19. nextRowId: next row id for row tracking. +20. operation: logical operation type, e.g. WRITE, DELETE, UPDATE, MERGE. Omitted from the JSON when not set, in which case it is read back as null. diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java index 93f525a7e3fb..4a200dfeccdd 100644 --- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java @@ -69,6 +69,7 @@ public class Snapshot implements Serializable { protected static final String FIELD_STATISTICS = "statistics"; protected static final String FIELD_PROPERTIES = "properties"; protected static final String FIELD_NEXT_ROW_ID = "nextRowId"; + protected static final String FIELD_OPERATION = "operation"; // version of snapshot @JsonProperty(FIELD_VERSION) @@ -181,6 +182,11 @@ public class Snapshot implements Serializable { @Nullable protected final Long nextRowId; + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_OPERATION) + @Nullable + protected final Operation operation; + public Snapshot( long id, long schemaId, @@ -202,6 +208,52 @@ public Snapshot( @Nullable String statistics, @Nullable Map properties, @Nullable Long nextRowId) { + this( + id, + schemaId, + baseManifestList, + baseManifestListSize, + deltaManifestList, + deltaManifestListSize, + changelogManifestList, + changelogManifestListSize, + indexManifest, 
commitUser, + commitIdentifier, + commitKind, + timeMillis, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics, + properties, + nextRowId, + null); + } + + public Snapshot( + long id, + long schemaId, + String baseManifestList, + @Nullable Long baseManifestListSize, + String deltaManifestList, + @Nullable Long deltaManifestListSize, + @Nullable String changelogManifestList, + @Nullable Long changelogManifestListSize, + @Nullable String indexManifest, + String commitUser, + long commitIdentifier, + CommitKind commitKind, + long timeMillis, + long totalRecordCount, + long deltaRecordCount, + @Nullable Long changelogRecordCount, + @Nullable Long watermark, + @Nullable String statistics, + @Nullable Map properties, + @Nullable Long nextRowId, + @Nullable Operation operation) { this( CURRENT_VERSION, id, @@ -223,7 +275,55 @@ public Snapshot( watermark, statistics, properties, - nextRowId); + nextRowId, + operation); + } + + public Snapshot( + int version, + long id, + long schemaId, + String baseManifestList, + @Nullable Long baseManifestListSize, + String deltaManifestList, + @Nullable Long deltaManifestListSize, + @Nullable String changelogManifestList, + @Nullable Long changelogManifestListSize, + @Nullable String indexManifest, + String commitUser, + long commitIdentifier, + CommitKind commitKind, + long timeMillis, + long totalRecordCount, + long deltaRecordCount, + @Nullable Long changelogRecordCount, + @Nullable Long watermark, + @Nullable String statistics, + @Nullable Map properties, + @Nullable Long nextRowId) { + this( + version, + id, + schemaId, + baseManifestList, + baseManifestListSize, + deltaManifestList, + deltaManifestListSize, + changelogManifestList, + changelogManifestListSize, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics, + properties, + nextRowId, + null); } @JsonCreator @@ -249,7 
+349,8 @@ public Snapshot( @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, @JsonProperty(FIELD_PROPERTIES) @Nullable Map properties, - @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) { + @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId, + @JsonProperty(FIELD_OPERATION) @Nullable Operation operation) { this.version = version; this.id = id; this.schemaId = schemaId; @@ -271,6 +372,7 @@ public Snapshot( this.statistics = statistics; this.properties = properties; this.nextRowId = nextRowId; + this.operation = operation; } @JsonGetter(FIELD_VERSION) @@ -388,6 +490,12 @@ public Long nextRowId() { return nextRowId; } + @JsonGetter(FIELD_OPERATION) + @Nullable + public Operation operation() { + return operation; + } + public String toJson() { return JsonSerdeUtil.toJson(this); } @@ -415,7 +523,8 @@ public int hashCode() { watermark, statistics, properties, - nextRowId); + nextRowId, + operation); } @Override @@ -447,7 +556,8 @@ public boolean equals(Object o) { && Objects.equals(watermark, that.watermark) && Objects.equals(statistics, that.statistics) && Objects.equals(properties, that.properties) - && Objects.equals(nextRowId, that.nextRowId); + && Objects.equals(nextRowId, that.nextRowId) + && operation == that.operation; } /** Type of changes in this snapshot. */ @@ -469,6 +579,19 @@ public enum CommitKind { ANALYZE } + /** Logical operation type that produced this snapshot. 
*/ + public enum Operation { + WRITE, + OVERWRITE, + DELETE, + TRUNCATE, + UPDATE, + MERGE, + CREATE_TABLE_AS_SELECT, + REPLACE_TABLE_AS_SELECT, + CREATE_OR_REPLACE_TABLE_AS_SELECT + } + // =================== Utils for reading ========================= public static Snapshot fromJson(String json) { diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java b/paimon-core/src/main/java/org/apache/paimon/Changelog.java index 0ab3429dfc1f..05b76d7153ae 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java +++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java @@ -63,7 +63,8 @@ public Changelog(Snapshot snapshot) { snapshot.watermark(), snapshot.statistics(), snapshot.properties, - snapshot.nextRowId); + snapshot.nextRowId, + snapshot.operation); } @JsonCreator @@ -89,7 +90,8 @@ public Changelog( @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, @JsonProperty(FIELD_PROPERTIES) Map properties, - @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) { + @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId, + @JsonProperty(FIELD_OPERATION) @Nullable Operation operation) { super( version, id, @@ -111,7 +113,8 @@ public Changelog( watermark, statistics, properties, - nextRowId); + nextRowId, + operation); } public static Changelog fromJson(String json) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index 31fb3c52cab6..dbd316d8fa28 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -42,6 +42,8 @@ public interface FileStoreCommit extends AutoCloseable { FileStoreCommit rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot); + FileStoreCommit withOperation(Snapshot.Operation operation); + /** Find out which committables 
need to be retried when recovering from the failure. */ List filterCommitted(List committables); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index a316c257113c..4eba344be20d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -161,6 +161,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private CommitMetrics commitMetrics; private boolean appendCommitCheckConflict = false; private long lastCommittedSnapshotId = -1L; + @Nullable private Snapshot.Operation operation; public FileStoreCommitImpl( SnapshotCommit snapshotCommit, @@ -249,6 +250,12 @@ public FileStoreCommit rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot) return this; } + @Override + public FileStoreCommit withOperation(Snapshot.Operation operation) { + this.operation = operation; + return this; + } + @Override public List filterCommitted(List committables) { // nothing to filter, fast exit @@ -1085,7 +1092,8 @@ CommitResult tryCommitOnce( statsFileName, // if empty properties, just set to null properties.isEmpty() ? 
null : properties, - nextRowIdStart); + nextRowIdStart, + operation); } catch (Throwable e) { // fails when preparing for commit, we should clean up commitCleaner.cleanUpReuseTmpManifests( @@ -1190,7 +1198,8 @@ public boolean replaceManifestList( latest.statistics(), // if empty properties, just set to null latest.properties(), - nextRowId); + nextRowId, + null); return commitSnapshotImpl(newSnapshot, emptyList()); } @@ -1269,7 +1278,8 @@ private boolean compactManifestOnce() { latestSnapshot.watermark(), latestSnapshot.statistics(), latestSnapshot.properties(), - latestSnapshot.nextRowId()); + latestSnapshot.nextRowId(), + null); return commitSnapshotImpl(newSnapshot, emptyList()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java index 84215980d5bb..2cfc181eada2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.Snapshot; import org.apache.paimon.Snapshot.CommitKind; import org.apache.paimon.annotation.Public; import org.apache.paimon.stats.Statistics; @@ -71,4 +72,9 @@ public interface BatchTableCommit extends TableCommit { /** Compact the manifest entries. Generates a snapshot with {@link CommitKind#COMPACT}. */ void compactManifests(); + + /** Set the logical operation type (e.g. WRITE, DELETE, MERGE) recorded in the snapshot. 
*/ + default BatchTableCommit withOperation(Snapshot.Operation operation) { + return this; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 0311c9bbe4d8..6a9e3046cc14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.fs.Path; @@ -170,6 +171,12 @@ public TableCommitImpl rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot) return this; } + @Override + public TableCommitImpl withOperation(Snapshot.Operation operation) { + commit.withOperation(operation); + return this; + } + @Override public InnerTableCommit withMetricRegistry(MetricRegistry registry) { commit.withMetrics(new CommitMetrics(registry, tableName)); @@ -185,11 +192,13 @@ public void commit(List commitMessages) { @Override public void truncateTable() { checkCommitted(); + commit.withOperation(Snapshot.Operation.TRUNCATE); commit.truncateTable(COMMIT_IDENTIFIER); } @Override public void truncatePartitions(List> partitionSpecs) { + commit.withOperation(Snapshot.Operation.TRUNCATE); commit.dropPartitions(partitionSpecs, COMMIT_IDENTIFIER); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index 394dc4d58a83..201192fde636 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -108,7 +108,9 @@ public class SnapshotsTable implements ReadonlyTable { new DataField(10, "delta_record_count", new 
BigIntType(true)), new DataField(11, "changelog_record_count", new BigIntType(true)), new DataField(12, "watermark", new BigIntType(true)), - new DataField(13, "next_row_id", new BigIntType(true)))); + new DataField(13, "next_row_id", new BigIntType(true)), + new DataField( + 14, "operation", SerializationUtils.newStringType(true)))); private final FileIO fileIO; private final Path location; @@ -339,7 +341,10 @@ private InternalRow toRow(Snapshot snapshot) { snapshot.deltaRecordCount(), snapshot.changelogRecordCount(), snapshot.watermark(), - snapshot.nextRowId()); + snapshot.nextRowId(), + snapshot.operation() == null + ? null + : BinaryString.fromString(snapshot.operation().toString())); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java index 192df2d4705a..22d86b6fca14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -80,6 +80,7 @@ public Tag( @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, @JsonProperty(FIELD_PROPERTIES) Map properties, @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId, + @JsonProperty(FIELD_OPERATION) @Nullable Operation operation, @JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime tagCreateTime, @JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration tagTimeRetained) { super( @@ -103,7 +104,8 @@ public Tag( watermark, statistics, properties, - nextRowId); + nextRowId, + operation); this.tagCreateTime = tagCreateTime; this.tagTimeRetained = tagTimeRetained; } @@ -142,6 +144,7 @@ public static Tag fromSnapshotAndTagTtl( snapshot.statistics(), snapshot.properties(), snapshot.nextRowId(), + snapshot.operation(), tagCreateTime, tagTimeRetained); } @@ -168,7 +171,8 @@ public Snapshot trimToSnapshot() { watermark, statistics, properties, - nextRowId); + nextRowId, + operation); } @Override diff --git 
a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java index 0cd75f3528d7..3a035c8a3388 100644 --- a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java @@ -78,6 +78,52 @@ public void testSnapshotWithSizes() { assertThat(Snapshot.fromJson(snapshot.toJson())).isEqualTo(snapshot); } + @Test + public void testSnapshotWithOperation() { + // Old snapshot without operation field: operation should be null + String oldJson = + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 1,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : \"m-0\",\n" + + " \"deltaManifestList\" : \"m-1\",\n" + + " \"commitUser\" : \"user\",\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1000,\n" + + " \"totalRecordCount\" : 10,\n" + + " \"deltaRecordCount\" : 5\n" + + "}"; + Snapshot old = Snapshot.fromJson(oldJson); + assertThat(old.operation()).isNull(); + + // New snapshot with operation field + String newJson = + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 2,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : \"m-0\",\n" + + " \"deltaManifestList\" : \"m-1\",\n" + + " \"commitUser\" : \"user\",\n" + + " \"commitIdentifier\" : 1,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 2000,\n" + + " \"totalRecordCount\" : 20,\n" + + " \"deltaRecordCount\" : 10,\n" + + " \"operation\" : \"MERGE\"\n" + + "}"; + Snapshot withOp = Snapshot.fromJson(newJson); + assertThat(withOp.operation()).isEqualTo(Snapshot.Operation.MERGE); + + // Round-trip: toJson -> fromJson preserves operation + assertThat(Snapshot.fromJson(withOp.toJson())).isEqualTo(withOp); + + // Null operation is omitted in JSON + assertThat(old.toJson()).doesNotContain("operation"); + } + public static SnapshotManager newSnapshotManager(FileIO fileIO, Path tablePath) { return newSnapshotManager(fileIO, tablePath, 
DEFAULT_MAIN_BRANCH); } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java index b17c912aafd8..ffdc12539aca 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java @@ -993,7 +993,8 @@ private void replaceGlobalIndexRangesWithPartitionSpanningRanges(FileStoreTable latest.watermark(), latest.statistics(), latest.properties(), - latest.nextRowId()); + latest.nextRowId(), + latest.operation()); SnapshotManager snapshotManager = table.snapshotManager(); snapshotManager .fileIO() diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java index 6e8ae6f36a47..0924b942037c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java @@ -137,6 +137,7 @@ private static Snapshot createSnapshot(long id) throws IOException { null, null, null, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 1da28a5bef79..c4de0f44e1a3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -420,6 +420,7 @@ private Snapshot snapshotWithManifestLists( null, null, null, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index c3371e1c19b9..0695406bde6c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -740,6 +740,7 @@ private Snapshot snapshot(long id) { null, null, null, + null, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java index 7a3bbf961c5c..c1a6b6f7140a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java @@ -118,7 +118,10 @@ private List getExpectedResult(long[] snapshotIds) { snapshot.deltaRecordCount(), snapshot.changelogRecordCount(), snapshot.watermark(), - snapshot.nextRowId())); + snapshot.nextRowId(), + snapshot.operation() == null + ? 
null + : BinaryString.fromString(snapshot.operation().toString()))); } return expectedRow; diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java index ef0e1627ee26..d262110f0331 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java @@ -410,6 +410,7 @@ public void testExpireTagsByTimeRetained() throws Exception { null, null, null, + null, null); tagManager.createTag( snapshot1, @@ -439,6 +440,7 @@ public void testExpireTagsByTimeRetained() throws Exception { null, null, null, + null, null); tagManager.createTag( snapshot2, diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java index d7dbeacb7f57..436e86c2113f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java @@ -51,6 +51,7 @@ public class TagTest { null, null, null, + null, null); @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 8e47b74aa345..02b21ba90637 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -293,6 +293,7 @@ public static Snapshot createSnapshotWithMillis(long id, long millis) { null, null, null, + null, null); } @@ -317,6 +318,7 @@ private Snapshot createSnapshotWithMillis(long id, long millis, long watermark) watermark, null, null, + null, null); } @@ -342,6 +344,7 @@ private Changelog createChangelogWithMillis(long id, long millis) { null, null, null, + null, null)); } @@ -373,6 +376,7 @@ public void testLatestSnapshotOfUser() throws IOException, InterruptedException null, 
null, null, + null, null); localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); } @@ -426,6 +430,7 @@ public void testTraversalSnapshotsFromLatestSafely() throws IOException, Interru null, null, null, + null, null); localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); } diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index 44095b7d2e04..4aa8fb784089 100644 --- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.execution.shim +import org.apache.paimon.Snapshot import org.apache.paimon.spark.SparkCatalog import org.apache.paimon.spark.catalog.FormatTableCatalog +import org.apache.paimon.spark.write.PaimonWriteOptions import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan} @@ -66,7 +68,10 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strate query, planLater(query), newProps, - new CaseInsensitiveStringMap(writeOptions.asJava), + new CaseInsensitiveStringMap( + (writeOptions + + (PaimonWriteOptions.OPERATION_OPTION -> Snapshot.Operation.CREATE_TABLE_AS_SELECT + .name())).asJava), ifNotExists ) :: Nil case _ => Nil diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index ca10cb259f4e..eb3e0444597e 100644 --- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.execution.shim +import org.apache.paimon.Snapshot import org.apache.paimon.spark.SparkCatalog import org.apache.paimon.spark.catalog.FormatTableCatalog +import org.apache.paimon.spark.write.PaimonWriteOptions import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName @@ -69,7 +71,10 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) query, planLater(query), qualifiedSpec, - new CaseInsensitiveStringMap(writeOptions.asJava), + new CaseInsensitiveStringMap( + (writeOptions + + (PaimonWriteOptions.OPERATION_OPTION -> Snapshot.Operation.CREATE_TABLE_AS_SELECT + .name())).asJava), ifNotExists ) :: Nil case _ => Nil diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index be377c17d1e8..0e0f3037d219 100644 --- a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.execution.shim +import org.apache.paimon.Snapshot import org.apache.paimon.spark.SparkCatalog import org.apache.paimon.spark.catalog.FormatTableCatalog +import org.apache.paimon.spark.write.PaimonWriteOptions import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier @@ -71,7 +73,10 @@ case class PaimonCreateTableAsSelectStrategy(spark: 
SparkSession) analyzedQuery.get, planLater(query), qualifiedSpec, - new CaseInsensitiveStringMap(writeOptions.asJava), + new CaseInsensitiveStringMap( + (writeOptions + + (PaimonWriteOptions.OPERATION_OPTION -> Snapshot.Operation.CREATE_TABLE_AS_SELECT + .name())).asJava), ifNotExists ) :: Nil case _ => Nil diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala index ec4c0498e1e0..ee9064dcc36b 100644 --- a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.shim +import org.apache.paimon.Snapshot import org.apache.paimon.spark.catalog.SparkBaseCatalog +import org.apache.paimon.spark.write.PaimonWriteOptions import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier @@ -62,7 +64,11 @@ case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) val (tableOptions, writeOptions) = splitTableAndWriteOptions(options) val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) - val writeOpts = new CaseInsensitiveStringMap(writeOptions.asJava) + val operation = + if (orCreate) Snapshot.Operation.CREATE_OR_REPLACE_TABLE_AS_SELECT + else Snapshot.Operation.REPLACE_TABLE_AS_SELECT + val writeOpts = new CaseInsensitiveStringMap( + (writeOptions + (PaimonWriteOptions.OPERATION_OPTION -> operation.name())).asJava) val pinnedQuery = pinSnapshotInQuery(catalog, ident, analyzedQuery.get) if (canAtomicReplace(catalog, ident, qualifiedSpec, parts)) { diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 4e5080c0d59c..c14ee04648a9 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -19,6 +19,7 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction +import org.apache.paimon.Snapshot import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile import org.apache.paimon.index.GlobalIndexMeta @@ -236,7 +237,7 @@ case class MergeIntoPaimonDataEvolutionTable( if (plan.snapshotId() != null) { writer.rowIdCheckConflict(plan.snapshotId()) } - writer.commit(updateCommit ++ insertCommit) + writer.commit(updateCommit ++ insertCommit, Snapshot.Operation.MERGE) } finally { if (persistSourceDss.isDefined) { persistSourceDss.get.unpersist(blocking = false) diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index 89689e108c45..c24d4d6675b9 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.commands +import org.apache.paimon.Snapshot import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.analysis.{PaimonRelation, PaimonUpdateAction} import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns} @@ -78,7 
+79,7 @@ case class MergeIntoPaimonTable( } else { performMergeForNonPkTable(sparkSession) } - writer.commit(commitMessages) + writer.commit(commitMessages, Snapshot.Operation.MERGE) Seq.empty[Row] } diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala index c78f78392405..6a5a141f082f 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.write +import org.apache.paimon.Snapshot import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan import org.apache.paimon.table.FileStoreTable @@ -36,8 +37,15 @@ class PaimonBatchWrite( writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]) - extends PaimonBatchWriteBase(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation] = None) + extends PaimonBatchWriteBase( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan, + operationType) with BatchWrite with Serializable { @@ -57,6 +65,13 @@ object PaimonBatchWrite { writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite = - new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation] = None): PaimonBatchWrite = + new PaimonBatchWrite( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan, + operationType) } diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index 0f7ea24e66af..5be40bbea622 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.paimon.shims +import org.apache.paimon.Snapshot import org.apache.paimon.data.variant.{GenericVariant, Variant} import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser @@ -209,8 +210,15 @@ class Spark4Shim extends SparkShim { writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite = - new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation]): BatchWrite = + new PaimonBatchWrite( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan, + operationType) override def createFormatTableBatchWrite( table: FormatTable, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala index d052d3547185..cd29efa1527b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.commands +import org.apache.paimon.Snapshot import 
org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL import org.apache.paimon.table.FileStoreTable @@ -46,7 +47,7 @@ case class DeleteFromPaimonTableCommand( } else { performNonPrimaryKeyDelete(sparkSession) } - writer.commit(commitMessages) + writer.commit(commitMessages, Snapshot.Operation.DELETE) Seq.empty[Row] } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 321eb6f1478f..7464176c8763 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -19,6 +19,7 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction +import org.apache.paimon.Snapshot import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile import org.apache.paimon.index.GlobalIndexMeta @@ -235,7 +236,7 @@ case class MergeIntoPaimonDataEvolutionTable( if (plan.snapshotId() != null) { writer.rowIdCheckConflict(plan.snapshotId()) } - writer.commit(updateCommit ++ insertCommit) + writer.commit(updateCommit ++ insertCommit, Snapshot.Operation.MERGE) } finally { if (persistSourceDss.isDefined) { persistSourceDss.get.unpersist(blocking = false) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index 89689e108c45..c24d4d6675b9 100644 --- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.commands +import org.apache.paimon.Snapshot import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.analysis.{PaimonRelation, PaimonUpdateAction} import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns} @@ -78,7 +79,7 @@ case class MergeIntoPaimonTable( } else { performMergeForNonPkTable(sparkSession) } - writer.commit(commitMessages) + writer.commit(commitMessages, Snapshot.Operation.MERGE) Seq.empty[Row] } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 8e310dc9e092..92df5a7f06bc 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -414,6 +414,10 @@ case class PaimonSparkWriter( } def commit(commitMessages: Seq[CommitMessage]): Unit = { + commit(commitMessages, null) + } + + def commit(commitMessages: Seq[CommitMessage], operation: Snapshot.Operation): Unit = { val finalWriteBuilder = if (postponeBatchWriteFixedBucket) { writeBuilder .asInstanceOf[BatchWriteBuilderImpl] @@ -424,6 +428,9 @@ case class PaimonSparkWriter( writeBuilder } val tableCommit = finalWriteBuilder.newCommit() + if (operation != null) { + tableCommit.withOperation(operation) + } try { tableCommit.commit(commitMessages.toList.asJava) } catch { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala index 1feb0f1f7618..1f1622fe2b77 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.commands +import org.apache.paimon.Snapshot import org.apache.paimon.spark.schema.PaimonMetadataColumn.{ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN} import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL import org.apache.paimon.table.FileStoreTable @@ -52,7 +53,7 @@ case class UpdatePaimonTableCommand( } else { performUpdateForNonPkTable(sparkSession) } - writer.commit(commitMessages) + writer.commit(commitMessages, Snapshot.Operation.UPDATE) Seq.empty[Row] } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala index fcf061ec6733..f400409fdae1 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala @@ -19,10 +19,12 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE +import org.apache.paimon.Snapshot import org.apache.paimon.options.Options import org.apache.paimon.spark._ import org.apache.paimon.spark.catalyst.analysis.ReplacePaimonFunctions import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper +import org.apache.paimon.spark.write.PaimonWriteOptions import org.apache.paimon.table.FileStoreTable import org.apache.spark.internal.Logging @@ -60,8 +62,17 @@ case class WriteIntoPaimonTable( if 
(overwritePartition != null) { writer.writeBuilder.withOverwrite(overwritePartition.asJava) } + val operation = Option(options.get(PaimonWriteOptions.OPERATION_OPTION)) + .map(Snapshot.Operation.valueOf) + .getOrElse { + if (overwritePartition != null || dynamicPartitionOverwriteMode) { + Snapshot.Operation.OVERWRITE + } else { + Snapshot.Operation.WRITE + } + } val commitMessages = writer.write(replacedData) - writer.commit(commitMessages) + writer.commit(commitMessages, operation) Seq.empty } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala index 24c3c19761c2..5db630bec7ab 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.rowops +import org.apache.paimon.Snapshot import org.apache.paimon.options.Options import org.apache.paimon.spark.PaimonBaseScanBuilder import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH_COLUMN, ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN} @@ -52,6 +53,7 @@ class PaimonSparkCopyOnWriteOperation(table: FileStoreTable, info: RowLevelOpera override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { val options = Options.fromMap(info.options) val builder = new PaimonV2WriteBuilder(table, info.schema(), options) + builder.withOperationType(Snapshot.Operation.valueOf(command().toString)) assert(copyOnWriteScan.isDefined) builder.overwriteFiles(copyOnWriteScan.get) } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala index 42d2ebcd8590..e925cad76b8e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.write +import org.apache.paimon.Snapshot import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement} import org.apache.paimon.spark.SparkTypeUtils import org.apache.paimon.spark.catalyst.Compatibility @@ -55,7 +56,8 @@ abstract class PaimonBatchWriteBase( val writeSchema: StructType, val dataSchema: StructType, val overwritePartitions: Option[Map[String, String]], - val copyOnWriteScan: Option[PaimonCopyOnWriteScan]) + val copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation] = None) extends WriteHelper with Serializable { @@ -114,6 +116,9 @@ abstract class PaimonBatchWriteBase( logInfo(s"Committing to table ${table.name()}") val batchTableCommit = batchWriteBuilder.newCommit() batchTableCommit.withMetricRegistry(metricRegistry) + val operation = operationType.getOrElse( + if (overwritePartitions.isDefined) Snapshot.Operation.OVERWRITE else Snapshot.Operation.WRITE) + batchTableCommit.withOperation(operation) val addCommitMessage = WriteTaskResult.merge(messages) val deletedCommitMessage = copyOnWriteScan match { case Some(scan) => buildDeletedCommitMessage(scan.scannedFiles) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala index 2ae1dd53a367..6dd02172a6a2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala @@ -19,6 +19,7 @@ package org.apache.paimon.spark.write import org.apache.paimon.CoreOptions.ChangelogProducer +import org.apache.paimon.Snapshot import org.apache.paimon.options.Options import org.apache.paimon.spark._ import org.apache.paimon.spark.commands.SchemaEvolutionHelper @@ -41,7 +42,8 @@ class PaimonV2Write( overwritePartitions: Option[Map[String, String]], copyOnWriteScan: Option[PaimonCopyOnWriteScan], dataSchema: StructType, - options: Options + options: Options, + operationType: Option[Snapshot.Operation] = None ) extends Write with RequiresDistributionAndOrdering with SchemaEvolutionHelper @@ -69,7 +71,8 @@ class PaimonV2Write( writeSchema, dataSchema, overwritePartitions, - copyOnWriteScan) + copyOnWriteScan, + operationType) } override def supportedCustomMetrics(): Array[CustomMetric] = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala index 91f4f861ce48..f57c4e99aa22 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala @@ -19,6 +19,7 @@ package org.apache.paimon.spark.write import org.apache.paimon.CoreOptions +import org.apache.paimon.Snapshot import org.apache.paimon.options.Options import org.apache.paimon.table.FileStoreTable import org.apache.paimon.types.RowType @@ -30,13 +31,27 @@ import scala.collection.JavaConverters._ class PaimonV2WriteBuilder(table: FileStoreTable, dataSchema: StructType, options: Options) extends BaseV2WriteBuilder(table) { + private var _operationType: Option[Snapshot.Operation] = + Option(options.get(PaimonWriteOptions.OPERATION_OPTION)).map(Snapshot.Operation.valueOf) + 
+ def withOperationType(operationType: Snapshot.Operation): PaimonV2WriteBuilder = { + _operationType = Option(operationType) + this + } + override def build: PaimonV2Write = { val finalTable = overwriteDynamic match { case Some(o) => table.copy(Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key -> o.toString).asJava) case _ => table } - new PaimonV2Write(finalTable, overwritePartitions, copyOnWriteScan, dataSchema, options) + new PaimonV2Write( + finalTable, + overwritePartitions, + copyOnWriteScan, + dataSchema, + options, + _operationType) } override def partitionRowType(): RowType = table.schema().logicalPartitionType() diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteOptions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteOptions.scala new file mode 100644 index 000000000000..16633cad3755 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteOptions.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.write + +/** Internal Spark write-option keys. 
*/ +object PaimonWriteOptions { + + /** Carries the snapshot operation type through Spark's CTAS/RTAS write-option map. */ + val OPERATION_OPTION: String = "__paimon.operation" +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index fbfb6e4ecab3..1a8e3ffe4b75 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.execution.shim +import org.apache.paimon.Snapshot import org.apache.paimon.spark.SparkCatalog import org.apache.paimon.spark.catalog.FormatTableCatalog +import org.apache.paimon.spark.write.PaimonWriteOptions import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier @@ -66,8 +68,10 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) parts, query, qualifiedSpec, - writeOptions, - ifNotExists) :: Nil + writeOptions + + (PaimonWriteOptions.OPERATION_OPTION -> Snapshot.Operation.CREATE_TABLE_AS_SELECT.name()), + ifNotExists + ) :: Nil case _ => Nil } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala index 4c21a448e9bd..4bbcc0af25e2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala @@ -18,7 +18,9 @@ 
package org.apache.spark.sql.execution.shim +import org.apache.paimon.Snapshot import org.apache.paimon.spark.catalog.SparkBaseCatalog +import org.apache.paimon.spark.write.PaimonWriteOptions import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier @@ -53,6 +55,11 @@ case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) val (tableOptions, writeOptions) = splitTableAndWriteOptions(options) val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions) + val operation = + if (orCreate) Snapshot.Operation.CREATE_OR_REPLACE_TABLE_AS_SELECT + else Snapshot.Operation.REPLACE_TABLE_AS_SELECT + val finalWriteOptions = + writeOptions + (PaimonWriteOptions.OPERATION_OPTION -> operation.name()) // Pin snapshot in query to prevent self-referencing RTAS from reading truncated data val pinnedQuery = pinSnapshotInQuery(catalog, ident, query) if (canAtomicReplace(catalog, ident, qualifiedSpec, parts)) { @@ -62,7 +69,7 @@ case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) parts, pinnedQuery, qualifiedSpec, - writeOptions, + finalWriteOptions, orCreate = orCreate) :: Nil } else { SparkShimLoader.shim.createReplaceTableAsSelectExec( @@ -71,7 +78,7 @@ case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) parts, pinnedQuery, qualifiedSpec, - writeOptions, + finalWriteOptions, orCreate = orCreate) :: Nil } case _ => Nil diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala index 7883903b30df..739038fff18d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.paimon.shims +import org.apache.paimon.Snapshot import 
org.apache.paimon.data.variant.Variant import org.apache.paimon.spark.data.{SparkArrayData, SparkInternalRow} import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan @@ -127,7 +128,8 @@ trait SparkShim { writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation]): BatchWrite /** Same `BatchWrite` mixin problem as [[createPaimonBatchWrite]], but for `FormatTable` writes. */ def createFormatTableBatchWrite( diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SnapshotOperationTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SnapshotOperationTest.scala new file mode 100644 index 000000000000..fcc4d06650b8 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SnapshotOperationTest.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.Snapshot +import org.apache.paimon.spark.PaimonSparkTestBase + +/** Verifies the logical operation type recorded in the committed snapshot. */ +class SnapshotOperationTest extends PaimonSparkTestBase { + + private def latestOperation(tableName: String): Snapshot.Operation = { + val snapshot = loadTable(tableName).snapshotManager().latestSnapshot() + assert(snapshot != null, s"table $tableName has no snapshot") + snapshot.operation() + } + + test("Snapshot operation: INSERT / OVERWRITE / UPDATE / DELETE / MERGE") { + for (useV2Write <- Seq("true", "false")) { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) { + withTable("t", "s") { + sql("CREATE TABLE t (id INT, name STRING)") + sql("CREATE TABLE s (id INT, name STRING)") + sql("INSERT INTO s VALUES (1, 'merged'), (9, 'new')") + sql("INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c')") + assert(latestOperation("t") == Snapshot.Operation.WRITE) + + sql("INSERT OVERWRITE t VALUES (1, 'a2'), (2, 'b2'), (3, 'c2')") + assert(latestOperation("t") == Snapshot.Operation.OVERWRITE) + + sql("UPDATE t SET name = 'updated' WHERE id = 1") + assert(latestOperation("t") == Snapshot.Operation.UPDATE) + + sql("DELETE FROM t WHERE id = 2") + assert(latestOperation("t") == Snapshot.Operation.DELETE) + + sql(""" + |MERGE INTO t USING s ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.name = s.name + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + assert(latestOperation("t") == Snapshot.Operation.MERGE) + } + } + } + } + + test("Snapshot operation: CTAS / RTAS") { + for (useV2Write <- Seq("true", "false")) { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) { + withTable("src", "ctas", "rtas") { + sql("CREATE TABLE src (id INT, name STRING)") + sql("INSERT INTO src VALUES (1, 'a'), (2, 'b')") + + sql("CREATE TABLE ctas AS SELECT * FROM src") + assert(latestOperation("ctas") == Snapshot.Operation.CREATE_TABLE_AS_SELECT) + + 
sql("CREATE OR REPLACE TABLE rtas AS SELECT * FROM src") + assert(latestOperation("rtas") == Snapshot.Operation.CREATE_OR_REPLACE_TABLE_AS_SELECT) + + // rtas twice + sql("CREATE OR REPLACE TABLE rtas AS SELECT * FROM src") + assert(latestOperation("rtas") == Snapshot.Operation.CREATE_OR_REPLACE_TABLE_AS_SELECT) + + // pure REPLACE TABLE (no CREATE) + sql("REPLACE TABLE rtas AS SELECT * FROM src") + assert(latestOperation("rtas") == Snapshot.Operation.REPLACE_TABLE_AS_SELECT) + } + } + } + } + + test("Snapshot operation: TRUNCATE") { + withTable("t") { + sql("CREATE TABLE t (id INT, name STRING, dt STRING) PARTITIONED BY (dt)") + sql("INSERT INTO t VALUES (1, 'a', '2024-01-01'), (2, 'b', '2024-01-02')") + + // Full-table DELETE (no WHERE) is optimized to truncateTable + sql("DELETE FROM t") + assert(latestOperation("t") == Snapshot.Operation.TRUNCATE) + + sql("INSERT INTO t VALUES (3, 'c', '2024-01-01'), (4, 'd', '2024-01-02')") + + // Partition DELETE is optimized to truncatePartitions + sql("DELETE FROM t WHERE dt = '2024-01-01'") + assert(latestOperation("t") == Snapshot.Operation.TRUNCATE) + + sql("INSERT INTO t VALUES (5, 'e', '2024-01-01')") + + // TRUNCATE TABLE + sql("TRUNCATE TABLE t") + assert(latestOperation("t") == Snapshot.Operation.TRUNCATE) + } + } +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala index 89eb15054c5b..57b89270d8d8 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.write +import org.apache.paimon.Snapshot import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan import org.apache.paimon.table.FileStoreTable @@ -33,8 +34,15 @@ class 
PaimonBatchWrite( writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]) - extends PaimonBatchWriteBase(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation] = None) + extends PaimonBatchWriteBase( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan, + operationType) with BatchWrite with Serializable { @@ -54,6 +62,13 @@ object PaimonBatchWrite { writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite = - new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation] = None): PaimonBatchWrite = + new PaimonBatchWrite( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan, + operationType) } diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala index cbefee5d0487..9559efaa77c0 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.paimon.shims +import org.apache.paimon.Snapshot import org.apache.paimon.data.variant.Variant import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser @@ -195,8 +196,15 @@ class Spark3Shim extends SparkShim { writeSchema: StructType, dataSchema: StructType, overwritePartitions: 
Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite = - new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation]): BatchWrite = + new PaimonBatchWrite( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan, + operationType) override def createFormatTableBatchWrite( table: FormatTable, diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala index 38683e79c017..374062c6b4b0 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.write +import org.apache.paimon.Snapshot import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan import org.apache.paimon.table.FileStoreTable @@ -35,8 +36,15 @@ class PaimonBatchWrite( writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]) - extends PaimonBatchWriteBase(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation] = None) + extends PaimonBatchWriteBase( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan, + operationType) with BatchWrite with Serializable { @@ -56,6 +64,13 @@ object PaimonBatchWrite { writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite = - new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, 
copyOnWriteScan) + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation] = None): PaimonBatchWrite = + new PaimonBatchWrite( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan, + operationType) } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index 9c4a4daa6a55..2a76d92fda78 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.paimon.shims +import org.apache.paimon.Snapshot import org.apache.paimon.data.variant.{GenericVariant, Variant} import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser @@ -193,8 +194,15 @@ class Spark4Shim extends SparkShim { writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite = - new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + copyOnWriteScan: Option[PaimonCopyOnWriteScan], + operationType: Option[Snapshot.Operation]): BatchWrite = + new PaimonBatchWrite( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan, + operationType) override def createFormatTableBatchWrite( table: FormatTable,