From 00d00cf0ef540fd0e07456c6b4dc156e10938cb5 Mon Sep 17 00:00:00 2001
From: Colin Chen
Date: Wed, 29 Apr 2026 11:55:54 +0800
Subject: [PATCH 1/2] [python] Add PyPaimon maintenance operations

---
 docs/content/pypaimon/python-api.md           |  68 +++++++
 paimon-python/pypaimon/cli/cli_table.py       | 147 +++++++++++++-
 paimon-python/pypaimon/common/json_util.py    |   8 +
 .../pypaimon/snapshot/snapshot_manager.py     |  22 +++
 .../pypaimon/table/file_store_table.py        |  10 +-
 paimon-python/pypaimon/table/maintenance.py   | 185 ++++++++++++++++++
 .../pypaimon/tests/cli_table_test.py          |  34 ++++
 .../pypaimon/tests/table/maintenance_test.py  | 115 +++++++++++
 .../pypaimon/write/file_store_commit.py       |  22 +++
 paimon-python/pypaimon/write/table_commit.py  |  12 ++
 10 files changed, 618 insertions(+), 5 deletions(-)
 create mode 100644 paimon-python/pypaimon/table/maintenance.py
 create mode 100644 paimon-python/pypaimon/tests/table/maintenance_test.py

diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md
index 709f4d9a0a2a..42876011f22b 100644
--- a/docs/content/pypaimon/python-api.md
+++ b/docs/content/pypaimon/python-api.md
@@ -463,6 +463,30 @@ arrow_table = table_read.to_arrow(splits)
 pandas_df = table_read.to_pandas(splits)
 ```
 
+### Snapshot Query
+
+You can inspect table snapshots through `SnapshotManager`:
+
+```python
+snapshot_manager = table.snapshot_manager()
+
+latest = snapshot_manager.get_latest_snapshot()
+earliest = snapshot_manager.try_get_earliest_snapshot()
+snapshot_10 = snapshot_manager.get_snapshot_by_id(10)
+snapshots = snapshot_manager.list_snapshots()
+snapshot_at_time = snapshot_manager.earlier_or_equal_time_mills(1700000000000)
+```
+
+The CLI also supports querying snapshots:
+
+```shell
+paimon -c catalog.yaml table snapshot default.my_table
+paimon -c catalog.yaml table snapshot default.my_table --id 10
+paimon -c catalog.yaml table snapshot default.my_table --earliest
+paimon -c catalog.yaml table snapshot default.my_table --time-millis 1700000000000
+paimon -c catalog.yaml table snapshot default.my_table --all
+```
+
 ### Shard Read
 
 Shard Read allows you to read data in parallel by dividing the table into multiple shards. This is useful for
@@ -606,6 +630,50 @@ table.rollback_to('v3')  # tag name
 ```
 
 The `rollback_to` method accepts either an `int` (snapshot ID) or a `str` (tag name) and automatically dispatches
 to the appropriate rollback logic.
 
+## Maintenance
+
+PyPaimon supports native maintenance helpers for rewriting current table data.
+
+### Compact
+
+Use `compact` to rewrite the current visible records and commit a `COMPACT` snapshot. You can compact the whole table
+or a specific partition:
+
+```python
+table = catalog.get_table('database_name.table_name')
+result = table.new_maintenance().compact({'dt': '2024-01-01'})
+
+print(result.snapshot_id)
+print(result.rewritten_record_count)
+print(result.rewritten_file_count)
+```
+
+You can also trigger compaction from the CLI:
+
+```shell
+paimon -c catalog.yaml table compact default.my_table --partition dt=2024-01-01
+```
+
+### Rescale Bucket
+
+Use `rescale_bucket` to rewrite data with a new bucket number.
+For partitioned tables, specify the target partition:
+
+```python
+table = catalog.get_table('database_name.table_name')
+result = table.new_maintenance().rescale_bucket(
+    bucket_num=4,
+    partition={'dt': '2024-01-01'}
+)
+```
+
+The same operation is available from the CLI:
+
+```shell
+paimon -c catalog.yaml table rescale default.my_table --bucket-num 4 --partition dt=2024-01-01
+```
+
+Native Python maintenance does not yet support row tracking, data evolution, or deletion vectors.
+
 ## Streaming Read
 
 Streaming reads allow you to continuously read new data as it arrives in a Paimon table. This is useful for building
diff --git a/paimon-python/pypaimon/cli/cli_table.py b/paimon-python/pypaimon/cli/cli_table.py
index 0f829f45531a..64a04274fe7e 100644
--- a/paimon-python/pypaimon/cli/cli_table.py
+++ b/paimon-python/pypaimon/cli/cli_table.py
@@ -25,6 +25,54 @@
 from pypaimon.common.json_util import JSON
 
 
+def _parse_partition_spec(partition_spec):
+    if not partition_spec:
+        return None
+
+    partition = {}
+    for item in partition_spec.split(','):
+        if '=' not in item:
+            raise ValueError(
+                "Invalid partition spec '{}'. Expected format: key=value,key2=value2".format(partition_spec)
+            )
+        key, value = item.split('=', 1)
+        key = key.strip()
+        if not key:
+            raise ValueError(
+                "Invalid partition spec '{}'. Partition key cannot be empty.".format(partition_spec)
+            )
+        partition[key] = value.strip()
+    return partition
+
+
+def _load_file_store_table(args):
+    from pypaimon.cli.cli import load_catalog_config, create_catalog
+    from pypaimon.table.file_store_table import FileStoreTable
+
+    config = load_catalog_config(args.config)
+    catalog = create_catalog(config)
+
+    table_identifier = args.table
+    parts = table_identifier.split('.')
+    if len(parts) != 2:
+        print(f"Error: Invalid table identifier '{table_identifier}'. "
+              f"Expected format: 'database.table'", file=sys.stderr)
+        sys.exit(1)
+
+    try:
+        table = catalog.get_table(table_identifier)
+    except Exception as e:
+        print(f"Error: Failed to get table '{table_identifier}': {e}", file=sys.stderr)
+        sys.exit(1)
+
+    if not isinstance(table, FileStoreTable):
+        print(f"Error: Table '{table_identifier}' is not a FileStoreTable. "
+              f"This operation is not supported for this table type.", file=sys.stderr)
+        sys.exit(1)
+
+    return table_identifier, table
+
+
 def cmd_table_read(args):
     """
     Execute the 'table read' command.
@@ -306,10 +354,21 @@ def cmd_table_snapshot(args):
               f"Snapshot operation is not supported for this table type.", file=sys.stderr)
         sys.exit(1)
 
-    # Get latest snapshot
+    # Get snapshot
     try:
         snapshot_manager = table.snapshot_manager()
-        snapshot = snapshot_manager.get_latest_snapshot()
+        if getattr(args, 'all', False):
+            snapshots = snapshot_manager.list_snapshots()
+            print(JSON.to_json(snapshots, indent=2))
+            return
+        if getattr(args, 'snapshot_id', None) is not None:
+            snapshot = snapshot_manager.get_snapshot_by_id(args.snapshot_id)
+        elif getattr(args, 'earliest', False):
+            snapshot = snapshot_manager.try_get_earliest_snapshot()
+        elif getattr(args, 'time_millis', None) is not None:
+            snapshot = snapshot_manager.earlier_or_equal_time_mills(args.time_millis)
+        else:
+            snapshot = snapshot_manager.get_latest_snapshot()
 
         if snapshot is None:
             print(f"Error: No snapshot found for table '{table_identifier}'.", file=sys.stderr)
@@ -323,6 +382,30 @@
         sys.exit(1)
 
 
+def cmd_table_compact(args):
+    """Execute the 'table compact' command."""
+    table_identifier, table = _load_file_store_table(args)
+    try:
+        partition = _parse_partition_spec(getattr(args, 'partition', None))
+        result = table.new_maintenance().compact(partition)
+        print(JSON.to_json(result, indent=2))
+    except Exception as e:
+        print(f"Error: Failed to compact table '{table_identifier}': {e}", file=sys.stderr)
+        sys.exit(1)
+
+
+def cmd_table_rescale(args):
+    """Execute the 'table rescale' command."""
+    table_identifier, table = _load_file_store_table(args)
+    try:
+        partition = _parse_partition_spec(getattr(args, 'partition', None))
+        result = table.new_maintenance().rescale_bucket(args.bucket_num, partition)
+        print(JSON.to_json(result, indent=2))
+    except Exception as e:
+        print(f"Error: Failed to rescale table '{table_identifier}': {e}", file=sys.stderr)
+        sys.exit(1)
+
+
 def cmd_table_create(args):
     """
     Execute the 'table create' command.
@@ -776,12 +859,70 @@ def add_table_subcommands(table_parser):
     get_parser.set_defaults(func=cmd_table_get)
 
     # table snapshot command
-    snapshot_parser = table_subparsers.add_parser('snapshot', help='Get the latest snapshot of a table')
+    snapshot_parser = table_subparsers.add_parser('snapshot', help='Get snapshots of a table')
     snapshot_parser.add_argument(
         'table',
         help='Table identifier in format: database.table'
     )
+    snapshot_group = snapshot_parser.add_mutually_exclusive_group()
+    snapshot_group.add_argument(
+        '--id',
+        dest='snapshot_id',
+        type=int,
+        default=None,
+        help='Get a snapshot by ID'
+    )
+    snapshot_group.add_argument(
+        '--earliest',
+        action='store_true',
+        help='Get the earliest available snapshot'
+    )
+    snapshot_group.add_argument(
+        '--time-millis',
+        type=int,
+        default=None,
+        help='Get the latest snapshot whose commit time is less than or equal to this epoch millis'
+    )
+    snapshot_group.add_argument(
+        '--all',
+        action='store_true',
+        help='List all available snapshots'
+    )
     snapshot_parser.set_defaults(func=cmd_table_snapshot)
 
+    # table compact command
+    compact_parser = table_subparsers.add_parser('compact', help='Compact a table or partition')
+    compact_parser.add_argument(
+        'table',
+        help='Table identifier in format: database.table'
+    )
+    compact_parser.add_argument(
+        '--partition',
+        type=str,
+        default=None,
+        help='Partition spec to compact, e.g. "dt=2024-01-01,hh=08"'
+    )
+    compact_parser.set_defaults(func=cmd_table_compact)
+
+    # table rescale command
+    rescale_parser = table_subparsers.add_parser('rescale', help='Rewrite data with a new bucket number')
+    rescale_parser.add_argument(
+        'table',
+        help='Table identifier in format: database.table'
+    )
+    rescale_parser.add_argument(
+        '--bucket-num',
+        type=int,
+        required=True,
+        help='Target bucket number'
+    )
+    rescale_parser.add_argument(
+        '--partition',
+        type=str,
+        default=None,
+        help='Partition spec to rescale, e.g. "dt=2024-01-01,hh=08"'
+    )
+    rescale_parser.set_defaults(func=cmd_table_rescale)
 
     # table create command
     create_parser = table_subparsers.add_parser('create', help='Create a new table')
diff --git a/paimon-python/pypaimon/common/json_util.py b/paimon-python/pypaimon/common/json_util.py
index fb257cfe3cae..3b625364911f 100644
--- a/paimon-python/pypaimon/common/json_util.py
+++ b/paimon-python/pypaimon/common/json_util.py
@@ -48,6 +48,14 @@ def from_json(json_str: str, target_class: Type[T]) -> T:
     @staticmethod
     def __to_dict(obj: Any) -> Dict[str, Any]:
         """Convert to dictionary with custom field names"""
+        if isinstance(obj, list):
+            return [
+                item.to_dict() if hasattr(item, "to_dict")
+                else JSON.__to_dict(item) if is_dataclass(item)
+                else item
+                for item in obj
+            ]
+
         # If object has custom to_dict method, use it
         if hasattr(obj, "to_dict") and callable(getattr(obj, "to_dict")):
             return obj.to_dict()
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index e0b4b08bc8e9..fe6af7a61c90 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -216,6 +216,28 @@ def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
         snapshot_content = self.file_io.read_file_utf8(snapshot_file)
         return JSON.from_json(snapshot_content, Snapshot)
 
+    def list_snapshots(self) -> List[Snapshot]:
+        """List available snapshots ordered by snapshot ID."""
+        import re
+
+        if not self.file_io.exists(self.snapshot_dir):
+            return []
+
+        snapshot_pattern = re.compile(r'^snapshot-(\d+)$')
+        snapshot_ids = []
+        for file_info in self.file_io.list_status(self.snapshot_dir):
+            filename = file_info.path.split('/')[-1]
+            match = snapshot_pattern.match(filename)
+            if match:
+                snapshot_ids.append(int(match.group(1)))
+
+        snapshots = []
+        for snapshot_id in sorted(snapshot_ids):
+            snapshot = self.get_snapshot_by_id(snapshot_id)
+            if snapshot is not None:
+                snapshots.append(snapshot)
+        return snapshots
+
     def get_snapshots_batch(
         self, snapshot_ids: List[int], max_workers: int = 4
     ) -> Dict[int, Optional[Snapshot]]:
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 4dadb234db2a..bf34c8d6a9c4 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -377,6 +377,11 @@ def new_vector_search_builder(self) -> 'VectorSearchBuilder':
         from pypaimon.table.source.vector_search_builder import VectorSearchBuilderImpl
         return VectorSearchBuilderImpl(self)
 
+    def new_maintenance(self):
+        """Create a table maintenance helper."""
+        from pypaimon.table.maintenance import TableMaintenance
+        return TableMaintenance(self)
+
     def create_row_key_extractor(self) -> RowKeyExtractor:
         bucket_mode = self.bucket_mode()
         if bucket_mode == BucketMode.HASH_FIXED:
@@ -390,8 +395,9 @@ def create_row_key_extractor(self) -> RowKeyExtractor:
         else:
             raise ValueError(f"Unsupported bucket mode: {bucket_mode}")
 
-    def copy(self, options: dict) -> 'FileStoreTable':
-        if CoreOptions.BUCKET.key() in options and int(options.get(CoreOptions.BUCKET.key())) != self.options.bucket():
+    def copy(self, options: dict, allow_bucket_change: bool = False) -> 'FileStoreTable':
+        if (not allow_bucket_change and CoreOptions.BUCKET.key() in options
+                and int(options.get(CoreOptions.BUCKET.key())) != self.options.bucket()):
             raise ValueError("Cannot change bucket number")
         new_options = CoreOptions.copy(self.options).options.to_map()
         for k, v in options.items():
diff --git a/paimon-python/pypaimon/table/maintenance.py b/paimon-python/pypaimon/table/maintenance.py
new file mode 100644
index 000000000000..e4d8754d3bd7
--- /dev/null
+++ b/paimon-python/pypaimon/table/maintenance.py
@@ -0,0 +1,185 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from dataclasses import dataclass
+from typing import Dict, Optional
+
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.predicate_builder import PredicateBuilder
+
+
+@dataclass
+class MaintenanceResult:
+    """Summary for a table maintenance operation."""
+
+    snapshot_id: Optional[int]
+    rewritten_record_count: int
+    rewritten_file_count: int
+
+
+class TableMaintenance:
+    """Maintenance operations for a file store table."""
+
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+
+    def compact(self, partition: Optional[Dict[str, str]] = None) -> MaintenanceResult:
+        """Rewrite current table data and commit it as a COMPACT snapshot.
+
+        This native Python compaction rewrites current visible records for the whole table
+        or one partition. Row tracking, data evolution, and deletion vectors need dedicated
+        metadata handling and are intentionally rejected here.
+        """
+        self._check_supported("compact")
+        before_snapshot = self.table.snapshot_manager().get_latest_snapshot()
+        if before_snapshot is None:
+            raise ValueError("Table has no snapshot. No need to compact.")
+
+        commit_messages, rewritten_record_count, rewritten_file_count = self._rewrite(
+            self.table,
+            partition,
+        )
+        if not commit_messages:
+            return MaintenanceResult(before_snapshot.id, 0, 0)
+
+        table_commit = self.table.new_batch_write_builder().new_commit()
+        try:
+            table_commit.compact(partition, commit_messages)
+        except Exception:
+            table_commit.abort(commit_messages)
+            raise
+        finally:
+            table_commit.close()
+
+        latest_snapshot = self.table.snapshot_manager().get_latest_snapshot()
+        return MaintenanceResult(
+            latest_snapshot.id if latest_snapshot else None,
+            rewritten_record_count,
+            rewritten_file_count
+        )
+
+    def rescale_bucket(
+        self,
+        bucket_num: int,
+        partition: Optional[Dict[str, str]] = None
+    ) -> MaintenanceResult:
+        """Rewrite data using a different bucket number and commit with overwrite."""
+        if bucket_num <= 0:
+            raise ValueError("bucket_num must be greater than 0.")
+        self._check_supported("rescale_bucket")
+        if self.table.partition_keys and partition is None:
+            raise ValueError("partition must be specified for partitioned tables.")
+
+        before_snapshot = self.table.snapshot_manager().get_latest_snapshot()
+        if before_snapshot is None:
+            raise ValueError("Table has no snapshot. No need to rescale.")
+
+        rescaled_table = self.table.copy(
+            {CoreOptions.BUCKET.key(): str(bucket_num)},
+            allow_bucket_change=True
+        )
+        commit_messages, rewritten_record_count, rewritten_file_count = self._rewrite(
+            rescaled_table,
+            partition,
+        )
+        if not commit_messages:
+            return MaintenanceResult(before_snapshot.id, 0, 0)
+
+        table_commit = rescaled_table.new_batch_write_builder().overwrite(partition).new_commit()
+        try:
+            table_commit.commit(commit_messages)
+        except Exception:
+            table_commit.abort(commit_messages)
+            raise
+        finally:
+            table_commit.close()
+
+        latest_snapshot = self.table.snapshot_manager().get_latest_snapshot()
+        return MaintenanceResult(
+            latest_snapshot.id if latest_snapshot else None,
+            rewritten_record_count,
+            rewritten_file_count
+        )
+
+    def _rewrite(self, write_table, partition):
+        read_builder = self.table.new_read_builder()
+        predicate = self._partition_predicate(partition)
+        if predicate is not None:
+            read_builder = read_builder.with_filter(predicate)
+
+        scan = read_builder.new_scan()
+        splits = scan.plan().splits()
+        if not splits:
+            return [], 0, 0
+
+        table_read = read_builder.new_read()
+        table_write = write_table.new_batch_write_builder().new_write()
+
+        rewritten_record_count = 0
+        try:
+            for split in splits:
+                data = table_read.to_arrow([split])
+                if data is None or data.num_rows == 0:
+                    continue
+                rewritten_record_count += data.num_rows
+                table_write.write_arrow(data)
+            commit_messages = table_write.prepare_commit()
+        except Exception:
+            table_write.close()
+            raise
+
+        rewritten_file_count = sum(len(msg.new_files) for msg in commit_messages)
+        table_write.close()
+        return commit_messages, rewritten_record_count, rewritten_file_count
+
+    def _partition_predicate(self, partition):
+        if not partition:
+            return None
+
+        partition_keys = set(self.table.partition_keys)
+        for key in partition:
+            if key not in partition_keys:
+                raise ValueError(
+                    "Partition spec key '{}' is not a partition column. Partition keys are: {}.".format(
+                        key,
+                        list(self.table.partition_keys)
+                    )
+                )
+
+        predicate_builder = PredicateBuilder(self.table.fields)
+        predicates = [
+            predicate_builder.equal(key, value)
+            for key, value in partition.items()
+        ]
+        return predicate_builder.and_predicates(predicates)
+
+    def _check_supported(self, operation: str) -> None:
+        if self.table.options.row_tracking_enabled():
+            raise NotImplementedError(
+                "{} does not support row-tracking tables yet.".format(operation)
+            )
+        if self.table.options.data_evolution_enabled():
+            raise NotImplementedError(
+                "{} does not support data-evolution tables yet.".format(operation)
+            )
+        if self.table.options.deletion_vectors_enabled():
+            raise NotImplementedError(
+                "{} does not support deletion-vector tables yet.".format(operation)
+            )
diff --git a/paimon-python/pypaimon/tests/cli_table_test.py b/paimon-python/pypaimon/tests/cli_table_test.py
index 2a2f13646c78..d63f6a2e4019 100644
--- a/paimon-python/pypaimon/tests/cli_table_test.py
+++ b/paimon-python/pypaimon/tests/cli_table_test.py
@@ -238,6 +238,40 @@ def test_cli_table_snapshot_basic(self):
         self.assertGreater(snapshot_json['id'], 0)
         self.assertGreater(snapshot_json['totalRecordCount'], 0)
 
+    def test_cli_table_snapshot_by_id(self):
+        """Test table snapshot by ID via CLI."""
+        latest_snapshot = self.catalog.get_table('test_db.users').snapshot_manager().get_latest_snapshot()
+
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file, 'table', 'snapshot',
+                    'test_db.users', '--id', str(latest_snapshot.id)]):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+        import json
+        snapshot_json = json.loads(mock_stdout.getvalue())
+        self.assertEqual(latest_snapshot.id, snapshot_json['id'])
+
+    def test_cli_table_snapshot_all(self):
+        """Test table snapshot list via CLI."""
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file, 'table', 'snapshot',
+                    'test_db.users', '--all']):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+        import json
+        snapshots_json = json.loads(mock_stdout.getvalue())
+        self.assertIsInstance(snapshots_json, list)
+        self.assertGreaterEqual(len(snapshots_json), 1)
+        self.assertIn('id', snapshots_json[0])
+
     def test_cli_table_create_with_json_schema(self):
         """Test table create with JSON schema file."""
         import json
diff --git a/paimon-python/pypaimon/tests/table/maintenance_test.py b/paimon-python/pypaimon/tests/table/maintenance_test.py
new file mode 100644
index 000000000000..c4eb02bf27c1
--- /dev/null
+++ b/paimon-python/pypaimon/tests/table/maintenance_test.py
@@ -0,0 +1,115 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class MaintenanceTest(unittest.TestCase):
+
+    def setUp(self):
+        self.tempdir = tempfile.mkdtemp()
+        self.warehouse = os.path.join(self.tempdir, "warehouse")
+        self.catalog = CatalogFactory.create({"warehouse": self.warehouse})
+        self.catalog.create_database("default", True)
+        self.pa_schema = pa.schema([
+            ("id", pa.int32()),
+            ("name", pa.string()),
+            ("dt", pa.string()),
+        ])
+
+    def tearDown(self):
+        shutil.rmtree(self.tempdir, ignore_errors=True)
+
+    def test_compact_rewrites_files_with_compact_snapshot(self):
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            partition_keys=["dt"],
+            options={"bucket": "-1"},
+        )
+        self.catalog.create_table("default.compact_t", schema, False)
+        table = self.catalog.get_table("default.compact_t")
+
+        self._write(table, [1, 2], ["a", "b"], ["p1", "p1"])
+        self._write(table, [3, 4], ["c", "d"], ["p1", "p1"])
+        self._write(table, [5, 6], ["e", "f"], ["p1", "p1"])
+
+        self.assertGreater(self._file_count(table), 1)
+
+        result = table.new_maintenance().compact({"dt": "p1"})
+
+        self.assertEqual(6, result.rewritten_record_count)
+        self.assertEqual(1, self._file_count(table))
+        self.assertEqual("COMPACT", table.snapshot_manager().get_latest_snapshot().commit_kind)
+        self.assertEqual([1, 2, 3, 4, 5, 6], self._read_ids(table))
+
+    def test_rescale_bucket_rewrites_partition(self):
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            partition_keys=["dt"],
+            options={"bucket": "1", "bucket-key": "id"},
+        )
+        self.catalog.create_table("default.rescale_t", schema, False)
+        table = self.catalog.get_table("default.rescale_t")
+
+        self._write(table, [1, 2, 3, 4, 5, 6], ["a", "b", "c", "d", "e", "f"], ["p1"] * 6)
+
+        result = table.new_maintenance().rescale_bucket(2, {"dt": "p1"})
+
+        self.assertEqual(6, result.rewritten_record_count)
+        self.assertEqual("OVERWRITE", table.snapshot_manager().get_latest_snapshot().commit_kind)
+        self.assertLessEqual(max(self._buckets(table)), 1)
+        self.assertEqual([1, 2, 3, 4, 5, 6], self._read_ids(table))
+
+    def _write(self, table, ids, names, dts):
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict(
+            {
+                "id": ids,
+                "name": names,
+                "dt": dts,
+            },
+            schema=self.pa_schema
+        )
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    def _file_count(self, table):
+        return sum(len(split.files) for split in table.new_read_builder().new_scan().plan().splits())
+
+    def _buckets(self, table):
+        return [split.bucket for split in table.new_read_builder().new_scan().plan().splits()]
+
+    def _read_ids(self, table):
+        read_builder = table.new_read_builder()
+        result = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+        return result.sort_by("id").column("id").to_pylist()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 832a39ba6887..849ecf5f6db6 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -191,6 +191,28 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c
             allow_rollback=False,
         )
 
+    def compact(self, compact_partition, commit_messages: List[CommitMessage], commit_identifier: int):
+        """Commit rewritten files as a compaction snapshot."""
+        if not commit_messages:
+            return
+
+        partition_filter = None
+        if compact_partition is not None and len(compact_partition) > 0:
+            predicate_builder = PredicateBuilder(self.table.partition_keys_fields)
+            sub_predicates = []
+            for key, value in compact_partition.items():
+                sub_predicates.append(predicate_builder.equal(key, value))
+            partition_filter = predicate_builder.and_predicates(sub_predicates)
+
+        self._try_commit(
+            commit_kind="COMPACT",
+            commit_identifier=commit_identifier,
+            commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
+                snapshot, partition_filter, commit_messages),
+            detect_conflicts=True,
+            allow_rollback=False,
+        )
+
     def drop_partitions(self, partitions: List[Dict[str, str]], commit_identifier: int) -> None:
         if not partitions:
             raise ValueError("Partitions list cannot be empty.")
diff --git a/paimon-python/pypaimon/write/table_commit.py b/paimon-python/pypaimon/write/table_commit.py
index 19918c1782e6..201d6324d812 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -83,6 +83,18 @@ class BatchTableCommit(TableCommit):
     def commit(self, commit_messages: List[CommitMessage]):
         self._commit(commit_messages, BATCH_COMMIT_IDENTIFIER)
 
+    def compact(self, compact_partition: Optional[Dict[str, str]], commit_messages: List[CommitMessage]):
+        """Commit rewritten files as a compaction snapshot."""
+        self._check_committed()
+        non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()]
+        if not non_empty_messages:
+            return
+        self.file_store_commit.compact(
+            compact_partition=compact_partition,
+            commit_messages=non_empty_messages,
+            commit_identifier=BATCH_COMMIT_IDENTIFIER
+        )
+
     def truncate_table(self) -> None:
         """Truncate the entire table, deleting all data."""
         self._check_committed()

From 3f4764f76a906267b373276fa10fa58d9a6c9957 Mon Sep 17 00:00:00 2001
From: Colin Chen
Date: Wed, 29 Apr 2026 17:24:21 +0800
Subject: [PATCH 2/2] [python] Trigger CI rerun
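
---

A minimal end-to-end sketch of the new maintenance API, modeled on the `maintenance_test.py` added by this patch; the warehouse path, database, and table names are illustrative assumptions, not fixtures from the patch:

```python
import tempfile

import pyarrow as pa

from pypaimon import CatalogFactory, Schema

# Local filesystem warehouse for experimentation; any supported warehouse works the same way.
catalog = CatalogFactory.create({"warehouse": tempfile.mkdtemp()})
catalog.create_database("default", True)

pa_schema = pa.schema([("id", pa.int32()), ("name", pa.string()), ("dt", pa.string())])
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"], options={"bucket": "-1"})
catalog.create_table("default.demo", schema, False)
table = catalog.get_table("default.demo")

# Two separate commits produce two small files in partition dt=p1.
for ids, names in ([1, 2], ["a", "b"]), ([3, 4], ["c", "d"]):
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_arrow(pa.Table.from_pydict(
        {"id": ids, "name": names, "dt": ["p1", "p1"]}, schema=pa_schema))
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()

# Rewrite partition dt=p1 into a COMPACT snapshot and inspect the summary.
result = table.new_maintenance().compact({"dt": "p1"})
print(result.snapshot_id, result.rewritten_record_count, result.rewritten_file_count)

# The snapshot listing now shows the APPEND commits followed by the COMPACT one.
for snapshot in table.snapshot_manager().list_snapshots():
    print(snapshot.id, snapshot.commit_kind)
```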
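A companion sketch for `rescale_bucket`, showing the preconditions it enforces; it reuses `catalog` and `pa_schema` from the sketch above, and `default.demo_fixed` is again an illustrative name:

```python
# rescale_bucket targets tables with an explicit bucket count.
schema = Schema.from_pyarrow_schema(
    pa_schema,
    partition_keys=["dt"],
    options={"bucket": "1", "bucket-key": "id"},
)
catalog.create_table("default.demo_fixed", schema, False)
fixed_table = catalog.get_table("default.demo_fixed")

# ... write a few rows into partition dt=p1, as in the sketch above ...

# Partitioned tables require a partition spec, and bucket_num must be positive:
try:
    fixed_table.new_maintenance().rescale_bucket(4)
except ValueError as e:
    print(e)  # partition must be specified for partitioned tables.

# Rewrites dt=p1 with 4 buckets and commits an OVERWRITE snapshot.
result = fixed_table.new_maintenance().rescale_bucket(4, {"dt": "p1"})
print(result.snapshot_id, result.rewritten_record_count, result.rewritten_file_count)
```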
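For completeness, the new CLI verbs map onto the API one-to-one; each command is shown as a comment above its Python equivalent, with `catalog.yaml` and `default.my_table` being the placeholder names from the docs. The `--partition` string follows the `key=value,key2=value2` convention parsed by `_parse_partition_spec`:

```python
# paimon -c catalog.yaml table snapshot default.my_table --all
snapshots = table.snapshot_manager().list_snapshots()

# paimon -c catalog.yaml table snapshot default.my_table --time-millis 1700000000000
snapshot = table.snapshot_manager().earlier_or_equal_time_mills(1700000000000)

# paimon -c catalog.yaml table compact default.my_table --partition dt=2024-01-01
result = table.new_maintenance().compact({"dt": "2024-01-01"})

# paimon -c catalog.yaml table rescale default.my_table --bucket-num 4 --partition dt=2024-01-01
result = table.new_maintenance().rescale_bucket(4, {"dt": "2024-01-01"})
```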