diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py index b7356b45955b..510ea270c84a 100644 --- a/paimon-python/pypaimon/catalog/filesystem_catalog.py +++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py @@ -37,6 +37,7 @@ from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.file_io import FileIO from pypaimon.common.identifier import Identifier +from pypaimon.common.time_utils import duration_to_iso8601, local_datetime_to_millis from pypaimon.filesystem.caching_file_io import CachingFileIO from pypaimon.schema.schema_change import SchemaChange from pypaimon.schema.schema_manager import SchemaManager @@ -405,21 +406,18 @@ def create_tag( time_retained: Optional[str] = None, ignore_if_exists: bool = False, ) -> None: - if time_retained is not None: - # Python's Tag dataclass does not yet carry tag_create_time / - # tag_time_retained fields; supporting TTL on FileSystemCatalog - # requires extending Tag + TagManager and is tracked as a - # follow-up. Raise here instead of silently dropping the option, - # so callers cannot mistakenly believe the TTL took effect. - raise NotImplementedError( - "FileSystemCatalog does not yet support `time_retained` on " - "create_tag (requires extending the Python Tag dataclass + " - "TagManager).") if not isinstance(identifier, Identifier): identifier = Identifier.from_string(identifier) table = self.get_table(identifier) try: - table.create_tag(tag_name, snapshot_id, ignore_if_exists) + # Keyword args: FileStoreTable.create_tag orders time_retained after + # ignore_if_exists (Catalog orders it before), so pass by name. 
+ table.create_tag( + tag_name, + snapshot_id, + ignore_if_exists=ignore_if_exists, + time_retained=time_retained, + ) except ValueError as e: # ``table.create_tag`` honors ``ignore_if_exists`` internally, so # any "already exists" message that bubbles up here means the @@ -453,16 +451,24 @@ def get_tag( tag = table.tag_manager().get(tag_name) if tag is None: raise TagNotExistException(tag_name) - # tag_create_time / tag_time_retained are not tracked on the - # filesystem side yet — the Python Tag dataclass inherits only - # Snapshot fields. Returning ``None`` for both keeps the response - # shape compatible with the Java contract while making the gap - # visible to callers. + # Surface tag_create_time as epoch millis and tag_time_retained as an + # ISO-8601 duration string (types match the Java REST GetTagResponse + # Long / String). The millis treat the timezone-less create-time as UTC + # (consistent with the ``$tags`` system table); note Java's REST server + # converts using the system default zone, so the numeric value can + # differ on non-UTC hosts. Both are None for tags created without a + # retention (plain-snapshot tag files). 
return GetTagResponse( tag_name=tag_name, snapshot=tag.trim_to_snapshot(), - tag_create_time=None, - tag_time_retained=None, + tag_create_time=( + None if tag.tag_create_time is None + else local_datetime_to_millis(tag.tag_create_time) + ), + tag_time_retained=( + None if tag.tag_time_retained is None + else duration_to_iso8601(tag.tag_time_retained) + ), ) def list_tags_paged( diff --git a/paimon-python/pypaimon/common/json_util.py b/paimon-python/pypaimon/common/json_util.py index 6b06f8880524..de1421d7addc 100644 --- a/paimon-python/pypaimon/common/json_util.py +++ b/paimon-python/pypaimon/common/json_util.py @@ -32,6 +32,26 @@ def optional_json_field(json_name: str, json_include: str): return field(metadata={"json_name": json_name, "json_include": json_include}, default=None) +def json_field_with_codec(json_name: str, encoder, decoder, json_include: str = "non_null"): + """Create an optional field with a custom JSON name and value codec. + + ``encoder`` maps the Python value to its JSON representation on serialization; + ``decoder`` maps the JSON representation back to the Python value on + deserialization. Both are only invoked for non-``None`` values. This is used + for fields whose on-disk JSON shape must match a non-trivial Java/Jackson + encoding (e.g. ``LocalDateTime`` arrays, ``Duration`` decimal seconds). + """ + return field( + metadata={ + "json_name": json_name, + "json_include": json_include, + "encoder": encoder, + "decoder": decoder, + }, + default=None, + ) + + class JSON: @staticmethod @@ -65,6 +85,12 @@ def __to_dict(obj: Any) -> Dict[str, Any]: if field_info.metadata.get("json_include", None) == "non_null": continue + # Custom value codec (e.g. 
Java LocalDateTime / Duration encodings) + encoder = field_info.metadata.get("encoder") + if encoder is not None and field_value is not None: + result[json_name] = encoder(field_value) + continue + # Handle nested objects if hasattr(field_value, "to_dict"): result[json_name] = field_value.to_dict() @@ -93,9 +119,13 @@ def __from_dict(data: Dict[str, Any], target_class: Type[T]) -> T: # Create field name mapping (json_name -> field_name) field_mapping = {} type_mapping = {} + decoder_mapping = {} for field_info in fields(target_class): json_name = field_info.metadata.get("json_name", field_info.name) field_mapping[json_name] = field_info.name + decoder = field_info.metadata.get("decoder") + if decoder is not None: + decoder_mapping[json_name] = decoder origin_type = getattr(field_info.type, '__origin__', None) args = getattr(field_info.type, '__args__', None) field_type = field_info.type @@ -113,7 +143,11 @@ def __from_dict(data: Dict[str, Any], target_class: Type[T]) -> T: for json_name, value in data.items(): if json_name in field_mapping: field_name = field_mapping[json_name] - if json_name in type_mapping: + if json_name in decoder_mapping: + kwargs[field_name] = ( + None if value is None else decoder_mapping[json_name](value) + ) + elif json_name in type_mapping: tp = getattr(type_mapping[json_name], '__origin__', None) if tp in (list, List): item_type = getattr(type_mapping[json_name], '__args__', None)[0] diff --git a/paimon-python/pypaimon/common/time_utils.py b/paimon-python/pypaimon/common/time_utils.py index 763f1bc5f2ca..8eecee31f85c 100644 --- a/paimon-python/pypaimon/common/time_utils.py +++ b/paimon-python/pypaimon/common/time_utils.py @@ -15,6 +15,11 @@ # specific language governing permissions and limitations # under the License. 
+import calendar +from datetime import datetime, timedelta +from typing import List + + def parse_duration(text: str) -> int: if text is None: raise ValueError("text cannot be None") @@ -77,3 +82,179 @@ def parse_duration(text: str) -> int: raise ValueError(f"Duration cannot be negative: {text}") return result_ms_int + + +# Unit label -> nanoseconds. Mirrors the aliases accepted by ``parse_duration`` +# (and Java ``TimeUtils.parseDuration``); a bare number with no unit is treated +# as milliseconds, matching ``parse_duration``. +_UNIT_TO_NANOS = { + 'ns': 1, 'nano': 1, 'nanosecond': 1, 'nanoseconds': 1, + 'µs': 1_000, 'micro': 1_000, 'microsecond': 1_000, 'microseconds': 1_000, + 'ms': 1_000_000, 'milli': 1_000_000, 'millisecond': 1_000_000, 'milliseconds': 1_000_000, + 's': 1_000_000_000, 'sec': 1_000_000_000, 'second': 1_000_000_000, 'seconds': 1_000_000_000, + 'm': 60_000_000_000, 'min': 60_000_000_000, 'minute': 60_000_000_000, 'minutes': 60_000_000_000, + 'h': 3_600_000_000_000, 'hour': 3_600_000_000_000, 'hours': 3_600_000_000_000, + 'd': 86_400_000_000_000, 'day': 86_400_000_000_000, 'days': 86_400_000_000_000, +} + + +def parse_duration_nanos(text: str) -> int: + """Parse a duration string to an integer nanosecond count. + + This is the full-precision integer counterpart of :func:`parse_duration` + (which returns rounded milliseconds): it accepts the same unit aliases and + the same "bare number means milliseconds" convention, but keeps sub-millisecond + units exactly (``"1ns"`` -> ``1``, ``"500micro"`` -> ``500_000``) instead of + rounding them to zero. Use it where sub-millisecond precision must not be + silently dropped. ``parse_duration`` is intentionally left untouched so its + millisecond contract (relied on by option parsing) does not change. 
+ """ + if text is None: + raise ValueError("text cannot be None") + + trimmed = text.strip().lower() + if not trimmed: + raise ValueError("argument is an empty- or whitespace-only string") + + pos = 0 + while pos < len(trimmed) and trimmed[pos].isdigit(): + pos += 1 + + number_str = trimmed[:pos] + unit_str = trimmed[pos:].strip() + + if not number_str: + raise ValueError("text does not start with a number") + + try: + value = int(number_str) + except ValueError: + raise ValueError( + f"The value '{number_str}' cannot be re represented as 64bit number (numeric overflow)." + ) + + if not unit_str: + nanos_per_unit = 1_000_000 # bare number is milliseconds + elif unit_str in _UNIT_TO_NANOS: + nanos_per_unit = _UNIT_TO_NANOS[unit_str] + else: + supported_units = ( + 'DAYS: (d | day | days), ' + 'HOURS: (h | hour | hours), ' + 'MINUTES: (m | min | minute | minutes), ' + 'SECONDS: (s | sec | second | seconds), ' + 'MILLISECONDS: (ms | milli | millisecond | milliseconds), ' + 'MICROSECONDS: (µs | micro | microsecond | microseconds), ' + 'NANOSECONDS: (ns | nano | nanosecond | nanoseconds)' + ) + raise ValueError( + f"Time interval unit label '{unit_str}' does not match any of the recognized units: " + f"{supported_units}" + ) + + return value * nanos_per_unit + + +# --------------------------------------------------------------------------- +# Codecs for Java temporal types as serialized by Jackson's JavaTimeModule. +# +# These mirror the on-disk JSON shapes that Paimon's Java side reads/writes so +# that a tag file written by pypaimon is interoperable with Java and vice versa +# (see ``org.apache.paimon.tag.Tag`` and ``TagTest``): +# - ``LocalDateTime`` -> JSON array ``[year, month, day, hour, minute, +# second, nanoOfSecond]`` +# - ``Duration`` -> JSON number of (fractional) seconds, e.g. 
``86400.0`` +# +# Resolution note: Python's ``datetime`` / ``timedelta`` are microsecond-based, +# so a Java create-time or duration finer than a microsecond is truncated to +# microseconds on read. Tag granularity is coarse, so this is not a concern in +# practice. +# --------------------------------------------------------------------------- + +_NANOS_PER_MICRO = 1000 + + +def local_datetime_to_json_array(dt: datetime) -> List[int]: + """Encode a naive ``datetime`` as Java ``LocalDateTime`` array form. + + Python ``datetime`` only has microsecond resolution, so the emitted + nanoOfSecond is ``microsecond * 1000`` (never finer than microseconds). + """ + return [ + dt.year, + dt.month, + dt.day, + dt.hour, + dt.minute, + dt.second, + dt.microsecond * _NANOS_PER_MICRO, + ] + + +def json_array_to_local_datetime(arr: List[int]) -> datetime: + """Decode a Java ``LocalDateTime`` array form into a naive ``datetime``. + + Jackson omits trailing zero components, so the array may be shorter than 7; + missing components default to 0. A Java nanoOfSecond finer than a + microsecond is truncated to microseconds (Python's resolution limit). + """ + padded = list(arr) + [0] * (7 - len(arr)) + year, month, day, hour, minute, second, nano = padded[:7] + return datetime( + year, month, day, hour, minute, second, nano // _NANOS_PER_MICRO + ) + + +def duration_to_json_seconds(td: timedelta): + """Encode a ``timedelta`` as Java ``Duration`` decimal-seconds number.""" + return td.total_seconds() + + +def json_seconds_to_duration(seconds) -> timedelta: + """Decode a Java ``Duration`` decimal-seconds number into a ``timedelta``.""" + return timedelta(seconds=seconds) + + +def duration_to_iso8601(td: timedelta) -> str: + """Render a non-negative ``timedelta`` like ``java.time.Duration.toString()``. + + Examples: 1 day -> ``PT24H`` (Duration has no day unit), 30 min -> + ``PT30M``, 5 s -> ``PT5S``, zero -> ``PT0S``. 
Matches what Paimon's Java + ``$tags`` system table surfaces for ``time_retained``. Retentions only ever + come from ``parse_duration``, which rejects negatives, so only the + non-negative form is supported. + """ + total_micros = ( + td.days * 86_400_000_000 + td.seconds * 1_000_000 + td.microseconds + ) + if total_micros <= 0: + return "PT0S" + + total_seconds, micros = divmod(total_micros, 1_000_000) + hours, rem = divmod(total_seconds, 3600) + minutes, secs = divmod(rem, 60) + + buf = "PT" + if hours != 0: + buf += "{}H".format(hours) + if minutes != 0: + buf += "{}M".format(minutes) + if secs != 0 or micros != 0 or buf == "PT": + frac = "" + if micros != 0: + frac = "." + ("%06d" % micros).rstrip("0") + buf += "{}{}S".format(secs, frac) + return buf + + +def local_datetime_to_millis(dt: datetime) -> int: + """Convert a naive ``LocalDateTime`` to epoch millis (treated as UTC). + + The tag create-time is timezone-less (Java ``LocalDateTime``); interpreting + it as UTC keeps the millis self-consistent with the ``$tags`` system table, + whose ``create_time`` mirrors Java ``Timestamp.fromLocalDateTime`` (also + zone-less wall-clock arithmetic). Uses integer math with floored + sub-millisecond truncation (matching Java ``Instant.toEpochMilli``), so it + is exact and correct for pre-epoch instants too. + """ + return calendar.timegm(dt.timetuple()) * 1000 + dt.microsecond // 1000 diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 880580688390..dd561eb55bc2 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -160,7 +160,8 @@ def create_tag( self, tag_name: str, snapshot_id: Optional[int] = None, - ignore_if_exists: bool = False + ignore_if_exists: bool = False, + time_retained: Optional[str] = None ) -> None: """ Create a tag for a snapshot. 
@@ -169,6 +170,8 @@ def create_tag( tag_name: Name for the tag snapshot_id: ID of the snapshot to tag. If None, uses the latest snapshot. ignore_if_exists: If True, don't raise error if tag already exists + time_retained: Optional retention (e.g. ``"1d"``); when set, the tag + carries a create-time and TTL. Raises: ValueError: If no snapshot exists or tag already exists (when ignore_if_exists=False) @@ -186,7 +189,7 @@ def create_tag( raise ValueError("No snapshot exists in this table.") tag_mgr = self.tag_manager() - tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists) + tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists, time_retained) def delete_tag(self, tag_name: str) -> bool: """ @@ -335,7 +338,12 @@ def rename_tag(self, old_name: str, new_name: str) -> None: tag_mgr = self.tag_manager() tag_mgr.rename_tag(old_name, new_name) - def replace_tag(self, tag_name: str, snapshot_id: int = None) -> None: + def replace_tag( + self, + tag_name: str, + snapshot_id: int = None, + time_retained: Optional[str] = None + ) -> None: """ Replace an existing tag with a new snapshot. @@ -343,6 +351,8 @@ def replace_tag(self, tag_name: str, snapshot_id: int = None) -> None: tag_name: Name of the tag to replace snapshot_id: The snapshot id to associate with the tag. If None, uses the latest snapshot. + time_retained: Optional retention (e.g. ``"1d"``); when set, the + replaced tag carries a create-time and TTL. 
Raises: ValueError: If tag doesn't exist, or snapshot doesn't exist @@ -355,7 +365,7 @@ def replace_tag(self, tag_name: str, snapshot_id: int = None) -> None: snapshot = self.snapshot_manager().get_snapshot_by_id(snapshot_id) if snapshot is None: raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.") - self.tag_manager().replace_tag(snapshot, tag_name) + self.tag_manager().replace_tag(snapshot, tag_name, time_retained) def path_factory(self) -> 'FileStorePathFactory': from pypaimon.utils.file_store_path_factory import FileStorePathFactory diff --git a/paimon-python/pypaimon/table/system/tags_table.py b/paimon-python/pypaimon/table/system/tags_table.py index 7cc50ec7a96d..dc9bc18636a7 100644 --- a/paimon-python/pypaimon/table/system/tags_table.py +++ b/paimon-python/pypaimon/table/system/tags_table.py @@ -21,6 +21,7 @@ import pyarrow +from pypaimon.common.time_utils import duration_to_iso8601, local_datetime_to_millis from pypaimon.schema.data_types import AtomicType, DataField, RowType from pypaimon.table.system.system_table import SystemTable @@ -73,10 +74,12 @@ def _build_arrow_table(self) -> pyarrow.Table: record_counts.append( None if tag.total_record_count is None else int(tag.total_record_count)) - # TODO: surface create_time and time_retained once the Tag - # dataclass carries them. - create_times.append(None) - time_retained.append(None) + create_times.append( + None if tag.tag_create_time is None + else local_datetime_to_millis(tag.tag_create_time)) + time_retained.append( + None if tag.tag_time_retained is None + else duration_to_iso8601(tag.tag_time_retained)) return pyarrow.table({ "tag_name": pyarrow.array(names, type=pyarrow.string()), diff --git a/paimon-python/pypaimon/tag/tag.py b/paimon-python/pypaimon/tag/tag.py index d9911a16dd54..f27c7a587546 100644 --- a/paimon-python/pypaimon/tag/tag.py +++ b/paimon-python/pypaimon/tag/tag.py @@ -15,10 +15,76 @@ # specific language governing permissions and limitations # under the License. 
+from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Optional + +from pypaimon.common.json_util import json_field_with_codec +from pypaimon.common.time_utils import ( + duration_to_json_seconds, + json_array_to_local_datetime, + json_seconds_to_duration, + local_datetime_to_json_array, +) from pypaimon.snapshot.snapshot import Snapshot +@dataclass class Tag(Snapshot): + """A Snapshot with optional ``tagCreateTime`` and ``tagTimeRetained`` (TTL). + + Both fields mirror Java ``org.apache.paimon.tag.Tag`` and are serialized in + the same on-disk JSON shape (LocalDateTime as a ``[y, mo, d, h, mi, s, ns]`` + array, Duration as decimal seconds), so tag files round-trip across the Java + and Python SDKs. They are ``None`` for tags created without a retention. + """ + + tag_create_time: Optional[datetime] = json_field_with_codec( + "tagCreateTime", + encoder=local_datetime_to_json_array, + decoder=json_array_to_local_datetime, + ) + tag_time_retained: Optional[timedelta] = json_field_with_codec( + "tagTimeRetained", + encoder=duration_to_json_seconds, + decoder=json_seconds_to_duration, + ) + + @staticmethod + def from_snapshot_and_tag_ttl( + snapshot: Snapshot, + tag_time_retained: Optional[timedelta], + tag_create_time: datetime, + ) -> "Tag": + """Build a Tag from a Snapshot plus tag-specific TTL metadata. + + Mirrors Java ``Tag.fromSnapshotAndTagTtl``. 
+ """ + return Tag( + version=snapshot.version, + id=snapshot.id, + schema_id=snapshot.schema_id, + base_manifest_list=snapshot.base_manifest_list, + delta_manifest_list=snapshot.delta_manifest_list, + total_record_count=snapshot.total_record_count, + delta_record_count=snapshot.delta_record_count, + commit_user=snapshot.commit_user, + commit_identifier=snapshot.commit_identifier, + commit_kind=snapshot.commit_kind, + time_millis=snapshot.time_millis, + base_manifest_list_size=snapshot.base_manifest_list_size, + delta_manifest_list_size=snapshot.delta_manifest_list_size, + changelog_manifest_list=snapshot.changelog_manifest_list, + changelog_manifest_list_size=snapshot.changelog_manifest_list_size, + index_manifest=snapshot.index_manifest, + changelog_record_count=snapshot.changelog_record_count, + watermark=snapshot.watermark, + statistics=snapshot.statistics, + next_row_id=snapshot.next_row_id, + properties=snapshot.properties, + tag_create_time=tag_create_time, + tag_time_retained=tag_time_retained, + ) def trim_to_snapshot(self) -> Snapshot: """Convert this Tag to a Snapshot""" diff --git a/paimon-python/pypaimon/tag/tag_manager.py b/paimon-python/pypaimon/tag/tag_manager.py index 45a1810414f9..96f4eead8e40 100644 --- a/paimon-python/pypaimon/tag/tag_manager.py +++ b/paimon-python/pypaimon/tag/tag_manager.py @@ -17,10 +17,12 @@ import logging import os +from datetime import datetime, timedelta from typing import Optional from pypaimon.common.file_io import FileIO from pypaimon.common.json_util import JSON +from pypaimon.common.time_utils import parse_duration_nanos from pypaimon.snapshot.snapshot import Snapshot from pypaimon.tag.tag import Tag @@ -118,16 +120,19 @@ def create_tag( self, snapshot: Snapshot, tag_name: str, - ignore_if_exists: bool = False + ignore_if_exists: bool = False, + time_retained: Optional[str] = None ) -> None: """ Create a tag from given snapshot and save it in the storage. 
- + Args: snapshot: The snapshot to tag tag_name: Name for the tag ignore_if_exists: If True, don't raise error if tag already exists - + time_retained: Optional retention (e.g. ``"1d"``, ``"12h"``); when + set, the tag carries a create-time and TTL. + Raises: ValueError: If tag_name is blank or tag already exists (when ignore_if_exists=False) """ @@ -139,7 +144,7 @@ def create_tag( return raise ValueError(f"Tag '{tag_name}' already exists.") - self._create_or_replace_tag(snapshot, tag_name) + self._create_or_replace_tag(snapshot, tag_name, time_retained) def list_tags(self): """List all tags.""" @@ -161,7 +166,8 @@ def list_tags(self): def _create_or_replace_tag( self, snapshot: Snapshot, - tag_name: str + tag_name: str, + time_retained: Optional[str] = None ) -> None: """ Internal method to create or replace a tag. @@ -173,7 +179,27 @@ def _create_or_replace_tag( if not self.file_io.exists(tag_dir): self.file_io.mkdirs(tag_dir) - content = JSON.to_json(snapshot) + # Mirror Java TagManager.createOrReplaceTag: when no retention is set, + # write the plain Snapshot JSON (no tag-specific fields) so the file + # stays readable by older readers. Only write the richer Tag JSON when a + # retention (and thus a create-time) is present. + if time_retained is not None: + # parse_duration_nanos keeps sub-millisecond units exactly. Python's + # timedelta is microsecond-resolution, so reject sub-microsecond + # retentions (e.g. "1ns") instead of silently writing a zero-TTL tag. + nanos = parse_duration_nanos(time_retained) + micros, sub_micro = divmod(nanos, 1000) + if sub_micro: + raise ValueError( + f"time_retained '{time_retained}' specifies sub-microsecond " + f"precision, which pypaimon cannot represent (Python timedelta " + f"resolution is microseconds). Use a value of at least 1 microsecond." 
+ ) + retained = timedelta(microseconds=micros) + tag = Tag.from_snapshot_and_tag_ttl(snapshot, retained, datetime.now()) + content = JSON.to_json(tag) + else: + content = JSON.to_json(snapshot) self.file_io.overwrite_file_utf8(tag_path, content) @@ -199,13 +225,20 @@ def delete_tag(self, tag_name: str) -> bool: self.file_io.delete_quietly(path) return True - def replace_tag(self, snapshot: Snapshot, tag_name: str) -> None: + def replace_tag( + self, + snapshot: Snapshot, + tag_name: str, + time_retained: Optional[str] = None + ) -> None: """ Replace an existing tag with a new snapshot. Args: snapshot: The new snapshot to associate with the tag tag_name: Name of the tag to replace + time_retained: Optional retention (e.g. ``"1d"``); when set, the + replaced tag carries a create-time and TTL. Raises: ValueError: If tag_name is blank or tag doesn't exist @@ -216,7 +249,7 @@ def replace_tag(self, snapshot: Snapshot, tag_name: str) -> None: if not self.tag_exists(tag_name): raise ValueError(f"Tag '{tag_name}' doesn't exist.") - self._create_or_replace_tag(snapshot, tag_name) + self._create_or_replace_tag(snapshot, tag_name, time_retained) def rename_tag(self, old_name: str, new_name: str) -> None: """ diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py index a721dec3d442..e7dc0419cd90 100644 --- a/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py +++ b/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py @@ -24,6 +24,7 @@ implementation is in use. """ +import json import os import shutil import tempfile @@ -81,12 +82,21 @@ def test_create_and_get_tag(self): self.assertIsInstance(response, GetTagResponse) self.assertEqual(response.tag_name, "t1") self.assertIsInstance(response.snapshot, Snapshot) - # FileSystemCatalog does not yet track tag_create_time / - # tag_time_retained — both must be None until the underlying Tag - # dataclass is extended. 
+ # A tag created without a retention carries no create-time / TTL: the + # file is a plain Snapshot JSON, so both surface as None. self.assertIsNone(response.tag_create_time) self.assertIsNone(response.tag_time_retained) + def test_create_tag_without_ttl_writes_plain_snapshot(self): + # No-TTL tags must be written as plain Snapshot JSON (no tag-specific + # fields) so the file stays readable by older / Java readers. + self.catalog.create_tag(self.identifier, "t1") + table = self.catalog.get_table(self.identifier) + tag_path = table.tag_manager().tag_path("t1") + content = json.loads(table.file_io.read_file_utf8(tag_path)) + self.assertNotIn("tagCreateTime", content) + self.assertNotIn("tagTimeRetained", content) + def test_create_tag_with_snapshot_id(self): self.catalog.create_tag(self.identifier, "t1", snapshot_id=1) response = self.catalog.get_tag(self.identifier, "t1") @@ -109,11 +119,46 @@ def test_create_tag_already_exists_ignore(self): result = self.catalog.list_tags_paged(self.identifier) self.assertEqual(result.elements, ["t1"]) - def test_create_tag_with_time_retained_raises_not_implemented(self): - with self.assertRaises(NotImplementedError) as cm: - self.catalog.create_tag( - self.identifier, "t1", time_retained="1d") - self.assertIn("time_retained", str(cm.exception)) + def test_create_tag_with_time_retained(self): + self.catalog.create_tag(self.identifier, "t1", time_retained="1d") + response = self.catalog.get_tag(self.identifier, "t1") + # create_time surfaces as epoch millis, time_retained as an ISO-8601 + # duration string (matching Java's Duration.toString()). 
+ self.assertIsNotNone(response.tag_create_time) + self.assertIsInstance(response.tag_create_time, int) + self.assertEqual(response.tag_time_retained, "PT24H") + + def test_create_tag_with_time_retained_writes_java_compatible_json(self): + self.catalog.create_tag(self.identifier, "t1", time_retained="1d") + table = self.catalog.get_table(self.identifier) + tag_path = table.tag_manager().tag_path("t1") + content = json.loads(table.file_io.read_file_utf8(tag_path)) + # On-disk shape must match Java: LocalDateTime array + Duration seconds. + self.assertIsInstance(content["tagCreateTime"], list) + self.assertEqual(len(content["tagCreateTime"]), 7) + self.assertEqual(content["tagTimeRetained"], 86400.0) + + def test_create_tag_with_sub_millisecond_time_retained_preserved(self): + # A sub-millisecond but microsecond-representable retention must be kept + # (500us -> 0.0005s / PT0.0005S), not rounded to a zero-TTL tag. + self.catalog.create_tag(self.identifier, "t1", time_retained="500micro") + table = self.catalog.get_table(self.identifier) + tag_path = table.tag_manager().tag_path("t1") + content = json.loads(table.file_io.read_file_utf8(tag_path)) + self.assertEqual(content["tagTimeRetained"], 0.0005) + response = self.catalog.get_tag(self.identifier, "t1") + self.assertEqual(response.tag_time_retained, "PT0.0005S") + + def test_create_tag_with_sub_microsecond_time_retained_rejected(self): + # A retention finer than a microsecond cannot be represented by Python's + # timedelta; it must raise instead of silently writing a zero-TTL tag, + # and no tag file should be left behind. 
+ for retained in ("1ns", "999ns"): + with self.assertRaises(ValueError): + self.catalog.create_tag( + self.identifier, "t1", time_retained=retained) + with self.assertRaises(TagNotExistException): + self.catalog.get_tag(self.identifier, "t1") def test_create_tag_table_not_exists(self): with self.assertRaises(TableNotExistException): @@ -221,6 +266,16 @@ def test_replace_tag_with_latest_snapshot(self): tag = table.tag_manager().get("latest_test") self.assertEqual(tag.trim_to_snapshot().id, 2) + def test_replace_tag_with_time_retained(self): + table = self.catalog.get_table(self.identifier) + table.create_tag("ttl_replace", snapshot_id=1) + # Replacing with a retention upgrades the plain-snapshot tag into a + # TTL-carrying tag. + table.replace_tag("ttl_replace", snapshot_id=1, time_retained="12h") + response = self.catalog.get_tag(self.identifier, "ttl_replace") + self.assertIsNotNone(response.tag_create_time) + self.assertEqual(response.tag_time_retained, "PT12H") + def test_replace_tag_not_exists_raises(self): table = self.catalog.get_table(self.identifier) with self.assertRaises(ValueError) as cm: diff --git a/paimon-python/pypaimon/tests/system/tags_table_test.py b/paimon-python/pypaimon/tests/system/tags_table_test.py index bfc5af91099b..e801b656d8d7 100644 --- a/paimon-python/pypaimon/tests/system/tags_table_test.py +++ b/paimon-python/pypaimon/tests/system/tags_table_test.py @@ -120,13 +120,26 @@ def test_lists_created_tags_with_snapshot_metadata(self): tzinfo=datetime.timezone.utc).timestamp() * 1000) self.assertEqual(_TAG_COMMIT_MS, ms) - # Until pypaimon Tag carries these fields they are surfaced as - # None — same trade-off as FileSystemCatalog.get_tag. + # Tags created without a retention carry no create-time / TTL. 
for value in arrow_table.column("create_time").to_pylist(): self.assertIsNone(value) for value in arrow_table.column("time_retained").to_pylist(): self.assertIsNone(value) + def test_surfaces_create_time_and_time_retained_for_ttl_tag(self): + self.table.create_tag("ttl", snapshot_id=7, time_retained="1d") + + arrow_table = _read(self.catalog.get_table("db.t$tags")) + row = { + name: arrow_table.column(name).to_pylist()[0] + for name in arrow_table.column_names + } + self.assertEqual("ttl", row["tag_name"]) + # create_time is a real timestamp; time_retained is the ISO-8601 + # duration string (Java Duration.toString()). + self.assertIsInstance(row["create_time"], datetime.datetime) + self.assertEqual("PT24H", row["time_retained"]) + if __name__ == "__main__": unittest.main() diff --git a/paimon-python/pypaimon/tests/tag_ttl_serde_test.py b/paimon-python/pypaimon/tests/tag_ttl_serde_test.py new file mode 100644 index 000000000000..a87687626229 --- /dev/null +++ b/paimon-python/pypaimon/tests/tag_ttl_serde_test.py @@ -0,0 +1,202 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Unit tests for Tag TTL serialization and the temporal codecs. 
def _snapshot():
    # Minimal APPEND snapshot used as the base fixture for every Tag test.
    snapshot_fields = dict(
        version=3,
        id=0,
        schema_id=0,
        base_manifest_list=None,
        delta_manifest_list=None,
        total_record_count=0,
        delta_record_count=0,
        commit_user=None,
        commit_identifier=0,
        commit_kind="APPEND",
        time_millis=1000,
    )
    return Snapshot(**snapshot_fields)


class TemporalCodecTest(unittest.TestCase):
    # Covers the datetime / duration helpers from pypaimon.common.time_utils.

    def test_local_datetime_array_round_trip(self):
        original = datetime(2024, 6, 25, 10, 30, 45, 123456)
        encoded = local_datetime_to_json_array(original)
        # Layout: [year, month, day, hour, minute, second, nanoOfSecond].
        self.assertEqual([2024, 6, 25, 10, 30, 45, 123456000], encoded)
        decoded = json_array_to_local_datetime(encoded)
        self.assertEqual(original, decoded)

    def test_local_datetime_array_pads_missing_components(self):
        # Jackson omits trailing zero components; missing ones default to 0.
        decoded = json_array_to_local_datetime([1969, 1, 1])
        self.assertEqual(datetime(1969, 1, 1, 0, 0, 0, 0), decoded)

    def test_local_datetime_truncates_java_nanos_to_micros(self):
        # Java's nanoOfSecond carries 9 digits; only the leading 6
        # (microseconds) are expected to survive decoding.
        java_array = [1969, 1, 1, 0, 0, 0, 123456789]
        decoded = json_array_to_local_datetime(java_array)
        self.assertEqual(123456, decoded.microsecond)

    def test_duration_seconds_round_trip(self):
        one_day = timedelta(days=1)
        self.assertEqual(86400.0, duration_to_json_seconds(one_day))
        self.assertEqual(one_day, json_seconds_to_duration(86400.0))
        # Java writes "5.000000000"; json parses it as 5.0.
        self.assertEqual(timedelta(seconds=5), json_seconds_to_duration(5.0))

    def test_duration_to_iso8601(self):
        # (expected ISO-8601 text, input duration), checked in order.
        cases = [
            ("PT24H", timedelta(days=1)),
            ("PT30M", timedelta(minutes=30)),
            ("PT5S", timedelta(seconds=5)),
            ("PT0S", timedelta(0)),
            ("PT1H30M15S", timedelta(hours=1, minutes=30, seconds=15)),
            ("PT0.5S", timedelta(milliseconds=500)),
            # Fractional seconds combined with hour/minute components.
            ("PT1H30M0.5S", timedelta(hours=1, minutes=30, milliseconds=500)),
            ("PT12H", timedelta(hours=12)),
        ]
        for expected, duration in cases:
            self.assertEqual(expected, duration_to_iso8601(duration))

    def test_local_datetime_to_millis_pre_epoch_floors(self):
        # Pre-1970 instants are expected to floor (matching Java
        # Instant.toEpochMilli) rather than truncate toward zero.
        # NOTE(review): this instant is exactly -500 ms, so it carries no
        # sub-millisecond remainder; a value such as 23:59:59.999500 would
        # be needed to tell flooring from truncation of that remainder --
        # confirm whether such a case should be added.
        half_second_before_epoch = datetime(1969, 12, 31, 23, 59, 59, 500000)
        millis = local_datetime_to_millis(half_second_before_epoch)
        self.assertEqual(-500, millis)

    def test_local_datetime_to_millis(self):
        one_second_after_epoch = datetime(1970, 1, 1, 0, 0, 1)
        millis = local_datetime_to_millis(one_second_after_epoch)
        self.assertEqual(1000, millis)

    def test_parse_duration_nanos_keeps_full_precision(self):
        # Unlike parse_duration (rounded milliseconds), the nanos variant
        # keeps sub-millisecond units exactly instead of rounding to zero.
        cases = [
            ("1ns", 1),
            ("1micro", 1_000),
            ("500micro", 500_000),
            ("1ms", 1_000_000),
            ("1s", 1_000_000_000),
            ("1d", 86_400_000_000_000),
            # A bare number means milliseconds, matching parse_duration.
            ("5", 5_000_000),
        ]
        for text, expected_nanos in cases:
            self.assertEqual(expected_nanos, parse_duration_nanos(text))

    def test_parse_duration_nanos_rejects_unknown_unit(self):
        self.assertRaises(ValueError, parse_duration_nanos, "1x")

    def test_sub_millisecond_retention_round_trips(self):
        # 500 microseconds is representable and must survive serialization
        # at microsecond precision (0.0005s / PT0.0005S), not collapse to
        # zero.
        nanos = parse_duration_nanos("500micro")
        retention = timedelta(microseconds=nanos // 1000)
        self.assertEqual(timedelta(microseconds=500), retention)
        self.assertEqual(0.0005, duration_to_json_seconds(retention))
        self.assertEqual(retention, json_seconds_to_duration(0.0005))
        self.assertEqual("PT0.0005S", duration_to_iso8601(retention))


class TagSerdeTest(unittest.TestCase):
    # Covers JSON (de)serialization of Tag and its two TTL fields.

    def test_no_ttl_serializes_as_plain_snapshot(self):
        # TTL-less output must not contain the tag-specific keys.
        # NOTE(review): the object serialized here is a plain Snapshot, not
        # a Tag instance -- confirm this is the intended coverage for the
        # "Tag without TTL" case.
        payload = json.loads(JSON.to_json(_snapshot()))
        for ttl_key in ("tagCreateTime", "tagTimeRetained"):
            self.assertNotIn(ttl_key, payload)

    def test_ttl_tag_matches_java_on_disk_shape(self):
        created_at = datetime(1969, 1, 1, 0, 0, 0, 123456)
        retention = timedelta(seconds=5)
        tag = Tag.from_snapshot_and_tag_ttl(_snapshot(), retention, created_at)

        payload = json.loads(JSON.to_json(tag))
        # Create-time is written as a component array with nanoseconds last;
        # the retention is written as a number of seconds.
        self.assertEqual(
            [1969, 1, 1, 0, 0, 0, 123456000], payload["tagCreateTime"])
        self.assertEqual(5.0, payload["tagTimeRetained"])

    def test_round_trip(self):
        created_at = datetime(2024, 6, 25, 10, 30, 45)
        retention = timedelta(days=1)
        tag = Tag.from_snapshot_and_tag_ttl(_snapshot(), retention, created_at)

        restored = JSON.from_json(JSON.to_json(tag), Tag)
        self.assertEqual(created_at, restored.tag_create_time)
        self.assertEqual(retention, restored.tag_time_retained)

    def test_reads_java_written_tag(self):
        # A tag file produced by Java: array create-time + decimal seconds.
        java_fields = {
            "version": 3,
            "id": 0,
            "schemaId": 0,
            "baseManifestList": None,
            "deltaManifestList": None,
            "totalRecordCount": 0,
            "deltaRecordCount": 0,
            "commitUser": None,
            "commitIdentifier": 0,
            "commitKind": "APPEND",
            "timeMillis": 1000,
            "tagCreateTime": [1969, 1, 1, 0, 0, 0, 123456789],
            "tagTimeRetained": 5.000000000,
        }
        tag = JSON.from_json(json.dumps(java_fields), Tag)
        self.assertEqual(
            datetime(1969, 1, 1, 0, 0, 0, 123456), tag.tag_create_time)
        self.assertEqual(timedelta(seconds=5), tag.tag_time_retained)

    def test_reads_legacy_plain_snapshot_tag(self):
        # Old tag files (plain Snapshot JSON) load with None TTL fields.
        legacy_json = JSON.to_json(_snapshot())
        tag = JSON.from_json(legacy_json, Tag)
        self.assertIsNone(tag.tag_create_time)
        self.assertIsNone(tag.tag_time_retained)

    def test_trim_to_snapshot_drops_tag_fields(self):
        tag = Tag.from_snapshot_and_tag_ttl(
            _snapshot(), timedelta(days=1), datetime(2024, 1, 1))
        trimmed = tag.trim_to_snapshot()
        # The trimmed object must be neither a Tag nor carry its TTL field.
        self.assertNotIsInstance(trimmed, Tag)
        self.assertFalse(hasattr(trimmed, "tag_create_time"))


if __name__ == "__main__":
    unittest.main()