-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[spark] Record the write operation type in snapshot properties #8236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
aa9488d
87c8a42
3673c26
1c8f92d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,6 +69,7 @@ public class Snapshot implements Serializable { | |
| protected static final String FIELD_STATISTICS = "statistics"; | ||
| protected static final String FIELD_PROPERTIES = "properties"; | ||
| protected static final String FIELD_NEXT_ROW_ID = "nextRowId"; | ||
| protected static final String FIELD_OPERATION = "operation"; | ||
|
|
||
| // version of snapshot | ||
| @JsonProperty(FIELD_VERSION) | ||
|
|
@@ -181,6 +182,11 @@ public class Snapshot implements Serializable { | |
| @Nullable | ||
| protected final Long nextRowId; | ||
|
|
||
| @JsonInclude(JsonInclude.Include.NON_NULL) | ||
| @JsonProperty(FIELD_OPERATION) | ||
| @Nullable | ||
| protected final Operation operation; | ||
|
|
||
| public Snapshot( | ||
| long id, | ||
| long schemaId, | ||
|
|
@@ -202,6 +208,52 @@ public Snapshot( | |
| @Nullable String statistics, | ||
| @Nullable Map<String, String> properties, | ||
| @Nullable Long nextRowId) { | ||
| this( | ||
| id, | ||
| schemaId, | ||
| baseManifestList, | ||
| baseManifestListSize, | ||
| deltaManifestList, | ||
| deltaManifestListSize, | ||
| changelogManifestList, | ||
| changelogManifestListSize, | ||
| indexManifest, | ||
| commitUser, | ||
| commitIdentifier, | ||
| commitKind, | ||
| timeMillis, | ||
| totalRecordCount, | ||
| deltaRecordCount, | ||
| changelogRecordCount, | ||
| watermark, | ||
| statistics, | ||
| properties, | ||
| nextRowId, | ||
| null); | ||
| } | ||
|
|
||
| public Snapshot( | ||
| long id, | ||
| long schemaId, | ||
| String baseManifestList, | ||
| @Nullable Long baseManifestListSize, | ||
| String deltaManifestList, | ||
| @Nullable Long deltaManifestListSize, | ||
| @Nullable String changelogManifestList, | ||
| @Nullable Long changelogManifestListSize, | ||
| @Nullable String indexManifest, | ||
| String commitUser, | ||
| long commitIdentifier, | ||
| CommitKind commitKind, | ||
| long timeMillis, | ||
| long totalRecordCount, | ||
| long deltaRecordCount, | ||
| @Nullable Long changelogRecordCount, | ||
| @Nullable Long watermark, | ||
| @Nullable String statistics, | ||
| @Nullable Map<String, String> properties, | ||
| @Nullable Long nextRowId, | ||
| @Nullable Operation operation) { | ||
| this( | ||
| CURRENT_VERSION, | ||
| id, | ||
|
|
@@ -223,7 +275,55 @@ public Snapshot( | |
| watermark, | ||
| statistics, | ||
| properties, | ||
| nextRowId); | ||
| nextRowId, | ||
| operation); | ||
| } | ||
|
|
||
| public Snapshot( | ||
| int version, | ||
| long id, | ||
| long schemaId, | ||
| String baseManifestList, | ||
| @Nullable Long baseManifestListSize, | ||
| String deltaManifestList, | ||
| @Nullable Long deltaManifestListSize, | ||
| @Nullable String changelogManifestList, | ||
| @Nullable Long changelogManifestListSize, | ||
| @Nullable String indexManifest, | ||
| String commitUser, | ||
| long commitIdentifier, | ||
| CommitKind commitKind, | ||
| long timeMillis, | ||
| long totalRecordCount, | ||
| long deltaRecordCount, | ||
| @Nullable Long changelogRecordCount, | ||
| @Nullable Long watermark, | ||
| @Nullable String statistics, | ||
| @Nullable Map<String, String> properties, | ||
| @Nullable Long nextRowId) { | ||
| this( | ||
| version, | ||
| id, | ||
| schemaId, | ||
| baseManifestList, | ||
| baseManifestListSize, | ||
| deltaManifestList, | ||
| deltaManifestListSize, | ||
| changelogManifestList, | ||
| changelogManifestListSize, | ||
| indexManifest, | ||
| commitUser, | ||
| commitIdentifier, | ||
| commitKind, | ||
| timeMillis, | ||
| totalRecordCount, | ||
| deltaRecordCount, | ||
| changelogRecordCount, | ||
| watermark, | ||
| statistics, | ||
| properties, | ||
| nextRowId, | ||
| null); | ||
| } | ||
|
|
||
| @JsonCreator | ||
|
|
@@ -249,7 +349,8 @@ public Snapshot( | |
| @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, | ||
| @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, | ||
| @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> properties, | ||
| @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) { | ||
| @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId, | ||
| @JsonProperty(FIELD_OPERATION) @Nullable Operation operation) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for restoring the no-version constructor overload. The versioned public constructor is still source/binary incompatible because the old |
||
| this.version = version; | ||
| this.id = id; | ||
| this.schemaId = schemaId; | ||
|
|
@@ -271,6 +372,7 @@ public Snapshot( | |
| this.statistics = statistics; | ||
| this.properties = properties; | ||
| this.nextRowId = nextRowId; | ||
| this.operation = operation; | ||
| } | ||
|
|
||
| @JsonGetter(FIELD_VERSION) | ||
|
|
@@ -388,6 +490,12 @@ public Long nextRowId() { | |
| return nextRowId; | ||
| } | ||
|
|
||
| @JsonGetter(FIELD_OPERATION) | ||
| @Nullable | ||
| public Operation operation() { | ||
| return operation; | ||
| } | ||
|
|
||
| public String toJson() { | ||
| return JsonSerdeUtil.toJson(this); | ||
| } | ||
|
|
@@ -415,7 +523,8 @@ public int hashCode() { | |
| watermark, | ||
| statistics, | ||
| properties, | ||
| nextRowId); | ||
| nextRowId, | ||
| operation); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -447,7 +556,8 @@ public boolean equals(Object o) { | |
| && Objects.equals(watermark, that.watermark) | ||
| && Objects.equals(statistics, that.statistics) | ||
| && Objects.equals(properties, that.properties) | ||
| && Objects.equals(nextRowId, that.nextRowId); | ||
| && Objects.equals(nextRowId, that.nextRowId) | ||
| && operation == that.operation; | ||
| } | ||
|
|
||
| /** Type of changes in this snapshot. */ | ||
|
|
@@ -469,6 +579,19 @@ public enum CommitKind { | |
| ANALYZE | ||
| } | ||
|
|
||
| /** Logical operation type that produced this snapshot. */ | ||
| public enum Operation { | ||
| WRITE, | ||
| OVERWRITE, | ||
| DELETE, | ||
| TRUNCATE, | ||
| UPDATE, | ||
| MERGE, | ||
| CREATE_TABLE_AS_SELECT, | ||
| REPLACE_TABLE_AS_SELECT, | ||
| CREATE_OR_REPLACE_TABLE_AS_SELECT | ||
| } | ||
|
|
||
| // =================== Utils for reading ========================= | ||
|
|
||
| public static Snapshot fromJson(String json) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.