diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index cca0af7628..5e3cbd3ac9 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -51,6 +51,7 @@ StringType, StructType, ) +from pyiceberg.utils.config import Config UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 @@ -891,15 +892,32 @@ def __hash__(self) -> int: return hash(self.manifest_path) -# Global cache for ManifestFile objects, keyed by manifest_path. -# This deduplicates ManifestFile objects across manifest lists, which commonly -# share manifests after append operations. -_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128) - -# Lock for thread-safe cache access +_DEFAULT_MANIFEST_CACHE_SIZE = 128 _manifest_cache_lock = threading.RLock() +def _init_manifest_cache() -> LRUCache[str, ManifestFile] | None: + """Initialize the manifest cache from config.""" + manifest_cache_size = Config().get_int("manifest-cache-size") + if manifest_cache_size is None: + manifest_cache_size = _DEFAULT_MANIFEST_CACHE_SIZE + if manifest_cache_size < 0: + raise ValueError(f"manifest-cache-size must be >= 0. Current value: {manifest_cache_size}") + if manifest_cache_size == 0: + return None + return LRUCache(maxsize=manifest_cache_size) + + +_manifest_cache = _init_manifest_cache() + + +def clear_manifest_cache() -> None: + """Clear the manifest cache. No-op if cache is disabled.""" + if _manifest_cache is not None: + with _manifest_cache_lock: + _manifest_cache.clear() + + def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: """Read manifests from a manifest list, deduplicating ManifestFile objects via cache. @@ -927,14 +945,18 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: file = io.new_input(manifest_list) manifest_files = list(read_manifest_list(file)) + if _manifest_cache is None: + return tuple(manifest_files) + result = [] with _manifest_cache_lock: + cache = _manifest_cache for manifest_file in manifest_files: manifest_path = manifest_file.manifest_path - if manifest_path in _manifest_cache: - result.append(_manifest_cache[manifest_path]) + if manifest_path in cache: + result.append(cache[manifest_path]) else: - _manifest_cache[manifest_path] = manifest_file + cache[manifest_path] = manifest_file result.append(manifest_file) return tuple(result) diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py index 82454c8574..19985b291e 100644 --- a/tests/benchmark/test_memory_benchmark.py +++ b/tests/benchmark/test_memory_benchmark.py @@ -32,8 +32,9 @@ import pyarrow as pa import pytest +from pyiceberg import manifest as manifest_module from pyiceberg.catalog.memory import InMemoryCatalog -from pyiceberg.manifest import _manifest_cache +from pyiceberg.manifest import clear_manifest_cache def generate_test_dataframe() -> pa.Table: @@ -64,7 +65,7 @@ def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog: @pytest.fixture(autouse=True) def clear_caches() -> None: """Clear caches before each test.""" - _manifest_cache.clear() + clear_manifest_cache() gc.collect() @@ -95,7 +96,8 @@ def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None: # Sample memory at intervals if (i + 1) % 10 == 0: current, _ = tracemalloc.get_traced_memory() - cache_size = len(_manifest_cache) + cache = manifest_module._manifest_cache + cache_size = len(cache) if cache is not None else 0 memory_samples.append((i + 1, current, cache_size)) print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}") @@ -150,13 +152,14 @@ def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) -> gc.collect() before_clear_memory, _ = tracemalloc.get_traced_memory() - cache_size_before = len(_manifest_cache) + cache = manifest_module._manifest_cache + cache_size_before = len(cache) if cache is not None else 0 print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB") print(f" Cache size: {cache_size_before}") # Phase 2: Clear cache and GC print("\nPhase 2: Clearing cache and running GC...") - _manifest_cache.clear() + clear_manifest_cache() gc.collect() gc.collect() # Multiple GC passes for thorough cleanup @@ -192,6 +195,7 @@ def test_manifest_cache_deduplication_efficiency() -> None: ManifestEntry, ManifestEntryStatus, _manifests, + clear_manifest_cache, write_manifest, write_manifest_list, ) @@ -245,7 +249,7 @@ def test_manifest_cache_deduplication_efficiency() -> None: num_lists = 10 print(f"Creating {num_lists} manifest lists with overlapping manifests...") - _manifest_cache.clear() + clear_manifest_cache() for i in range(num_lists): list_path = f"{tmp_dir}/manifest-list_{i}.avro" @@ -265,7 +269,8 @@ def test_manifest_cache_deduplication_efficiency() -> None: _manifests(io, list_path) # Analyze cache efficiency - cache_entries = len(_manifest_cache) + cache = manifest_module._manifest_cache + cache_entries = len(cache) if cache is not None else 0 # List i contains manifests 0..i, so only the first num_lists manifests are actually used manifests_actually_used = num_lists diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 3f859b3b32..9f94e00192 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -16,10 +16,12 @@ # under the License. # pylint: disable=redefined-outer-name,arguments-renamed,fixme from tempfile import TemporaryDirectory +from unittest import mock import fastavro import pytest +import pyiceberg.manifest as manifest_module from pyiceberg.avro.codecs import AvroCompressionCodec from pyiceberg.io import load_file_io from pyiceberg.io.pyarrow import PyArrowFileIO @@ -32,8 +34,8 @@ ManifestEntryStatus, ManifestFile, PartitionFieldSummary, - _manifest_cache, _manifests, + clear_manifest_cache, read_manifest_list, write_manifest, write_manifest_list, @@ -46,9 +48,10 @@ @pytest.fixture(autouse=True) -def clear_global_manifests_cache() -> None: - # Clear the global cache before each test - _manifest_cache.clear() +def reset_global_manifests_cache() -> None: + with manifest_module._manifest_cache_lock: + manifest_module._manifest_cache = manifest_module._init_manifest_cache() + clear_manifest_cache() def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: dict[str, str]) -> None: @@ -804,9 +807,9 @@ def test_manifest_cache_deduplicates_manifest_files() -> None: # Verify cache size - should only have 3 unique ManifestFile objects # instead of 1 + 2 + 3 = 6 objects as with the old approach - assert len(_manifest_cache) == 3, ( - f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}" - ) + cache = manifest_module._manifest_cache + assert cache is not None, "Manifest cache should be enabled for this test" + assert len(cache) == 3, f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(cache)}" def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: @@ -879,9 +882,11 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: # With the new approach, we should have exactly N objects # Verify cache has exactly N unique entries - assert len(_manifest_cache) == num_manifests, ( + cache = manifest_module._manifest_cache + assert cache is not None, "Manifest cache should be enabled for this test" + assert len(cache) == num_manifests, ( f"Cache should contain exactly {num_manifests} ManifestFile objects, " - f"but has {len(_manifest_cache)}. " + f"but has {len(cache)}. " f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects." ) @@ -932,3 +937,109 @@ def test_manifest_writer_tell(format_version: TableVersion) -> None: after_entry_bytes = writer.tell() assert after_entry_bytes > initial_bytes, "Bytes should increase after adding entry" + + +def test_clear_manifest_cache() -> None: + """Test that clear_manifest_cache() clears cache entries while keeping cache enabled.""" + io = PyArrowFileIO() + + with TemporaryDirectory() as tmp_dir: + schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)) + spec = UNPARTITIONED_PARTITION_SPEC + + # Create a manifest file + manifest_path = f"{tmp_dir}/manifest.avro" + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest_path), + snapshot_id=1, + avro_compression="zstandard", + ) as writer: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=1, + data_file=data_file, + ) + ) + manifest_file = writer.to_manifest_file() + + # Create a manifest list + list_path = f"{tmp_dir}/manifest-list.avro" + with write_manifest_list( + format_version=2, + output_file=io.new_output(list_path), + snapshot_id=1, + parent_snapshot_id=None, + sequence_number=1, + avro_compression="zstandard", + ) as list_writer: + list_writer.add_manifests([manifest_file]) + + # Populate the cache + _manifests(io, list_path) + + # Verify cache has entries + cache = manifest_module._manifest_cache + assert cache is not None, "Cache should be enabled" + assert len(cache) > 0, "Cache should have entries after reading manifests" + + # Clear the cache + clear_manifest_cache() + + # Verify cache is empty but still enabled + cache_after = manifest_module._manifest_cache + assert cache_after is not None, "Cache should still be enabled after clear" + assert len(cache_after) == 0, "Cache should be empty after clear" + + +@pytest.mark.parametrize( + "env_vars,expected_enabled,expected_size", + [ + ({}, True, 128), # defaults + ({"PYICEBERG_MANIFEST_CACHE_SIZE": "64"}, True, 64), + ({"PYICEBERG_MANIFEST_CACHE_SIZE": "256"}, True, 256), + ({"PYICEBERG_MANIFEST_CACHE_SIZE": "0"}, False, 0), # size=0 disables cache + ], +) +def test_manifest_cache_config_valid_values(env_vars: dict[str, str], expected_enabled: bool, expected_size: int) -> None: + """Test that valid config values are applied correctly.""" + import os + + with mock.patch.dict(os.environ, env_vars, clear=False): + with manifest_module._manifest_cache_lock: + manifest_module._manifest_cache = manifest_module._init_manifest_cache() + cache = manifest_module._manifest_cache + + if expected_enabled: + assert cache is not None, "Cache should be enabled" + assert cache.maxsize == expected_size, f"Cache size should be {expected_size}" + else: + assert cache is None, "Cache should be disabled" + + +@pytest.mark.parametrize( + "env_vars,expected_error_substring", + [ + ({"PYICEBERG_MANIFEST_CACHE_SIZE": "abc"}, "manifest-cache-size should be an integer"), + ({"PYICEBERG_MANIFEST_CACHE_SIZE": "-5"}, "manifest-cache-size must be >= 0"), + ], +) +def test_manifest_cache_config_invalid_values(env_vars: dict[str, str], expected_error_substring: str) -> None: + """Test that invalid config values raise ValueError with appropriate message.""" + import os + + with mock.patch.dict(os.environ, env_vars, clear=False): + with pytest.raises(ValueError, match=expected_error_substring): + with manifest_module._manifest_cache_lock: + manifest_module._manifest_cache = manifest_module._init_manifest_cache()