Skip to content

Commit 69ca97b

Browse files
authored
Fix DELETED manifest entry snapshot_id in OverwriteFiles (#3237)
# Rationale for this change When _OverwriteFiles._deleted_entries() creates DELETED manifest entries, it now sets snapshot_id to the current (deleting) snapshot's ID instead of retaining the original INSERT snapshot's ID. Closes #3236 According to the [Iceberg spec (Manifest Entry Fields)](https://iceberg.apache.org/spec/#manifest-entry-fields), `snapshot_id` for a DELETED entry (status=2) should be the snapshot ID in which the file was deleted. However, `_OverwriteFiles._deleted_entries()` was copying the original entry's `snapshot_id` (from the INSERT snapshot) into the new DELETED entry. This caused downstream consumers that filter manifest entries by `snapshot_id` (e.g. Iceberg Java's `IncrementalChangelogScan`) to silently miss DELETED files, breaking CDC pipelines. ## Are these changes tested? Added `test_manifest_entry_snapshot_id_after_partial_deletes` in `tests/integration/test_deletes.py`. ## Are there any user-facing changes? N/A --------- Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent 2558dcd commit 69ca97b

2 files changed

Lines changed: 50 additions & 1 deletion

File tree

pyiceberg/table/update/snapshot.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
652652
return [
653653
ManifestEntry.from_args(
654654
status=ManifestEntryStatus.DELETED,
655-
snapshot_id=entry.snapshot_id,
655+
snapshot_id=self._snapshot_id,
656656
sequence_number=entry.sequence_number,
657657
file_sequence_number=entry.file_sequence_number,
658658
data_file=entry.data_file,

tests/integration/test_deletes.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -975,3 +975,52 @@ def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapsho
975975
assert after_delete_snapshot is not None
976976

977977
assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id)
978+
979+
980+
@pytest.mark.integration
981+
def test_manifest_entry_snapshot_id_after_partial_deletes(session_catalog: RestCatalog) -> None:
982+
identifier = "default.test_manifest_entry_snapshot_id_after_partial_deletes"
983+
try:
984+
session_catalog.drop_table(identifier)
985+
except NoSuchTableError:
986+
pass
987+
988+
schema = pa.schema(
989+
[
990+
("id", pa.int32()),
991+
("name", pa.string()),
992+
]
993+
)
994+
995+
table = session_catalog.create_table(identifier, schema)
996+
data = pa.Table.from_pylist(
997+
[
998+
{"id": 1, "name": "keep"},
999+
{"id": 2, "name": "keep"},
1000+
{"id": 3, "name": "delete_me"},
1001+
{"id": 4, "name": "delete_me"},
1002+
],
1003+
schema=schema,
1004+
)
1005+
table.append(data)
1006+
1007+
# Partial delete: only some rows match, triggering CoW overwrite via _OverwriteFiles
1008+
table.delete(EqualTo("name", "delete_me"))
1009+
after_delete_snapshot = table.refresh().current_snapshot()
1010+
assert after_delete_snapshot is not None
1011+
1012+
manifests = after_delete_snapshot.manifests(table.io)
1013+
deleted_entries = [
1014+
entry
1015+
for manifest in manifests
1016+
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False)
1017+
if entry.status == ManifestEntryStatus.DELETED
1018+
]
1019+
1020+
assert len(deleted_entries) > 0, "Expected at least one DELETED manifest entry from the CoW overwrite"
1021+
1022+
for entry in deleted_entries:
1023+
assert entry.snapshot_id == after_delete_snapshot.snapshot_id, (
1024+
f"DELETED entry snapshot_id should be {after_delete_snapshot.snapshot_id} "
1025+
f"(the deleting snapshot), but was {entry.snapshot_id}"
1026+
)

0 commit comments

Comments
 (0)