From 7e1153a9eb3d788690cfaa241155691108f6f5ce Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 May 2026 13:41:12 +0800 Subject: [PATCH] [python] Align SnapshotManager constructor with Java Switch SnapshotManager.__init__ from (table) to the Java-aligned (file_io, table_path, branch=None, snapshot_loader=None) so the class no longer depends on FileStoreTable. The new manager carries its own file_io / table_path / branch / snapshot_loader fields, mirroring paimon-core/.../utils/SnapshotManager.java. FileStoreTable.snapshot_manager() remains the canonical factory and now wires those four basics. All raw SnapshotManager(table) call sites across production and tests are migrated to table.snapshot_manager(). copy_with_branch is rewritten to construct the rebranched manager directly via the new constructor (no field-swap). Mock-style tests that patched the SnapshotManager class to intercept its instances now wire the mock through table.snapshot_manager.return_value, which matches how production code obtains its instance. Follow-up to #7756. --- .../incremental_diff_acceptance_test.py | 5 +- .../globalindex/global_index_scanner.py | 3 +- .../pypaimon/read/scanner/file_scanner.py | 3 +- .../pypaimon/read/streaming_table_scan.py | 3 +- paimon-python/pypaimon/read/table_scan.py | 3 +- .../pypaimon/snapshot/snapshot_manager.py | 37 +++++---- .../pypaimon/table/file_store_table.py | 7 +- .../pypaimon/table/source/full_text_scan.py | 3 +- .../table/source/vector_search_scan.py | 3 +- .../pypaimon/tests/blob_table_test.py | 3 +- .../pypaimon/tests/branch_manager_test.py | 8 +- .../pypaimon/tests/changelog_manager_test.py | 3 +- .../pypaimon/tests/data_evolution_test.py | 3 +- .../pypaimon/tests/file_store_commit_test.py | 19 ++--- .../tests/partition_predicate_test.py | 2 - .../tests/py36/rest_ao_read_write_test.py | 5 +- .../pypaimon/tests/reader_append_only_test.py | 11 ++- .../pypaimon/tests/reader_base_test.py | 9 +- .../pypaimon/tests/reader_primary_key_test.py | 9 +- .../pypaimon/tests/snapshot_manager_test.py | 76 +++++------------ .../tests/streaming_table_scan_test.py | 83 +++++++++---------- .../pypaimon/write/file_store_commit.py | 3 +- 22 files changed, 126 insertions(+), 175 deletions(-) diff --git a/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py b/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py index 0dc7ebf53fff..a6a56850ffd1 100644 --- a/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py +++ b/paimon-python/pypaimon/acceptance/incremental_diff_acceptance_test.py @@ -38,7 +38,6 @@ AppendTableSplitGenerator from pypaimon.read.scanner.incremental_diff_scanner import \ IncrementalDiffScanner -from pypaimon.snapshot.snapshot_manager import SnapshotManager class IncrementalDiffAcceptanceTest(unittest.TestCase): @@ -94,7 +93,7 @@ def _create_table_with_snapshots(self, name, num_snapshots=5, partition_keys=Non def _read_via_diff(self, table, start_snap_id, end_snap_id): """Read data using IncrementalDiffScanner between two snapshots.""" - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() start_snapshot = snapshot_manager.get_snapshot_by_id(start_snap_id) end_snapshot = snapshot_manager.get_snapshot_by_id(end_snap_id) @@ -106,7 +105,7 @@ def _read_via_diff(self, table, start_snap_id, end_snap_id): def _read_via_delta(self, table, start_snap_id, end_snap_id): """Read data by iterating delta_manifest_lists between two snapshots.""" - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() manifest_list_manager = ManifestListManager(table) manifest_file_manager = ManifestFileManager(table) diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py b/paimon-python/pypaimon/globalindex/global_index_scanner.py index 38408059a9bd..c51be0731b09 100644 --- a/paimon-python/pypaimon/globalindex/global_index_scanner.py +++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py @@ -111,8 +111,7 @@ def index_file_filter(entry): return False return global_index_meta.index_field_id in filter_field_ids - from pypaimon.snapshot.snapshot_manager import SnapshotManager - snapshot = SnapshotManager(table).get_latest_snapshot() + snapshot = table.snapshot_manager().get_latest_snapshot() index_file_handler = IndexFileHandler(table=table) entries = index_file_handler.scan(snapshot, index_file_filter) scanned_index_files = [entry.index_file for entry in entries] diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index ffbd83daf0da..39c740174643 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -41,7 +41,6 @@ PrimaryKeyTableSplitGenerator from pypaimon.read.split import DataSplit from pypaimon.snapshot.snapshot import Snapshot -from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.bucket_mode import BucketMode from pypaimon.table.source.deletion_file import DeletionFile @@ -179,7 +178,7 @@ def __init__( self.predicate_for_stats = remove_row_id_filter(predicate) if predicate else None self.limit = limit - self.snapshot_manager = SnapshotManager(table) + self.snapshot_manager = table.snapshot_manager() self.manifest_list_manager = ManifestListManager(table) self.manifest_file_manager = ManifestFileManager(table) diff --git a/paimon-python/pypaimon/read/streaming_table_scan.py b/paimon-python/pypaimon/read/streaming_table_scan.py index f426d2064f33..f7f8ca41823e 100644 --- a/paimon-python/pypaimon/read/streaming_table_scan.py +++ b/paimon-python/pypaimon/read/streaming_table_scan.py @@ -48,7 +48,6 @@ from pypaimon.read.scanner.primary_key_table_split_generator import \ PrimaryKeyTableSplitGenerator from pypaimon.snapshot.snapshot import Snapshot -from pypaimon.snapshot.snapshot_manager import SnapshotManager class AsyncStreamingTableScan: @@ -103,7 +102,7 @@ def __init__( self._lookahead_size = 10 # How many snapshots to look ahead # Initialize managers - self._snapshot_manager = SnapshotManager(table) + self._snapshot_manager = table.snapshot_manager() self._manifest_list_manager = ManifestListManager(table) self._manifest_file_manager = ManifestFileManager(table) diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index c754b5011168..562bea26f596 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -23,7 +23,6 @@ from pypaimon.read.plan import Plan from pypaimon.read.scanner.file_scanner import FileScanner -from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.manifest.manifest_list_manager import ManifestListManager @@ -48,7 +47,7 @@ def plan(self) -> Plan: def _create_file_scanner(self) -> FileScanner: options = self.table.options.options - snapshot_manager = SnapshotManager(self.table) + snapshot_manager = self.table.snapshot_manager() manifest_list_manager = ManifestListManager(self.table) if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP): ts = options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",") diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py index 074fc844cda0..6f2bef722588 100644 --- a/paimon-python/pypaimon/snapshot/snapshot_manager.py +++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py @@ -29,24 +29,24 @@ class SnapshotManager: """Manager for snapshot files using unified FileIO.""" - def __init__(self, table, branch: Optional[str] = None): - # Lazy imports to avoid a cycle: pypaimon.branch.__init__ + def __init__( + self, + file_io: FileIO, + table_path: str, + branch: Optional[str] = None, + snapshot_loader: Optional[SnapshotLoader] = None, + ): + # Lazy import to avoid a cycle: pypaimon.branch.__init__ # eagerly loads FileSystemBranchManager, which imports # SnapshotManager. from pypaimon.branch.branch_manager import BranchManager - from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH - from pypaimon.table.file_store_table import FileStoreTable - - self.table: FileStoreTable = table - self.file_io: FileIO = self.table.file_io - self.snapshot_loader: Optional[SnapshotLoader] = self.table.catalog_environment.snapshot_loader() - if branch is None: - branch = self.table.current_branch() or DEFAULT_MAIN_BRANCH - self.branch = BranchManager.normalize_branch(branch) + self.file_io: FileIO = file_io + self.table_path: str = table_path.rstrip('/') + self.branch: str = BranchManager.normalize_branch(branch) + self.snapshot_loader: Optional[SnapshotLoader] = snapshot_loader - table_root = self.table.table_path.rstrip('/') - branch_root = BranchManager.branch_path(table_root, self.branch) + branch_root = BranchManager.branch_path(self.table_path, self.branch) self.snapshot_dir = f"{branch_root}/snapshot" self.latest_file = f"{self.snapshot_dir}/LATEST" @@ -55,10 +55,13 @@ def copy_with_branch(self, branch_name: str) -> 'SnapshotManager': # carries a SnapshotLoader rebranched to ``branch_name`` so REST # loads target the correct branch instead of falling back to the # main-branch identifier. - new_manager = SnapshotManager(self.table, branch_name) - if self.snapshot_loader is not None: - new_manager.snapshot_loader = self.snapshot_loader.copy_with_branch(branch_name) - return new_manager + rebranched_loader = ( + self.snapshot_loader.copy_with_branch(branch_name) + if self.snapshot_loader is not None + else None + ) + return SnapshotManager( + self.file_io, self.table_path, branch_name, rebranched_loader) def get_latest_snapshot(self) -> Optional[Snapshot]: """ diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 4b49a62869a2..35addad2518d 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -98,7 +98,12 @@ def consumer_manager(self): def snapshot_manager(self): """Get the snapshot manager for this table.""" from pypaimon.snapshot.snapshot_manager import SnapshotManager - return SnapshotManager(self) + return SnapshotManager( + self.file_io, + self.table_path, + self.current_branch(), + self.catalog_environment.snapshot_loader(), + ) def tag_manager(self): """Get the tag manager for this table.""" diff --git a/paimon-python/pypaimon/table/source/full_text_scan.py b/paimon-python/pypaimon/table/source/full_text_scan.py index 06f72405b386..50b2381d2778 100644 --- a/paimon-python/pypaimon/table/source/full_text_scan.py +++ b/paimon-python/pypaimon/table/source/full_text_scan.py @@ -53,10 +53,9 @@ def __init__(self, table: 'FileStoreTable', text_column: 'DataField'): def scan(self) -> FullTextScanPlan: from pypaimon.index.index_file_handler import IndexFileHandler - from pypaimon.snapshot.snapshot_manager import SnapshotManager text_column = self._text_column - snapshot = SnapshotManager(self._table).get_latest_snapshot() + snapshot = self._table.snapshot_manager().get_latest_snapshot() from pypaimon.snapshot.time_travel_util import TimeTravelUtil from pypaimon.common.options.options import Options diff --git a/paimon-python/pypaimon/table/source/vector_search_scan.py b/paimon-python/pypaimon/table/source/vector_search_scan.py index 5245d41f195b..035e51a846cd 100644 --- a/paimon-python/pypaimon/table/source/vector_search_scan.py +++ b/paimon-python/pypaimon/table/source/vector_search_scan.py @@ -60,7 +60,6 @@ def scan(self): from pypaimon.common.options.options import Options from pypaimon.index.index_file_handler import IndexFileHandler from pypaimon.read.push_down_utils import _get_all_fields - from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.snapshot.time_travel_util import TimeTravelUtil vector_column = self._vector_column @@ -81,7 +80,7 @@ def scan(self): self._table.tag_manager() ) if snapshot is None: - snapshot = SnapshotManager(self._table).get_latest_snapshot() + snapshot = self._table.snapshot_manager().get_latest_snapshot() index_file_handler = IndexFileHandler(table=self._table) diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index 7e4d6c6d26a1..2692aa2ea099 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -2748,7 +2748,6 @@ def test_concurrent_blob_writes_with_retry(self): """Test concurrent blob writes to verify retry mechanism works correctly.""" import threading from pypaimon import Schema - from pypaimon.snapshot.snapshot_manager import SnapshotManager # Run the test 10 times to verify stability iter_num = 2 @@ -2873,7 +2872,7 @@ def write_blob_data(thread_id, start_id): self.assertIn(b'BLOB_PATTERN_', blob, f"Blob {i} should contain pattern") # Verify snapshot count (should have num_threads snapshots) - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() latest_snapshot = snapshot_manager.get_latest_snapshot() self.assertIsNotNone(latest_snapshot, f"Iteration {test_iteration}: Latest snapshot should not be None") diff --git a/paimon-python/pypaimon/tests/branch_manager_test.py b/paimon-python/pypaimon/tests/branch_manager_test.py index 7f79eb1d3869..76a4a287c94c 100644 --- a/paimon-python/pypaimon/tests/branch_manager_test.py +++ b/paimon-python/pypaimon/tests/branch_manager_test.py @@ -243,11 +243,11 @@ def test_copy_with_branch_rebranches_snapshot_loader(self): self.assertIsNot(branch_sm.snapshot_loader, sm.snapshot_loader) self.assertEqual(branch_sm.snapshot_loader.identifier.branch, "b1") self.assertEqual( - branch_sm.snapshot_loader.identifier.database, - sm.snapshot_loader.identifier.database) + branch_sm.snapshot_loader.identifier.get_database_name(), + sm.snapshot_loader.identifier.get_database_name()) self.assertEqual( - branch_sm.snapshot_loader.identifier.object, - sm.snapshot_loader.identifier.object) + branch_sm.snapshot_loader.identifier.get_table_name(), + sm.snapshot_loader.identifier.get_table_name()) # Original loader's identifier untouched. self.assertIsNone(sm.snapshot_loader.identifier.branch) diff --git a/paimon-python/pypaimon/tests/changelog_manager_test.py b/paimon-python/pypaimon/tests/changelog_manager_test.py index 63d9157e5ebd..24c079316ad1 100644 --- a/paimon-python/pypaimon/tests/changelog_manager_test.py +++ b/paimon-python/pypaimon/tests/changelog_manager_test.py @@ -121,9 +121,8 @@ def test_earliest_long_lived_changelog_id_none(self): def test_changelog_from_snapshot(self): """Test that Changelog can be created from a Snapshot.""" - from pypaimon.snapshot.snapshot_manager import SnapshotManager - snapshot_manager = SnapshotManager(self.table) + snapshot_manager = self.table.snapshot_manager() snapshot = snapshot_manager.get_latest_snapshot() if snapshot: diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py index b9ca4c7accb5..edd3ac9007c9 100644 --- a/paimon-python/pypaimon/tests/data_evolution_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_test.py @@ -29,7 +29,6 @@ from pypaimon import CatalogFactory, Schema from pypaimon.common.predicate import Predicate from pypaimon.manifest.manifest_list_manager import ManifestListManager -from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.row.offset_row import OffsetRow @@ -192,7 +191,7 @@ def test_partitioned_read_requested_column_missing_in_file(self): # Assert manifest file meta contains min and max row id manifest_list_manager = ManifestListManager(table) - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() all_manifests = manifest_list_manager.read_all(snapshot_manager.get_latest_snapshot()) first_commit = next((m for m in all_manifests if m.min_row_id == 0 and m.max_row_id == 1), None) self.assertIsNotNone(first_commit, "Should have a manifest with min_row_id=0, max_row_id=1") diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py index 958ea85a6b7e..d4943a695232 100644 --- a/paimon-python/pypaimon/tests/file_store_commit_test.py +++ b/paimon-python/pypaimon/tests/file_store_commit_test.py @@ -28,7 +28,6 @@ from pypaimon.write.file_store_commit import FileStoreCommit -@patch('pypaimon.write.file_store_commit.SnapshotManager') @patch('pypaimon.write.file_store_commit.ManifestFileManager') @patch('pypaimon.write.file_store_commit.ManifestListManager') class TestFileStoreCommit(unittest.TestCase): @@ -55,7 +54,7 @@ def _create_file_store_commit(self): ) def test_generate_partition_statistics_single_partition_single_file( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): + self, mock_manifest_list_manager, mock_manifest_file_manager): """Test partition statistics generation with single partition and single file.""" # Create FileStoreCommit instance file_store_commit = self._create_file_store_commit() @@ -107,7 +106,7 @@ def test_generate_partition_statistics_single_partition_single_file( self.assertEqual(stat.last_file_creation_time, expected_time) def test_generate_partition_statistics_multiple_files_same_partition( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): + self, mock_manifest_list_manager, mock_manifest_file_manager): """Test partition statistics generation with multiple files in same partition.""" # Create FileStoreCommit instance file_store_commit = self._create_file_store_commit() @@ -171,7 +170,7 @@ def test_generate_partition_statistics_multiple_files_same_partition( self.assertEqual(stat.last_file_creation_time, expected_time) def test_generate_partition_statistics_multiple_partitions( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): + self, mock_manifest_list_manager, mock_manifest_file_manager): """Test partition statistics generation with multiple different partitions.""" # Create FileStoreCommit instance file_store_commit = self._create_file_store_commit() @@ -259,7 +258,7 @@ def test_generate_partition_statistics_multiple_partitions( self.assertEqual(stat_2.file_size_in_bytes, 2 * 1024 * 1024) def test_generate_partition_statistics_unpartitioned_table( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): + self, mock_manifest_list_manager, mock_manifest_file_manager): """Test partition statistics generation for unpartitioned table.""" # Update mock table to have no partition keys self.mock_table.partition_keys = [] @@ -310,7 +309,7 @@ def test_generate_partition_statistics_unpartitioned_table( self.assertEqual(stat.file_size_in_bytes, 1024 * 1024) def test_generate_partition_statistics_no_creation_time( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): + self, mock_manifest_list_manager, mock_manifest_file_manager): """Test partition statistics generation when file has no creation time.""" # Create FileStoreCommit instance file_store_commit = self._create_file_store_commit() @@ -347,7 +346,7 @@ def test_generate_partition_statistics_no_creation_time( self.assertGreater(stat.last_file_creation_time, 0) def test_generate_partition_statistics_mismatched_partition_keys( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): + self, mock_manifest_list_manager, mock_manifest_file_manager): """Test partition statistics generation when partition tuple doesn't match partition keys.""" # Create FileStoreCommit instance file_store_commit = self._create_file_store_commit() @@ -393,7 +392,7 @@ def test_generate_partition_statistics_mismatched_partition_keys( self.assertEqual(stat.spec, expected_spec) def test_generate_partition_statistics_empty_commit_messages( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): + self, mock_manifest_list_manager, mock_manifest_file_manager): """Test partition statistics generation with empty commit messages list.""" # Create FileStoreCommit instance file_store_commit = self._create_file_store_commit() @@ -405,7 +404,7 @@ def test_generate_partition_statistics_empty_commit_messages( self.assertEqual(len(statistics), 0) def test_append_commit_inherits_index_manifest( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): + self, mock_manifest_list_manager, mock_manifest_file_manager): file_store_commit = self._create_file_store_commit() self.mock_table.identifier = 'default.test_table' @@ -448,7 +447,7 @@ def test_append_commit_inherits_index_manifest( ) def test_null_partition_value( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): + self, mock_manifest_list_manager, mock_manifest_file_manager): from pypaimon.data.timestamp import Timestamp from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.schema.data_types import DataField, AtomicType diff --git a/paimon-python/pypaimon/tests/partition_predicate_test.py b/paimon-python/pypaimon/tests/partition_predicate_test.py index a9c94823739c..1dbc9a150fc1 100644 --- a/paimon-python/pypaimon/tests/partition_predicate_test.py +++ b/paimon-python/pypaimon/tests/partition_predicate_test.py @@ -96,7 +96,6 @@ def _manifest_entry(partition_values): ) -@patch('pypaimon.read.scanner.file_scanner.SnapshotManager') @patch('pypaimon.read.scanner.file_scanner.ManifestFileManager') @patch('pypaimon.read.scanner.file_scanner.ManifestListManager') class TestFileScannerPartitionPredicate(unittest.TestCase): @@ -154,7 +153,6 @@ def test_filters_manifest_entry_by_partition(self, *_): _manifest_entry(['2024-01-15', 'us-west-2']))) -@patch('pypaimon.write.file_store_commit.SnapshotManager') @patch('pypaimon.write.file_store_commit.ManifestFileManager') @patch('pypaimon.write.file_store_commit.ManifestListManager') class TestOverwritePartitionPredicate(unittest.TestCase): diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py index cfdf33b755d5..5258aec1d033 100644 --- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py @@ -37,7 +37,6 @@ from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.schema.data_types import AtomicType, DataField -from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer, GenericRowSerializer) from pypaimon.table.row.row_kind import RowKind @@ -180,7 +179,7 @@ def test_full_data_types(self): self.assertEqual(actual_data, expect_data) # to test GenericRow ability - latest_snapshot = SnapshotManager(table).get_latest_snapshot() + latest_snapshot = table.snapshot_manager().get_latest_snapshot() manifest_files = table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.file_scanner.manifest_file_manager.read( manifest_files[0].file_name, @@ -771,7 +770,7 @@ def test_incremental_timestamp(self): timestamp = int(time.time() * 1000) self._write_test_table(table) - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() t1 = snapshot_manager.get_snapshot_by_id(1).time_millis t2 = snapshot_manager.get_snapshot_by_id(2).time_millis # test 1 diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py index 00b2f803f7ec..d922cb2e3060 100644 --- a/paimon-python/pypaimon/tests/reader_append_only_test.py +++ b/paimon-python/pypaimon/tests/reader_append_only_test.py @@ -31,7 +31,6 @@ from pypaimon.common.options.core_options import CoreOptions from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER -from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.row.generic_row import GenericRow from pypaimon.write.file_store_commit import RetryResult @@ -137,7 +136,7 @@ def test_incremental_timestamp_empty_range_keeps_end_snapshot_id(self): table_write.close() table_commit.close() - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() snapshot = snapshot_manager.get_latest_snapshot() table_inc = table.copy({ CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key(): @@ -478,7 +477,7 @@ def test_commit_retry_filter(self): table_commit.commit(messages) table_write.close() - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() latest_snapshot = snapshot_manager.get_latest_snapshot() commit_entries = [] for msg in messages: @@ -673,7 +672,7 @@ def test_incremental_timestamp(self): timestamp = int(time.time() * 1000) self._write_test_table(table) - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() t1 = snapshot_manager.get_snapshot_by_id(1).time_millis t2 = snapshot_manager.get_snapshot_by_id(2).time_millis # test 1 @@ -713,7 +712,7 @@ def test_incremental_read_multi_snapshots(self): table_write.close() table_commit.close() - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() t10 = snapshot_manager.get_snapshot_by_id(10).time_millis t20 = snapshot_manager.get_snapshot_by_id(20).time_millis @@ -820,7 +819,7 @@ def write_data(thread_id, start_user_id): f"Iteration {test_iteration}: User IDs mismatch") # Verify snapshot count (should have num_threads snapshots) - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() latest_snapshot = snapshot_manager.get_latest_snapshot() self.assertIsNotNone(latest_snapshot, f"Iteration {test_iteration}: Latest snapshot should not be None") diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 657678f9eabc..f1b147d147c6 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -38,7 +38,6 @@ from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, MapType, PyarrowFieldParser) from pypaimon.schema.table_schema import TableSchema -from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer from pypaimon.write.file_store_commit import FileStoreCommit @@ -226,7 +225,7 @@ def test_full_data_types(self): self.assertEqual(actual_data, expect_data) # to test GenericRow ability - latest_snapshot = SnapshotManager(table).get_latest_snapshot() + latest_snapshot = table.snapshot_manager().get_latest_snapshot() manifest_files = table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.file_scanner.manifest_file_manager.read( manifest_files[0].file_name, lambda row: table_scan.file_scanner._filter_manifest_entry(row), False) @@ -517,7 +516,7 @@ def test_primary_key_value_stats_excludes_system_fields(self): pk_read_builder = pk_table.new_read_builder() pk_table_scan = pk_read_builder.new_scan() - latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot() + latest_snapshot = pk_table.snapshot_manager().get_latest_snapshot() pk_manifest_files = pk_table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot) pk_manifest_entries = pk_table_scan.file_scanner.manifest_file_manager.read( pk_manifest_files[0].file_name, @@ -592,7 +591,7 @@ def test_value_stats_empty_when_stats_disabled(self): read_builder = table.new_read_builder() table_scan = read_builder.new_scan() - latest_snapshot = SnapshotManager(table).get_latest_snapshot() + latest_snapshot = table.snapshot_manager().get_latest_snapshot() manifest_files = table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.file_scanner.manifest_file_manager.read( manifest_files[0].file_name, @@ -1105,7 +1104,7 @@ def test_primary_key_value_stats(self): # Read manifest to verify value_stats_cols is None (all fields included) read_builder = table.new_read_builder() table_scan = read_builder.new_scan() - latest_snapshot = SnapshotManager(table).get_latest_snapshot() + latest_snapshot = table.snapshot_manager().get_latest_snapshot() manifest_files = table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.file_scanner.manifest_file_manager.read( manifest_files[0].file_name, diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py index 541dabe895b8..36ec678c8c15 100644 --- a/paimon-python/pypaimon/tests/reader_primary_key_test.py +++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py @@ -26,7 +26,6 @@ from pypaimon import CatalogFactory, Schema from pypaimon.common.options.core_options import CoreOptions -from pypaimon.snapshot.snapshot_manager import SnapshotManager class PkReaderTest(unittest.TestCase): @@ -339,7 +338,7 @@ def test_incremental_timestamp(self): timestamp = int(time.time() * 1000) self._write_test_table(table) - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() t1 = snapshot_manager.get_snapshot_by_id(1).time_millis t2 = snapshot_manager.get_snapshot_by_id(2).time_millis # test 1 @@ -386,7 +385,7 @@ def test_incremental_read_multi_snapshots(self): table_write.close() table_commit.close() - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() t10 = snapshot_manager.get_snapshot_by_id(10).time_millis t20 = snapshot_manager.get_snapshot_by_id(20).time_millis @@ -412,7 +411,7 @@ def test_manifest_creation_time_timestamp(self): self._write_test_table(table) - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() latest_snapshot = snapshot_manager.get_latest_snapshot() read_builder = table.new_read_builder() table_scan = read_builder.new_scan() @@ -575,7 +574,7 @@ def write_data(thread_id, start_user_id): f"Iteration {test_iteration}: User IDs mismatch") # Verify snapshot count (should have num_threads snapshots) - snapshot_manager = SnapshotManager(table) + snapshot_manager = table.snapshot_manager() latest_snapshot = snapshot_manager.get_latest_snapshot() self.assertIsNotNone(latest_snapshot, f"Iteration {test_iteration}: Latest snapshot should not be None") diff --git a/paimon-python/pypaimon/tests/snapshot_manager_test.py b/paimon-python/pypaimon/tests/snapshot_manager_test.py index 0d1f1b1388f1..d2ac8360c53a 100644 --- a/paimon-python/pypaimon/tests/snapshot_manager_test.py +++ b/paimon-python/pypaimon/tests/snapshot_manager_test.py @@ -33,64 +33,47 @@ def _create_mock_snapshot(snapshot_id: int, commit_kind: str = "APPEND"): return snapshot +def _build_manager(file_io): + from pypaimon.snapshot.snapshot_manager import SnapshotManager + return SnapshotManager(file_io, "/tmp/test_table") + + class SnapshotManagerTest(unittest.TestCase): """Tests for SnapshotManager batch lookahead methods.""" def test_find_next_scannable_returns_first_matching(self): """find_next_scannable should return the first snapshot that passes should_scan.""" - from pypaimon.snapshot.snapshot_manager import SnapshotManager - - table = Mock() - table.table_path = "/tmp/test_table" - table.current_branch.return_value = "main" - table.file_io = Mock() - table.file_io.exists_batch.return_value = { + file_io = Mock() + file_io.exists_batch.return_value = { "/tmp/test_table/snapshot/snapshot-5": True, "/tmp/test_table/snapshot/snapshot-6": True, "/tmp/test_table/snapshot/snapshot-7": True, } - table.catalog_environment = Mock() - table.catalog_environment.snapshot_loader.return_value = None - # Create mock snapshots with different commit kinds snapshots = { 5: _create_mock_snapshot(5, "COMPACT"), 6: _create_mock_snapshot(6, "COMPACT"), 7: _create_mock_snapshot(7, "APPEND"), } - manager = SnapshotManager(table) + manager = _build_manager(file_io) + manager.get_snapshot_by_id = lambda sid: snapshots.get(sid) - # Mock get_snapshot_by_id to return our test snapshots - def mock_get_snapshot(sid): - return snapshots.get(sid) - - manager.get_snapshot_by_id = mock_get_snapshot - - # should_scan only accepts APPEND commits def should_scan(snapshot): return snapshot.commit_kind == "APPEND" result, next_id, skipped_count = manager.find_next_scannable(5, should_scan, lookahead_size=5) - self.assertEqual(result.id, 7) # First APPEND snapshot - self.assertEqual(next_id, 8) # Next ID to check - self.assertEqual(skipped_count, 2) # Skipped snapshots 5 and 6 + self.assertEqual(result.id, 7) + self.assertEqual(next_id, 8) + self.assertEqual(skipped_count, 2) def test_find_next_scannable_returns_none_when_no_snapshot_exists(self): """find_next_scannable should return None when no snapshot exists at start_id.""" - from pypaimon.snapshot.snapshot_manager import SnapshotManager - - table = Mock() - table.table_path = "/tmp/test_table" - table.current_branch.return_value = "main" - table.file_io = Mock() - # All paths return False (no files exist) - table.file_io.exists_batch.return_value = {} - table.catalog_environment = Mock() - table.catalog_environment.snapshot_loader.return_value = None + file_io = Mock() + file_io.exists_batch.return_value = {} - manager = SnapshotManager(table) + manager = _build_manager(file_io) def should_scan(snapshot): return True @@ -98,26 +81,17 @@ def should_scan(snapshot): result, next_id, skipped_count = manager.find_next_scannable(5, should_scan, lookahead_size=5) self.assertIsNone(result) - self.assertEqual(next_id, 5) # Still at start_id + self.assertEqual(next_id, 5) self.assertEqual(skipped_count, 0) def test_find_next_scannable_continues_when_all_skipped(self): """When all lookahead snapshots are skipped, next_id should be start+lookahead.""" - from pypaimon.snapshot.snapshot_manager import SnapshotManager - - table = Mock() - table.table_path = "/tmp/test_table" - table.current_branch.return_value = "main" - table.file_io = Mock() - - # All 3 snapshots exist but are COMPACT (will be skipped) - table.file_io.exists_batch.return_value = { + file_io = Mock() + file_io.exists_batch.return_value = { "/tmp/test_table/snapshot/snapshot-5": True, "/tmp/test_table/snapshot/snapshot-6": True, "/tmp/test_table/snapshot/snapshot-7": True, } - table.catalog_environment = Mock() - table.catalog_environment.snapshot_loader.return_value = None snapshots = { 5: _create_mock_snapshot(5, "COMPACT"), @@ -125,21 +99,17 @@ def test_find_next_scannable_continues_when_all_skipped(self): 7: _create_mock_snapshot(7, "COMPACT"), } - manager = SnapshotManager(table) - - def mock_get_snapshot(sid): - return snapshots.get(sid) - - manager.get_snapshot_by_id = mock_get_snapshot + manager = _build_manager(file_io) + manager.get_snapshot_by_id = lambda sid: snapshots.get(sid) def should_scan(snapshot): return snapshot.commit_kind == "APPEND" result, next_id, skipped_count = manager.find_next_scannable(5, should_scan, lookahead_size=3) - self.assertIsNone(result) # No APPEND found - self.assertEqual(next_id, 8) # 5 + 3 = 8, continue from here - self.assertEqual(skipped_count, 3) # All 3 were skipped + self.assertIsNone(result) + self.assertEqual(next_id, 8) + self.assertEqual(skipped_count, 3) if __name__ == '__main__': diff --git a/paimon-python/pypaimon/tests/streaming_table_scan_test.py b/paimon-python/pypaimon/tests/streaming_table_scan_test.py index a0b456027ed0..7e5386a86fa9 100644 --- a/paimon-python/pypaimon/tests/streaming_table_scan_test.py +++ b/paimon-python/pypaimon/tests/streaming_table_scan_test.py @@ -67,14 +67,14 @@ def _create_mock_table(latest_snapshot_id: int = 5): class AsyncStreamingTableScanTest(unittest.TestCase): """Tests for AsyncStreamingTableScan async streaming functionality.""" - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.FileScanner') - def test_initial_scan(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager): + def test_initial_scan(self, MockStartingScanner, MockManifestListManager): """Initial scan should yield a Plan and set next_snapshot_id to latest + 1.""" table, _ = _create_mock_table(latest_snapshot_id=5) - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(5) mock_snapshot_manager.get_snapshot_by_id.return_value = None @@ -91,15 +91,15 @@ async def get_first_plan(): self.assertIsInstance(plan, Plan) self.assertEqual(scan.next_snapshot_id, 6) - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.FileScanner') - def test_stream_skips_non_append_commits(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager): + def test_stream_skips_non_append_commits(self, MockStartingScanner, MockManifestListManager): """Stream should skip COMPACT/OVERWRITE commits.""" table, _ = _create_mock_table(latest_snapshot_id=7) # Setup mocks - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager # Snapshots: 6 (COMPACT - skip), 7 (APPEND - scan) snapshot_7 = _create_mock_snapshot(7, "APPEND") @@ -137,15 +137,15 @@ async def get_plans(): # Verify lookahead skipped 1 snapshot self.assertEqual(scan._lookahead_skips, 1) - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.FileScanner') - def test_stream_sync_yields_plans(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager): + def test_stream_sync_yields_plans(self, MockStartingScanner, MockManifestListManager): """stream_sync() should provide a synchronous iterator.""" table, _ = _create_mock_table(latest_snapshot_id=5) # Setup mocks - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(5) mock_snapshot_manager.get_snapshot_by_id.return_value = None @@ -159,9 +159,8 @@ def test_stream_sync_yields_plans(self, MockStartingScanner, MockManifestListMan self.assertIsInstance(plan, Plan) break # Just get one - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') - def test_poll_interval_configurable(self, MockManifestListManager, MockSnapshotManager): + def test_poll_interval_configurable(self, MockManifestListManager): """Poll interval should be configurable.""" table, _ = _create_mock_table() @@ -169,14 +168,14 @@ def test_poll_interval_configurable(self, MockManifestListManager, MockSnapshotM self.assertEqual(scan.poll_interval, 0.5) - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.FileScanner') - def test_no_snapshot_waits_and_polls(self, MockStartingScanner, MockManifestListManager, MockSnapshotManager): + def test_no_snapshot_waits_and_polls(self, MockStartingScanner, MockManifestListManager): """When no new snapshot exists, should wait and poll again.""" table, _ = _create_mock_table(latest_snapshot_id=5) - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} # No snapshot 6 exists yet - find_next_scannable returns (None, 6, 0) first, @@ -216,36 +215,33 @@ async def get_plan_with_timeout(): class StreamingPrefetchTest(unittest.TestCase): """Tests for prefetching functionality in AsyncStreamingTableScan.""" - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') - def test_prefetch_enabled_by_default(self, MockManifestFileManager, MockManifestListManager, MockSnapshotManager): + def test_prefetch_enabled_by_default(self, MockManifestFileManager, MockManifestListManager): """Prefetching should be enabled by default.""" table, _ = _create_mock_table() scan = AsyncStreamingTableScan(table) self.assertTrue(scan._prefetch_enabled) - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') - def test_prefetch_can_be_disabled(self, MockManifestFileManager, MockManifestListManager, MockSnapshotManager): + def test_prefetch_can_be_disabled(self, MockManifestFileManager, MockManifestListManager): """Prefetching can be disabled via constructor parameter.""" table, _ = _create_mock_table() scan = AsyncStreamingTableScan(table, prefetch_enabled=False) self.assertFalse(scan._prefetch_enabled) - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') def test_prefetch_starts_after_yielding_plan( self, MockManifestFileManager, - MockManifestListManager, - MockSnapshotManager): + MockManifestListManager): """After yielding a plan, prefetch for next snapshot should start.""" table, _ = _create_mock_table(latest_snapshot_id=5) - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_manifest_list_manager = MockManifestListManager.return_value mock_manifest_file_manager = MockManifestFileManager.return_value mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} @@ -292,18 +288,17 @@ async def get_two_plans(): plans = asyncio.run(get_two_plans()) self.assertEqual(len(plans), 2) - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') def test_prefetch_returns_same_data_as_sequential( self, MockManifestFileManager, - MockManifestListManager, - MockSnapshotManager): + MockManifestListManager): """Prefetched plans should contain the same data as non-prefetched.""" table, _ = _create_mock_table(latest_snapshot_id=5) - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_manifest_list_manager = MockManifestListManager.return_value mock_manifest_file_manager = MockManifestFileManager.return_value mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} @@ -346,18 +341,17 @@ async def get_plans(scan, count): # Both should get the same number of plans self.assertEqual(len(plans_prefetch), len(plans_sequential)) - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') def test_prefetch_handles_no_next_snapshot( self, MockManifestFileManager, - MockManifestListManager, - MockSnapshotManager): + MockManifestListManager): """When no next snapshot exists, prefetch should return None gracefully.""" table, _ = _create_mock_table(latest_snapshot_id=5) - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_manifest_list_manager = MockManifestListManager.return_value mock_manifest_file_manager = MockManifestFileManager.return_value mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} @@ -391,18 +385,17 @@ async def get_one_plan(): plan = asyncio.run(get_one_plan()) self.assertIsInstance(plan, Plan) - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') def test_prefetch_disabled_no_prefetch_future( self, MockManifestFileManager, - MockManifestListManager, - MockSnapshotManager): + MockManifestListManager): """With prefetch disabled, no prefetch future should be created.""" table, _ = _create_mock_table(latest_snapshot_id=5) - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_manifest_list_manager = MockManifestListManager.return_value mock_manifest_file_manager = MockManifestFileManager.return_value mock_snapshot_manager.get_cache_stats.return_value = {"cache_hits": 0, "cache_misses": 0, "cache_size": 0} @@ -437,12 +430,11 @@ class StreamingCatchUpDiffTest(unittest.TestCase): """Tests for diff-based catch-up optimization in AsyncStreamingTableScan.""" @patch('pypaimon.read.streaming_table_scan.IncrementalDiffScanner') - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') def test_stream_triggers_diff_catch_up_for_large_gap( self, MockManifestFileManager, MockManifestListManager, - MockSnapshotManager, MockDiffScanner + MockDiffScanner ): """ When starting with a large gap, stream() should use diff scanner. @@ -454,7 +446,8 @@ def test_stream_triggers_diff_catch_up_for_large_gap( """ table, _ = _create_mock_table(latest_snapshot_id=100) - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_diff_scanner = MockDiffScanner.return_value # Setup: latest is 100, start is 5 (gap=95) @@ -493,19 +486,19 @@ class StreamingConsumerTest(unittest.TestCase): """Tests for consumer management integration in AsyncStreamingTableScan.""" @patch('pypaimon.read.streaming_table_scan.ConsumerManager') - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') def test_consumer_restores_next_snapshot_id( self, MockManifestFileManager, MockManifestListManager, - MockSnapshotManager, MockConsumerManager + MockConsumerManager ): """When consumer exists, stream() should resume from saved position.""" from pypaimon.consumer.consumer import Consumer table, _ = _create_mock_table(latest_snapshot_id=10) - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_consumer_manager = MockConsumerManager.return_value mock_manifest_list_manager = MockManifestListManager.return_value @@ -541,12 +534,11 @@ async def get_first_plan(): self.assertEqual(scan.next_snapshot_id, 9) @patch('pypaimon.read.streaming_table_scan.ConsumerManager') - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.FileScanner') def test_consumer_saves_after_yield( self, MockFileScanner, MockManifestListManager, - MockSnapshotManager, MockConsumerManager + MockConsumerManager ): """Consumer progress is flushed on the next __anext__() call after yielding a plan. @@ -556,7 +548,8 @@ def test_consumer_saves_after_yield( """ table, _ = _create_mock_table(latest_snapshot_id=5) - mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager = Mock() + table.snapshot_manager.return_value = mock_snapshot_manager mock_consumer_manager = MockConsumerManager.return_value # No existing consumer state @@ -597,12 +590,10 @@ async def get_first_plan_then_resume(): self.assertEqual(call_args[0][0], "save-test") self.assertEqual(call_args[0][1].next_snapshot, 6) - @patch('pypaimon.read.streaming_table_scan.SnapshotManager') @patch('pypaimon.read.streaming_table_scan.ManifestListManager') @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') def test_no_consumer_when_consumer_id_not_set( - self, MockManifestFileManager, MockManifestListManager, - MockSnapshotManager + self, MockManifestFileManager, MockManifestListManager ): """Without consumer_id, no ConsumerManager should be created.""" table, _ = _create_mock_table() diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 97d4ed92988a..6eb7f5f5da99 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -34,7 +34,6 @@ from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_commit import (PartitionStatistics, SnapshotCommit) -from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.row.generic_row import GenericRow from pypaimon.table.row.offset_row import OffsetRow from pypaimon.write.commit.commit_rollback import CommitRollback @@ -85,7 +84,7 @@ def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user: str): self.table: FileStoreTable = table self.commit_user = commit_user - self.snapshot_manager = SnapshotManager(table) + self.snapshot_manager = table.snapshot_manager() self.manifest_file_manager = ManifestFileManager(table) self.manifest_list_manager = ManifestListManager(table)