diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 68089beb54..5c1eb37fb8 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -75,7 +75,7 @@ update_table_metadata, ) from pyiceberg.table.update.schema import UpdateSchema -from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles +from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _AppendFiles from pyiceberg.table.update.sorting import UpdateSortOrder from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics @@ -384,7 +384,7 @@ def _build_partition_predicate( def _append_snapshot_producer( self, snapshot_properties: dict[str, str], branch: str | None = MAIN_BRANCH - ) -> _FastAppendFiles: + ) -> _AppendFiles[Any]: """Determine the append type based on table properties. Args: diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 37d120969a..5e9c20556c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -496,36 +496,7 @@ def files_affected(self) -> bool: return len(self._deleted_entries()) > 0 -class _FastAppendFiles(_SnapshotProducer["_FastAppendFiles"]): - def _existing_manifests(self) -> list[ManifestFile]: - """To determine if there are any existing manifest files. - - A fast append will add another ManifestFile to the ManifestList. - All the existing manifest files are considered existing. 
- """ - existing_manifests = [] - - if self._parent_snapshot_id is not None: - previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) - - if previous_snapshot is None: - raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}") - - for manifest in previous_snapshot.manifests(io=self._io): - if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id: - existing_manifests.append(manifest) - - return existing_manifests - - def _deleted_entries(self) -> list[ManifestEntry]: - """To determine if we need to record any deleted manifest entries. - - In case of an append, nothing is deleted. - """ - return [] - - -class _MergeAppendFiles(_FastAppendFiles): +class _MergingSnapshotProducer(_SnapshotProducer[U]): _target_size_bytes: int _min_count_to_merge: int _merge_enabled: bool @@ -561,8 +532,8 @@ def __init__( def _process_manifests(self, manifests: list[ManifestFile]) -> list[ManifestFile]: """To perform any post-processing on the manifests before writing them to the new snapshot. - In _MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge - if automatic merge is enabled. + We merge manifests based on the target size and the minimum count to merge if automatic + merge is enabled. """ unmerged_data_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DATA] unmerged_deletes_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DELETES] @@ -577,7 +548,44 @@ def _process_manifests(self, manifests: list[ManifestFile]) -> list[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests -class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]): +class _AppendFiles(_SnapshotProducer[U]): + def _existing_manifests(self) -> list[ManifestFile]: + """To determine if there are any existing manifest files. 
+ + An append will add another ManifestFile to the ManifestList. + All the existing manifest files are considered existing. + """ + existing_manifests = [] + + if self._parent_snapshot_id is not None: + previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + + if previous_snapshot is None: + raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}") + + for manifest in previous_snapshot.manifests(io=self._io): + if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id: + existing_manifests.append(manifest) + + return existing_manifests + + def _deleted_entries(self) -> list[ManifestEntry]: + """To determine if we need to record any deleted manifest entries. + + In case of an append, nothing is deleted. + """ + return [] + + +class _FastAppendFiles(_AppendFiles["_FastAppendFiles"]): + pass + + +class _MergeAppendFiles(_MergingSnapshotProducer["_MergeAppendFiles"], _AppendFiles["_MergeAppendFiles"]): + pass + + +class _OverwriteFiles(_MergingSnapshotProducer["_OverwriteFiles"]): """Overwrites data from the table. This will produce an OVERWRITE snapshot. Data and delete files were added and removed in a logical overwrite operation. 
@@ -742,7 +750,13 @@ def __init__( def _group_by_spec(self, manifests: list[ManifestFile]) -> dict[int, list[ManifestFile]]: groups = defaultdict(list) for manifest in manifests: - groups[manifest.partition_spec_id].append(manifest) + # filter out manifests that only have non-live data files + if ( + manifest.has_added_files() + or manifest.has_existing_files() + or manifest.added_snapshot_id == self._snapshot_producer._snapshot_id + ): + groups[manifest.partition_spec_id].append(manifest) return groups def _create_manifest(self, spec_id: int, manifest_bin: list[ManifestFile]) -> ManifestFile: diff --git a/tests/integration/test_writes/test_manifest_merging.py b/tests/integration/test_writes/test_manifest_merging.py new file mode 100644 index 0000000000..920f53185d --- /dev/null +++ b/tests/integration/test_writes/test_manifest_merging.py @@ -0,0 +1,144 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# pylint:disable=redefined-outer-name + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.expressions import EqualTo +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import TableProperties +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import LongType, NestedField +from utils import _create_table + +_SCHEMA = Schema( + NestedField(field_id=1, name="id", field_type=LongType(), required=False), + NestedField(field_id=2, name="partition_col", field_type=LongType(), required=False), +) + +_PARTITION_SPEC = PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="partition_col")) + +_DATA_P1 = pa.table({"id": [1, 2, 3], "partition_col": [1, 1, 1]}) +_DATA_P2 = pa.table({"id": [4, 5, 6], "partition_col": [2, 2, 2]}) +_DATA_P1_NEW = pa.table({"id": [10, 11, 12], "partition_col": [1, 1, 1]}) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_overwrite_with_merging_bounds_manifest_count(session_catalog: Catalog, format_version: int) -> None: + """After a partial overwrite with manifest merging enabled, the manifest count should + not exceed the count before the overwrite, even when many manifests were accumulated.""" + identifier = f"default.test_overwrite_merges_manifests_v{format_version}" + + # Build up 6 manifests via fast append (merging disabled) + tbl = _create_table( + session_catalog, + identifier, + { + "format-version": str(format_version), + TableProperties.MANIFEST_MERGE_ENABLED: "false", + }, + partition_spec=_PARTITION_SPEC, + schema=_SCHEMA, + ) + + for _ in range(3): + tbl.append(_DATA_P1) + tbl.append(_DATA_P2) + + assert len(tbl.inspect.manifests()) == 6 + + # Enable merging before the overwrite + with tbl.transaction() as tx: + tx.set_properties( + { + TableProperties.MANIFEST_MERGE_ENABLED: "true", + 
TableProperties.MANIFEST_MIN_MERGE_COUNT: "2", + } + ) + + # Overwrite partition_col=1 only: the 3 partition_col=2 manifests are preserved + # and merged together with the new manifest by _MergeAppendFiles._process_manifests + tbl.overwrite(_DATA_P1_NEW, EqualTo("partition_col", 1)) + + assert len(tbl.inspect.manifests()) < 6 + + # Data correctness: partition_col=2 has 3 × 3 = 9 rows; partition_col=1 replaced with 3 new rows + result = tbl.scan().to_arrow() + assert len(result) == 12 + assert sorted(result.column("id").to_pylist()) == [4, 4, 4, 5, 5, 5, 6, 6, 6, 10, 11, 12] + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_overwrite_without_merging_increases_manifest_count(session_catalog: Catalog, format_version: int) -> None: + """Control test: without manifest merging, a partial overwrite leaves the surviving manifests unmerged.""" + identifier = f"default.test_overwrite_no_merge_manifests_v{format_version}" + + tbl = _create_table( + session_catalog, + identifier, + { + "format-version": str(format_version), + TableProperties.MANIFEST_MERGE_ENABLED: "false", + }, + partition_spec=_PARTITION_SPEC, + schema=_SCHEMA, + ) + + for _ in range(3): + tbl.append(_DATA_P1) + for _ in range(3): + tbl.append(_DATA_P2) + + assert len(tbl.inspect.manifests()) == 6 + + # Overwrite without merging: the 3 partition_col=2 manifests are kept as-is and 1 new manifest is added + tbl.overwrite(_DATA_P1_NEW, EqualTo("partition_col", 1)) + + assert len(tbl.inspect.manifests()) == 4 + + # Data correctness is identical regardless of merging strategy + result = tbl.scan().to_arrow() + assert len(result) == 12 + assert sorted(result.column("id").to_pylist()) == [4, 4, 4, 5, 5, 5, 6, 6, 6, 10, 11, 12] + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_fast_append_does_not_merge_manifests(session_catalog: Catalog, format_version: int) -> None: + """Fast append bypasses _MergingSnapshotProducer, so manifests grow with each append + even when 
manifest merging properties are set to trigger early.""" + identifier = f"default.test_fast_append_no_merge_v{format_version}" + + tbl = _create_table( + session_catalog, + identifier, + { + "format-version": str(format_version), + TableProperties.MANIFEST_MERGE_ENABLED: "false", + TableProperties.MANIFEST_MIN_MERGE_COUNT: "2", + }, + schema=_SCHEMA, + ) + + for expected_count in range(1, 6): + tbl.append(_DATA_P1) + assert len(tbl.inspect.manifests()) == expected_count