diff --git a/docs/docs/pypaimon/cli.md b/docs/docs/pypaimon/cli.md index df5b7e0feed8..5b192b1daf51 100644 --- a/docs/docs/pypaimon/cli.md +++ b/docs/docs/pypaimon/cli.md @@ -589,6 +589,66 @@ paimon table alter mydb.users alter-column -n age -t BIGINT -c 'User age in year paimon table alter mydb.users update-comment -c "Updated user information table" ``` +## Tag Commands + +Manage tags (named snapshots) on a table. Tags are useful for time travel and pinning a snapshot for later access. + +```shell +paimon tag <create|list|get|delete> mydb.users ... +``` + +#### Tag Create + +```shell +# Tag the latest snapshot +paimon tag create mydb.users v1 + +# Tag a specific snapshot +paimon tag create mydb.users v1 --snapshot-id 3 + +# Do not error if the tag already exists +paimon tag create mydb.users v1 --ignore-if-exists +``` + +Options: +- `--snapshot-id, -s`: Snapshot id to tag (default: the latest snapshot) +- `--ignore-if-exists, -i`: Do not raise an error if the tag already exists + +#### Tag List + +```shell +# List all tags +paimon tag list mydb.users + +# Only tags with a name prefix +paimon tag list mydb.users --prefix prod_ + +# JSON output +paimon tag list mydb.users --format json +``` + +Options: +- `--prefix, -p`: Only list tags whose name starts with this prefix +- `--format, -f`: Output format, `table` (default) or `json` + +#### Tag Get + +```shell +paimon tag get mydb.users v1 + +# JSON output +paimon tag get mydb.users v1 --format json +``` + +Options: +- `--format, -f`: Output format, `table` (default) or `json` + +#### Tag Delete + +```shell +paimon tag delete mydb.users v1 +``` + ## Database Commands ### DB Get diff --git a/paimon-python/pypaimon/cli/cli.py b/paimon-python/pypaimon/cli/cli.py index 66a1d37b9d3f..b7c21006f259 100644 --- a/paimon-python/pypaimon/cli/cli.py +++ b/paimon-python/pypaimon/cli/cli.py @@ -129,6 +129,10 @@ def main(): from pypaimon.cli.cli_branch import add_branch_subcommands add_branch_subcommands(subparsers) + # Tag commands + from 
def _fail(message):
    """Print ``Error: <message>`` to stderr and exit with status 1."""
    print("Error: {}".format(message), file=sys.stderr)
    sys.exit(1)


def _open_catalog(args):
    """Validate the ``database.table`` identifier and build the catalog.

    Args:
        args: Parsed CLI namespace. ``args.table`` is always read;
            ``args.config`` is read only once the identifier is valid.

    Returns:
        ``(catalog, identifier)`` where ``identifier`` is ``args.table``
        unchanged.

    Exits with status 1 (message on stderr) when the identifier is
    malformed or when loading the config / creating the catalog raises.
    """
    identifier = args.table
    parts = identifier.split('.')
    # Exactly two *non-empty* parts: 'db.' and '.table' also split into two
    # pieces but can never name a table, so reject them here with the same
    # clear message instead of failing later inside the catalog.
    if len(parts) != 2 or not all(parts):
        _fail("Invalid table identifier '{}'. Expected format: "
              "'database.table'".format(identifier))

    # Function-local import (cli.py itself imports this module from inside
    # main(); presumably this avoids a circular import). It is done after
    # the validation above so bad input is rejected without loading it.
    from pypaimon.cli.cli import create_catalog, load_catalog_config

    try:
        catalog = create_catalog(load_catalog_config(args.config))
    except Exception as e:
        _fail(e)
    return catalog, identifier


