Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/docs/concepts/spec/snapshot.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,6 @@ Snapshot File is JSON, it includes:
15. changelogRecordCount: record count of all changelog produced in this snapshot.
16. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark.
17. statistics: stats file name for statistics of this table.
18. properties: additional key-value properties of this snapshot.
19. nextRowId: next row id for row tracking.
20. operation: logical operation type, e.g. WRITE, DELETE, UPDATE, MERGE. Null if not set.
131 changes: 127 additions & 4 deletions paimon-api/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class Snapshot implements Serializable {
protected static final String FIELD_STATISTICS = "statistics";
protected static final String FIELD_PROPERTIES = "properties";
protected static final String FIELD_NEXT_ROW_ID = "nextRowId";
protected static final String FIELD_OPERATION = "operation";

// version of snapshot
@JsonProperty(FIELD_VERSION)
Expand Down Expand Up @@ -181,6 +182,11 @@ public class Snapshot implements Serializable {
@Nullable
protected final Long nextRowId;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_OPERATION)
@Nullable
protected final Operation operation;

public Snapshot(
long id,
long schemaId,
Expand All @@ -202,6 +208,52 @@ public Snapshot(
@Nullable String statistics,
@Nullable Map<String, String> properties,
@Nullable Long nextRowId) {
this(
id,
schemaId,
baseManifestList,
baseManifestListSize,
deltaManifestList,
deltaManifestListSize,
changelogManifestList,
changelogManifestListSize,
indexManifest,
commitUser,
commitIdentifier,
commitKind,
timeMillis,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
watermark,
statistics,
properties,
nextRowId,
null);
}

public Snapshot(
long id,
long schemaId,
String baseManifestList,
@Nullable Long baseManifestListSize,
String deltaManifestList,
@Nullable Long deltaManifestListSize,
@Nullable String changelogManifestList,
@Nullable Long changelogManifestListSize,
@Nullable String indexManifest,
String commitUser,
long commitIdentifier,
CommitKind commitKind,
long timeMillis,
long totalRecordCount,
long deltaRecordCount,
@Nullable Long changelogRecordCount,
@Nullable Long watermark,
@Nullable String statistics,
@Nullable Map<String, String> properties,
@Nullable Long nextRowId,
@Nullable Operation operation) {
Comment thread
Zouxxyy marked this conversation as resolved.
this(
CURRENT_VERSION,
id,
Expand All @@ -223,7 +275,55 @@ public Snapshot(
watermark,
statistics,
properties,
nextRowId);
nextRowId,
operation);
}

public Snapshot(
int version,
long id,
long schemaId,
String baseManifestList,
@Nullable Long baseManifestListSize,
String deltaManifestList,
@Nullable Long deltaManifestListSize,
@Nullable String changelogManifestList,
@Nullable Long changelogManifestListSize,
@Nullable String indexManifest,
String commitUser,
long commitIdentifier,
CommitKind commitKind,
long timeMillis,
long totalRecordCount,
long deltaRecordCount,
@Nullable Long changelogRecordCount,
@Nullable Long watermark,
@Nullable String statistics,
@Nullable Map<String, String> properties,
@Nullable Long nextRowId) {
this(
version,
id,
schemaId,
baseManifestList,
baseManifestListSize,
deltaManifestList,
deltaManifestListSize,
changelogManifestList,
changelogManifestListSize,
indexManifest,
commitUser,
commitIdentifier,
commitKind,
timeMillis,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
watermark,
statistics,
properties,
nextRowId,
null);
}

@JsonCreator
Expand All @@ -249,7 +349,8 @@ public Snapshot(
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> properties,
@JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) {
@JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId,
@JsonProperty(FIELD_OPERATION) @Nullable Operation operation) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for restoring the no-version constructor overload. The versioned public constructor is still source/binary incompatible because the old Snapshot(int version, ..., Long nextRowId) signature was replaced by the new one that requires operation. Since Snapshot is @Public, could we also keep that old versioned overload and delegate to this constructor with operation = null?

this.version = version;
this.id = id;
this.schemaId = schemaId;
Expand All @@ -271,6 +372,7 @@ public Snapshot(
this.statistics = statistics;
this.properties = properties;
this.nextRowId = nextRowId;
this.operation = operation;
}

@JsonGetter(FIELD_VERSION)
Expand Down Expand Up @@ -388,6 +490,12 @@ public Long nextRowId() {
return nextRowId;
}

@JsonGetter(FIELD_OPERATION)
@Nullable
public Operation operation() {
return operation;
}

public String toJson() {
return JsonSerdeUtil.toJson(this);
}
Expand Down Expand Up @@ -415,7 +523,8 @@ public int hashCode() {
watermark,
statistics,
properties,
nextRowId);
nextRowId,
operation);
}

@Override
Expand Down Expand Up @@ -447,7 +556,8 @@ public boolean equals(Object o) {
&& Objects.equals(watermark, that.watermark)
&& Objects.equals(statistics, that.statistics)
&& Objects.equals(properties, that.properties)
&& Objects.equals(nextRowId, that.nextRowId);
&& Objects.equals(nextRowId, that.nextRowId)
&& operation == that.operation;
}

