diff --git a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
new file mode 100644
index 000000000000..978b48011c54
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
@@ -0,0 +1,134 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Python port of Java's ``PartialUpdateMergeFunction``
+(``paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
+PartialUpdateMergeFunction.java``).
+
+The merge function used by the ``partial-update`` merge engine on PK
+tables: rows sharing a primary key are merged left-to-right, taking the
+latest non-null value per non-PK field. ``DeduplicateMergeFunction``
+keeps only the latest row; ``PartialUpdateMergeFunction`` instead lets
+later writes "fill in" fields the earlier writes left null, so users
+can write the same logical record across multiple commits with
+different sets of non-null columns.
+
+This is the **core merge semantics only**. The Java implementation also
+supports per-field aggregator overrides
+(``fields.<field-name>.aggregate-function``), sequence groups
+(``fields.<field-name>.sequence-group``), ``ignore-delete``, and
+``partial-update.remove-record-on-*`` options. None of those are
+implemented yet; non-INSERT row kinds raise ``NotImplementedError`` at
+``add`` time so we never silently corrupt data with a half-implemented
+contract.
+"""
+
+from typing import Any, List, Optional
+
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+class PartialUpdateMergeFunction:
+    """A MergeFunction where the key is the primary key (unique) and the
+    value is merged across all rows for that key by taking the latest
+    non-null value per non-PK field.
+
+    Mirrors the ``MergeFunction`` protocol used by ``SortMergeReader``:
+    ``reset`` (between groups of same-key rows), ``add`` (one row at a
+    time, oldest to newest), ``get_result`` (after the group is
+    exhausted).
+    """
+
+    def __init__(self, key_arity: int, value_arity: int,
+                 nullables: Optional[List[bool]] = None):
+        self._key_arity = key_arity
+        self._value_arity = value_arity
+        # Per-value-field nullable flags, parallel to value indices. When
+        # ``None``, no nullability check runs (preserves the contract for
+        # direct callers that don't have schema info handy). When given,
+        # mirrors Java's ``updateNonNullFields`` check: a null input on a
+        # NOT NULL field raises rather than being silently absorbed.
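+        # (Illustrative example, not part of the Java contract: with
+        # nullables=[True, False], a None in value field 1 raises in add();
+        # with nullables=None the same None is absorbed as usual.)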
+        if nullables is not None and len(nullables) != value_arity:
+            raise ValueError(
+                "nullables length {} does not match value_arity {}".format(
+                    len(nullables), value_arity))
+        self._nullables = nullables
+        # Lazily allocated on first add(); ``None`` means "no rows yet".
+        self._accumulator: Optional[List[Any]] = None
+        # Reference to the most recently added kv. We use it only to
+        # propagate the key + sequence_number into the result row, and we
+        # snapshot those two values into a fresh tuple in ``get_result()``
+        # so the result is not aliased to upstream's reused KeyValue.
+        self._latest_kv: Optional[KeyValue] = None
+
+    def reset(self) -> None:
+        self._accumulator = None
+        self._latest_kv = None
+
+    def add(self, kv: KeyValue) -> None:
+        row_kind_byte = kv.value_row_kind_byte
+        if not RowKind.is_add_byte(row_kind_byte):
+            # DELETE / UPDATE_BEFORE need ignore-delete or
+            # partial-update.remove-record-on-delete to be set in Java;
+            # neither option is wired up in pypaimon yet, so refuse the
+            # row rather than silently swallow it.
+            raise NotImplementedError(
+                "PartialUpdateMergeFunction received a {} row; this "
+                "Python port does not yet implement the ignore-delete / "
+                "partial-update.remove-record-on-delete options. Use the "
+                "Java client for tables that produce DELETE / "
+                "UPDATE_BEFORE rows.".format(RowKind(row_kind_byte).to_string())
+            )
+
+        # Mirror Java's reset() + updateNonNullFields(): the accumulator
+        # starts as all-null (equivalent to ``new GenericRow(arity)``) and
+        # each add() writes non-null inputs; null inputs are absorbed —
+        # except when the schema marks the field NOT NULL, in which case
+        # we raise to match Java's IllegalArgumentException check.
+        if self._accumulator is None:
+            self._accumulator = [None] * self._value_arity
+        for i in range(self._value_arity):
+            v = kv.value.get_field(i)
+            if v is not None:
+                self._accumulator[i] = v
+            elif self._nullables is not None and not self._nullables[i]:
+                raise ValueError("Field {} can not be null".format(i))
+        self._latest_kv = kv
+
+    def get_result(self) -> Optional[KeyValue]:
+        if self._accumulator is None or self._latest_kv is None:
+            return None
+
+        kv = self._latest_kv
+        # Snapshot the key as a fresh tuple — we cannot keep a reference
+        # to ``kv`` because upstream readers (e.g. KeyValueWrapReader)
+        # reuse a single KeyValue instance and mutate its underlying
+        # row_tuple between calls. Building a fresh tuple here means the
+        # result we return is decoupled from any subsequent iteration.
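+        # Flat layout handed to KeyValue.replace() below:
+        # (key fields..., sequence_number, row kind byte, value fields...).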
+        key_values = tuple(
+            kv.key.get_field(i) for i in range(self._key_arity)
+        )
+        result_row = key_values + (
+            kv.sequence_number,
+            RowKind.INSERT.value,
+        ) + tuple(self._accumulator)
+
+        result = KeyValue(self._key_arity, self._value_arity)
+        result.replace(result_row)
+        return result
diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
index aedd593b702b..2dcf152e601b 100644
--- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py
+++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
@@ -30,9 +30,15 @@
 class SortMergeReaderWithMinHeap(RecordReader):
     """SortMergeReader implemented with min-heap."""
 
-    def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema):
+    def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema,
+                 merge_function: Optional[Any] = None):
         self.next_batch_readers = list(readers)
-        self.merge_function = DeduplicateMergeFunction()
+        # Default to dedupe so callers that don't pass a merge_function
+        # keep their old behaviour. The merge engine dispatch lives in
+        # ``MergeFileSplitRead.section_reader_supplier`` for the read
+        # path; tests or other ad-hoc callers can pass a different
+        # implementation here.
+        self.merge_function = merge_function if merge_function is not None else DeduplicateMergeFunction()
 
         if schema.partition_keys:
             trimmed_primary_keys = [pk for pk in schema.primary_keys if pk not in schema.partition_keys]
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 9a82b0000a85..ea2e8cf4b103 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -21,7 +21,7 @@
 from functools import partial
 from typing import Callable, List, Optional, Tuple
 
-from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.core_options import CoreOptions, MergeEngine
 from pypaimon.common.predicate import Predicate
 from pypaimon.deletionvectors import ApplyDeletionVectorReader
 from pypaimon.deletionvectors.deletion_vector import DeletionVector
@@ -54,7 +54,10 @@
     KeyValueUnwrapRecordReader
 from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
-from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
+from pypaimon.read.reader.partial_update_merge_function import \
+    PartialUpdateMergeFunction
+from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction,
+                                                    SortMergeReaderWithMinHeap)
 from pypaimon.read.push_down_utils import _get_all_fields
 from pypaimon.read.split import Split
 from pypaimon.read.sliced_split import SlicedSplit
@@ -98,6 +101,10 @@
         self.split = split
         self.row_tracking_enabled = row_tracking_enabled
         self.value_arity = len(read_type)
+        # Snapshot the raw value-side schema before _create_key_value_fields
+        # wraps it, so MergeFileSplitRead can hand per-value-field nullable
+        # flags to merge functions that mirror Java's NOT-NULL check.
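+        # (For example, a value schema of [a STRING, b STRING NOT NULL]
+        # ends up as nullables=[True, False] in _build_merge_function.)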
+        self.value_fields = list(read_type)
         self.trimmed_primary_key = self.table.trimmed_primary_keys
         self.read_fields = read_type
 
@@ -482,7 +489,101 @@ def section_reader_supplier(self, section: List[SortedRun]) -> RecordReader:
             supplier = partial(self.kv_reader_supplier, file, self.deletion_file_readers.get(file.file_name, None))
             data_readers.append(supplier)
         readers.append(ConcatRecordReader(data_readers))
-        return SortMergeReaderWithMinHeap(readers, self.table.table_schema)
+        merge_function = self._build_merge_function()
+        return SortMergeReaderWithMinHeap(
+            readers, self.table.table_schema, merge_function=merge_function)
+
+    def _build_merge_function(self):
+        """Pick the right MergeFunction implementation for the table's
+        ``merge-engine`` option. ``DEDUPLICATE`` is the default and
+        historically the only engine supported on the Python read path;
+        ``PARTIAL_UPDATE`` is now wired up to its dedicated
+        implementation. The remaining engines (``AGGREGATE`` /
+        ``FIRST_ROW``) used to silently degrade to dedupe — that quietly
+        produced wrong data — so we now raise an explicit
+        ``NotImplementedError`` instead, until they're ported.
+
+        For ``PARTIAL_UPDATE``, we also refuse to run when the table
+        configures any option whose semantics this port does not yet
+        implement (sequence-group, per-field aggregator overrides,
+        ignore-delete and friends). Without this guard those options
+        would be silently ignored and produce subtly wrong results —
+        the same anti-pattern this PR exists to close.
+        """
+        engine = self.table.options.merge_engine()
+        if engine == MergeEngine.DEDUPLICATE:
+            return DeduplicateMergeFunction()
+        if engine == MergeEngine.PARTIAL_UPDATE:
+            unsupported = self._partial_update_unsupported_options()
+            if unsupported:
+                raise NotImplementedError(
+                    "merge-engine 'partial-update' is enabled together "
+                    "with options that pypaimon does not yet implement: "
+                    "{}. The supported subset is per-key last-non-null "
+                    "merge with no sequence-group, no per-field "
+                    "aggregator override, no ignore-delete and no "
+                    "partial-update.remove-record-on-* flags. Use the "
+                    "Java client for the full feature set, or open an "
+                    "issue to track Python support.".format(
+                        ", ".join(sorted(unsupported))
+                    )
+                )
+            return PartialUpdateMergeFunction(
+                key_arity=len(self.trimmed_primary_key),
+                value_arity=self.value_arity,
+                nullables=[f.type.nullable for f in self.value_fields],
+            )
+        raise NotImplementedError(
+            "merge-engine '{}' is not implemented in pypaimon yet "
+            "(supported: deduplicate, partial-update). Use the Java "
+            "client or open an issue to track support.".format(engine.value)
+        )
+
+    # Boolean-valued options that, when truthy, opt the table into
+    # behaviour the Python PartialUpdateMergeFunction does not implement.
+    # Mirrors org.apache.paimon.CoreOptions and the fallback keys in
+    # PartialUpdateMergeFunction.java.
+    _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = (
+        "ignore-delete",
+        "partial-update.ignore-delete",
+        "first-row.ignore-delete",
+        "deduplicate.ignore-delete",
+        "partial-update.remove-record-on-delete",
+        "partial-update.remove-record-on-sequence-group",
+    )
+    _FIELDS_PREFIX = "fields."
+    _FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
+    _FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
+    _DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"
+
+    def _partial_update_unsupported_options(self):
+        """Return the set of option keys configured on this table that
+        ``PartialUpdateMergeFunction`` does not yet support. An empty
+        set means we can safely run the simple last-non-null merge.
+        """
+        flagged = set()
+        raw = self.table.options.options.to_map()
+        for key, value in raw.items():
+            if (key in self._PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS
+                    and self._option_is_truthy(value)):
+                flagged.add(key)
+            elif key == self._DEFAULT_AGGREGATE_FUNCTION_KEY:
+                flagged.add(key)
+            elif key.startswith(self._FIELDS_PREFIX) and (
+                    key.endswith(self._FIELD_SEQUENCE_GROUP_SUFFIX)
+                    or key.endswith(self._FIELD_AGGREGATE_FUNCTION_SUFFIX)):
+                flagged.add(key)
+        return flagged
+
+    @staticmethod
+    def _option_is_truthy(raw):
+        if raw is None:
+            return False
+        if isinstance(raw, bool):
+            return raw
+        if isinstance(raw, str):
+            return raw.strip().lower() in ("true", "1", "yes", "on")
+        return bool(raw)
 
     def create_reader(self) -> RecordReader:
         # Create a dict mapping data file name to deletion file reader method
diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
new file mode 100644
index 000000000000..2f567918ec3a
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
@@ -0,0 +1,394 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for the ``partial-update`` merge engine.
+
+Each test creates a PK table with ``merge-engine`` set to a particular
+value, writes one or more batches, and reads back. Partial-update reads
+must merge non-null fields across batches; ``deduplicate`` must keep
+the latest row only; ``aggregation`` and ``first-row`` must raise
+``NotImplementedError`` (until they are ported), since silently
+treating them as deduplicate would corrupt the user's data.
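+
+Sketch of the semantics under test (PK ``id``, nullable columns a/b):
+
+    write 1: {'id': 1, 'a': 'A', 'b': None}
+    write 2: {'id': 1, 'a': None, 'b': 'B'}
+    partial-update read -> {'id': 1, 'a': 'A', 'b': 'B'}
+    deduplicate read    -> {'id': 1, 'a': None, 'b': 'B'}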
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            pa.field('a', pa.string()),
+            pa.field('b', pa.string()),
+            pa.field('c', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_pk_table(self, table_name, merge_engine='partial-update',
+                         extra_options=None):
+        # bucket=1 so all rows for any PK land in the same bucket; this is
+        # what forces the read path through SortMergeReader instead of the
+        # raw_convertible / single-file fast path. partial-update merging
+        # only happens inside SortMergeReader.
+        options = {
+            'bucket': '1',
+            'merge-engine': merge_engine,
+        }
+        if extra_options:
+            options.update(extra_options)
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['id'],
+            options=options,
+        )
+        full = 'default.{}'.format(table_name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full)
+
+    def _write(self, table, rows):
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _write_many(self, table, batches):
+        """Multiple ``write_arrow`` calls inside a single ``prepare_commit``.
+
+        Mirrors the reviewer's question: rows that land in the same
+        underlying data file must still go through the merge-engine
+        dispatch; in-writer merging cannot silently degrade to dedupe.
+        """
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            for rows in batches:
+                w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _read(self, table):
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        if not splits:
+            return []
+        return sorted(
+            rb.new_read().to_arrow(splits).to_pylist(),
+            key=lambda r: r['id'],
+        )
+
+    # -- partial-update happy path ---------------------------------------
+
+    def test_partial_update_two_writes_merges_non_null(self):
+        """Two writes against the same PK with disjoint non-null columns
+        must merge into a single row that has both columns populated.
+        """
+        table = self._create_pk_table('two_writes')
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
+        )
+
+    def test_partial_update_three_writes_merges_left_to_right(self):
+        """Three overlapping writes — each filling in a different column —
+        compose into the union of non-null fields.
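+
+        For example: ('A', None, None), then (None, 'B', None), then
+        (None, None, 'C') compose to ('A', 'B', 'C').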
+        """
+        table = self._create_pk_table('three_writes')
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': 'C'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
+        )
+
+    def test_partial_update_disjoint_keys_unaffected(self):
+        """Three rows with disjoint PKs must all appear unchanged in the
+        output — partial-update only merges rows that share a PK.
+        """
+        table = self._create_pk_table('disjoint_keys')
+        self._write(table, [
+            {'id': 1, 'a': 'A1', 'b': None, 'c': None},
+            {'id': 2, 'a': None, 'b': 'B2', 'c': None},
+            {'id': 3, 'a': None, 'b': None, 'c': 'C3'},
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [
+                {'id': 1, 'a': 'A1', 'b': None, 'c': None},
+                {'id': 2, 'a': None, 'b': 'B2', 'c': None},
+                {'id': 3, 'a': None, 'b': None, 'c': 'C3'},
+            ],
+        )
+
+    def test_partial_update_later_value_wins_over_earlier_non_null(self):
+        """When two writes both supply a non-null value for the same
+        column, the later value wins (latest non-null per field).
+        """
+        table = self._create_pk_table('later_wins')
+        self._write(table, [{'id': 1, 'a': 'old', 'b': 'keep', 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': 'fill'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'new', 'b': 'keep', 'c': 'fill'}],
+        )
+
+    def test_partial_update_later_null_does_not_clobber_earlier_value(self):
+        """A later write with NULL for a column does NOT overwrite an
+        earlier non-null value for that column.
+        """
+        table = self._create_pk_table('null_no_clobber')
+        self._write(table, [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}])
+        self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
+        )
+
+    # -- single-commit, multiple write_arrow calls -----------------------
+    #
+    # Reviewer concern (#7745): rows from multiple ``write_arrow`` calls
+    # inside a single ``prepare_commit`` may land in the same data file
+    # and bypass the merge-engine dispatch we added in
+    # ``MergeFileSplitRead._build_merge_function``. Verified: they do.
+    #
+    # Root cause is upstream of this PR. ``KeyValueDataWriter._merge_data``
+    # simply ``concat + sort``s incoming batches without applying any
+    # merge function, so the flushed file holds multiple rows for the
+    # same primary key -- violating the Java LSM invariant "PK is unique
+    # within a file". On the read side, ``_build_split_from_pack`` then
+    # marks any single-file group as ``raw_convertible=True``
+    # (split_generator.py:99-100), which routes the split through the
+    # raw-convertible fast path and skips ``SortMergeReader`` entirely.
+    # The merge-engine dispatch this PR adds only fires inside
+    # ``SortMergeReader``, so partial-update semantics are lost.
+    #
+    # Fixing this requires either (a) giving ``KeyValueDataWriter`` a
+    # merge buffer that applies the merge function during flush
+    # (mirrors Java ``SortBufferWriteBuffer`` / ``MergeTreeWriter``),
+    # or (b) tightening ``raw_convertible`` to require proof that the
+    # file contains no duplicate keys. Both are write-/scan-path
+    # restructuring, well outside the scope of this read-side
+    # merge-engine port. Tracked for a follow-up PR.
+    #
+    # The two cases below are kept as ``expectedFailure`` so the gap
+    # is visible and will turn into a passing regression once the
+    # writer-side fix lands.
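+    #
+    # For reference, fix (a) would roughly reshape ``_merge_data`` along
+    # the lines of the hypothetical sketch below (names illustrative,
+    # not actual pypaimon API):
+    #
+    #     def _merge_data(self, batches):
+    #         rows = sorted(concat(batches), key=key_and_sequence)
+    #         return [apply_merge_function(group)
+    #                 for group in group_rows_by_primary_key(rows)]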
+
+    @unittest.expectedFailure
+    def test_partial_update_two_write_arrows_single_commit(self):
+        """Two ``write_arrow`` calls + one ``prepare_commit``: each
+        carries a disjoint non-null field; result must be the per-field
+        merge.
+
+        Currently fails: see module-level note above. The flushed file
+        keeps both rows verbatim and the read split goes through the
+        raw-convertible fast path, so neither dedupe nor partial-update
+        merge runs.
+        """
+        table = self._create_pk_table('two_writes_single_commit')
+        self._write_many(table, [
+            [{'id': 1, 'a': 'A', 'b': None, 'c': None}],
+            [{'id': 1, 'a': None, 'b': 'B', 'c': None}],
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
+        )
+
+    @unittest.expectedFailure
+    def test_partial_update_three_write_arrows_single_commit(self):
+        """Three ``write_arrow`` calls in a single commit must compose
+        into the union of non-null fields. Same expected-failure
+        condition as the two-write case above.
+        """
+        table = self._create_pk_table('three_writes_single_commit')
+        self._write_many(table, [
+            [{'id': 1, 'a': 'A', 'b': None, 'c': None}],
+            [{'id': 1, 'a': None, 'b': 'B', 'c': None}],
+            [{'id': 1, 'a': None, 'b': None, 'c': 'C'}],
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
+        )
+
+    # -- deduplicate (regression) ----------------------------------------
+
+    def test_deduplicate_engine_unchanged(self):
+        """The default ``deduplicate`` engine must keep the latest row
+        intact, including its NULLs — exactly the pre-PR behaviour.
+        """
+        table = self._create_pk_table('dedupe', merge_engine='deduplicate')
+        self._write(table, [{'id': 1, 'a': 'old', 'b': 'old-b', 'c': 'old-c'}])
+        self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'new', 'b': None, 'c': None}],
+        )
+
+    # -- engines we don't support yet ------------------------------------
+
+    def test_aggregation_engine_raises_not_implemented(self):
+        """Until ``aggregation`` is ported, reading an aggregation table
+        must raise rather than silently produce dedupe results."""
+        table = self._create_pk_table('agg_unsupported',
+                                      merge_engine='aggregation')
+        self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read().to_arrow(splits)
+        self.assertIn('aggregation', str(cm.exception))
+
+    def test_first_row_engine_raises_not_implemented(self):
+        """Until ``first-row`` is ported, reading a first-row table must
+        raise rather than silently produce dedupe results."""
+        table = self._create_pk_table('first_row_unsupported',
+                                      merge_engine='first-row')
+        self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read().to_arrow(splits)
+        self.assertIn('first-row', str(cm.exception))
+
+    # -- partial-update + out-of-scope option combinations ---------------
+    #
+    # When a user pairs ``merge-engine: partial-update`` with any option
+    # this port doesn't implement (sequence-group, per-field aggregator
+    # override, ignore-delete, partial-update.remove-record-on-*), we
+    # must raise rather than silently run the simple last-non-null merge —
+    # otherwise we'd reproduce the same silent-corruption pattern this PR
+    # exists to close.
+
+    def _assert_partial_update_unsupported(self, table_name, extra_options,
+                                           expected_keys):
+        table = self._create_pk_table(
+            table_name, extra_options=extra_options)
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read().to_arrow(splits)
+        msg = str(cm.exception)
+        self.assertIn("partial-update", msg)
+        for key in expected_keys:
+            self.assertIn(key, msg,
+                          "expected option key '{}' in error: {}".format(key, msg))
+
+    def test_partial_update_with_sequence_group_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_seq_group',
+            {'fields.b.sequence-group': 'a'},
+            ['fields.b.sequence-group'],
+        )
+
+    def test_partial_update_with_field_aggregate_function_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_field_agg',
+            {'fields.a.aggregate-function': 'last_non_null_value'},
+            ['fields.a.aggregate-function'],
+        )
+
+    def test_partial_update_with_default_aggregate_function_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_default_agg',
+            {'fields.default-aggregate-function': 'last_non_null_value'},
+            ['fields.default-aggregate-function'],
+        )
+
+    def test_partial_update_with_ignore_delete_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_ignore_delete',
+            {'ignore-delete': 'true'},
+            ['ignore-delete'],
+        )
+
+    def test_partial_update_with_remove_record_on_delete_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_rrod',
+            {'partial-update.remove-record-on-delete': 'true'},
+            ['partial-update.remove-record-on-delete'],
+        )
+
+    def test_partial_update_with_remove_record_on_sequence_group_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_rrosg',
+            {'partial-update.remove-record-on-sequence-group': 'true'},
+            ['partial-update.remove-record-on-sequence-group'],
+        )
+
+    def test_partial_update_with_explicit_ignore_delete_false_does_not_raise(self):
+        """Explicitly setting ignore-delete=false is equivalent to leaving
+        it unset and must not trip the guard."""
+        table = self._create_pk_table(
+            'pu_ignore_delete_false',
+            extra_options={'ignore-delete': 'false'},
+        )
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
new file mode 100644
index 000000000000..60dfc7198dfb
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
@@ -0,0 +1,226 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Direct unit tests for ``PartialUpdateMergeFunction``.
+
+Drives the merge function with synthetic ``KeyValue`` instances so the
+contract is pinned down without going through the full read pipeline.
+The end-to-end behaviour on real PK tables is exercised separately in
+``test_partial_update_e2e.py``.
+"""
+
+import unittest
+
+from pypaimon.read.reader.partial_update_merge_function import \
+    PartialUpdateMergeFunction
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+def _kv(key, seq, row_kind, value):
+    """Build a fresh KeyValue for a (key, sequence, row_kind, value) tuple.
+
+    ``key`` and ``value`` are tuples of primitives — the helper handles
+    layout (key, seq, row_kind_byte, value) so individual tests can stay
+    focused on the merge semantics.
+    """
+    kv = KeyValue(key_arity=len(key), value_arity=len(value))
+    kv.replace(tuple(key) + (seq, row_kind.value) + tuple(value))
+    return kv
+
+
+def _result_value(kv):
+    """Extract the value tuple out of a KeyValue produced by get_result()."""
+    return tuple(kv.value.get_field(i) for i in range(kv.value_arity))
+
+
+def _result_key(kv):
+    return tuple(kv.key.get_field(i) for i in range(kv.key_arity))
+
+
+class PartialUpdateMergeFunctionTest(unittest.TestCase):
+
+    def test_single_insert_returns_value(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        result = mf.get_result()
+
+        self.assertIsNotNone(result)
+        self.assertEqual(_result_key(result), (1,))
+        self.assertEqual(_result_value(result), ('a', 'x'))
+        self.assertEqual(result.sequence_number, 100)
+        self.assertEqual(result.value_row_kind_byte, RowKind.INSERT.value)
+
+    def test_second_insert_overwrites_non_null(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
+        mf.add(_kv((1,), 101, RowKind.INSERT, ('b', None)))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('b', None))
+        # Sequence number tracks the latest add().
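+        # (get_result() snapshots key + sequence_number from the newest kv.)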
+        self.assertEqual(result.sequence_number, 101)
+
+    def test_second_insert_fills_in_null(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'x')))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    def test_third_insert_continues_merge(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=3)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None, None)))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'b', None)))
+        mf.add(_kv((1,), 102, RowKind.INSERT, (None, None, 'c')))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'b', 'c'))
+
+    def test_later_null_does_not_clobber_earlier_value(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, None)))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    def test_reset_between_keys(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        first = mf.get_result()
+        self.assertEqual(_result_key(first), (1,))
+        self.assertEqual(_result_value(first), ('a', 'x'))
+
+        mf.reset()
+        mf.add(_kv((2,), 200, RowKind.INSERT, ('b', 'y')))
+        second = mf.get_result()
+        self.assertEqual(_result_key(second), (2,))
+        self.assertEqual(_result_value(second), ('b', 'y'))
+
+    def test_get_result_before_any_add_returns_none(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        self.assertIsNone(mf.get_result())
+
+    def test_update_after_is_treated_as_insert(self):
+        # Java's PartialUpdate accepts UPDATE_AFTER alongside INSERT in
+        # non-sequence-group mode (both are "add" kinds). Mirror that.
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
+        mf.add(_kv((1,), 101, RowKind.UPDATE_AFTER, (None, 'x')))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    def test_delete_row_raises_not_implemented(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        with self.assertRaises(NotImplementedError) as cm:
+            mf.add(_kv((1,), 101, RowKind.DELETE, (None, None)))
+        self.assertIn('DELETE', str(cm.exception))
+        self.assertIn('ignore-delete', str(cm.exception))
+
+    def test_update_before_row_raises_not_implemented(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        with self.assertRaises(NotImplementedError) as cm:
+            mf.add(_kv((1,), 100, RowKind.UPDATE_BEFORE, (None, None)))
+        self.assertIn('UPDATE_BEFORE', str(cm.exception))
+
+    def test_result_is_decoupled_from_input_kv(self):
+        """The merge function must build a fresh result tuple — upstream
+        readers reuse a single KeyValue instance and call ``replace`` on
+        each iteration, so holding a reference to the input is unsafe.
+        """
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        kv = _kv((1,), 100, RowKind.INSERT, ('a', 'x'))
+        mf.add(kv)
+        result = mf.get_result()
+
+        # Mutate the input's underlying tuple to simulate a reused
+        # KeyValue being rebound to a different row.
+        kv.replace((999, 999, RowKind.INSERT.value, 'evil', 'evil'))
+
+        # The previously-returned result must NOT be affected.
+        self.assertEqual(_result_key(result), (1,))
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    # -- NOT-NULL input validation (mirrors Java's updateNonNullFields) ----
+
+    def test_first_insert_with_null_for_not_null_field_raises(self):
+        """If the very first row writes null to a NOT NULL field, raise —
+        same input-validation Java does in updateNonNullFields()."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2, nullables=[True, False])
+        mf.reset()
+        with self.assertRaises(ValueError) as cm:
+            mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
+        self.assertIn("Field 1", str(cm.exception))
+
+    def test_subsequent_insert_with_null_for_not_null_field_raises(self):
+        """A later null on a NOT NULL field must also raise — Java checks
+        on every add(), not just the first one."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2, nullables=[True, False])
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x')))
+        with self.assertRaises(ValueError) as cm:
+            mf.add(_kv((1,), 2, RowKind.INSERT, (None, None)))
+        self.assertIn("Field 1", str(cm.exception))
+
+    def test_null_for_nullable_field_is_absorbed(self):
+        """A null input on a nullable field is silently absorbed (existing
+        accumulator value wins) — the standard partial-update semantic."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2, nullables=[True, True])
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x')))
+        mf.add(_kv((1,), 2, RowKind.INSERT, (None, 'y')))
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'y'))
+
+    def test_nullables_length_mismatch_raises(self):
+        with self.assertRaises(ValueError):
+            PartialUpdateMergeFunction(
+                key_arity=1, value_arity=2, nullables=[True])
+
+    def test_no_nullables_arg_skips_check(self):
+        """Backward-compat: callers that don't pass ``nullables`` get the
+        previous behaviour (no NOT-NULL validation)."""
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        # Would have raised had we declared the second field NOT NULL.
+        mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', None))
+
+
+if __name__ == '__main__':
+    unittest.main()