def cmd_tag_create(args):
    """Execute ``tag create``.

    Reads ``args.table``, ``args.tag_name``, ``args.snapshot_id`` and
    ``args.ignore_if_exists``; prints a confirmation on stdout, or an error
    on stderr and exits with status 1.
    """
    catalog, identifier = _open_catalog(args)
    try:
        catalog.create_tag(
            identifier,
            args.tag_name,
            snapshot_id=args.snapshot_id,
            ignore_if_exists=args.ignore_if_exists,
        )
    except TableNotExistException:
        _fail("Table '{}' does not exist.".format(identifier))
    except TagAlreadyExistException:
        _fail("Tag '{}' already exists.".format(args.tag_name))
    except Exception as e:
        _fail("Failed to create tag: {}".format(e))
    # NOTE(review): with --ignore-if-exists and a pre-existing tag this still
    # reports "created"; confirm whether create_tag can signal the no-op.
    print("Tag '{}' created on table '{}'.".format(args.tag_name, identifier))


def cmd_tag_delete(args):
    """Execute ``tag delete``.

    Reads ``args.table`` and ``args.tag_name``; prints a confirmation on
    stdout, or an error on stderr and exits with status 1.
    """
    catalog, identifier = _open_catalog(args)
    try:
        catalog.delete_tag(identifier, args.tag_name)
    except TableNotExistException:
        _fail("Table '{}' does not exist.".format(identifier))
    except TagNotExistException:
        _fail("Tag '{}' does not exist.".format(args.tag_name))
    except Exception as e:
        _fail("Failed to delete tag: {}".format(e))
    print("Tag '{}' deleted from table '{}'.".format(args.tag_name, identifier))


def cmd_tag_list(args):
    """Execute ``tag list``.

    Reads ``args.table``, ``args.prefix`` and ``args.format``. Prints the
    tags as a JSON array (``json``) or one per line (``table``); an empty
    result prints ``[]`` or ``No tags found.`` respectively.
    """
    catalog, identifier = _open_catalog(args)
    try:
        paged = catalog.list_tags_paged(identifier, tag_name_prefix=args.prefix)
        # Normalise a missing element list to [] so JSON mode always emits an
        # array (never ``null``) and the emptiness check below stays valid.
        tags = list(paged.elements or [])
    except TableNotExistException:
        _fail("Table '{}' does not exist.".format(identifier))
    except Exception as e:
        _fail("Failed to list tags: {}".format(e))

    # The CLI does not expose paging; warn (rather than silently truncate) if
    # the catalog returned a partial page.
    if paged.next_page_token is not None:
        print("Note: tag list may be truncated; more tags are available.",
              file=sys.stderr)

    if args.format == 'json':
        print(json.dumps(tags, ensure_ascii=False))
    elif not tags:
        print("No tags found.")
    else:
        for tag in tags:
            print(tag)


def cmd_tag_get(args):
    """Execute ``tag get``.

    Reads ``args.table``, ``args.tag_name`` and ``args.format``. Prints the
    tag either as JSON or as an indented key/value listing; errors go to
    stderr with exit status 1.
    """
    catalog, identifier = _open_catalog(args)
    try:
        response = catalog.get_tag(identifier, args.tag_name)
    except TableNotExistException:
        _fail("Table '{}' does not exist.".format(identifier))
    except TagNotExistException:
        _fail("Tag '{}' does not exist.".format(args.tag_name))
    except Exception as e:
        _fail("Failed to get tag: {}".format(e))

    if args.format == 'json':
        print(JSON.to_json(response, indent=2))
        return

    print("Tag: {}".format(response.tag_name))
    snapshot = response.snapshot
    if snapshot is not None:
        print("  Snapshot ID: {}".format(snapshot.id))
        print("  Schema ID: {}".format(snapshot.schema_id))
        print("  Record Count: {}".format(snapshot.total_record_count))
    # Creation time and retention are optional on the response; print each
    # only when it is present.
    if response.tag_create_time is not None:
        print("  Create Time: {}".format(response.tag_create_time))
    if response.tag_time_retained is not None:
        print("  Time Retained: {}".format(response.tag_time_retained))


