From 8070eec7efabc10d2f9dea22f6996eef81b7f96f Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 30 Apr 2026 02:20:42 +0800 Subject: [PATCH 1/4] [python] Implement partial-update merge engine in pypaimon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``MergeEngine.PARTIAL_UPDATE`` is exposed in ``core_options.py`` and accepts ``merge-engine: partial-update`` as a table option, but the read path never reads that option — ``sort_merge_reader.py`` hardcodes ``DeduplicateMergeFunction()``. So a user who creates a PK table with ``merge-engine: partial-update`` and writes overlapping rows whose non-null columns differ gets silently deduplicated results instead of the expected per-field merge: their data is wrong, with no error or warning. The same is true for ``aggregation`` and ``first-row`` — both are silently degraded to dedupe today. This change ports the core ``PartialUpdateMergeFunction`` semantics from Java (paimon-core/.../mergetree/compact/PartialUpdateMergeFunction.java) and wires the Python read path to dispatch on ``merge-engine``: * New ``pypaimon/read/reader/partial_update_merge_function.py``: on each ``add(kv)`` copy non-null fields of ``kv.value`` into an accumulator; ``get_result()`` returns a fresh KeyValue with the merged row. Result is built into a brand-new tuple so the merge output is decoupled from upstream's reused KeyValue instances. * ``SortMergeReaderWithMinHeap.__init__`` gains an optional ``merge_function`` kwarg; default still ``DeduplicateMergeFunction()`` so any direct callers (none in-tree) are unchanged. 
* ``MergeFileSplitRead.section_reader_supplier`` selects the merge function based on ``self.table.options.merge_engine()``: DEDUPLICATE -> DeduplicateMergeFunction (unchanged) PARTIAL_UPDATE -> PartialUpdateMergeFunction AGGREGATE / FIRST_ROW -> NotImplementedError (was silent dedupe) Out of scope, intentionally: * Per-field aggregator overrides (``fields.<field-name>.aggregate-function``) * Sequence-group support (``fields.<field-name>.sequence-group``) * ``ignore-delete`` / ``partial-update.remove-record-on-*`` options * AGGREGATE / FIRST_ROW merge engine implementations DELETE / UPDATE_BEFORE rows raise ``NotImplementedError`` at ``add()`` time so we can't silently corrupt data with a half-implemented contract. Tests: * ``test_partial_update_merge_function.py`` — 11 unit cases covering single insert, two-way overlapping merges, three-way merges, later-null-does-not-clobber, reset between keys, get_result-before-any-add, UPDATE_AFTER acceptance, DELETE / UPDATE_BEFORE refusal, and result decoupling from input kv (proves we're not aliasing upstream's reused KeyValue). * ``test_partial_update_e2e.py`` — 8 cases: two-write merge, three-write merge, disjoint keys unaffected, later-non-null wins, later-null preserves earlier value, deduplicate engine unchanged (regression), and aggregation / first-row raise NotImplementedError. Verified by checking out ``origin/master``'s ``sort_merge_reader.py`` / ``split_read.py`` and rerunning ``test_partial_update_e2e.py``: master fails the 4 partial-update merge cases (silent dedupe) and the 2 aggregation / first-row "raises" cases (silent dedupe instead of raising); fix passes all 8. 
--- .../reader/partial_update_merge_function.py | 119 ++++++++++ .../pypaimon/read/reader/sort_merge_reader.py | 10 +- paimon-python/pypaimon/read/split_read.py | 35 ++- .../pypaimon/tests/test_partial_update_e2e.py | 219 ++++++++++++++++++ .../test_partial_update_merge_function.py | 177 ++++++++++++++ 5 files changed, 555 insertions(+), 5 deletions(-) create mode 100644 paimon-python/pypaimon/read/reader/partial_update_merge_function.py create mode 100644 paimon-python/pypaimon/tests/test_partial_update_e2e.py create mode 100644 paimon-python/pypaimon/tests/test_partial_update_merge_function.py diff --git a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py new file mode 100644 index 000000000000..12e29e33eb8d --- /dev/null +++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py @@ -0,0 +1,119 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +""" +Python port of Java's ``PartialUpdateMergeFunction`` +(``paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ +PartialUpdateMergeFunction.java``). + +The merge function used by the ``partial-update`` merge engine on PK +tables: rows sharing a primary key are merged left-to-right, taking the +latest non-null value per non-PK field. ``DeduplicateMergeFunction`` +keeps only the latest row; ``PartialUpdateMergeFunction`` instead lets +later writes "fill in" fields the earlier writes left null, so users +can write the same logical record across multiple commits with +different sets of non-null columns. + +This is the **core merge semantics only**. The Java implementation also +supports per-field aggregator overrides (``fields..aggregate- +function``), sequence groups (``fields..sequence-group``), +``ignore-delete``, and ``partial-update.remove-record-on-*`` options. +None of those are implemented yet; non-INSERT row kinds raise +``NotImplementedError`` at ``add`` time so we never silently corrupt +data with a half-implemented contract. +""" + +from typing import Any, List, Optional + +from pypaimon.table.row.key_value import KeyValue +from pypaimon.table.row.row_kind import RowKind + + +class PartialUpdateMergeFunction: + """A MergeFunction where the key is the primary key (unique) and the + value is merged across all rows for that key by taking the latest + non-null value per non-PK field. + + Mirrors the ``MergeFunction`` protocol used by ``SortMergeReader``: + ``reset`` (between groups of same-key rows), ``add`` (one row at a + time, oldest to newest), ``get_result`` (after the group is + exhausted). + """ + + def __init__(self, key_arity: int, value_arity: int): + self._key_arity = key_arity + self._value_arity = value_arity + # Lazily allocated on first add(); ``None`` means "no rows yet". 
+ self._accumulator: Optional[List[Any]] = None + # Reference to the most recently added kv. We use it only to + # propagate the key + sequence_number into the result row, and we + # snapshot those two values into a fresh tuple in ``get_result()`` + # so the result is not aliased to upstream's reused KeyValue. + self._latest_kv: Optional[KeyValue] = None + + def reset(self) -> None: + self._accumulator = None + self._latest_kv = None + + def add(self, kv: KeyValue) -> None: + row_kind_byte = kv.value_row_kind_byte + if not RowKind.is_add_byte(row_kind_byte): + # DELETE / UPDATE_BEFORE need ignore-delete or + # partial-update.remove-record-on-delete to be set in Java; + # neither option is wired up in pypaimon yet, so refuse the + # row rather than silently swallow it. + raise NotImplementedError( + "PartialUpdateMergeFunction received a {} row; this " + "Python port does not yet implement the ignore-delete / " + "partial-update.remove-record-on-delete options. Use the " + "Java client for tables that produce DELETE / " + "UPDATE_BEFORE rows.".format(RowKind(row_kind_byte).to_string()) + ) + + if self._accumulator is None: + self._accumulator = [ + kv.value.get_field(i) for i in range(self._value_arity) + ] + else: + for i in range(self._value_arity): + v = kv.value.get_field(i) + if v is not None: + self._accumulator[i] = v + self._latest_kv = kv + + def get_result(self) -> Optional[KeyValue]: + if self._accumulator is None or self._latest_kv is None: + return None + + kv = self._latest_kv + # Snapshot the key as a fresh tuple — we cannot keep a reference + # to ``kv`` because upstream readers (e.g. KeyValueWrapReader) + # reuse a single KeyValue instance and mutate its underlying + # row_tuple between calls. Building a fresh tuple here means the + # result we return is decoupled from any subsequent iteration. 
+ key_values = tuple( + kv.key.get_field(i) for i in range(self._key_arity) + ) + result_row = key_values + ( + kv.sequence_number, + RowKind.INSERT.value, + ) + tuple(self._accumulator) + + result = KeyValue(self._key_arity, self._value_arity) + result.replace(result_row) + return result diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py b/paimon-python/pypaimon/read/reader/sort_merge_reader.py index aedd593b702b..2dcf152e601b 100644 --- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py +++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py @@ -30,9 +30,15 @@ class SortMergeReaderWithMinHeap(RecordReader): """SortMergeReader implemented with min-heap.""" - def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema): + def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema, + merge_function: Optional[Any] = None): self.next_batch_readers = list(readers) - self.merge_function = DeduplicateMergeFunction() + # Default to dedupe so callers that don't pass a merge_function + # keep their old behaviour. The merge engine dispatch lives in + # ``MergeFileSplitRead.section_reader_supplier`` for the read + # path; tests or other ad-hoc callers can pass a different + # implementation here. 
+ self.merge_function = merge_function if merge_function is not None else DeduplicateMergeFunction() if schema.partition_keys: trimmed_primary_keys = [pk for pk in schema.primary_keys if pk not in schema.partition_keys] diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 9a82b0000a85..0b3d2fe7fca2 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -21,7 +21,7 @@ from functools import partial from typing import Callable, List, Optional, Tuple -from pypaimon.common.options.core_options import CoreOptions +from pypaimon.common.options.core_options import CoreOptions, MergeEngine from pypaimon.common.predicate import Predicate from pypaimon.deletionvectors import ApplyDeletionVectorReader from pypaimon.deletionvectors.deletion_vector import DeletionVector @@ -54,7 +54,10 @@ KeyValueUnwrapRecordReader from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader from pypaimon.read.reader.shard_batch_reader import ShardBatchReader -from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap +from pypaimon.read.reader.partial_update_merge_function import \ + PartialUpdateMergeFunction +from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction, + SortMergeReaderWithMinHeap) from pypaimon.read.push_down_utils import _get_all_fields from pypaimon.read.split import Split from pypaimon.read.sliced_split import SlicedSplit @@ -482,7 +485,33 @@ def section_reader_supplier(self, section: List[SortedRun]) -> RecordReader: supplier = partial(self.kv_reader_supplier, file, self.deletion_file_readers.get(file.file_name, None)) data_readers.append(supplier) readers.append(ConcatRecordReader(data_readers)) - return SortMergeReaderWithMinHeap(readers, self.table.table_schema) + merge_function = self._build_merge_function() + return SortMergeReaderWithMinHeap( + readers, self.table.table_schema, merge_function=merge_function) + + def 
_build_merge_function(self): + """Pick the right MergeFunction implementation for the table's + ``merge-engine`` option. ``DEDUPLICATE`` is the default and the + only engine supported on the Python read path historically; + ``PARTIAL_UPDATE`` is now wired up to its dedicated + implementation. The remaining engines (``AGGREGATE`` / + ``FIRST_ROW``) used to silently degrade to dedupe — that quietly + produced wrong data — so we now raise an explicit + ``NotImplementedError`` instead, until they're ported. + """ + engine = self.table.options.merge_engine() + if engine == MergeEngine.DEDUPLICATE: + return DeduplicateMergeFunction() + if engine == MergeEngine.PARTIAL_UPDATE: + return PartialUpdateMergeFunction( + key_arity=len(self.trimmed_primary_key), + value_arity=self.value_arity, + ) + raise NotImplementedError( + "merge-engine '{}' is not implemented in pypaimon yet " + "(supported: deduplicate, partial-update). Use the Java " + "client or open an issue to track support.".format(engine.value) + ) def create_reader(self) -> RecordReader: # Create a dict mapping data file name to deletion file reader method diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py new file mode 100644 index 000000000000..e56536a044ef --- /dev/null +++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py @@ -0,0 +1,219 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""End-to-end tests for the ``partial-update`` merge engine. + +Each test creates a PK table with ``merge-engine`` set to a particular +value, writes one or more batches, and reads back. Partial-update reads +must merge non-null fields across batches; ``deduplicate`` must keep +the latest row only; ``aggregation`` and ``first-row`` must raise +``NotImplementedError`` (until they are ported), since silently +treating them as deduplicate would corrupt the user's data. +""" + +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema + + +class PartialUpdateMergeEngineE2ETest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', True) + + cls.pa_schema = pa.schema([ + pa.field('id', pa.int64(), nullable=False), + ('a', pa.string()), + ('b', pa.string()), + ('c', pa.string()), + ]) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def _create_pk_table(self, table_name, merge_engine='partial-update'): + # bucket=1 so all rows for any PK land in the same bucket; this is + # what forces the read path through SortMergeReader instead of the + # raw_convertible / single-file fast path. partial-update merging + # only happens inside SortMergeReader. 
+ schema = Schema.from_pyarrow_schema( + self.pa_schema, + primary_keys=['id'], + options={ + 'bucket': '1', + 'merge-engine': merge_engine, + }, + ) + full = 'default.{}'.format(table_name) + self.catalog.create_table(full, schema, False) + return self.catalog.get_table(full) + + def _write(self, table, rows): + wb = table.new_batch_write_builder() + w = wb.new_write() + c = wb.new_commit() + try: + w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema)) + c.commit(w.prepare_commit()) + finally: + w.close() + c.close() + + def _read(self, table): + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + if not splits: + return [] + return sorted( + rb.new_read().to_arrow(splits).to_pylist(), + key=lambda r: r['id'], + ) + + # -- partial-update happy path --------------------------------------- + + def test_partial_update_two_writes_merges_non_null(self): + """Two writes against the same PK with disjoint non-null columns + must merge into a single row that has both columns populated. + """ + table = self._create_pk_table('two_writes') + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], + ) + + def test_partial_update_three_writes_merges_left_to_right(self): + """Three overlapping writes — each filling in a different column — + compose into the union of non-null fields. 
+ """ + table = self._create_pk_table('three_writes') + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': 'C'}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], + ) + + def test_partial_update_disjoint_keys_unaffected(self): + """Three rows with disjoint PKs must all appear unchanged in the + output — partial-update only merges rows that share a PK. + """ + table = self._create_pk_table('disjoint_keys') + self._write(table, [ + {'id': 1, 'a': 'A1', 'b': None, 'c': None}, + {'id': 2, 'a': None, 'b': 'B2', 'c': None}, + {'id': 3, 'a': None, 'b': None, 'c': 'C3'}, + ]) + + self.assertEqual( + self._read(table), + [ + {'id': 1, 'a': 'A1', 'b': None, 'c': None}, + {'id': 2, 'a': None, 'b': 'B2', 'c': None}, + {'id': 3, 'a': None, 'b': None, 'c': 'C3'}, + ], + ) + + def test_partial_update_later_value_wins_over_earlier_non_null(self): + """When two writes both supply a non-null value for the same + column, the later value wins (latest non-null per field). + """ + table = self._create_pk_table('later_wins') + self._write(table, [{'id': 1, 'a': 'old', 'b': 'keep', 'c': None}]) + self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': 'fill'}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'new', 'b': 'keep', 'c': 'fill'}], + ) + + def test_partial_update_later_null_does_not_clobber_earlier_value(self): + """A later write with NULL for a column does NOT overwrite an + earlier non-null value for that column. 
+ """ + table = self._create_pk_table('null_no_clobber') + self._write(table, [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}]) + self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], + ) + + # -- deduplicate (regression) ---------------------------------------- + + def test_deduplicate_engine_unchanged(self): + """The default ``deduplicate`` engine must keep the latest row + intact, including its NULLs — exactly the pre-PR behaviour. + """ + table = self._create_pk_table('dedupe', merge_engine='deduplicate') + self._write(table, [{'id': 1, 'a': 'old', 'b': 'old-b', 'c': 'old-c'}]) + self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'new', 'b': None, 'c': None}], + ) + + # -- engines we don't support yet ------------------------------------ + + def test_aggregation_engine_raises_not_implemented(self): + """Until ``aggregation`` is ported, reading an aggregation table + must raise rather than silently produce dedupe results.""" + table = self._create_pk_table('agg_unsupported', + merge_engine='aggregation') + self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}]) + + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + with self.assertRaises(NotImplementedError) as cm: + rb.new_read().to_arrow(splits) + self.assertIn('aggregation', str(cm.exception)) + + def test_first_row_engine_raises_not_implemented(self): + """Until ``first-row`` is ported, reading a first-row table must + raise rather than silently produce dedupe results.""" + table = self._create_pk_table('first_row_unsupported', + merge_engine='first-row') + self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}]) + + rb = table.new_read_builder() + splits = 
rb.new_scan().plan().splits() + with self.assertRaises(NotImplementedError) as cm: + rb.new_read().to_arrow(splits) + self.assertIn('first-row', str(cm.exception)) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py new file mode 100644 index 000000000000..11e187766c48 --- /dev/null +++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py @@ -0,0 +1,177 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Direct unit tests for ``PartialUpdateMergeFunction``. + +Drives the merge function with synthetic ``KeyValue`` instances so the +contract is pinned down without going through the full read pipeline. +The end-to-end behaviour on real PK tables is exercised separately in +``test_partial_update_e2e.py``. 
+""" + +import unittest + +from pypaimon.read.reader.partial_update_merge_function import \ + PartialUpdateMergeFunction +from pypaimon.table.row.key_value import KeyValue +from pypaimon.table.row.row_kind import RowKind + + +def _kv(key, seq, row_kind, value): + """Build a fresh KeyValue for a (key, sequence, row_kind, value) tuple. + + ``key`` and ``value`` are tuples of primitives — the helper handles + layout (key, seq, row_kind_byte, value) so individual tests can stay + focused on the merge semantics. + """ + kv = KeyValue(key_arity=len(key), value_arity=len(value)) + kv.replace(tuple(key) + (seq, row_kind.value) + tuple(value)) + return kv + + +def _result_value(kv): + """Extract the value tuple out of a KeyValue produced by get_result().""" + return tuple(kv.value.get_field(i) for i in range(kv.value_arity)) + + +def _result_key(kv): + return tuple(kv.key.get_field(i) for i in range(kv.key_arity)) + + +class PartialUpdateMergeFunctionTest(unittest.TestCase): + + def test_single_insert_returns_value(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + result = mf.get_result() + + self.assertIsNotNone(result) + self.assertEqual(_result_key(result), (1,)) + self.assertEqual(_result_value(result), ('a', 'x')) + self.assertEqual(result.sequence_number, 100) + self.assertEqual(result.value_row_kind_byte, RowKind.INSERT.value) + + def test_second_insert_overwrites_non_null(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None))) + mf.add(_kv((1,), 101, RowKind.INSERT, ('b', None))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('b', None)) + # Sequence number tracks the latest add(). 
+ self.assertEqual(result.sequence_number, 101) + + def test_second_insert_fills_in_null(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None))) + mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'x'))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'x')) + + def test_third_insert_continues_merge(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=3) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None, None))) + mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'b', None))) + mf.add(_kv((1,), 102, RowKind.INSERT, (None, None, 'c'))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'b', 'c')) + + def test_later_null_does_not_clobber_earlier_value(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + mf.add(_kv((1,), 101, RowKind.INSERT, (None, None))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'x')) + + def test_reset_between_keys(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + first = mf.get_result() + self.assertEqual(_result_key(first), (1,)) + self.assertEqual(_result_value(first), ('a', 'x')) + + mf.reset() + mf.add(_kv((2,), 200, RowKind.INSERT, ('b', 'y'))) + second = mf.get_result() + self.assertEqual(_result_key(second), (2,)) + self.assertEqual(_result_value(second), ('b', 'y')) + + def test_get_result_before_any_add_returns_none(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + self.assertIsNone(mf.get_result()) + + def test_update_after_is_treated_as_insert(self): + # Java's PartialUpdate accepts UPDATE_AFTER alongside INSERT in + # non-sequence-group mode (both are "add" kinds). Mirror that. 
+ mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None))) + mf.add(_kv((1,), 101, RowKind.UPDATE_AFTER, (None, 'x'))) + + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'x')) + + def test_delete_row_raises_not_implemented(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x'))) + with self.assertRaises(NotImplementedError) as cm: + mf.add(_kv((1,), 101, RowKind.DELETE, (None, None))) + self.assertIn('DELETE', str(cm.exception)) + self.assertIn('ignore-delete', str(cm.exception)) + + def test_update_before_row_raises_not_implemented(self): + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + with self.assertRaises(NotImplementedError) as cm: + mf.add(_kv((1,), 100, RowKind.UPDATE_BEFORE, (None, None))) + self.assertIn('UPDATE_BEFORE', str(cm.exception)) + + def test_result_is_decoupled_from_input_kv(self): + """The merge function must build a fresh result tuple — upstream + readers reuse a single KeyValue instance and call ``replace`` on + each iteration, so holding a reference to the input is unsafe. + """ + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + kv = _kv((1,), 100, RowKind.INSERT, ('a', 'x')) + mf.add(kv) + result = mf.get_result() + + # Mutate the input's underlying tuple to simulate a reused + # KeyValue being rebound to a different row. + kv.replace((999, 999, RowKind.INSERT.value, 'evil', 'evil')) + + # The previously-returned result must NOT be affected. 
+ self.assertEqual(_result_key(result), (1,)) + self.assertEqual(_result_value(result), ('a', 'x')) + + +if __name__ == '__main__': + unittest.main() From 70b9682231d4635a82ae37dde4d56bd9926fb9e7 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 30 Apr 2026 22:23:54 +0800 Subject: [PATCH 2/4] [python] partial-update: refuse to run on tables that configure out-of-scope options Address review on r3168491328: previously `_build_merge_function()` dispatched on `merge-engine: partial-update` alone, so a table that ALSO configured sequence-group / per-field aggregator / ignore-delete / partial-update.remove-record-on-* would fall into the simple PartialUpdateMergeFunction and silently drop those semantics -- exactly the same silent-corruption pattern this PR exists to close, just reshaped from "silent dedupe" to "silent half-partial-update". Now the PARTIAL_UPDATE branch first scans the table options for any of the unsupported keys: * fields.<field-name>.sequence-group * fields.<field-name>.aggregate-function * fields.default-aggregate-function * ignore-delete (and the partial-update./first-row./deduplicate. prefixed aliases) when truthy * partial-update.remove-record-on-delete when truthy * partial-update.remove-record-on-sequence-group when truthy If any are set, raise NotImplementedError naming every offending key so the user can either drop them or escalate. Same shape as the existing AGGREGATE / FIRST_ROW raise. Tests: 7 new e2e cases in test_partial_update_e2e.py, one per option plus a regression case asserting `ignore-delete: false` (explicitly disabled) still passes through to the merge function. 
--- paimon-python/pypaimon/read/split_read.py | 67 +++++++++++++ .../pypaimon/tests/test_partial_update_e2e.py | 97 ++++++++++++++++++- 2 files changed, 159 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 0b3d2fe7fca2..faf4fb3683a1 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -498,11 +498,32 @@ def _build_merge_function(self): ``FIRST_ROW``) used to silently degrade to dedupe — that quietly produced wrong data — so we now raise an explicit ``NotImplementedError`` instead, until they're ported. + + For ``PARTIAL_UPDATE``, we also refuse to run when the table + configures any option whose semantics this port does not yet + implement (sequence-group, per-field aggregator overrides, + ignore-delete and friends). Without this guard those options + would be silently ignored and produce subtly wrong results — + the same anti-pattern this PR exists to close. """ engine = self.table.options.merge_engine() if engine == MergeEngine.DEDUPLICATE: return DeduplicateMergeFunction() if engine == MergeEngine.PARTIAL_UPDATE: + unsupported = self._partial_update_unsupported_options() + if unsupported: + raise NotImplementedError( + "merge-engine 'partial-update' is enabled together " + "with options that pypaimon does not yet implement: " + "{}. The supported subset is per-key last-non-null " + "merge with no sequence-group, no per-field " + "aggregator override, no ignore-delete and no " + "partial-update.remove-record-on-* flags. 
Use the " + "Java client for the full feature set, or open an " + "issue to track Python support.".format( + ", ".join(sorted(unsupported)) + ) + ) return PartialUpdateMergeFunction( key_arity=len(self.trimmed_primary_key), value_arity=self.value_arity, @@ -513,6 +534,52 @@ def _build_merge_function(self): "client or open an issue to track support.".format(engine.value) ) + # Boolean-valued options that, when truthy, opt the table into + # behaviour the Python PartialUpdateMergeFunction does not implement. + # Mirrors org.apache.paimon.CoreOptions and the fallback keys in + # PartialUpdateMergeFunction.java. + _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = ( + "ignore-delete", + "partial-update.ignore-delete", + "first-row.ignore-delete", + "deduplicate.ignore-delete", + "partial-update.remove-record-on-delete", + "partial-update.remove-record-on-sequence-group", + ) + _FIELDS_PREFIX = "fields." + _FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group" + _FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function" + _DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function" + + def _partial_update_unsupported_options(self): + """Return the set of option keys configured on this table that + ``PartialUpdateMergeFunction`` does not yet support. Empty set + means we can safely run the simple last-non-null merge. 
+ """ + flagged = set() + raw = self.table.options.options.to_map() + for key, value in raw.items(): + if (key in self._PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS + and self._option_is_truthy(value)): + flagged.add(key) + elif key == self._DEFAULT_AGGREGATE_FUNCTION_KEY: + flagged.add(key) + elif key.startswith(self._FIELDS_PREFIX) and ( + key.endswith(self._FIELD_SEQUENCE_GROUP_SUFFIX) + or key.endswith(self._FIELD_AGGREGATE_FUNCTION_SUFFIX)): + flagged.add(key) + return flagged + + @staticmethod + def _option_is_truthy(raw): + if raw is None: + return False + if isinstance(raw, bool): + return raw + if isinstance(raw, str): + return raw.strip().lower() in ("true", "1", "yes", "on") + return bool(raw) + def create_reader(self) -> RecordReader: # Create a dict mapping data file name to deletion file reader method self._genarate_deletion_file_readers() diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py index e56536a044ef..a7a29970a89c 100644 --- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py +++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py @@ -56,18 +56,22 @@ def setUpClass(cls): def tearDownClass(cls): shutil.rmtree(cls.tempdir, ignore_errors=True) - def _create_pk_table(self, table_name, merge_engine='partial-update'): + def _create_pk_table(self, table_name, merge_engine='partial-update', + extra_options=None): # bucket=1 so all rows for any PK land in the same bucket; this is # what forces the read path through SortMergeReader instead of the # raw_convertible / single-file fast path. partial-update merging # only happens inside SortMergeReader. 
+ options = { + 'bucket': '1', + 'merge-engine': merge_engine, + } + if extra_options: + options.update(extra_options) schema = Schema.from_pyarrow_schema( self.pa_schema, primary_keys=['id'], - options={ - 'bucket': '1', - 'merge-engine': merge_engine, - }, + options=options, ) full = 'default.{}'.format(table_name) self.catalog.create_table(full, schema, False) @@ -214,6 +218,89 @@ def test_first_row_engine_raises_not_implemented(self): rb.new_read().to_arrow(splits) self.assertIn('first-row', str(cm.exception)) + # -- partial-update + out-of-scope option combinations --------------- + # + # When a user pairs ``merge-engine: partial-update`` with any option + # this port doesn't implement (sequence-group, per-field aggregator + # override, ignore-delete, partial-update.remove-record-on-*), we + # must raise rather than silently run the simple last-non-null merge + # — otherwise we'd reproduce the same silent-corruption pattern this + # PR exists to close. + + def _assert_partial_update_unsupported(self, table_name, extra_options, + expected_keys): + table = self._create_pk_table( + table_name, extra_options=extra_options) + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + with self.assertRaises(NotImplementedError) as cm: + rb.new_read().to_arrow(splits) + msg = str(cm.exception) + self.assertIn("partial-update", msg) + for key in expected_keys: + self.assertIn(key, msg, + "expected option key '{}' in error: {}".format(key, msg)) + + def test_partial_update_with_sequence_group_raises(self): + self._assert_partial_update_unsupported( + 'pu_seq_group', + {'fields.b.sequence-group': 'a'}, + ['fields.b.sequence-group'], + ) + + def test_partial_update_with_field_aggregate_function_raises(self): + self._assert_partial_update_unsupported( + 'pu_field_agg', + {'fields.a.aggregate-function': 'last_non_null_value'}, 
+ ['fields.a.aggregate-function'], + ) + + def test_partial_update_with_default_aggregate_function_raises(self): + self._assert_partial_update_unsupported( + 'pu_default_agg', + {'fields.default-aggregate-function': 'last_non_null_value'}, + ['fields.default-aggregate-function'], + ) + + def test_partial_update_with_ignore_delete_raises(self): + self._assert_partial_update_unsupported( + 'pu_ignore_delete', + {'ignore-delete': 'true'}, + ['ignore-delete'], + ) + + def test_partial_update_with_remove_record_on_delete_raises(self): + self._assert_partial_update_unsupported( + 'pu_rrod', + {'partial-update.remove-record-on-delete': 'true'}, + ['partial-update.remove-record-on-delete'], + ) + + def test_partial_update_with_remove_record_on_sequence_group_raises(self): + self._assert_partial_update_unsupported( + 'pu_rrosg', + {'partial-update.remove-record-on-sequence-group': 'true'}, + ['partial-update.remove-record-on-sequence-group'], + ) + + def test_partial_update_with_explicit_ignore_delete_false_does_not_raise(self): + """Explicitly setting ignore-delete=false is equivalent to leaving + it unset and must not trip the guard.""" + table = self._create_pk_table( + 'pu_ignore_delete_false', + extra_options={'ignore-delete': 'false'}, + ) + self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}]) + self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], + ) + if __name__ == '__main__': unittest.main() From 1c4b1f0c297092d5914f2a51c10db0af9a6a07db Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 30 Apr 2026 22:39:12 +0800 Subject: [PATCH 3/4] [python] partial-update: enforce NOT NULL inputs to mirror Java updateNonNullFields Java PartialUpdateMergeFunction.updateNonNullFields (line 177-188) raises IllegalArgumentException when an input field is null and the schema marks that field NOT NULL. 
The Python port previously absorbed such inputs silently, letting writes whose first value was null on a NOT NULL field land null in the accumulator. Changes: * PartialUpdateMergeFunction.__init__ takes an optional `nullables` list parallel to value indices. When given, every add() checks each null input against `nullables[i]` and raises ValueError on a NOT NULL field, matching Java semantics on every row (not just the first). When omitted, behaviour is unchanged (back-compat for direct callers). * MergeFileSplitRead snapshots the raw value-side schema as `value_fields` before _create_key_value_fields wraps it, then hands `[f.type.nullable for f in self.value_fields]` to the merge function. * Five new unit cases in test_partial_update_merge_function.py: first row null on NOT NULL raises, subsequent row null on NOT NULL raises, null on nullable field is absorbed, length-mismatch nullables raises, omitting nullables preserves the previous lenient behaviour. Result: with the existing guard in _build_merge_function (which refuses out-of-scope options) and the NOT NULL enforcement here, the simple last-non-null path is now feature-equivalent to Java's updateNonNullFields + getResult on the supported subset. --- .../reader/partial_update_merge_function.py | 33 +++++++++---- paimon-python/pypaimon/read/split_read.py | 5 ++ .../test_partial_update_merge_function.py | 49 +++++++++++++++++++ 3 files changed, 78 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py index 12e29e33eb8d..978b48011c54 100644 --- a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py +++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py @@ -55,9 +55,20 @@ class PartialUpdateMergeFunction: exhausted). 
""" - def __init__(self, key_arity: int, value_arity: int): + def __init__(self, key_arity: int, value_arity: int, + nullables: Optional[List[bool]] = None): self._key_arity = key_arity self._value_arity = value_arity + # Per-value-field nullable flags, parallel to value indices. When + # ``None``, no nullability check runs (preserves the contract for + # direct callers that don't have schema info handy). When given, + # mirrors Java's ``updateNonNullFields`` check: a null input on a + # NOT NULL field raises rather than being silently absorbed. + if nullables is not None and len(nullables) != value_arity: + raise ValueError( + "nullables length {} does not match value_arity {}".format( + len(nullables), value_arity)) + self._nullables = nullables # Lazily allocated on first add(); ``None`` means "no rows yet". self._accumulator: Optional[List[Any]] = None # Reference to the most recently added kv. We use it only to @@ -85,15 +96,19 @@ def add(self, kv: KeyValue) -> None: "UPDATE_BEFORE rows.".format(RowKind(row_kind_byte).to_string()) ) + # Mirror Java's reset() + updateNonNullFields(): the accumulator + # starts as all-null (equivalent to ``new GenericRow(arity)``) and + # each add() writes non-null inputs; null inputs are absorbed — + # except when the schema marks the field NOT NULL, in which case + # we raise to match Java's IllegalArgumentException check. 
if self._accumulator is None: - self._accumulator = [ - kv.value.get_field(i) for i in range(self._value_arity) - ] - else: - for i in range(self._value_arity): - v = kv.value.get_field(i) - if v is not None: - self._accumulator[i] = v + self._accumulator = [None] * self._value_arity + for i in range(self._value_arity): + v = kv.value.get_field(i) + if v is not None: + self._accumulator[i] = v + elif self._nullables is not None and not self._nullables[i]: + raise ValueError("Field {} can not be null".format(i)) self._latest_kv = kv def get_result(self) -> Optional[KeyValue]: diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index faf4fb3683a1..ea2e8cf4b103 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -101,6 +101,10 @@ def __init__( self.split = split self.row_tracking_enabled = row_tracking_enabled self.value_arity = len(read_type) + # Snapshot the raw value-side schema before _create_key_value_fields + # wraps it, so MergeFileSplitRead can hand per-value-field nullable + # flags to merge functions that mirror Java's NOT-NULL check. 
+ self.value_fields = list(read_type) self.trimmed_primary_key = self.table.trimmed_primary_keys self.read_fields = read_type @@ -527,6 +531,7 @@ def _build_merge_function(self): return PartialUpdateMergeFunction( key_arity=len(self.trimmed_primary_key), value_arity=self.value_arity, + nullables=[f.type.nullable for f in self.value_fields], ) raise NotImplementedError( "merge-engine '{}' is not implemented in pypaimon yet " diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py index 11e187766c48..60dfc7198dfb 100644 --- a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py +++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py @@ -172,6 +172,55 @@ def test_result_is_decoupled_from_input_kv(self): self.assertEqual(_result_key(result), (1,)) self.assertEqual(_result_value(result), ('a', 'x')) + # -- NOT-NULL input validation (mirrors Java's updateNonNullFields) ---- + + def test_first_insert_with_null_for_not_null_field_raises(self): + """If the very first row writes null to a NOT NULL field, raise — + same input-validation Java does in updateNonNullFields().""" + mf = PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True, False]) + mf.reset() + with self.assertRaises(ValueError) as cm: + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None))) + self.assertIn("Field 1", str(cm.exception)) + + def test_subsequent_insert_with_null_for_not_null_field_raises(self): + """A later null on a NOT NULL field must also raise — Java checks + on every add(), not just the first one.""" + mf = PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True, False]) + mf.reset() + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x'))) + with self.assertRaises(ValueError) as cm: + mf.add(_kv((1,), 2, RowKind.INSERT, (None, None))) + self.assertIn("Field 1", str(cm.exception)) + + def test_null_for_nullable_field_is_absorbed(self): + """A 
null input on a nullable field is silently absorbed (existing + accumulator value wins) — the standard partial-update semantic.""" + mf = PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True, True]) + mf.reset() + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x'))) + mf.add(_kv((1,), 2, RowKind.INSERT, (None, 'y'))) + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', 'y')) + + def test_nullables_length_mismatch_raises(self): + with self.assertRaises(ValueError): + PartialUpdateMergeFunction( + key_arity=1, value_arity=2, nullables=[True]) + + def test_no_nullables_arg_skips_check(self): + """Backward-compat: callers that don't pass ``nullables`` get the + previous behaviour (no NOT-NULL validation).""" + mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2) + mf.reset() + # Would have raised had we declared the second field NOT NULL. + mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None))) + result = mf.get_result() + self.assertEqual(_result_value(result), ('a', None)) + if __name__ == '__main__': unittest.main() From 0d58859d04c84b823b65194aab10e1a193e41487 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 May 2026 15:41:09 +0800 Subject: [PATCH 4/4] [python] partial-update: cover same-commit multi write_arrow as expectedFailure Reviewer asked to cover rows that land in the same data file -- multiple write_arrow() calls before a single prepare_commit(). Adding the cases revealed the writer-side / read-side gap upstream of this PR: KeyValueDataWriter._merge_data only does concat+sort (no merge function applied), so the flushed file holds duplicate primary keys; on read, _build_split_from_pack treats any single-file group as raw_convertible and routes through the fast path, skipping SortMergeReader and the merge-engine dispatch this PR adds. 
Fixing it requires either a merge buffer in KeyValueDataWriter (mirroring Java SortBufferWriteBuffer / MergeTreeWriter) or a tighter raw_convertible check that proves intra-file PK uniqueness -- both are write-path / scan-path restructuring outside this read-side merge-engine port. The two new cases are kept as unittest.expectedFailure so the gap stays visible and converts to passing regressions when the writer-side fix lands. --- .../pypaimon/tests/test_partial_update_e2e.py | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py index a7a29970a89c..2f567918ec3a 100644 --- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py +++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py @@ -88,6 +88,24 @@ def _write(self, table, rows): w.close() c.close() + def _write_many(self, table, batches): + """Multiple ``write_arrow`` calls inside a single ``prepare_commit``. + + Mirrors the reviewer's question: rows that land in the same + underlying data file must still go through the merge-engine + dispatch; in-writer merging cannot silently degrade to dedupe. 
+ """ + wb = table.new_batch_write_builder() + w = wb.new_write() + c = wb.new_commit() + try: + for rows in batches: + w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema)) + c.commit(w.prepare_commit()) + finally: + w.close() + c.close() + def _read(self, table): rb = table.new_read_builder() splits = rb.new_scan().plan().splits() @@ -173,6 +191,76 @@ def test_partial_update_later_null_does_not_clobber_earlier_value(self): [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], ) + # -- single-commit, multiple write_arrow calls ----------------------- + # + # Reviewer concern (#7745): rows from multiple ``write_arrow`` calls + # inside a single ``prepare_commit`` may land in the same data file + # and bypass the merge-engine dispatch we added in + # ``MergeFileSplitRead._build_merge_function``. Verified: they do. + # + # Root cause is upstream of this PR. ``KeyValueDataWriter._merge_data`` + # simply ``concat + sort``s incoming batches without applying any + # merge function, so the flushed file holds multiple rows for the + # same primary key -- violating the Java LSM invariant "PK is unique + # within a file". On the read side, ``_build_split_from_pack`` then + # marks any single-file group as ``raw_convertible=True`` + # (split_generator.py:99-100), which routes the split through the + # raw-convertible fast path and skips ``SortMergeReader`` entirely. + # The merge-engine dispatch this PR adds only fires inside + # ``SortMergeReader``, so partial-update semantics are lost. + # + # Fixing this requires either (a) giving ``KeyValueDataWriter`` a + # merge buffer that applies the merge function during flush + # (mirrors Java ``SortBufferWriteBuffer`` / ``MergeTreeWriter``), + # or (b) tightening ``raw_convertible`` to require proof that the + # file contains no duplicate keys. Both are write-/scan-path + # restructuring, well outside the scope of this read-side + # merge-engine port. Tracked for a follow-up PR. 
+ # + # The two cases below are kept as ``expectedFailure`` so the gap + # is visible and will turn into a passing regression once the + # writer-side fix lands. + + @unittest.expectedFailure + def test_partial_update_two_write_arrows_single_commit(self): + """Two ``write_arrow`` calls + one ``prepare_commit``: each + carries a disjoint non-null field; result must be the per-field + merge. + + Currently fails: see module-level note above. The flushed file + keeps both rows verbatim and the read split goes through the + raw-convertible fast path, so neither dedupe nor partial-update + merge runs. + """ + table = self._create_pk_table('two_writes_single_commit') + self._write_many(table, [ + [{'id': 1, 'a': 'A', 'b': None, 'c': None}], + [{'id': 1, 'a': None, 'b': 'B', 'c': None}], + ]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}], + ) + + @unittest.expectedFailure + def test_partial_update_three_write_arrows_single_commit(self): + """Three ``write_arrow`` calls in a single commit must compose + into the union of non-null fields. Same expected-failure + condition as the two-write case above. + """ + table = self._create_pk_table('three_writes_single_commit') + self._write_many(table, [ + [{'id': 1, 'a': 'A', 'b': None, 'c': None}], + [{'id': 1, 'a': None, 'b': 'B', 'c': None}], + [{'id': 1, 'a': None, 'b': None, 'c': 'C'}], + ]) + + self.assertEqual( + self._read(table), + [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}], + ) + # -- deduplicate (regression) ---------------------------------------- def test_deduplicate_engine_unchanged(self):