Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
AppendTableSplitGenerator
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
from pypaimon.snapshot.snapshot_manager import SnapshotManager


class IncrementalDiffAcceptanceTest(unittest.TestCase):
Expand Down Expand Up @@ -94,7 +93,7 @@ def _create_table_with_snapshots(self, name, num_snapshots=5, partition_keys=Non

def _read_via_diff(self, table, start_snap_id, end_snap_id):
"""Read data using IncrementalDiffScanner between two snapshots."""
snapshot_manager = SnapshotManager(table)
snapshot_manager = table.snapshot_manager()
start_snapshot = snapshot_manager.get_snapshot_by_id(start_snap_id)
end_snapshot = snapshot_manager.get_snapshot_by_id(end_snap_id)

Expand All @@ -106,7 +105,7 @@ def _read_via_diff(self, table, start_snap_id, end_snap_id):

def _read_via_delta(self, table, start_snap_id, end_snap_id):
"""Read data by iterating delta_manifest_lists between two snapshots."""
snapshot_manager = SnapshotManager(table)
snapshot_manager = table.snapshot_manager()
manifest_list_manager = ManifestListManager(table)
manifest_file_manager = ManifestFileManager(table)

Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/globalindex/global_index_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ def index_file_filter(entry):
return False
return global_index_meta.index_field_id in filter_field_ids

from pypaimon.snapshot.snapshot_manager import SnapshotManager
snapshot = SnapshotManager(table).get_latest_snapshot()
snapshot = table.snapshot_manager().get_latest_snapshot()
index_file_handler = IndexFileHandler(table=table)
entries = index_file_handler.scan(snapshot, index_file_filter)
scanned_index_files = [entry.index_file for entry in entries]
Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/read/scanner/file_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
PrimaryKeyTableSplitGenerator
from pypaimon.read.split import DataSplit
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.source.deletion_file import DeletionFile

Expand Down Expand Up @@ -179,7 +178,7 @@ def __init__(
self.predicate_for_stats = remove_row_id_filter(predicate) if predicate else None
self.limit = limit

self.snapshot_manager = SnapshotManager(table)
self.snapshot_manager = table.snapshot_manager()
self.manifest_list_manager = ManifestListManager(table)
self.manifest_file_manager = ManifestFileManager(table)

Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/read/streaming_table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from pypaimon.read.scanner.primary_key_table_split_generator import \
PrimaryKeyTableSplitGenerator
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_manager import SnapshotManager


class AsyncStreamingTableScan:
Expand Down Expand Up @@ -103,7 +102,7 @@ def __init__(
self._lookahead_size = 10 # How many snapshots to look ahead

# Initialize managers
self._snapshot_manager = SnapshotManager(table)
self._snapshot_manager = table.snapshot_manager()
self._manifest_list_manager = ManifestListManager(table)
self._manifest_file_manager = ManifestFileManager(table)

Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/read/table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

from pypaimon.read.plan import Plan
from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager


Expand All @@ -48,7 +47,7 @@ def plan(self) -> Plan:

def _create_file_scanner(self) -> FileScanner:
options = self.table.options.options
snapshot_manager = SnapshotManager(self.table)
snapshot_manager = self.table.snapshot_manager()
manifest_list_manager = ManifestListManager(self.table)
if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):
ts = options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",")
Expand Down
37 changes: 20 additions & 17 deletions paimon-python/pypaimon/snapshot/snapshot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,24 @@
class SnapshotManager:
"""Manager for snapshot files using unified FileIO."""

def __init__(self, table, branch: Optional[str] = None):
# Lazy imports to avoid a cycle: pypaimon.branch.__init__
def __init__(
self,
file_io: FileIO,
table_path: str,
branch: Optional[str] = None,
snapshot_loader: Optional[SnapshotLoader] = None,
):
# Lazy import to avoid a cycle: pypaimon.branch.__init__
# eagerly loads FileSystemBranchManager, which imports
# SnapshotManager.
from pypaimon.branch.branch_manager import BranchManager
from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH
from pypaimon.table.file_store_table import FileStoreTable

self.table: FileStoreTable = table
self.file_io: FileIO = self.table.file_io
self.snapshot_loader: Optional[SnapshotLoader] = self.table.catalog_environment.snapshot_loader()

if branch is None:
branch = self.table.current_branch() or DEFAULT_MAIN_BRANCH
self.branch = BranchManager.normalize_branch(branch)
self.file_io: FileIO = file_io
self.table_path: str = table_path.rstrip('/')
self.branch: str = BranchManager.normalize_branch(branch)
self.snapshot_loader: Optional[SnapshotLoader] = snapshot_loader

table_root = self.table.table_path.rstrip('/')
branch_root = BranchManager.branch_path(table_root, self.branch)
branch_root = BranchManager.branch_path(self.table_path, self.branch)
self.snapshot_dir = f"{branch_root}/snapshot"
self.latest_file = f"{self.snapshot_dir}/LATEST"

Expand All @@ -55,10 +55,13 @@ def copy_with_branch(self, branch_name: str) -> 'SnapshotManager':
# carries a SnapshotLoader rebranched to ``branch_name`` so REST
# loads target the correct branch instead of falling back to the
# main-branch identifier.
new_manager = SnapshotManager(self.table, branch_name)
if self.snapshot_loader is not None:
new_manager.snapshot_loader = self.snapshot_loader.copy_with_branch(branch_name)
return new_manager
rebranched_loader = (
self.snapshot_loader.copy_with_branch(branch_name)
if self.snapshot_loader is not None
else None
)
return SnapshotManager(
self.file_io, self.table_path, branch_name, rebranched_loader)

def get_latest_snapshot(self) -> Optional[Snapshot]:
"""
Expand Down
7 changes: 6 additions & 1 deletion paimon-python/pypaimon/table/file_store_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ def consumer_manager(self):
def snapshot_manager(self):
"""Get the snapshot manager for this table."""
from pypaimon.snapshot.snapshot_manager import SnapshotManager
return SnapshotManager(self)
return SnapshotManager(
self.file_io,
self.table_path,
self.current_branch(),
self.catalog_environment.snapshot_loader(),
)

def tag_manager(self):
"""Get the tag manager for this table."""
Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/table/source/full_text_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ def __init__(self, table: 'FileStoreTable', text_column: 'DataField'):

def scan(self) -> FullTextScanPlan:
from pypaimon.index.index_file_handler import IndexFileHandler
from pypaimon.snapshot.snapshot_manager import SnapshotManager

text_column = self._text_column
snapshot = SnapshotManager(self._table).get_latest_snapshot()
snapshot = self._table.snapshot_manager().get_latest_snapshot()

from pypaimon.snapshot.time_travel_util import TimeTravelUtil
from pypaimon.common.options.options import Options
Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/table/source/vector_search_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def scan(self):
from pypaimon.common.options.options import Options
from pypaimon.index.index_file_handler import IndexFileHandler
from pypaimon.read.push_down_utils import _get_all_fields
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.snapshot.time_travel_util import TimeTravelUtil

vector_column = self._vector_column
Expand All @@ -81,7 +80,7 @@ def scan(self):
self._table.tag_manager()
)
if snapshot is None:
snapshot = SnapshotManager(self._table).get_latest_snapshot()
snapshot = self._table.snapshot_manager().get_latest_snapshot()

index_file_handler = IndexFileHandler(table=self._table)

Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/tests/blob_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2748,7 +2748,6 @@ def test_concurrent_blob_writes_with_retry(self):
"""Test concurrent blob writes to verify retry mechanism works correctly."""
import threading
from pypaimon import Schema
from pypaimon.snapshot.snapshot_manager import SnapshotManager

# Run the test 10 times to verify stability
iter_num = 2
Expand Down Expand Up @@ -2873,7 +2872,7 @@ def write_blob_data(thread_id, start_id):
self.assertIn(b'BLOB_PATTERN_', blob, f"Blob {i} should contain pattern")

# Verify snapshot count (should have num_threads snapshots)
snapshot_manager = SnapshotManager(table)
snapshot_manager = table.snapshot_manager()
latest_snapshot = snapshot_manager.get_latest_snapshot()
self.assertIsNotNone(latest_snapshot,
f"Iteration {test_iteration}: Latest snapshot should not be None")
Expand Down
8 changes: 4 additions & 4 deletions paimon-python/pypaimon/tests/branch_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ def test_copy_with_branch_rebranches_snapshot_loader(self):
self.assertIsNot(branch_sm.snapshot_loader, sm.snapshot_loader)
self.assertEqual(branch_sm.snapshot_loader.identifier.branch, "b1")
self.assertEqual(
branch_sm.snapshot_loader.identifier.database,
sm.snapshot_loader.identifier.database)
branch_sm.snapshot_loader.identifier.get_database_name(),
sm.snapshot_loader.identifier.get_database_name())
self.assertEqual(
branch_sm.snapshot_loader.identifier.object,
sm.snapshot_loader.identifier.object)
branch_sm.snapshot_loader.identifier.get_table_name(),
sm.snapshot_loader.identifier.get_table_name())
# Original loader's identifier untouched.
self.assertIsNone(sm.snapshot_loader.identifier.branch)

Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/tests/changelog_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ def test_earliest_long_lived_changelog_id_none(self):

def test_changelog_from_snapshot(self):
"""Test that Changelog can be created from a Snapshot."""
from pypaimon.snapshot.snapshot_manager import SnapshotManager

snapshot_manager = SnapshotManager(self.table)
snapshot_manager = self.table.snapshot_manager()
snapshot = snapshot_manager.get_latest_snapshot()

if snapshot:
Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/tests/data_evolution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from pypaimon import CatalogFactory, Schema
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.offset_row import OffsetRow


Expand Down Expand Up @@ -192,7 +191,7 @@ def test_partitioned_read_requested_column_missing_in_file(self):

# Assert manifest file meta contains min and max row id
manifest_list_manager = ManifestListManager(table)
snapshot_manager = SnapshotManager(table)
snapshot_manager = table.snapshot_manager()
all_manifests = manifest_list_manager.read_all(snapshot_manager.get_latest_snapshot())
first_commit = next((m for m in all_manifests if m.min_row_id == 0 and m.max_row_id == 1), None)
self.assertIsNotNone(first_commit, "Should have a manifest with min_row_id=0, max_row_id=1")
Expand Down
19 changes: 9 additions & 10 deletions paimon-python/pypaimon/tests/file_store_commit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from pypaimon.write.file_store_commit import FileStoreCommit


@patch('pypaimon.write.file_store_commit.SnapshotManager')
@patch('pypaimon.write.file_store_commit.ManifestFileManager')
@patch('pypaimon.write.file_store_commit.ManifestListManager')
class TestFileStoreCommit(unittest.TestCase):
Expand All @@ -55,7 +54,7 @@ def _create_file_store_commit(self):
)

def test_generate_partition_statistics_single_partition_single_file(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation with single partition and single file."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
Expand Down Expand Up @@ -107,7 +106,7 @@ def test_generate_partition_statistics_single_partition_single_file(
self.assertEqual(stat.last_file_creation_time, expected_time)

def test_generate_partition_statistics_multiple_files_same_partition(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation with multiple files in same partition."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
Expand Down Expand Up @@ -171,7 +170,7 @@ def test_generate_partition_statistics_multiple_files_same_partition(
self.assertEqual(stat.last_file_creation_time, expected_time)

def test_generate_partition_statistics_multiple_partitions(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation with multiple different partitions."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
Expand Down Expand Up @@ -259,7 +258,7 @@ def test_generate_partition_statistics_multiple_partitions(
self.assertEqual(stat_2.file_size_in_bytes, 2 * 1024 * 1024)

def test_generate_partition_statistics_unpartitioned_table(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation for unpartitioned table."""
# Update mock table to have no partition keys
self.mock_table.partition_keys = []
Expand Down Expand Up @@ -310,7 +309,7 @@ def test_generate_partition_statistics_unpartitioned_table(
self.assertEqual(stat.file_size_in_bytes, 1024 * 1024)

def test_generate_partition_statistics_no_creation_time(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation when file has no creation time."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
Expand Down Expand Up @@ -347,7 +346,7 @@ def test_generate_partition_statistics_no_creation_time(
self.assertGreater(stat.last_file_creation_time, 0)

def test_generate_partition_statistics_mismatched_partition_keys(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation when partition tuple doesn't match partition keys."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
Expand Down Expand Up @@ -393,7 +392,7 @@ def test_generate_partition_statistics_mismatched_partition_keys(
self.assertEqual(stat.spec, expected_spec)

def test_generate_partition_statistics_empty_commit_messages(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
self, mock_manifest_list_manager, mock_manifest_file_manager):
"""Test partition statistics generation with empty commit messages list."""
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
Expand All @@ -405,7 +404,7 @@ def test_generate_partition_statistics_empty_commit_messages(
self.assertEqual(len(statistics), 0)

def test_append_commit_inherits_index_manifest(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
self, mock_manifest_list_manager, mock_manifest_file_manager):
file_store_commit = self._create_file_store_commit()

self.mock_table.identifier = 'default.test_table'
Expand Down Expand Up @@ -448,7 +447,7 @@ def test_append_commit_inherits_index_manifest(
)

def test_null_partition_value(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
self, mock_manifest_list_manager, mock_manifest_file_manager):
from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.schema.data_types import DataField, AtomicType
Expand Down
2 changes: 0 additions & 2 deletions paimon-python/pypaimon/tests/partition_predicate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ def _manifest_entry(partition_values):
)


@patch('pypaimon.read.scanner.file_scanner.SnapshotManager')
@patch('pypaimon.read.scanner.file_scanner.ManifestFileManager')
@patch('pypaimon.read.scanner.file_scanner.ManifestListManager')
class TestFileScannerPartitionPredicate(unittest.TestCase):
Expand Down Expand Up @@ -154,7 +153,6 @@ def test_filters_manifest_entry_by_partition(self, *_):
_manifest_entry(['2024-01-15', 'us-west-2'])))


@patch('pypaimon.write.file_store_commit.SnapshotManager')
@patch('pypaimon.write.file_store_commit.ManifestFileManager')
@patch('pypaimon.write.file_store_commit.ManifestListManager')
class TestOverwritePartitionPredicate(unittest.TestCase):
Expand Down
5 changes: 2 additions & 3 deletions paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.schema.data_types import AtomicType, DataField
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
GenericRowSerializer)
from pypaimon.table.row.row_kind import RowKind
Expand Down Expand Up @@ -180,7 +179,7 @@ def test_full_data_types(self):
self.assertEqual(actual_data, expect_data)

# to test GenericRow ability
latest_snapshot = SnapshotManager(table).get_latest_snapshot()
latest_snapshot = table.snapshot_manager().get_latest_snapshot()
manifest_files = table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
manifest_files[0].file_name,
Expand Down Expand Up @@ -771,7 +770,7 @@ def test_incremental_timestamp(self):
timestamp = int(time.time() * 1000)
self._write_test_table(table)

snapshot_manager = SnapshotManager(table)
snapshot_manager = table.snapshot_manager()
t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
# test 1
Expand Down
Loading
Loading