def add_tag_subcommands(subparsers):
    """Register the top-level ``tag <subcommand>`` commands.

    Args:
        subparsers: The subparsers action of the main CLI parser. A ``tag``
            parser is added to it with ``create``, ``list``, ``get`` and
            ``delete`` subcommands; each stores its handler in ``func``.
    """
    tag_parser = subparsers.add_parser(
        'tag', help='Tag operations on a table')
    tag_subparsers = tag_parser.add_subparsers(
        dest='tag_command', help='Tag commands')

    def new_command(name, help_text, handler):
        # Every tag subcommand takes the table identifier as its first
        # positional argument and dispatches through ``func``.
        command = tag_subparsers.add_parser(name, help=help_text)
        command.add_argument(
            'table', help='Table identifier in format: database.table')
        command.set_defaults(func=handler)
        return command

    def add_format_option(command):
        command.add_argument(
            '--format', '-f', choices=['table', 'json'], default='table',
            help='Output format: table (default) or json')

    # tag create
    create_parser = new_command(
        'create', 'Create a tag on a table', cmd_tag_create)
    create_parser.add_argument('tag_name', help='Name of the tag to create')
    create_parser.add_argument(
        '--snapshot-id', '-s', type=int, default=None,
        help='Snapshot id to tag (default: the latest snapshot)')
    create_parser.add_argument(
        '--ignore-if-exists', '-i', action='store_true',
        help='Do not error if the tag already exists')

    # tag list
    list_parser = new_command('list', 'List tags of a table', cmd_tag_list)
    list_parser.add_argument(
        '--prefix', '-p', default=None,
        help='Only list tags whose name starts with this prefix')
    add_format_option(list_parser)

    # tag get
    get_parser = new_command('get', 'Show details of a tag', cmd_tag_get)
    get_parser.add_argument('tag_name', help='Name of the tag')
    add_format_option(get_parser)

    # tag delete
    delete_parser = new_command(
        'delete', 'Delete a tag from a table', cmd_tag_delete)
    delete_parser.add_argument('tag_name', help='Name of the tag to delete')
