Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 101 additions & 1 deletion pyiceberg/cli/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
from pyiceberg.table import TableProperties
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
from pyiceberg.utils.properties import property_as_int


Expand Down Expand Up @@ -179,6 +179,102 @@ def files(ctx: Context, identifier: str, history: bool) -> None:
output.files(catalog_table, history)


@run.command("add-files")
@click.argument("identifier")
@click.argument("file_paths", nargs=-1)
@click.option("--branch", default=None, help="Branch to add files to (default: main).")
@click.option(
    "--no-check-duplicates",
    is_flag=True,
    help="Skip check for files already referenced by the table.",
)
@click.option(
    "--property",
    "-p",
    "properties",
    multiple=True,
    help="Snapshot property key=value (repeatable).",
)
@click.pass_context
@catch_exception()
def add_files(
    ctx: Context,
    identifier: str,
    file_paths: tuple[str, ...],
    branch: str | None,
    no_check_duplicates: bool,
    properties: tuple[str, ...],
) -> None:
    """Add one or more data files to the table by path.

    Args:
        ctx: Click context carrying the catalog/output configuration.
        identifier: Fully-qualified table identifier (e.g. ``db.table``).
        file_paths: One or more full paths of data files to register.
        branch: Target branch; falls back to the main branch when omitted.
        no_check_duplicates: When set, skip the duplicate-file check.
        properties: ``key=value`` pairs added to the snapshot summary.

    Raises:
        click.UsageError: If no file path is given or a property is malformed.
    """
    if not file_paths:
        raise click.UsageError("At least one file path is required.")

    catalog, output = _catalog_and_output(ctx)

    # Parse the repeatable -p/--property options into a snapshot-summary dict.
    snapshot_properties: dict[str, str] = {}
    for prop in properties:
        if "=" not in prop:
            raise click.UsageError(f"Property must be in key=value form, got: {prop!r}")
        key, _, value = prop.partition("=")
        snapshot_properties[key] = value

    table = catalog.load_table(identifier)
    table.add_files(
        file_paths=list(file_paths),  # Table API takes a list; Click supplies a tuple.
        branch=branch or MAIN_BRANCH,
        snapshot_properties=snapshot_properties,
        check_duplicate_files=not no_check_duplicates,
    )
    output.text(f"Added {len(file_paths)} file(s) to {identifier}")


@run.command("delete-files")
@click.argument("identifier")
@click.argument("file_paths", nargs=-1)
@click.option("--branch", default=None, help="Branch to delete files from (default: main).")
@click.option(
    "--property",
    "-p",
    "properties",
    multiple=True,
    help="Snapshot property key=value (repeatable).",
)
@click.pass_context
@catch_exception()
def delete_files(
    ctx: Context,
    identifier: str,
    file_paths: tuple[str, ...],
    branch: str | None,
    properties: tuple[str, ...],
) -> None:
    """Remove one or more data files from the table by path.

    Args:
        ctx: Click context carrying the catalog/output configuration.
        identifier: Fully-qualified table identifier (e.g. ``db.table``).
        file_paths: One or more full paths of data files to remove.
        branch: Target branch; falls back to the main branch when omitted.
        properties: ``key=value`` pairs added to the snapshot summary.

    Raises:
        click.UsageError: If no file path is given or a property is malformed.
    """
    if not file_paths:
        raise click.UsageError("At least one file path is required.")

    catalog, output = _catalog_and_output(ctx)

    # Parse the repeatable -p/--property options into a snapshot-summary dict.
    snapshot_properties: dict[str, str] = {}
    for prop in properties:
        if "=" not in prop:
            raise click.UsageError(f"Property must be in key=value form, got: {prop!r}")
        key, _, value = prop.partition("=")
        snapshot_properties[key] = value

    table = catalog.load_table(identifier)
    table.delete_files(
        file_paths=list(file_paths),  # Table API takes a list; Click supplies a tuple.
        branch=branch or MAIN_BRANCH,
        snapshot_properties=snapshot_properties,
    )
    output.text(f"Deleted {len(file_paths)} file(s) from {identifier}")


