diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 37d120969a..d75072872b 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -228,6 +228,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: # avoid copying metadata for each data file table_metadata = self._transaction.table_metadata + schema = table_metadata.schema() + default_spec = table_metadata.spec() partition_summary_limit = int( table_metadata.properties.get( @@ -239,8 +241,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: for data_file in self._added_data_files: ssc.add_file( data_file=data_file, - partition_spec=table_metadata.spec(), - schema=table_metadata.schema(), + partition_spec=default_spec, + schema=schema, ) if len(self._deleted_data_files) > 0: @@ -249,7 +251,7 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: ssc.remove_file( data_file=data_file, partition_spec=specs[data_file.spec_id], - schema=table_metadata.schema(), + schema=schema, ) previous_snapshot = ( @@ -424,12 +426,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> data_file=entry.data_file, ) + # avoid copying metadata for each evaluator + table_metadata = self._transaction.table_metadata + schema = table_metadata.schema() + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator( - self.schema(), self._predicate, case_sensitive=self._case_sensitive - ).eval + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( - self.schema(), self._predicate, case_sensitive=self._case_sensitive + schema, self._predicate, case_sensitive=self._case_sensitive ).eval existing_manifests = [] @@ -441,7 +445,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # Should be the current tip of the _target_branch parent_snapshot_id_for_delete_source = self._parent_snapshot_id if parent_snapshot_id_for_delete_source is not None: - snapshot = self._transaction.table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source) + snapshot = table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source) if snapshot: # Ensure snapshot is found for manifest_file in snapshot.manifests(io=self._io): if manifest_file.content == ManifestContent.DATA: @@ -542,18 +546,19 @@ def __init__( from pyiceberg.table import TableProperties super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch) + table_properties = self._transaction.table_metadata.properties self._target_size_bytes = property_as_int( - self._transaction.table_metadata.properties, + table_properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, ) # type: ignore self._min_count_to_merge = property_as_int( - self._transaction.table_metadata.properties, + table_properties, TableProperties.MANIFEST_MIN_MERGE_COUNT, TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT, ) # type: ignore self._merge_enabled = property_as_bool( - self._transaction.table_metadata.properties, + table_properties, TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, ) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index cfdc516227..077027f7b9 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -551,3 +551,45 @@ def test_latest_ancestor_before_timestamp() -> None: result = latest_ancestor_before_timestamp(metadata, 1000) assert result is None + + +def test_snapshot_producer_bounded_metadata_access(table_v2: Table) -> None: + """Transaction.table_metadata replays staged updates via update_table_metadata on + every access, so the snapshot producer must not read it once per item. Guards the + hoisting introduced in #2674 and extended here. + """ + from unittest import mock + + from pyiceberg.table.update import update_table_metadata + from pyiceberg.table.update.snapshot import _FastAppendFiles, _MergeAppendFiles + + def make_file() -> DataFile: + return DataFile.from_args(content=DataFileContent.DATA, record_count=1, file_size_in_bytes=1, partition=Record()) + + txn = table_v2.transaction() + + with mock.patch("pyiceberg.table.update_table_metadata", wraps=update_table_metadata) as spy: + # _summary() cost must not scale with the number of data files + def summary_calls(n_files: int) -> int: + append = _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io) + for _ in range(n_files): + append.append_data_file(make_file()) + spy.reset_mock() + append._summary() + return spy.call_count + + few, many = summary_calls(10), summary_calls(100) + assert few == many, f"_summary() update_table_metadata calls scale with file count ({few} vs {many})" + assert many <= 2, f"_summary() triggered {many} update_table_metadata calls; expected O(1)" + + # _MergeAppendFiles.__init__ should add exactly one call over _FastAppendFiles.__init__ + spy.reset_mock() + _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io) + fast_init = spy.call_count + spy.reset_mock() + _MergeAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io) + merge_init = spy.call_count + assert merge_init - fast_init == 1, ( + f"_MergeAppendFiles.__init__ made {merge_init - fast_init} extra update_table_metadata " + "calls over its superclass; expected 1 (hoisted)" + )