From 61e05ec7355915617b1a0689571aedb2a7bb844d Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 30 Apr 2026 14:47:20 -0700 Subject: [PATCH] Roll added manifests at commit.manifest.target-size-bytes in fast-append _FastAppendFiles previously wrote every added data file into a single manifest, regardless of commit.manifest.target-size-bytes. Java's RollingManifestWriter honours this property on the fast-append path too; this change brings pyiceberg to parity. The first manifest is written inline until it reaches the target, which yields an exact entries-per-manifest count; remaining chunks are then submitted to the existing ExecutorFactory pool so the GIL-bound encode of chunk K overlaps with the compress/upload (both GIL-releasing) of earlier chunks. _write_delete_manifest and _existing_manifests are submitted first so they run concurrently with the inline first chunk. Small appends (under the target in one manifest) take the same code path and produce a single manifest as before. Multiple small manifests let query planners read them in parallel, prune whole manifests via the manifest-list partition bounds without opening them, and keep delete/overwrite rewrites bounded to the affected manifest instead of the full set. --- pyiceberg/table/update/snapshot.py | 68 ++++++++++++------ tests/table/test_snapshot_producer.py | 99 +++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 20 deletions(-) create mode 100644 tests/table/test_snapshot_producer.py diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 37d120969a..7957d6ef12 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -21,6 +21,7 @@ from abc import abstractmethod from collections import defaultdict from collections.abc import Callable +from concurrent.futures import Future from datetime import datetime from functools import cached_property from typing import TYPE_CHECKING, Generic @@ -176,24 +177,30 @@ def _process_manifests(self, manifests: list[ManifestFile]) -> list[ManifestFile return manifests def _manifests(self) -> list[ManifestFile]: - def _write_added_manifest() -> list[ManifestFile]: - if self._added_data_files: - with self.new_manifest_writer( - spec=self._transaction.table_metadata.spec(), - ) as writer: - for data_file in self._added_data_files: - writer.add( - ManifestEntry.from_args( - status=ManifestEntryStatus.ADDED, - snapshot_id=self._snapshot_id, - sequence_number=None, - file_sequence_number=None, - data_file=data_file, - ) - ) - return [writer.to_manifest_file()] - else: - return [] + from pyiceberg.table import TableProperties + + table_metadata = self._transaction.table_metadata + spec = table_metadata.spec() + target_size_bytes: int = property_as_int( + table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + + def _added_entry(data_file: DataFile) -> ManifestEntry: + return ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=self._snapshot_id, + sequence_number=None, + file_sequence_number=None, + data_file=data_file, + ) + + def _write_added_chunk(files: list[DataFile]) -> ManifestFile: + with self.new_manifest_writer(spec=spec) as writer: + for data_file in files: + writer.add(_added_entry(data_file)) + return writer.to_manifest_file() def _write_delete_manifest() -> list[ManifestFile]: # Check if we need to mark the files as deleted @@ -217,11 +224,32 @@ def _write_delete_manifest() -> list[ManifestFile]: executor = ExecutorFactory.get_or_create() - added_manifests = executor.submit(_write_added_manifest) delete_manifests = executor.submit(_write_delete_manifest) existing_manifests = executor.submit(self._existing_manifests) - return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result()) + # Roll added data files into multiple manifests sized to + # commit.manifest.target-size-bytes. The first manifest is written + # inline until it reaches the target, which yields an exact + # entries-per-manifest count; remaining chunks are then fanned out + # across the executor so one chunk's GIL-bound encode overlaps with + # the compress/upload of earlier chunks. + added_manifests: list[ManifestFile] = [] + added_futures: list[Future[ManifestFile]] = [] + if self._added_data_files: + added = self._added_data_files + with self.new_manifest_writer(spec=spec) as first: + i = 0 + while i < len(added) and (first.tell() < target_size_bytes or i == 0): + first.add(_added_entry(added[i])) + i += 1 + added_manifests.append(first.to_manifest_file()) + chunk_size = i + added_futures = [ + executor.submit(_write_added_chunk, added[j : j + chunk_size]) for j in range(i, len(added), chunk_size) + ] + + added_manifests += [f.result() for f in added_futures] + return self._process_manifests(added_manifests + delete_manifests.result() + existing_manifests.result()) def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: from pyiceberg.table import TableProperties diff --git a/tests/table/test_snapshot_producer.py b/tests/table/test_snapshot_producer.py new file mode 100644 index 0000000000..e2ce41ba4a --- /dev/null +++ b/tests/table/test_snapshot_producer.py @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.manifest import DataFile, DataFileContent, FileFormat +from pyiceberg.schema import Schema +from pyiceberg.table import TableProperties +from pyiceberg.table.snapshots import Operation +from pyiceberg.table.update.snapshot import _FastAppendFiles +from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField, StringType + + +@pytest.fixture +def catalog(tmp_path: str) -> InMemoryCatalog: + cat = InMemoryCatalog("test", warehouse=str(tmp_path)) + cat.create_namespace("default") + return cat + + +def _make_data_file(i: int) -> DataFile: + return DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"file:///tmp/part-{i:08d}.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=12345, + column_sizes={1: 100, 2: 100}, + value_counts={1: 100, 2: 100}, + null_value_counts={1: 0, 2: 0}, + nan_value_counts={}, + lower_bounds={}, + upper_bounds={}, + key_metadata=None, + split_offsets=None, + equality_ids=None, + sort_order_id=None, + spec_id=0, + ) + + +def test_fast_append_rolls_added_manifests_at_target_size(catalog: InMemoryCatalog) -> None: + schema = Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "name", StringType(), required=False), + ) + table = catalog.create_table( + "default.roll", + schema=schema, + properties={TableProperties.MANIFEST_TARGET_SIZE_BYTES: "4096"}, + ) + + txn = table.transaction() + append = _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table.io) + n_files = 200 + for i in range(n_files): + append.append_data_file(_make_data_file(i)) + + manifests = append._manifests() + + assert len(manifests) > 1, f"expected added files to roll into multiple manifests, got {len(manifests)}" + assert sum(m.added_files_count or 0 for m in manifests) == n_files + # The first manifest is written until it reaches the target, so it may slightly + # overshoot; remaining manifests are chunked by the same entry count and should + # be in the same ballpark. + for m in manifests: + assert m.manifest_length < 4 * 4096, f"manifest {m.manifest_path} is {m.manifest_length} bytes" + + +def test_fast_append_single_manifest_when_under_target(catalog: InMemoryCatalog) -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + table = catalog.create_table("default.small", schema=schema) + + txn = table.transaction() + append = _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table.io) + for i in range(3): + append.append_data_file(_make_data_file(i)) + + manifests = append._manifests() + assert len(manifests) == 1 + assert manifests[0].added_files_count == 3