diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py
index 6c14eea062..0d56685f6c 100644
--- a/pyiceberg/cli/console.py
+++ b/pyiceberg/cli/console.py
@@ -31,7 +31,7 @@
 from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
 from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
 from pyiceberg.table import TableProperties
-from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
+from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
 from pyiceberg.utils.properties import property_as_int
@@ -179,6 +179,102 @@ def files(ctx: Context, identifier: str, history: bool) -> None:
     output.files(catalog_table, history)
+
+
+@run.command("add-files")
+@click.argument("identifier")
+@click.argument("file_paths", nargs=-1)
+@click.option("--branch", default=None, help="Branch to add files to (default: main).")
+@click.option(
+    "--no-check-duplicates",
+    is_flag=True,
+    help="Skip check for files already referenced by the table.",
+)
+@click.option(
+    "--property",
+    "-p",
+    "properties",
+    multiple=True,
+    help="Snapshot property key=value (repeatable).",
+)
+@click.pass_context
+@catch_exception()
+def add_files(
+    ctx: Context,
+    identifier: str,
+    file_paths: tuple[str, ...],
+    branch: str | None,
+    no_check_duplicates: bool,
+    properties: tuple[str, ...],
+) -> None:
+    """Add one or more data files to the table by path."""
+    if not file_paths:
+        raise click.UsageError("At least one file path is required.")
+
+    catalog, output = _catalog_and_output(ctx)
+
+    snapshot_properties: dict[str, str] = {}
+    for prop in properties:
+        if "=" not in prop:
+            raise click.UsageError(f"Property must be in key=value form, got: {prop!r}")
+        key, _, value = prop.partition("=")
+        snapshot_properties[key] = value
+
+    table = catalog.load_table(identifier)
+    table.add_files(
+        file_paths=list(file_paths),
+        branch=branch or MAIN_BRANCH,
+        snapshot_properties=snapshot_properties,
+        check_duplicate_files=not no_check_duplicates,
+    )
+    output.text(f"Added {len(file_paths)} file(s) to {identifier}")
+
+
+@run.command("delete-files")
+@click.argument("identifier")
+@click.argument("file_paths", nargs=-1)
+@click.option("--branch", default=None, help="Branch to delete files from (default: main).")
+@click.option(
+    "--property",
+    "-p",
+    "properties",
+    multiple=True,
+    help="Snapshot property key=value (repeatable).",
+)
+@click.pass_context
+@catch_exception()
+def delete_files(
+    ctx: Context,
+    identifier: str,
+    file_paths: tuple[str, ...],
+    branch: str | None,
+    properties: tuple[str, ...],
+) -> None:
+    """Remove one or more data files from the table by path."""
+    if not file_paths:
+        raise click.UsageError("At least one file path is required.")
+
+    catalog, output = _catalog_and_output(ctx)
+
+    snapshot_properties: dict[str, str] = {}
+    for prop in properties:
+        if "=" not in prop:
+            raise click.UsageError(f"Property must be in key=value form, got: {prop!r}")
+        key, _, value = prop.partition("=")
+        snapshot_properties[key] = value
+
+    table = catalog.load_table(identifier)
+    table.delete_files(
+        file_paths=list(file_paths),
+        branch=branch or MAIN_BRANCH,
+        snapshot_properties=snapshot_properties,
+    )
+    output.text(f"Deleted {len(file_paths)} file(s) from {identifier}")
+
+
 @run.command()
 @click.argument("identifier")
 @click.pass_context
@@ -470,3 +566,7 @@ def _retention_properties(ref: SnapshotRef, table_properties: dict[str, str]) ->
         retention_properties["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms else "forever"

     return retention_properties
+
+
+if __name__ == "__main__":
+    run()
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index ae5eb400d8..afa193b113 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -936,6 +936,41 @@ def add_files(
             for data_file in data_files:
                 append_files.append_data_file(data_file)

+    def delete_files(
+        self,
+        file_paths: list[str],
+        snapshot_properties: dict[str, str] = EMPTY_DICT,
+        branch: str | None = MAIN_BRANCH,
+    ) -> None:
+        """
+        Shorthand API for removing data files from the table transaction by their paths.
+
+        Args:
+            file_paths: The list of full file paths to be removed from the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch to delete files from
+
+        Raises:
+            ValueError: If file_paths contains duplicates
+            ValueError: If any file paths are not found in the table
+        """
+        unique_file_paths = set(file_paths)
+
+        if len(file_paths) != len(unique_file_paths):
+            raise ValueError("File paths must be unique")
+
+        data_files = _get_data_files_from_snapshot(
+            table_metadata=self.table_metadata, file_paths=unique_file_paths, io=self._table.io, branch=branch
+        )
+
+        missing_files = unique_file_paths - set(data_files.keys())
+        if missing_files:
+            raise ValueError(f"Cannot delete files that are not referenced by table, files: {', '.join(sorted(missing_files))}")
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).overwrite() as overwrite_snapshot:
+            for data_file in data_files.values():
+                overwrite_snapshot.delete_data_file(data_file)
+
     def update_spec(self) -> UpdateSpec:
         """Create a new UpdateSpec to update the partitioning of the table.
@@ -1506,6 +1541,31 @@ def add_files(
                 branch=branch,
             )

+    def delete_files(
+        self,
+        file_paths: list[str],
+        snapshot_properties: dict[str, str] = EMPTY_DICT,
+        branch: str | None = MAIN_BRANCH,
+    ) -> None:
+        """
+        Shorthand API for removing data files from the table by their paths.
+
+        Args:
+            file_paths: The list of full file paths to be removed from the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch to delete files from
+
+        Raises:
+            ValueError: If file_paths contains duplicates
+            ValueError: If any file paths are not found in the table
+        """
+        with self.transaction() as tx:
+            tx.delete_files(
+                file_paths=file_paths,
+                snapshot_properties=snapshot_properties,
+                branch=branch,
+            )
+
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
@@ -2175,3 +2235,21 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: list
     futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths]

     return [f.result() for f in futures if f.result()]
+
+
+def _get_data_files_from_snapshot(
+    table_metadata: TableMetadata, file_paths: set[str], io: FileIO, branch: str | None = MAIN_BRANCH
+) -> dict[str, DataFile]:
+    snapshot = table_metadata.snapshot_by_name(branch) if branch else table_metadata.current_snapshot()
+    if snapshot is None:
+        return {}
+
+    result: dict[str, DataFile] = {}
+    for manifest in snapshot.manifests(io):
+        if manifest.content == ManifestContent.DATA:
+            for entry in manifest.fetch_manifest_entry(io, discard_deleted=True):
+                if entry.data_file.file_path in file_paths:
+                    result[entry.data_file.file_path] = entry.data_file
+                    if len(result) == len(file_paths):
+                        return result
+    return result
diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py
index a713975ec9..921fb5a70b 100644
--- a/tests/cli/test_console.py
+++ b/tests/cli/test_console.py
@@ -1029,3 +1029,128 @@ def test_log_level_cli_overrides_env(mocker: MockFixture) -> None:
     mock_basicConfig.assert_called_once()
     call_kwargs = mock_basicConfig.call_args[1]
     assert call_kwargs["level"] == logging.ERROR
+
+
+def test_add_files_invokes_table_api(catalog: InMemoryCatalog, mocker: MockFixture) -> None:
+    catalog.create_namespace(TEST_TABLE_NAMESPACE)
+    catalog.create_table(
+        identifier=TEST_TABLE_IDENTIFIER,
+        schema=TEST_TABLE_SCHEMA,
+        partition_spec=TEST_TABLE_PARTITION_SPEC,
+    )
+    mock_table = MagicMock()
+    mocker.patch.object(catalog, "load_table", return_value=mock_table)
+
+    runner = CliRunner()
+    result = runner.invoke(
+        run,
+        [
+            "add-files",
+            "default.my_table",
+            "s3://bucket/path/file.parquet",
+            "--branch",
+            "main",
+            "--property",
+            "k1=v1",
+            "-p",
+            "k2=v2",
+        ],
+    )
+
+    if result.exit_code != 0:
+        out = (result.output or "") + (getattr(result, "stderr", "") or "")
+        raise AssertionError(f"exit_code={result.exit_code} output/stderr: {out!r}")
+    mock_table.add_files.assert_called_once()
+    call_kwargs = mock_table.add_files.call_args[1]
+    assert call_kwargs["file_paths"] == ["s3://bucket/path/file.parquet"]
+    assert call_kwargs["branch"] == "main"
+    assert call_kwargs["snapshot_properties"] == {"k1": "v1", "k2": "v2"}
+    assert call_kwargs["check_duplicate_files"] is True
+
+
+def test_add_files_requires_at_least_one_path(catalog: InMemoryCatalog) -> None:
+    catalog.create_namespace(TEST_TABLE_NAMESPACE)
+    catalog.create_table(
+        identifier=TEST_TABLE_IDENTIFIER,
+        schema=TEST_TABLE_SCHEMA,
+        partition_spec=TEST_TABLE_PARTITION_SPEC,
+    )
+    runner = CliRunner()
+    result = runner.invoke(run, ["add-files", "default.my_table"])
+    assert result.exit_code != 0
+    out = (result.output or "") + (getattr(result, "stderr", "") or "")
+    assert "file path" in out.lower() or "required" in out.lower()
+
+
+def test_add_files_invalid_property_format(catalog: InMemoryCatalog, mocker: MockFixture) -> None:
+    catalog.create_namespace(TEST_TABLE_NAMESPACE)
+    catalog.create_table(
+        identifier=TEST_TABLE_IDENTIFIER,
+        schema=TEST_TABLE_SCHEMA,
+        partition_spec=TEST_TABLE_PARTITION_SPEC,
+    )
+    runner = CliRunner()
+    result = runner.invoke(
+        run,
+        ["add-files", "default.my_table", "s3://bucket/file.parquet", "--property", "invalid_no_equals"],
+    )
+    assert result.exit_code != 0
+    out = (result.output or "") + (getattr(result, "stderr", "") or "")
+    assert "key=value" in out or "invalid_no_equals" in out
+
+
+def test_add_files_table_does_not_exist(catalog: InMemoryCatalog) -> None:
+    catalog.create_namespace(TEST_TABLE_NAMESPACE)
+    runner = CliRunner()
+    result = runner.invoke(run, ["add-files", "default.doesnotexist", "s3://bucket/file.parquet"])
+    assert result.exit_code != 0
+    out = (result.output or "") + (getattr(result, "stderr", "") or "")
+    assert "default.doesnotexist" in out and ("Table does not exist" in out or "does not exist" in out)
+
+
+def test_add_files_no_check_duplicates_flag(catalog: InMemoryCatalog, mocker: MockFixture) -> None:
+    catalog.create_namespace(TEST_TABLE_NAMESPACE)
+    catalog.create_table(
+        identifier=TEST_TABLE_IDENTIFIER,
+        schema=TEST_TABLE_SCHEMA,
+        partition_spec=TEST_TABLE_PARTITION_SPEC,
+    )
+    mock_table = MagicMock()
+    mocker.patch.object(catalog, "load_table", return_value=mock_table)
+
+    runner = CliRunner()
+    result = runner.invoke(
+        run,
+        ["add-files", "default.my_table", "s3://bucket/file.parquet", "--no-check-duplicates"],
+    )
+
+    assert result.exit_code == 0
+    mock_table.add_files.assert_called_once()
+    call_kwargs = mock_table.add_files.call_args[1]
+    assert call_kwargs["check_duplicate_files"] is False
+
+
+def test_delete_files_invalid_property_format(catalog: InMemoryCatalog, mocker: MockFixture) -> None:
+    catalog.create_namespace(TEST_TABLE_NAMESPACE)
+    catalog.create_table(
+        identifier=TEST_TABLE_IDENTIFIER,
+        schema=TEST_TABLE_SCHEMA,
+        partition_spec=TEST_TABLE_PARTITION_SPEC,
+    )
+    runner = CliRunner()
+    result = runner.invoke(
+        run,
+        ["delete-files", "default.my_table", "s3://bucket/file.parquet", "--property", "invalid_no_equals"],
+    )
+    assert result.exit_code != 0
+    out = (result.output or "") + (getattr(result, "stderr", "") or "")
+    assert "key=value" in out or "invalid_no_equals" in out
+
+
+def test_delete_files_table_does_not_exist(catalog: InMemoryCatalog) -> None:
+    catalog.create_namespace(TEST_TABLE_NAMESPACE)
+    runner = CliRunner()
+    result = runner.invoke(run, ["delete-files", "default.doesnotexist", "s3://bucket/file.parquet"])
+    assert result.exit_code != 0
+    out = (result.output or "") + (getattr(result, "stderr", "") or "")
+    assert "default.doesnotexist" in out and ("Table does not exist" in out or "does not exist" in out)
diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py
index e3b487e465..69a1f61fe0 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -15,23 +15,82 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
-from collections.abc import Generator
-from datetime import datetime
+from collections.abc import Generator, Iterator
+from datetime import date, datetime

 import pyarrow as pa
+import pyarrow.parquet as pq
 import pytest
 from pyspark.sql import SparkSession

+from pyiceberg.catalog import Catalog
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual
+from pyiceberg.io import FileIO
 from pyiceberg.manifest import ManifestEntryStatus
-from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Summary
 from pyiceberg.transforms import IdentityTransform
-from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, StringType, TimestampType
+from pyiceberg.types import BooleanType, DateType, FloatType, IntegerType, LongType, NestedField, StringType, TimestampType
+
+# Schema and data used by delete_files tests (moved from test_add_files)
+TABLE_SCHEMA_DELETE_FILES = Schema(
+    NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False),
+    NestedField(field_id=2, name="bar", field_type=StringType(), required=False),
+    NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False),
+    NestedField(field_id=10, name="qux", field_type=DateType(), required=False),
+)
+
+ARROW_SCHEMA_DELETE_FILES = pa.schema(
+    [
+        ("foo", pa.bool_()),
+        ("bar", pa.string()),
+        ("baz", pa.int32()),
+        ("qux", pa.date32()),
+    ]
+)
+
+ARROW_TABLE_DELETE_FILES = pa.Table.from_pylist(
+    [
+        {
+            "foo": True,
+            "bar": "bar_string",
+            "baz": 123,
+            "qux": date(2024, 3, 7),
+        }
+    ],
+    schema=ARROW_SCHEMA_DELETE_FILES,
+)
+
+
+def _write_parquet(io: FileIO, file_path: str, arrow_schema: pa.Schema, arrow_table: pa.Table) -> None:
+    fo = io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=arrow_schema) as writer:
+            writer.write_table(arrow_table)
+
+
+def _create_table_for_delete_files(
+    session_catalog: Catalog,
+    identifier: str,
+    format_version: int,
+    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+    schema: Schema = TABLE_SCHEMA_DELETE_FILES,
+) -> Table:
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    return session_catalog.create_table(
+        identifier=identifier,
+        schema=schema,
+        properties={"format-version": str(format_version)},
+        partition_spec=partition_spec,
+    )


 def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None:
@@ -57,6 +116,12 @@ def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]:
     session_catalog.drop_table(identifier)


+@pytest.fixture(name="format_version", params=[pytest.param(1, id="format_version=1"), pytest.param(2, id="format_version=2")])
+def format_version_fixture(request: "pytest.FixtureRequest") -> Iterator[int]:
+    """Fixture to run tests with different table format versions (for delete_files tests)."""
+    yield request.param
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
@@ -975,3 +1040,89 @@ def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapsho
     assert after_delete_snapshot is not None
     assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id)
+
+
+@pytest.mark.integration
+def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.delete_files_unpartitioned_v{format_version}"
+    tbl = _create_table_for_delete_files(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
+    for file_path in file_paths:
+        _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES)
+
+    tbl.add_files(file_paths=file_paths)
+    assert len(tbl.scan().to_arrow()) == 5
+
+    tbl.delete_files(file_paths=file_paths[:2])
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+        """
+    ).collect()
+
+    assert sum(row.deleted_data_files_count for row in rows) == 2
+
+    df = spark.table(identifier)
+    assert df.count() == 3
+
+    assert len(tbl.scan().to_arrow()) == 3
+
+
+@pytest.mark.integration
+def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.delete_files_nonexistent_v{format_version}"
+    tbl = _create_table_for_delete_files(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)]
+    for file_path in file_paths:
+        _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES)
+
+    tbl.add_files(file_paths=file_paths)
+
+    with pytest.raises(ValueError, match="Cannot delete files that are not referenced by table"):
+        tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"])
+
+
+@pytest.mark.integration
+def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.delete_files_duplicate_v{format_version}"
+    tbl = _create_table_for_delete_files(session_catalog, identifier, format_version)
+
+    file_path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet"
+    _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES)
+
+    tbl.add_files(file_paths=[file_path])
+
+    with pytest.raises(ValueError, match="File paths must be unique"):
+        tbl.delete_files(file_paths=[file_path, file_path])
+
+
+@pytest.mark.integration
+def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.delete_files_branch_v{format_version}"
+    branch = "branch1"
+
+    tbl = _create_table_for_delete_files(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)]
+    for file_path in file_paths:
+        _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES)
+
+    tbl.append(ARROW_TABLE_DELETE_FILES)
+    assert tbl.metadata.current_snapshot_id is not None
+    tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit()
+
+    tbl.add_files(file_paths=file_paths, branch=branch)
+    branch_df = spark.table(f"{identifier}.branch_{branch}")
+    assert branch_df.count() == 6
+
+    tbl.delete_files(file_paths=file_paths[:3], branch=branch)
+
+    branch_df = spark.table(f"{identifier}.branch_{branch}")
+    assert branch_df.count() == 3
+
+    main_df = spark.table(identifier)
+    assert main_df.count() == 1
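
For reference, a minimal usage sketch of the API and CLI introduced by this patch (illustrative only, not part of the diff). It assumes a catalog named "default" is configured in .pyiceberg.yaml; the table identifier and Parquet paths below are placeholders.

    from pyiceberg.catalog import load_catalog

    # Load the configured catalog and the target table.
    catalog = load_catalog("default")
    table = catalog.load_table("db.events")

    # Register two existing Parquet files by path, then remove one of them again.
    # delete_files raises ValueError for duplicate paths or paths the table does not reference.
    table.add_files(file_paths=["s3://bucket/data/f1.parquet", "s3://bucket/data/f2.parquet"])
    table.delete_files(file_paths=["s3://bucket/data/f1.parquet"])

The equivalent CLI invocations would be "pyiceberg add-files db.events s3://bucket/data/f1.parquet" and "pyiceberg delete-files db.events s3://bucket/data/f1.parquet", with --branch and --property available on both commands.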