From e63feff3a005a73e90d9aca70b206d425f9b82f3 Mon Sep 17 00:00:00 2001
From: kanthi subramanian
Date: Sun, 1 Feb 2026 14:32:51 -0600
Subject: [PATCH 01/10] Added implementation to delete files.

---
 pyiceberg/table/__init__.py         | 77 ++++++++++++++++++++++++++
 tests/integration/test_add_files.py | 86 +++++++++++++++++++++++++++++
 2 files changed, 163 insertions(+)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index ae5eb400d8..67a0b8c60d 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -936,6 +936,40 @@ def add_files(
         for data_file in data_files:
             append_files.append_data_file(data_file)

+    def delete_files(
+        self,
+        file_paths: list[str],
+        snapshot_properties: dict[str, str] = EMPTY_DICT,
+        branch: str | None = MAIN_BRANCH,
+    ) -> None:
+        """
+        Shorthand API for removing data files from the table transaction by their paths.
+
+        Args:
+            file_paths: The list of full file paths to be removed from the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch to delete files from
+
+        Raises:
+            ValueError: If file_paths contains duplicates
+            ValueError: If any file paths are not found in the table
+        """
+        if len(file_paths) != len(set(file_paths)):
+            raise ValueError("File paths must be unique")
+
+        file_paths_set = set(file_paths)
+        data_files = _get_data_files_from_snapshot(
+            table_metadata=self.table_metadata, file_paths=file_paths_set, io=self._table.io, branch=branch
+        )
+
+        missing_files = file_paths_set - set(data_files.keys())
+        if missing_files:
+            raise ValueError(f"Cannot delete files that are not referenced by table, files: {', '.join(sorted(missing_files))}")
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).overwrite() as overwrite_snapshot:
+            for data_file in data_files.values():
+                overwrite_snapshot.delete_data_file(data_file)
+
     def update_spec(self) -> UpdateSpec:
         """Create a new UpdateSpec to update the partitioning of the table.

@@ -1506,6 +1540,31 @@ def add_files(
             branch=branch,
         )

+    def delete_files(
+        self,
+        file_paths: list[str],
+        snapshot_properties: dict[str, str] = EMPTY_DICT,
+        branch: str | None = MAIN_BRANCH,
+    ) -> None:
+        """
+        Shorthand API for removing data files from the table by their paths.
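+
+        Example (illustrative sketch; ``tbl`` is a loaded table and the
+        path is hypothetical but must already be referenced by the table):
+            >>> tbl.delete_files(file_paths=["s3://warehouse/db/data/file-1.parquet"])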
+ + Args: + file_paths: The list of full file paths to be removed from the table + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch to delete files from + + Raises: + ValueError: If file_paths contains duplicates + ValueError: If any file paths are not found in the table + """ + with self.transaction() as tx: + tx.delete_files( + file_paths=file_paths, + snapshot_properties=snapshot_properties, + branch=branch, + ) + def update_spec(self, case_sensitive: bool = True) -> UpdateSpec: return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive) @@ -2175,3 +2234,21 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: list futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths] return [f.result() for f in futures if f.result()] + + +def _get_data_files_from_snapshot( + table_metadata: TableMetadata, file_paths: set[str], io: FileIO, branch: str | None = MAIN_BRANCH +) -> dict[str, DataFile]: + snapshot = table_metadata.snapshot_by_name(branch) if branch else table_metadata.current_snapshot() + if snapshot is None: + return {} + + result: dict[str, DataFile] = {} + for manifest in snapshot.manifests(io): + if manifest.content == ManifestContent.DATA: + for entry in manifest.fetch_manifest_entry(io, discard_deleted=True): + if entry.data_file.file_path in file_paths: + result[entry.data_file.file_path] = entry.data_file + if len(result) == len(file_paths): + return result + return result diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 86ef05e5f4..549d1fdcbe 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -1040,3 +1040,89 @@ def test_add_files_to_branch(spark: SparkSession, session_catalog: Catalog, form for col in branch_df.columns: assert branch_df.filter(branch_df[col].isNotNull()).count() == 6, "Expected all 6 rows to be non-null" + + +@pytest.mark.integration +def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_unpartitioned_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) + + tbl.add_files(file_paths=file_paths) + assert len(tbl.scan().to_arrow()) == 5 + + tbl.delete_files(file_paths=file_paths[:2]) + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert sum(row.deleted_data_files_count for row in rows) == 2 + + df = spark.table(identifier) + assert df.count() == 3 + + assert len(tbl.scan().to_arrow()) == 3 + + +@pytest.mark.integration +def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_nonexistent_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) + + tbl.add_files(file_paths=file_paths) + + with pytest.raises(ValueError, match="Cannot delete 
files that are not referenced by table"): + tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"]) + + +@pytest.mark.integration +def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_duplicate_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet" + _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) + + tbl.add_files(file_paths=[file_path]) + + with pytest.raises(ValueError, match="File paths must be unique"): + tbl.delete_files(file_paths=[file_path, file_path]) + + +@pytest.mark.integration +def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_branch_v{format_version}" + branch = "branch1" + + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) + + tbl.append(ARROW_TABLE) + assert tbl.metadata.current_snapshot_id is not None + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + + tbl.add_files(file_paths=file_paths, branch=branch) + branch_df = spark.table(f"{identifier}.branch_{branch}") + assert branch_df.count() == 6 + + tbl.delete_files(file_paths=file_paths[:3], branch=branch) + + branch_df = spark.table(f"{identifier}.branch_{branch}") + assert branch_df.count() == 3 + + main_df = spark.table(identifier) + assert main_df.count() == 1 From a3952c28ccac5a9f35658c9aa568553d02b7f2b2 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Fri, 6 Feb 2026 19:39:02 -0600 Subject: [PATCH 02/10] moved tests to test_deletes.py --- pyiceberg/table/__init__.py | 9 +- tests/integration/test_add_files.py | 86 --------------- tests/integration/test_deletes.py | 164 +++++++++++++++++++++++++++- 3 files changed, 165 insertions(+), 94 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 67a0b8c60d..afa193b113 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -954,15 +954,16 @@ def delete_files( ValueError: If file_paths contains duplicates ValueError: If any file paths are not found in the table """ - if len(file_paths) != len(set(file_paths)): + unique_file_paths = set(file_paths) + + if len(file_paths) != len(unique_file_paths): raise ValueError("File paths must be unique") - file_paths_set = set(file_paths) data_files = _get_data_files_from_snapshot( - table_metadata=self.table_metadata, file_paths=file_paths_set, io=self._table.io, branch=branch + table_metadata=self.table_metadata, file_paths=unique_file_paths, io=self._table.io, branch=branch ) - missing_files = file_paths_set - set(data_files.keys()) + missing_files = unique_file_paths - set(data_files.keys()) if missing_files: raise ValueError(f"Cannot delete files that are not referenced by table, files: {', '.join(sorted(missing_files))}") diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 549d1fdcbe..86ef05e5f4 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -1040,89 +1040,3 @@ def test_add_files_to_branch(spark: SparkSession, session_catalog: Catalog, form for col in 
branch_df.columns: assert branch_df.filter(branch_df[col].isNotNull()).count() == 6, "Expected all 6 rows to be non-null" - - -@pytest.mark.integration -def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: - identifier = f"default.delete_files_unpartitioned_v{format_version}" - tbl = _create_table(session_catalog, identifier, format_version) - - file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)] - for file_path in file_paths: - _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) - - tbl.add_files(file_paths=file_paths) - assert len(tbl.scan().to_arrow()) == 5 - - tbl.delete_files(file_paths=file_paths[:2]) - - rows = spark.sql( - f""" - SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count - FROM {identifier}.all_manifests - """ - ).collect() - - assert sum(row.deleted_data_files_count for row in rows) == 2 - - df = spark.table(identifier) - assert df.count() == 3 - - assert len(tbl.scan().to_arrow()) == 3 - - -@pytest.mark.integration -def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None: - identifier = f"default.delete_files_nonexistent_v{format_version}" - tbl = _create_table(session_catalog, identifier, format_version) - - file_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)] - for file_path in file_paths: - _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) - - tbl.add_files(file_paths=file_paths) - - with pytest.raises(ValueError, match="Cannot delete files that are not referenced by table"): - tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"]) - - -@pytest.mark.integration -def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None: - identifier = f"default.delete_files_duplicate_v{format_version}" - tbl = _create_table(session_catalog, identifier, format_version) - - file_path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet" - _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) - - tbl.add_files(file_paths=[file_path]) - - with pytest.raises(ValueError, match="File paths must be unique"): - tbl.delete_files(file_paths=[file_path, file_path]) - - -@pytest.mark.integration -def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: - identifier = f"default.delete_files_branch_v{format_version}" - branch = "branch1" - - tbl = _create_table(session_catalog, identifier, format_version) - - file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)] - for file_path in file_paths: - _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) - - tbl.append(ARROW_TABLE) - assert tbl.metadata.current_snapshot_id is not None - tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() - - tbl.add_files(file_paths=file_paths, branch=branch) - branch_df = spark.table(f"{identifier}.branch_{branch}") - assert branch_df.count() == 6 - - tbl.delete_files(file_paths=file_paths[:3], branch=branch) - - branch_df = spark.table(f"{identifier}.branch_{branch}") - assert branch_df.count() == 3 - - main_df = spark.table(identifier) - assert main_df.count() == 1 diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 
e3b487e465..54be0e5f73 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -15,23 +15,83 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name -from collections.abc import Generator -from datetime import datetime +from collections.abc import Generator, Iterator +from datetime import date, datetime import pyarrow as pa +import pyarrow.parquet as pq import pytest from pyspark.sql import SparkSession +from pyiceberg.catalog import Catalog from pyiceberg.catalog.rest import RestCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual +from pyiceberg.io import FileIO from pyiceberg.manifest import ManifestEntryStatus -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Summary from pyiceberg.transforms import IdentityTransform -from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, StringType, TimestampType +from pyiceberg.types import BooleanType, DateType, FloatType, IntegerType, LongType, NestedField, StringType, TimestampType + + +# Schema and data used by delete_files tests (moved from test_add_files) +TABLE_SCHEMA_DELETE_FILES = Schema( + NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False), + NestedField(field_id=2, name="bar", field_type=StringType(), required=False), + NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False), + NestedField(field_id=10, name="qux", field_type=DateType(), required=False), +) + +ARROW_SCHEMA_DELETE_FILES = pa.schema( + [ + ("foo", pa.bool_()), + ("bar", pa.string()), + ("baz", pa.int32()), + ("qux", pa.date32()), + ] +) + +ARROW_TABLE_DELETE_FILES = pa.Table.from_pylist( + [ + { + "foo": True, + "bar": "bar_string", + "baz": 123, + "qux": date(2024, 3, 7), + } + ], + schema=ARROW_SCHEMA_DELETE_FILES, +) + + +def _write_parquet(io: FileIO, file_path: str, arrow_schema: pa.Schema, arrow_table: pa.Table) -> None: + fo = io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=arrow_schema) as writer: + writer.write_table(arrow_table) + + +def _create_table_for_delete_files( + session_catalog: Catalog, + identifier: str, + format_version: int, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + schema: Schema = TABLE_SCHEMA_DELETE_FILES, +) -> Table: + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + return session_catalog.create_table( + identifier=identifier, + schema=schema, + properties={"format-version": str(format_version)}, + partition_spec=partition_spec, + ) def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None: @@ -57,6 +117,12 @@ def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]: session_catalog.drop_table(identifier) +@pytest.fixture(name="format_version", params=[pytest.param(1, id="format_version=1"), pytest.param(2, id="format_version=2")]) +def format_version_fixture(request: "pytest.FixtureRequest") -> Iterator[int]: + """Fixture to run tests with different table format versions (for delete_files tests).""" + yield request.param + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def 
test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: @@ -975,3 +1041,93 @@ def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapsho assert after_delete_snapshot is not None assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id) + + +@pytest.mark.integration +def test_delete_files_from_unpartitioned_table( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + identifier = f"default.delete_files_unpartitioned_v{format_version}" + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.add_files(file_paths=file_paths) + assert len(tbl.scan().to_arrow()) == 5 + + tbl.delete_files(file_paths=file_paths[:2]) + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert sum(row.deleted_data_files_count for row in rows) == 2 + + df = spark.table(identifier) + assert df.count() == 3 + + assert len(tbl.scan().to_arrow()) == 3 + + +@pytest.mark.integration +def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_nonexistent_v{format_version}" + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.add_files(file_paths=file_paths) + + with pytest.raises(ValueError, match="Cannot delete files that are not referenced by table"): + tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"]) + + +@pytest.mark.integration +def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_duplicate_v{format_version}" + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet" + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.add_files(file_paths=[file_path]) + + with pytest.raises(ValueError, match="File paths must be unique"): + tbl.delete_files(file_paths=[file_path, file_path]) + + +@pytest.mark.integration +def test_delete_files_from_branch( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + identifier = f"default.delete_files_branch_v{format_version}" + branch = "branch1" + + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.append(ARROW_TABLE_DELETE_FILES) + assert tbl.metadata.current_snapshot_id is not None + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + + tbl.add_files(file_paths=file_paths, 
branch=branch)
+    branch_df = spark.table(f"{identifier}.branch_{branch}")
+    assert branch_df.count() == 6
+
+    tbl.delete_files(file_paths=file_paths[:3], branch=branch)
+
+    branch_df = spark.table(f"{identifier}.branch_{branch}")
+    assert branch_df.count() == 3
+
+    main_df = spark.table(identifier)
+    assert main_df.count() == 1

From 5ee0ca667a38998624090df75f863f90edb77e76 Mon Sep 17 00:00:00 2001
From: kanthi subramanian
Date: Fri, 6 Feb 2026 22:06:22 -0600
Subject: [PATCH 03/10] Fixed lint errors.

---
 tests/integration/test_deletes.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py
index 54be0e5f73..69a1f61fe0 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -36,7 +36,6 @@
 from pyiceberg.transforms import IdentityTransform
 from pyiceberg.types import BooleanType, DateType, FloatType, IntegerType, LongType, NestedField, StringType, TimestampType

-
 # Schema and data used by delete_files tests (moved from test_add_files)
 TABLE_SCHEMA_DELETE_FILES = Schema(
     NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False),
@@ -1044,9 +1043,7 @@ def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapsho
 @pytest.mark.integration
-def test_delete_files_from_unpartitioned_table(
-    spark: SparkSession, session_catalog: Catalog, format_version: int
-) -> None:
+def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
     identifier = f"default.delete_files_unpartitioned_v{format_version}"
     tbl = _create_table_for_delete_files(session_catalog, identifier, format_version)

@@ -1104,9 +1101,7 @@ def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format
 @pytest.mark.integration
-def test_delete_files_from_branch(
-    spark: SparkSession, session_catalog: Catalog, format_version: int
-) -> None:
+def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
     identifier = f"default.delete_files_branch_v{format_version}"
     branch = "branch1"

From 2091f00b76c94a756e2f0f301181ab487c00b0f0 Mon Sep 17 00:00:00 2001
From: kanthi subramanian
Date: Tue, 10 Feb 2026 20:35:16 -0600
Subject: [PATCH 04/10] Added CLI command to delete files.
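
Usage sketch (hedged: assumes the `pyiceberg` console entry point and a
hypothetical table identifier and file path; the flags shown are the ones
this patch defines):

    pyiceberg delete-files default.my_table \
        s3://warehouse/default/data/file-1.parquet \
        --branch main --property author=kanthi

`--property`/`-p` is repeatable and must be in key=value form; each pair is
added to the snapshot summary. The command errors out if any given path is
not referenced by the table.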
--- pyiceberg/cli/console.py | 59 ++++++++++++++++++++++++++++++++++++++- tests/cli/test_console.py | 33 ++++++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index 6c14eea062..e416af4504 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -31,7 +31,7 @@ from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError from pyiceberg.table import TableProperties -from pyiceberg.table.refs import SnapshotRef, SnapshotRefType +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.utils.properties import property_as_int @@ -179,6 +179,59 @@ def files(ctx: Context, identifier: str, history: bool) -> None: output.files(catalog_table, history) +@run.command("delete-files") +@click.argument("identifier") +@click.argument("file_paths", nargs=-1) +@click.option("--branch", default=None, help="Branch to delete files from (default: main).") +@click.option( + "--property", + "-p", + "properties", + multiple=True, + help="Snapshot property key=value (repeatable).", +) +@click.pass_context +@catch_exception() +def delete_files( + ctx: Context, + identifier: str, + file_paths: tuple[str, ...], + branch: str | None, + properties: tuple[str, ...], +) -> None: + + + """ + Remove one or more data files from the table by path + """ + + """Remove one or more data files from the table by path.""" + if not file_paths: + raise click.UsageError("At least one file path is required.") + + catalog, output = _catalog_and_output(ctx) + + snapshot_properties: dict[str, str] = {} + for prop in properties: + if "=" not in prop: + raise click.UsageError(f"Property must be in key=value form, got: {prop!r}") + key, _, value = prop.partition("=") + snapshot_properties[key] = value + + table = catalog.load_table(identifier) + + file_paths_list = [] + for item in file_paths: + file_paths_list.append(item) + + table.delete_files( + file_paths=list(file_paths), + branch=branch or MAIN_BRANCH, + snapshot_properties=snapshot_properties + ) + output.text(f"Deleted {len(file_paths)} file(s) from {identifier}") + + @run.command() @click.argument("identifier") @click.pass_context @@ -470,3 +523,7 @@ def _retention_properties(ref: SnapshotRef, table_properties: dict[str, str]) -> retention_properties["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms else "forever" return retention_properties + + +if __name__ == "__main__": + run() \ No newline at end of file diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index a713975ec9..29a23ee920 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -1029,3 +1029,36 @@ def test_log_level_cli_overrides_env(mocker: MockFixture) -> None: mock_basicConfig.assert_called_once() call_kwargs = mock_basicConfig.call_args[1] assert call_kwargs["level"] == logging.ERROR + +def test_delete_files_requires_at_least_one_path(catalog: InMemoryCatalog) -> None: + runner = CliRunner() + result = runner.invoke(run, ["delete-files", "default.my_table"]) + assert result.exit_code == 2 + out = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "file path" in out.lower() or "At least one" in out + + +def test_delete_files_invalid_property_format(catalog: InMemoryCatalog, mocker: MockFixture) -> None: + catalog.create_namespace(TEST_TABLE_NAMESPACE) + catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + 
schema=TEST_TABLE_SCHEMA, + partition_spec=TEST_TABLE_PARTITION_SPEC, + ) + runner = CliRunner() + result = runner.invoke( + run, + ["delete-files", "default.my_table", "s3://bucket/file.parquet", "--property", "invalid_no_equals"], + ) + assert result.exit_code == 2 + out = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "key=value" in out or "invalid_no_equals" in out + + +def test_delete_files_table_does_not_exist(catalog: InMemoryCatalog) -> None: + catalog.create_namespace(TEST_TABLE_NAMESPACE) + runner = CliRunner() + result = runner.invoke(run, ["delete-files", "default.doesnotexist", "s3://bucket/file.parquet"]) + assert result.exit_code == 1 + out = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "default.doesnotexist" in out and ("Table does not exist" in out or "does not exist" in out) From 911f8a5540488668e36aeea8e0249ed5afc8cdb9 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Feb 2026 20:40:17 -0600 Subject: [PATCH 05/10] fixed lint errors --- pyiceberg/cli/console.py | 14 ++------------ tests/cli/test_console.py | 1 + 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index e416af4504..294b3d9e31 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -199,12 +199,6 @@ def delete_files( branch: str | None, properties: tuple[str, ...], ) -> None: - - - """ - Remove one or more data files from the table by path - """ - """Remove one or more data files from the table by path.""" if not file_paths: raise click.UsageError("At least one file path is required.") @@ -224,11 +218,7 @@ def delete_files( for item in file_paths: file_paths_list.append(item) - table.delete_files( - file_paths=list(file_paths), - branch=branch or MAIN_BRANCH, - snapshot_properties=snapshot_properties - ) + table.delete_files(file_paths=list(file_paths), branch=branch or MAIN_BRANCH, snapshot_properties=snapshot_properties) output.text(f"Deleted {len(file_paths)} file(s) from {identifier}") @@ -526,4 +516,4 @@ def _retention_properties(ref: SnapshotRef, table_properties: dict[str, str]) -> if __name__ == "__main__": - run() \ No newline at end of file + run() diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 29a23ee920..13a2fdb65b 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -1030,6 +1030,7 @@ def test_log_level_cli_overrides_env(mocker: MockFixture) -> None: call_kwargs = mock_basicConfig.call_args[1] assert call_kwargs["level"] == logging.ERROR + def test_delete_files_requires_at_least_one_path(catalog: InMemoryCatalog) -> None: runner = CliRunner() result = runner.invoke(run, ["delete-files", "default.my_table"]) From 99c887826dde4706de89c26c1c419332f975fde5 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Feb 2026 20:48:35 -0600 Subject: [PATCH 06/10] fixed lint errors --- tests/cli/test_console.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 13a2fdb65b..4b5a3094b6 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -1030,15 +1030,6 @@ def test_log_level_cli_overrides_env(mocker: MockFixture) -> None: call_kwargs = mock_basicConfig.call_args[1] assert call_kwargs["level"] == logging.ERROR - -def test_delete_files_requires_at_least_one_path(catalog: InMemoryCatalog) -> None: - runner = CliRunner() - result = runner.invoke(run, ["delete-files", "default.my_table"]) - assert result.exit_code == 2 - out = 
(result.output or "") + (getattr(result, "stderr", "") or "") - assert "file path" in out.lower() or "At least one" in out - - def test_delete_files_invalid_property_format(catalog: InMemoryCatalog, mocker: MockFixture) -> None: catalog.create_namespace(TEST_TABLE_NAMESPACE) catalog.create_table( From 937d16a5c969c222e1912e0618841e73e35b1a01 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Feb 2026 20:57:18 -0600 Subject: [PATCH 07/10] fixed subscriptable error when converting from tuple to list. --- pyiceberg/cli/console.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index 294b3d9e31..5928523301 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -218,7 +218,7 @@ def delete_files( for item in file_paths: file_paths_list.append(item) - table.delete_files(file_paths=list(file_paths), branch=branch or MAIN_BRANCH, snapshot_properties=snapshot_properties) + table.delete_files(file_paths=file_paths_list, branch=branch or MAIN_BRANCH, snapshot_properties=snapshot_properties) output.text(f"Deleted {len(file_paths)} file(s) from {identifier}") From 698218a303cd052b6c75a52f99d4efee3995b5ce Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Feb 2026 21:00:56 -0600 Subject: [PATCH 08/10] fixed lint errors --- tests/cli/test_console.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 4b5a3094b6..306521cc70 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -1030,6 +1030,7 @@ def test_log_level_cli_overrides_env(mocker: MockFixture) -> None: call_kwargs = mock_basicConfig.call_args[1] assert call_kwargs["level"] == logging.ERROR + def test_delete_files_invalid_property_format(catalog: InMemoryCatalog, mocker: MockFixture) -> None: catalog.create_namespace(TEST_TABLE_NAMESPACE) catalog.create_table( From 236449027cf5ed7a008858a401c9220a94616fc8 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Feb 2026 21:08:40 -0600 Subject: [PATCH 09/10] fix unit tests. --- tests/cli/test_console.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 306521cc70..316472b46f 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -1043,7 +1043,7 @@ def test_delete_files_invalid_property_format(catalog: InMemoryCatalog, mocker: run, ["delete-files", "default.my_table", "s3://bucket/file.parquet", "--property", "invalid_no_equals"], ) - assert result.exit_code == 2 + assert result.exit_code != 0 out = (result.output or "") + (getattr(result, "stderr", "") or "") assert "key=value" in out or "invalid_no_equals" in out @@ -1052,6 +1052,6 @@ def test_delete_files_table_does_not_exist(catalog: InMemoryCatalog) -> None: catalog.create_namespace(TEST_TABLE_NAMESPACE) runner = CliRunner() result = runner.invoke(run, ["delete-files", "default.doesnotexist", "s3://bucket/file.parquet"]) - assert result.exit_code == 1 + assert result.exit_code != 0 out = (result.output or "") + (getattr(result, "stderr", "") or "") assert "default.doesnotexist" in out and ("Table does not exist" in out or "does not exist" in out) From 7e9e4b3d305e3083b31889a7cd5117e4778472d5 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Wed, 11 Feb 2026 11:32:57 -0600 Subject: [PATCH 10/10] Added add-files option to CLI. 
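
Usage sketch (hedged: assumes the `pyiceberg` console entry point and a
hypothetical table identifier and file path; the flags shown are the ones
this patch defines):

    pyiceberg add-files default.my_table \
        s3://warehouse/default/data/file-1.parquet \
        --branch main --no-check-duplicates -p author=kanthi

`--no-check-duplicates` maps to `check_duplicate_files=False` on
`Table.add_files`; without the flag, duplicate checking stays enabled.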
--- pyiceberg/cli/console.py | 53 +++++++++++++++++++++ tests/cli/test_console.py | 99 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+) diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index 5928523301..0d56685f6c 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -179,6 +179,59 @@ def files(ctx: Context, identifier: str, history: bool) -> None: output.files(catalog_table, history) +@run.command("add-files") +@click.argument("identifier") +@click.argument("file_paths", nargs=-1) +@click.option("--branch", default=None, help="Branch to add files to (default: main).") +@click.option( + "--no-check-duplicates", + is_flag=True, + help="Skip check for files already referenced by the table.", +) +@click.option( + "--property", + "-p", + "properties", + multiple=True, + help="Snapshot property key=value (repeatable).", +) +@click.pass_context +@catch_exception() +def add_files( + ctx: Context, + identifier: str, + file_paths: tuple[str, ...], + branch: str | None, + no_check_duplicates: bool, + properties: tuple[str, ...], +) -> None: + """Add one or more data files to the table by path.""" + if not file_paths: + raise click.UsageError("At least one file path is required.") + + catalog, output = _catalog_and_output(ctx) + + snapshot_properties: dict[str, str] = {} + for prop in properties: + if "=" not in prop: + raise click.UsageError(f"Property must be in key=value form, got: {prop!r}") + key, _, value = prop.partition("=") + snapshot_properties[key] = value + + file_paths_list = [] + for item in file_paths: + file_paths_list.append(item) + + table = catalog.load_table(identifier) + table.add_files( + file_paths=file_paths_list, + branch=branch or MAIN_BRANCH, + snapshot_properties=snapshot_properties, + check_duplicate_files=not no_check_duplicates, + ) + output.text(f"Added {len(file_paths)} file(s) to {identifier}") + + @run.command("delete-files") @click.argument("identifier") @click.argument("file_paths", nargs=-1) diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 316472b46f..921fb5a70b 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -1031,6 +1031,105 @@ def test_log_level_cli_overrides_env(mocker: MockFixture) -> None: assert call_kwargs["level"] == logging.ERROR +def test_add_files_invokes_table_api(catalog: InMemoryCatalog, mocker: MockFixture) -> None: + catalog.create_namespace(TEST_TABLE_NAMESPACE) + catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + partition_spec=TEST_TABLE_PARTITION_SPEC, + ) + mock_table = MagicMock() + mocker.patch.object(catalog, "load_table", return_value=mock_table) + + runner = CliRunner() + result = runner.invoke( + run, + [ + "add-files", + "default.my_table", + "s3://bucket/path/file.parquet", + "--branch", + "main", + "--property", + "k1=v1", + "-p", + "k2=v2", + ], + ) + + if result.exit_code != 0: + out = (result.output or "") + (getattr(result, "stderr", "") or "") + raise AssertionError(f"exit_code={result.exit_code} output/stderr: {out!r}") + mock_table.add_files.assert_called_once() + call_kwargs = mock_table.add_files.call_args[1] + assert call_kwargs["file_paths"] == ["s3://bucket/path/file.parquet"] + assert call_kwargs["branch"] == "main" + assert call_kwargs["snapshot_properties"] == {"k1": "v1", "k2": "v2"} + assert call_kwargs["check_duplicate_files"] is True + + +def test_add_files_requires_at_least_one_path(catalog: InMemoryCatalog) -> None: + catalog.create_namespace(TEST_TABLE_NAMESPACE) + 
catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + partition_spec=TEST_TABLE_PARTITION_SPEC, + ) + runner = CliRunner() + result = runner.invoke(run, ["add-files", "default.my_table"]) + assert result.exit_code != 0 + out = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "file path" in out.lower() or "required" in out.lower() + + +def test_add_files_invalid_property_format(catalog: InMemoryCatalog, mocker: MockFixture) -> None: + catalog.create_namespace(TEST_TABLE_NAMESPACE) + catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + partition_spec=TEST_TABLE_PARTITION_SPEC, + ) + runner = CliRunner() + result = runner.invoke( + run, + ["add-files", "default.my_table", "s3://bucket/file.parquet", "--property", "invalid_no_equals"], + ) + assert result.exit_code != 0 + out = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "key=value" in out or "invalid_no_equals" in out + + +def test_add_files_table_does_not_exist(catalog: InMemoryCatalog) -> None: + catalog.create_namespace(TEST_TABLE_NAMESPACE) + runner = CliRunner() + result = runner.invoke(run, ["add-files", "default.doesnotexist", "s3://bucket/file.parquet"]) + assert result.exit_code != 0 + out = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "default.doesnotexist" in out and ("Table does not exist" in out or "does not exist" in out) + + +def test_add_files_no_check_duplicates_flag(catalog: InMemoryCatalog, mocker: MockFixture) -> None: + catalog.create_namespace(TEST_TABLE_NAMESPACE) + catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + partition_spec=TEST_TABLE_PARTITION_SPEC, + ) + mock_table = MagicMock() + mocker.patch.object(catalog, "load_table", return_value=mock_table) + + runner = CliRunner() + result = runner.invoke( + run, + ["add-files", "default.my_table", "s3://bucket/file.parquet", "--no-check-duplicates"], + ) + + assert result.exit_code == 0 + mock_table.add_files.assert_called_once() + call_kwargs = mock_table.add_files.call_args[1] + assert call_kwargs["check_duplicate_files"] is False + + def test_delete_files_invalid_property_format(catalog: InMemoryCatalog, mocker: MockFixture) -> None: catalog.create_namespace(TEST_TABLE_NAMESPACE) catalog.create_table(