Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions docs/content/pypaimon/python-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,30 @@ arrow_table = table_read.to_arrow(splits)
pandas_df = table_read.to_pandas(splits)
```

### Snapshot Query

You can inspect table snapshots through `SnapshotManager`:

```python
snapshot_manager = table.snapshot_manager()

latest = snapshot_manager.get_latest_snapshot()
earliest = snapshot_manager.try_get_earliest_snapshot()
snapshot_10 = snapshot_manager.get_snapshot_by_id(10)
snapshots = snapshot_manager.list_snapshots()
snapshot_at_time = snapshot_manager.earlier_or_equal_time_mills(1700000000000)
```

The CLI also supports querying snapshots:

```shell
paimon -c catalog.yaml table snapshot default.my_table
paimon -c catalog.yaml table snapshot default.my_table --id 10
paimon -c catalog.yaml table snapshot default.my_table --earliest
paimon -c catalog.yaml table snapshot default.my_table --time-millis 1700000000000
paimon -c catalog.yaml table snapshot default.my_table --all
```

### Shard Read

Shard Read allows you to read data in parallel by dividing the table into multiple shards. This is useful for
Expand Down Expand Up @@ -606,6 +630,50 @@ table.rollback_to('v3') # tag name
The `rollback_to` method accepts either an `int` (snapshot ID) or a `str` (tag name) and automatically dispatches
to the appropriate rollback logic.

## Maintenance

PyPaimon provides native maintenance helpers that rewrite the current data of a table.

### Compact

Use `compact` to rewrite the current visible records and commit a `COMPACT` snapshot. You can compact the whole table
or a specific partition:

```python
table = catalog.get_table('database_name.table_name')
result = table.new_maintenance().compact({'dt': '2024-01-01'})