class CliTagTest(unittest.TestCase):
    """Integration tests for ``paimon tag ...`` run through ``main()``.

    Each test gets a fresh temporary warehouse holding table ``db.t`` with
    two committed snapshots, plus a CLI config file that points at it.
    """

    def setUp(self):
        self.tempdir = tempfile.mkdtemp(prefix="cli_tag_")
        # Register the removal right away: unittest does not call tearDown()
        # when setUp() raises, but it always runs registered cleanups, so the
        # directory is not leaked if anything below fails.
        self.addCleanup(shutil.rmtree, self.tempdir, ignore_errors=True)

        self.warehouse = os.path.join(self.tempdir, 'warehouse')
        self.catalog = CatalogFactory.create({'warehouse': self.warehouse})
        self.catalog.create_database('db', True)

        pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
        self.catalog.create_table(
            'db.t', Schema.from_pyarrow_schema(pa_schema), False)
        # Two commits so snapshot ids 1 and 2 both exist.
        table = self.catalog.get_table('db.t')
        for rows in ([{'id': 1, 'name': 'a'}], [{'id': 2, 'name': 'b'}]):
            wb = table.new_batch_write_builder()
            w = wb.new_write()
            c = wb.new_commit()
            try:
                w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema))
                c.commit(w.prepare_commit())
            finally:
                # Close both handles even when the write or commit fails.
                w.close()
                c.close()

        self.config_file = os.path.join(self.tempdir, 'paimon.yaml')
        with open(self.config_file, 'w') as f:
            f.write("metastore: filesystem\nwarehouse: {}\n".format(
                self.warehouse))

    def _run(self, *argv):
        """Run the CLI; return (stdout, stderr, exit_code)."""
        out, err = StringIO(), StringIO()
        code = 0
        full = ['paimon', '-c', self.config_file] + list(argv)
        with patch('sys.argv', full):
            with patch('sys.stdout', out), patch('sys.stderr', err):
                try:
                    main()
                except SystemExit as e:
                    # sys.exit() / sys.exit(None) mean success; a non-int
                    # payload (e.g. a message string) counts as failure.
                    code = 0 if e.code is None else (
                        e.code if isinstance(e.code, int) else 1)
        return out.getvalue(), err.getvalue(), code

    def _run_ok(self, *argv):
        """Run the CLI, assert it exited with status 0, and return stdout.

        Used for precondition steps so a broken setup command fails the test
        at the point of failure instead of being silently ignored.
        """
        out, err, code = self._run(*argv)
        self.assertEqual(
            0, code, "command {} failed: {}".format(list(argv), err))
        return out

    # -- create + list -------------------------------------------------------

    def test_create_then_list(self):
        out = self._run_ok('tag', 'create', 'db.t', 'v1')
        self.assertIn("created", out)

        out = self._run_ok('tag', 'list', 'db.t')
        self.assertIn("v1", out)

    def test_list_empty(self):
        out = self._run_ok('tag', 'list', 'db.t')
        self.assertIn("No tags found.", out)

    def test_list_json_empty(self):
        out = self._run_ok('tag', 'list', 'db.t', '--format', 'json')
        self.assertEqual([], json.loads(out))

    def test_list_json(self):
        self._run_ok('tag', 'create', 'db.t', 'v1')
        self._run_ok('tag', 'create', 'db.t', 'v2')
        out = self._run_ok('tag', 'list', 'db.t', '--format', 'json')
        self.assertEqual({"v1", "v2"}, set(json.loads(out)))

    def test_list_prefix(self):
        self._run_ok('tag', 'create', 'db.t', 'prod_v1')
        self._run_ok('tag', 'create', 'db.t', 'dev_v1')
        out = self._run_ok(
            'tag', 'list', 'db.t', '--prefix', 'prod_', '-f', 'json')
        self.assertEqual(["prod_v1"], json.loads(out))

    # -- create options ------------------------------------------------------

    def test_create_with_snapshot_id(self):
        self._run_ok('tag', 'create', 'db.t', 'v1', '--snapshot-id', '1')
        out = self._run_ok('tag', 'get', 'db.t', 'v1')
        self.assertIn("Snapshot ID: 1", out)

    def test_create_duplicate_raises(self):
        self._run_ok('tag', 'create', 'db.t', 'v1')
        _, err, code = self._run('tag', 'create', 'db.t', 'v1')
        self.assertEqual(1, code)
        self.assertIn("already exists", err)

    def test_create_duplicate_ignore_if_exists(self):
        # The first create must succeed, otherwise the second call would not
        # be exercising the "already exists" path at all.
        self._run_ok('tag', 'create', 'db.t', 'v1')
        self._run_ok('tag', 'create', 'db.t', 'v1', '--ignore-if-exists')

    # -- get -----------------------------------------------------------------

    def test_get_table_format(self):
        self._run_ok('tag', 'create', 'db.t', 'v1', '--snapshot-id', '2')
        out = self._run_ok('tag', 'get', 'db.t', 'v1')
        self.assertIn("Tag: v1", out)
        self.assertIn("Snapshot ID: 2", out)

    def test_get_json_format(self):
        self._run_ok('tag', 'create', 'db.t', 'v1')
        out = self._run_ok('tag', 'get', 'db.t', 'v1', '--format', 'json')
        parsed = json.loads(out)
        self.assertEqual("v1", parsed["tagName"])

    def test_get_not_exists(self):
        _, err, code = self._run('tag', 'get', 'db.t', 'absent')
        self.assertEqual(1, code)
        self.assertIn("does not exist", err)

    # -- delete --------------------------------------------------------------

    def test_delete(self):
        self._run_ok('tag', 'create', 'db.t', 'v1')
        out = self._run_ok('tag', 'delete', 'db.t', 'v1')
        self.assertIn("deleted", out)
        out = self._run_ok('tag', 'list', 'db.t')
        self.assertNotIn("v1", out)

    def test_delete_not_exists(self):
        _, err, code = self._run('tag', 'delete', 'db.t', 'absent')
        self.assertEqual(1, code)
        self.assertIn("does not exist", err)

    # -- bad input -----------------------------------------------------------

    def test_invalid_identifier(self):
        _, err, code = self._run('tag', 'create', 'nodot', 'v1')
        self.assertEqual(1, code)
        self.assertIn("Invalid table identifier", err)


if __name__ == "__main__":
    unittest.main()