@run.command()
@click.argument("identifier")
@click.pass_context
Expand Down Expand Up @@ -470,3 +566,7 @@ def _retention_properties(ref: SnapshotRef, table_properties: dict[str, str]) ->
retention_properties["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms else "forever"

return retention_properties


if __name__ == "__main__":
    # Allow running the CLI directly as a script, in addition to the
    # installed console entry point.
    run()
78 changes: 78 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,41 @@ def add_files(
for data_file in data_files:
append_files.append_data_file(data_file)

def delete_files(
    self,
    file_paths: list[str],
    snapshot_properties: dict[str, str] = EMPTY_DICT,
    branch: str | None = MAIN_BRANCH,
) -> None:
    """
    Shorthand API for removing data files from the table transaction by their paths.

    Every requested path must currently be referenced by the branch head; the
    matched files are removed via an overwrite snapshot.

    Args:
        file_paths: The list of full file paths to be removed from the table
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch to delete files from

    Raises:
        ValueError: If file_paths contains duplicates
        ValueError: If any file paths are not found in the table
    """
    requested = set(file_paths)

    # A duplicate path collapses in the set, so the lengths diverge.
    if len(requested) != len(file_paths):
        raise ValueError("File paths must be unique")

    matched = _get_data_files_from_snapshot(
        table_metadata=self.table_metadata, file_paths=requested, io=self._table.io, branch=branch
    )

    unmatched = requested - matched.keys()
    if unmatched:
        raise ValueError(f"Cannot delete files that are not referenced by table, files: {', '.join(sorted(unmatched))}")

    # Stage the removal of every matched data file in a single overwrite snapshot.
    with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).overwrite() as snapshot_update:
        for removed_file in matched.values():
            snapshot_update.delete_data_file(removed_file)

def update_spec(self) -> UpdateSpec:
"""Create a new UpdateSpec to update the partitioning of the table.

Expand Down Expand Up @@ -1506,6 +1541,31 @@ def add_files(
branch=branch,
)

def delete_files(
    self,
    file_paths: list[str],
    snapshot_properties: dict[str, str] = EMPTY_DICT,
    branch: str | None = MAIN_BRANCH,
) -> None:
    """
    Shorthand API for removing data files from the table by their paths.

    Opens a single transaction, delegates to its ``delete_files``, and commits.

    Args:
        file_paths: The list of full file paths to be removed from the table
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch to delete files from

    Raises:
        ValueError: If file_paths contains duplicates
        ValueError: If any file paths are not found in the table
    """
    with self.transaction() as txn:
        txn.delete_files(file_paths=file_paths, snapshot_properties=snapshot_properties, branch=branch)

def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
    """Create a new UpdateSpec to update the partitioning of the table."""
    autocommit_txn = Transaction(self, autocommit=True)
    return UpdateSpec(autocommit_txn, case_sensitive=case_sensitive)

Expand Down Expand Up @@ -2175,3 +2235,21 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: list
futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths]

return [f.result() for f in futures if f.result()]


def _get_data_files_from_snapshot(
    table_metadata: TableMetadata, file_paths: set[str], io: FileIO, branch: str | None = MAIN_BRANCH
) -> dict[str, DataFile]:
    """Map each requested file path to its DataFile in the selected snapshot.

    Scans the data manifests of the branch head (or the current snapshot when
    no branch is given) and stops as soon as every requested path is found.
    Paths not referenced by the snapshot are simply absent from the result.
    """
    snapshot = table_metadata.snapshot_by_name(branch) if branch else table_metadata.current_snapshot()
    found: dict[str, DataFile] = {}
    if snapshot is None:
        return found

    target_count = len(file_paths)
    for manifest in snapshot.manifests(io):
        # Only data manifests can reference the files we are looking for.
        if manifest.content != ManifestContent.DATA:
            continue
        for entry in manifest.fetch_manifest_entry(io, discard_deleted=True):
            path = entry.data_file.file_path
            if path in file_paths:
                found[path] = entry.data_file
                if len(found) == target_count:
                    # Early exit: all requested paths located.
                    return found
    return found
125 changes: 125 additions & 0 deletions tests/cli/test_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,3 +1029,128 @@ def test_log_level_cli_overrides_env(mocker: MockFixture) -> None:
mock_basicConfig.assert_called_once()
call_kwargs = mock_basicConfig.call_args[1]
assert call_kwargs["level"] == logging.ERROR


def test_add_files_invokes_table_api(catalog: InMemoryCatalog, mocker: MockFixture) -> None:
    """add-files should forward paths, branch, properties, and the duplicate check to Table.add_files."""
    catalog.create_namespace(TEST_TABLE_NAMESPACE)
    catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
    )
    table_stub = MagicMock()
    mocker.patch.object(catalog, "load_table", return_value=table_stub)

    cli_args = [
        "add-files",
        "default.my_table",
        "s3://bucket/path/file.parquet",
        "--branch",
        "main",
        "--property",
        "k1=v1",
        "-p",
        "k2=v2",
    ]
    result = CliRunner().invoke(run, cli_args)

    if result.exit_code != 0:
        out = (result.output or "") + (getattr(result, "stderr", "") or "")
        raise AssertionError(f"exit_code={result.exit_code} output/stderr: {out!r}")
    table_stub.add_files.assert_called_once()
    kwargs = table_stub.add_files.call_args[1]
    assert kwargs["file_paths"] == ["s3://bucket/path/file.parquet"]
    assert kwargs["branch"] == "main"
    assert kwargs["snapshot_properties"] == {"k1": "v1", "k2": "v2"}
    assert kwargs["check_duplicate_files"] is True


def test_add_files_requires_at_least_one_path(catalog: InMemoryCatalog) -> None:
    """add-files with no file path arguments must fail with a usage error."""
    catalog.create_namespace(TEST_TABLE_NAMESPACE)
    catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
    )
    result = CliRunner().invoke(run, ["add-files", "default.my_table"])
    assert result.exit_code != 0
    combined = (result.output or "") + (getattr(result, "stderr", "") or "")
    assert "file path" in combined.lower() or "required" in combined.lower()


def test_add_files_invalid_property_format(catalog: InMemoryCatalog, mocker: MockFixture) -> None:
    """A -p/--property value without '=' must be rejected by add-files."""
    catalog.create_namespace(TEST_TABLE_NAMESPACE)
    catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
    )
    cli_args = ["add-files", "default.my_table", "s3://bucket/file.parquet", "--property", "invalid_no_equals"]
    result = CliRunner().invoke(run, cli_args)
    assert result.exit_code != 0
    combined = (result.output or "") + (getattr(result, "stderr", "") or "")
    assert "key=value" in combined or "invalid_no_equals" in combined


def test_add_files_table_does_not_exist(catalog: InMemoryCatalog) -> None:
    """add-files against a missing table must surface a table-not-found error."""
    catalog.create_namespace(TEST_TABLE_NAMESPACE)
    result = CliRunner().invoke(run, ["add-files", "default.doesnotexist", "s3://bucket/file.parquet"])
    assert result.exit_code != 0
    combined = (result.output or "") + (getattr(result, "stderr", "") or "")
    assert "default.doesnotexist" in combined and ("Table does not exist" in combined or "does not exist" in combined)


def test_add_files_no_check_duplicates_flag(catalog: InMemoryCatalog, mocker: MockFixture) -> None:
    """--no-check-duplicates must translate to check_duplicate_files=False."""
    catalog.create_namespace(TEST_TABLE_NAMESPACE)
    catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
    )
    table_stub = MagicMock()
    mocker.patch.object(catalog, "load_table", return_value=table_stub)

    result = CliRunner().invoke(
        run,
        ["add-files", "default.my_table", "s3://bucket/file.parquet", "--no-check-duplicates"],
    )

    assert result.exit_code == 0
    table_stub.add_files.assert_called_once()
    assert table_stub.add_files.call_args[1]["check_duplicate_files"] is False


def test_delete_files_invalid_property_format(catalog: InMemoryCatalog, mocker: MockFixture) -> None:
    """A -p/--property value without '=' must be rejected by delete-files."""
    catalog.create_namespace(TEST_TABLE_NAMESPACE)
    catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
    )
    cli_args = ["delete-files", "default.my_table", "s3://bucket/file.parquet", "--property", "invalid_no_equals"]
    result = CliRunner().invoke(run, cli_args)
    assert result.exit_code != 0
    combined = (result.output or "") + (getattr(result, "stderr", "") or "")
    assert "key=value" in combined or "invalid_no_equals" in combined


def test_delete_files_table_does_not_exist(catalog: InMemoryCatalog) -> None:
    """delete-files against a missing table must surface a table-not-found error."""
    catalog.create_namespace(TEST_TABLE_NAMESPACE)
    result = CliRunner().invoke(run, ["delete-files", "default.doesnotexist", "s3://bucket/file.parquet"])
    assert result.exit_code != 0
    combined = (result.output or "") + (getattr(result, "stderr", "") or "")
    assert "default.doesnotexist" in combined and ("Table does not exist" in combined or "does not exist" in combined)
Loading