/** Type of changes in this snapshot. */
Expand All @@ -469,6 +579,19 @@ public enum CommitKind {
ANALYZE
}

/** Logical operation type that produced this snapshot. */
public enum Operation {
WRITE,
OVERWRITE,
DELETE,
TRUNCATE,
UPDATE,
MERGE,
CREATE_TABLE_AS_SELECT,
REPLACE_TABLE_AS_SELECT,
CREATE_OR_REPLACE_TABLE_AS_SELECT
}

// =================== Utils for reading =========================

public static Snapshot fromJson(String json) {
Expand Down
9 changes: 6 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public Changelog(Snapshot snapshot) {
snapshot.watermark(),
snapshot.statistics(),
snapshot.properties,
snapshot.nextRowId);
snapshot.nextRowId,
snapshot.operation);
}

@JsonCreator
Expand All @@ -89,7 +90,8 @@ public Changelog(
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@JsonProperty(FIELD_PROPERTIES) Map<String, String> properties,
@JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) {
@JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId,
@JsonProperty(FIELD_OPERATION) @Nullable Operation operation) {
super(
version,
id,
Expand All @@ -111,7 +113,8 @@ public Changelog(
watermark,
statistics,
properties,
nextRowId);
nextRowId,
operation);
}

public static Changelog fromJson(String json) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public interface FileStoreCommit extends AutoCloseable {

FileStoreCommit rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot);

FileStoreCommit withOperation(Snapshot.Operation operation);

/** Find out which committables need to be retried when recovering from the failure. */
List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private CommitMetrics commitMetrics;
private boolean appendCommitCheckConflict = false;
private long lastCommittedSnapshotId = -1L;
@Nullable private Snapshot.Operation operation;

public FileStoreCommitImpl(
SnapshotCommit snapshotCommit,
Expand Down Expand Up @@ -249,6 +250,12 @@ public FileStoreCommit rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot)
return this;
}

@Override
public FileStoreCommit withOperation(Snapshot.Operation operation) {
this.operation = operation;
return this;
}

@Override
public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables) {
// nothing to filter, fast exit
Expand Down Expand Up @@ -1085,7 +1092,8 @@ CommitResult tryCommitOnce(
statsFileName,
// if empty properties, just set to null
properties.isEmpty() ? null : properties,
nextRowIdStart);
nextRowIdStart,
operation);
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
commitCleaner.cleanUpReuseTmpManifests(
Expand Down Expand Up @@ -1190,7 +1198,8 @@ public boolean replaceManifestList(
latest.statistics(),
// if empty properties, just set to null
latest.properties(),
nextRowId);
nextRowId,
null);

return commitSnapshotImpl(newSnapshot, emptyList());
}
Expand Down Expand Up @@ -1269,7 +1278,8 @@ private boolean compactManifestOnce() {
latestSnapshot.watermark(),
latestSnapshot.statistics(),
latestSnapshot.properties(),
latestSnapshot.nextRowId());
latestSnapshot.nextRowId(),
null);

return commitSnapshotImpl(newSnapshot, emptyList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.table.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.stats.Statistics;
Expand Down Expand Up @@ -71,4 +72,9 @@ public interface BatchTableCommit extends TableCommit {

/** Compact the manifest entries. Generates a snapshot with {@link CommitKind#COMPACT}. */
void compactManifests();

/** Set the logical operation type (e.g. WRITE, DELETE, MERGE) recorded in the snapshot. */
default BatchTableCommit withOperation(Snapshot.Operation operation) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.table.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -170,6 +171,12 @@ public TableCommitImpl rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot)
return this;
}

@Override
public TableCommitImpl withOperation(Snapshot.Operation operation) {
commit.withOperation(operation);
return this;
}

@Override
public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
commit.withMetrics(new CommitMetrics(registry, tableName));
Expand All @@ -185,11 +192,13 @@ public void commit(List<CommitMessage> commitMessages) {
@Override
public void truncateTable() {
checkCommitted();
commit.withOperation(Snapshot.Operation.TRUNCATE);
commit.truncateTable(COMMIT_IDENTIFIER);
}

@Override
public void truncatePartitions(List<Map<String, String>> partitionSpecs) {
commit.withOperation(Snapshot.Operation.TRUNCATE);
commit.dropPartitions(partitionSpecs, COMMIT_IDENTIFIER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ public class SnapshotsTable implements ReadonlyTable {
new DataField(10, "delta_record_count", new BigIntType(true)),
new DataField(11, "changelog_record_count", new BigIntType(true)),
new DataField(12, "watermark", new BigIntType(true)),
new DataField(13, "next_row_id", new BigIntType(true))));
new DataField(13, "next_row_id", new BigIntType(true)),
new DataField(
14, "operation", SerializationUtils.newStringType(true))));

private final FileIO fileIO;
private final Path location;
Expand Down Expand Up @@ -339,7 +341,10 @@ private InternalRow toRow(Snapshot snapshot) {
snapshot.deltaRecordCount(),
snapshot.changelogRecordCount(),
snapshot.watermark(),
snapshot.nextRowId());
snapshot.nextRowId(),
snapshot.operation() == null
? null
: BinaryString.fromString(snapshot.operation().toString()));
}
}
}
Loading
Loading