Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1449,8 +1449,19 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
return StringType()
elif pa.types.is_date32(primitive):
return DateType()
elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
return TimeType()
elif isinstance(primitive, pa.Time64Type):
if primitive.unit == "us":
return TimeType()
elif primitive.unit == "ns":
if self._downcast_ns_timestamp_to_us:
logger.warning("Iceberg does not yet support 'ns' time precision. Downcasting to 'us'.")
return TimeType()
else:
raise TypeError(
"Iceberg does not yet support 'ns' time precision. "
"Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically "
"downcast 'ns' to 'us' on write.",
)
elif pa.types.is_timestamp(primitive):
primitive = cast(pa.TimestampType, primitive)
if primitive.unit in ("s", "ms", "us"):
Expand Down Expand Up @@ -1969,6 +1980,12 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
return values.cast(target_type)
raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
elif field.field_type == TimeType():
if pa.types.is_time(target_type) and pa.types.is_time(values.type):
# Downcasting of nanoseconds to microseconds
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
return values.cast(target_type, safe=False)
raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
elif isinstance(field.field_type, (IntegerType, LongType)):
# Cast smaller integer types to target type for cross-platform compatibility
# Only allow widening conversions (smaller bit width to larger)
Expand Down
31 changes: 30 additions & 1 deletion tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import uuid
import warnings
from collections.abc import Iterator
from datetime import date, datetime, timezone
from datetime import date, datetime, time, timezone
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
Expand Down Expand Up @@ -3077,6 +3077,35 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception(
assert "Unsupported schema projection from timestamp[ns] to timestamp[us]" in str(exc_info.value)


def test__to_requested_schema_time_ns_downcast() -> None:
"""Test that a time64[ns] column is downcast to time64[us] on write when the flag is set."""
requested_schema = Schema(NestedField(1, "time_field", TimeType(), required=False))
file_schema = requested_schema

arrow_schema = pa.schema([pa.field("time_field", pa.time64("ns"))])
data = pa.array([time(12, 0, 0, 1), None], type=pa.time64("ns"))
batch = pa.RecordBatch.from_arrays([data], schema=arrow_schema)

result = _to_requested_schema(requested_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, include_field_ids=False)

assert result.schema[0].type == pa.time64("us")
assert result.column(0).to_pylist() == [time(12, 0, 0, 1), None]


def test__to_requested_schema_time_ns_without_downcast_raises_exception() -> None:
requested_schema = Schema(NestedField(1, "time_field", TimeType(), required=False))
file_schema = requested_schema

arrow_schema = pa.schema([pa.field("time_field", pa.time64("ns"))])
data = pa.array([time(12, 0, 0, 1), None], type=pa.time64("ns"))
batch = pa.RecordBatch.from_arrays([data], schema=arrow_schema)

with pytest.raises(ValueError) as exc_info:
_to_requested_schema(requested_schema, file_schema, batch, downcast_ns_timestamp_to_us=False, include_field_ids=False)

assert "Unsupported schema projection from time64[ns] to time64[us]" in str(exc_info.value)


@pytest.mark.parametrize(
"arrow_type,iceberg_type,expected_arrow_type",
[
Expand Down
16 changes: 15 additions & 1 deletion tests/io/test_pyarrow_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,24 @@ def test_pyarrow_time64_us_to_iceberg() -> None:

def test_pyarrow_time64_ns_to_iceberg() -> None:
pyarrow_type = pa.time64("ns")
with pytest.raises(TypeError, match=re.escape("Unsupported type: time64[ns]")):
with pytest.raises(
TypeError,
match=re.escape(
"Iceberg does not yet support 'ns' time precision. Use 'downcast-ns-timestamp-to-us-on-write' "
"configuration property to automatically downcast 'ns' to 'us' on write."
),
):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())


def test_pyarrow_time64_ns_to_iceberg_downcast() -> None:
pyarrow_type = pa.time64("ns")
converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg(downcast_ns_timestamp_to_us=True))
assert converted_iceberg_type == TimeType()
# 'ns' time is downcast to 'us' precision
assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.time64("us")


@pytest.mark.parametrize("precision", ["s", "ms", "us", "ns"])
def test_pyarrow_timestamp_to_iceberg(precision: str) -> None:
pyarrow_type = pa.timestamp(unit=precision)
Expand Down