print(result.snapshot_id)
print(result.rewritten_record_count)
print(result.rewritten_file_count)
```

You can also trigger compaction from the CLI:

```shell
paimon -c catalog.yaml table compact default.my_table --partition dt=2024-01-01
```

### Rescale Bucket

Use `rescale_bucket` to rewrite data with a new bucket number. For partitioned tables, specify the target partition:

```python
table = catalog.get_table('database_name.table_name')
result = table.new_maintenance().rescale_bucket(
bucket_num=4,
partition={'dt': '2024-01-01'}
)
```

The same operation is available from the CLI:

```shell
paimon -c catalog.yaml table rescale default.my_table --bucket-num 4 --partition dt=2024-01-01
```

Native Python maintenance does not yet support row tracking, data evolution, or deletion vectors.

## Streaming Read

Streaming reads allow you to continuously read new data as it arrives in a Paimon table. This is useful for building
Expand Down
147 changes: 144 additions & 3 deletions paimon-python/pypaimon/cli/cli_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,54 @@
from pypaimon.common.json_util import JSON


def _parse_partition_spec(partition_spec):
if not partition_spec:
return None

partition = {}
for item in partition_spec.split(','):
if '=' not in item:
raise ValueError(
"Invalid partition spec '{}'. Expected format: key=value,key2=value2".format(partition_spec)
)
key, value = item.split('=', 1)
key = key.strip()
if not key:
raise ValueError(
"Invalid partition spec '{}'. Partition key cannot be empty.".format(partition_spec)
)
partition[key] = value.strip()
return partition


def _load_file_store_table(args):
    """Resolve ``args.table`` to a loaded FileStoreTable.

    Prints an error to stderr and exits with status 1 when the identifier is
    malformed, the table cannot be loaded, or the loaded table is not a
    FileStoreTable.

    Returns:
        Tuple of (table identifier string, FileStoreTable instance).
    """
    from pypaimon.cli.cli import load_catalog_config, create_catalog
    from pypaimon.table.file_store_table import FileStoreTable

    catalog = create_catalog(load_catalog_config(args.config))

    table_identifier = args.table
    if len(table_identifier.split('.')) != 2:
        print(f"Error: Invalid table identifier '{table_identifier}'. "
              f"Expected format: 'database.table'", file=sys.stderr)
        sys.exit(1)

    try:
        table = catalog.get_table(table_identifier)
    except Exception as e:
        print(f"Error: Failed to get table '{table_identifier}': {e}", file=sys.stderr)
        sys.exit(1)

    if not isinstance(table, FileStoreTable):
        print(f"Error: Table '{table_identifier}' is not a FileStoreTable. "
              f"This operation is not supported for this table type.", file=sys.stderr)
        sys.exit(1)

    return table_identifier, table


def cmd_table_read(args):
"""
Execute the 'table read' command.
Expand Down Expand Up @@ -306,10 +354,21 @@ def cmd_table_snapshot(args):
f"Snapshot operation is not supported for this table type.", file=sys.stderr)
sys.exit(1)

# Get latest snapshot
# Get snapshot
try:
snapshot_manager = table.snapshot_manager()
snapshot = snapshot_manager.get_latest_snapshot()
if getattr(args, 'all', False):
snapshots = snapshot_manager.list_snapshots()
print(JSON.to_json(snapshots, indent=2))
return
if getattr(args, 'snapshot_id', None) is not None:
snapshot = snapshot_manager.get_snapshot_by_id(args.snapshot_id)
elif getattr(args, 'earliest', False):
snapshot = snapshot_manager.try_get_earliest_snapshot()
elif getattr(args, 'time_millis', None) is not None:
snapshot = snapshot_manager.earlier_or_equal_time_mills(args.time_millis)
else:
snapshot = snapshot_manager.get_latest_snapshot()

if snapshot is None:
print(f"Error: No snapshot found for table '{table_identifier}'.", file=sys.stderr)
Expand All @@ -323,6 +382,30 @@ def cmd_table_snapshot(args):
sys.exit(1)


def cmd_table_compact(args):
    """Execute the 'table compact' command.

    Rewrites the current data of the target table (optionally restricted to a
    single partition) and prints the commit result as JSON.
    """
    table_identifier, table = _load_file_store_table(args)
    try:
        spec = getattr(args, 'partition', None)
        maintenance = table.new_maintenance()
        result = maintenance.compact(_parse_partition_spec(spec))
        print(JSON.to_json(result, indent=2))
    except Exception as e:
        print(f"Error: Failed to compact table '{table_identifier}': {e}", file=sys.stderr)
        sys.exit(1)


def cmd_table_rescale(args):
    """Execute the 'table rescale' command.

    Rewrites the table data with ``args.bucket_num`` buckets (optionally for a
    single partition) and prints the commit result as JSON.
    """
    table_identifier, table = _load_file_store_table(args)
    try:
        spec = getattr(args, 'partition', None)
        maintenance = table.new_maintenance()
        result = maintenance.rescale_bucket(args.bucket_num, _parse_partition_spec(spec))
        print(JSON.to_json(result, indent=2))
    except Exception as e:
        print(f"Error: Failed to rescale table '{table_identifier}': {e}", file=sys.stderr)
        sys.exit(1)


def cmd_table_create(args):
"""
Execute the 'table create' command.
Expand Down Expand Up @@ -776,12 +859,70 @@ def add_table_subcommands(table_parser):
get_parser.set_defaults(func=cmd_table_get)

# table snapshot command
snapshot_parser = table_subparsers.add_parser('snapshot', help='Get the latest snapshot of a table')
snapshot_parser = table_subparsers.add_parser('snapshot', help='Get snapshots of a table')
snapshot_parser.add_argument(
'table',
help='Table identifier in format: database.table'
)
snapshot_group = snapshot_parser.add_mutually_exclusive_group()
snapshot_group.add_argument(
'--id',
dest='snapshot_id',
type=int,
default=None,
help='Get a snapshot by ID'
)
snapshot_group.add_argument(
'--earliest',
action='store_true',
help='Get the earliest available snapshot'
)
snapshot_group.add_argument(
'--time-millis',
type=int,
default=None,
help='Get the latest snapshot whose commit time is less than or equal to this epoch millis'
)
snapshot_group.add_argument(
'--all',
action='store_true',
help='List all available snapshots'
)
snapshot_parser.set_defaults(func=cmd_table_snapshot)

# table compact command
compact_parser = table_subparsers.add_parser('compact', help='Compact a table or partition')
compact_parser.add_argument(
'table',
help='Table identifier in format: database.table'
)
compact_parser.add_argument(
'--partition',
type=str,
default=None,
help='Partition spec to compact, e.g. "dt=2024-01-01,hh=08"'
)
compact_parser.set_defaults(func=cmd_table_compact)

# table rescale command
rescale_parser = table_subparsers.add_parser('rescale', help='Rewrite data with a new bucket number')
rescale_parser.add_argument(
'table',
help='Table identifier in format: database.table'
)
rescale_parser.add_argument(
'--bucket-num',
type=int,
required=True,
help='Target bucket number'
)
rescale_parser.add_argument(
'--partition',
type=str,
default=None,
help='Partition spec to rescale, e.g. "dt=2024-01-01,hh=08"'
)
rescale_parser.set_defaults(func=cmd_table_rescale)

# table create command
create_parser = table_subparsers.add_parser('create', help='Create a new table')
Expand Down
8 changes: 8 additions & 0 deletions paimon-python/pypaimon/common/json_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ def from_json(json_str: str, target_class: Type[T]) -> T:
@staticmethod
def __to_dict(obj: Any) -> Dict[str, Any]:
"""Convert to dictionary with custom field names"""
if isinstance(obj, list):
return [
item.to_dict() if hasattr(item, "to_dict")
else JSON.__to_dict(item) if is_dataclass(item)
else item
for item in obj
]

# If object has custom to_dict method, use it
if hasattr(obj, "to_dict") and callable(getattr(obj, "to_dict")):
return obj.to_dict()
Expand Down
22 changes: 22 additions & 0 deletions paimon-python/pypaimon/snapshot/snapshot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,28 @@ def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
snapshot_content = self.file_io.read_file_utf8(snapshot_file)
return JSON.from_json(snapshot_content, Snapshot)

def list_snapshots(self) -> List[Snapshot]:
    """List available snapshots ordered by snapshot ID.

    Scans the snapshot directory for files named ``snapshot-<id>``, loads
    each one, and returns them sorted by ID ascending. Entries that cannot
    be loaded (i.e. ``get_snapshot_by_id`` returns ``None``) are skipped.
    """
    import re

    if not self.file_io.exists(self.snapshot_dir):
        return []

    # NOTE(review): the basename is derived by splitting on '/'; assumes
    # paths use forward slashes — confirm for all supported file systems.
    pattern = re.compile(r'^snapshot-(\d+)$')
    matches = (
        pattern.match(info.path.split('/')[-1])
        for info in self.file_io.list_status(self.snapshot_dir)
    )
    snapshot_ids = sorted(int(m.group(1)) for m in matches if m)

    loaded = (self.get_snapshot_by_id(sid) for sid in snapshot_ids)
    return [snapshot for snapshot in loaded if snapshot is not None]

def get_snapshots_batch(
self, snapshot_ids: List[int], max_workers: int = 4
) -> Dict[int, Optional[Snapshot]]:
Expand Down
10 changes: 8 additions & 2 deletions paimon-python/pypaimon/table/file_store_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ def new_vector_search_builder(self) -> 'VectorSearchBuilder':
from pypaimon.table.source.vector_search_builder import VectorSearchBuilderImpl
return VectorSearchBuilderImpl(self)

def new_maintenance(self):
    """Create a table maintenance helper bound to this table.

    Returns:
        A ``TableMaintenance`` instance operating on this table.
    """
    # Local import, matching this module's pattern of deferring
    # pypaimon imports into the methods that need them.
    from pypaimon.table.maintenance import TableMaintenance

    helper = TableMaintenance(self)
    return helper

def create_row_key_extractor(self) -> RowKeyExtractor:
bucket_mode = self.bucket_mode()
if bucket_mode == BucketMode.HASH_FIXED:
Expand All @@ -390,8 +395,9 @@ def create_row_key_extractor(self) -> RowKeyExtractor:
else:
raise ValueError(f"Unsupported bucket mode: {bucket_mode}")

def copy(self, options: dict) -> 'FileStoreTable':
if CoreOptions.BUCKET.key() in options and int(options.get(CoreOptions.BUCKET.key())) != self.options.bucket():
def copy(self, options: dict, allow_bucket_change: bool = False) -> 'FileStoreTable':
if (not allow_bucket_change and CoreOptions.BUCKET.key() in options
and int(options.get(CoreOptions.BUCKET.key())) != self.options.bucket()):
raise ValueError("Cannot change bucket number")
new_options = CoreOptions.copy(self.options).options.to_map()
for k, v in options.items():
Expand Down
Loading
Loading