Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions paimon-python/pypaimon/catalog/filesystem_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.common.time_utils import duration_to_iso8601, local_datetime_to_millis
from pypaimon.filesystem.caching_file_io import CachingFileIO
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.schema_manager import SchemaManager
Expand Down Expand Up @@ -405,21 +406,18 @@ def create_tag(
time_retained: Optional[str] = None,
ignore_if_exists: bool = False,
) -> None:
if time_retained is not None:
# Python's Tag dataclass does not yet carry tag_create_time /
# tag_time_retained fields; supporting TTL on FileSystemCatalog
# requires extending Tag + TagManager and is tracked as a
# follow-up. Raise here instead of silently dropping the option,
# so callers cannot mistakenly believe the TTL took effect.
raise NotImplementedError(
"FileSystemCatalog does not yet support `time_retained` on "
"create_tag (requires extending the Python Tag dataclass + "
"TagManager).")
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
table = self.get_table(identifier)
try:
table.create_tag(tag_name, snapshot_id, ignore_if_exists)
# Keyword args: FileStoreTable.create_tag orders time_retained after
# ignore_if_exists (Catalog orders it before), so pass by name.
table.create_tag(
tag_name,
snapshot_id,
ignore_if_exists=ignore_if_exists,
time_retained=time_retained,
)
except ValueError as e:
# ``table.create_tag`` honors ``ignore_if_exists`` internally, so
# any "already exists" message that bubbles up here means the
Expand Down Expand Up @@ -453,16 +451,24 @@ def get_tag(
tag = table.tag_manager().get(tag_name)
if tag is None:
raise TagNotExistException(tag_name)
# tag_create_time / tag_time_retained are not tracked on the
# filesystem side yet — the Python Tag dataclass inherits only
# Snapshot fields. Returning ``None`` for both keeps the response
# shape compatible with the Java contract while making the gap
# visible to callers.
# Surface tag_create_time as epoch millis and tag_time_retained as an
# ISO-8601 duration string (types match the Java REST GetTagResponse
# Long / String). The millis treat the timezone-less create-time as UTC
# (consistent with the ``$tags`` system table); note Java's REST server
# converts using the system default zone, so the numeric value can
# differ on non-UTC hosts. Both are None for tags created without a
# retention (plain-snapshot tag files).
return GetTagResponse(
tag_name=tag_name,
snapshot=tag.trim_to_snapshot(),
tag_create_time=None,
tag_time_retained=None,
tag_create_time=(
None if tag.tag_create_time is None
else local_datetime_to_millis(tag.tag_create_time)
),
tag_time_retained=(
None if tag.tag_time_retained is None
else duration_to_iso8601(tag.tag_time_retained)
),
)

def list_tags_paged(
Expand Down
36 changes: 35 additions & 1 deletion paimon-python/pypaimon/common/json_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,26 @@ def optional_json_field(json_name: str, json_include: str):
return field(metadata={"json_name": json_name, "json_include": json_include}, default=None)


def json_field_with_codec(json_name: str, encoder, decoder, json_include: str = "non_null"):
    """Create an optional dataclass field with a custom JSON name and value codec.

    The field defaults to ``None`` and carries four metadata entries that the
    JSON helper reads: ``json_name``, ``json_include``, ``encoder`` and
    ``decoder``.

    Args:
        json_name: Key used for this field in the JSON document.
        encoder: Callable mapping the Python value to its JSON representation
            on serialization.
        decoder: Callable mapping the JSON representation back to the Python
            value on deserialization.
        json_include: Inclusion policy stored in the metadata; the default
            ``"non_null"`` asks the serializer to omit ``None`` values.

    Returns:
        A ``dataclasses.field`` descriptor with ``default=None``.

    Both callables are only invoked for non-``None`` values. This is used for
    fields whose on-disk JSON shape must match a non-trivial Java/Jackson
    encoding (e.g. ``LocalDateTime`` arrays, ``Duration`` decimal seconds).
    """
    return field(
        metadata={
            "json_name": json_name,
            "json_include": json_include,
            "encoder": encoder,
            "decoder": decoder,
        },
        default=None,
    )


class JSON:

@staticmethod
Expand Down Expand Up @@ -65,6 +85,12 @@ def __to_dict(obj: Any) -> Dict[str, Any]:
if field_info.metadata.get("json_include", None) == "non_null":
continue

# Custom value codec (e.g. Java LocalDateTime / Duration encodings)
encoder = field_info.metadata.get("encoder")
if encoder is not None and field_value is not None:
result[json_name] = encoder(field_value)
continue

# Handle nested objects
if hasattr(field_value, "to_dict"):
result[json_name] = field_value.to_dict()
Expand Down Expand Up @@ -93,9 +119,13 @@ def __from_dict(data: Dict[str, Any], target_class: Type[T]) -> T:
# Create field name mapping (json_name -> field_name)
field_mapping = {}
type_mapping = {}
decoder_mapping = {}
for field_info in fields(target_class):
json_name = field_info.metadata.get("json_name", field_info.name)
field_mapping[json_name] = field_info.name
decoder = field_info.metadata.get("decoder")
if decoder is not None:
decoder_mapping[json_name] = decoder
origin_type = getattr(field_info.type, '__origin__', None)
args = getattr(field_info.type, '__args__', None)
field_type = field_info.type
Expand All @@ -113,7 +143,11 @@ def __from_dict(data: Dict[str, Any], target_class: Type[T]) -> T:
for json_name, value in data.items():
if json_name in field_mapping:
field_name = field_mapping[json_name]
if json_name in type_mapping:
if json_name in decoder_mapping:
kwargs[field_name] = (
None if value is None else decoder_mapping[json_name](value)
)
elif json_name in type_mapping:
tp = getattr(type_mapping[json_name], '__origin__', None)
if tp in (list, List):
item_type = getattr(type_mapping[json_name], '__args__', None)[0]
Expand Down
181 changes: 181 additions & 0 deletions paimon-python/pypaimon/common/time_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
# specific language governing permissions and limitations
# under the License.

import calendar
from datetime import datetime, timedelta
from typing import List


def parse_duration(text: str) -> int:
if text is None:
raise ValueError("text cannot be None")
Expand Down Expand Up @@ -77,3 +82,179 @@ def parse_duration(text: str) -> int:
raise ValueError(f"Duration cannot be negative: {text}")

return result_ms_int


# Unit label -> nanoseconds. Mirrors the aliases accepted by ``parse_duration``
# (and Java ``TimeUtils.parseDuration``); a bare number with no unit is treated
# as milliseconds, matching ``parse_duration``.
_UNIT_TO_NANOS = {
    'ns': 1, 'nano': 1, 'nanosecond': 1, 'nanoseconds': 1,
    'µs': 1_000, 'micro': 1_000, 'microsecond': 1_000, 'microseconds': 1_000,
    'ms': 1_000_000, 'milli': 1_000_000, 'millisecond': 1_000_000, 'milliseconds': 1_000_000,
    's': 1_000_000_000, 'sec': 1_000_000_000, 'second': 1_000_000_000, 'seconds': 1_000_000_000,
    'm': 60_000_000_000, 'min': 60_000_000_000, 'minute': 60_000_000_000, 'minutes': 60_000_000_000,
    'h': 3_600_000_000_000, 'hour': 3_600_000_000_000, 'hours': 3_600_000_000_000,
    'd': 86_400_000_000_000, 'day': 86_400_000_000_000, 'days': 86_400_000_000_000,
}


def parse_duration_nanos(text: str) -> int:
    """Parse a duration string to an integer nanosecond count.

    This is the full-precision integer counterpart of :func:`parse_duration`
    (which returns rounded milliseconds): it accepts the same unit aliases and
    the same "bare number means milliseconds" convention, but keeps
    sub-millisecond units exactly (``"1ns"`` -> ``1``, ``"500micro"`` ->
    ``500_000``) instead of rounding them to zero. Use it where
    sub-millisecond precision must not be silently dropped.
    ``parse_duration`` is intentionally left untouched so its millisecond
    contract (relied on by option parsing) does not change.

    Args:
        text: A leading run of digits followed by an optional unit label,
            e.g. ``"10"``, ``"2min"``, ``" 1 D "``. Surrounding whitespace and
            letter case are ignored.

    Returns:
        The duration in nanoseconds.

    Raises:
        ValueError: If ``text`` is ``None``, blank, does not start with a
            digit, has a number that ``int`` cannot parse, or carries an
            unrecognized unit label.
    """
    if text is None:
        raise ValueError("text cannot be None")

    trimmed = text.strip().lower()
    if not trimmed:
        raise ValueError("argument is an empty- or whitespace-only string")

    # Split the input into its leading digit run and the remaining unit label.
    pos = 0
    while pos < len(trimmed) and trimmed[pos].isdigit():
        pos += 1

    number_str = trimmed[:pos]
    unit_str = trimmed[pos:].strip()

    if not number_str:
        raise ValueError("text does not start with a number")

    try:
        value = int(number_str)
    except ValueError as err:
        # Keep the original failure attached as the explicit cause.
        raise ValueError(
            f"The value '{number_str}' cannot be re represented as 64bit number (numeric overflow)."
        ) from err

    if not unit_str:
        nanos_per_unit = 1_000_000  # bare number is milliseconds
    elif unit_str in _UNIT_TO_NANOS:
        nanos_per_unit = _UNIT_TO_NANOS[unit_str]
    else:
        supported_units = (
            'DAYS: (d | day | days), '
            'HOURS: (h | hour | hours), '
            'MINUTES: (m | min | minute | minutes), '
            'SECONDS: (s | sec | second | seconds), '
            'MILLISECONDS: (ms | milli | millisecond | milliseconds), '
            'MICROSECONDS: (µs | micro | microsecond | microseconds), '
            'NANOSECONDS: (ns | nano | nanosecond | nanoseconds)'
        )
        raise ValueError(
            f"Time interval unit label '{unit_str}' does not match any of the recognized units: "
            f"{supported_units}"
        )

    return value * nanos_per_unit


# ---------------------------------------------------------------------------
# Codecs for Java temporal types as serialized by Jackson's JavaTimeModule.
#
# These mirror the on-disk JSON shapes that Paimon's Java side reads/writes so
# that a tag file written by pypaimon is interoperable with Java and vice versa
# (see ``org.apache.paimon.tag.Tag`` and ``TagTest``):
# - ``LocalDateTime`` -> JSON array ``[year, month, day, hour, minute,
# second, nanoOfSecond]``
# - ``Duration`` -> JSON number of (fractional) seconds, e.g. ``86400.0``
#
# Resolution note: Python's ``datetime`` / ``timedelta`` are microsecond-based,
# so a Java create-time or duration finer than a microsecond is truncated to
# microseconds on read. Tag granularity is coarse, so this is not a concern in
# practice.
# ---------------------------------------------------------------------------

# Conversion factor between Python's microsecond field and the nanoOfSecond
# component of the Java ``LocalDateTime`` array form.
_NANOS_PER_MICRO = 1000


def local_datetime_to_json_array(dt: datetime) -> List[int]:
    """Encode a naive ``datetime`` as Java ``LocalDateTime`` array form.

    Args:
        dt: The wall-clock date-time to encode. Only its calendar and clock
            fields are read; any ``tzinfo`` is not consulted.

    Returns:
        A 7-element list ``[year, month, day, hour, minute, second,
        nanoOfSecond]``. Python ``datetime`` only has microsecond resolution,
        so the emitted nanoOfSecond is ``microsecond * 1000`` (never finer
        than microseconds).
    """
    return [
        dt.year,
        dt.month,
        dt.day,
        dt.hour,
        dt.minute,
        dt.second,
        dt.microsecond * _NANOS_PER_MICRO,
    ]


def json_array_to_local_datetime(arr: List[int]) -> datetime:
    """Decode a Java ``LocalDateTime`` array form into a naive ``datetime``.

    Jackson omits trailing zero components, so the array may be shorter than 7;
    missing components default to 0. Entries beyond the seventh are ignored. A
    Java nanoOfSecond finer than a microsecond is truncated to microseconds
    (Python's resolution limit).

    Args:
        arr: ``[year, month, day, hour, minute, second, nanoOfSecond]``,
            possibly with trailing components left out.

    Returns:
        The corresponding naive ``datetime``.

    Raises:
        ValueError: Propagated from ``datetime`` when a component is out of
            range (this includes arrays too short to carry month and day,
            since those would be padded with 0).
    """
    # Right-pad with zeros up to the 7 expected components; a longer array
    # yields a non-positive repeat count, i.e. no padding.
    padded = list(arr) + [0] * (7 - len(arr))
    year, month, day, hour, minute, second, nano = padded[:7]
    return datetime(
        year, month, day, hour, minute, second, nano // _NANOS_PER_MICRO
    )


def duration_to_json_seconds(td: timedelta) -> float:
    """Encode a ``timedelta`` as Java ``Duration`` decimal-seconds number.

    Args:
        td: The duration to encode.

    Returns:
        The total length of ``td`` in seconds as a float (e.g. one day ->
        ``86400.0``).
    """
    return td.total_seconds()


def json_seconds_to_duration(seconds) -> timedelta:
    """Decode a Java ``Duration`` decimal-seconds number into a ``timedelta``.

    Args:
        seconds: Duration length in (possibly fractional) seconds, as an
            ``int`` or ``float``.

    Returns:
        The equivalent ``timedelta``; precision finer than a microsecond is
        not representable by ``timedelta`` and is rounded away.
    """
    return timedelta(seconds=seconds)


def duration_to_iso8601(td: timedelta) -> str:
    """Render a ``timedelta`` like ``java.time.Duration.toString()``.

    Examples: 1 day -> ``PT24H`` (Duration has no day unit), 30 min ->
    ``PT30M``, 5 s -> ``PT5S``, 1.5 s -> ``PT1.5S``, zero -> ``PT0S``. Matches
    what Paimon's Java ``$tags`` system table surfaces for ``time_retained``.

    Retentions normally come from ``parse_duration``, which rejects negatives.
    A negative ``timedelta`` is nevertheless rendered the way Java renders it,
    with the sign repeated on every non-zero component (``PT-1H-30M``,
    ``PT-0.5S``), instead of being silently collapsed to ``PT0S``.

    Args:
        td: The duration to render.

    Returns:
        The ISO-8601 style string using only hour, minute and second units.
    """
    # Integer microseconds keep the arithmetic exact (no float rounding).
    total_micros = (
        td.days * 86_400_000_000 + td.seconds * 1_000_000 + td.microseconds
    )
    if total_micros == 0:
        return "PT0S"

    # Work on the magnitude and re-apply the sign per component, which is
    # what Java's truncating division produces for negative durations.
    sign = "-" if total_micros < 0 else ""
    total_seconds, micros = divmod(abs(total_micros), 1_000_000)
    hours, rem = divmod(total_seconds, 3600)
    minutes, secs = divmod(rem, 60)

    parts = ["PT"]
    if hours:
        parts.append(f"{sign}{hours}H")
    if minutes:
        parts.append(f"{sign}{minutes}M")
    if secs or micros:
        # Fractional seconds are printed without trailing zeros.
        frac = f".{micros:06d}".rstrip("0") if micros else ""
        parts.append(f"{sign}{secs}{frac}S")
    return "".join(parts)


def local_datetime_to_millis(dt: datetime) -> int:
    """Convert a naive ``LocalDateTime`` to epoch millis (treated as UTC).

    The tag create-time is timezone-less (Java ``LocalDateTime``); interpreting
    it as UTC keeps the millis self-consistent with the ``$tags`` system table,
    whose ``create_time`` mirrors Java ``Timestamp.fromLocalDateTime`` (also
    zone-less wall-clock arithmetic). Uses integer math with floored
    sub-millisecond truncation (matching Java ``Instant.toEpochMilli``), so it
    is exact and correct for pre-epoch instants too.

    Args:
        dt: The wall-clock date-time; its fields are read as if they were UTC.

    Returns:
        Milliseconds since 1970-01-01T00:00:00, negative before the epoch.
    """
    # timegm works on whole seconds of the wall-clock fields; the (always
    # non-negative) sub-second part is added back in whole milliseconds.
    epoch_seconds = calendar.timegm(dt.timetuple())
    return epoch_seconds * 1000 + dt.microsecond // 1000
18 changes: 14 additions & 4 deletions paimon-python/pypaimon/table/file_store_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ def create_tag(
self,
tag_name: str,
snapshot_id: Optional[int] = None,
ignore_if_exists: bool = False
ignore_if_exists: bool = False,
time_retained: Optional[str] = None
) -> None:
"""
Create a tag for a snapshot.
Expand All @@ -169,6 +170,8 @@ def create_tag(
tag_name: Name for the tag
snapshot_id: ID of the snapshot to tag. If None, uses the latest snapshot.
ignore_if_exists: If True, don't raise error if tag already exists
time_retained: Optional retention (e.g. ``"1d"``); when set, the tag
carries a create-time and TTL.

Raises:
ValueError: If no snapshot exists or tag already exists (when ignore_if_exists=False)
Expand All @@ -186,7 +189,7 @@ def create_tag(
raise ValueError("No snapshot exists in this table.")

tag_mgr = self.tag_manager()
tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists)
tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists, time_retained)

def delete_tag(self, tag_name: str) -> bool:
"""
Expand Down Expand Up @@ -335,14 +338,21 @@ def rename_tag(self, old_name: str, new_name: str) -> None:
tag_mgr = self.tag_manager()
tag_mgr.rename_tag(old_name, new_name)

def replace_tag(self, tag_name: str, snapshot_id: int = None) -> None:
def replace_tag(
self,
tag_name: str,
snapshot_id: int = None,
time_retained: Optional[str] = None
) -> None:
"""
Replace an existing tag with a new snapshot.

Args:
tag_name: Name of the tag to replace
snapshot_id: The snapshot id to associate with the tag.
If None, uses the latest snapshot.
time_retained: Optional retention (e.g. ``"1d"``); when set, the
replaced tag carries a create-time and TTL.

Raises:
ValueError: If tag doesn't exist, or snapshot doesn't exist
Expand All @@ -355,7 +365,7 @@ def replace_tag(self, tag_name: str, snapshot_id: int = None) -> None:
snapshot = self.snapshot_manager().get_snapshot_by_id(snapshot_id)
if snapshot is None:
raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.")
self.tag_manager().replace_tag(snapshot, tag_name)
self.tag_manager().replace_tag(snapshot, tag_name, time_retained)

def path_factory(self) -> 'FileStorePathFactory':
from pypaimon.utils.file_store_path_factory import FileStorePathFactory
Expand Down
Loading
Loading