Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 92 additions & 4 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
DEFAULT_READ_VERSION: Literal[2] = 2
DEFAULT_READ_VERSION: Literal[3] = 3

INITIAL_SEQUENCE_NUMBER = 0

Expand Down Expand Up @@ -852,6 +852,17 @@ def partitions(self) -> list[PartitionFieldSummary] | None:
def key_metadata(self) -> bytes | None:
return self._data[14]

@property
def first_row_id(self) -> int | None:
return self._data[15] if len(self._data) > 15 else None

@first_row_id.setter
def first_row_id(self, value: int | None) -> None:
if len(self._data) <= 15:
self._data.append(value)
else:
self._data[15] = value

def has_added_files(self) -> bool:
return self.added_files_count is None or self.added_files_count > 0

Expand Down Expand Up @@ -1240,6 +1251,12 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
return entry


class ManifestWriterV3(ManifestWriterV2):
@property
def version(self) -> TableVersion:
return 3


def write_manifest(
format_version: TableVersion,
spec: PartitionSpec,
Expand All @@ -1252,6 +1269,8 @@ def write_manifest(
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
elif format_version == 2:
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
elif format_version == 3:
return ManifestWriterV3(spec, schema, output_file, snapshot_id, avro_compression)
else:
raise ValueError(f"Cannot write manifest for table version: {format_version}")

Expand Down Expand Up @@ -1295,6 +1314,10 @@ def __exit__(
@abstractmethod
def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ...

@property
def next_row_id(self) -> int | None:
return None

def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter:
self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files])
return self
Expand Down Expand Up @@ -1351,9 +1374,7 @@ def __init__(
self._commit_snapshot_id = snapshot_id
self._sequence_number = sequence_number

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped_manifest_file = copy(manifest_file)

def _prepare_manifest_for_commit(self, wrapped_manifest_file: ManifestFile) -> ManifestFile:
if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ:
# if the sequence number is being assigned here, then the manifest must be created by the current operation.
# To validate this, check that the snapshot id matches the current commit
Expand All @@ -1374,6 +1395,59 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped_manifest_file.min_sequence_number = self._sequence_number
return wrapped_manifest_file

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
return self._prepare_manifest_for_commit(copy(manifest_file))


class ManifestListWriterV3(ManifestListWriterV2):
_next_row_id: int

def __init__(
self,
output_file: OutputFile,
snapshot_id: int,
parent_snapshot_id: int | None,
sequence_number: int,
snapshot_first_row_id: int,
compression: AvroCompressionCodec,
):
super().__init__(
output_file=output_file,
snapshot_id=snapshot_id,
parent_snapshot_id=parent_snapshot_id,
sequence_number=sequence_number,
compression=compression,
)
self._format_version = 3
self._meta = {
"snapshot-id": str(snapshot_id),
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
"sequence-number": str(sequence_number),
"first-row-id": str(snapshot_first_row_id),
"format-version": "3",
AVRO_CODEC_KEY: compression,
}
self._next_row_id = snapshot_first_row_id

@property
def next_row_id(self) -> int | None:
return self._next_row_id

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
wrapped_manifest_file = self._prepare_manifest_for_commit(copy(manifest_file))

if wrapped_manifest_file.content == ManifestContent.DATA and wrapped_manifest_file.first_row_id is None:
if wrapped_manifest_file.existing_rows_count is None or wrapped_manifest_file.added_rows_count is None:
raise ValueError(
"Cannot assign first row id for a v3 manifest without existing-rows-count and added-rows-count: "
f"{wrapped_manifest_file.manifest_path}"
)

wrapped_manifest_file.first_row_id = self._next_row_id
self._next_row_id += wrapped_manifest_file.existing_rows_count + wrapped_manifest_file.added_rows_count

return wrapped_manifest_file


def write_manifest_list(
format_version: TableVersion,
Expand All @@ -1382,12 +1456,26 @@ def write_manifest_list(
parent_snapshot_id: int | None,
sequence_number: int | None,
avro_compression: AvroCompressionCodec,
snapshot_first_row_id: int | None = None,
) -> ManifestListWriter:
if format_version == 1:
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
elif format_version == 2:
if sequence_number is None:
raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
elif format_version == 3:
if sequence_number is None:
raise ValueError(f"Sequence-number is required for V3 tables: {sequence_number}")
if snapshot_first_row_id is None:
raise ValueError(f"snapshot_first_row_id is required for V3 tables: {snapshot_first_row_id}")
return ManifestListWriterV3(
output_file=output_file,
snapshot_id=snapshot_id,
parent_snapshot_id=parent_snapshot_id,
sequence_number=sequence_number,
snapshot_first_row_id=snapshot_first_row_id,
compression=avro_compression,
)
else:
raise ValueError(f"Cannot write manifest list for table version: {format_version}")
2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
Returns:
The alter table builder.
"""
if format_version not in {1, 2}:
if format_version not in {1, 2, 3}:
raise ValueError(f"Unsupported table format version: {format_version}")

if format_version < self.table_metadata.format_version:
Expand Down
6 changes: 2 additions & 4 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
INITIAL_SPEC_ID = 0
DEFAULT_SCHEMA_ID = 0

SUPPORTED_TABLE_FORMAT_VERSION = 2
SUPPORTED_TABLE_FORMAT_VERSION = 3


def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -574,9 +574,6 @@ def construct_refs(self) -> TableMetadata:
next_row_id: int | None = Field(alias="next-row-id", default=None)
"""A long higher than all assigned row IDs; the next snapshot's `first-row-id`."""

def model_dump_json(self, exclude_none: bool = True, exclude: Any | None = None, by_alias: bool = True, **kwargs: Any) -> str:
raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551")


TableMetadata = Annotated[TableMetadataV1 | TableMetadataV2 | TableMetadataV3, Field(discriminator="format_version")]

Expand Down Expand Up @@ -645,6 +642,7 @@ def new_table_metadata(
properties=properties,
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
next_row_id=0,
)
else:
raise ValidationError(f"Unknown format version: {format_version}")
Expand Down
43 changes: 28 additions & 15 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil
from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil, TableMetadataV3
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
from pyiceberg.table.snapshots import (
MetadataLogEntry,
Expand Down Expand Up @@ -320,9 +320,17 @@ def _(
return base_metadata

updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version})
updated_metadata = TableMetadataUtil._construct_without_validation(updated_metadata)

if (
isinstance(updated_metadata, TableMetadataV3)
and base_metadata.format_version < 3
and updated_metadata.next_row_id is None
):
updated_metadata = updated_metadata.model_copy(update={"next_row_id": 0})

context.add_update(update)
return TableMetadataUtil._construct_without_validation(updated_metadata)
return updated_metadata


@_apply_table_update.register(SetPropertiesUpdate)
Expand Down Expand Up @@ -433,7 +441,7 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe
elif base_metadata.snapshot_by_id(update.snapshot.snapshot_id) is not None:
raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} already exists")
elif (
base_metadata.format_version == 2
base_metadata.format_version >= 2
and update.snapshot.sequence_number is not None
and update.snapshot.sequence_number <= base_metadata.last_sequence_number
and update.snapshot.parent_snapshot_id is not None
Expand All @@ -454,20 +462,25 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe
f"Cannot add a snapshot with first row id smaller than the table's next-row-id "
f"{update.snapshot.first_row_id} < {base_metadata.next_row_id}"
)
elif base_metadata.format_version >= 3 and update.snapshot.added_rows is None:
raise ValueError("Cannot add snapshot without added rows")
elif base_metadata.format_version >= 3 and base_metadata.next_row_id is None:
raise ValueError("Cannot add a snapshot when table next-row-id is null")

metadata_updates: dict[str, Any] = {
"last_updated_ms": update.snapshot.timestamp_ms,
"last_sequence_number": update.snapshot.sequence_number,
"snapshots": base_metadata.snapshots + [update.snapshot],
}
if base_metadata.format_version >= 3:
next_row_id = base_metadata.next_row_id
added_rows = update.snapshot.added_rows
if next_row_id is None or added_rows is None:
raise ValueError("Cannot compute next-row-id for v3 snapshot update")
metadata_updates["next_row_id"] = next_row_id + added_rows

context.add_update(update)
return base_metadata.model_copy(
update={
"last_updated_ms": update.snapshot.timestamp_ms,
"last_sequence_number": update.snapshot.sequence_number,
"snapshots": base_metadata.snapshots + [update.snapshot],
"next_row_id": base_metadata.next_row_id + update.snapshot.added_rows
if base_metadata.format_version >= 3
and base_metadata.next_row_id is not None
and update.snapshot.added_rows is not None
else None,
}
)
return base_metadata.model_copy(update=metadata_updates)


@_apply_table_update.register(SetSnapshotRefUpdate)
Expand Down
25 changes: 19 additions & 6 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:

def _commit(self) -> UpdatesAndRequirements:
new_manifests = self._manifests()
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
table_metadata = self._transaction.table_metadata
next_sequence_number = table_metadata.next_sequence_number()

summary = self._summary(self.snapshot_properties)
file_name = _new_manifest_list_file_name(
Expand All @@ -287,29 +288,41 @@ def _commit(self) -> UpdatesAndRequirements:
location_provider = self._transaction._table.location_provider()
manifest_list_file_path = location_provider.new_metadata_location(file_name)

snapshot_first_row_id: int | None = None
if table_metadata.format_version >= 3:
snapshot_first_row_id = table_metadata.next_row_id
if snapshot_first_row_id is None:
raise ValueError("Cannot commit to a v3 table without next-row-id")

with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
format_version=table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
sequence_number=next_sequence_number,
avro_compression=self._compression,
snapshot_first_row_id=snapshot_first_row_id,
) as writer:
writer.add_manifests(new_manifests)

first_row_id: int | None = None
added_rows: int | None = None
if table_metadata.format_version >= 3:
writer_next_row_id = writer.next_row_id
if writer_next_row_id is None or snapshot_first_row_id is None:
raise ValueError("Cannot determine assigned rows for a v3 snapshot commit")
added_rows = writer_next_row_id - snapshot_first_row_id

if self._transaction.table_metadata.format_version >= 3:
first_row_id = self._transaction.table_metadata.next_row_id
first_row_id: int | None = snapshot_first_row_id

snapshot = Snapshot(
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
manifest_list=manifest_list_file_path,
sequence_number=next_sequence_number,
summary=summary,
schema_id=self._transaction.table_metadata.current_schema_id,
schema_id=table_metadata.current_schema_id,
first_row_id=first_row_id,
added_rows=added_rows,
)

add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot)
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,10 +968,10 @@ def test_upgrade_table_version(catalog: Catalog) -> None:
transaction.upgrade_table_version(format_version=1)
assert "Cannot downgrade v2 table to v1" in str(e.value)

with pytest.raises(ValueError) as e:
with table_test_table_version.transaction() as transaction:
transaction.upgrade_table_version(format_version=3)
assert "Unsupported table format version: 3" in str(e.value)
with table_test_table_version.transaction() as transaction:
transaction.upgrade_table_version(format_version=3)

assert table_test_table_version.format_version == 3


@pytest.mark.integration
Expand Down
14 changes: 9 additions & 5 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2336,10 +2336,10 @@ def test_nanosecond_support_on_catalog(

_create_table(session_catalog, identifier, {"format-version": "3"}, schema=arrow_table_schema_with_all_timestamp_precisions)

with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
catalog.create_table(
"ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"}
)
nanosecond_table = catalog.create_table(
"ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"}
)
assert nanosecond_table.format_version == 3

with pytest.raises(
UnsupportedPyArrowTypeException, match=re.escape("Column 'timestamp_ns' has an unsupported type: timestamp[ns]")
Expand Down Expand Up @@ -2495,7 +2495,6 @@ def test_stage_only_overwrite_files(
assert parent_snapshot_id == [None, first_snapshot, second_snapshot, second_snapshot, second_snapshot]


@pytest.mark.skip("V3 writer support is not enabled.")
@pytest.mark.integration
def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Catalog) -> None:
"""Test writing to a v3 table and reading with Spark."""
Expand Down Expand Up @@ -2528,6 +2527,11 @@ def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Cat

tbl.append(test_data)

current_snapshot = tbl.current_snapshot()
assert current_snapshot is not None
assert current_snapshot.first_row_id == initial_next_row_id
assert current_snapshot.added_rows == len(test_data)

assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), (
"Expected next_row_id to be incremented by the number of added rows"
)
Loading